DelayQueue

简介

延迟元素的无限制BlockingQueu,其中元素只能在其延迟到期后才能获取。

当元素的getDelay(TimeUnit.NANOSECONDS)方法返回小于或等于零的值时,就会发生过期。

即使未到期的元素无法使用take或poll删除,它们也被视为普通元素。

此队列不允许 null 元素。

思考题

为什么不允许有 null 元素?

其实和其他几篇中类似,这里读者可以阅读下后面的源码解读。

希望读完之后,自己可以得到答案。

方法说明

方法 抛出异常 返回值 一直阻塞 超时退出
插入方法 add offer put offer(time)
移除方法 remove poll take poll(time)
检查方法 element peek - -

入门例子

DelayQueue 非常适合指定时间之后,才能让消费者获取到的场景。

延迟对象定义

需要继承自 Delay 接口

private static class DelayElem implements Delayed {
    /**
     * 延迟时间
     */
    private final long delay;
    /**
     * 到期时间
     */
    private final long expire;
    /**
     * 数据
     */
    private final String msg;

    private DelayElem(long delay, String msg) {
        this.delay = delay;
        this.msg = msg;
        //到期时间 = 当前时间+延迟时间
        this.expire = System.currentTimeMillis() + this.delay;
    }
    /**
     * 需要实现的接口,获得延迟时间
     *
     * 用过期时间-当前时间
     * @param unit 时间单位
     * @return 延迟时间
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expire - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
    }
    /**
     * 用于延迟队列内部比较排序
     * <p>
     * 当前时间的延迟时间 - 比较对象的延迟时间
     *
     * @param o 比较对象
     * @return 结果
     */
    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }
    @Override
    public String toString() {
        return "DelayElem{" +
                "delay=" + delay +
                ", expire=" + expire +
                ", msg='" + msg + '\'' +
                '}';
    }
}

写入线程

我们模拟定义一个写入线程。

100ms 执行一次,需要放入 1s 之后才能被获取到。

/**
 * 写入线程
 * @author 老马啸西风
 */
private static class WriteThread extends Thread {
    private final DelayQueue<DelayElem> delayQueue;
    private WriteThread(DelayQueue<DelayElem> delayQueue) {
        this.delayQueue = delayQueue;
    }
    @Override
    public void run() {
        for(int i = 0; i < 3; i++) {
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            DelayElem element = new DelayElem(1000,i+"test");
            delayQueue.offer(element);
            System.out.println(System.currentTimeMillis() + " 放入元素 " + i);
        }
    }
}

读取线程

读者直接使用一个循环等待,并且输出获取到信息的时间。

/**
 * 读取线程
 * @author 老马啸西风
 */
