何时需要 MQ

当你需要使用消息队列时,首先需要考虑它的必要性。

可以使用mq的场景有很多,最常用的几种,是做业务解耦/最终一致性/广播/错峰流控等。

反之,如果需要强一致性,关注业务逻辑的处理结果,则RPC显得更为合适。

解耦

解耦是消息队列要解决的最本质问题。所谓解耦,简单点讲就是一个事务,只关心核心的流程。

而需要依赖其他系统但不那么重要的事情,有通知即可,无需等待结果。换句话说,基于消息的模型,关心的是“通知”,而非“处理”。

最终一致性

最终一致性指的是两个系统的状态保持一致,要么都成功,要么都失败。

当然有个时间限制,理论上越快越好,但实际上在各种异常的情况下,可能会有一定延迟达到最终一致状态,但最后两个系统的状态是一样的。

广播

消息队列的基本功能之一是进行广播。如果没有消息队列,每当一个新的业务方接入,我们都要联调一次新接口。

有了消息队列,我们只需要关心消息是否送达了队列,至于谁希望订阅,是下游的事情,无疑极大地减少了开发和联调的工作量。

错峰与流控

试想上下游对于事情的处理能力是不同的。

比如,Web前端每秒承受上千万的请求,并不是什么神奇的事情,只需要加多一点机器,再搭建一些LVS负载均衡设备和Nginx等即可。但

数据库的处理能力却十分有限,即使使用SSD加分库分表,单机的处理能力仍然在万级。

由于成本的考虑,我们不能奢求数据库的机器数量追上前端。

综述

我们之所以要设计一个消息队列,并且配备broker,无外乎要做两件事情:

  1. 消息的转储,在更合适的时间点投递,或者通过一系列手段辅助消息最终能送达消费机。

  2. 规范一种范式和通用的模式,以满足解耦、最终一致性、错峰等需求。

掰开了揉碎了看,最简单的消息队列可以做成一个消息转发器,把一次RPC做成两次RPC。

发送者把消息投递到服务端(以下简称broker),服务端再将消息转发一手到接收端,就是这么简单。

一般来讲,设计消息队列的整体思路是先build一个整体的数据流,例如producer发送给broker,broker发送给consumer,consumer回复消费确认,broker删除/备份消息等。

利用RPC将数据流串起来。然后考虑RPC的高可用性,尽量做到无状态,方便水平扩展。

之后考虑如何承载消息堆积,然后在合适的时机投递消息,而处理堆积的最佳方式,就是存储,存储的选型需要综合考虑性能/可靠性和开发维护成本等诸多因素。

为了实现广播功能,我们必须要维护消费关系,可以利用zk/config server等保存消费关系。

在完成了上述几个功能后,消息队列基本就实现了。然后我们可以考虑一些高级特性,如可靠投递,事务特性,性能优化等。

基本功能

RPC 通信协议

刚才讲到,所谓消息队列,无外乎两次RPC加一次转储,当然需要消费端最终做消费确认的情况是三次RPC。

RPC 可以使用 thirft 或者 dubbo

高可用

依赖 RPC 本身的高可用。

而消息队列的高可用,只要保证broker接受消息和确认消息的接口是幂等的,并且consumer的几台机器处理消息是幂等的,这样就把消息队列的可用性,转交给RPC框架来处理了。

  • 幂等

那么怎么保证幂等呢?最简单的方式莫过于共享存储。

broker多机器共享一个DB或者一个分布式文件/kv系统,则处理消息自然是幂等的。

就算有单点故障,其他节点可以立刻顶上。另外failover可以依赖定时任务的补偿,这是消息队列本身天然就可以支持的功能。

服务端承载消息堆积的能力

把消息存储下来是一个很必要的过程。

但归结起来,主要有持久化和非持久化两种。

持久化的形式能更大程度地保证消息的可靠性(如断电等不可抗外力),并且理论上能承载更大限度的消息堆积(外存的空间远大于内存)。

