23 消息轨迹、ACL 与多副本搭建

消息轨迹

消息轨迹含义

一条消息什么时候由哪台机器产生的、发送的耗时、消息大小、发送状态、存储在哪个 Broker 上、什么时候存储的以及存储在哪台 Broker 上、什么时候消费的、消费状态等信息,这些信息即消息轨迹,用于追踪消息从诞生到被消费的整个生命周期。

这些信息对于业务同学排查定位有着重要的意义,发送和消费往往在不同的业务部门。有了消息轨迹后一条消息有没有发送,发送成功了没,有没有消费一目了然,降低了彼此的沟通成本。

消息轨迹使用

1. Broker 设置

RocketMQ 的默认消息轨迹功能默认是关闭的,可以将 Broker 属性 traceTopicEnable 设置为 true 开启。消息轨迹默认存储在 RMQ_SYS_TRACE_TOPIC 的主题中,可以通过 msgTraceTopicName 修改。 属性 默认参数 traceTopicEnable false msgTraceTopicName RMQ_SYS_TRACE_TOPIC

2. 发送端使用

发送轨迹 API public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic){ this(null, producerGroup, null, enableMsgTrace, customizedTraceTopic); }

说明:enableMsgTrace 是否开启发送轨迹,默认 false;customizedTraceTopic 设置收集消息轨迹的自定义主题,默认为 RMQ_SYS_TRACE_TOPIC。

发送代码示例 public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer(“ProducerGroupName”,true); producer.setNamesrvAddr(“127.0.0.1:9876”); producer.start(); for (int i = 0; i < 1; i++) try { Message msg = new Message(“TopicTest”, “TagA”, “OrderID111”, “Hello world”.getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf(“%s%n”, sendResult); } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); }

说明:创建 DefaultMQProducer 时将 enableMsgTrace 设置为 true 开启发送消息轨迹。

3. 消费端使用

消费轨迹 API public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic) { this(null, consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic); }

说明:enableMsgTrace 是否开启消费轨迹,默认 false;customizedTraceTopic 设置收集消息轨迹的自定义主题,默认为 RMQ_SYS_TRACE_TOPIC。

