同步工具类

同步工具类主要包括闭锁(如CountDownLatch),栅栏(如CyclicBarrier),信号量(如Semaphore)和阻塞队列(如LinkedBlockingQueue)等;

FutureTask/Future 也可以达到同样的目的。

使用同步工具类可以协调线程的控制流;

同步工具类封装了一些状态,这些状态决定线程是继续执行还是等待,此外同步工具类还提供了修改状态的方法;

下面将简单介绍以上同步工具类;

常用同步工具类

CountDownLatch(闭锁)

延迟线程的进度,直到其达到终止状态,所有线程将释放进度,当其到达结束状态后,将不会再改变状态

FutureTask(也可以当做闭锁)

可以返回计算结果的任务

包括三种状态:等待运行(Waiting to run)、正在运行(Running)、运行完成(Completed)

执行完成表示计算的所有结束方式:正常结束、由与取消结束、由与异常结束;任务进入完成状态后将会停止在这个状态上

FutureTask表示的计算通过Callable接口来实现

Callable可以抛出受检查的或未受检查的异常,并且任何代码都可能抛出Error

无论任务代码抛出什么异常,都会被封装到一个ExecutionException中,并在Future.get中被重新抛出

上面两条将使get代码变得复杂,因为不仅需要处理可能出现的ExecutionException以及未受检查的CancellationException,而且还由于ExecutionException是做为一个Throwable类返回的

优点:提前启动计算,可以减少等待结果需要的时间

Semaphore

对资源施加边界

CyclicBarrier

与闭锁相同点:栅栏类似于闭锁,他能阻塞一组线程直到某个事件发生

与闭锁不同点:闭锁是一次性的,栅栏可多次使用;闭锁用于等待事件,而栅栏用于等待其他线程

Exchanger

它是一种 Two-Party 栅栏,各方在栅栏位置上交换数据

闭锁

简介

可以让一个线程等待一组事件发生后(不一定要线程结束)继续执行;

闭锁是一种同步工具类,可以延迟线程的进度直到其达到终止状态。闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态。闭锁可以用来确保某些活动直到其它活动都完成后才继续执行,例如:

1、确保某个计算在其需要的所有资源都被初始化之后才继续执行。二元闭锁(包括两个状态)可以用来表示“资源R已经被初始化”,而所有需要R的操作都必须先在这个闭锁上等待。

2、确保某个服务在其依赖的所有其它服务都已经启动之后才启动。每个服务都有一个相关的二元闭锁。当启动服务S时,将首先在S依赖的其它服务的闭锁上等待,在所有依赖的服务都启动后会释放闭锁S,这样其他依赖S的服务才能继续执行。

3、等待直到某个操作的所有参与者(例如,在多玩家游戏中的所有玩家)都就绪再继续执行。在这种情况中,当所有玩家都准备就绪时,闭锁将到达结束状态。

CountDownLatch是一种灵活的闭锁实现,可以在上述各种情况中使用,它可以使一个或多个线程等待一组事件发生。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown方法递减计数器,表示有一个事件已经发生了,而await方法等待计数器到达零,这表示所有需要等待的事件都已经发生。如果计数器的值非零,那么await会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。在下面的程序中给出了闭锁的两种常见用法。TestHarness创建一定数量的线程,利用他们并发地执行制定的任务。它使用两个闭锁,分别表示“起始门(Starting Gate)”和“结束门(Ending Gate)”。起始门计数器的初始值为1,而结束门计数器的初始值为工作线程的数量。每个工作线程首先要做的值就是在起始门上等待,从而确保所有线程都就绪后才开始执行。而每个线程要做的最后一件事情就是将调用结束门的countDown方法减去1,这能使主线程高效的等待直到所有工作线程都执行完成,因此可以统计所消耗的事件。

例子

举个例子如下,main 线程等待其它子线程的事件发生后继续执行 main 线程:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class TestHarness { public long timeTakes(int nThreads, final Runnable task) throws InterruptedException { final CountDownLatch startGate = new CountDownLatch(1); final CountDownLatch endGate = new CountDownLatch(nThreads); for (int i = 0; i < nThreads; i++) { Thread t = new Thread() { public void run() { try { startGate.await(); try { task.run(); } finally { endGate.countDown(); } } catch (InterruptedException e) { e.printStackTrace(); } } }; t.start(); } long start = System.nanoTime(); startGate.countDown(); endGate.await(); long end = System.nanoTime(); return end - start; } public static void main(String[] args) throws InterruptedException { TestHarness testHarness = new TestHarness(); long nanoTime = testHarness.timeTakes(10, new Runnable() { public void run() { try { Thread.sleep(1000); System.out.println("Task is over!!!"); } catch (InterruptedException e) { e.printStackTrace(); } } }); System.out.println(nanoTime); } }