但并不是每种消息都需要持久化存储。很多消息对于投递性能的要求大于可靠性的要求,且数量极大(如日志)。

这时候,消息不落地直接暂存内存,尝试几次failover,最终投递出去也未尝不可。

存储子系统的选择

如果你们的消息队列是用来支持支付/交易等对可靠性要求非常高,但对性能和量的要求没有这么高,而且没有时间精力专门做文件存储系统的研究,DB是最好的选择。

分布式KV(如MongoDB,HBase)等,或者持久化的Redis,由于其编程接口较友好,性能也比较可观,如果在可靠性要求不是那么高的场景,也不失为一个不错的选择。

比如 activeMQ 底层的 LevelDB。

消费关系解析

市面上的消息队列定义了一堆让人晕头转向的名词,

如JMS 规范中的Topic/Queue,Kafka里面的Topic/Partition/ConsumerGroup,RabbitMQ里面的Exchange等等。

抛开现象看本质,无外乎是单播与广播的区别。

队列高级特性设计

可靠投递(最终一致性)

方案说简单也简单,就是每当要发生不可靠的事情(RPC等)之前,先将消息落地,然后发送。

当失败或者不知道成功失败(比如超时)时,消息状态是待发送,定时任务不停轮询所有待发送消息,最终一定可以送达。

ps: 有些类似于分布式事务中的本地消息表。首先存储,然后操作。

  • 具体流程
  1. producer往broker发送消息之前,需要做一次落地。

  2. 请求到server后,server确保数据落地后再告诉客户端发送成功。

  3. 支持广播的消息队列需要对每个待发送的endpoint,持久化一个发送状态,直到所有endpoint状态都OK才可删除消息。

消费确认

把消息的送达和消息的处理分开,这样才真正的实现了消息队列的本质-解耦。

所以,允许消费者主动进行消费确认是必要的。

当然,对于没有特殊逻辑的消息,默认Auto Ack也是可以的,但一定要允许消费方主动ack。

重复消息和顺序消息

顺序消息更加苛刻。

  1. 允许消息丢失。

  2. 从发送方到服务方到接受者都是单点单线程。

一般来讲,一个主流消息队列的设计范式里,应该是不丢消息的前提下,尽量减少重复消息,不保证消息的投递顺序。

重复消息

谈到重复消息,主要是两个话题:

  1. 如何鉴别消息重复,并幂等的处理重复消息。

  2. 一个消息队列如何尽量减少重复消息的投递。

幂等的处理消息是一门艺术,因为种种原因重复消息或者错乱的消息还是来到了,说两种通用的解决方案:

  1. 版本号。

  2. 状态机。

版本号

举个简单的例子,一个产品的状态有上线/下线状态。如果消息1是下线,消息2是上线。不巧消息1判重失败,被投递了两次,且第二次发生在2之后,如果不做重复性判断,显然最终状态是错误的。

但是,如果每个消息自带一个版本号。上游发送的时候,标记消息1版本号是1,消息2版本号是2。如果再发送下线消息,则版本号标记为3。下游对于每次消息的处理,同时维护一个版本号。

每次只接受比当前版本号大的消息。初始版本为0,当消息1到达时,将版本号更新为1。消息2到来时,因为版本号>1.可以接收,同时更新版本号为2.当另一条下线消息到来时,如果版本号是3.则是真实的下线消息。如果是1,则是重复投递的消息。

如果业务方只关心消息重复不重复,那么问题就已经解决了。但很多时候另一个头疼的问题来了,就是消息顺序如果和想象的顺序不一致。比如应该的顺序是12,到来的顺序是21。则最后会发生状态错误。

参考TCP/IP协议,如果想让乱序的消息最后能够正确的被组织,那么就应该只接收比当前版本号大一的消息。并且在一个session周期内要一直保存各个消息的版本号。

如果到来的顺序是21,则先把2存起来,待2到来后,再处理1,这样重复性和顺序性要求就都达到了。

