Netty-27-Netty 的 HashedWheelTimer 时间轮
任务调度
前段时间在给自己的玩具项目设计的时候就遇到了一个场景需要定时任务,于是就趁机了解了目前主流的一些定时任务方案,比如下面这些:
Timer(halo 博客源码中用到了)
ScheduledExecutorService
ThreadPoolTaskScheduler(基于 ScheduledExecutorService)
Netty 的 schedule(用到了 PriorityQueue)
Netty 的 HashedWheelTimer(时间轮)
Kafka 的 TimingWheel(层级时间轮)
还有一些分布式的定时任务:
Quartz
xxl-job
...
因为我玩具项目实现业务 ACK 的方案就打算用 HashedWheelTimer,所以本节核心是分析 HashedWheelTimer,另外会提下它与 schedule 的区别,其它定时任务实现原理就请自动 Google 吧。
Netty Version:4.1.42
HashedWheelTimer 实现图示

大致有个理解就行,关于蓝色格子中的数字,其实就是剩余时钟轮数,这里听不懂也没关系,等后面看到源码解释就懂了~~(大概)~~。
HashedWheelTimer 简单使用例子
这里顺便列出 schedule 的使用方式,下面是某个 Handler 中的代码:
@Override
public void handlerAdded(final ChannelHandlerContext ctx) {
// 定时任务
ScheduledFuture hello_world = ctx.executor().schedule(() -> {
ctx.channel().write("hello world");
}, 3, TimeUnit.SECONDS);
// 构造一个 Timer 实例,同样只执行一次
Timer timer = new HashedWheelTimer();
Timeout timeout1 = timer.newTimeout(timeout -> System.out.println("5s 后执行该任务"), 5, TimeUnit.SECONDS);
// 取消任务
timeout1.cancel();
}
HashedWheelTimer 源码
接口定义
这个 Timer 接口是 netty 自定义的接口。
/**
* Schedules {@link TimerTask}s for one-time future execution in a background
* thread.
*/
public interface Timer {
/**
* Schedules the specified {@link TimerTask} for one-time execution after
* the specified delay.
* 新增任务
*/
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
/**
* Releases all resources acquired by this {@link Timer} and cancels all
* tasks which were scheduled but not executed yet.
* 任务取消
*/
Set stop();
}
内部变量
public class HashedWheelTimer implements Timer {
static final InternalLogger logger =
InternalLoggerFactory.getInstance(HashedWheelTimer.class);
private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
private static final int INSTANCE_COUNT_LIMIT = 64;
private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
private static final ResourceLeakDetector leakDetector = ResourceLeakDetectorFactory.instance()
.newResourceLeakDetector(HashedWheelTimer.class, 1);
private static final AtomicIntegerFieldUpdater WORKER_STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
private final ResourceLeakTracker leak;
private final Worker worker = new Worker();
private final Thread workerThread;
public static final int WORKER_STATE_INIT = 0;
public static final int WORKER_STATE_STARTED = 1;
public static final int WORKER_STATE_SHUTDOWN = 2;
@SuppressWarnings({ "unused", "FieldMayBeFinal" })
private volatile int workerState; // 0 - init, 1 - started, 2 - shut down
private final long tickDuration;
private final HashedWheelBucket[] wheel;
private final int mask;
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
private final Queue timeouts = PlatformDependent.newMpscQueue();
private final Queue cancelledTimeouts = PlatformDependent.newMpscQueue();
private final AtomicLong pendingTimeouts = new AtomicLong(0);
private final long maxPendingTimeouts;
private volatile long startTime;
}
初始化如下:
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
long maxPendingTimeouts) {
// 参数校验
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (tickDuration = Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}
添加定时任务
添加定时任务其实就是 Timer 接口的 newTimeOut 方法: io.netty.util.HashedWheelTimer#newTimeout
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
if (task == null) {
throw new NullPointerException("task");
}
if (unit == null) {
throw new NullPointerException("unit");
}
// 获取当前等待任务数
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
// 如果超出最大等待
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
// 尝试启动workerThread,startTime=0时会startTimeInitialized.await(),最终就是调用Worker的run方法
start();
// Add the timeout to the timeout queue which will be processed on the next tick.
// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
// 这条算式我们可以稍微改下,更容易理解些:
// long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
// ↓
// long deadline = unit.toNanos(delay) - (System.nanoTime() - startTime)
// 我感觉这样更容易理解些,含义为: 距离任务执行的剩余时间 = 任务截止时间 - (当前时间 - 任务对象初始化时间)
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
// Guard against overflow.
if (delay > 0 && deadline unprocessedTimeouts = new HashSet();
// 时钟指针转动的次数
private long tick;
@Override
public void run() {
// Initialize the startTime.
startTime = System.nanoTime();
if (startTime == 0) {
// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
startTime = 1;
}
// Notify the other threads waiting for the initialization at start().
// 之前如果startTime=0,就会进入await状态,这里就要唤醒它
startTimeInitialized.countDown();
do {
/*
* 等待到下一次 tick 时如果没有时间延迟返回tickDuration * (tick + 1);
* 如果延迟了则不空转,立马返回“当前时间”
* 这个“当前时间”是什么呢?比如时钟指针原本第三次 tick 是在300ms,但是由于前面的任务阻塞了50ms,导致进来的时候已经是350ms了
* 那么这里的返回值就会变成350ms,至于返回值变成350ms会怎么样?貌似也没有怎么样,就是不等待马上执行罢了
*/
final long deadline = waitForNextTick();
if (deadline > 0) {
// 与运算取模,取出数组桶的坐标,相信这个没少见过了
int idx = (int) (tick & mask);
// 前面说过HashedWheelTimeout是可以取消任务的,其实就是在这里取消的
processCancelledTasks();
// 在时间轮中取出“指针指向的块”
HashedWheelBucket bucket = wheel[idx];
// 将任务填充到时间块中
transferTimeoutsToBuckets();
// 取出任务并执行
bucket.expireTimeouts(deadline);
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
// Fill the unprocessedTimeouts so we can return them from stop() method.
for (HashedWheelBucket bucket: wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
for (;;) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
// 处理取消的任务
processCancelledTasks();
}
- 取消任务的逻辑这里就不展开看了,也比较简单,有兴趣自行补充即可。
实现如下:
private void processCancelledTasks() {
for (;;) {
HashedWheelTimeout timeout = cancelledTimeouts.poll();
if (timeout == null) {
// all processed
break;
}
try {
timeout.remove();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown while process a cancellation task", t);
}
}
}
}
看看上面的 transferTimeoutsToBuckets 方法,如果你看不懂上面图中蓝色格子数字是什么意思,那就认真看看这个方法:
io.netty.util.HashedWheelTimer.Worker#transferTimeoutsToBuckets
private void transferTimeoutsToBuckets() {
// transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
// adds new timeouts in a loop.
for (int i = 0; i deadline (%d)", timeout.deadline, deadline));
}
} else if (timeout.isCancelled()) {
// 如果任务被取消了
next = remove(timeout);
} else {
// 如果任务没被取消,而且剩余轮数>0,则扣除轮数,让任务继续等到至后面轮数
timeout.remainingRounds --;
}
timeout = next;
}
}
Netty EventLoop#run
记录了NioEventLoop启动前做的一些事情,并最终找到一个方法run,如果不记得可以回上一节看看,因为这个run方法是本篇以及相关章节的入口。
...
// 检查I/O事件
select(wakenUp.getAndSet(false));
...
// 处理上面select查到的I/O事件
processSelectedKeys();
...
// 运行上面处理的事件集
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
...
本节要追踪的就是select方法,除了正常逻辑以外,还可以看到Netty是如何解决jdk空轮询bug。
一些我的口头名词解释:select操作和检测I/O事件是同一个意思。
一些没讲过但是要先知道的前提
Netty中除了普通的任务之外,还会有一些定时任务,而这些定时任务,在执行之前实际上是存储在一个定时任务队列中,这个队列里的元素是按照截止时间排序的(本节会讲到这一点)。
另外,在本节之前的篇章中提到的任务队列都不是定时任务队列,在这一节会简单看一下这个任务队列。
开始追踪select方法
首先将视角切回io.netty.channel.nio.NioEventLoop#run,这是起点,如果忘记怎么过来的请看回顾,下面直接开始。
首先找到这一段代码:
select(wakenUp.getAndSet(false));
ps: 发现 netty 不同版本之间差异比较大,我看目前的版本已经找不到这一行代码了。
含义:执行select方法,并在执行之前,都标识一个状态,表示当前要进行select操作且处于未唤醒状态。
进入select方法,由于代码很长,下面分段贴,此处【坐标1】: io.netty.channel.nio.NioEventLoop#select
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis =0.5ms,才能继续往下走,否则表示即将有定时任务要执行,之后会调用一个非阻塞的selectNow()
关于上面提到的非阻塞,其实看完源码后仍然我不太理解,因为我追进源码最终还是看到synchronized,也许是指这里的synchronized并未上升到重量级锁。
这里再贴下文档原话,我英语再塑料,也不至于翻译错非阻塞吧:This method performs a non-blocking selection operation。
ps: 这里的非阻塞应该指的是IO模型的非阻塞。synchronized 值得是否为排他锁,是完全的 2 个概念。
这里先不往下看,进入delayNanos方法看看: io.netty.util.concurrent.SingleThreadEventExecutor#delayNanos
```java
protected long delayNanos(long currentTimeNanos) {
ScheduledFutureTask scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
return SCHEDULE_PURGE_INTERVAL;
}
return scheduledTask.delayNanos(currentTimeNanos);
}
该方法返回的是当前时间距离最近一个定时任务开始的所剩时间。
peekScheduledTask()就是返回一个最近的定时任务。
再看看ScheduledFutureTask这个类,找到它的compareTo方法: io.netty.util.concurrent.ScheduledFutureTask#compareTo
@Override
public int compareTo(Delayed o) {
if (this == o) {
return 0;
}
ScheduledFutureTask that = (ScheduledFutureTask) o;
long d = deadlineNanos() - that.deadlineNanos();
if (d 0) {
return 1;
} else if (id = currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}//for循环结束
time是执行完select后的时间,而currentTimeNanos则是select之前的时间。
time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos这段代码含义:
select后时间(纳秒)-换算成纳秒的timeoutMillis>=select开始前时间(纳秒),则说明已经执行过一次阻塞式select了,计数器=1,这里没有break但之后有, 因为下次再进入for是时间值和计数器都”不符合if”,最终break。
如果执行代码时,到达了上面if else的前一行,但却没有进入if或else if,就说明发生了空轮询。
只是空轮询次数低于SELECTOR_AUTO_REBUILD_THRESHOLD(默认512)时在不断重试。
解决空轮询bug
再分析一下Netty是如何判断空轮询的:
其实就是将上面if的时间公式反过来想:select操作后时间-timeoutMillis task = (NioTask) a;
invokeChannelUnregistered(task, key, e);
}
}
}
} catch (ConcurrentModificationException e) {
// Probably due to concurrent modification of the key set.
continue;
}
break;
}
selector = newSelector;
try {
// time to close the old selector as everything else is registered to the new one
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
Netty解决空轮询bug的手法看上去也很"暴力",就是重建一个新的selector,并把旧selector上的selectedKeys全部复制到新的selector上,再用新的selector替换旧的selector。
之后再尝试select操作就很可能不会再发生空轮询bug了。
# 和 schedule 对比
关于 schedule 方法加入的定时任务什么时候被执行,在时间操作上和 HashedWheelTimer 大同小异。
schedule 方法也是 Netty 的定时任务实现之一,但是底层的数据结构和 HashedWheelTimer 不一样,schedule 方法用到的数据结构其实和 ScheduledExecutorService 类似,是 PriorityQueue,它是一个优先级的队列。
除此之外,schedule 方法其实也用到 MpscQueue,只是任务执行的时候,会把任务从 PriorityQueue 转移到 MpscQueue 上。
下面来跟踪下 schedule 方法看看,由于主要是看数据结构的区别,所以一些地方在这里我就不深追了
首先来到如下代码:
`io.netty.util.concurrent.AbstractScheduledEventExecutor#schedule(java.lang.Runnable, long, java.util.concurrent.TimeUnit)`
```java
@Override
public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) {
ObjectUtil.checkNotNull(callable, "callable");
ObjectUtil.checkNotNull(unit, "unit");
if (delay (this, callable, deadlineNanos(unit.toNanos(delay))));
}
其中 schedule 实现如下:
private ScheduledFuture schedule(final ScheduledFutureTask task) {
if (inEventLoop()) {
scheduleFromEventLoop(task);
} else {
final long deadlineNanos = task.deadlineNanos();
// task will add itself to scheduled task queue when run if not expired
if (beforeScheduledTaskSubmitted(deadlineNanos)) {
execute(task);
} else {
lazyExecute(task);
// Second hook after scheduling to facilitate race-avoidance
if (afterScheduledTaskSubmitted(deadlineNanos)) {
execute(WAKEUP_TASK);
}
}
}
return task;
}
继续跟进 scheduledTaskQueue()方法:
PriorityQueue> scheduledTaskQueue() {
if (scheduledTaskQueue == null) {
scheduledTaskQueue = new DefaultPriorityQueue>(
SCHEDULED_FUTURE_TASK_COMPARATOR,
// Use same initial capacity as java.util.PriorityQueue
11);
}
return scheduledTaskQueue;
}
可以看到返回值就是 PriorityQueue,它是一个最小堆实现的优先队列。
扩展
不同实现的时间复杂度
这里我就直接贴下网上大佬给出的解释:
(1)如果使用最小堆实现的优先级队列:

大致意思就是你的任务如果插入到堆顶,时间复杂度为 O(log(n))。
(2)如果使用链表(既然有说道,那就扩展下):

中间插入后的事件复杂度为 O(n)
(3)单个时间轮:

复杂度可以降至 O(1)。
记录轮数的时间轮(其实就是文章开头的那个):

层级时间轮:

时间复杂度是 O(n),n 是轮子的数量,除此之外还要计算一个轮子上的 bucket。
单时间轮缺点
根据上面的图其实不难理解,如果任务是很久之后才执行的、同时要保证任务低延迟,那么单个时间轮所需的 bucket 数就会变得非常多,从而导致内存占用持续升高(CPU 空转时间还是不变的,仅仅是内存需求变高了),如下图:

Netty 对于单个时间轮的优化方式就是记录下 remainingRounds,从而减少 bucket 过多的内存占用。
时间轮和 PriorityQueue 对比
看完上面的时间复杂度对比,你可能会觉得:
Q:时间轮的复杂度只有 O(1),schedule 和 ScheduledExecutorService 这种都是 O(log(n)),那时间轮不是碾压吗?
A:你不要忘了,如果任务是在很久之后才执行的,那么时间轮就会产生很多空转,这是非常浪费 CPU 性能的,这种空转消耗可以通过增大 tickDuration 来避免,但这样做又会产生降低定时任务的精度,可能导致一些任务推到很迟才执行。
A:而 ScheduledExecutorService 不会有这个问题。
另外,Netty 时间轮的实现模型抽象出来是大概这个样子的:
for(Tasks task : tasks) {
task.doXxx();
}
这个抽象是个什么意思呢?
你要注意一个点,这里的任务循环执行是同步的,这意味着你第一个任务执行很慢延迟很高,那么后面的任务全都会被堵住,所以你加进时间轮的任务不可以是耗时任务,比如一些延迟很高的数据库查询,如果有这种耗时任务,最好再嵌入线程池处理,不要让任务阻塞在这一层。