需求

一个分布式调度系统在如今是非常常见的。

本来希望直接使用已有的一些分布式调度平台,但是又觉得太重。

于是,最后还是决定先自己实现。

如何设计配置的通知机制呢?

如果你把调度系统内嵌到业务系统,那么甚至可以感知到调度配置的变化,这样可能更加简单。

不过一般都是分开的。

1) 如果想设计成为推送的模式,那么就要引入 mq 或者 tcp 等通知机制。

2) 那么,能不能简单点呢?

最简单的就是定时拉取配置的方式,这种方式缺点是会存在一定时间的延迟,不过一般的系统都是支持的。

如何控制任务只被调度一次?

肯定要引入分布式锁。

如果系统有 redis,可以基于 redis 实现分布式锁。

分布式锁

这个话题以前讨论过,感兴趣的可以移步到

分布式锁-01-基于 Zookeeper 实现分布式锁

分布式锁-02-SQL 数据库实现分布式锁

分布式锁-03-基于 mysql 实现分布式锁

和 chatgpt 聊了一会儿分布式锁 redis/zookeeper distributed lock

Redis Learn-27-分布式锁进化史

redis 分布式锁设计 redis lock RedLock

保持架构的简单性

如果你追求性能,且架构中存在 redis,可以优先考虑基于 redis 的锁。

我们对调度精度要求不是特别高,为了架构简单,暂时采用的是 mysql 锁。

无论是那种实现,我们只依赖抽象的锁接口,后续便于替换。

任务的触发与执行

任务要如何触发,又如何保证只被执行一次呢?

任务的触发

作为本篇文章的重点。

主要是 2 点:

1)如何保证唯一一台执行?

这里采用分布式锁,抢到锁的才能执行。

根据实现策略不同,可能会略有差异。

2)如何保证正确的触发?

也取决于实现策略,最经典的应该是时间轮算法。

当然也可以很简单的定时触发。

任务的执行

每次任务触发的时候,找到待执行的任务,然后加锁执行,保证每一次只有一个任务执行。

任务的触发策略

v1-定时加载触发

核心代码,定时触发:

protected void triggerTaskAtFixedRate() {
    triggerTaskExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            log.info("[Schedule] trigger start");
            triggerTask(TimeUnit.MILLISECONDS, triggerIntervalMills);
            log.info("[Schedule] trigger end");
        }
    }, triggerIntervalMills, triggerIntervalMills, TimeUnit.MILLISECONDS);
    log.info("[Schedule] trigger init done triggerIntervalMills={}", triggerIntervalMills);
}

加锁,并获取对应待执行的任务列表:

任务放在数据库中,根据待执行时间,查出需要执行的任务列表即可。

/**
 * 执行任务调度
 * @param timeUnit 时间单位
 * @param lockInterval 锁的间隔
 */
protected void triggerTask(final TimeUnit timeUnit, final long lockInterval) {
    // 抢占锁
    final ILock lock = context.getLock();
    final String lockKey = ScheduleConst.LOCK_TRIGGER;
    try {
        // 锁占用多久?
        boolean lockFlag = lock.tryLock(lockKey, timeUnit, lockInterval);
        if(!lockFlag) {
            log.info("[Schedule] current machine triggerTask tryLock fail, return.");
            return;
        }

        final long currentTime = System.currentTimeMillis();
        //1. 查询所有的任务
        List<TDistributedScheduleTask> taskList = this.queryAllExecuteList(context, currentTime);
        if(CollectionUtil.isEmpty(taskList)) {
            log.warn("");
            return;
        }
        //2. 并行异步执行每一个任务
        final IScheduleExecutor scheduleExecutor = context.getScheduleExecutor();
        taskList.parallelStream()
                .forEach(new Consumer<TDistributedScheduleTask>() {
                    @Override
                    public void accept(TDistributedScheduleTask task) {
                        // 触发调度
                        executeTask(scheduleExecutor, task);
                    }
                });
    } catch (Exception e) {
        //trigger ex handler
        scheduleTriggerErrorHandler.error(context, e);
    } finally {
        // 这里应该不用释放锁,根据时间来判断。不然一个任务执行完,会导致下一次直接开始。
        lock.unlock(ScheduleConst.LOCK_TRIGGER);
    }
}

v2-最简单的遍历

非常简单,性能也最差。

直接遍历所有的任务,找到待执行的任务。

