顺序消费消息的必要性

在MQ里,顺序消息的意思是消费消息的顺序和消息发送时(单机发送)的顺序保持一致。

比如ProducerA按照顺序发送msga, msgb, msgc三条消息,那么consumer消费的时候也应该按照msga, msgb, msgc来消费。

对于顺序消息,在我们实际使用中发现,大部分业务系统并不需要或者并不依赖MQ提供的顺序机制,这些业务本身往往就能处理无序的消息,比如很多系统中都有状态机,是否消费消息必须根据状态机当前的状态。

但是在一些场景中顺序消息也有其必要性:比如日志收集和依赖binlog同步驱动业务等。就这两个场景而言,同样是顺序消息但对顺序的需求却不一定一样:比如日志收集中我们一般认为对顺序的要求比较弱,即绝大多数时是有序即可,遇到一些极端情况,比如Server宕机,容量调整的时候我们可以暂时容忍一些无序。但是对于一个依赖MySQL binlog同步来驱动的业务,短暂的无序都将会导致整个业务的错乱。

分析现有的一些MQ后发现,它们并不能在所有情况下提供可靠的顺序支持。现在市面上的MQ基本上都是以partition - based模型来提供顺序支持。我们以Kafka为例:topic分为一个或多个partition,partition可以理解为一个顺序文件,producer发送消息的时候,按照一定的策略选择partition,比如partition = hash(key) % partition num来选择该消息发送给哪个partition,那么具有相同key的消息就会落到相同的partition上,而consumer消费的时候一个consumer独占地绑定在一个partition上。

这样一来,消息就是顺序消费的了:

模型存在的问题

但是这种模型存在一些问题:

partition的个数就是消费的并行度,那么如果现在consumer处理不过来需要增加consumer则需要对应地增加partition。而根据上面的描述partition的个数一旦改变,则顺序将无法保证 (partition = hash(key) % partition num 公式里partition num发生了改变,则选择的partition也会发生变化)。

所以我们一般在业务上线之前,就要做出合理的容量规划,预先创建出足够的partition,但有的时候容量规划是困难的,实践中往往是预先分配大量的partition,比如几百甚至几千,然而大量的partition对性能以及运维都带来麻烦。

扩容partition后,如果高峰期已过,想进行缩容则基本上不可行(比如Kafka就不允许减少partition),除了缩容带来顺序变化外,还有一点是怎么保证被缩容的partition上的消息已经完全消费完成了呢?

partition的移动问题,partition如果分配在某台broker上之后再移动就很麻烦,一旦这台broker容量不足,需要进行负载均衡就很困难了,这可能需要在不同的机器上传输大量的数据。

对可用性的挑战,顺序发送的时候某个key的消息必须总是发送给指定的partition的,如果一旦某台server挂掉,或者正常的停机维护,那么位于这台server的partition就不能收消息了,但是也不能发送给其他partition,否则顺序就会错乱。

虽然我们可以通过多副本机制(Replication)来确保即使该partition所在机器出现故障时候仍然有其他副本提供服务,但是一般选举出一个新的副本通常需要花费几秒到几分钟不等(比如早期的Kafka版本Leader迁移是串行执行的,在分区特别多的时候,选举出新的leader可能需要分钟级时间),在此期间发送到该partition的所有消息都无法发送。

堆积问题,如果预分配时候的partition过少,这个时候堆积了大量的消息,那么即使扩容也没有办法了:

所以我们认为现有的一些所谓顺序消息机制并不是简单可依赖的。你以为MQ给你提供了顺序保障,但实际上在一些时候并不是这样,那么这个时候使用方为了应对这种异常情况就需要做出各种应对措施,增加了使用的复杂度。而我们希望提供一种简单可依赖的顺序消息,也就是使用方可以放心的将顺序保证交给MQ。

设计方案

首先我们来分析无法保证顺序的根源是什么。我们选择partition所使用的公式是 partition = hash(key) % partition num

正是因为partition num发生了变化导致公式的结果发生了变化,进而打破了顺序保证。

其实对于这个公式我们可能并不陌生,除了在MQ中使用,我们在数据库分库分表中往往也有这种套路。

