DelayQueue

简介

延迟元素的无限制BlockingQueu,其中元素只能在其延迟到期后才能获取。

当元素的getDelay(TimeUnit.NANOSECONDS)方法返回小于或等于零的值时,就会发生过期。

即使未到期的元素无法使用take或poll删除,它们也被视为普通元素。

此队列不允许 null 元素。

思考题

为什么不允许有 null 元素?

其实和其他几篇中类似,这里读者可以阅读下后面的源码解读。

希望读完之后,自己可以得到答案。

方法说明

方法 抛出异常 返回值 一直阻塞 超时退出
插入方法 add offer put offer(time)
移除方法 remove poll take poll(time)
检查方法 element peek - -

入门例子

DelayQueue 非常适合指定时间之后,才能让消费者获取到的场景。

延迟对象定义

需要继承自 Delay 接口

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
private static class DelayElem implements Delayed { /** * 延迟时间 */ private final long delay; /** * 到期时间 */ private final long expire; /** * 数据 */ private final String msg; private DelayElem(long delay, String msg) { this.delay = delay; this.msg = msg; //到期时间 = 当前时间+延迟时间 this.expire = System.currentTimeMillis() + this.delay; } /** * 需要实现的接口,获得延迟时间 * * 用过期时间-当前时间 * @param unit 时间单位 * @return 延迟时间 */ @Override public long getDelay(TimeUnit unit) { return unit.convert(this.expire - System.currentTimeMillis() , TimeUnit.MILLISECONDS); } /** * 用于延迟队列内部比较排序 * <p> * 当前时间的延迟时间 - 比较对象的延迟时间 * * @param o 比较对象 * @return 结果 */ @Override public int compareTo(Delayed o) { return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); } @Override public String toString() { return "DelayElem{" + "delay=" + delay + ", expire=" + expire + ", msg='" + msg + '\'' + '}'; } }

写入线程

我们模拟定义一个写入线程。

100ms 执行一次,需要放入 1s 之后才能被获取到。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/** * 写入线程 * @author 老马啸西风 */ private static class WriteThread extends Thread { private final DelayQueue<DelayElem> delayQueue; private WriteThread(DelayQueue<DelayElem> delayQueue) { this.delayQueue = delayQueue; } @Override public void run() { for(int i = 0; i < 3; i++) { try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } DelayElem element = new DelayElem(1000,i+"test"); delayQueue.offer(element); System.out.println(System.currentTimeMillis() + " 放入元素 " + i); } } }

读取线程

读者直接使用一个循环等待,并且输出获取到信息的时间。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/** * 读取线程 * @author 老马啸西风 */ private static class ReadThread extends Thread { private final DelayQueue<DelayElem> delayQueue; private ReadThread(DelayQueue<DelayElem> delayQueue) { this.delayQueue = delayQueue; } @Override public void run() { while (true){ try { DelayElem element = delayQueue.take(); System.out.println(System.currentTimeMillis() +" 获取元素:" + element); } catch (InterruptedException e) { e.printStackTrace(); } } } }

测试

  [java]
1
2
3
4
5
public static void main(String[] args) throws InterruptedException { DelayQueue<DelayElem> delayQueue = new DelayQueue<>(); new WriteThread(delayQueue).start(); new ReadThread(delayQueue).start(); }

输出日志:

  [plaintext]
1
2
3
4
5
6
1604067494687 放入元素 0 1604067494787 放入元素 1 1604067494887 放入元素 2 1604067495687 获取元素:DelayElem{delay=1000, expire=1604067495686, msg='0test'} 1604067495788 获取元素:DelayElem{delay=1000, expire=1604067495787, msg='1test'} 1604067495888 获取元素:DelayElem{delay=1000, expire=1604067495887, msg='2test'}

可以看到我们的元素都是间隔 100ms 放入队列。

获取元素都是等待了对应的 1S。

源码解析

知其然,知其所以然。

下面让我们一起学习下 DelayQueue 的源码实现。

类定义

  [java]
1
2
3
4
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { }

延迟队列继承自 AbstractQueue 类,并且实现了 BlockingQueue 接口。

元素必须实现自 Delayed 接口。

  [java]
1
2
3
4
5
6
7
public interface Delayed extends Comparable<Delayed> { /** * 返回对象需要等待多久 */ long getDelay(TimeUnit unit); }

基本属性

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/** ** 可重入读写锁 */ private final transient ReentrantLock lock = new ReentrantLock(); /** ** 可重入读写锁对应的条件信息 */ private final Condition available = lock.newCondition(); /** ** 优先级队列 */ private final PriorityQueue<E> q = new PriorityQueue<E>(); /** ** 看注释这里使用了 leader-follower 的模式 */ private Thread leader = null;

构造器