状态机

基于版本号来处理重复和顺序消息听起来是个不错的主意,但凡事总有瑕疵。使用版本号的最大问题是:

  1. 对发送方必须要求消息带业务版本号。

  2. 下游必须存储消息的版本号,对于要严格保证顺序的。

举例子说明,假设产品本身状态是下线,1是上线消息,2是下线消息,3是上线消息,正常情况下,消息应该的到来顺序是123,但实际情况下收到的消息状态变成了3123。

那么下游收到3消息的时候,判断状态机流转是下线->上线,可以接收消息。然后收到消息1,发现是上线->上线,拒绝接收,要求重发。然后收到消息2,状态是上线->下线,于是接收这个消息。

此时无论重发的消息1或者3到来,还是可以接收。另外的重发,在一定次数拒绝后停止重发,业务正确。

消息队列对于重复消息的处理

减少重复消息的关键步骤:

  1. broker记录MessageId,直到投递成功后清除,重复的ID到来不做处理,这样只要发送者在清除周期内能够感知到消息投递成功,就基本不会在server端产生重复消息。

  2. 对于server投递到consumer的消息,由于不确定对端是在处理过程中还是消息发送丢失的情况下,有必要记录下投递的IP地址。决定重发之前询问这个IP,消息处理成功了吗?如果询问无果,再重发。

事务

满足事务的一致性特征,则必须要么都不进行,要么都能成功。

解决方案从大方向上有两种:

  1. 两阶段提交,分布式事务。

  2. 本地事务,本地落地,补偿发送。

分布式事务的成本太高。

一个完整的消息队列应该定义清楚自己可以投递的消息类型,如事务型消息,本地非持久型消息,以及服务端不落地的非可靠消息等。

对不同的业务场景做不同的选择。另外事务的使用应该尽量低成本、透明化,可以依托于现有的成熟框架,如Spring的声明式事务做扩展。

业务方只需要使用 @Transactional 标签即可。

性能相关

异步/同步

首先澄清一个概念,异步,同步和 oneway 是三件事。

异步,归根结底你还是需要关心结果的,但可能不是当时的时间点关心,可以用轮询或者回调等方式处理结果;

同步是需要当时关心的结果的;

而 oneway 是发出去就不管死活的方式,这种对于某些完全对可靠性没有要求的场景还是适用的,但不是我们重点讨论的范畴。

场景

  • 客户端同步服务端异步
Future<Result> future = request(server);//server立刻返回future
    synchronized(future){
    while(!future.isDone()){
    future.wait();//server处理结束后会notify这个future,并修改isdone标志
    }
}
return future.get();
  • 客户端同步服务端同步
Result result = request(server);
  • 客户端异步服务端同步(这里用线程池的方式)
Future<Result> future = executor.submit(new Callable(){public void call<Result>(){
    result = request(server);
}})
return future;
  • 客户端异步服务端异步
Future<Result> future = request(server);//server立刻返回future
return future

服务端异步优点

那么,服务端使用异步最大的好处是什么呢?

说到底,是解放了线程和I/O。

试想服务端有一堆I/O等待处理,如果每个请求都需要同步响应,每条消息都需要结果立刻返回,那么就几乎没法做I/O合并。

而如果用异步的方式返回给客户端future,就可以有机会进行I/O的合并,把几个批次发过来的消息一起落地(这种合并对于MySQL等允许batch insert的数据库效果尤其明显),并且彻底释放了线程。不至于说来多少请求开多少线程,能够支持的并发量直线提高。

总结一句,同步能够保证结果,异步能够保证效率,要合理的结合才能做到最好的效率。

批量

谈到批量就不得不提生产者消费者模型。但生产者消费者模型中最大的痛点是:消费者到底应该何时进行消费。

大处着眼来看,消费动作都是事件驱动的。主要事件包括:

  1. 攒够了一定数量。

  2. 到达了一定时间。

  3. 队列里有新的数据到来。

