设计背景

现在市面上已经有很多消息中间件了(ActiveMQ, RabbitMQ, Kafka, RocketMQ),那么为什么我们还要造另外一个轮子呢?

首先QMQ是2012年就开始开发的,在这个时期其实消息中间件并没有太多的选择,那个时候Kafka还不太成熟,而RocketMQ也没有出现,大部分公司都会采用ActiveMQ或RabbitMQ。

首先RabbitMQ的开发语言是erlang,这是一门略小众的语言,我们担心无法完全掌控,所以没有选择。而ActiveMQ其实在公司内部已有很长一段时间使用历史,但是ActiveMQ太过于复杂,在使用过程中经常出现消息丢失或者整个进程hang住的情况,并且难以定位。

当然,这都是QMQ诞生的历史背景,那么放在今天QMQ还有开源的意义吗?

QMQ 的特色

我们比较了市面上现有的消息中间件,仍然认为QMQ有自己的特色,而这些也是我们在过去几年运维消息中间件中觉得必须提供的和难以舍弃的。

我们都知道Kafka和RocketMQ都是基于partition的存储模型,也就是每个subject分为一个或多个partition,而Server收到消息后将其分发到某个partition上,而Consumer消费的时候是与partition对应的。

比如,我们某个subject a分配了3个partition(p1, p2, p3),有3个消费者(c1, c2, c3)消费该消息,则会建立c1 - p1, c2 - p2, c3 - p3这样的消费关系。

design1.png

那么如果我们的consumer个数比partition个数多呢?

则有的consumer会是空闲的。

而如果partition个数比consumer个数多呢?

则可能存在有的consumer消费的partition个数会比其他的consumer多的情况。

那么合理的分配策略只有是partition个数与consumer个数成倍数关系。

以上都是基于partition的MQ所带来的负载均衡问题。因为这种静态的绑定的关系,还会导致Consumer扩容缩容麻烦。也就是使用Kafka或者RocketMQ这种基于partition的消息队列时,如果遇到处理速度跟不上时,光简单的增加Consumer并不能马上提高处理能力,需要对应的增加partition个数,而特别在Kafka里partition是一个比较重的资源,增加太多parition还需要考虑整个集群的处理能力;当高峰期过了之后,如果想缩容Consumer也比较麻烦,因为partition只能增加,不能减少。

跟扩容相关的另外一个问题是,已经堆积的消息是不能快速消费的。比如开始的时候我们分配了2个partition,由2个Consumer来消费,但是突然发送方大量发送消息(这个在日常运维中经常遇到),导致消息快速的堆积,这个时候我们如何能快速扩容消费这些消息呢?其实增加partition和Consumer都是没有用的,增加的Consumer爱莫能助,因为堆积的那2个partition只能由2个Consumer来消费,这个时候你只能纵向扩展,而不能横向扩展,而我们都知道纵向扩展很多时候是不现实的,或者执行比较重的再均衡操作。

基于这些考虑我们并没有直接采用Kafka等基于partition存储模型的消息队列,我们的设计考虑是消费和存储模型是完全解耦的关系,Consumer需要很容易的扩容缩容,从现在来看这个选择也是正确的。现在去哪儿网的系统架构基本上呈现为基于消息驱动的架构,在我们内部系统之间的交互大部分都是以消息这种异步的方式来进行。比如我们酒店的订单变更消息就有接近70个不同的消费组订阅(可以将消费组理解为不同的应用),整个交易流程都是靠消息来驱动,那么从上面对基于partition模型的描述来看,要在70个不同应用之间协调partition和Consumer的均衡几乎是不可能的。

项目架构

下图是QMQ中各组件及其交互图:

QMQ-struct

基本概念

meta server提供集群管理和集群发现的作用

server 提供实时消息服务

delay server 提供延时/定时消息服务,延时消息先在delay server排队,时间到之后再发送给server

producer 消息生产者

consumer 消息消费者

交互过程

根据图中的编号描述一下其交互过程

  1. delay server 向meta server注册

  2. 实时server 向meta server注册

  3. producer在发送消息前需要询问meta server获取server list

  4. meta server返回server list给producer(根据producer请求的消息类型返回不同的server list)

  5. producer发送延时/定时消息

  6. 延时时间已到,delay server将消息投递给实时server

  7. producer发送实时消息

  8. consumer需要拉取消息,在拉取之前向meta server获取server list(只会获取实时server的list)

  9. meta server返回server list给consumer

  10. consumer向实时server发起pull请求

  11. 实时server将消息返回给consumer

