栅栏(Barrier)
简介
栅栏(Barrier)类似于闭锁,它能阻塞一组线程直到某个事件发生。
闭锁是一次性对象,一旦进入最终状态,就不能被重置了。
栅栏与闭锁的关键区别在于,所有线程必须同时达到栅栏位置,才能继续执行。
闭锁用于等待事件,而栅栏用于等待其他线程。
而我再等你点赞。
java 手写并发框架(一)异步查询转同步的7种实现方式 我就用到了 CyclicBarrier 作为其中的实现方式之一,感兴趣的小伙伴可以去看看。
使用场景
栅栏用于实现一些协议,例如几个小伙伴定在某个地方集合:“明天 6:00 在老马家碰头,到了以后要等其他人,之后再讨论去哪里玩。”
CyclicBarrer可以使一定数量的参与方反复地在栅栏位置汇集,它在并行迭代算法中非常有用:这种算法通常将一个问题拆分一些列互相独立的子问题。
如果所有线程都达到了栅栏位置,那么栅栏将打开,为此所有线程都被释放,而栅栏将被重置以便下次使用。如果对await的调用超时,或者await阻塞的线程被中断,那么栅栏就认为是打破了,所有阻塞的await调用都将终止并抛出BrokenBarrerException。
如果成功通过栅栏,那么await将为每个线程返回一个唯一的到达索引号,我们可以利用这些索引来“选举”产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作。
CyclicBarrer还可以使你将一个栅栏操作传递给构造函数,这是Runnable,当成功通过栅栏时会(在一个子任务线程中)执行它,但在阻塞线程被释放之前是不能被执行的。
在模拟程序中通常需要使用栅栏,例如某个步骤中的计算可以并行执行,但必须等到该步骤中的所有计算都执行完成才能进入下一个步骤。
例子
代码实现
我们首先定义一个 Writer 模拟我们需要处理的子任务:
static class Writer extends Thread {
private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据...");
try {
Thread.sleep(5000); //以睡眠来模拟写入数据操作
System.out.println("线程" + Thread.currentThread().getName() + "写入数据完毕,等待其他线程写入完毕");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "所有线程写入完毕,继续处理其他任务...");
}
}
线程通过沉睡 5S 来模拟平时的工作执行耗时,然后使用 cyclicBarrier.await();
等待其他线程执行完成。
最后统一输出全部:所有线程写入完毕,继续处理其他任务…
这个就等价于约定好早晨 6:00 到老马家,各位小伙伴都到了,就可以继续进行后去的任务安排了。
public static void main(String[] args) {
int limit = 4;
CyclicBarrier barrier = new CyclicBarrier(limit);
for (int i = 0; i < limit; i++) {
new Writer(barrier).start();
}
try {
Thread.sleep(25000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("CyclicBarrier重用");
barrier.reset();
for (int i = 0; i < limit; i++) {
new Writer(barrier).start();
}
}
任务的执行,我们这里定义了 4 个需要执行的任务。
并且还演示了 CyclicBarrier 非常方便的一个特性,使用 barrier.reset();
就可以重用,非常符合环保节能可持续发展的精神。
测试日志
对应的日志如下:
线程Thread-1正在写入数据...
线程Thread-3正在写入数据...
线程Thread-2正在写入数据...
线程Thread-0正在写入数据...
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
线程Thread-1写入数据完毕,等待其他线程写入完毕
Thread-1所有线程写入完毕,继续处理其他任务...
Thread-2所有线程写入完毕,继续处理其他任务...
Thread-3所有线程写入完毕,继续处理其他任务...
Thread-0所有线程写入完毕,继续处理其他任务...
CyclicBarrier重用
线程Thread-4正在写入数据...
线程Thread-5正在写入数据...
线程Thread-6正在写入数据...
线程Thread-7正在写入数据...
线程Thread-4写入数据完毕,等待其他线程写入完毕
线程Thread-6写入数据完毕,等待其他线程写入完毕
线程Thread-5写入数据完毕,等待其他线程写入完毕
线程Thread-7写入数据完毕,等待其他线程写入完毕
Thread-7所有线程写入完毕,继续处理其他任务...
Thread-4所有线程写入完毕,继续处理其他任务...
Thread-6所有线程写入完毕,继续处理其他任务...
Thread-5所有线程写入完毕,继续处理其他任务...
CyclicBarrier 源码
看完了 CyclicBarrier 的使用例子,让我们一起学习一下 CyclicBarrier 的源码吧。
jdk 版本
老马本次阅读 jdk 的版本是:
java version "1.8.0_191"
Java(TM) SE Runtime Environment (build 1.8.0_191-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.191-b12, mixed mode)
Generation
每一个栅栏都有一个对应的 Generation 维护对应的 broken 状态。
/**
* Each use of the barrier is represented as a generation instance.
* The generation changes whenever the barrier is tripped, or
* is reset. There can be many generations associated with threads
* using the barrier - due to the non-deterministic way the lock
* may be allocated to waiting threads - but only one of these
* can be active at a time (the one to which {@code count} applies)
* and all the rest are either broken or tripped.
* There need not be an active generation if there has been a break
* but no subsequent reset.
*/
private static class Generation {
boolean broken = false;
}
基本属性
可以看到,这里用到了 ReentrantLock 可重入锁及对应的 Condition 等。
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties;
/* The command to run when tripped */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();
/**
* Number of parties still waiting. Counts down from parties to 0
* on each generation. It is reset to parties on each new
* generation or when broken.
*/
private int count;
构造器
我们最常用的就是第一个构造器,可以指定需要等个几个线程。
这里主要初始化 parties 和 count 的值。
barrierCommand 应该是一个可执行的 Runnable 任务,可以在 tripped 状态时执行。
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
await 方法
我们一起来看一下最常用的 await 方法:
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
当然,也有对应的指定超时时间的版本;
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
这里实际上调用的都是 dowait 方法:
好家伙,洋洋洒洒几十行,为了便于阅读,老马将解析写在代码注释中。
/**
* Main barrier code, covering the various policies.
* @author 老马啸西风
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
//通过可重入锁执行加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取当前的 Generation
final Generation g = generation;
if (g.broken)
// 如果已经是 broker 状态,直接抛出异常
throw new BrokenBarrierException();
// 如果当前线程被中断,则中断栅栏(下面有解释),并抛出中断异常
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 执行 --count,对计数器减一,判断是否为0(所有线程都已经 ready)
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 这里会调用一个 runnable 方法,默认是 null。
final Runnable command = barrierCommand;
if (command != null)
command.run();
// 更新标识,并且更新 generation(后续有讲解)
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// 如果没有全部 ready,这里就是一个循环,也可以指定对应的等待超时时间。
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
// 如果没有超时设置,直接等待
trip.await();
else if (nanos > 0L)
// 通过 Condition.awaitNanos 方法控制超时
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 如果被中断,且 g 没变,也不是 broken 状态,直接调用 breakBarrier();
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
// 如果已经处于 broken 状态,抛出异常。
if (g.broken)
throw new BrokenBarrierException();
// 如果 g 和 generation 不匹配,直接返回 index
if (g != generation)
return index;
// 如果超时时间小于等于0,立刻超时。
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
// 释放锁
lock.unlock();
}
}
中断栅栏
这里是直接设置 broken 的值为 true,然后唤醒所有等待线程。
/**
* Sets current barrier generation as broken and wakes up everyone.
* Called only while holding lock.
*/
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
nextGeneration 更新 generation
唤醒所有的等待者,并且重新设置 generation。
/**
* Updates state on barrier trip and wakes up everyone.
* Called only while holding lock.
*/
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
小结
CyclicBarrier 作为一个并发的控制工具,和 CountDownLatch 对比个人感觉最大的优势就是可重用。而且多个线程执行完成后,才能继续执行,非常适合多线程任务拆分执行等场景。
希望本文对你有帮助,如果有其他想法的话,也可以评论区和大家分享哦。
各位极客的点赞收藏转发,是老马持续写作的最大动力!
参考资料
jdk 源码