SynchronousQueue

思维导图

是什么

SynchronousQueue 是这样一种阻塞队列,其中每个 put 必须等待一个 take,反之亦然。

简而言之:线程安全,阻塞。

入门案例

我们定义两个线程,一个负责写入,一个负责读取。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; /** * @author 老马啸西风 * @since 1.0.0 */ public class SynchronousQueueDemo { public static void main(String[] args) { SynchronousQueue<Integer> queue = new SynchronousQueue<>(); new Writer(queue).start(); new Reader(queue).start(); } private static class Writer extends Thread { SynchronousQueue<Integer> queue; public Writer(SynchronousQueue<Integer> queue) { this.queue = queue; } @Override public void run() { for (int i = 0; i < 5; i++) { System.out.println("开始设置第 " + i + " 个元素"); try { TimeUnit.SECONDS.sleep(2); queue.put(i); } catch (InterruptedException e) { e.printStackTrace(); } } } } /** * 消息读取者 */ private static class Reader extends Thread { SynchronousQueue<Integer> queue; public Reader(SynchronousQueue<Integer> queue) { this.queue = queue; } @Override public void run() { while (true) { try { System.out.println("读取信息: " + queue.take() + "\n"); } catch (InterruptedException e) { e.printStackTrace(); } } } } }

对应的日志信息如下:

  [plaintext]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
开始设置第 0 个元素 开始设置第 1 个元素 读取信息: 0 开始设置第 2 个元素 读取信息: 1 开始设置第 3 个元素 读取信息: 2 开始设置第 4 个元素 读取信息: 3 读取信息: 4

可以看到当元素被设置之后,就会被立刻读取。这个在实际使用过程中还是非常便利的,比轮训优雅多了。

源码分析

类定义

  [java]
1
2
3
4
5
6
7
public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { // 转换类实现,也是最核心的一个属性。 // 后面会详细讲解 private transient volatile Transferer<E> transferer; }

实现了阻塞队列接口,继承自 AbstractQueue 抽象队列。

实际上对应的 put/take 方法,经过 transfer 的封装之后,都变得非常简单。

我们本文的核心在于对 Transferer 的解析。

构造器

SynchronousQueue 也是支持是否为公平锁模式的。

默认为非公平模式。

是否公平取决于使用的 Transfer 实现子类。

  [java]
1
2
3
4
5
6
public SynchronousQueue() { this(false); } public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }

put 方法

  [java]
1
2
3
4
5
6
7
8
9
10
11
/** * Adds the specified element to this queue, waiting if necessary for * another thread to receive it. */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); } }

take 方法

  [java]
1
2
3
4
5
6
7
8
9
10
11
/** * Retrieves and removes the head of this queue, waiting if necessary * for another thread to insert it. */ public E take() throws InterruptedException { E e = transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); }

算法笔记

下面是源码中的一部分算法笔记,不会出现在文档中。

此类实现W. N. Scherer III和M. L. Scott所著的“不带条件同步的并发对象的无阻塞”中描述的双堆栈和双队列算法的扩展。

第十八届年度大会 (2004年10月,分布式计算)(另请参见 http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/dals.html )。

(Lifo)堆栈用于非公平模式,(Fifo)队列用于公平模式。

两者的性能通常相似。

Fifo通常在竞争下支持更高的吞吐量,但是Lifo在常见应用程序中保持更高的线程局部性

双队列(和类似的堆栈)是在任何给定时间保存“数据”(由put操作提供的项,或“请求”)的插槽,表示 take 操作,或者为空。

fulfill 的调用(即,从保存数据的队列中请求元素的调用,反之亦然)使互补节点出队。

互补节点

这些队列最有趣的功能是,任何操作都可以弄清楚队列所处的模式,并且无需锁就可以采取相应的措施。

队列和堆栈都扩展了抽象类Transferer,它们定义了执行放置或取出操作的单个方法。

将它们统一为一个方法,因为在双重数据结构中,放置和取出操作是对称的,因此几乎所有代码都可以合并。