private static class ReadThread extends Thread {
    private final DelayQueue<DelayElem> delayQueue;
    private ReadThread(DelayQueue<DelayElem> delayQueue) {
        this.delayQueue = delayQueue;
    }
    @Override
    public void run() {
        while (true){
            try {
                DelayElem element =  delayQueue.take();
                System.out.println(System.currentTimeMillis() +" 获取元素:" + element);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

测试

public static void main(String[] args) throws InterruptedException {
    DelayQueue<DelayElem> delayQueue = new DelayQueue<>();
    new WriteThread(delayQueue).start();
    new ReadThread(delayQueue).start();
}

输出日志:

1604067494687 放入元素 0
1604067494787 放入元素 1
1604067494887 放入元素 2
1604067495687 获取元素:DelayElem{delay=1000, expire=1604067495686, msg='0test'}
1604067495788 获取元素:DelayElem{delay=1000, expire=1604067495787, msg='1test'}
1604067495888 获取元素:DelayElem{delay=1000, expire=1604067495887, msg='2test'}

可以看到我们的元素都是间隔 100ms 放入队列。

获取元素都是等待了对应的 1S。

源码解析

知其然,知其所以然。

下面让我们一起学习下 DelayQueue 的源码实现。

类定义

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

}

延迟队列继承自 AbstractQueue 类,并且实现了 BlockingQueue 接口。

元素必须实现自 Delayed 接口。

public interface Delayed extends Comparable<Delayed> {

    /**
     * 返回对象需要等待多久
     */
    long getDelay(TimeUnit unit);
}

基本属性

/**
** 可重入读写锁
*/
private final transient ReentrantLock lock = new ReentrantLock();

/**
** 可重入读写锁对应的条件信息
*/
private final Condition available = lock.newCondition();

/**
** 优先级队列
*/
private final PriorityQueue<E> q = new PriorityQueue<E>();


/**
** 看注释这里使用了 leader-follower 的模式
*/
private Thread leader = null;

构造器

/**
 * 平淡无奇的构造器
 * @author 老马啸西风
 */
public DelayQueue() {}
/**
 * 根据集合初始化当前队列
 */
public DelayQueue(Collection<? extends E> c) {
    this.addAll(c);
}

addAll()

我们一起来看一下 addAll 方法:

public boolean addAll(Collection<? extends E> c) {
    if (c == null)
        throw new NullPointerException();
    if (c == this)
        throw new IllegalArgumentException();
    boolean modified = false;
    for (E e : c)
        if (add(e))
            modified = true;
    return modified;
}

这个方法实际非常简单,是父类 AbstractQueue 中的默认方法。

循环集合,将元素添加到队列中。然后用一个 modified 标记元素是否发生了改变。

添加元素方法

public boolean add(E e) {
    return offer(e);
}

public void put(E e) {
    offer(e);
}

可见这两个方法都是调用的 offer 方法,实现如下:

public boolean offer(E e) {
    // 获取可重入锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 优先级队列入队
        q.offer(e);

        // 如果队列的第一个元素是插入的元素(插入成功),则设置 leader=null,并且通知等待锁的线程。
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

移除元素

remove

/**
 * Removes a single instance of the specified element from this
 * queue, if it is present, whether or not it has expired.
 */
public boolean remove(Object o) {
    // 首先获取锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 调用优先级队列的元素移除
        return q.remove(o);
    } finally {
        lock.unlock();
    }
}

poll

/**
 *
 * 指定时间内移除元素
 *
 * @author 老马啸西风
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 时间转换
    long nanos = unit.toNanos(timeout);

    // 获取锁
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // while(true) 循环
        for (;;) {
            // 获取第一个元素
            E first = q.peek();
            if (first == null) {
                // 如果第一个元素不存在,且无需等待,直接返回 null。
                if (nanos <= 0)
                    return null;
                else
                    // 进入等待
                    nanos = available.awaitNanos(nanos);
            } else {
                // 获取第一个元素的延迟时间
                long delay = first.getDelay(NANOSECONDS);
                
                // 如果时间已经到了,直接返回元素。
                if (delay <= 0)
                    return q.poll();

                // 如果等待时间为小于等于0,直接返回 null。
                //? 这个感觉逻辑怪怪的。      
                if (nanos <= 0)
                    return null;

                first = null; // don't retain ref while waiting

                // 如果获取等待的时间小于元素延迟的时间或者有其他线程在处理中,进入等待。
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);
                else {
                    // 设置 leader 为当前线程
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 获取需要等待的时间
                        long timeLeft = available.awaitNanos(delay);
                        nanos -= delay - timeLeft;
                    } finally {
                        // 释放当前 Leader 的信息
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 最后执行锁的释放,并且唤醒等待的线程。
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

take 方法

/**
 * Retrieves and removes the head of this queue, waiting if necessary
 * until an element with an expired delay is available on this queue.
 *
 * @return the head of this queue
 * @throws InterruptedException {@inheritDoc}
 * @author 老马啸西风
 */
public E take() throws InterruptedException {
    // 获取锁
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            // 如果第一个元素为 null,则进入等待
            if (first == null)
                available.await();
            else {
                // 获取第一个元素的等待时间
                long delay = first.getDelay(NANOSECONDS);
                // 等待时间到了,直接执行 poll 返回元素。
                if (delay <= 0)
                    return q.poll();
                first = null; // don't retain ref while waiting

                // 当前有其他线程再处理,则进入等待。
                if (leader != null)
                    available.await();
                else {
                    // 设置 leader 为当前线程
                    // 个人理解:通过 leader 控制并发,不过这个 Thread 变量可以保证多线程间可见性吗?
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 执行等待
                        available.awaitNanos(delay);
                    } finally {
                        // 释放 leader 信息
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 释放锁并且通知其他等待线程
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

小结

DelayQueue 我也一直听闻很久,不过平时没有自己使用过。现在发现 DelayQueue 执行定时延期执行,还是非常好用的。

本文从 DelayQeueu 的入门使用开始,逐步深入介绍了源码实现原理。

不知道文章开头的思考题你有自己的答案了吗?

希望本文对你有帮助,如果有其他想法的话,也可以评论区和大家分享哦。

各位极客的点赞收藏转发,是老马持续写作的最大动力!

参考资料

jdk 源码

DelayQueue 的使用

Java延时队列DelayQueue的使用