前景回顾

【mq】从零开始实现 mq-01-生产者、消费者启动

【mq】从零开始实现 mq-02-如何实现生产者调用消费者?

【mq】从零开始实现 mq-03-引入 broker 中间人

【mq】从零开始实现 mq-04-启动检测与实现优化

【mq】从零开始实现 mq-05-实现优雅停机

【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat

【mq】从零开始实现 mq-07-负载均衡 load balance

【mq】从零开始实现 mq-08-配置优化 fluent

【mq】从零开始实现 mq-09-消费者拉取消息 pull message

【mq】从零开始实现 mq-10-消费者拉取消息回执 pull message ack

【mq】从零开始实现 mq-11-消费者消息回执添加分组信息 pull message ack groupName

【mq】从零开始实现 mq-12-消息的批量发送与回执

【mq】从零开始实现 mq-13-注册鉴权 auth

注册鉴权

我们前面实现了 mq 的基本功能,不过还是存在一个问题,那就是 mq 没有进行鉴权。

这就会导致如果部署在公网,任何一个机器都可以连接我们的服务,这显然是不够安全的。

13

生产者实现

属性

生产者启动时新增 2 个属性:

  [java]
1
2
3
4
5
6
7
8
9
10
11
/** * 账户标识 * @since 0.1.4 */ private String appKey; /** * 账户密码 * @since 0.1.4 */ private String appSecret;

注册逻辑调整

注册时,添加这两个属性到服务端。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void registerToBroker() { int successCount = 0; for(RpcChannelFuture channelFuture : this.channelFutureList) { ServiceEntry serviceEntry = new ServiceEntry(); serviceEntry.setGroupName(groupName); serviceEntry.setAddress(channelFuture.getAddress()); serviceEntry.setPort(channelFuture.getPort()); serviceEntry.setWeight(channelFuture.getWeight()); BrokerRegisterReq brokerRegisterReq = new BrokerRegisterReq(); brokerRegisterReq.setServiceEntry(serviceEntry); brokerRegisterReq.setMethodType(MethodType.P_REGISTER); brokerRegisterReq.setTraceId(IdHelper.uuid32()); brokerRegisterReq.setAppKey(appKey); brokerRegisterReq.setAppSecret(appSecret); log.info("[Register] 开始注册到 broker:{}", JSON.toJSON(brokerRegisterReq)); final Channel channel = channelFuture.getChannelFuture().channel(); MqCommonResp resp = callServer(channel, brokerRegisterReq, MqCommonResp.class); log.info("[Register] 完成注册到 broker:{}", JSON.toJSON(resp)); if(MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) { successCount++; } } if(successCount <= 0 && check) { log.error("校验 broker 可用性,可连接成功数为 0"); throw new MqException(MqCommonRespCode.P_REGISTER_TO_BROKER_FAILED); } }

消费者

消费者连接到 broker 也是类似的,此处不做赘述。

Broker 的处理

注册逻辑

以前注册是直接成功,此处加一个业务判断。

  [java]
1
2
3
4
5
6
7
8
9
10
// 生产者注册 if(MethodType.P_REGISTER.equals(methodType)) { BrokerRegisterReq registerReq = JSON.parseObject(json, BrokerRegisterReq.class); if(!brokerRegisterValidService.producerValid(registerReq)) { log.error("{} 生产者注册验证失败", JSON.toJSON(registerReq)); throw new MqException(MqBrokerRespCode.P_REGISTER_VALID_FAILED); } return registerProducerService.register(registerReq.getServiceEntry(), channel); }

首先会校验有效性,这个是一个接口,可自行灵活替换。

其他业务逻辑

其他业务处理时,都需要 registerProducerService.checkValid(channelId); 进行有效性判断。

  [java]
1
2
3
4
5
6
7
// 生产者注销 if(MethodType.P_UN_REGISTER.equals(methodType)) { registerProducerService.checkValid(channelId); BrokerRegisterReq registerReq = JSON.parseObject(json, BrokerRegisterReq.class); return registerProducerService.unRegister(registerReq.getServiceEntry(), channel); }

小结

注册鉴权实现的原理非常简单,不过可以为安全性提供最基础的保障。

希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。

我是老马,期待与你的下次重逢。

开源地址

The message queue in java.(java 简易版本 mq 实现) https://github.com/houbb/mq

拓展阅读

rpc-从零开始实现 rpc https://github.com/houbb/rpc