下面分别对实时消息Server和延时/定时消息Server的存储模型进行描述

实时消息

在设计背景里,已经描述了QMQ没有采用基于partition存储模型,但是在学习Kafka和RocketMQ的存储实现方式后,有很多地方是值得借鉴的:

  1. 顺序append文件,提供很好的性能

  2. 顺序消费文件,使用offset表示消费进度,成本极低

  3. 将所有subject的消息合并在一起,减少parition数量,可以提供更多的subject(RocketMQ)

在演化QMQ的存储模型时,觉得这几点是非常重要的。那如何在不使用基于partition的情况下,又能得到这些特性呢?

正所谓有前辈大师说:计算机中所有问题都可以通过添加一个中间层来解决,一个中间层解决不了那就添加两个。

我们通过添加一层拉取的log(pull log)来动态映射consumer与partition的逻辑关系,这样不仅解决了consumer的动态扩容缩容问题,还可以继续使用一个offset表示消费进度。

QMQ 存储模型

下图是QMQ的存储模型

qmq-store-module

先解释一下上图中的数字的意义。

上图中方框上方的数字,表示该方框在自己log中的偏移,而方框内的数字是该项的内容。

比如message log方框上方的数字:3,6,9几表示这几条消息在message log中的偏移。

而consume log中方框内的数字3,6,9,20正对应着message log的偏移,表示这几个位置上的消息都是subject1的消息,consume log方框上方的1,2,3,4表示这几个方框在consume log中的逻辑偏移。

下面的pull log方框内的内容对应着consume log的逻辑偏移,而pull log方框外的数字表示pull log的逻辑偏移。

在实时Server存储模型中有三种重要的log:

  1. message log 所有subject的消息进入该log,消息的主存储

  2. consume log consume log存储的是message log的索引信息

  3. pull log 每个consumer拉取消息的时候会产生pull log,pull log记录的是拉取的消息在consume log中的sequence

那么消费者就可以使用pull log上的sequence来表示消费进度

延时/定时消息

QMQ提供任意时间的延时/定时消息,你可以指定消息在未来两年内(可配置)任意时间内投递。

比起RocketMQ提供的多个不同延时level的延时消息,QMQ的延时消息更加灵活。

比如在OTA场景中,客人经常是预订未来某个时刻的酒店或者机票,这个时间是不固定的,我们无法使用几个固定的延时level来实现这个场景。

QMQ的延时/定时消息使用的是两层hash wheel来实现的。

第一层位于磁盘上,每个小时为一个刻度(默认为一个小时一个刻度,可以根据实际情况在配置里进行调整),每个刻度会生成一个日志文件(schedule log),因为QMQ支持两年内的延迟消息(默认支持两年内,可以进行配置修改),则最多会生成 2 * 366 * 24 = 17568 个文件(如果需要支持的最大延时时间更短,则生成的文件更少)。

第二层在内存中,当消息的投递时间即将到来的时候,会将这个小时的消息索引(索引包括消息在schedule log中的offset和size)从磁盘文件加载到内存中的hash wheel上,内存中的hash wheel则是以500ms为一个刻度。

qmq-delay-schedue

日志

在延时/定时消息里也存在三种log:

  1. message log 和实时消息里的message log类似,收到消息后append到该log就返回给producer,相当于WAL。

  2. schedule log 按照投递时间组织,每个小时一个。该log是回放message log后根据延时时间放置对应的log上,这是上面描述的两层hash wheel的第一层,位于磁盘上。schedule log里是包含完整的消息内容的,因为消息内容从message log同步到了schedule log,所以历史message log都可以删除(所以message log只需要占用极小的存储空间,所以我们可以使用低容量高性能的ssd来获取极高的吞吐量,比如采用100G极好的SSD只需要RMB2000左右)。另外,schedule log是按照延时时间组织的,所以延时时间已过的schedule log文件也可以删除。

  3. dispatch log 延时/定时消息投递成功后写入,主要用于在应用重启后能够确定哪些消息已经投递,dispatch log里写入的是消息的offset,不包含消息内容。当延时server中途重启时,我们需要判断出当前这个刻度(比如一个小时)里的消息有哪些已经投递了则不重复投递。

个人感受

  1. 别人使用的,未必是最好的。QMQ 有着自己的业务场景,是为去哪儿量身打造的。

  2. 后起之秀。想要有一个优秀的作品,除了结合业务之外,还要学会站在巨人的肩膀上。

参考资料

https://github.com/qunarcorp/qmq/blob/master/docs/cn/design.md