  [java]
1
2
3
4
5
6
7
8
9
10
11
/** * 平淡无奇的构造器 * @author 老马啸西风 */ public DelayQueue() {} /** * 根据集合初始化当前队列 */ public DelayQueue(Collection<? extends E> c) { this.addAll(c); }

addAll()

我们一起来看一下 addAll 方法:

  [java]
1
2
3
4
5
6
7
8
9
10
11
public boolean addAll(Collection<? extends E> c) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); boolean modified = false; for (E e : c) if (add(e)) modified = true; return modified; }

这个方法实际非常简单,是父类 AbstractQueue 中的默认方法。

循环集合,将元素添加到队列中。然后用一个 modified 标记元素是否发生了改变。

添加元素方法

  [java]
1
2
3
4
5
6
7
public boolean add(E e) { return offer(e); } public void put(E e) { offer(e); }

可见这两个方法都是调用的 offer 方法,实现如下:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public boolean offer(E e) { // 获取可重入锁 final ReentrantLock lock = this.lock; lock.lock(); try { // 优先级队列入队 q.offer(e); // 如果队列的第一个元素是插入的元素(插入成功),则设置 leader=null,并且通知等待锁的线程。 if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }

移除元素

remove

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/** * Removes a single instance of the specified element from this * queue, if it is present, whether or not it has expired. */ public boolean remove(Object o) { // 首先获取锁 final ReentrantLock lock = this.lock; lock.lock(); try { // 调用优先级队列的元素移除 return q.remove(o); } finally { lock.unlock(); } }

poll

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/** * * 指定时间内移除元素 * * @author 老马啸西风 */ public E poll(long timeout, TimeUnit unit) throws InterruptedException { // 时间转换 long nanos = unit.toNanos(timeout); // 获取锁 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // while(true) 循环 for (;;) { // 获取第一个元素 E first = q.peek(); if (first == null) { // 如果第一个元素不存在,且无需等待,直接返回 null。 if (nanos <= 0) return null; else // 进入等待 nanos = available.awaitNanos(nanos); } else { // 获取第一个元素的延迟时间 long delay = first.getDelay(NANOSECONDS); // 如果时间已经到了,直接返回元素。 if (delay <= 0) return q.poll(); // 如果等待时间为小于等于0,直接返回 null。 //? 这个感觉逻辑怪怪的。 if (nanos <= 0) return null; first = null; // don't retain ref while waiting // 如果获取等待的时间小于元素延迟的时间或者有其他线程在处理中,进入等待。 if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { // 设置 leader 为当前线程 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 获取需要等待的时间 long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { // 释放当前 Leader 的信息 if (leader == thisThread) leader = null; } } } } } finally { // 最后执行锁的释放,并且唤醒等待的线程。 if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }

take 方法

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue. * * @return the head of this queue * @throws InterruptedException {@inheritDoc} * @author 老马啸西风 */ public E take() throws InterruptedException { // 获取锁 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); // 如果第一个元素为 null,则进入等待 if (first == null) available.await(); else { // 获取第一个元素的等待时间 long delay = first.getDelay(NANOSECONDS); // 等待时间到了,直接执行 poll 返回元素。 if (delay <= 0) return q.poll(); first = null; // don't retain ref while waiting // 当前有其他线程再处理,则进入等待。 if (leader != null) available.await(); else { // 设置 leader 为当前线程 // 个人理解:通过 leader 控制并发,不过这个 Thread 变量可以保证多线程间可见性吗? Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 执行等待 available.awaitNanos(delay); } finally { // 释放 leader 信息 if (leader == thisThread) leader = null; } } } } } finally { // 释放锁并且通知其他等待线程 if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }

小结

DelayQueue 我也一直听闻很久,不过平时没有自己使用过。现在发现 DelayQueue 执行定时延期执行,还是非常好用的。

本文从 DelayQeueu 的入门使用开始,逐步深入介绍了源码实现原理。

不知道文章开头的思考题你有自己的答案了吗?

希望本文对你有帮助,如果有其他想法的话,也可以评论区和大家分享哦。

各位极客的点赞收藏转发,是老马持续写作的最大动力!

参考资料

jdk 源码

DelayQueue 的使用

Java延时队列DelayQueue的使用