锁专题(9) CyclicBarrier 栅栏源码深度解析
栅栏(Barrier)
简介
栅栏(Barrier)类似于闭锁,它能阻塞一组线程直到某个事件发生。
闭锁是一次性对象,一旦进入最终状态,就不能被重置了。
栅栏与闭锁的关键区别在于,所有线程必须同时达到栅栏位置,才能继续执行。
闭锁用于等待事件,而栅栏用于等待其他线程。
而我再等你点赞。
java 手写并发框架(一)异步查询转同步的7种实现方式 我就用到了 CyclicBarrier 作为其中的实现方式之一,感兴趣的小伙伴可以去看看。
使用场景
栅栏用于实现一些协议,例如几个小伙伴定在某个地方集合:“明天 6:00 在老马家碰头,到了以后要等其他人,之后再讨论去哪里玩。”
CyclicBarrer可以使一定数量的参与方反复地在栅栏位置汇集,它在并行迭代算法中非常有用:这种算法通常将一个问题拆分一些列互相独立的子问题。
如果所有线程都达到了栅栏位置,那么栅栏将打开,为此所有线程都被释放,而栅栏将被重置以便下次使用。如果对await的调用超时,或者await阻塞的线程被中断,那么栅栏就认为是打破了,所有阻塞的await调用都将终止并抛出BrokenBarrerException。
如果成功通过栅栏,那么await将为每个线程返回一个唯一的到达索引号,我们可以利用这些索引来“选举”产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作。
CyclicBarrer还可以使你将一个栅栏操作传递给构造函数,这是Runnable,当成功通过栅栏时会(在一个子任务线程中)执行它,但在阻塞线程被释放之前是不能被执行的。
在模拟程序中通常需要使用栅栏,例如某个步骤中的计算可以并行执行,但必须等到该步骤中的所有计算都执行完成才能进入下一个步骤。
例子
代码实现
我们首先定义一个 Writer 模拟我们需要处理的子任务:
static class Writer extends Thread {
private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据...");
try {
Thread.sleep(5000); //以睡眠来模拟写入数据操作
System.out.println("线程" + Thread.currentThread().getName() + "写入数据完毕,等待其他线程写入完毕");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "所有线程写入完毕,继续处理其他任务...");
}
}
线程通过沉睡 5S 来模拟平时的工作执行耗时,然后使用 cyclicBarrier.await();
等待其他线程执行完成。
最后统一输出全部:所有线程写入完毕,继续处理其他任务...
这个就等价于约定好早晨 6:00 到老马家,各位小伙伴都到了,就可以继续进行后去的任务安排了。
public static void main(String[] args) {
int limit = 4;
CyclicBarrier barrier = new CyclicBarrier(limit);
for (int i = 0; i 0L)
// 通过 Condition.awaitNanos 方法控制超时
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 如果被中断,且 g 没变,也不是 broken 状态,直接调用 breakBarrier();
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
// 如果已经处于 broken 状态,抛出异常。
if (g.broken)
throw new BrokenBarrierException();
// 如果 g 和 generation 不匹配,直接返回 index
if (g != generation)
return index;
// 如果超时时间小于等于0,立刻超时。
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
// 释放锁
lock.unlock();
}
}
中断栅栏
这里是直接设置 broken 的值为 true,然后唤醒所有等待线程。
/**
* Sets current barrier generation as broken and wakes up everyone.
* Called only while holding lock.
*/
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
nextGeneration 更新 generation
唤醒所有的等待者,并且重新设置 generation。
/**
* Updates state on barrier trip and wakes up everyone.
* Called only while holding lock.
*/
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
小结
CyclicBarrier 作为一个并发的控制工具,和 CountDownLatch 对比个人感觉最大的优势就是可重用。而且多个线程执行完成后,才能继续执行,非常适合多线程任务拆分执行等场景。
希望本文对你有帮助,如果有其他想法的话,也可以评论区和大家分享哦。
各位极客的点赞收藏转发,是老马持续写作的最大动力!
参考资料
jdk 源码