锁专题(12)高并发进阶 Exchanger 双方栅栏源码深度解析
Exchanger
简介
有时我们须要对元素进行配对和交换线程的同步点,使用 exchange 方法返回其伙伴的对象,这时我们就须要使用线程类中的 Exchanger 类了,
简而言之,可以在不同线程间交换数据。
使用入门
废话少说,直接上代码。
定义执行类
private static class ExchangeRunnable implements Runnable {
private final Exchanger exchanger;
private final String data;
private ExchangeRunnable(Exchanger exchanger, String data) {
this.exchanger = exchanger;
this.data = data;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() +" 正在把数据 "+ data + " 交换出去" );
Thread.sleep((long) (Math.random()*1000));
String data2 = exchanger.exchange(data);
System.out.println(Thread.currentThread().getName() + " 交换数据到 "+ data2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
我们定义一个 Runnable 类,会将传入的 data 已经交换,并打印获取到的数据。
测试
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
final Exchanger exchanger = new Exchanger<>();
executor.execute(new ExchangeRunnable(exchanger, "one"));
executor.execute(new ExchangeRunnable(exchanger, "two"));
executor.shutdown();
}
我们使用线程池执行数据交换测试,日志如下:
pool-1-thread-1 正在把数据 one 交换出去
pool-1-thread-2 正在把数据 two 交换出去
pool-1-thread-1 交换数据到 two
pool-1-thread-2 交换数据到 one
可以看到两个线程的数据已经发生了交换。
这么神奇,到底是如何实现的呢?
感兴趣的小伙伴可以一起来阅读以下源码。
源码解析
类定义
/**
* @since 1.5
* @author Doug Lea and Bill Scherer and Michael Scott
* @param The type of objects that may be exchanged
*/
public class Exchanger {
}
这个类是在 jdk1.5 引入的。
算法笔记
ps: 这里是作者的算法笔记,不出现在 doc 中,主要是便于大家理解。内容较多,可以跳过。阅读完源码后,结合起来看。
概述:对于交换“槽(slot)”,核心算法是一个参与者和一个项目(呼叫者):
for (;;) {
if (slot is empty) { // offer
place item in a Node;
if (can CAS slot from empty to node) {
wait for release;
return matching item in node;
}
}
else if (can CAS slot from node to empty) { // release
get the item in node;
set matching item in node;
release waiting thread;
}
// else retry on CAS failure
}
这是“双重数据结构”(dual data structure)的最简单形式之一-参见Scott和Scherer的DISC 04论文和
http://www.cs.rochester.edu/research/synchronization/pseudocode/duals.html
原则上,这很好。
但是实际上,就像许多集中于单个位置的原子更新的算法一样,当使用同一个Exchanger的参与者多于几个时,它会可怕地扩展。
因此,该实现改为使用消除域的形式,该域通过安排某些线程通常使用不同的插槽来扩展此争用,同时仍确保最终任何两个参与方都能够交换项目。
也就是说,我们不能完全在线程之间进行分区,而是给线程提供竞技场索引,这些索引在争用情况下平均会增长,在缺乏争用的情况下会缩小。
我们通过将我们仍然需要的节点定义为ThreadLocals来实现这一点,并在其中包括每个线程的索引和相关的簿记状态。
(我们可以安全地重用每个线程的节点,而不必每次都重新创建它们,因为插槽在指向节点与空节点之间交替出现,因此不会遇到ABA问题。但是,在每次使用之间重置它们时,我们确实需要谨慎。)
实施有效的竞技场需要分配一堆空间,因此我们仅在检测到争用时这样做(单处理器除外,在单处理器上它们将无济于事,因此不会使用)。
否则,交换使用单槽slotExchange方法。
在争用时,插槽不仅必须位于不同的位置,而且由于位于同一高速缓存行(或更常见的是,相同的一致性单元),这些位置也不能遇到内存争用。
因为在撰写本文时,尚无法确定高速缓存行的大小,所以我们定义了一个足以满足通用平台的值。
此外,在其他地方也要格外小心,以避免其他错误/意外共享并增强位置,包括向节点添加填充(通过sun.misc.Contended),将“ bound”作为Exchanger字段嵌入,以及重新处理比较的某些 park/unpark 机制 到LockSupport版本。
竞技场(arena)仅以一个已使用的插槽开始。
我们通过跟踪碰撞来扩大有效竞技场的规模; 即尝试交换时失败了。
根据上述算法的性质,唯一能够可靠地表明竞争的冲突是两种尝试的释放发生冲突时-两种尝试的提议中的一种可以合法地导致CAS失败,而没有其他多个线程指示争用。
(注意:有可能但不值得通过在CAS故障后读取插槽值来更精确地检测竞争。)
当线程在当前竞技场边界内的每个插槽处发生冲突时,它将尝试将竞技场大小扩大一倍。
我们通过在“bound”字段上使用版本(序列)编号来跟踪边界内的冲突,并在参与者注意到边界已更新(沿任一方向)时保守地重置冲突计数。
通过在一段时间后放弃等待并在到期时尝试减小竞技场的大小,可以减小有效竞技场的大小(当有多个插槽时)。
“一会儿”的值是一个经验问题。
我们通过附带使用 spin-> yield-> block
来实现,这对于获得合理的等待性能是必不可少的-在繁忙的交换器中,offers 通常几乎立即发布,在这种情况下,在多处理器上进行上下文切换非常缓慢/浪费。
Arena 等待,只是省略了阻塞部分,用于替代取消了。
根据经验,将自旋计数选择为一个值,该值可避免在一系列测试机上以最大持续汇率兑换99%的时间。
自旋和产量需要一定程度的随机性(使用廉价的xorshift),以避免可能导致无效的生长/收缩周期的规则模式。
(使用伪随机还可以通过使分支不可预测来帮助调整旋转周期的持续时间。)
另外,在要约期间,服务员可以“知道”在插槽更改后将被释放,但是直到设置了匹配项之后才能进行。
同时,它不能取消要约(cancel the offer),用来替代 spins/yields。
注意:可以通过将线性化点更改为match字段的CAS(如Scott&Scherer DISC论文中的一种情况)来避免这种二次检查,这也会增加异步性,但代价是更差冲突检测以及无法始终重用每个线程节点。
因此,当前方案通常是更好的折衷方案。
发生碰撞时,索引会以相反的顺序循环遍历竞技场,并在范围更改时以最大索引(趋向于最稀疏)重新开始。
(在到期时,索引减半直到达到0。)
可以(并已尝试)使用随机,素值步进或双哈希样式遍历,而不是简单的循环遍历,以减少聚集。
但是从经验上讲,这些好处可能无法克服其增加的开销:
除非存在持续的争用,否则我们将对发生的操作进行快速管理,因此,较简单/较快的控制策略比较准确但较慢的控制策略更有效。
因为我们将到期时间用于竞技场大小控制,所以在竞技场大小缩小到零(或者未启用竞技场)之前,我们无法在定时版本的公共交换方法中引发TimeoutExceptions。
这可能会延迟对超时的响应,但仍在规范范围内。
本质上,所有实现都在slotExchange和arenaExchange方法中。
它们具有相似的总体结构,但是在太多细节上无法组合。
slotExchange方法使用单个Exchanger字段“slot”,而不是竞技场数组元素。
但是,它仍然需要最少的碰撞检测来触发竞技场的建设。
(最混乱的部分是确保在两种方法都可能被调用时在过渡期间正确显示中断状态和InterruptedExceptions。这是通过将null返回作为哨兵来重新检查中断状态来完成的。)
在这种代码中太普遍了,方法是单块的,因为大多数逻辑依赖于字段的读取,这些字段作为局部变量维护,因此无法很好地进行分解-主要是在这里,笨重的 spin-> yield-> block/cancel
代码),并且在很大程度上依赖于内在函数(不安全)来使用内联嵌入式CAS和相关的内存访问操作(当动态编译器隐藏在其他方法后面时,动态编译器通常不会内联它们,因为它们可以更好地命名和封装该方法)预期的效果)。
这包括使用putOrderedX来清除两次使用之间每个线程节点的字段。
请注意,即使通过释放线程读取Node.item字段,也不会将其声明为volatile,因为它们仅在必须进行访问的CAS操作之后才声明为volatile,并且其他线程可以接受地接受拥有线程的所有使用。
(由于原子性的实际点是插槽CASes,所以在发行版中对Node.match的写入要弱于完全易失性写入,这也是合法的。但是,之所以不这样做,是因为它可能允许进一步推迟写入,延迟进度。)
平平无奇的内部变量
我们一起来看几个平平无奇的内部变量:
/**
* 竞技场(arena)上任何两个使用的插槽之间的字节距离(作为移位值)。
*
* 1 = (MMASK >> 1;
/**
* 等待比赛时旋转的界限。
* 由于随机化,实际的实际迭代次数平均约为该值的两倍。 注意:当NCPU == 1时,禁用旋转。
*/
private static final int SPINS = 1 {
public Node initialValue() { return new Node(); }
}
@sun.misc.Contended
这个注解是干什么的?
这个主要是用来避免伪共享的。
这里先简单的解释一下。
伪共享
伪共享,高速缓存与内存之间是以缓存行为单位交换数据的,根据局部性原理,相邻地址空间的数据会被加载到高速缓存的同一个数据块上(缓存行),而数组是连续的(逻辑,涉及到虚拟内存)内存地址空间,因此,多个slot会被加载到同一个缓存行上,当一个slot改变时,会导致这个slot所在的缓存行上所有的数据(包括其他的slot)无效,需要从内存重新加载,影响性能。
所以,为了避免这种情况,需要填充数据,使得有效的slot不被加载到同一个缓存行上。
填充的大小即为1 ek = Exchanger.class;
Class nk = Node.class;
Class ak = Node[].class;
Class tk = Thread.class;
BOUND = U.objectFieldOffset
(ek.getDeclaredField("bound"));
SLOT = U.objectFieldOffset
(ek.getDeclaredField("slot"));
MATCH = U.objectFieldOffset
(nk.getDeclaredField("match"));
BLOCKER = U.objectFieldOffset
(tk.getDeclaredField("parkBlocker"));
s = U.arrayIndexScale(ak);
// ABASE absorbs padding in front of element 0
ABASE = U.arrayBaseOffset(ak) + (1 (1 1 && bound == 0 &&
U.compareAndSwapInt(this, BOUND, 0, SEQ))
arena = new Node[(FULL + 2) 1) ? SPINS : 1;
Object v;
while ((v = p.match) == null) {
// 自旋,直至spins不大于0
if (spins > 0) {
// 伪随机算法, 目的是等h小于0(随机的)
h ^= h >> 3;
h ^= h >> 1) - 1)) == 0)
// 等到h >> 1)
Thread.yield();
}
else if (slot != p)
// 重置自旋的数量,并重试
spins = SPINS;
// 如果线程没被中断,且arena还没被创建,并且没有超时
else if (!t.isInterrupted() && arena == null &&
(!timed || (ns = end - System.nanoTime()) > 0L)) {
// 设置当前线程将阻塞在当前对象上
U.putObject(t, BLOCKER, this);
// 挂在此结点上的阻塞着的线程
p.parked = t;
if (slot == p)
// 挂起当前线程在 Node 节点,等下一个使用该节点交换的线程唤醒
U.park(false, ns);
// 被唤醒后,清空 parked 信息
p.parked = null;
// 清空对应的阻塞对象
U.putObject(t, BLOCKER, null);
}
// 超时或其他(取消),给其他线程腾出slot
else if (U.compareAndSwapObject(this, SLOT, p, null)) {
v = timed && ns yield -> block
如果超时(设置超时的话)或被中断,则退出循环。
最后,重置数据,下次重用,返回结果,结束。
arenaExchange 方法
/**
* 启用竞技场时交换功能。 请参阅上面算法说明。
*
* @param item 待交换的非 null 元素
* @param timed 如果等待已计时,则为true
* @param ns 如果定时,则为最大等待时间,否则为0L
* @return 另一个线程的项目; 或null(如果被中断); 或TIMED_OUT(如果超时和超时)
*/
private final Object arenaExchange(Object item, boolean timed, long ns) {
// arena 这个变量是在 slotExchange 中初始化的。
Node[] a = arena;
// 获取当前线程的 Node 节点信息
Node p = participant.get();
for (int i = p.index;;) { // access slot at i
int b, m, c; long j; // j is raw array offset
// 从场地中选出偏移地址为(i 0) {
// 自旋,这个和 slotExchange 类似,是一个伪随机。
h ^= h >> 3;
h ^= h >> 1, 一半的概率
else if (h >> 1) - 1)) == 0)
// 每一次等待,有 2 次可以让渡 cpu 的机会
Thread.yield(); // two yields per wait
}
// 别的线程已经到来,正在准备数据,自旋等待
else if (U.getObjectVolatile(a, j) != p)
spins = SPINS; // releaser hasn't set match yet
else if (!t.isInterrupted() && m == 0 &&
(!timed ||
(ns = end - System.nanoTime()) > 0L)) {
//ps: 其实下面的流程和 slotExchange 也差不多,就是等待被唤醒。
// 设置当前线程将阻塞在当前对象上
U.putObject(t, BLOCKER, this); // emulate LockSupport
// 挂在此结点上的阻塞着的线程
p.parked = t; // minimize window
if (U.getObjectVolatile(a, j) == p)
// 阻塞, 等着被唤醒或中断
U.park(false, ns);
// 被唤醒后,清空数据
p.parked = null;
// 解除阻塞对象
U.putObject(t, BLOCKER, null);
}
else if (U.getObjectVolatile(a, j) == p &&
U.compareAndSwapObject(a, j, p, null)) {
// 尝试缩减
if (m != 0) // try to shrink
// 更新bound, 高位递增,低位 -1
U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
// 重置元素
p.item = null;
p.hash = h;
// 索引减半,为的是快速找到汇合点(最左侧)
i = p.index >>>= 1; // descend
// 保留中断状态,以便调用者可以重新检查,Thread.interrupted() 会清除中断状态标记
if (Thread.interrupted())
return null;
// 超时
if (timed && m == 0 && ns 0),索引减半;检查是否中断或超时,如果没有,进入【步骤1】;否则,返回,结束。
9. 检查bound是否发生变化,如果变化了,重置collides,索引重置为m或左移,转向【步骤1】;否则,进入下一步。
10. 检查collides是否达到最大值,如果没有,进入【步骤13】,否则下一步。
11. m是否达到FULL,是,进入【步骤13】;否则,下一步。
12. CAS bound加1是否成功,如果成功,i置为m+1,槽位增长,进入【步骤1】;否则,下一步。
13. collides加1,索引左移,进入【步骤1】
流程图如下:

# 小结
希望本文对你有帮助,如果有其他想法的话,也可以评论区和大家分享哦。
各位**极客**的点赞收藏转发,是老马持续写作的最大动力!
# 参考资料
[Java8使用@sun.misc.Contended避免伪共享](https://www.jianshu.com/p/c3c108c3dcfd)
[【并发工具源码系列】 Exchanger 源码解析](https://blog.csdn.net/weixin_43934607/article/details/109088641)
[【JUC源码解析】Exchanger](https://blog.csdn.net/weixin_30299709/article/details/95088048)
[java线程中Exchanger使用](https://www.cnblogs.com/zhchoutai/p/6819451.html)