数据 striping

在计算机数据存储中,数据条带化是分割逻辑顺序数据(例如文件)的技术,以便连续的段存储在不同的物理存储设备上。

当处理设备比单个存储设备提供数据更快地请求数据时,条带化非常有用。 通过在可以同时访问的多个设备上分布段,增加了总数据吞吐量。

它也是平衡磁盘阵列中I/O负载的有用方法。

条带化用于独立磁盘冗余阵列(RAID)存储中的磁盘驱动器,网络接口控制器,集群文件系统中的不同计算机和面向网格的存储,以及某些系统中的RAM。

Striped64

JDK 8 的 java.util.concurrent.atomic 下有一个包本地的类 Striped64 ,它持有常见表示和机制用于类支持动态 striping 到 64bit 值上。

设计思路

这个类维护一个延迟初始的、原子地更新值的表,加上额外的 “base” 字段。表的大小是 2 的幂。索引使用每线程的哈希码来masked。这个的几乎所有声明都是包私有的,通过子类直接访问。

表的条目是 Cell 类,一个填充过(通过 sun.misc.Contended )的 AtomicLong 的变体,用于减少缓存竞争。填充对于多数 Atomics 是过度杀伤的,因为它们一般不规则地分布在内存里,因此彼此间不会有太多冲突。但存在于数组的原子对象将倾向于彼此相邻地放置,因此将通常共享缓存行(对 性能有巨大的副作用),在没有这个防备下。

部分地,因为Cell相对比较大,我们避免创建它们直到需要时。当没有竞争时,所有的更新都作用到 base 字段。根据第一次竞争(更新 base 的 CAS 失败),表被初始化为大小 2。表的大小根据更多的竞争加倍,直到大于或等于CPU数量的最小的 2 的幂。表的槽在它们需要之前保持空。

一个单独的自旋锁(“cellsBusy”)用于初始化和resize表,还有用新的Cell填充槽。不需要阻塞锁,当锁不可得,线程尝试其他槽(或 base)。在这些重试中,会增加竞争和减少本地性,这仍然好于其他选择。

通过 ThreadLocalRandom 维护线程探针字段,作为每线程的哈希码。我们让它们为 0 来保持未初始化直到它们在槽 0 竞争。然后初始化它们为通常不会互相冲突的值。当执行更新操作时,竞争和/或表冲突通过失败了的 CAS 来指示。根据冲突,如果表的大小小于容量,它的大小加倍,除非有些线程持有了锁。如果一个哈希后的槽是空的,且锁可得,创建新的Cell。否则,如果槽存 在,重试CAS。重试通过 “重散列,double hashing” 来继续,使用一个次要的哈希算法(Marsaglia XorShift)来尝试找到一个自由槽位。

表的大小是有上限的,因为,当线程数多于CPU数时,假如每个线程绑定到一个CPU上,存在一个完美的哈希函数映射线程到槽上,消除了冲突。当我们 到达容量,我们随机改变碰撞线程的哈希码搜索这个映射。因为搜索是随机的,冲突只能通过CAS失败来知道,收敛convergence 是慢的,因为线程通常不会一直绑定到CPU上,可能根本不会发生。然而,尽管有这些限制,在这些案例下观察到的竞争频率显著地低。

当哈希到特定 Cell 的线程终止后,Cell 可能变为空闲的,表加倍后导致没有线程哈希到扩展的 Cell 也会出现这种情况。我们不尝试去检测或移除这些 Cell,在实例长期运行的假设下,观察到的竞争水平将重现,所以 Cell 将最终被再次需要。对于短期存活的实例,这没关系。

设计思路小结

striping和缓存行填充:通过把类数据 striping 为 64bit 的片段,使数据成为缓存行友好的,减少CAS竞争。

分解表示:对于一个数字 5,可以分解为一序列数的和:2 + 3,这个数字加 1 也等价于它的分解序列中的任一 数字加 1:5 + 1 = 2 + (3 + 1)。

通过把分解序列存放在表里面,表的条目都是填充后的 Cell;限制表的大小为 2 的幂,则可以用掩码来实现索引;同时把表的大小限制为大于等于CPU数量的最小的 2 的幂。

