前景回顾

【mq】从零开始实现 mq-01-生产者、消费者启动

【mq】从零开始实现 mq-02-如何实现生产者调用消费者?

【mq】从零开始实现 mq-03-引入 broker 中间人

上一节我们学习了如何实现生产者给消费者发送消息,但是是通过直连的方式。

那么如何才能达到解耦的效果呢?

答案就是引入 broker,消息的中间人。

broker

MqBroker 实现

核心启动类

类似我们前面 consumer 的启动实现:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package com.github.houbb.mq.broker.core; /** * @author binbin.hou * @since 1.0.0 */ public class MqBroker extends Thread implements IMqBroker { // 省略 private ChannelHandler initChannelHandler() { MqBrokerHandler handler = new MqBrokerHandler(); handler.setInvokeService(invokeService); handler.setRegisterConsumerService(registerConsumerService); handler.setRegisterProducerService(registerProducerService); handler.setMqBrokerPersist(mqBrokerPersist); handler.setBrokerPushService(brokerPushService); handler.setRespTimeoutMills(respTimeoutMills); return handler; } @Override public void run() { // 启动服务端 log.info("MQ 中间人开始启动服务端 port: {}", port); EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { final ByteBuf delimiterBuf = DelimiterUtil.getByteBuf(DelimiterUtil.DELIMITER); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(workerGroup, bossGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline() .addLast(new DelimiterBasedFrameDecoder(DelimiterUtil.LENGTH, delimiterBuf)) .addLast(initChannelHandler()); } }) // 这个参数影响的是还没有被accept 取出的连接 .option(ChannelOption.SO_BACKLOG, 128) // 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。 .childOption(ChannelOption.SO_KEEPALIVE, true); // 绑定端口,开始接收进来的链接 ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly(); log.info("MQ 中间人启动完成,监听【" + port + "】端口"); channelFuture.channel().closeFuture().syncUninterruptibly(); log.info("MQ 中间人关闭完成"); } catch (Exception e) { log.error("MQ 中间人启动异常", e); throw new MqException(BrokerRespCode.RPC_INIT_FAILED); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }

initChannelHandler 中有不少新面孔,我们后面会详细介绍。

MqBrokerHandler 处理逻辑

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package com.github.houbb.mq.broker.handler; import java.util.List; /** * @author binbin.hou * @since 1.0.0 */ public class MqBrokerHandler extends SimpleChannelInboundHandler { private static final Log log = LogFactory.getLog(MqBrokerHandler.class); /** * 调用管理类 * @since 1.0.0 */ private IInvokeService invokeService; /** * 消费者管理 * @since 0.0.3 */ private IBrokerConsumerService registerConsumerService; /** * 生产者管理 * @since 0.0.3 */ private IBrokerProducerService registerProducerService; /** * 持久化类 * @since 0.0.3 */ private IMqBrokerPersist mqBrokerPersist; /** * 推送服务 * @since 0.0.3 */ private IBrokerPushService brokerPushService; /** * 获取响应超时时间 * @since 0.0.3 */ private long respTimeoutMills; //set 方法 @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; byte[] bytes = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(bytes); RpcMessageDto rpcMessageDto = null; try { rpcMessageDto = JSON.parseObject(bytes, RpcMessageDto.class); } catch (Exception exception) { log.error("RpcMessageDto json 格式转换异常 {}", new String(bytes)); return; } if (rpcMessageDto.isRequest()) { MqCommonResp commonResp = this.dispatch(rpcMessageDto, ctx); if(commonResp == null) { log.debug("当前消息为 null,忽略处理。"); return; } // 写回响应,和以前类似。 writeResponse(rpcMessageDto, commonResp, ctx); } else { final String traceId = rpcMessageDto.getTraceId(); // 丢弃掉 traceId 为空的信息 if(StringUtil.isBlank(traceId)) { log.debug("[Server Response] response traceId 为空,直接丢弃", JSON.toJSON(rpcMessageDto)); return; } // 添加消息 invokeService.addResponse(traceId, rpcMessageDto); } } /** * 异步处理消息 * @param mqMessage 消息 * @since 0.0.3 */ private void asyncHandleMessage(MqMessage mqMessage) { List<Channel> channelList = registerConsumerService.getSubscribeList(mqMessage); if(CollectionUtil.isEmpty(channelList)) { log.info("监听列表为空,忽略处理"); return; } BrokerPushContext brokerPushContext = new BrokerPushContext(); brokerPushContext.setChannelList(channelList); brokerPushContext.setMqMessage(mqMessage); brokerPushContext.setMqBrokerPersist(mqBrokerPersist); brokerPushContext.setInvokeService(invokeService); brokerPushContext.setRespTimeoutMills(respTimeoutMills); brokerPushService.asyncPush(brokerPushContext); } }

消息分发

broker 接收到消息以后,dispatch 实现如下:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
/** * 消息的分发 * * @param rpcMessageDto 入参 * @param ctx 上下文 * @return 结果 */ private MqCommonResp dispatch(RpcMessageDto rpcMessageDto, ChannelHandlerContext ctx) { try { final String methodType = rpcMessageDto.getMethodType(); final String json = rpcMessageDto.getJson(); String channelId = ChannelUtil.getChannelId(ctx); final Channel channel = ctx.channel(); log.debug("channelId: {} 接收到 method: {} 内容:{}", channelId, methodType, json); // 生产者注册 if(MethodType.P_REGISTER.equals(methodType)) { BrokerRegisterReq registerReq = JSON.parseObject(json, BrokerRegisterReq.class); return registerProducerService.register(registerReq.getServiceEntry(), channel); } // 生产者注销 if(MethodType.P_UN_REGISTER.equals(methodType)) { BrokerRegisterReq registerReq = JSON.parseObject(json, BrokerRegisterReq.class); return registerProducerService.unRegister(registerReq.getServiceEntry(), channel); } // 生产者消息发送 if(MethodType.P_SEND_MSG.equals(methodType)) { MqMessage mqMessage = JSON.parseObject(json, MqMessage.class); MqMessagePersistPut persistPut = new MqMessagePersistPut(); persistPut.setMqMessage(mqMessage); persistPut.setMessageStatus(MessageStatusConst.WAIT_CONSUMER); MqCommonResp commonResp = mqBrokerPersist.put(persistPut); this.asyncHandleMessage(mqMessage); return commonResp; } // 生产者消息发送-ONE WAY if(MethodType.P_SEND_MSG_ONE_WAY.equals(methodType)) { MqMessage mqMessage = JSON.parseObject(json, MqMessage.class); MqMessagePersistPut persistPut = new MqMessagePersistPut(); persistPut.setMqMessage(mqMessage); persistPut.setMessageStatus(MessageStatusConst.WAIT_CONSUMER); mqBrokerPersist.put(persistPut); this.asyncHandleMessage(mqMessage); return null; } // 消费者注册 if(MethodType.C_REGISTER.equals(methodType)) { BrokerRegisterReq registerReq = JSON.parseObject(json, BrokerRegisterReq.class); return registerConsumerService.register(registerReq.getServiceEntry(), channel); } // 消费者注销 if(MethodType.C_UN_REGISTER.equals(methodType)) { BrokerRegisterReq registerReq = JSON.parseObject(json, BrokerRegisterReq.class); return registerConsumerService.unRegister(registerReq.getServiceEntry(), channel); } // 消费者监听注册 if(MethodType.C_SUBSCRIBE.equals(methodType)) { ConsumerSubscribeReq req = JSON.parseObject(json, ConsumerSubscribeReq.class); return registerConsumerService.subscribe(req, channel); } // 消费者监听注销 if(MethodType.C_UN_SUBSCRIBE.equals(methodType)) { ConsumerUnSubscribeReq req = JSON.parseObject(json, ConsumerUnSubscribeReq.class); return registerConsumerService.unSubscribe(req, channel); } // 消费者主动 pull if(MethodType.C_MESSAGE_PULL.equals(methodType)) { MqConsumerPullReq req = JSON.parseObject(json, MqConsumerPullReq.class); return mqBrokerPersist.pull(req, channel); } throw new UnsupportedOperationException("暂不支持的方法类型"); } catch (Exception exception) { log.error("执行异常", exception); MqCommonResp resp = new MqCommonResp(); resp.setRespCode(MqCommonRespCode.FAIL.getCode()); resp.setRespMessage(MqCommonRespCode.FAIL.getMsg()); return resp; } }

消息推送

this.asyncHandleMessage(mqMessage); 是 broker 接收到消息之后的处理逻辑。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/** * 异步处理消息 * @param mqMessage 消息 * @since 0.0.3 */ private void asyncHandleMessage(MqMessage mqMessage) { List<Channel> channelList = registerConsumerService.getSubscribeList(mqMessage); if(CollectionUtil.isEmpty(channelList)) { log.info("监听列表为空,忽略处理"); return; } BrokerPushContext brokerPushContext = new BrokerPushContext(); brokerPushContext.setChannelList(channelList); brokerPushContext.setMqMessage(mqMessage); brokerPushContext.setMqBrokerPersist(mqBrokerPersist); brokerPushContext.setInvokeService(invokeService); brokerPushContext.setRespTimeoutMills(respTimeoutMills); brokerPushService.asyncPush(brokerPushContext); }

推送的核心实现如下:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package com.github.houbb.mq.broker.support.push; /** * @author binbin.hou * @since 0.0.3 */ public class BrokerPushService implements IBrokerPushService { private static final Log log = LogFactory.getLog(BrokerPushService.class); private static final ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor(); @Override public void asyncPush(final BrokerPushContext context) { EXECUTOR_SERVICE.submit(new Runnable() { @Override public void run() { log.info("开始异步处理 {}", JSON.toJSON(context)); final List<Channel> channelList = context.getChannelList(); final IMqBrokerPersist mqBrokerPersist = context.getMqBrokerPersist(); final MqMessage mqMessage = context.getMqMessage(); final String messageId = mqMessage.getTraceId(); final IInvokeService invokeService = context.getInvokeService(); final long responseTime = context.getRespTimeoutMills(); for(Channel channel : channelList) { try { String channelId = ChannelUtil.getChannelId(channel); log.info("开始处理 channelId: {}", channelId); //1. 调用 mqMessage.setMethodType(MethodType.B_MESSAGE_PUSH); MqConsumerResultResp resultResp = callServer(channel, mqMessage, MqConsumerResultResp.class, invokeService, responseTime); //2. 更新状态 mqBrokerPersist.updateStatus(messageId, resultResp.getConsumerStatus()); //3. 后期添加重试策略 log.info("完成处理 channelId: {}", channelId); } catch (Exception exception) { log.error("处理异常"); mqBrokerPersist.updateStatus(messageId, ConsumerStatus.FAILED.getCode()); } } log.info("完成异步处理"); } }); } }

此处在消息推送之后,需要更新消息的 ACK 状态。

消息生产者处理类

IBrokerProducerService 接口定义如下:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.github.houbb.mq.broker.api; /** * <p> 生产者注册服务类 </p> * * @author houbinbin * @since 0.0.3 */ public interface IBrokerProducerService { /** * 注册当前服务信息 * (1)将该服务通过 {@link ServiceEntry#getGroupName()} 进行分组 * 订阅了这个 serviceId 的所有客户端 * @param serviceEntry 注册当前服务信息 * @param channel channel * @since 0.0.8 */ MqCommonResp register(final ServiceEntry serviceEntry, Channel channel); /** * 注销当前服务信息 * @param serviceEntry 注册当前服务信息 * @param channel 通道 * @since 0.0.8 */ MqCommonResp unRegister(final ServiceEntry serviceEntry, Channel channel); /** * 获取服务地址信息 * @param channel channel * @return 结果 * @since 0.0.3 */ ServiceEntry getServiceEntry(final Channel channel); }

实现如下:

本地基于 map 存储请求过来的基本信息。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.github.houbb.mq.broker.support.api; /** * <p> 生产者注册服务类 </p> * * @author houbinbin * @since 0.0.3 */ public class LocalBrokerProducerService implements IBrokerProducerService { private static final Log log = LogFactory.getLog(LocalBrokerProducerService.class); private final Map<String, BrokerServiceEntryChannel> registerMap = new ConcurrentHashMap<>(); @Override public MqCommonResp register(ServiceEntry serviceEntry, Channel channel) { final String channelId = ChannelUtil.getChannelId(channel); BrokerServiceEntryChannel entryChannel = InnerChannelUtils.buildEntryChannel(serviceEntry, channel); registerMap.put(channelId, entryChannel); MqCommonResp resp = new MqCommonResp(); resp.setRespCode(MqCommonRespCode.SUCCESS.getCode()); resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg()); return resp; } @Override public MqCommonResp unRegister(ServiceEntry serviceEntry, Channel channel) { final String channelId = ChannelUtil.getChannelId(channel); registerMap.remove(channelId); MqCommonResp resp = new MqCommonResp(); resp.setRespCode(MqCommonRespCode.SUCCESS.getCode()); resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg()); return resp; } @Override public ServiceEntry getServiceEntry(Channel channel) { final String channelId = ChannelUtil.getChannelId(channel); return registerMap.get(channelId); } }

消息消费者处理类

接口定义如下:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package com.github.houbb.mq.broker.api; /** * <p> 消费者注册服务类 </p> * * @author houbinbin * @since 0.0.3 */ public interface IBrokerConsumerService { /** * 注册当前服务信息 * (1)将该服务通过 {@link ServiceEntry#getGroupName()} 进行分组 * 订阅了这个 serviceId 的所有客户端 * @param serviceEntry 注册当前服务信息 * @param channel channel * @since 0.0.3 */ MqCommonResp register(final ServiceEntry serviceEntry, Channel channel); /** * 注销当前服务信息 * @param serviceEntry 注册当前服务信息 * @param channel channel * @since 0.0.3 */ MqCommonResp unRegister(final ServiceEntry serviceEntry, Channel channel); /** * 监听服务信息 * (1)监听之后,如果有任何相关的机器信息发生变化,则进行推送。 * (2)内置的信息,需要传送 ip 信息到注册中心。 * * @param serviceEntry 客户端明细信息 * @param clientChannel 客户端 channel 信息 * @since 0.0.3 */ MqCommonResp subscribe(final ConsumerSubscribeReq serviceEntry, final Channel clientChannel); /** * 取消监听服务信息 * (1)监听之后,如果有任何相关的机器信息发生变化,则进行推送。 * (2)内置的信息,需要传送 ip 信息到注册中心。 * * @param serviceEntry 客户端明细信息 * @param clientChannel 客户端 channel 信息 * @since 0.0.3 */ MqCommonResp unSubscribe(final ConsumerUnSubscribeReq serviceEntry, final Channel clientChannel); /** * 获取所有匹配的消费者 * 1. 同一个 groupName 只返回一个,注意负载均衡 * 2. 返回匹配当前消息的消费者通道 * * @param mqMessage 消息体 * @return 结果 */ List<Channel> getSubscribeList(MqMessage mqMessage); }

默认实现:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package com.github.houbb.mq.broker.support.api; /** * @author binbin.hou * @since 1.0.0 */ public class LocalBrokerConsumerService implements IBrokerConsumerService { private final Map<String, BrokerServiceEntryChannel> registerMap = new ConcurrentHashMap<>(); /** * 订阅集合 * key: topicName * value: 对应的订阅列表 */ private final Map<String, Set<ConsumerSubscribeBo>> subscribeMap = new ConcurrentHashMap<>(); @Override public MqCommonResp register(ServiceEntry serviceEntry, Channel channel) { final String channelId = ChannelUtil.getChannelId(channel); BrokerServiceEntryChannel entryChannel = InnerChannelUtils.buildEntryChannel(serviceEntry, channel); registerMap.put(channelId, entryChannel); MqCommonResp resp = new MqCommonResp(); resp.setRespCode(MqCommonRespCode.SUCCESS.getCode()); resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg()); return resp; } @Override public MqCommonResp unRegister(ServiceEntry serviceEntry, Channel channel) { final String channelId = ChannelUtil.getChannelId(channel); registerMap.remove(channelId); MqCommonResp resp = new MqCommonResp(); resp.setRespCode(MqCommonRespCode.SUCCESS.getCode()); resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg()); return resp; } @Override public MqCommonResp subscribe(ConsumerSubscribeReq serviceEntry, Channel clientChannel) { final String channelId = ChannelUtil.getChannelId(clientChannel); final String topicName = serviceEntry.getTopicName(); Set<ConsumerSubscribeBo> set = subscribeMap.get(topicName); if(set == null) { set = new HashSet<>(); } ConsumerSubscribeBo subscribeBo = new ConsumerSubscribeBo(); subscribeBo.setChannelId(channelId); subscribeBo.setGroupName(serviceEntry.getGroupName()); subscribeBo.setTopicName(topicName); subscribeBo.setTagRegex(serviceEntry.getTagRegex()); set.add(subscribeBo); subscribeMap.put(topicName, set); MqCommonResp resp = new MqCommonResp(); resp.setRespCode(MqCommonRespCode.SUCCESS.getCode()); resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg()); return resp; } @Override public MqCommonResp unSubscribe(ConsumerUnSubscribeReq serviceEntry, Channel clientChannel) { final String channelId = ChannelUtil.getChannelId(clientChannel); final String topicName = serviceEntry.getTopicName(); ConsumerSubscribeBo subscribeBo = new ConsumerSubscribeBo(); subscribeBo.setChannelId(channelId); subscribeBo.setGroupName(serviceEntry.getGroupName()); subscribeBo.setTopicName(topicName); subscribeBo.setTagRegex(serviceEntry.getTagRegex()); // 集合 Set<ConsumerSubscribeBo> set = subscribeMap.get(topicName); if(CollectionUtil.isNotEmpty(set)) { set.remove(subscribeBo); } MqCommonResp resp = new MqCommonResp(); resp.setRespCode(MqCommonRespCode.SUCCESS.getCode()); resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg()); return resp; } @Override public List<Channel> getSubscribeList(MqMessage mqMessage) { final String topicName = mqMessage.getTopic(); Set<ConsumerSubscribeBo> set = subscribeMap.get(topicName); if(CollectionUtil.isEmpty(set)) { return Collections.emptyList(); } //2. 获取匹配的 tag 列表 final List<String> tagNameList = mqMessage.getTags(); Map<String, List<ConsumerSubscribeBo>> groupMap = new HashMap<>(); for(ConsumerSubscribeBo bo : set) { String tagRegex = bo.getTagRegex(); if(hasMatch(tagNameList, tagRegex)) { //TODO: 这种设置模式,统一添加处理 String groupName = bo.getGroupName(); List<ConsumerSubscribeBo> list = groupMap.get(groupName); if(list == null) { list = new ArrayList<>(); } list.add(bo); groupMap.put(groupName, list); } } //3. 按照 groupName 分组之后,每一组只随机返回一个。最好应该调整为以 shardingkey 选择 final String shardingKey = mqMessage.getShardingKey(); List<Channel> channelList = new ArrayList<>(); for(Map.Entry<String, List<ConsumerSubscribeBo>> entry : groupMap.entrySet()) { List<ConsumerSubscribeBo> list = entry.getValue(); ConsumerSubscribeBo bo = RandomUtils.random(list, shardingKey); BrokerServiceEntryChannel entryChannel = registerMap.get(bo.getChannelId()); channelList.add(entryChannel.getChannel()); } return channelList; } private boolean hasMatch(List<String> tagNameList, String tagRegex) { if(CollectionUtil.isEmpty(tagNameList)) { return false; } Pattern pattern = Pattern.compile(tagRegex); for(String tagName : tagNameList) { if(RegexUtils.match(pattern, tagName)) { return true; } } return false; } }

getSubscribeList 的逻辑可能稍微复杂点,其实就是消息过来,找到匹配的订阅消费者而已。

因为同一个 groupName 的消费者消息只消费一次,所以需要一次分组。

消息持久化

接口如下:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.github.houbb.mq.broker.support.persist; /** * @author binbin.hou * @since 0.0.3 */ public interface IMqBrokerPersist { /** * 保存消息 * @param mqMessage 消息 * @since 0.0.3 */ MqCommonResp put(final MqMessagePersistPut mqMessage); /** * 更新状态 * @param messageId 消息唯一标识 * @param status 状态 * @return 结果 * @since 0.0.3 */ MqCommonResp updateStatus(final String messageId, final String status); /** * 拉取消息 * @param pull 拉取消息 * @return 结果 */ MqConsumerPullResp pull(final MqConsumerPullReq pull, final Channel channel); }

本地默认实现:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package com.github.houbb.mq.broker.support.persist; /** * 本地持久化策略 * @author binbin.hou * @since 1.0.0 */ public class LocalMqBrokerPersist implements IMqBrokerPersist { private static final Log log = LogFactory.getLog(LocalMqBrokerPersist.class); /** * 队列 * ps: 这里只是简化实现,暂时不考虑并发等问题。 */ private final Map<String, List<MqMessagePersistPut>> map = new ConcurrentHashMap<>(); //1. 接收 //2. 持久化 //3. 通知消费 @Override public synchronized MqCommonResp put(MqMessagePersistPut put) { log.info("put elem: {}", JSON.toJSON(put)); MqMessage mqMessage = put.getMqMessage(); final String topic = mqMessage.getTopic(); List<MqMessagePersistPut> list = map.get(topic); if(list == null) { list = new ArrayList<>(); } list.add(put); map.put(topic, list); MqCommonResp commonResp = new MqCommonResp(); commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode()); commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg()); return commonResp; } @Override public MqCommonResp updateStatus(String messageId, String status) { // 这里性能比较差,所以不可以用于生产。仅作为测试验证 for(List<MqMessagePersistPut> list : map.values()) { for(MqMessagePersistPut put : list) { MqMessage mqMessage = put.getMqMessage(); if(mqMessage.getTraceId().equals(messageId)) { put.setMessageStatus(status); break; } } } MqCommonResp commonResp = new MqCommonResp(); commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode()); commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg()); return commonResp; } @Override public MqConsumerPullResp pull(MqConsumerPullReq pull, Channel channel) { //TODO... 待实现 return null; } }

ps: 后续将会基于 springboot+mysql 进行持久化策略实现。

消费者启动调整

我们将生产者、消费者的启动都进行调整,连接到 broker 中。

二者是类似的,此处以消费者为例。

核心启动类

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.github.houbb.mq.consumer.core; /** * 推送消费策略 * * @author binbin.hou * @since 1.0.0 */ public class MqConsumerPush extends Thread implements IMqConsumer { // 属性&设置 @Override public void run() { // 启动服务端 log.info("MQ 消费者开始启动服务端 groupName: {}, brokerAddress: {}", groupName, brokerAddress); //1. 参数校验 this.paramCheck(); try { // channel handler ChannelHandler channelHandler = this.initChannelHandler(); //channel future this.channelFutureList = ChannelFutureUtils.initChannelFutureList(brokerAddress, channelHandler); // register to broker this.registerToBroker(); // 标识为可用 enableFlag = true; log.info("MQ 消费者启动完成"); } catch (Exception e) { log.error("MQ 消费者启动异常", e); throw new MqException(ConsumerRespCode.RPC_INIT_FAILED); } } //订阅&取消订阅 @Override public void registerListener(IMqConsumerListener listener) { this.mqListenerService.register(listener); } }

初始化 handler

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private ChannelHandler initChannelHandler() { final ByteBuf delimiterBuf = DelimiterUtil.getByteBuf(DelimiterUtil.DELIMITER); final MqConsumerHandler mqConsumerHandler = new MqConsumerHandler(invokeService, mqListenerService); // handler 实际上会被多次调用,如果不是 @Shareable,应该每次都重新创建。 ChannelHandler handler = new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline() .addLast(new DelimiterBasedFrameDecoder(DelimiterUtil.LENGTH, delimiterBuf)) .addLast(mqConsumerHandler); } }; return handler; }

注册到服务端

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/** * 注册到所有的服务端 * @since 0.0.3 */ private void registerToBroker() { for(RpcChannelFuture channelFuture : this.channelFutureList) { ServiceEntry serviceEntry = new ServiceEntry(); serviceEntry.setGroupName(groupName); serviceEntry.setAddress(channelFuture.getAddress()); serviceEntry.setPort(channelFuture.getPort()); serviceEntry.setWeight(channelFuture.getWeight()); BrokerRegisterReq brokerRegisterReq = new BrokerRegisterReq(); brokerRegisterReq.setServiceEntry(serviceEntry); brokerRegisterReq.setMethodType(MethodType.C_REGISTER); brokerRegisterReq.setTraceId(IdHelper.uuid32()); log.info("[Register] 开始注册到 broker:{}", JSON.toJSON(brokerRegisterReq)); final Channel channel = channelFuture.getChannelFuture().channel(); MqCommonResp resp = callServer(channel, brokerRegisterReq, MqCommonResp.class); log.info("[Register] 完成注册到 broker:{}", JSON.toJSON(resp)); } }

订阅与取消订阅

消费者对于关心的消息,实现也比较简单:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void subscribe(String topicName, String tagRegex) { ConsumerSubscribeReq req = new ConsumerSubscribeReq(); String messageId = IdHelper.uuid32(); req.setTraceId(messageId); req.setMethodType(MethodType.C_SUBSCRIBE); req.setTopicName(topicName); req.setTagRegex(tagRegex); req.setGroupName(groupName); Channel channel = getChannel(); MqCommonResp resp = callServer(channel, req, MqCommonResp.class); if(!MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) { throw new MqException(ConsumerRespCode.SUBSCRIBE_FAILED); } }

取消订阅:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void unSubscribe(String topicName, String tagRegex) { ConsumerUnSubscribeReq req = new ConsumerUnSubscribeReq(); String messageId = IdHelper.uuid32(); req.setTraceId(messageId); req.setMethodType(MethodType.C_UN_SUBSCRIBE); req.setTopicName(topicName); req.setTagRegex(tagRegex); req.setGroupName(groupName); Channel channel = getChannel(); MqCommonResp resp = callServer(channel, req, MqCommonResp.class); if(!MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) { throw new MqException(ConsumerRespCode.UN_SUBSCRIBE_FAILED); } }

测试

broker 启动

  [java]
1
2
MqBroker broker = new MqBroker(); broker.start();

启动日志:

  [plaintext]
1
2
3
[DEBUG] [2022-04-21 20:36:27.158] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter. [INFO] [2022-04-21 20:36:27.186] [Thread-0] [c.g.h.m.b.c.MqBroker.run] - MQ 中间人开始启动服务端 port: 9999 [INFO] [2022-04-21 20:36:29.060] [Thread-0] [c.g.h.m.b.c.MqBroker.run] - MQ 中间人启动完成,监听【9999】端口

consumer 启动

  [java]
1
2
3
4
5
6
7
8
9
10
11
final MqConsumerPush mqConsumerPush = new MqConsumerPush(); mqConsumerPush.start(); mqConsumerPush.subscribe("TOPIC", "TAGA"); mqConsumerPush.registerListener(new IMqConsumerListener() { @Override public ConsumerStatus consumer(MqMessage mqMessage, IMqConsumerListenerContext context) { System.out.println("---------- 自定义 " + JSON.toJSONString(mqMessage)); return ConsumerStatus.SUCCESS; } });

启动日志:

  [plaintext]
1
2
... [INFO] [2022-04-21 20:37:40.985] [Thread-0] [c.g.h.m.c.c.MqConsumerPush.registerToBroker] - [Register] 完成注册到 broker:{"respMessage":"成功","respCode":"0000"}

启动时会注册到 broker。

producer 启动

  [java]
1
2
3
4
5
6
7
8
9
10
11
MqProducer mqProducer = new MqProducer(); mqProducer.start(); String message = "HELLO MQ!"; MqMessage mqMessage = new MqMessage(); mqMessage.setTopic("TOPIC"); mqMessage.setTags(Arrays.asList("TAGA", "TAGB")); mqMessage.setPayload(message); SendResult sendResult = mqProducer.send(mqMessage); System.out.println(JSON.toJSON(sendResult));

日志:

  [plaintext]
1
2
3
... [INFO] [2022-04-21 20:39:17.885] [Thread-0] [c.g.h.m.p.c.MqProducer.registerToBroker] - [Register] 完成注册到 broker:{"respMessage":"成功","respCode":"0000"} ...

此时消费者消费到我们发送的消息。

  [plaintext]
1
---------- 自定义 {"methodType":"B_MESSAGE_PUSH","payload":"HELLO MQ!","tags":["TAGA","TAGB"],"topic":"TOPIC","traceId":"2237bbfe55b842328134e6a100e36364"}

小结

到这里,我们就实现了基于中间人的生产者与消费者通讯。

希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。

我是老马,期待与你的下次重逢。

开源地址

The message queue in java.(java 简易版本 mq 实现) https://github.com/houbb/mq

拓展阅读

rpc-从零开始实现 rpc https://github.com/houbb/rpc