最终的传输方法长远来看,但比分解成几乎重复的部分要容易得多。

队列和堆栈数据结构在概念上有很多相似之处,但具体细节很少。

为简单起见,它们保持不同,以便以后可以分别发展。

此处的算法与上述论文中的版本不同,在于扩展了它们以用于同步队列以及处理取消。

主要区别包括:

  1. 原始算法使用带位标记的指针,但此处的算法使用节点中的模式位,从而导致了许多进一步的调整。

  2. SynchronousQueues必须阻塞等待实现的线程。

  3. 支持通过超时和中断进行取消,包括从列表中清除已取消的节点/线程,以避免垃圾保留和内存耗尽。

阻塞主要使用LockSupport park/unpark 来完成,除了看起来像是首先要实现的下一个要暂存的节点外,它还会旋转一点(仅在多处理器上)。

在非常繁忙的同步队列上,旋转可以大大提高吞吐量。

在不那么忙碌的队列上,自旋的量很小,不足以引起注意。

在队列和堆栈中以不同的方式进行清理。

对于队列,我们几乎总是可以在取消节点后的 O(1) 时间内立即删除该节点(进行模数重试以进行一致性检查)。

但是,如果可能将其固定为当前尾巴,则必须等待直到随后的一些取消。

对于堆栈,我们需要潜在的 O(n) 遍历,以确保可以删除节点,但这可以与其他访问堆栈的线程同时运行。

尽管垃圾回收会处理大多数会使非阻塞算法复杂化的节点回收问题,但还是要小心“忘记”对数据,其他节点和可能被阻塞线程长期保留的线程的引用。

如果设置为null会与主要算法冲突,则可以通过将节点的链接更改为现在指向节点本身来完成。

ps: 这是一个不错的技巧。

对于Stack节点,这不会发生太多(因为阻塞的线程不会挂在旧的头部指针上),但是必须积极地忘记Queue节点中的引用,以防止自到达以来任何节点都曾引用的所有内容都可以访问。

内部变量

有几个关于内核数等的相关变量。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/** ** 内核数,用于自旋锁控制。 ** 单核是不能自旋的。 ** ** @author 老马啸西风 */ static final int NCPUS = Runtime.getRuntime().availableProcessors(); /** * 最大的自旋次数,如果是单核,不进行自旋。 * 如果是多核,最多旋转 32 次。 */ static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32; /** * 在阻塞未定时的等待之前旋转的次数。 * 此值大于定时值,因为非定时等待旋转得更快,因为他们不需要每次旋转都检查时间。 */ static final int maxUntimedSpins = maxTimedSpins * 16; /** * 旋转秒级比使用定时停泊更快的纳秒数。 * 粗略的估计就足够了。 */ static final long spinForTimeoutThreshold = 1000L;

Transferer

接口定义

一个非常简单的方法。

  [java]
1
2
3
abstract static class Transferer<E> { abstract E transfer(E e, boolean timed, long nanos); }

这里就像算法笔记中说的,使用了队列(FIFO) 和 堆栈(FILO)

阻塞队列