当表的条目上出现竞争时,在到达容量前表扩容一倍,通过增加条目来减少竞争。

伪共享(False sharing)和cpu缓存行

LongAdder类继承Striped64类,看下Striped64类的一个变量cells:

  [java]
1
2
3
4
/** * Table of cells. When non-null, size is a power of 2. */ transient volatile Cell[] cells;

Cell数组即为存储分割后的每个long值;看下Cell类的定义,

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static final class Cell { volatile long p0, p1, p2, p3, p4, p5, p6; volatile long value; volatile long q0, q1, q2, q3, q4, q5, q6; Cell(long x) { value = x; } final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; static { try { UNSAFE = getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } } }

刚看到这个类代码肯定会很奇怪,为什么里面定义了这些没有用到的变量p0, p1, p2, p3, p4, p5, p6。这个就引出了伪共享(False sharing)和cpu缓存行;

CPU CACHE

网上关于这两个方面的文章也很多

7个示例科普CPU CACHE

Cache Line

CPU不是按单个bytes来读取内存数据的,而是以“块数据”的形式,每块的大小通常为64bytes,这些“块”被成为“Cache Line”(这种说法其实很不太正确,关于Cache Line的知识请参考文末的参考链接)

如果有两个线程(Thread1 和 Thread2)同时修改一个volatile数据,把这个数据记为’x’:

volatile long x;

如果线程1打算更改x的值,而线程2准备读取:

  [plaintext]
1
2
Thread1:x=3; Thread2: System.out.println(x);

由于x值被更新了,所以x值需要在线程1和线程2之间传递(从线程1到线程2),x的变更会引起整块64bytes被交换,因为cpu核之间以cache lines的形式交换数据(cache lines的大小一般为64bytes)。有可能线程1和线程2在同一个核心里处理,但是在这个简单的例子中我们假设每个线程在不同的核中被处理。

我们知道long values的内存长度为8bytes,在我们例子中”Cache Line”为64bytes,所以一个cache line可以存储8个long型变量,在cache line中已经存储了一个long型变量x,我们假设cache line中剩余的空间用来存储了7个long型变量,

例如从v1到v7

  [plaintext]
1
x,v1,v2,v3,v4,v5,v6,v7

False Sharing

一个cache lien可以被多个不同的线程所使用。如果有其他线程修改了v2的值,线程1和线程2将会强制重新加载cache line。你可以会疑惑我们只是修改了v2的值不应该会影响其他变量,为啥线程1和线程2需要重新加载cache line呢。然后,即使对于多个线程来说这些更新操作是逻辑独立的,但是一致性的保持是以cache line为基础的,而不是以单个独立的元素。

这种明显没有必要的共享数据的方式被称作“False sharing”.

Padding

为了获取一个cache line,核心需要执行几百个指令。

如果核心需要等待一个cache line重新加载,核心将会停止做其他事情,这种现象被称为”Stall”。

Stalls可以通过减少“False Sharing”,一个减少”false sharing”的技巧是填充数据结构,使得线程操作的变量落入到不同的cache line中。

下面是一个填充了的数据结构的例子,尝试着把x和v1放入到不同的cache line中

  [java]
1
2
3
4
5
6
7
8
9
10
11
public class FalseSharingWithPadding { public volatile long x; public volatile long p2; // padding public volatile long p3; // padding public volatile long p4; // padding public volatile long p5; // padding public volatile long p6; // padding public volatile long p7; // padding public volatile long p8; // padding public volatile long v1; }

在你准备填充你的所有数据结构之前,你必须了解jvm会减少或者重排序没有使用的字段,因此可能会重新引入“false sharing”。因此对象会在堆中的位置是没有办法保证的。

为了减少未使用的填充字段被优化掉的机会,将这些字段设置成为volatile会很有帮助。对于填充的建议是你只需要在高度竞争的并发类上使用填充,并且在你的目标架构上测试使用有很大提升之后采用填充。最好的方式是做10000玄幻迭代,消除JVM的实时优化的影响。

java 8中引入了一个新注解 @Contented, 主要是用来减少“False sharing”,在你需要避免“false sharing”的字段上标记注解,这可以暗示虚拟机“这个字段可以分离到不同的cache line中”,所以LongAdder在java8中的实现已经采用了 @Contended

Cell 类

Cell 类是 Striped64 的静态内部类。通过注解 @sun.misc.Contended 来自动实现缓存行填充,让Java编译器和JRE运行时来决定如何填充。

本质上是一个填充了的、提供了CAS更新的volatile变量。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@sun.misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } } }

