何时需要 MQ
当你需要使用消息队列时,首先需要考虑它的必要性。
可以使用mq的场景有很多,最常用的几种,是做业务解耦/最终一致性/广播/错峰流控等。
反之,如果需要强一致性,关注业务逻辑的处理结果,则RPC显得更为合适。
解耦
解耦是消息队列要解决的最本质问题。所谓解耦,简单点讲就是一个事务,只关心核心的流程。
而需要依赖其他系统但不那么重要的事情,有通知即可,无需等待结果。换句话说,基于消息的模型,关心的是“通知”,而非“处理”。
最终一致性
最终一致性指的是两个系统的状态保持一致,要么都成功,要么都失败。
当然有个时间限制,理论上越快越好,但实际上在各种异常的情况下,可能会有一定延迟达到最终一致状态,但最后两个系统的状态是一样的。
广播
消息队列的基本功能之一是进行广播。如果没有消息队列,每当一个新的业务方接入,我们都要联调一次新接口。
有了消息队列,我们只需要关心消息是否送达了队列,至于谁希望订阅,是下游的事情,无疑极大地减少了开发和联调的工作量。
错峰与流控
试想上下游对于事情的处理能力是不同的。
比如,Web前端每秒承受上千万的请求,并不是什么神奇的事情,只需要加多一点机器,再搭建一些LVS负载均衡设备和Nginx等即可。但
数据库的处理能力却十分有限,即使使用SSD加分库分表,单机的处理能力仍然在万级。
由于成本的考虑,我们不能奢求数据库的机器数量追上前端。
综述
我们之所以要设计一个消息队列,并且配备broker,无外乎要做两件事情:
-
消息的转储,在更合适的时间点投递,或者通过一系列手段辅助消息最终能送达消费机。
-
规范一种范式和通用的模式,以满足解耦、最终一致性、错峰等需求。
掰开了揉碎了看,最简单的消息队列可以做成一个消息转发器,把一次RPC做成两次RPC。
发送者把消息投递到服务端(以下简称broker),服务端再将消息转发一手到接收端,就是这么简单。
一般来讲,设计消息队列的整体思路是先build一个整体的数据流,例如producer发送给broker,broker发送给consumer,consumer回复消费确认,broker删除/备份消息等。
利用RPC将数据流串起来。然后考虑RPC的高可用性,尽量做到无状态,方便水平扩展。
之后考虑如何承载消息堆积,然后在合适的时机投递消息,而处理堆积的最佳方式,就是存储,存储的选型需要综合考虑性能/可靠性和开发维护成本等诸多因素。
为了实现广播功能,我们必须要维护消费关系,可以利用zk/config server等保存消费关系。
在完成了上述几个功能后,消息队列基本就实现了。然后我们可以考虑一些高级特性,如可靠投递,事务特性,性能优化等。
基本功能
RPC 通信协议
刚才讲到,所谓消息队列,无外乎两次RPC加一次转储,当然需要消费端最终做消费确认的情况是三次RPC。
高可用
依赖 RPC 本身的高可用。
而消息队列的高可用,只要保证broker接受消息和确认消息的接口是幂等的,并且consumer的几台机器处理消息是幂等的,这样就把消息队列的可用性,转交给RPC框架来处理了。
- 幂等
那么怎么保证幂等呢?最简单的方式莫过于共享存储。
broker多机器共享一个DB或者一个分布式文件/kv系统,则处理消息自然是幂等的。
就算有单点故障,其他节点可以立刻顶上。另外failover可以依赖定时任务的补偿,这是消息队列本身天然就可以支持的功能。
服务端承载消息堆积的能力
把消息存储下来是一个很必要的过程。
但归结起来,主要有持久化和非持久化两种。
持久化的形式能更大程度地保证消息的可靠性(如断电等不可抗外力),并且理论上能承载更大限度的消息堆积(外存的空间远大于内存)。
但并不是每种消息都需要持久化存储。很多消息对于投递性能的要求大于可靠性的要求,且数量极大(如日志)。
这时候,消息不落地直接暂存内存,尝试几次failover,最终投递出去也未尝不可。
存储子系统的选择
如果你们的消息队列是用来支持支付/交易等对可靠性要求非常高,但对性能和量的要求没有这么高,而且没有时间精力专门做文件存储系统的研究,DB是最好的选择。
分布式KV(如MongoDB,HBase)等,或者持久化的Redis,由于其编程接口较友好,性能也比较可观,如果在可靠性要求不是那么高的场景,也不失为一个不错的选择。
比如 activeMQ 底层的 LevelDB。
消费关系解析
市面上的消息队列定义了一堆让人晕头转向的名词,
如JMS 规范中的Topic/Queue,Kafka里面的Topic/Partition/ConsumerGroup,RabbitMQ里面的Exchange等等。
抛开现象看本质,无外乎是单播与广播的区别。
队列高级特性设计
可靠投递(最终一致性)
方案说简单也简单,就是每当要发生不可靠的事情(RPC等)之前,先将消息落地,然后发送。
当失败或者不知道成功失败(比如超时)时,消息状态是待发送,定时任务不停轮询所有待发送消息,最终一定可以送达。
ps: 有些类似于分布式事务中的本地消息表。首先存储,然后操作。
- 具体流程
-
producer往broker发送消息之前,需要做一次落地。
-
请求到server后,server确保数据落地后再告诉客户端发送成功。
-
支持广播的消息队列需要对每个待发送的endpoint,持久化一个发送状态,直到所有endpoint状态都OK才可删除消息。
消费确认
把消息的送达和消息的处理分开,这样才真正的实现了消息队列的本质-解耦。
所以,允许消费者主动进行消费确认是必要的。
当然,对于没有特殊逻辑的消息,默认Auto Ack也是可以的,但一定要允许消费方主动ack。
重复消息和顺序消息
顺序消息更加苛刻。
-
允许消息丢失。
-
从发送方到服务方到接受者都是单点单线程。
一般来讲,一个主流消息队列的设计范式里,应该是不丢消息的前提下,尽量减少重复消息,不保证消息的投递顺序。
重复消息
谈到重复消息,主要是两个话题:
-
如何鉴别消息重复,并幂等的处理重复消息。
-
一个消息队列如何尽量减少重复消息的投递。
幂等的处理消息是一门艺术,因为种种原因重复消息或者错乱的消息还是来到了,说两种通用的解决方案:
-
版本号。
-
状态机。
版本号
举个简单的例子,一个产品的状态有上线/下线状态。如果消息1是下线,消息2是上线。不巧消息1判重失败,被投递了两次,且第二次发生在2之后,如果不做重复性判断,显然最终状态是错误的。
但是,如果每个消息自带一个版本号。上游发送的时候,标记消息1版本号是1,消息2版本号是2。如果再发送下线消息,则版本号标记为3。下游对于每次消息的处理,同时维护一个版本号。
每次只接受比当前版本号大的消息。初始版本为0,当消息1到达时,将版本号更新为1。消息2到来时,因为版本号>1.可以接收,同时更新版本号为2.当另一条下线消息到来时,如果版本号是3.则是真实的下线消息。如果是1,则是重复投递的消息。
如果业务方只关心消息重复不重复,那么问题就已经解决了。但很多时候另一个头疼的问题来了,就是消息顺序如果和想象的顺序不一致。比如应该的顺序是12,到来的顺序是21。则最后会发生状态错误。
参考TCP/IP协议,如果想让乱序的消息最后能够正确的被组织,那么就应该只接收比当前版本号大一的消息。并且在一个session周期内要一直保存各个消息的版本号。
如果到来的顺序是21,则先把2存起来,待2到来后,再处理1,这样重复性和顺序性要求就都达到了。
状态机
基于版本号来处理重复和顺序消息听起来是个不错的主意,但凡事总有瑕疵。使用版本号的最大问题是:
-
对发送方必须要求消息带业务版本号。
-
下游必须存储消息的版本号,对于要严格保证顺序的。
举例子说明,假设产品本身状态是下线,1是上线消息,2是下线消息,3是上线消息,正常情况下,消息应该的到来顺序是123,但实际情况下收到的消息状态变成了3123。
那么下游收到3消息的时候,判断状态机流转是下线->上线,可以接收消息。然后收到消息1,发现是上线->上线,拒绝接收,要求重发。然后收到消息2,状态是上线->下线,于是接收这个消息。
此时无论重发的消息1或者3到来,还是可以接收。另外的重发,在一定次数拒绝后停止重发,业务正确。
消息队列对于重复消息的处理
减少重复消息的关键步骤:
-
broker记录MessageId,直到投递成功后清除,重复的ID到来不做处理,这样只要发送者在清除周期内能够感知到消息投递成功,就基本不会在server端产生重复消息。
-
对于server投递到consumer的消息,由于不确定对端是在处理过程中还是消息发送丢失的情况下,有必要记录下投递的IP地址。决定重发之前询问这个IP,消息处理成功了吗?如果询问无果,再重发。
事务
满足事务的一致性特征,则必须要么都不进行,要么都能成功。
解决方案从大方向上有两种:
-
两阶段提交,分布式事务。
-
本地事务,本地落地,补偿发送。
分布式事务的成本太高。
一个完整的消息队列应该定义清楚自己可以投递的消息类型,如事务型消息,本地非持久型消息,以及服务端不落地的非可靠消息等。
对不同的业务场景做不同的选择。另外事务的使用应该尽量低成本、透明化,可以依托于现有的成熟框架,如Spring的声明式事务做扩展。
业务方只需要使用 @Transactional
标签即可。
性能相关
异步/同步
首先澄清一个概念,异步,同步和 oneway 是三件事。
异步,归根结底你还是需要关心结果的,但可能不是当时的时间点关心,可以用轮询或者回调等方式处理结果;
同步是需要当时关心的结果的;
而 oneway 是发出去就不管死活的方式,这种对于某些完全对可靠性没有要求的场景还是适用的,但不是我们重点讨论的范畴。
场景
- 客户端同步服务端异步
Future<Result> future = request(server);//server立刻返回future
synchronized(future){
while(!future.isDone()){
future.wait();//server处理结束后会notify这个future,并修改isdone标志
}
}
return future.get();
- 客户端同步服务端同步
Result result = request(server);
- 客户端异步服务端同步(这里用线程池的方式)
Future<Result> future = executor.submit(new Callable(){public void call<Result>(){
result = request(server);
}})
return future;
- 客户端异步服务端异步
Future<Result> future = request(server);//server立刻返回future
return future
服务端异步优点
那么,服务端使用异步最大的好处是什么呢?
说到底,是解放了线程和I/O。
试想服务端有一堆I/O等待处理,如果每个请求都需要同步响应,每条消息都需要结果立刻返回,那么就几乎没法做I/O合并。
而如果用异步的方式返回给客户端future,就可以有机会进行I/O的合并,把几个批次发过来的消息一起落地(这种合并对于MySQL等允许batch insert的数据库效果尤其明显),并且彻底释放了线程。不至于说来多少请求开多少线程,能够支持的并发量直线提高。
总结一句,同步能够保证结果,异步能够保证效率,要合理的结合才能做到最好的效率。
批量
谈到批量就不得不提生产者消费者模型。但生产者消费者模型中最大的痛点是:消费者到底应该何时进行消费。
大处着眼来看,消费动作都是事件驱动的。主要事件包括:
-
攒够了一定数量。
-
到达了一定时间。
-
队列里有新的数据到来。
对于及时性要求高的数据,可用采用方式3来完成,比如客户端向服务端投递数据。只要队列有数据,就把队列中的所有数据刷出,否则将自己挂起,等待新数据的到来。
在第一次把队列数据往外刷的过程中,又积攒了一部分数据,第二次又可以形成一个批量。
伪代码如下:
Executor executor = Executors.newFixedThreadPool(4);
final BlockingQueue<Message> queue = new ArrayBlockingQueue<>();
private Runnable task = new Runnable({//这里由于共享队列,Runnable可以复用,故做成全局的
public void run(){
List<Message> messages = new ArrayList<>(20);
queue.drainTo(messages,20);
doSend(messages);//阻塞,在这个过程中会有新的消息到来,如果4个线程都占满,队列就有机会囤新的消息
}
});
public void send(Message message){
queue.offer(message);
executor.submit(task)
}
最后啰嗦几句,曾经有人问我,为什么网络请求小包合并成大包会提高性能?主要原因有两个:
-
减少无谓的请求头,如果你每个请求只有几字节,而头却有几十字节,无疑效率非常低下。
-
减少回复的ack包个数。把请求合并后,ack包数量必然减少,确认和重发的成本就会降低。
push 还是 pull
上文提到的消息队列,大多是针对push模型的设计。
现在市面上有很多经典的也比较成熟的pull模型的消息队列,如Kafka、MetaQ等。这跟JMS中传统的push方式有很大的区别,可谓另辟蹊径。
我们简要分析下push和pull模型各自存在的利弊。
慢消费
慢消费无疑是push模型最大的致命伤,穿成流水线来看,如果消费者的速度比发送者的速度慢很多,势必造成消息在broker的堆积。
假设这些消息都是有用的无法丢弃的,消息就要一直在broker端保存。
当然这还不是最致命的,最致命的是broker给consumer推送一堆consumer无法处理的消息,consumer不是reject就是error,然后来回踢皮球。
反观pull模式,consumer可以按需消费,不用担心自己处理不了的消息来骚扰自己,而broker堆积消息也会相对简单,无需记录每一个要发送消息的状态,只需要维护所有消息的队列和偏移量就可以了。
所以对于建立索引等慢消费,消息量有限且到来的速度不均匀的情况,pull模式比较合适。
消息延迟与忙等
这是pull模式最大的短板。由于主动权在消费方,消费方无法准确地决定何时去拉取最新的消息。
如果一次pull取到消息了还可以继续去pull,如果没有pull取到则需要等待一段时间重新pull。
在阿里的RocketMq里,有一种优化的做法-长轮询,来平衡推拉模型各自的缺点。
基本思路是: 消费者如果尝试拉取失败,不是直接return, 而是把连接挂在那里wait,服务端如果有新的消息到来,把连接notify起来,这也是不错的思路。
但海量的长连接block对系统的开销还是不容小觑的,还是要合理的评估时间间隔,给wait加一个时间上限比较好~
顺序消息
如果push模式的消息队列,支持分区,单分区只支持一个消费者消费,并且消费者只有确认一个消息消费后才能push送另外一个消息,还要发送者保证全局顺序唯一,听起来也能做顺序消息,但成本太高了,尤其是必须每个消息消费确认后才能发下一条消息,这对于本身堆积能力和慢消费就是瓶颈的push模式的消息队列,简直是一场灾难。
反观pull模式,如果想做到全局顺序消息,就相对容易很多:
-
producer对应partition,并且单线程。
-
consumer对应partition,消费确认(或批量确认),继续消费即可。
所以对于日志push送这种最好全局有序,但允许出现小误差的场景,pull模式非常合适。
如果你不想看到通篇乱套的日志~~