JCIP-19-闭锁(如CountDownLatch),栅栏(如CyclicBarrier),信号量(如Semaphore)和阻塞队列(如LinkedBlockingQueue)
同步工具类
同步工具类主要包括闭锁(如CountDownLatch),栅栏(如CyclicBarrier),信号量(如Semaphore)和阻塞队列(如LinkedBlockingQueue)等;
FutureTask/Future 也可以达到同样的目的。
使用同步工具类可以协调线程的控制流;
同步工具类封装了一些状态,这些状态决定线程是继续执行还是等待,此外同步工具类还提供了修改状态的方法;
下面将简单介绍以上同步工具类;
常用同步工具类
CountDownLatch(闭锁)
延迟线程的进度,直到其达到终止状态,所有线程将释放进度,当其到达结束状态后,将不会再改变状态
FutureTask(也可以当做闭锁)
可以返回计算结果的任务
包括三种状态:等待运行(Waiting to run)、正在运行(Running)、运行完成(Completed)
执行完成表示计算的所有结束方式:正常结束、由与取消结束、由与异常结束;任务进入完成状态后将会停止在这个状态上
FutureTask表示的计算通过Callable接口来实现
Callable可以抛出受检查的或未受检查的异常,并且任何代码都可能抛出Error
无论任务代码抛出什么异常,都会被封装到一个ExecutionException中,并在Future.get中被重新抛出
上面两条将使get代码变得复杂,因为不仅需要处理可能出现的ExecutionException以及未受检查的CancellationException,而且还由于ExecutionException是做为一个Throwable类返回的
优点:提前启动计算,可以减少等待结果需要的时间
Semaphore
对资源施加边界
CyclicBarrier
与闭锁相同点:栅栏类似于闭锁,他能阻塞一组线程直到某个事件发生
与闭锁不同点:闭锁是一次性的,栅栏可多次使用;闭锁用于等待事件,而栅栏用于等待其他线程
Exchanger
它是一种 Two-Party 栅栏,各方在栅栏位置上交换数据
闭锁
简介
可以让一个线程等待一组事件发生后(不一定要线程结束)继续执行;
闭锁是一种同步工具类,可以延迟线程的进度直到其达到终止状态。闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态。闭锁可以用来确保某些活动直到其它活动都完成后才继续执行,例如:
1、确保某个计算在其需要的所有资源都被初始化之后才继续执行。二元闭锁(包括两个状态)可以用来表示“资源R已经被初始化”,而所有需要R的操作都必须先在这个闭锁上等待。
2、确保某个服务在其依赖的所有其它服务都已经启动之后才启动。每个服务都有一个相关的二元闭锁。当启动服务S时,将首先在S依赖的其它服务的闭锁上等待,在所有依赖的服务都启动后会释放闭锁S,这样其他依赖S的服务才能继续执行。
3、等待直到某个操作的所有参与者(例如,在多玩家游戏中的所有玩家)都就绪再继续执行。在这种情况中,当所有玩家都准备就绪时,闭锁将到达结束状态。
CountDownLatch是一种灵活的闭锁实现,可以在上述各种情况中使用,它可以使一个或多个线程等待一组事件发生。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown方法递减计数器,表示有一个事件已经发生了,而await方法等待计数器到达零,这表示所有需要等待的事件都已经发生。如果计数器的值非零,那么await会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。在下面的程序中给出了闭锁的两种常见用法。TestHarness创建一定数量的线程,利用他们并发地执行制定的任务。它使用两个闭锁,分别表示“起始门(Starting Gate)”和“结束门(Ending Gate)”。起始门计数器的初始值为1,而结束门计数器的初始值为工作线程的数量。每个工作线程首先要做的值就是在起始门上等待,从而确保所有线程都就绪后才开始执行。而每个线程要做的最后一件事情就是将调用结束门的countDown方法减去1,这能使主线程高效的等待直到所有工作线程都执行完成,因此可以统计所消耗的事件。
例子
举个例子如下,main 线程等待其它子线程的事件发生后继续执行 main 线程:
public class TestHarness {
public long timeTakes(int nThreads, final Runnable task) throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);
for (int i = 0; i futureTask = new FutureTask(new Callable() {
public String call() throws Exception {
Thread.sleep(10000);
return "Task is over";
}
});
private final Thread thread = new Thread(futureTask);
public void start() {
thread.start();
}
public String get() throws ExecutionException, InterruptedException {
return futureTask.get();
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
Preloader preloader = new Preloader();
preloader.start();
System.out.println(preloader.get());
}
}
在Preloader中,当get方法抛出ExecutionException时,可能是以下三种情况之一:Callable抛出的受检查异常,RuntimeException,以及Error。
我们必须对每种情况单独处理。
信号量
简介
计数信号量(Counting Semaphore)用来控制同时访问的某个特定资源的操作数量,或者同时执行某个指定操作的数量[CPJ 3.4.1]。计算信号量还可以用来实现某种资源池,或者对容器施加边界。
Semaphore中管理着一组虚拟许可(permit),许可的初始量可通过构造函数来指定。在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用以后释放许可。如果没有许可,那么acquire将阻塞直到有许可(或者直到被中断或者操作超时)。release方法将返回一个许可给信号量。计算信号量的一种简化形式是二值信号量,即初始化值为1的Semaphore。二值信号量可以用做互斥体(mutex),并具备不可重入的加锁语义:谁拥有这个唯一的许可,谁就拥有了互斥锁。
代码示例
public class TestSemaphore {
public static void main(String[] args) {
// 线程池
ExecutorService exec = Executors.newCachedThreadPool();
// 只能5个线程同时访问
final Semaphore semp = new Semaphore(5);
// 模拟20个客户端访问
for (int index = 0; index exchanger;
private String name;
public Thread1(String name,Exchanger exchanger){
super(name);
this.exchanger = exchanger;
}
@Override
public void run(){
try {
long startTime = System.currentTimeMillis();
Thread.sleep(3000);
System.out.println(Thread.currentThread()+"获取到数据:"+exchanger.exchange("我是Thread1的实例"));
System.out.println("等待了"+(System.currentTimeMillis()-startTime));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Thread2 extends Thread{
private Exchanger exchanger;
private String name;
public Thread2(String name,Exchanger exchanger){
super(name);
this.exchanger = exchanger;
}
@Override
public void run(){
try {
long startTime = System.currentTimeMillis();
System.out.println(Thread.currentThread()+"获取到数据:"+exchanger.exchange("我是Thread2的实例"));
System.out.println("等待了"+(System.currentTimeMillis()-startTime));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Main {
public static void main(String[] args) {
Exchanger exchanger = new Exchanger();
new Thread1("thread1",exchanger).start();
new Thread2("thread2",exchanger).start();
}
}
也就是交换双方先到栅栏处的会等待后到达栅栏处的,直到交换双方都到达栅栏然后开始交换数据。
阻塞队列
简介
阻塞队列提供了可阻塞的入队和出对操作,如果队列满了,入队操作将阻塞直到有空间可用,如果队列空了,出队操作将阻塞直到有元素可用;
队列可以为有界和无界队列,无界队列不会满,因此入队操作将不会阻塞;
下面将使用阻塞队列LinkedBlockingQueue举个生产者-消费者例子,生产者每隔1秒生产1个产品,然后有6个消费者在消费产品,可以发现,每隔1秒,只有一个消费者能够获取到产品消费,其它线程只能等待...
代码示例
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
//生产者
public class Producer implements Runnable {
private final BlockingQueue fileQueue;
public Producer(BlockingQueue queue) {
this.fileQueue = queue;
}
public void run() {
try {
while (true) {
TimeUnit.MILLISECONDS.sleep(1000);
String produce = this.produce();
System.out.println(Thread.currentThread() + "生产:" + produce);
fileQueue.put(produce);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public String produce() {
SimpleDateFormat dfdate = new SimpleDateFormat("HH:mm:ss");
return dfdate.format(new Date());
}
public static void main(String[] args) {
BlockingQueue queue = new LinkedBlockingQueue(10);
for (int i = 0; i queue;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
try {
while (true) {
TimeUnit.MILLISECONDS.sleep(1000);
System.out.println(Thread.currentThread() + "prepare 消费");
System.out.println(Thread.currentThread() + "starting:"
+ queue.take());
System.out.println(Thread.currentThread() + "end 消费");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}