双端队列
Deque 是 Double ended queue (双端队列) 的缩写,读音和 deck 一样,蛋壳。
Deque 主要实现类有ArrayDeque 和 LinkedBlockingDeque。
使用场景
Deque 的实现类主要分为两种场景:
- 一般场景
LinkedList 大小可变的链表双端队列,允许元素为 null
ArrayDeque 大下可变的数组双端队列,不允许 null
- 并发场景
LinkedBlockingDeque 如果队列为空时,获取操作将会阻塞,知道有元素添加
性能最好的应该是:ConcurrentLinkedDeque
常见实现及说明
ArrayBlockingQueue:基于数组实现的一个有界阻塞队,大小不能重新定义。所以当你试图向一个满的队列添加元素的时候,就会受到阻塞,直到另一个方法从队列中取出元素。
ConcurrentLinkedDeque / ConcurrentLinkedQueue:基于链表实现的无界队列,添加元素不会堵塞。但是这就要求这个集合的消费者工作速度至少要和生产这一样快,不然内存就会耗尽。严重依赖于CAS(compare-and-set)操作。
DelayQueue:无界的保存Delayed元素的集合。元素只有在延时已经过期的时候才能被取出。队列的第一个元素延期最小(包含负值——延时已经过期)。当你要实现一个延期任务的队列的时候使用(不要自己手动实现——使用ScheduledThreadPoolExecutor)。
LinkedBlockingDeque / LinkedBlockingQueue:可选择有界或者无界基于链表的实现。在队列为空或者满的情况下使用ReentrantLock-s。
LinkedTransferQueue:基于链表的无界队列。除了通常的队列操作,它还有一系列的transfer方法,可以让生产者直接给等待的消费者传递信息,这样就不用将元素存储到队列中了。这是一个基于CAS操作的无锁集合。 PriorityBlockingQueue:PriorityQueue的无界的版本。
SynchronousQueue:一个有界队列,其中没有任何内存容量。这就意味着任何插入操作必须等到响应的取出操作才能执行,反之亦反。如果不需要Queue接口的话,通过Exchanger类也能完成响应的功能。
list 适合做双端队列吗?
list是天生的双端队列,前后伸展都很容易。如果说它不适合,是因为:
1) 如果单个数据节点不大,每个节点都加上prev和next两个指针域(占八个字节),实在太浪费了。如数据仅仅是一个int,list就要用三倍的空间!
2) 如果空间配置器不支持内存池分配,数据单位字节数少而数量多,产生的内存碎片会很多,还有堆空间前面四个字节的小甜饼占了太多内存,理由跟1)一样,还是内存!
工作密取
在 生产者-消费者 模式中,所有消费者都从一个工作队列中取元素,一般使用阻塞队列实现; 而在 工作密取 模式中,每个消费者有其单独的工作队列,如果它完成了自己双端队列中的全部工作,那么它就可以从其他消费者的双端队列末尾秘密地获取工作。
vs 生产者-消费者
工作密取模式对比传统的 生产者-消费者模式,更为灵活。
因为多个线程不会因为在同一个工作队列中抢占内容发生竞争。
在大多数时候,它们只是访问自己的双端队列。即使需要访问另一个队列时,也是从队列的尾部获取工作,降低了队列上的竞争程度。
适用场景
工作密取非常适用于即是消费者也是生产者问题。
当执行某个工作时可能导致出现更多的工作。
例如,在网页爬虫程序中处理一个页面时,通常会发现有更多的页面需要处理。类似的还有许多搜索图的算法,例如在垃圾回收阶段对堆进行标记,都可以通过工作密取机制来实现高效并行。当一个工作线程找到新的任务单元时,它会将其放到自己队列的末尾(或者在工作共享模式中,放入其它工作者线程的队列中)。当双端队列为空时,它会在另一个线程的队列末尾查找新的任务,从而确保每个线程都保持忙碌状态。
工作密取算法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130public interface WorkStealingEnableChannel<P> extends Chanel<P> {
P take(BlockingDeque<P> preferredQueue) throws InterruptedException;
}
public class WorkStealingChannel<P> implements WorkStealingEnableChannel<P> {
//双端队列,可以从2端插入值或获取值,继承了BlockingQueue
private final BlockingDeque<P>[] managedQueues;
public WorkStealingChannel(BlockingDeque<P>[] managedQueues) {
super();
this.managedQueues = managedQueues;
}
@Override
public P take() throws InterruptedException {
return take(null);
}
@Override
public void put(P product) throws InterruptedException {
int targetIndex = (product.hashCode() % managedQueues.length);
BlockingQueue<P> targetQueue = managedQueues[targetIndex];
targetQueue.put(product);
}
@Override
public P take(BlockingDeque<P> preferredQueue) throws InterruptedException {
BlockingDeque<P> targetQueue = preferredQueue;
P product = null;
//优先从指定的队列获取值
if(null != targetQueue){
product = targetQueue.poll();
}
int queueIndex = -1;
while(null != product){
queueIndex = (queueIndex +1) % managedQueues.length;
targetQueue = managedQueues[queueIndex];
//试图从其他受管队列的队尾“窃取”“产品”
product = targetQueue.pollLast();
if(preferredQueue == targetQueue){
break;
}
}
if(null == product){
//随机窃取 其他受管队列的产品
queueIndex = (int) (System.currentTimeMillis() % managedQueues.length);
targetQueue = managedQueues[queueIndex];
product = targetQueue.pollLast();
System.out.println("stealed from " + queueIndex + ": " + product);
}
return product;
}
}
public class WorkStealingExample {
private final WorkStealingEnableChannel<String> channel;
private final TerminationToken token = new TerminationToken();
public static void main(String[] args) throws InterruptedException {
WorkStealingExample wse = new WorkStealingExample();
//Thread.sleep(3500);
}
public WorkStealingExample(){
int nCPU = Runtime.getRuntime().availableProcessors();
int consumerCount = nCPU/2 + 1;
BlockingDeque<String>[] managedQueues = new LinkedBlockingDeque[consumerCount];
channel = new WorkStealingChannel<String>(managedQueues);
Consumer[] consumers = new Consumer[consumerCount];
for(int i=0; i<consumerCount; i++){
managedQueues[i] = new LinkedBlockingDeque<String>();
consumers[i] = new Consumer(token, managedQueues[i]);
}
for(int i=0; i<nCPU; i++){
new Producer().start();
}
for(int i=0; i<consumerCount; i++){
consumers[i].start();
}
}
private class Producer extends AbstractTerminatableThread{
private int i = 0;
@Override
protected void doRun() throws Exception {
channel.put(String.valueOf(i++));
Thread.sleep(10);
token.reservations.incrementAndGet();
}
}
private class Consumer extends AbstractTerminatableThread{
private final BlockingDeque<String> workQueue;
public Consumer(TerminationToken token, BlockingDeque<String> workQueue) {
super(token);
this.workQueue = workQueue;
}
@Override
protected void doRun() throws Exception {
/**
* 实现了工作窃取算法
*/
String product = channel.take();
if(product != null){
}
System.out.println("Processing product:" + product);
try {
Thread.sleep(new Random().nextInt(50));
} catch (Exception e) {
}finally{
token.reservations.decrementAndGet();
}
}
}
}
参考资料
https://www.cnblogs.com/drizzlewithwind/p/6392229.html?utm_source=itdadao&utm_medium=referral
https://blog.csdn.net/u011240877/article/details/52865173
https://blog.csdn.net/liangdong2014/article/details/40790037