@Override
protected List<TDistributedScheduleTask> queryAllExecuteList(ScheduleContext context, long currentTime) {
    final long now = InnerScheduleUtil.now();
    // 找到执行时间小于等于当前时间的,加锁移除。因为时间是排序的,所以如果发现不满足,可以直接快速失败
    List<TDistributedScheduleTask> resultList = new ArrayList<>();
    // 加锁- 为什么要求字段为 final?
    // 暂时不加锁,直接处理会如何? 会导致什么问题呢?
    taskDataMap.values().parallelStream().forEach(task -> {
        //信息
        if (now >= task.getScheduleNextTime()) {
            resultList.add(task);
        }
    });
    // 统一放入,而不是一遍处理一遍修改。
    resultList.parallelStream().forEach(ScheduleTriggerLoop.super::removeAndNext);
    return resultList;
}

v3-基于排序的集合

思路:插入数据的时候,按照待执行时间排序。寻找的时候,只需要从前到后寻找,如果不满足,后面的可以跳过。

数据结构:选择 treeMap

核心实现:

@Override
protected List<TDistributedScheduleTask> queryAllExecuteList(ScheduleContext context, long currentTime) {
    // 在内存中,自己判断除当前需要处理的任务
    long currentIndex = calcSlotIndex(currentTime);

    // 找到执行时间小于等于当前时间的,加锁移除。因为时间是排序的,所以如果发现不满足,可以直接快速失败
    List<TDistributedScheduleTask> resultList = new ArrayList<>();

    // 加锁- 为什么要求字段为 final?
    // 暂时不加锁,直接处理会如何? 会导致什么问题呢?
    for(Map.Entry<Long, Set<TDistributedScheduleTask>> entry : taskTreeMap.entrySet())  {
        long time = entry.getKey();
        if(currentIndex < time) {
            break;
        }
        // 移除
        Set<TDistributedScheduleTask> taskSet = taskTreeMap.get(time);
        if(CollectionUtil.isNotEmpty(taskSet)) {
            resultList.addAll(taskSet);
        }
    }

    // 计算下一次的调度时间,重新加入到队列中
    // 好处是可以直接并行
    // 下一次时间
    resultList.parallelStream().forEach(this::removeAndNext);

    // 更新数据
    afterQueryAllExecuteList(resultList, context, currentTime);
    return resultList;
}

v4-简单的时间轮

思路:基于时间轮算法。不过基于 map 做了点优化,可以理论上一直放入任务。

核心实现:

@Override
protected List<TDistributedScheduleTask> queryAllExecuteList(ScheduleContext context, long currentTime) {
    // 拿到当前的 slot
    Set<TDistributedScheduleTask> taskSet = taskIndexSetMap.get(currentTimeIndex.get());

    // 更新时间
    currentTimeIndex.getAndAdd(calcSlotIndex(super.triggerIntervalMills));
    if(CollectionUtil.isEmpty(taskSet)) {
        return Collections.emptyList();
    }

    List<TDistributedScheduleTask> resultList = new ArrayList<>(taskSet);
    // 移除,更新时间
    resultList.parallelStream().forEach(this::removeAndNext);
    return resultList;
}

v5-多维时间轮

这个是为了解决任务执行时候太靠后,避免内存浪费。

核心实现:

@Override
protected List<TDistributedScheduleTask> queryAllExecuteList(ScheduleContext context, long currentTime) {
    // 拿到当前的 slot
    MultiIndexDto currentTriggerSlot = calcSlotIndex(currentTime);
    Set<TDistributedScheduleTask> taskSet = taskDataMap.get(currentTriggerSlot);
    // 更新时间
    this.currentTimeIndex = calcSlotIndex(currentTime + super.triggerIntervalMills);
    if(CollectionUtil.isEmpty(taskSet)) {
        return Collections.emptyList();
    }
    List<TDistributedScheduleTask> resultList = new ArrayList<>(taskSet);
    // 移除,更新时间
    resultList.parallelStream().forEach(this::removeAndNext);
    return resultList;
}

其实和一维度的逻辑很类似,只不过 Index 调整为了多维度。

slot 计算的方式:

protected MultiIndexDto calcSlotIndex(final long time) {
    // 计算出每一层的信息
    long[] indexList = new long[sizes.length];
    // 剩余值
    long remain = time;
    for(int i = 0; i < sizes.length; i++) {
        // 当前层的长度
        long currentLen = sizes[i];
        // 其实有一些浪费,因为实际上的 trigger 一般不会设置为 1ms
        long lenWithWeight = currentLen * triggerIntervalMills;
        // 当前值
        long currentVal = remain / lenWithWeight;
        // 剩余的值
        remain %= lenWithWeight;
        indexList[i] = currentVal;
    }
    return MultiIndexDto.of(indexList);
}

小结

这种方式好处是足够简单,缺点是还是不够实时。

如果想实现一个强大的基于推模式的调度,我们就要费一些功夫了。

后续有机会,老马和大家一起讨论下如何实现一个基于推模式的分布式调度系统。

实现

已有实现,放在了 github,暂时不开源。

distributed-schedule