数据 striping

在计算机数据存储中,数据条带化是分割逻辑顺序数据(例如文件)的技术,以便连续的段存储在不同的物理存储设备上。

当处理设备比单个存储设备提供数据更快地请求数据时,条带化非常有用。 通过在可以同时访问的多个设备上分布段,增加了总数据吞吐量。

它也是平衡磁盘阵列中I/O负载的有用方法。

条带化用于独立磁盘冗余阵列(RAID)存储中的磁盘驱动器,网络接口控制器,集群文件系统中的不同计算机和面向网格的存储,以及某些系统中的RAM。

Striped64

JDK 8 的 java.util.concurrent.atomic 下有一个包本地的类 Striped64 ,它持有常见表示和机制用于类支持动态 striping 到 64bit 值上。

设计思路

这个类维护一个延迟初始的、原子地更新值的表,加上额外的 “base” 字段。表的大小是 2 的幂。索引使用每线程的哈希码来masked。这个的几乎所有声明都是包私有的,通过子类直接访问。

表的条目是 Cell 类,一个填充过(通过 sun.misc.Contended )的 AtomicLong 的变体,用于减少缓存竞争。填充对于多数 Atomics 是过度杀伤的,因为它们一般不规则地分布在内存里,因此彼此间不会有太多冲突。但存在于数组的原子对象将倾向于彼此相邻地放置,因此将通常共享缓存行(对 性能有巨大的副作用),在没有这个防备下。

部分地,因为Cell相对比较大,我们避免创建它们直到需要时。当没有竞争时,所有的更新都作用到 base 字段。根据第一次竞争(更新 base 的 CAS 失败),表被初始化为大小 2。表的大小根据更多的竞争加倍,直到大于或等于CPU数量的最小的 2 的幂。表的槽在它们需要之前保持空。

一个单独的自旋锁(“cellsBusy”)用于初始化和resize表,还有用新的Cell填充槽。不需要阻塞锁,当锁不可得,线程尝试其他槽(或 base)。在这些重试中,会增加竞争和减少本地性,这仍然好于其他选择。

通过 ThreadLocalRandom 维护线程探针字段,作为每线程的哈希码。我们让它们为 0 来保持未初始化直到它们在槽 0 竞争。然后初始化它们为通常不会互相冲突的值。当执行更新操作时,竞争和/或表冲突通过失败了的 CAS 来指示。根据冲突,如果表的大小小于容量,它的大小加倍,除非有些线程持有了锁。如果一个哈希后的槽是空的,且锁可得,创建新的Cell。否则,如果槽存 在,重试CAS。重试通过 “重散列,double hashing” 来继续,使用一个次要的哈希算法(Marsaglia XorShift)来尝试找到一个自由槽位。

表的大小是有上限的,因为,当线程数多于CPU数时,假如每个线程绑定到一个CPU上,存在一个完美的哈希函数映射线程到槽上,消除了冲突。当我们 到达容量,我们随机改变碰撞线程的哈希码搜索这个映射。因为搜索是随机的,冲突只能通过CAS失败来知道,收敛convergence 是慢的,因为线程通常不会一直绑定到CPU上,可能根本不会发生。然而,尽管有这些限制,在这些案例下观察到的竞争频率显著地低。

当哈希到特定 Cell 的线程终止后,Cell 可能变为空闲的,表加倍后导致没有线程哈希到扩展的 Cell 也会出现这种情况。我们不尝试去检测或移除这些 Cell,在实例长期运行的假设下,观察到的竞争水平将重现,所以 Cell 将最终被再次需要。对于短期存活的实例,这没关系。

设计思路小结

striping和缓存行填充:通过把类数据 striping 为 64bit 的片段,使数据成为缓存行友好的,减少CAS竞争。

分解表示:对于一个数字 5,可以分解为一序列数的和:2 + 3,这个数字加 1 也等价于它的分解序列中的任一 数字加 1:5 + 1 = 2 + (3 + 1)。

通过把分解序列存放在表里面,表的条目都是填充后的 Cell;限制表的大小为 2 的幂,则可以用掩码来实现索引;同时把表的大小限制为大于等于CPU数量的最小的 2 的幂。

当表的条目上出现竞争时,在到达容量前表扩容一倍,通过增加条目来减少竞争。

伪共享(False sharing)和cpu缓存行

LongAdder类继承Striped64类,看下Striped64类的一个变量cells:

/**
  * Table of cells. When non-null, size is a power of 2.
  */
transient volatile Cell[] cells;

Cell数组即为存储分割后的每个long值;看下Cell类的定义,

static final class Cell {
    volatile long p0, p1, p2, p3, p4, p5, p6;
    volatile long value;
    volatile long q0, q1, q2, q3, q4, q5, q6;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }
    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

