register 注册中心

需求

服务的注册与发现,是一个很常见也很有用的需求。

可以让我们不关心服务端的 ip 信息,只关心服务本身。

思路

实现的方式

SimpleRpcRegister 为 rpc 提供的默认实现方案。

实际可以结合 redis,zk 等常见的成熟框架实现。

其实可以把 register 当做是服务端,此时的 server/client 都是客户端。

实现的策略时类似的。

当然也可以直接使用 zk 等成熟的框架,只是个人觉得这样不利于学习,而且 zk 太重了。

流程

  • 启动注册中心

首先启动注册中心

  • 启动服务端

服务端启动时,将注册信息注册到注册中心。

  • 启动客户端

客户端启动的时候,去注册中心读取配置。

Register-注册中心

引导类

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.github.houbb.rpc.register.api.config.impl; import com.github.houbb.heaven.util.common.ArgUtil; import com.github.houbb.rpc.common.remote.netty.handler.ChannelHandlers; import com.github.houbb.rpc.common.remote.netty.impl.DefaultNettyServer; import com.github.houbb.rpc.register.api.config.RegisterConfig; import com.github.houbb.rpc.register.simple.handler.RegisterCenterServerHandler; import io.netty.channel.ChannelHandler; /** * 默认注册中心配置 * @author binbin.hou * @since 0.0.8 */ public class RegisterBs implements RegisterConfig { /** * 服务启动端口信息 * @since 0.0.8 */ private int port; private RegisterBs(){} public static RegisterBs newInstance() { RegisterBs registerBs = new RegisterBs(); registerBs.port(8527); return registerBs; } @Override public RegisterBs port(int port) { ArgUtil.notNegative(port, "port"); this.port = port; return this; } @Override public RegisterBs start() { ChannelHandler channelHandler = ChannelHandlers.objectCodecHandler(new RegisterCenterServerHandler()); DefaultNettyServer.newInstance(port, channelHandler).asyncRun(); return this; } }

指定一下 register 的 port,就可以启动了。

RegisterCenterServerHandler