数据库分库分表

在数据库分库分表中我们会通过一个分区键计算其分区,然后得到表名或库名(如下伪代码所示,user_id是分表键,总共分为100张表):

//计算userId的哈希值对100取模,得到该userId应该落在哪个表
int id = hash(userId) % 100;
String tableName = users_ + id;
String sql = select name from  + tableName + where user_id = ?;

而且在分库分表中前期因为业务量不大,我们往往不会分很多库(或者我们也分了多个库,但是这些库都落在相同的机器上),但是为了后期添加分库方便(扩容)我们会预先分出很多表。比如我们前期分成100张表,但是这100张表都在相同的库里,待到业务增长之后,单库无法支撑,我们会将100张表划分到不同的DB里。

比如我们将表0 - 50落在DB1, 50 - 100落到DB2,这样我们的处理能力就翻倍了,但是因为程序里还是按照100进行分表的,所以对应用没有感知。

这种机制相当于引入了一个中间层,程序面对的是的分表,最后这个表是落在什么DB上通过中间层进行映射过去就可以了。

MQ 引入类似的思想

那么其实我们是可以借鉴这种思路应用在MQ的扩容缩容中的。为此我们引入了logic partition的概念。也就是Producer发送消息的时候,我们并不决定它发送到哪个具体的Server上的具体的partition里(后文将其称之为物理partition, physical partition)。

我们只是先得到logic partition,使用这个计算公式: logic partition = hash(key) % logic partition num

而logic partition num我们会固定住,永不改变。比如我们将logic partition num固定为1000。但是这里跟分库分表中的分1000张表不同,logic partition仅仅是逻辑上的,不存在任何存储实体,所以即使分配的再大也没有性能上的开销。计算得到logic partition后,我们根据logic partition的映射再来决定该消息应该落到具体哪个physical partition上。我们会根据logic partition的范围进行映射,比如logic partition 0 - 500 映射到 physcial partition 1上,500 - 1000 映射到physcial partition 2上。

对于开头问题的解决

接下来我们来看看这种措施如何应对本文开头所提出的一序列问题呢:

  • 扩容

在这里扩容其实就是对physical partition的分裂过程。

比如开始时我们创建了两个分区: physical partition 1, physical partition 2,因为消费不过来,我们要将physcial partiton 1扩容,那么我们将会得到 logic partiton 0 - 250 映射到physical partition 3,logic partition 250 - 500 映射到physical partition 4

(注:范围的分裂不一定是平均的,比如我们也可以按照[0 - 200)和[200 - 500)进行划分 )。

  • 缩容

缩容其实就是对physical partition的合并过程,我们将physical partiton 3和physical partition 4合并得到physical partition 5。

那么现在logic partiton 0 - 500就映射到physical partition 5。

  • 负载均衡

负载均衡其实就是logic partition到physical partiton的重新映射过程。

也就是原来0 - 500 映射到 physical partition 5,现在我们将其映射到physical partition 6,而physical partition 6可以分配在一台空闲的Server上。

不仅如此,重新映射也可以解决可用性问题:一台server停机维护时将落在上面的logic partition进行重新映射,分配到另外一台Server上即可,这样我们就可以打造Always writtable ordered message queue。

MQ 存在的其他问题

这里借鉴分库分表中的预先分表的方法,提出logic partition的抽象层解决物理partition扩容缩容时无法保证顺序的问题。

但是实际实现时候我们会发现MQ的这种logic partition分法要比数据库中分表复杂得多。

因为MQ是的消费是持续性的,也就是我可以读取历史数据。数据库中分库分表一旦调整之后,那么它呈现的就是最终视图,而MQ里昨天我们可能还只有一个physical partition,今天我们划分为两个,那么我们消费昨天的数据和今天的数据的时候如何进行无缝的切换呢?

我们先简单总结一下上面对扩容缩容移动的描述:

扩容即对physical partition按照logic partition的范围进行分裂的过程

缩容即按照logic partition的范围对physical partition进行合并的过程

移动即改变logic partition与physical partition的映射的过程

扩容实例

接下来以实际的例子来进行说明,下面是一个扩容的实例。