对于及时性要求高的数据,可用采用方式3来完成,比如客户端向服务端投递数据。只要队列有数据,就把队列中的所有数据刷出,否则将自己挂起,等待新数据的到来。

在第一次把队列数据往外刷的过程中,又积攒了一部分数据,第二次又可以形成一个批量。

伪代码如下:

Executor executor = Executors.newFixedThreadPool(4);
final BlockingQueue<Message> queue = new ArrayBlockingQueue<>();
private Runnable task = new Runnable({//这里由于共享队列,Runnable可以复用,故做成全局的
   public void run(){
      List<Message> messages  = new ArrayList<>(20);
      queue.drainTo(messages20);
      doSend(messages);//阻塞,在这个过程中会有新的消息到来,如果4个线程都占满,队列就有机会囤新的消息
   }
});
public void send(Message message){
    queue.offer(message);
    executor.submit(task)
}

最后啰嗦几句,曾经有人问我,为什么网络请求小包合并成大包会提高性能?主要原因有两个:

  1. 减少无谓的请求头,如果你每个请求只有几字节,而头却有几十字节,无疑效率非常低下。

  2. 减少回复的ack包个数。把请求合并后,ack包数量必然减少,确认和重发的成本就会降低。

push 还是 pull

上文提到的消息队列,大多是针对push模型的设计。

现在市面上有很多经典的也比较成熟的pull模型的消息队列,如Kafka、MetaQ等。这跟JMS中传统的push方式有很大的区别,可谓另辟蹊径。

我们简要分析下push和pull模型各自存在的利弊。

慢消费

慢消费无疑是push模型最大的致命伤,穿成流水线来看,如果消费者的速度比发送者的速度慢很多,势必造成消息在broker的堆积。

假设这些消息都是有用的无法丢弃的,消息就要一直在broker端保存。

当然这还不是最致命的,最致命的是broker给consumer推送一堆consumer无法处理的消息,consumer不是reject就是error,然后来回踢皮球。

反观pull模式,consumer可以按需消费,不用担心自己处理不了的消息来骚扰自己,而broker堆积消息也会相对简单,无需记录每一个要发送消息的状态,只需要维护所有消息的队列和偏移量就可以了。

所以对于建立索引等慢消费,消息量有限且到来的速度不均匀的情况,pull模式比较合适。

消息延迟与忙等

这是pull模式最大的短板。由于主动权在消费方,消费方无法准确地决定何时去拉取最新的消息。

如果一次pull取到消息了还可以继续去pull,如果没有pull取到则需要等待一段时间重新pull。

在阿里的RocketMq里,有一种优化的做法-长轮询,来平衡推拉模型各自的缺点。

基本思路是: 消费者如果尝试拉取失败,不是直接return, 而是把连接挂在那里wait,服务端如果有新的消息到来,把连接notify起来,这也是不错的思路。

但海量的长连接block对系统的开销还是不容小觑的,还是要合理的评估时间间隔,给wait加一个时间上限比较好~

顺序消息

如果push模式的消息队列,支持分区,单分区只支持一个消费者消费,并且消费者只有确认一个消息消费后才能push送另外一个消息,还要发送者保证全局顺序唯一,听起来也能做顺序消息,但成本太高了,尤其是必须每个消息消费确认后才能发下一条消息,这对于本身堆积能力和慢消费就是瓶颈的push模式的消息队列,简直是一场灾难。

反观pull模式,如果想做到全局顺序消息,就相对容易很多:

  1. producer对应partition,并且单线程。

  2. consumer对应partition,消费确认(或批量确认),继续消费即可。

所以对于日志push送这种最好全局有序,但允许出现小误差的场景,pull模式非常合适。

如果你不想看到通篇乱套的日志~~

参考资料

导论

消息队列设计精要

MQ 的设计和实现

中间件

MetaQ

Akka

ZeroMQ