这里是注册中心的核心实现类。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
/* * Copyright (c) 2019. houbinbin Inc. * rpc All rights reserved. */ package com.github.houbb.rpc.register.simple.handler; import com.github.houbb.log.integration.core.Log; import com.github.houbb.log.integration.core.LogFactory; import com.github.houbb.rpc.register.domain.entry.ServiceEntry; import com.github.houbb.rpc.register.domain.message.RegisterMessage; import com.github.houbb.rpc.register.domain.message.impl.RegisterMessages; import com.github.houbb.rpc.register.simple.SimpleRpcRegister; import com.github.houbb.rpc.register.simple.client.ClientRegisterService; import com.github.houbb.rpc.register.simple.client.impl.DefaultClientRegisterService; import com.github.houbb.rpc.register.simple.constant.MessageTypeConst; import com.github.houbb.rpc.register.simple.server.ServerRegisterService; import com.github.houbb.rpc.register.simple.server.impl.DefaultServerRegisterService; import com.github.houbb.rpc.register.spi.RpcRegister; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * <p> 注册中心服务器处理类 </p> * * <pre> Created: 2019/10/23 10:29 下午 </pre> * <pre> Project: rpc </pre> * <p> * 请求的标准化: * (1)对于 server 的服务注册,client 的配置拉取。 * 二者都是将 register 作为服务端。所以需要统一请求信息。 * (2)对于 server 的注册,不需要提供对应的反馈信息 * (3)当配置发生变化时,需要及时通知所有的 client 端。 * 这里就需要知道哪些是客户端?? * * @author houbinbin * @since 0.0.8 */ @ChannelHandler.Sharable public class RegisterCenterServerHandler extends SimpleChannelInboundHandler { private static final Log LOG = LogFactory.getLog(RegisterCenterServerHandler.class); /** * 注册中心服务 * @since 0.0.8 */ private final RpcRegister rpcRegister; public RegisterCenterServerHandler() { this.rpcRegister = this.buildSimpleRpcRegister(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { final String id = ctx.channel().id().asLongText(); LOG.info("[Register Server] channel {} connected " + id); } @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { RegisterMessage registerMessage = (RegisterMessage) msg; Object body = registerMessage.body(); int type = RegisterMessages.type(registerMessage); String seqId = registerMessage.seqId(); LOG.info("[Register Server] received message type: {}, seqId: {} ", type, seqId); final Channel channel = ctx.channel(); ServiceEntry serviceEntry = (ServiceEntry)body; switch (type) { case MessageTypeConst.SERVER_REGISTER: rpcRegister.register(serviceEntry); break; case MessageTypeConst.SERVER_UN_REGISTER: rpcRegister.unRegister(serviceEntry); break; case MessageTypeConst.CLIENT_SUBSCRIBE: rpcRegister.subscribe(serviceEntry, channel); break; case MessageTypeConst.CLIENT_UN_SUBSCRIBE: rpcRegister.unSubscribe(serviceEntry, channel); break; case MessageTypeConst.CLIENT_LOOK_UP: rpcRegister.lookUp(seqId, serviceEntry, channel); break; default: LOG.warn("[Register Center] not support type: {} and seqId: {}", type, seqId); } } /** * 构建简单注册实现类 * @return 注册实现 * @since 0.0.8 */ private RpcRegister buildSimpleRpcRegister() { final ServerRegisterService serverRegisterService = new DefaultServerRegisterService(); final ClientRegisterService clientRegisterService = new DefaultClientRegisterService(); return new SimpleRpcRegister(serverRegisterService, clientRegisterService); } }

这里主要区分几种常见的实现,分别调用 RpcRegister 中对应的方法。

RpcRegister

接口

常见的方法定义。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/* * Copyright (c) 2019. houbinbin Inc. * rpc All rights reserved. */ package com.github.houbb.rpc.register.spi; import com.github.houbb.rpc.register.domain.entry.ServiceEntry; import io.netty.channel.Channel; /** * <p> 注册中心接口 </p> * * <pre> Created: 2019/10/23 8:01 下午 </pre> * <pre> Project: rpc </pre> * * @author houbinbin * @since 0.0.8 */ public interface RpcRegister { /** * 注册当前服务信息 * 订阅了这个 serviceId 的所有客户端 * @param serviceEntry 注册当前服务信息 * @since 0.0.8 */ void register(final ServiceEntry serviceEntry); /** * 注销当前服务信息 * @param serviceEntry 注册当前服务信息 * @since 0.0.8 */ void unRegister(final ServiceEntry serviceEntry); /** * 监听服务信息 * (1)监听之后,如果有任何相关的机器信息发生变化,则进行推送。 * (2)内置的信息,需要传送 ip 信息到注册中心。 * * @param serviceEntry 客户端明细信息 * @param channel 频道信息 * @since 0.0.8 */ void subscribe(final ServiceEntry serviceEntry, final Channel channel); /** * 取消监听服务信息 * * (1)将改服务从客户端的监听列表中移除即可。 * * @param server 客户端明细信息 * @param channel 频道信息 * @since 0.0.8 */ void unSubscribe(final ServiceEntry server, final Channel channel); /** * 启动时查询 serviceId 对应的所有服务端信息 * @param seqId 请求标识 * @param clientEntry 客户端查询明细 * @param channel 频道信息 * @since 0.0.8 */ void lookUp(String seqId, ServiceEntry clientEntry, final Channel channel); }

SimpleRpcRegister

简单的实现。

主要分为两大块:

(1)客户端指定监听的服务

(2)服务端变更时,通知监听的客户端

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/* * Copyright (c) 2019. houbinbin Inc. * rpc All rights reserved. */ package com.github.houbb.rpc.register.simple; import com.github.houbb.log.integration.core.Log; import com.github.houbb.log.integration.core.LogFactory; import com.github.houbb.rpc.common.rpc.domain.RpcResponse; import com.github.houbb.rpc.common.rpc.domain.impl.DefaultRpcResponse; import com.github.houbb.rpc.register.domain.entry.ServiceEntry; import com.github.houbb.rpc.register.simple.client.ClientRegisterService; import com.github.houbb.rpc.register.simple.server.ServerRegisterService; import com.github.houbb.rpc.register.simple.server.impl.DefaultServerRegisterService; import com.github.houbb.rpc.register.spi.RpcRegister; import io.netty.channel.Channel; import java.util.List; /** * <p> 简单的 rpc 注册 </p> * * <pre> Created: 2019/10/23 8:59 下午 </pre> * <pre> Project: rpc </pre> * * (1)各种关系的关系服务类 * (2)各种关系之间的通讯类 * (3)domain 层 * * @author houbinbin * @since 0.0.8 */ public class SimpleRpcRegister implements RpcRegister { private static final Log LOG = LogFactory.getLog(DefaultServerRegisterService.class); /** * 服务端信息管理 * @since 0.0.8 */ private final ServerRegisterService serverRegisterService; /** * 客户端信息管理 * @since 0.0.8 */ private final ClientRegisterService clientRegisterService; public SimpleRpcRegister(ServerRegisterService serverRegisterService, ClientRegisterService clientRegisterService) { this.serverRegisterService = serverRegisterService; this.clientRegisterService = clientRegisterService; } @Override public void register(ServiceEntry serviceEntry) { List<ServiceEntry> serviceEntryList = serverRegisterService.register(serviceEntry); // 通知监听者 clientRegisterService.notify(serviceEntry.serviceId(), serviceEntryList); } @Override public void unRegister(ServiceEntry serviceEntry) { List<ServiceEntry> serviceEntryList = serverRegisterService.unRegister(serviceEntry); // 通知监听者 clientRegisterService.notify(serviceEntry.serviceId(), serviceEntryList); } @Override public void subscribe(ServiceEntry clientEntry, final Channel channel) { clientRegisterService.subscribe(clientEntry, channel); } @Override public void unSubscribe(ServiceEntry clientEntry, Channel channel) { clientRegisterService.unSubscribe(clientEntry, channel); } @Override public void lookUp(String seqId, ServiceEntry clientEntry, Channel channel) { final String serviceId = clientEntry.serviceId(); List<ServiceEntry> serviceEntryList = serverRegisterService.lookUp(serviceId); // 回写 // 为了复用原先的相应结果,此处直接使用 rpc response RpcResponse rpcResponse = DefaultRpcResponse.newInstance().seqId(seqId) .result(serviceEntryList); channel.writeAndFlush(rpcResponse); } }

ServerRegisterService

服务端的注册管理实现其实比较简单:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
/* * Copyright (c) 2019. houbinbin Inc. * rpc All rights reserved. */ package com.github.houbb.rpc.register.simple.server.impl; import com.github.houbb.heaven.util.common.ArgUtil; import com.github.houbb.heaven.util.guava.Guavas; import com.github.houbb.heaven.util.lang.ObjectUtil; import com.github.houbb.heaven.util.util.CollectionUtil; import com.github.houbb.log.integration.core.Log; import com.github.houbb.log.integration.core.LogFactory; import com.github.houbb.rpc.register.domain.entry.ServiceEntry; import com.github.houbb.rpc.register.simple.server.ServerRegisterService; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** * <p> 默认服务注册类 </p> * * <pre> Created: 2019/10/23 9:16 下午 </pre> * <pre> Project: rpc </pre> * * @author houbinbin * @since 0.0.8 */ public class DefaultServerRegisterService implements ServerRegisterService { private static final Log LOG = LogFactory.getLog(DefaultServerRegisterService.class); /** * 存放对应的 map 信息 * @since 0.0.8 */ private final Map<String, Set<ServiceEntry>> map; public DefaultServerRegisterService(){ map = new ConcurrentHashMap<>(); } @Override public List<ServiceEntry> register(ServiceEntry serviceEntry) { paramCheck(serviceEntry); final String serviceId = serviceEntry.serviceId(); Set<ServiceEntry> serviceEntrySet = map.get(serviceId); if(ObjectUtil.isNull(serviceEntrySet)) { serviceEntrySet = Guavas.newHashSet(); } LOG.info("[Register Server] add service: {}", serviceEntry); serviceEntrySet.add(serviceEntry); map.put(serviceId, serviceEntrySet); // 返回更新后的结果 return Guavas.newArrayList(serviceEntrySet); } @Override public List<ServiceEntry> unRegister(ServiceEntry serviceEntry) { paramCheck(serviceEntry); final String serviceId = serviceEntry.serviceId(); Set<ServiceEntry> serviceEntrySet = map.get(serviceId); if(CollectionUtil.isEmpty(serviceEntrySet)) { // 服务列表为空 LOG.info("[Register Server] remove service set is empty. entry: {}", serviceEntry); return Guavas.newArrayList(); } serviceEntrySet.remove(serviceEntry); LOG.info("[Register Server] remove service: {}", serviceEntry); map.put(serviceId, serviceEntrySet); // 返回更新后的结果 return Guavas.newArrayList(serviceEntrySet); } @Override public List<ServiceEntry> lookUp(String serviceId) { ArgUtil.notEmpty(serviceId, "serviceId"); LOG.info("[Register Server] start lookUp serviceId: {}", serviceId); Set<ServiceEntry> serviceEntrySet = map.get(serviceId); return Guavas.newArrayList(serviceEntrySet); } /** * 参数校验 * @param serviceEntry 服务明细 * @since 0.0.8 */ private void paramCheck(final ServiceEntry serviceEntry) { ArgUtil.notNull(serviceEntry, "serviceEntry"); final String serviceId = serviceEntry.serviceId(); ArgUtil.notEmpty(serviceId, "serviceId"); } }

DefaultClientRegisterService

客户端对应的注册实现类,也是类似的。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
/* * Copyright (c) 2019. houbinbin Inc. * rpc All rights reserved. */ package com.github.houbb.rpc.register.simple.client.impl; import com.github.houbb.heaven.util.common.ArgUtil; import com.github.houbb.heaven.util.guava.Guavas; import com.github.houbb.heaven.util.lang.ObjectUtil; import com.github.houbb.heaven.util.util.CollectionUtil; import com.github.houbb.log.integration.core.Log; import com.github.houbb.log.integration.core.LogFactory; import com.github.houbb.rpc.register.domain.entry.ServiceEntry; import com.github.houbb.rpc.register.domain.message.RegisterMessage; import com.github.houbb.rpc.register.domain.message.impl.RegisterMessages; import com.github.houbb.rpc.register.simple.client.ClientRegisterService; import com.github.houbb.rpc.register.simple.constant.MessageTypeConst; import io.netty.channel.Channel; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** * <p> 默认客户端注册服务实现类 </p> * * <pre> Created: 2019/10/23 9:42 下午 </pre> * <pre> Project: rpc </pre> * * @author houbinbin * @since 0.0.8 */ public class DefaultClientRegisterService implements ClientRegisterService { private static final Log LOG = LogFactory.getLog(DefaultClientRegisterService.class); /** * 服务信息-客户端列表 map * key: serviceId * value: 对应的客户端列表信息。 * * 客户端使用定期拉取的方式: * (1)传入 host 信息,返回对应的 service 列表。 * (2)根据 service 列表,变化时定期推送给客户端。 * * 只是在第一次采用拉取的方式,后面全部采用推送的方式。 * (1)只有变更的时候,才会进行推送,保证实时性。 * (2)客户端启动时拉取,作为保底措施。避免客户端不在线等情况。 * * @since 0.0.8 */ private final Map<String, Set<Channel>> serviceClientChannelMap; public DefaultClientRegisterService() { this.serviceClientChannelMap = new ConcurrentHashMap<>(); } @Override public void subscribe(ServiceEntry clientEntry, Channel clientChannel) { paramCheck(clientEntry); final String serviceId = clientEntry.serviceId(); Set<Channel> channelSet = serviceClientChannelMap.get(serviceId); if (ObjectUtil.isNull(channelSet)) { channelSet = Guavas.newHashSet(); } channelSet.add(clientChannel); serviceClientChannelMap.put(serviceId, channelSet); } @Override public void unSubscribe(ServiceEntry clientEntry, Channel clientChannel) { paramCheck(clientEntry); final String serviceId = clientEntry.serviceId(); Set<Channel> channelSet = serviceClientChannelMap.get(serviceId); if (CollectionUtil.isEmpty(channelSet)) { // 服务列表为空 LOG.info("[Register Client] remove host set is empty. entry: {}", clientEntry); return; } channelSet.remove(clientChannel); serviceClientChannelMap.put(serviceId, channelSet); } @Override public void notify(String serviceId, List<ServiceEntry> serviceEntryList) { ArgUtil.notEmpty(serviceId, "serviceId"); List<Channel> clientChannelList = clientChannelList(serviceId); if (CollectionUtil.isEmpty(clientChannelList)) { LOG.info("[Register] notify clients is empty for service: {}", serviceId); return; } // 循环通知 for(Channel channel : clientChannelList) { RegisterMessage registerMessage = RegisterMessages.of(MessageTypeConst.REGISTER_NOTIFY, serviceEntryList); channel.writeAndFlush(registerMessage); } } /** * 参数校验 * * @param serviceEntry 入参信息 * @since 0.0.8 */ private void paramCheck(final ServiceEntry serviceEntry) { ArgUtil.notNull(serviceEntry, "serverEntry"); ArgUtil.notEmpty(serviceEntry.serviceId(), "serverEntry.serviceId"); ArgUtil.notEmpty(serviceEntry.ip(), "serverEntry.ip"); } /** * 获取所有的客户端列表 * @param serviceId 服务标识 * @return 客户端列表标识 * @since 0.0.8 */ private List<Channel> clientChannelList(String serviceId) { ArgUtil.notEmpty(serviceId, "serviceId"); Set<Channel> clientSet = serviceClientChannelMap.get(serviceId); return Guavas.newArrayList(clientSet); } }

内容较多,客户端与服务端调整放在下一期。

小结

为了便于大家学习,以上源码已经开源:

https://github.com/houbb/rpc

我是老马,期待与你的下次重逢。