order.changed这个主题,原来分配了P1, P2两个分区,现在因为容量不够,需要对P2进行扩容(分裂)。

也就是将physical partition P2进行分裂,分裂成P3, P4两个分区。

分裂的原则是按照logic partition的范围进行,logic partition [500, 1000)原来映射到P1,现在logic partition [500, 750)映射到P3, [750, 1000)映射到P4。

也就是分裂以后producer发送新的消息就会按照新的映射关系将消息append到P1, P3或P4,P2不再接收新的消息了。

分裂-01

接下来具体描述一下实现步骤。

在QMQ里有个metaserver的组件,它管理所有元数据信息,比如某topic分配到哪些partition上(我们将其称之为路由):

分裂-qmq-route

metaserver还管理partition分配在哪些server上,以及logic partition与physical partition的映射关系。

在需要对P2进行分裂的时候,metaserver会发送一条消息给P2所在的server,这条消息会被append到P2上,该消息称之为指令消息 (command message),对客户端不可见,也就是业务代码不会消费到这条消息。

P2收到这条指令消息后将不再接收新的消息了,所有业务消息均被拒绝,那么这条指令消息就是P2上的最后一条消息,相当将P2关闭了。

metaserver发送完指令消息后会变更对应topic的路由信息:

分裂-qmq-metaserver

注意看上面的表格的特点,这个路由信息表与众不同的地方在于它有一个version字段。

对于producer而言它总是获取最新版本的路由信息,也就是路由发生变更后,producer就会获得更高版本的路由信息,然后向这些分区上发送消息。

但是对于consumer来讲,它必须将前面的消息消费完成才能消费后面的,否则顺序就乱了。

比如前面分裂的示例,P2分裂为P3, P4了,这个时候P3, P4并不是立即对consumer可见的(只要对consumer不可见,就没有consumer来消费它)。

只有当consumer消费到指令消息时,才会触发consumer的路由变更。

并且指令消息里携带了路由的版本信息,假设路由已经发生了多次变更,consumer消费到某个指令消息的时候,只会将consumer的路由变更到该指令的下一个版本,而不会跳到其他版本,这里触发路由变更的时候会使用乐观锁去更新版本(伪代码):

update routes set version = version + 1 where topic = @topic and version = @current version

总结起来就是producer总是使用最新版本的路由,而consumer使用指定版本的路由,路由的版本由指令消息进行同步。

其实这个流程中最有趣的不是扩容(分裂)和缩容(合并),而是移动。

比如我们现在发现P4分区所在机器负载比较高或磁盘就要满了,现在给集群加了几台机器,怎么做能在继续保持顺序的基础上又能将负载分散过去呢?

那么只需要发送一个移动的指令消息给P4,然后P4就会关闭,然后变更路由,order.changed的路由现在是P1, P3, P5,这次路由变更分区的个数没有发生改变,改变的只是logic partition和physical partition的映射关系:

移动-qmq

因为P5是新分区,所以他可以分配在新机器上了。而且这个特性可以用在提高顺序消息的可用性上,比如需要对某台server停机,那么我们只需要对其上面所有分区发送移动指令即可。

约束条件

另外,在实现的时候我们还增加了如下约束条件:

  1. 版本必须是连续递增的

  2. 每次只能执行一项变更,比如只能对一个partition分裂,不能对多个partition进行分裂

  3. 对logic partition范围的每次操作必须是连续的,比如合并的时候只能将[0, 100) 与[100, 200)合并,而不能将[0, 100)与[200, 300)合并

  4. 路由变更必须是本次变更分区所有的消费者都确认执行到指令消息才能触发。比如将多个分区合并的时候,必须是这几个分区都消费到了指令消息的时候触发。

个人感受

  1. 顺序消费必定会带来很大程度上性能的牺牲。

  2. 集群的套路都是类似的,掌握原理即可。

  3. 这种基于版本的思想其实非常不错,以后设计的时候,完全可以借鉴一下。

参考资料

QMQ 顺序消息设计与实现(上)

https://www.tuicool.com/articles/YF7fmai

https://mp.weixin.qq.com/s/NsoLs0dYSGrF1fBjmnzkHw