FutureTask

简介

FutureTask也可以做闭锁。FutureTask表示的计算是通过Callable来实现的,相当于一种可生成结果的Runnable,并且可以处于以下3种状态:等待运行(Waiting to run),正在运行(Running)和运行完成(Completed)。“执行完成”表示计算的所有可能结束方式,包括正常结束、由于取消而结束和由于异常而结束等。当FutureTask 进入完成状态后,它会永远停在这个状态上。

FutureTask.get的行为取决于任务的状态。如果任务已经完成,那么get会立即返回结果,否则get将阻塞直到任务进入完成状态,然后返回结果或者抛出异常。FutureTask将计算结果从执行计算的线程传递到获取这个结果的线程,而FutureTask的规范确保了这种传递过程能实现结果的安全发布。

代码示例

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Preloader { private final FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() { public String call() throws Exception { Thread.sleep(10000); return "Task is over"; } }); private final Thread thread = new Thread(futureTask); public void start() { thread.start(); } public String get() throws ExecutionException, InterruptedException { return futureTask.get(); } public static void main(String[] args) throws ExecutionException, InterruptedException { Preloader preloader = new Preloader(); preloader.start(); System.out.println(preloader.get()); } }

在Preloader中,当get方法抛出ExecutionException时,可能是以下三种情况之一:Callable抛出的受检查异常,RuntimeException,以及Error。

我们必须对每种情况单独处理。

信号量

简介

计数信号量(Counting Semaphore)用来控制同时访问的某个特定资源的操作数量,或者同时执行某个指定操作的数量[CPJ 3.4.1]。计算信号量还可以用来实现某种资源池,或者对容器施加边界。

Semaphore中管理着一组虚拟许可(permit),许可的初始量可通过构造函数来指定。在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用以后释放许可。如果没有许可,那么acquire将阻塞直到有许可(或者直到被中断或者操作超时)。release方法将返回一个许可给信号量。计算信号量的一种简化形式是二值信号量,即初始化值为1的Semaphore。二值信号量可以用做互斥体(mutex),并具备不可重入的加锁语义:谁拥有这个唯一的许可,谁就拥有了互斥锁。

