数据 striping
在计算机数据存储中,数据条带化是分割逻辑顺序数据(例如文件)的技术,以便连续的段存储在不同的物理存储设备上。
当处理设备比单个存储设备提供数据更快地请求数据时,条带化非常有用。 通过在可以同时访问的多个设备上分布段,增加了总数据吞吐量。
它也是平衡磁盘阵列中I/O负载的有用方法。
条带化用于独立磁盘冗余阵列(RAID)存储中的磁盘驱动器,网络接口控制器,集群文件系统中的不同计算机和面向网格的存储,以及某些系统中的RAM。
Striped64
JDK 8 的 java.util.concurrent.atomic 下有一个包本地的类 Striped64 ,它持有常见表示和机制用于类支持动态 striping 到 64bit 值上。
设计思路
这个类维护一个延迟初始的、原子地更新值的表,加上额外的 “base” 字段。表的大小是 2 的幂。索引使用每线程的哈希码来masked。这个的几乎所有声明都是包私有的,通过子类直接访问。
表的条目是 Cell 类,一个填充过(通过 sun.misc.Contended )的 AtomicLong 的变体,用于减少缓存竞争。填充对于多数 Atomics 是过度杀伤的,因为它们一般不规则地分布在内存里,因此彼此间不会有太多冲突。但存在于数组的原子对象将倾向于彼此相邻地放置,因此将通常共享缓存行(对 性能有巨大的副作用),在没有这个防备下。
部分地,因为Cell相对比较大,我们避免创建它们直到需要时。当没有竞争时,所有的更新都作用到 base 字段。根据第一次竞争(更新 base 的 CAS 失败),表被初始化为大小 2。表的大小根据更多的竞争加倍,直到大于或等于CPU数量的最小的 2 的幂。表的槽在它们需要之前保持空。
一个单独的自旋锁(“cellsBusy”)用于初始化和resize表,还有用新的Cell填充槽。不需要阻塞锁,当锁不可得,线程尝试其他槽(或 base)。在这些重试中,会增加竞争和减少本地性,这仍然好于其他选择。
通过 ThreadLocalRandom 维护线程探针字段,作为每线程的哈希码。我们让它们为 0 来保持未初始化直到它们在槽 0 竞争。然后初始化它们为通常不会互相冲突的值。当执行更新操作时,竞争和/或表冲突通过失败了的 CAS 来指示。根据冲突,如果表的大小小于容量,它的大小加倍,除非有些线程持有了锁。如果一个哈希后的槽是空的,且锁可得,创建新的Cell。否则,如果槽存 在,重试CAS。重试通过 “重散列,double hashing” 来继续,使用一个次要的哈希算法(Marsaglia XorShift)来尝试找到一个自由槽位。
表的大小是有上限的,因为,当线程数多于CPU数时,假如每个线程绑定到一个CPU上,存在一个完美的哈希函数映射线程到槽上,消除了冲突。当我们 到达容量,我们随机改变碰撞线程的哈希码搜索这个映射。因为搜索是随机的,冲突只能通过CAS失败来知道,收敛convergence 是慢的,因为线程通常不会一直绑定到CPU上,可能根本不会发生。然而,尽管有这些限制,在这些案例下观察到的竞争频率显著地低。
当哈希到特定 Cell 的线程终止后,Cell 可能变为空闲的,表加倍后导致没有线程哈希到扩展的 Cell 也会出现这种情况。我们不尝试去检测或移除这些 Cell,在实例长期运行的假设下,观察到的竞争水平将重现,所以 Cell 将最终被再次需要。对于短期存活的实例,这没关系。
设计思路小结
striping和缓存行填充:通过把类数据 striping 为 64bit 的片段,使数据成为缓存行友好的,减少CAS竞争。
分解表示:对于一个数字 5,可以分解为一序列数的和:2 + 3,这个数字加 1 也等价于它的分解序列中的任一 数字加 1:5 + 1 = 2 + (3 + 1)。
通过把分解序列存放在表里面,表的条目都是填充后的 Cell;限制表的大小为 2 的幂,则可以用掩码来实现索引;同时把表的大小限制为大于等于CPU数量的最小的 2 的幂。
当表的条目上出现竞争时,在到达容量前表扩容一倍,通过增加条目来减少竞争。
伪共享(False sharing)和cpu缓存行
LongAdder类继承Striped64类,看下Striped64类的一个变量cells:
1
2
3
4/**
* Table of cells. When non-null, size is a power of 2.
*/
transient volatile Cell[] cells;
Cell数组即为存储分割后的每个long值;看下Cell类的定义,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22static final class Cell {
volatile long p0, p1, p2, p3, p4, p5, p6;
volatile long value;
volatile long q0, q1, q2, q3, q4, q5, q6;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
刚看到这个类代码肯定会很奇怪,为什么里面定义了这些没有用到的变量p0, p1, p2, p3, p4, p5, p6。这个就引出了伪共享(False sharing)和cpu缓存行;
CPU CACHE
网上关于这两个方面的文章也很多
Cache Line
CPU不是按单个bytes来读取内存数据的,而是以“块数据”的形式,每块的大小通常为64bytes,这些“块”被成为“Cache Line”(这种说法其实很不太正确,关于Cache Line的知识请参考文末的参考链接)
如果有两个线程(Thread1 和 Thread2)同时修改一个volatile数据,把这个数据记为’x’:
volatile long x;
如果线程1打算更改x的值,而线程2准备读取:
1
2Thread1:x=3;
Thread2: System.out.println(x);
由于x值被更新了,所以x值需要在线程1和线程2之间传递(从线程1到线程2),x的变更会引起整块64bytes被交换,因为cpu核之间以cache lines的形式交换数据(cache lines的大小一般为64bytes)。有可能线程1和线程2在同一个核心里处理,但是在这个简单的例子中我们假设每个线程在不同的核中被处理。
我们知道long values的内存长度为8bytes,在我们例子中”Cache Line”为64bytes,所以一个cache line可以存储8个long型变量,在cache line中已经存储了一个long型变量x,我们假设cache line中剩余的空间用来存储了7个long型变量,
例如从v1到v7
1x,v1,v2,v3,v4,v5,v6,v7
False Sharing
一个cache lien可以被多个不同的线程所使用。如果有其他线程修改了v2的值,线程1和线程2将会强制重新加载cache line。你可以会疑惑我们只是修改了v2的值不应该会影响其他变量,为啥线程1和线程2需要重新加载cache line呢。然后,即使对于多个线程来说这些更新操作是逻辑独立的,但是一致性的保持是以cache line为基础的,而不是以单个独立的元素。
这种明显没有必要的共享数据的方式被称作“False sharing”.
Padding
为了获取一个cache line,核心需要执行几百个指令。
如果核心需要等待一个cache line重新加载,核心将会停止做其他事情,这种现象被称为”Stall”。
Stalls可以通过减少“False Sharing”,一个减少”false sharing”的技巧是填充数据结构,使得线程操作的变量落入到不同的cache line中。
下面是一个填充了的数据结构的例子,尝试着把x和v1放入到不同的cache line中
1
2
3
4
5
6
7
8
9
10
11public class FalseSharingWithPadding {
public volatile long x;
public volatile long p2; // padding
public volatile long p3; // padding
public volatile long p4; // padding
public volatile long p5; // padding
public volatile long p6; // padding
public volatile long p7; // padding
public volatile long p8; // padding
public volatile long v1;
}
在你准备填充你的所有数据结构之前,你必须了解jvm会减少或者重排序没有使用的字段,因此可能会重新引入“false sharing”。因此对象会在堆中的位置是没有办法保证的。
为了减少未使用的填充字段被优化掉的机会,将这些字段设置成为volatile会很有帮助。对于填充的建议是你只需要在高度竞争的并发类上使用填充,并且在你的目标架构上测试使用有很大提升之后采用填充。最好的方式是做10000玄幻迭代,消除JVM的实时优化的影响。
java 8中引入了一个新注解 @Contented
, 主要是用来减少“False sharing”,在你需要避免“false sharing”的字段上标记注解,这可以暗示虚拟机“这个字段可以分离到不同的cache line中”,所以LongAdder在java8中的实现已经采用了 @Contended
Cell 类
Cell 类是 Striped64 的静态内部类。通过注解 @sun.misc.Contended
来自动实现缓存行填充,让Java编译器和JRE运行时来决定如何填充。
本质上是一个填充了的、提供了CAS更新的volatile变量。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
Striped64
Striped64 通过一个 Cell 数组维持了一序列分解数的表示,通过 base 字段维持数的初始值,通过 cellsBusy 字段来控制 resing 和/或 创建Cell。
它还提供了对数进行累加的机制。
1
2
3
4
5
6
7
8
9
10
11
12abstract class Striped64 extends Number {
static final int NCPU = Runtime.getRuntime().availableProcessors();
// 存放 Cell 的表。当不为空时大小是 2 的幂。
transient volatile Cell[] cells;
// base 值,在没有竞争时使用,也作为表初始化竞争时的一个后备。
transient volatile long base;
// 自旋锁,在 resizing 和/或 创建Cell时使用。
transient volatile int cellsBusy;
}
累加机制 longAccumulate
设计思路里针对机制的实现,核心逻辑。该方法处理涉及初始化、resing、创建新cell、和/或竞争的更新。
逻辑如下:
if 表已初始化
1
2
3
4
5
6
7
8
9
10if 映射到的槽是空的,加锁后再次判断,如果仍然是空的,初始化cell并关联到槽。
else if (槽不为空)在槽上之前的CAS已经失败,重试。
else if (槽不为空、且之前的CAS没失败,)在此槽的cell上尝试更新
else if 表已达到容量上限或被扩容了,重试。
else if 如果不存在冲突,则设置为存在冲突,重试。
else if 如果成功获取到锁,则扩容。
else 重散列,尝试其他槽。
else if 锁空闲且获取锁成功,初始化表
else if 回退 base 上更新且成功则退出
else 继续
源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
// 未初始化的
ThreadLocalRandom.current(); // 强制初始化
h = getProbe();
wasUncontended = true;
}
// 最后的槽不为空则 true,也用于控制扩容,false重试。
boolean collide = false;
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
// 表已经初始化
if ((a = as[(n - 1) & h]) == null) {
// 线程所映射到的槽是空的。
if (cellsBusy == 0) { // 尝试关联新的Cell
// 锁未被使用,乐观地创建并初始化cell。
Cell r = new Cell(x);
if (cellsBusy == 0 && casCellsBusy()) {
// 锁仍然是空闲的、且成功获取到锁
boolean created = false;
try { // 在持有锁时再次检查槽是否空闲。
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
// 所映射的槽仍为空
rs[j] = r; // 关联 cell 到槽
created = true;
}
} finally {
cellsBusy = 0; // 释放锁
}
if (created)
break; // 成功创建cell并关联到槽,退出
continue; // 槽现在不为空了
}
}
// 锁被占用了,重试
collide = false;
}
// 槽被占用了
else if (!wasUncontended) // 已经知道 CAS 失败
wasUncontended = true; // 在重散列后继续
// 在当前槽的cell上尝试更新
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// 表大小达到上限或扩容了;
// 表达到上限后就不会再尝试下面if的扩容了,只会重散列,尝试其他槽
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
// 如果不存在冲突,则设置为存在冲突
else if (!collide)
collide = true;
// 有竞争,需要扩容
else if (cellsBusy == 0 && casCellsBusy()) {
// 锁空闲且成功获取到锁
try {
if (cells == as) { // 距上一次检查后表没有改变,扩容:加倍
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0; // 释放锁
}
collide = false;
continue; // 在扩容后的表上重试
}
// 没法获取锁,重散列,尝试其他槽
h = advanceProbe(h);
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
// 加锁的情况下初始化表
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0; // 释放锁
}
if (init)
break; // 成功初始化,已更新,跳出循环
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
// 表未被初始化,可能正在初始化,回退使用 base。
break; // 回退到使用 base
}
}