刚看到这个类代码肯定会很奇怪,为什么里面定义了这些没有用到的变量p0, p1, p2, p3, p4, p5, p6。这个就引出了伪共享(False sharing)和cpu缓存行;

CPU CACHE

网上关于这两个方面的文章也很多

7个示例科普CPU CACHE

Cache Line

CPU不是按单个bytes来读取内存数据的,而是以“块数据”的形式,每块的大小通常为64bytes,这些“块”被成为“Cache Line”(这种说法其实很不太正确,关于Cache Line的知识请参考文末的参考链接)

如果有两个线程(Thread1 和 Thread2)同时修改一个volatile数据,把这个数据记为’x’:

volatile long x;

如果线程1打算更改x的值,而线程2准备读取:

Thread1:x=3;
Thread2: System.out.println(x);

由于x值被更新了,所以x值需要在线程1和线程2之间传递(从线程1到线程2),x的变更会引起整块64bytes被交换,因为cpu核之间以cache lines的形式交换数据(cache lines的大小一般为64bytes)。有可能线程1和线程2在同一个核心里处理,但是在这个简单的例子中我们假设每个线程在不同的核中被处理。

我们知道long values的内存长度为8bytes,在我们例子中”Cache Line”为64bytes,所以一个cache line可以存储8个long型变量,在cache line中已经存储了一个long型变量x,我们假设cache line中剩余的空间用来存储了7个long型变量,

例如从v1到v7

x,v1,v2,v3,v4,v5,v6,v7

False Sharing

一个cache lien可以被多个不同的线程所使用。如果有其他线程修改了v2的值,线程1和线程2将会强制重新加载cache line。你可以会疑惑我们只是修改了v2的值不应该会影响其他变量,为啥线程1和线程2需要重新加载cache line呢。然后,即使对于多个线程来说这些更新操作是逻辑独立的,但是一致性的保持是以cache line为基础的,而不是以单个独立的元素。

这种明显没有必要的共享数据的方式被称作“False sharing”.

Padding

为了获取一个cache line,核心需要执行几百个指令。

如果核心需要等待一个cache line重新加载,核心将会停止做其他事情,这种现象被称为”Stall”。

Stalls可以通过减少“False Sharing”,一个减少”false sharing”的技巧是填充数据结构,使得线程操作的变量落入到不同的cache line中。

下面是一个填充了的数据结构的例子,尝试着把x和v1放入到不同的cache line中

public class FalseSharingWithPadding {
    public volatile long x; 
    public volatile long p2;   // padding 
    public volatile long p3;   // padding 
    public volatile long p4;   // padding 
    public volatile long p5;   // padding 
    public volatile long p6;   // padding 
    public volatile long p7;   // padding 
    public volatile long p8;   // padding 
    public volatile long v1; 
}

在你准备填充你的所有数据结构之前,你必须了解jvm会减少或者重排序没有使用的字段,因此可能会重新引入“false sharing”。因此对象会在堆中的位置是没有办法保证的。

为了减少未使用的填充字段被优化掉的机会,将这些字段设置成为volatile会很有帮助。对于填充的建议是你只需要在高度竞争的并发类上使用填充,并且在你的目标架构上测试使用有很大提升之后采用填充。最好的方式是做10000玄幻迭代,消除JVM的实时优化的影响。

java 8中引入了一个新注解 @Contented, 主要是用来减少“False sharing”,在你需要避免“false sharing”的字段上标记注解,这可以暗示虚拟机“这个字段可以分离到不同的cache line中”,所以LongAdder在java8中的实现已经采用了 @Contended

Cell 类

Cell 类是 Striped64 的静态内部类。通过注解 @sun.misc.Contended 来自动实现缓存行填充,让Java编译器和JRE运行时来决定如何填充。

本质上是一个填充了的、提供了CAS更新的volatile变量。

@sun.misc.Contended static final class Cell {
     volatile long value;
     Cell(long x) { value = x; }
     final boolean cas(long cmp, long val) {
          return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
     }

     // Unsafe mechanics
     private static final sun.misc.Unsafe UNSAFE;
     private static final long valueOffset;
     static {
          try {
               UNSAFE = sun.misc.Unsafe.getUnsafe();
               Class<?> ak = Cell.class;
               valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
          } catch (Exception e) {
               throw new Error(e);
          }
     }
}

Striped64

Striped64 通过一个 Cell 数组维持了一序列分解数的表示,通过 base 字段维持数的初始值,通过 cellsBusy 字段来控制 resing 和/或 创建Cell。

它还提供了对数进行累加的机制。

abstract class Striped64 extends Number {
    static final int NCPU = Runtime.getRuntime().availableProcessors();