代码示例

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class TestSemaphore { public static void main(String[] args) { // 线程池 ExecutorService exec = Executors.newCachedThreadPool(); // 只能5个线程同时访问 final Semaphore semp = new Semaphore(5); // 模拟20个客户端访问 for (int index = 0; index < 20; index++) { final int NO = index; Runnable run = new Runnable() { public void run() { try { // 获取许可 semp.acquire(); System.out.println("Accessing: " + NO); Thread.sleep((long) (Math.random() * 10000)); // 访问完后,释放 semp.release(); System.out.println("-----------------" + semp.availablePermits()); } catch (InterruptedException e) { e.printStackTrace(); } } }; exec.execute(run); } // 退出线程池 exec.shutdown(); } }

栅栏(Barrier)

简介

栅栏(Barrier)类似于闭锁,它能阻塞一组线程直到某个事件发生[CPJ 4.4.3]。闭锁是一次性对象,一旦进入最终状态,就不能被重置了。

栅栏与闭锁的关键区别在于,所有线程必须同时达到栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。

栅栏用于实现一些协议,例如几个家庭决定在某个地方集合:“所有人6:00在麦当劳碰头,到了以后要等其他人,之后再讨论下一步要做的事情。”

CyclicBarrer可以使一定数量的参与方反复地在栅栏位置汇集,它在并行迭代算法中非常有用:这种算法通常将一个问题拆分一些列互相独立的子问题。如果所有线程都达到了栅栏位置,那么栅栏将打开,为此所有线程都被释放,而栅栏将被重置以便下次使用。如果对await的调用超时,或者await阻塞的线程被中断,那么栅栏就认为是打破了,所有阻塞的await调用都将终止并抛出BrokenBarrerException。如果成功通过栅栏,那么await将为每个线程返回一个唯一的到达索引号,我们可以利用这些索引来“选举”产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作。CyclicBarrer还可以使你将一个栅栏操作传递给构造函数,这是Runnable,当成功通过栅栏时会(在一个子任务线程中)执行它,但在阻塞线程被释放之前是不能被执行的。

在模拟程序中通常需要使用栅栏,例如某个步骤中的计算可以并行执行,但必须等到该步骤中的所有计算都执行完成才能进入下一个步骤。

代码示例

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class TestCyclicBarrier { public static void main(String[] args) { int N = 4; CyclicBarrier barrier = new CyclicBarrier(N); // CyclicBarrier barrier = new CyclicBarrier(N,new Runnable() { // @Override // public void run() { // System.out.println("当前线程"+Thread.currentThread().getName()); // } // }); for (int i = 0; i < N; i++) { new Writer(barrier).start(); } try { Thread.sleep(25000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("CyclicBarrier重用"); for (int i = 0; i < N; i++) { new Writer(barrier).start(); } } static class Writer extends Thread { private CyclicBarrier cyclicBarrier; public Writer(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据..."); try { Thread.sleep(5000); //以睡眠来模拟写入数据操作 System.out.println("线程" + Thread.currentThread().getName() + "写入数据完毕,等待其他线程写入完毕"); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "所有线程写入完毕,继续处理其他任务..."); } } }

Exchanger

简介

栅栏在Java同步工具包中的体现还有一个Exchanger,是一个双方栅栏,每一个在栅栏处交换数据。当双方执行的操作不对称的时候,Exchanger会很有用。当双方线程都到达栅栏的时候,将双方的数据进行交换,这个Exchanger对象可以使得两个线程生成的对象能够安全地交换。

空构造函数

这个类只提供了一个空构造函数,提供了两个方法:

  [java]
1
2
3
exchange(V x);//交换双方线程生成对象 交换成功或者被中断 exchange(V x,long timeout, TimeUnit unit);//交换双方线程生成对象 交换成功或者超时抛出超时异常或者被中断

代码示例

我们使用这个栅栏类模拟以下场景,两个线程,一个线程沉睡3000ms后交换字符串,一个线程直接交换字符串,互相输出接收到的字符串已经等待时间:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import java.util.concurrent.Exchanger; class Thread1 extends Thread{ private Exchanger<String> exchanger; private String name; public Thread1(String name,Exchanger<String> exchanger){ super(name); this.exchanger = exchanger; } @Override public void run(){ try { long startTime = System.currentTimeMillis(); Thread.sleep(3000); System.out.println(Thread.currentThread()+"获取到数据:"+exchanger.exchange("我是Thread1的实例")); System.out.println("等待了"+(System.currentTimeMillis()-startTime)); } catch (InterruptedException e) { e.printStackTrace(); } } } class Thread2 extends Thread{ private Exchanger<String> exchanger; private String name; public Thread2(String name,Exchanger<String> exchanger){ super(name); this.exchanger = exchanger; } @Override public void run(){ try { long startTime = System.currentTimeMillis(); System.out.println(Thread.currentThread()+"获取到数据:"+exchanger.exchange("我是Thread2的实例")); System.out.println("等待了"+(System.currentTimeMillis()-startTime)); } catch (InterruptedException e) { e.printStackTrace(); } } } public class Main { public static void main(String[] args) { Exchanger<String> exchanger = new Exchanger<String>(); new Thread1("thread1",exchanger).start(); new Thread2("thread2",exchanger).start(); } }

也就是交换双方先到栅栏处的会等待后到达栅栏处的,直到交换双方都到达栅栏然后开始交换数据。

阻塞队列

简介

阻塞队列提供了可阻塞的入队和出对操作,如果队列满了,入队操作将阻塞直到有空间可用,如果队列空了,出队操作将阻塞直到有元素可用;

队列可以为有界和无界队列,无界队列不会满,因此入队操作将不会阻塞;

下面将使用阻塞队列LinkedBlockingQueue举个生产者-消费者例子,生产者每隔1秒生产1个产品,然后有6个消费者在消费产品,可以发现,每隔1秒,只有一个消费者能够获取到产品消费,其它线程只能等待…

参见 BlockingQueue

代码示例

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; //生产者 public class Producer implements Runnable { private final BlockingQueue<String> fileQueue; public Producer(BlockingQueue<String> queue) { this.fileQueue = queue; } public void run() { try { while (true) { TimeUnit.MILLISECONDS.sleep(1000); String produce = this.produce(); System.out.println(Thread.currentThread() + "生产:" + produce); fileQueue.put(produce); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } public String produce() { SimpleDateFormat dfdate = new SimpleDateFormat("HH:mm:ss"); return dfdate.format(new Date()); } public static void main(String[] args) { BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10); for (int i = 0; i < 1; i++) { new Thread(new Producer(queue)).start(); } for (int i = 0; i < 6; i++) { new Thread(new Consumer(queue)).start(); } } } // 消费者 class Consumer implements Runnable { private final BlockingQueue<String> queue; public Consumer(BlockingQueue<String> queue) { this.queue = queue; } public void run() { try { while (true) { TimeUnit.MILLISECONDS.sleep(1000); System.out.println(Thread.currentThread() + "prepare 消费"); System.out.println(Thread.currentThread() + "starting:" + queue.take()); System.out.println(Thread.currentThread() + "end 消费"); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }

参考资料

CountDownLatch 同步倒数计数器

5同步工具类

同步工具类–闭锁

同步工具类

线程同步工具类