基于队列

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
/** * * 这扩展了Scherer-Scott双队列算法,其不同之处在于,通过使用节点内的模式而不是标记的指针来实现。 * * 该算法比堆栈的算法更简单,因为实现者节点,并且匹配是通过CAS将QNode.item字段从非null转换为null(用于放置)或反之亦然(用于take)来完成的。 * * @author 老马啸西风 * */ static final class TransferQueue<E> extends Transferer<E> { /** 头结点 */ transient volatile QNode head; /** 尾节点 */ transient volatile QNode tail; /** * 对已取消节点的引用,该节点可能尚未取消与队列的链接,因为它是取消时最后插入的节点。 */ transient volatile QNode cleanMe; TransferQueue() { QNode h = new QNode(null, false); // initialize to dummy node. head = h; tail = h; } /** * 尝试以 cas nh 作为新元素; 如果成功,请取消链接旧头的下一个节点,以免造成垃圾遗留。 */ void advanceHead(QNode h, QNode nh) { if (h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh)) h.next = h; // forget old next } /** * T尝试设置 nt 作为新的尾巴元素 */ void advanceTail(QNode t, QNode nt) { if (tail == t) UNSAFE.compareAndSwapObject(this, tailOffset, t, nt); } /** * 尝试 CAS 设置 cleanMe 元素 */ boolean casCleanMe(QNode cmp, QNode val) { return cleanMe == cmp && UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val); } // Unsafe 相关属性 // 这个很多并发类中都有,基本是固定的。 private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; private static final long tailOffset; private static final long cleanMeOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = TransferQueue.class; headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head")); tailOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("tail")); cleanMeOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("cleanMe")); } catch (Exception e) { throw new Error(e); } } }

节点定义

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/** Node class for TransferQueue. */ static final class QNode { volatile QNode next; // next node in queue volatile Object item; // CAS'ed to or from null volatile Thread waiter; // to control park/unpark final boolean isData; QNode(Object item, boolean isData) { this.item = item; this.isData = isData; } // 基于 CAS 的设置方法 boolean casNext(QNode cmp, QNode val) { return next == cmp && UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } boolean casItem(Object cmp, Object val) { return item == cmp && UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } /** * 尝试通过 CAS 取消元素的引用 */ void tryCancel(Object cmp) { UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this); } // 判断是否已经取消 boolean isCancelled() { return item == this; } /** * 下一个指针指向当前类,说明什么? * 说明节点已经不再队列中 */ boolean isOffList() { return next == this; } // Unsafe 的相关实现 private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = QNode.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }

transfer 核心方法

基本算法是循环尝试执行以下两个操作之一:

  1. 如果队列明显为空或持有相同模式的节点,请尝试将节点添加到等待队列中,等待被实现(或取消)并返回匹配项。

  2. 如果队列显然包含等待项,并且此调用是互补模式,请尝试通过对等待节点的CAS’ing item字段进行出队并使其出队,然后返回匹配项来实现。

在每种情况下,一路检查并尝试帮助其他停滞/缓慢的线程推进头和尾。

循环以空检查开始,以防止看到未初始化的头或尾值。

这在当前的 SynchronousQueue 中永远不会发生,但是如果调用者持有对传输者的非易失性/最终引用(non-volatile/final),则可能会发生这种情况。

无论如何,这里的检查是因为将空检查放在循环的顶部,通常比隐式散布(implicitly interspersed)检查要快。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/** * 放入或者获取一个元素 * @author 老马啸西风 */ @SuppressWarnings("unchecked") E transfer(E e, boolean timed, long nanos) { QNode s = null; // constructed/reused as needed boolean isData = (e != null); for (;;) { QNode t = tail; QNode h = head; // 元素未初始化完成,自旋。 if (t == null || h == null) continue; // 如果为空,或者持有相同模式的节点 if (h == t || t.isData == isData) { QNode tn = t.next; // 元素被其他线程修改,重来 if (t != tail) continue; // 尝试节点加入到 tail if (tn != null) { advanceTail(t, tn); continue; } // 时间不等人,直接返回 null if (timed && nanos <= 0) return null; // 设置新元素失败,重来 if (s == null) s = new QNode(e, isData); if (!t.casNext(null, s)) continue; // 添加到尾部等待 advanceTail(t, s); // 旋转/阻塞,直到满足节点s为止。 Object x = awaitFulfill(s, e, timed, nanos); // 等待被取消,清空元素,并且返回 null if (x == s) { clean(t, s); return null; } // 元素已经不再队列中了, if (!s.isOffList()) { // not already unlinked // 设置 s 为新的头结点 advanceHead(t, s); if (x != null) // and forget fields s.item = s; s.waiter = null; } return (x != null) ? (E)x : e; } else { // 互补模式 complementary-mode // 这里就是上面说的第二种算法模式 QNode m = h.next; // 已经被其他线程修改,重来 if (t != tail || m == null || h != head) continue; // inconsistent read Object x = m.item; // 元素 CAS 失败,执行出队,进行重试。 if (isData == (x != null) || // m already fulfilled x == m || // m cancelled !m.casItem(x, e)) { // lost CAS advanceHead(h, m); // dequeue and retry continue; } // 成功设置 advanceHead(h, m); // successfully fulfilled LockSupport.unpark(m.waiter); return (x != null) ? (E)x : e; } } }

awaitFulfill 等待直到节点满足条件

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/** * 旋转/阻止,直到满足节点s为止。 * * @param s the waiting node * @param e the comparison value for checking match * @param timed true if timed wait * @param nanos timeout value * @return matched item, or s if cancelled * @author 老马啸西风 */ Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { /* Same idea as TransferStack.awaitFulfill */ // deadline 是第一生产力 // 我们设置一个超时时间,避免一直等待下去。 final long deadline = timed ? System.nanoTime() + nanos : 0L; // 这 3 目运算符用的人脑袋疼。 Thread w = Thread.currentThread(); int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { // 如果当前线程被打断,尝试取消。 if (w.isInterrupted()) s.tryCancel(e); Object x = s.item; // 如果 s.item 与 e 不等,直接返回 x if (x != e) return x; if (timed) { // 计算超时时间,超时之后尝试取消。 nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(e); continue; } } // 更新自旋次数 if (spins > 0) --spins; // 如果 waiter 为 null,设置为当前线程。 else if (s.waiter == null) s.waiter = w; else if (!timed) // 通过 LockSupport 进行 park,区别只是是否有超时时间。 LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }

clean 清空操作

在任何给定时间,列表中的一个节点都不能删除-最后插入的节点。

为了解决这个问题,如果我们不能删除s,我们将其前身保存为 cleanMe,首先删除之前保存的版本。

可以始终删除节点s或先前保存的节点中的至少一个,因此该操作始终终止。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/** * 使用原始的前任pred摆脱已取消的节点s。 * @author 老马啸西风 */ void clean(QNode pred, QNode s) { // 清空 waiter 信息 s.waiter = null; while (pred.next == s) { //如果已取消链接,请提早返回 QNode h = head; QNode hn = h.next; //设置已取消的第一个节点为head if (hn != null && hn.isCancelled()) { advanceHead(h, hn); continue; } // 保证一致性读 QNode t = tail; // Ensure consistent read for tail // 为空,直接返回 if (t == h) return; // 不一致,重试 QNode tn = t.next; if (t != tail) continue; // 尝试设置 tail 信息 if (tn != null) { advanceTail(t, tn); continue; } // 如果 s 节点不是尾巴节点,尝试进行 unsplice if (s != t) { QNode sn = s.next; if (sn == s || pred.casNext(s, sn)) return; } QNode dp = cleanMe; //尝试取消链接先前取消的节点 if (dp != null) { QNode d = dp.next; QNode dn; if (d == null || // d is gone or d == dp || // d is off list or !d.isCancelled() || // d not cancelled or (d != t && // d not tail and (dn = d.next) != null && // has successor dn != d && // that is on list dp.casNext(d, dn))) // d unspliced // 通过 CAS 清空 dp casCleanMe(dp, null); // s 已经保存在信息中,直接返回 if (dp == pred) return; } else if (casCleanMe(null, pred)) return; // Postpone cleaning s } }

基于堆栈

还有基于 stack 的实现。

不过思想都是类似的。

类定义

这扩展了Scherer-Scott双栈算法,除了使用“覆盖”节点而不是位标记的指针以外,其他方式也有所不同:

完成操作会推动标记节点(将FULFILLING位设置为模式)以保留一个点以匹配等待的节点。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/** ** 基于 Stack 的转换实现 ** @author 老马啸西风 */ static final class TransferStack<E> extends Transferer<E> { /* Modes for SNodes, ORed together in node fields */ /** 节点代表未实现的消费者 */ static final int REQUEST = 0; /** 节点代表未完成的生产者 */ static final int DATA = 1; /** 节点正在执行另一个未完成的数据或请求 */ static final int FULFILLING = 2; /** 如果m设置了满足位,则返回true。 */ static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; } /** stack 的头结点 */ volatile SNode head; }

那么这个通过 volatile 修饰的头结点又是什么呢?

节点

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/** * Node class for TransferStacks. * @author 老马啸西风 */ static final class SNode { volatile SNode next; // next node in stack volatile SNode match; // the node matched to this volatile Thread waiter; // to control park/unpark Object item; // data; or null for REQUESTs int mode; //注意:item和mode字段不需要是volatile的,因为它们总是在其他volatile /原子操作之前和之后写入。 SNode(Object item) { this.item = item; } //cas 方法设置 next 节点 boolean casNext(SNode cmp, SNode val) { return cmp == next && UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } /** * 尝试将节点s与此节点匹配,如果是,则唤醒线程。 * Fulfiller呼叫tryMatch来识别其 waiter 。 * waiters 封锁直到他们被匹配。 */ boolean tryMatch(SNode s) { if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { Thread w = waiter; // waiters 需要最多一次 unpark if (w != null) { waiter = null; LockSupport.unpark(w); } return true; } return match == s; } /** * 尝试通过将节点与其自身匹配来取消等待。 */ void tryCancel() { UNSAFE.compareAndSwapObject(this, matchOffset, null, this); } boolean isCancelled() { return match == this; } // Unsafe 相关配置信息 private static final sun.misc.Unsafe UNSAFE; private static final long matchOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = SNode.class; matchOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("match")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }

节点相关的几个方法:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
boolean casHead(SNode h, SNode nh) { return h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh); } /** * 创建或重置节点的字段。 * 仅从传输中调用,在该传输中延迟创建要推送到栈上的节点,并在可能时进行重用,以帮助减少读取和头的CASes之间的间隔,并避免当CASes推送到节点由于争用而失败时产生的大量垃圾。 */ static SNode snode(SNode s, Object e, SNode next, int mode) { if (s == null) s = new SNode(e); s.mode = mode; s.next = next; return s; }

转换方法

这个应该和基于队列的类似。

基本算法是循环尝试以下三个动作之一:

  1. 如果显然是空的或已经包含相同模式的节点,请尝试将节点压入堆栈并等待匹配,然后将其返回,如果取消则返回null。

  2. 如果显然包含互补模式的节点,请尝试将满足要求的节点推入堆栈,与相应的等待节点匹配,从堆栈中弹出两者,然后返回匹配的项目。 由于其他线程正在执行操作3,因此实际上可能不需要匹配或取消链接:

  3. 如果堆栈顶部已经包含另一个充实的节点,请通过执行其匹配和/或弹出操作来对其进行帮助,然后继续。

帮助代码与实现代码基本相同,不同之处在于它不返回项目。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/** * Puts or takes an item. */ @SuppressWarnings("unchecked") E transfer(E e, boolean timed, long nanos) { SNode s = null; // constructed/reused as needed int mode = (e == null) ? REQUEST : DATA; for (;;) { SNode h = head; if (h == null || h.mode == mode) { // empty or same-mode if (timed && nanos <= 0) { // can't wait if (h != null && h.isCancelled()) casHead(h, h.next); // pop cancelled node else return null; } else if (casHead(h, s = snode(s, e, h, mode))) { SNode m = awaitFulfill(s, timed, nanos); if (m == s) { // wait was cancelled clean(s); return null; } if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller return (E) ((mode == REQUEST) ? m.item : s.item); } } else if (!isFulfilling(h.mode)) { // try to fulfill if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { for (;;) { // loop until matched or waiters disappear SNode m = s.next; // m is s's match if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop } SNode mn = m.next; if (m.tryMatch(s)) { casHead(s, mn); // pop both s and m return (E) ((mode == REQUEST) ? m.item : s.item); } else // lost match s.casNext(m, mn); // help unlink } } } else { // help a fulfiller SNode m = h.next; // m is h's match if (m == null) // waiter is gone casHead(h, null); // pop fulfilling node else { SNode mn = m.next; if (m.tryMatch(h)) // help match casHead(h, mn); // pop both h and m else // lost match h.casNext(m, mn); // help unlink } } } }

等待方法

当节点/线程将要阻塞时,它将设置其 waiter 字段,然后在实际停车之前至少再检查一次状态,从而涵盖了竞争者与实现者的关系,并注意到 waiter 为非空,因此应将其唤醒。

当由出现在调用点位于堆栈顶部的节点调用时,对停放的调用之前会进行旋转,以避免在生产者和消费者及时到达时阻塞。

这可能足以只在多处理器上发生。

从主循环返回的检查顺序反映了这样一个事实,即中断的优先级高于正常的返回,而正常的返回优先于超时。 (因此,在超时时,在放弃之前要进行最后一次匹配检查)。

除了来自非定时SynchronousQueue的调用。{poll / offer}不会检查中断,也根本不需要等待,因此被困在传输方法中,而不是调用awaitFulfill。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/** * 旋转/阻止,直到节点s通过执行操作匹配。 * * @author 老马啸西风 */ SNode awaitFulfill(SNode s, boolean timed, long nanos) { // 设置超时时间 final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); // 是否需要等待? int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { // 被中断,则尝试取消 if (w.isInterrupted()) s.tryCancel(); // 如果不等,直接返回 SNode m = s.match; if (m != null) return m; // 指定超时时间,超时之后,尝试取消,重来 if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(); continue; } } // 自旋次数降低 if (spins > 0) spins = shouldSpin(s) ? (spins-1) : 0; // 更新 waiter 信息 else if (s.waiter == null) s.waiter = w; // establish waiter so can park next iter else if (!timed) // 通过 LockSupport 实现。 LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }

看的出来,这个和基于 Queue 的基本没什么区别。

只不过算法笔记中作者也说了,为了让二者便于后期的发展,就将其分开实现。

这个方法和 queue 中不同的是有一个是否需要自旋的方法:

  [java]
1
2
3
4
5
6
7
/** * 如果节点s在头或有一个活跃的履行者,则返回true。 */ boolean shouldSpin(SNode s) { SNode h = head; return (h == s || h == null || isFulfilling(h.mode)); }

清空元素

最糟糕的是,我们可能需要遍历整个堆栈以取消s的链接。

如果有多个并发调用要清理,则如果另一个线程已将其删除,则可能看不到。

但是当看到任何已知跟随s的节点时,我们可以停止。

除非也将其取消,否则我们将使用s.next,在这种情况下,我们将尝试过去一个节点。

我们不做进一步检查,因为我们不想为了找到标记而进行双重遍历

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/** * 从堆栈取消链接 s。 * * @author 老马啸西风 */ void clean(SNode s) { s.item = null; // forget item s.waiter = null; // forget thread SNode past = s.next; if (past != null && past.isCancelled()) past = past.next; // Absorb cancelled nodes at head SNode p; while ((p = head) != null && p != past && p.isCancelled()) casHead(p, p.next); // Unsplice embedded nodes while (p != null && p != past) { SNode n = p.next; if (n != null && n.isCancelled()) p.casNext(n, n.next); else p = n; } }

小结

阻塞队列大家族的成员共计 5 位,SynchronousQueue 是其中非常优秀的一份子。

工作学习中希望可以活学活用,提升工作效率,写出更加优异的代码。

希望本文对你有帮助,如果有其他想法的话,也可以评论区和大家分享哦。

各位极客的点赞收藏转发,是老马持续写作的最大动力!

参考资料

jdk 源码

DelayQueue 的使用

Java延时队列DelayQueue的使用

SynchronousQueue 的使用