     // 存放 Cell 的表。当不为空时大小是 2 的幂。
    transient volatile Cell[] cells;

     // base 值,在没有竞争时使用,也作为表初始化竞争时的一个后备。
    transient volatile long base;

     // 自旋锁,在 resizing 和/或 创建Cell时使用。
    transient volatile int cellsBusy;
}

累加机制 longAccumulate

设计思路里针对机制的实现,核心逻辑。该方法处理涉及初始化、resing、创建新cell、和/或竞争的更新。

逻辑如下:

if 表已初始化

if 映射到的槽是空的,加锁后再次判断,如果仍然是空的,初始化cell并关联到槽。
else if (槽不为空)在槽上之前的CAS已经失败,重试。
else if (槽不为空、且之前的CAS没失败,)在此槽的cell上尝试更新
else if 表已达到容量上限或被扩容了,重试。
else if 如果不存在冲突,则设置为存在冲突,重试。
else if 如果成功获取到锁,则扩容。
else 重散列,尝试其他槽。
else if 锁空闲且获取锁成功,初始化表
else if 回退 base 上更新且成功则退出
else 继续

源码

final void longAccumulate(long x, LongBinaryOperator fn,
                                boolean wasUncontended) {
     int h;
     if ((h = getProbe()) == 0) {
          // 未初始化的
          ThreadLocalRandom.current(); // 强制初始化
          h = getProbe();
          wasUncontended = true;
     }

     // 最后的槽不为空则 true,也用于控制扩容,false重试。
     boolean collide = false;
     for (;;) {
          Cell[] as; Cell a; int n; long v;
          if ((as = cells) != null && (n = as.length) > 0) {
               // 表已经初始化

               if ((a = as[(n - 1) & h]) == null) {
                    // 线程所映射到的槽是空的。

                    if (cellsBusy == 0) {       // 尝试关联新的Cell

                         // 锁未被使用,乐观地创建并初始化cell。
                         Cell r = new Cell(x);

                         if (cellsBusy == 0 && casCellsBusy()) {
                              // 锁仍然是空闲的、且成功获取到锁

                              boolean created = false;
                              try {               // 在持有锁时再次检查槽是否空闲。
                                   Cell[] rs; int m, j;
                                   if ((rs = cells) != null &&
                                        (m = rs.length) > 0 &&
                                        rs[j = (m - 1) & h] == null) {
                                        // 所映射的槽仍为空

                                        rs[j] = r;          // 关联 cell 到槽
                                        created = true;
                                   }
                              } finally {
                                   cellsBusy = 0;     // 释放锁
                              }
                              if (created)
                                   break;               // 成功创建cell并关联到槽,退出
                              continue;           // 槽现在不为空了
                         }
                    }
                    // 锁被占用了,重试
                    collide = false;
               }
               // 槽被占用了

               else if (!wasUncontended)       // 已经知道 CAS 失败
                    wasUncontended = true;      // 在重散列后继续

               // 在当前槽的cell上尝试更新
               else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                                  fn.applyAsLong(v, x))))
                    break;

               // 表大小达到上限或扩容了;
               // 表达到上限后就不会再尝试下面if的扩容了,只会重散列,尝试其他槽
               else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale

               //  如果不存在冲突,则设置为存在冲突
               else if (!collide)
                    collide = true;

               // 有竞争,需要扩容
               else if (cellsBusy == 0 && casCellsBusy()) {
                    // 锁空闲且成功获取到锁
                    try {
                         if (cells == as) {      // 距上一次检查后表没有改变,扩容:加倍
                              Cell[] rs = new Cell[n << 1];
                              for (int i = 0; i < n; ++i)
                                   rs[i] = as[i];
                              cells = rs;
                         }
                    } finally {
                         cellsBusy = 0;     // 释放锁
                    }
                    collide = false;
                    continue;                   // 在扩容后的表上重试
               }

               // 没法获取锁,重散列,尝试其他槽
               h = advanceProbe(h);
          }
          else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
               // 加锁的情况下初始化表

               boolean init = false;
               try {                           // Initialize table
                    if (cells == as) {
                         Cell[] rs = new Cell[2];
                         rs[h & 1] = new Cell(x);
                         cells = rs;
                         init = true;
                    }
               } finally {
                    cellsBusy = 0;     // 释放锁
               }
               if (init)
                    break;     // 成功初始化,已更新,跳出循环
          }
          else if (casBase(v = base, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
               // 表未被初始化,可能正在初始化,回退使用 base。
               break;                          // 回退到使用 base
     }
}

拓展阅读

参考资料

LongAdder类学习小结

Java8 Striped64 和 LongAdder