消费代码示例 public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“CID_JODIE_1”,true); consumer.setNamesrvAddr(“127.0.0.1:9876”); consumer.subscribe(“TopicTest”, “/*”); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp(“20181109221800”); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); }

说明:创建 DefaultMQPushConsumer 将 enableMsgTrace 设置为 true 开启消费消息轨迹。

4. 消息轨迹效果

通过发送和消费一条消息,在 RocketMQ-Console 中看下消息轨迹的效果截图。

发送消息内容 SendResult [sendStatus=SEND_OK, msgId=A9FE1075810A18B4AAC24A40738B0000, offsetMsgId=A9FE107500002A9F0000000000002147, messageQueue=MessageQueue [topic=TopicTest, brokerName=liangyong, queueId=1], queueOffset=2]

消费消息内容

Receive New Messages: [MessageExt [brokerName=liangyong, queueId=1, storeSize=189, queueOffset=2, sysFlag=0, bornTimestamp=1600135337872, bornHost=/169.254.16.117:65532, storeTimestamp=1600135337883, storeHost=/169.254.16.117:10911, msgId=A9FE107500002A9F0000000000002147, commitLogOffset=8519, bodyCRC=198614610, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=’TopicTest’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, KEYS=OrderID111, CONSUME_START_TIME=1600135337915, UNIQ_KEY=A9FE1075810A18B4AAC24A40738B0000, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100], transactionId=’null’}]]

消息轨迹展现

在 RocketMQ 控制台,可以通过 message key 或者 message id 检索消息内容,如下图:

img

点击 MESSAGE TRACE DETAIL 可以查看消息轨迹,如下图:

img

消息轨迹原理

发送轨迹原理:在消息发送前与发送后收集指标信息,并将指标信息异步发送到轨迹主题。

img

消费轨迹原理:消费的消息轨迹有两部分,一部分在拉取消息后处理消息前收集指标异步发送都轨迹主题;另一部分处理消息后收集消息指标异步发送到轨迹主题。

img

轨迹格式说明

消息轨迹类型有三种,Pub 指发送轨迹,SubBefore 指消费前轨迹,SubAfter 指消费后轨迹。

发送轨迹 Pub 名称 说明 TraceType Pub 表示发送轨迹 timeStamp 存储时间 regionId 机房可用区,默认为 DefaultRegion(目前没有使用) groupName 生产者组 producerGroup topic 主题名称 msgId 消息 ID,由客户端生成 tags 消息 tag keys 消息 key storeHost 消息存储 Broker IP bodyLength 消息体大小 costTime 发送消息耗时 msgType 消息类型:普通消息(Normal_Msg)、事物半消息(Trans_Msg_Half)、 事物提交消息(Trans_msg_Commit)、延迟消息(Delay_Msg) offsetMsgId 消息 Id,由 Broker 生成 isSuccess 发送是否成功,true 表示成功、false 表示失败

消费前轨迹 SubBefore

名称 说明 traceType SubBefore 表示消费前轨迹 timeStamp 消息存储时间 regionId 机房可用区(目前未使用) groupName 消费组名称 requestId 请求标识 msgId 消息 Id retryTimes 重试次数 keys 消息 key

消费前轨迹 SubAfter

名称 说明 traceType SubAfter 表示消费后轨迹 requestId 请求标识 msgId 消息 Id costTime 消费耗时 isSuccess 消费结果,true 消费成功、false 消费失败 keys 消息 key contextCode Broker 返回的消费状态,0:SUCCESS,1:TIME_OUT,2:EXCEPTION,3:RETURNNULL,4:FAILED

消息轨迹结语

  • 在生产环境中使用消息轨迹,可以将 MQ 集群的一台 Broker 用于收集消息轨迹,避免轨迹消息对集群性能造成影响。
  • 开源版本的消息轨迹中缺少消费的 IP 信息,即:我们不能查询到消息被哪个机器消费掉了。
  • 开源版本中的消息轨迹组织格式用 char 字符拼接,解析使用数组,扩展性和兼容性不太友好。
  • 基于此两位作者负责的 RocketMQ 集群中均未开启轨迹功能。

ACL

ACL 含义

访问控制表(Access Control List,ACL)描述用户或角色对资源的访问控制权限,RocketMQ 中的 ACL 见下表说明。

RocketMQ 中的 ACL 含义说明: 含义 说明 用户 在 plain_acl.yml 配置文件用 accessKey 表示 角色 admin 和其他角色 资源 包括主题和消费组 权限 DENY 表示无权限 ANY 表示拥有 PUB 或者 SUB 权限 PUB 表示拥有主题发送权限 SUB 表示拥有消费组订阅权限

ACL 使用示例

aclEnable = true 添加到 Broker 配置文件中,另外添加

${ROCKETMQ_HOME}/conf/plain_acl.yml 文件,用于 ACL 控制。

1. Broker 配置 brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH traceTopicEnable = true aclEnable = true

说明

参数 含义 aclEnable 默认 false,开启 ACL 需要设置为 true ROCKETMQ_HOME 可以通过 -Drocketmq.home.dir 指定 MQ 根目录 ACL fileName 默认 /conf/plain_acl.yml,可以通过 -Drocketmq.acl.plain.file 指定 ACL 文件名称

2. plain_acl.yml 配置

ACL 配置文件由全局白名单配置(globalWhiteRemoteAddresses)和账户配置(accounts)两部分构成。 globalWhiteRemoteAddresses: accounts: - accessKey: RocketMQ secretKey: 12345678 whiteRemoteAddress: admin: false defaultTopicPerm: DENY defaultGroupPerm: SUB topicPerms: - TopicTes1=DENY - TopicTest2=PUB|SUB groupPerms: - consumerTest=DENY - accessKey: rocketmq2 secretKey: 12345678 whiteRemoteAddress: 192.168.1./* admin: true

说明

参数 含义 globalWhiteRemoteAddresses 全局白名单配置,策略如下: 空:忽略白名单,继续执行下面校验 全匹配模式:全部放行不会执行后面校验 例如:/* 或 ...::::::: 多 IP 模式:表示白名单 IP 在设置区间段的放行 例如:192.168.0.{1,2} 或 192.168.1.1,192.168.1.2 或 192.168./. 或 *192.168.1-10.5-50 accessKey 唯一用户标识 secretKey 访问密码 whiteRemoteAddress 用户级白名单,格式同 globalWhiteRemoteAddresses admin 是否为管理员,管理员拥有所有资源访问权限 defaultTopicPerm 默认主题权限,默认值 DENY defaultGroupPerm 默认消费组权限,默认值 DENY topicPerms 详细的主题权限 groupPerms 详细的消费组权限

3. ACL 发送示例

在上面的配置文件中,将 TopicTes1 设置了 DENY 权限,即禁止发送和消费;将 TopicTest2 设置成了 PUB SUB 权限,即允许发送和订阅权限。下面例子尝试向主题 TopicTes1 发送消息,观察其是否可以成功。

禁止发送示例 public class AclSendTest { private static final String ACL_ACCESS_KEY = “RocketMQ”; private static final String ACL_SECRET_KEY = “12345678”; public static void main(String[] args) throws MQClientException, InterruptedException { producer(); } public static void producer() throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer(“ProducerGroupName”, getAclRPCHook()); producer.setNamesrvAddr(“127.0.0.1:9876”); producer.start(); for (int i = 0; i < 1; i++) try { { Message msg = new Message(“TopicTest1”, “TagA”, “OrderID188”, “Hello world”.getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf(“%s%n”, sendResult); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY)); } }

禁止发送截图

img

禁止发送说明

用户 RocketMQ 向主题 TopicTes1 发送消息时抛出 AclException,拒绝访问,如果将代码中主题换成 TopicTes2,则可以发送成功,接着看下文。

4. ACL 消费示例

在上面的配置文件中,将 consumerTest 设置了 DENY 权限,即禁止消费消息。由于 TopicTes2 设置为允许发送,我们下面尝试向 TopicTes2 发送一条消息,consumerTest 订阅了 TopicTes2 观察其是否可以消费。

允许发送示例 public class AclSendTest { private static final String ACL_ACCESS_KEY = “RocketMQ”; private static final String ACL_SECRET_KEY = “12345678”; public static void main(String[] args) throws MQClientException, InterruptedException { producer(); } public static void producer() throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer(“ProducerGroupName”, getAclRPCHook()); producer.setNamesrvAddr(“127.0.0.1:9876”); producer.start(); for (int i = 0; i < 1; i++) try { { Message msg = new Message(“TopicTest2”, “TagA”, “OrderID188”, “Hello world”.getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf(“%s%n”, sendResult); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY)); } }

允许发送结果

SendResult [sendStatus=SEND_OK, msgId=C0A800667FB218B4AAC2663AB66F0000, offsetMsgId=C0A8006600002A9F00000000000085EA, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=0], queueOffset=2]

禁止消费示例

public class AclConsumeTest { private static final String ACL_ACCESS_KEY = “RocketMQ”; private static final String ACL_SECRET_KEY = “12345678”; public static void main(String[] args) throws MQClientException, InterruptedException { pushConsumer(); } public static void pushConsumer() throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“consumerTest”, getAclRPCHook(), new AllocateMessageQueueAveragely()); consumer.setNamesrvAddr(“127.0.0.1:9876”); consumer.subscribe(“TopicTest2”, “/*”); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp(“20180422221800”); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); printBody(msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY)); } private static void printBody(List msg) { if (msg == null || msg.size() == 0) return; for (MessageExt m : msg) { if (m != null) { System.out.printf("msgId : %s body : %s \n\r", m.getMsgId(), new String(m.getBody())); } } } }

禁止消费截图

img

禁止消费说明

我们向 TopicTest2 成功发送了一条消息,但由于消费组 consumerTest 被设置成禁止消费,所有未能收到该消息。

ACL 命令汇总

RocketMQ 提供了一系列命令动态更新 Acl 配置文件,使设置的权限及时生效。

1. 获取 ACL 配置版本

使用 clusterAclConfigVersion 命令查看版本信息。

参数说明 参数 说明 -b Broker 地址,更新特定的 Broker -c 集群名称,更新集群中的所有 Broker -n namesrv 地址

命令示例

$ bin/mqadmin clusterAclConfigVersion -n x.x.x.x:9876 -c DefaultCluster RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0). RocketMQLog:WARN Please initialize the logger system properly. /#Cluster Name /#Broker Name /#Broker Addr /#AclConfigVersionNum /#AclLastUpdateTime DefaultCluster broker-a x.x.x.x:10911 0 2020-09-20 22:42:59 get cluster’s plain access config version success.

2. 获取 Acl 权限配置

使用 getAccessConfigSubCommand 获取 ACL 的配置信息。

参数说明 参数 说明 -b Broker 地址,更新特定的 Broker -c 集群名称,更新集群中的所有 Broker -n namesrv 地址

命令示例

$ bin/mqadmin getAccessConfigSubCommand -n x.x.x.x:9876 -c DefaultCluster RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0). RocketMQLog:WARN Please initialize the logger system properly. globalWhiteRemoteAddresses: [10.10.103./, 192.168.0./] accounts: accessKey : RocketMQ secretKey : 12345678 whiteRemoteAddress: admin : false defaultTopicPerm : DENY defaultGroupPerm : SUB topicPerms : [topicA=DENY, topicB=PUB SUB, topicC=SUB] groupPerms : [groupA=DENY, groupB=PUB SUB, groupC=SUB] accessKey : rocketmq2 secretKey : 12345678 whiteRemoteAddress: 192.168.1./* admin : true defaultTopicPerm : defaultGroupPerm : topicPerms : groupPerms :

3. 更新全局白名单

使用 updateGlobalWhiteAddr 对 ACL 的全局白名单 globalWhiteRemoteAddresses 进行变更。

参数说明 参数 说明 -b Broker 地址,更新特定的 Broker -c 集群名称,更新集群中的所有 Broker -n namesrv 地址 -g 全局白名单值,例如:10.10.103.,192.168.0.

命令示例

$ bin/mqadmin updateGlobalWhiteAddr -n x.x.x.x:9876 -c DefaultCluster -g 10.10.113./,192.168.20./ RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0). RocketMQLog:WARN Please initialize the logger system properly. update global white remote addresses to x.x.x.x:10911 success.

查看生效

$ bin/mqadmin getAccessConfigSubCommand -n x.x.x.x:9876 -c DefaultCluster RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0). RocketMQLog:WARN Please initialize the logger system properly. globalWhiteRemoteAddresses: [10.10.113./, 192.168.20./] accounts: accessKey : RocketMQ secretKey : 12345678 whiteRemoteAddress: admin : false defaultTopicPerm : DENY defaultGroupPerm : SUB topicPerms : [topicA=DENY, topicB=PUB SUB, topicC=SUB] groupPerms : [groupA=DENY, groupB=PUB SUB, groupC=SUB] accessKey : rocketmq2 secretKey : 12345678 whiteRemoteAddress: 192.168.1./* admin : true defaultTopicPerm : defaultGroupPerm : topicPerms : groupPerms :

说明:全局白名单已经被更新。

4. 更新用户配置

对于用户账户的配置的变更通过 updateAclConfig 来实现。

参数说明 参数 说明 -a 指定 accessKey,变更哪个用户的配置 -b Broker 地址,更新特定的 Broker -c 集群名称,更新集群中的所有 Broker -n namesrv 地址 -g 设置 groupPerms 消费组权限,格式为:

groupD=DENY,groupD=SUB -i 设置 Acl 文件中 defaultTopicPerm 权限 -m 设置 Acl 文件中 admin 权限 -s 设置 Acl 文件中 secretKey 密钥值 -t 设置 topicPerms 主题权限,格式为:topicA=DENY,topicD=SUB -u 设置 Acl 文件中的默认消费组 defaultGroupPerm 权限 -w 设置 Acl 文件中该用户下的白名单权限 whiteRemoteAddress

命令示例

$ bin/mqadmin updateAclConfig -n x.x.x.x:9876 -c DefaultCluster -a RocketMQ -s 87654321 -t testTopicA=DENY,testTopicb=SUB RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0). RocketMQLog:WARN Please initialize the logger system properly. create or update plain access config to x.x.x.x:10911 success.

查看生效

$ bin/mqadmin getAccessConfigSubCommand -n uat-mq2.ttbike.com.cn:9876 -c DefaultCluster RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0). RocketMQLog:WARN Please initialize the logger system properly. globalWhiteRemoteAddresses: [10.10.113./, 192.168.20./] accounts: accessKey : rocketmq2 secretKey : 12345678 whiteRemoteAddress: 192.168.1./* admin : true defaultTopicPerm : defaultGroupPerm : topicPerms : groupPerms : accessKey : RocketMQ secretKey : 87654321 whiteRemoteAddress: admin : false defaultTopicPerm : DENY defaultGroupPerm : SUB topicPerms : [testTopicA=DENY, testTopicb=SUB] groupPerms : [groupA=DENY, groupB=PUB SUB, groupC=SUB]

说明:用户 RocketMQ 的密钥 secretKey 和主题权限 topicPerms 已变更生效。

5. 删除用户配置

通过 deleteAccessConfig 删除指定用户的 ACL 配置信息。

参数说明 参数 说明 -b Broker 地址,更新特定的 Broker -c 集群名称,更新集群中的所有 Broker -n namesrv 地址 -a 指定特定用户 accessKey

命令示例

$ bin/mqadmin deleteAccessConfig -n x.x.x.x:9876 -c DefaultCluster -a RocketMQ RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0). RocketMQLog:WARN Please initialize the logger system properly. delete plain access config account to x.x.x.x:10911 success.

查看生效

$ bin/mqadmin getAccessConfigSubCommand -n x.x.x.x:9876 -c DefaultCluster RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0). RocketMQLog:WARN Please initialize the logger system properly. globalWhiteRemoteAddresses: [10.10.113./, 192.168.20./] accounts: accessKey : rocketmq2 secretKey : 12345678 whiteRemoteAddress: 192.168.1./* admin : true defaultTopicPerm : defaultGroupPerm : topicPerms : groupPerms :

说明:用户 RocketMQ 的相关 Acl 配置已被全部删除。

ACL 原理简述

1. 规则加载

规则配置在 plain_acl.yml 文件中,需要加载到 Broker 缓存中使其生效。启动时会加载 acl 文件的内容,当其变更了也需要动态加载规则内容,详见如下流程。

img

2. 权限校验

注册的钩子程序通过 NettyServerHandler 实现,在 Broker 当前的 channel 接收到客户端消息时执行校验逻辑,入口为 NettyServerHandler/#channelRead0() 以及 processRequestCommand/#doBeforeRpcHooks。规则校验的入口位于 PlainAccessValidator/#validate 方法,下面是校验简图。

img

ACL 结语

  • 评估是否需要 ACL 通常大部分场景是不需要的,如果对数据敏感可以通过对内容加密实现。
  • 如果几千个资源(主题和消费组)都配置了 ACL 那么配置文件是庞大的。
  • 如果要用建议局部使用,比如:资金往来的主题、消费组等。
  • 基于此两位作者负责的 RocketMQ 集群均未开启 ACL 功能。

多副本搭建

多副本意义

RocketMQ 开源版本在 4.5.0 版本开始支持多副本(DLedger),在以前的版本中只支持主从模式。

主从模式存在的问题:

  • 如果主节点挂掉了不能动态切换到从节点,这一组 Broker 节点不能提供写入服务;
  • 设置主从异步复制模式时,如果主节点意外挂掉,数据可能没有全部复制到从节点,存在数据丢失风险。

多副本使用 Raft 协议在节点意外掉线后能够完成自动选主,提高集群的高可用和保证数据的一致性。

多副本搭建

由于 DLedger 基于 Raft 协议开发的功能,需要过半数选举,最少 3 个节点组成一个 Raft 组。

broker-n0.conf brokerClusterName = RaftCluster brokerName=RaftNode00 listenPort=30911 namesrvAddr=127.0.0.1:9876 storePathRootDir=/tmp/rmqstore/node00 storePathCommitLog=/tmp/rmqstore/node00/commitlog enableDLegerCommitLog=true dLegerGroup=RaftNode00 dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913 /#/#/# must be unique dLegerSelfId=n0 sendMessageThreadPoolNums=16

broker-n1.conf

brokerClusterName = RaftCluster brokerName=RaftNode00 listenPort=30921 namesrvAddr=127.0.0.1:9876 storePathRootDir=/tmp/rmqstore/node01 storePathCommitLog=/tmp/rmqstore/node01/commitlog enableDLegerCommitLog=true dLegerGroup=RaftNode00 dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913 /#/#/# must be unique dLegerSelfId=n1 sendMessageThreadPoolNums=16

broker-n2.conf

brokerClusterName = RaftCluster brokerName=RaftNode00 listenPort=30931 namesrvAddr=127.0.0.1:9876 storePathRootDir=/tmp/rmqstore/node02 storePathCommitLog=/tmp/rmqstore/node02/commitlog enableDLegerCommitLog=true dLegerGroup=RaftNode00 dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913 /#/#/# must be unique dLegerSelfId=n2 sendMessageThreadPoolNums=16

启动三个节点:

nohup bin/mqbroker -c conf/dledger/broker-n0.conf & nohup bin/mqbroker -c conf/dledger/broker-n1.conf & nohup bin/mqbroker -c conf/dledger/broker-n2.conf &

查看是否启动成功:

$ bin/mqadmin clusterList -n localhost:9876 RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0). RocketMQLog:WARN Please initialize the logger system properly. /#Cluster Name /#Broker Name /#BID /#Addr /#Version /#InTPS(LOAD) /#OutTPS(LOAD) /#PCWait(ms) /#Hour /#SPACE RaftCluster RaftNode00 0 x.x.x.x:30921 V4_7_0 0.00(0,0ms) 0.00(0,0ms) 0 444663.49 -1.0000 RaftCluster RaftNode00 1 x.x.x.x:30911 V4_7_0 0.00(0,0ms) 0.00(0,0ms) 0 444663.49 -1.0000 RaftCluster RaftNode00 3 x.x.x.x:30931 V4_7_0 0.00(0,0ms) 0.00(0,0ms) 0 444663.49 -1.0000

说明:BID 为 0 表示表示 Master,其他两个均为 Follower。

控制台截图:

img

查看发送消息:

img

说明:通过以上步骤,我们完成多副本的搭建过程。

重新选主

我们通过 kill 掉 Master 的方式来验证 DLedger 选主情况,上面的 clusterList 截图中我们看到 Master 为 x.x.x.x:30921,将该进程 kill 掉后观察一下。 $ bin/mqadmin clusterList -n localhost:9876 RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0). RocketMQLog:WARN Please initialize the logger system properly. /#Cluster Name /#Broker Name /#BID /#Addr /#Version /#InTPS(LOAD) /#OutTPS(LOAD) /#PCWait(ms) /#Hour /#SPACE RaftCluster RaftNode00 0 x.x.x.x:30931 V4_7_0 0.00(0,0ms) 0.00(0,0ms) 0 444664.03 -1.0000 RaftCluster RaftNode00 1 x.x.x.x:30911 V4_7_0 0.00(0,0ms) 0.00(0,0ms) 0 444664.03 -1.0000

说明:kill 掉原 Master 后,完成自动选主,新的 Master 为 x.x.x.x:30931。

参数说明

配置文件中多副本的参数说明见下面表格。 参数 说明 enableDLegerCommitLog 是否启用 DLedger,默认 false dLegerGroup 节点所属的 Raft 组,建议与 broker 一致 dLegerPeers 集群节点信息,示例:n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913 dLegerSelfId 当前节点 id。取自 legerPeers 中条目的开头,即上述示例中的 n0,并且特别需要强调,只能第一个字符为英文,其他字符需要配置成数字

参考资料

Raft 的学习资料见下面链接,供学习使用。DLedger 的源码解读,见《RocketMQ 技术内幕》第二版。

多副本结语

使用多副本时,请做好压测,压测的 TPS 是否满足业务的需求,作者曾做过多副本压测与主从异步的 TPS 有相当大的差距。

在 TPS 满足的情况下,建议使用多副本架构,尤其是支付类可以优先使用。

如果线上已经存在了主从默认的架构如何升级到 DLedger 模式呢?

  • 可以参考前面平滑扩所容的方式,将多副本组成的 Raft 组加入到原集群中
  • 关闭原主从架构节点的写入权限
  • 在过了日志存储时间后,将主从架构节点下线

参考资料

https://learn.lianglianglee.com/%e4%b8%93%e6%a0%8f/RocketMQ%20%e5%ae%9e%e6%88%98%e4%b8%8e%e8%bf%9b%e9%98%b6%ef%bc%88%e5%ae%8c%ef%bc%89/23%20%e6%b6%88%e6%81%af%e8%bd%a8%e8%bf%b9%e3%80%81ACL%20%e4%b8%8e%e5%a4%9a%e5%89%af%e6%9c%ac%e6%90%ad%e5%bb%ba.md