Striped64

Striped64 通过一个 Cell 数组维持了一序列分解数的表示,通过 base 字段维持数的初始值,通过 cellsBusy 字段来控制 resing 和/或 创建Cell。

它还提供了对数进行累加的机制。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
abstract class Striped64 extends Number { static final int NCPU = Runtime.getRuntime().availableProcessors(); // 存放 Cell 的表。当不为空时大小是 2 的幂。 transient volatile Cell[] cells; // base 值,在没有竞争时使用,也作为表初始化竞争时的一个后备。 transient volatile long base; // 自旋锁,在 resizing 和/或 创建Cell时使用。 transient volatile int cellsBusy; }

累加机制 longAccumulate

设计思路里针对机制的实现,核心逻辑。该方法处理涉及初始化、resing、创建新cell、和/或竞争的更新。

逻辑如下:

if 表已初始化

  [java]
1
2
3
4
5
6
7
8
9
10
if 映射到的槽是空的加锁后再次判断如果仍然是空的初始化cell并关联到槽 else if 槽不为空在槽上之前的CAS已经失败重试 else if 槽不为空且之前的CAS没失败,)在此槽的cell上尝试更新 else if 表已达到容量上限或被扩容了重试 else if 如果不存在冲突则设置为存在冲突重试 else if 如果成功获取到锁则扩容 else 重散列尝试其他槽 else if 锁空闲且获取锁成功初始化表 else if 回退 base 上更新且成功则退出 else 继续

源码

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0) { // 未初始化的 ThreadLocalRandom.current(); // 强制初始化 h = getProbe(); wasUncontended = true; } // 最后的槽不为空则 true,也用于控制扩容,false重试。 boolean collide = false; for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { // 表已经初始化 if ((a = as[(n - 1) & h]) == null) { // 线程所映射到的槽是空的。 if (cellsBusy == 0) { // 尝试关联新的Cell // 锁未被使用,乐观地创建并初始化cell。 Cell r = new Cell(x); if (cellsBusy == 0 && casCellsBusy()) { // 锁仍然是空闲的、且成功获取到锁 boolean created = false; try { // 在持有锁时再次检查槽是否空闲。 Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { // 所映射的槽仍为空 rs[j] = r; // 关联 cell 到槽 created = true; } } finally { cellsBusy = 0; // 释放锁 } if (created) break; // 成功创建cell并关联到槽,退出 continue; // 槽现在不为空了 } } // 锁被占用了,重试 collide = false; } // 槽被占用了 else if (!wasUncontended) // 已经知道 CAS 失败 wasUncontended = true; // 在重散列后继续 // 在当前槽的cell上尝试更新 else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // 表大小达到上限或扩容了; // 表达到上限后就不会再尝试下面if的扩容了,只会重散列,尝试其他槽 else if (n >= NCPU || cells != as) collide = false; // At max size or stale // 如果不存在冲突,则设置为存在冲突 else if (!collide) collide = true; // 有竞争,需要扩容 else if (cellsBusy == 0 && casCellsBusy()) { // 锁空闲且成功获取到锁 try { if (cells == as) { // 距上一次检查后表没有改变,扩容:加倍 Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; // 释放锁 } collide = false; continue; // 在扩容后的表上重试 } // 没法获取锁,重散列,尝试其他槽 h = advanceProbe(h); } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // 加锁的情况下初始化表 boolean init = false; try { // Initialize table if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; // 释放锁 } if (init) break; // 成功初始化,已更新,跳出循环 } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) // 表未被初始化,可能正在初始化,回退使用 base。 break; // 回退到使用 base } }

拓展阅读

参考资料

LongAdder类学习小结

Java8 Striped64 和 LongAdder