register 注册中心

上一节我们实现了 register 注册中心的基本实现,当然客户端和服务端也需要相关的实现调整。

服务端

ServiceRegistry

接口

调整如下:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.github.houbb.rpc.server.registry; /** * 服务注册类 * (1)每个应用唯一 * (2)每个服务的暴露协议应该保持一致 * 暂时不提供单个服务的特殊处理,后期可以考虑添加 * * @author binbin.hou * @since 0.0.6 */ public interface ServiceRegistry { // 不变 /** * 注册中心地址信息 * @param addresses 地址信息 * @return this * @since 0.0.8 */ ServiceRegistry registerCenter(final String addresses); }

实现

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
package com.github.houbb.rpc.server.registry.impl; import com.github.houbb.heaven.util.common.ArgUtil; import com.github.houbb.heaven.util.guava.Guavas; import com.github.houbb.log.integration.core.Log; import com.github.houbb.log.integration.core.LogFactory; import com.github.houbb.rpc.common.config.component.RpcAddress; import com.github.houbb.rpc.common.config.component.RpcAddressBuilder; import com.github.houbb.rpc.common.config.protocol.ProtocolConfig; import com.github.houbb.rpc.common.remote.netty.handler.ChannelHandlers; import com.github.houbb.rpc.common.remote.netty.impl.DefaultNettyClient; import com.github.houbb.rpc.common.remote.netty.impl.DefaultNettyServer; import com.github.houbb.rpc.common.util.NetUtil; import com.github.houbb.rpc.register.domain.entry.ServiceEntry; import com.github.houbb.rpc.register.domain.entry.impl.ServiceEntryBuilder; import com.github.houbb.rpc.register.domain.message.RegisterMessage; import com.github.houbb.rpc.register.domain.message.impl.RegisterMessages; import com.github.houbb.rpc.register.simple.constant.MessageTypeConst; import com.github.houbb.rpc.server.config.service.DefaultServiceConfig; import com.github.houbb.rpc.server.config.service.ServiceConfig; import com.github.houbb.rpc.server.handler.RpcServerHandler; import com.github.houbb.rpc.server.handler.RpcServerRegisterHandler; import com.github.houbb.rpc.server.registry.ServiceRegistry; import com.github.houbb.rpc.server.service.impl.DefaultServiceFactory; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import java.util.ArrayList; import java.util.List; /** * 默认服务端注册类 * @author binbin.hou * @since 0.0.6 */ public class DefaultServiceRegistry implements ServiceRegistry { /** * 日志信息 * @since 0.0.8 */ private static final Log LOG = LogFactory.getLog(DefaultServiceRegistry.class); /** * 单例信息 * @since 0.0.6 */ private static final DefaultServiceRegistry INSTANCE = new DefaultServiceRegistry(); /** * rpc 服务端端口号 * @since 0.0.6 */ private int rpcPort; /** * 协议配置 * (1)默认只实现 tcp * (2)后期可以拓展实现 web-service/http/https 等等。 * @since 0.0.6 */ private ProtocolConfig protocolConfig; /** * 服务配置列表 * @since 0.0.6 */ private List<ServiceConfig> serviceConfigList; /** * 注册中心地址列表 * @since 0.0.8 */ private List<RpcAddress> registerCenterList; private DefaultServiceRegistry(){ // 初始化默认参数 this.serviceConfigList = new ArrayList<>(); this.rpcPort = 9527; this.registerCenterList = Guavas.newArrayList(); } public static DefaultServiceRegistry getInstance() { return INSTANCE; } @Override public ServiceRegistry port(int port) { ArgUtil.positive(port, "port"); this.rpcPort = port; return this; } /** * 注册服务实现 * (1)主要用于后期服务调用 * (2)如何根据 id 获取实现?非常简单,id 是唯一的。 * 有就是有,没有就抛出异常,直接返回。 * (3)如果根据 {@link com.github.houbb.rpc.common.rpc.domain.RpcRequest} 获取对应的方法。 * * 3.1 根据 serviceId 获取唯一的实现 * 3.2 根据 {@link Class#getMethod(String, Class[])} 方法名称+参数类型唯一获取方法 * 3.3 根据 {@link java.lang.reflect.Method#invoke(Object, Object...)} 执行方法 * * @param serviceId 服务标识 * @param serviceImpl 服务实现 * @return this * @since 0.0.6 */ @Override @SuppressWarnings("unchecked") public synchronized DefaultServiceRegistry register(final String serviceId, final Object serviceImpl) { ArgUtil.notEmpty(serviceId, "serviceId"); ArgUtil.notNull(serviceImpl, "serviceImpl"); // 构建对应的其他信息 ServiceConfig serviceConfig = new DefaultServiceConfig(); //TODO: 是否暴露服务,允许用户指定 serviceConfig.id(serviceId).reference(serviceImpl).register(true); serviceConfigList.add(serviceConfig); return this; } @Override public ServiceRegistry expose() { // 注册所有服务信息 DefaultServiceFactory.getInstance() .registerServicesLocal(serviceConfigList); LOG.info("server register local finish."); // 启动 netty server 信息 final ChannelHandler channelHandler = ChannelHandlers .objectCodecHandler(new RpcServerHandler()); DefaultNettyServer.newInstance(rpcPort, channelHandler).asyncRun(); LOG.info("server service start finish."); // 注册到配置中心 this.registerServiceCenter(); LOG.info("server service register finish."); return this; } @Override public ServiceRegistry registerCenter(String addresses) { this.registerCenterList = RpcAddressBuilder.of(addresses); return this; } /** * 注冊服務到注册中心 * (1)循环服务列表注册到配置中心列表 * (2)如果 register 为 false,则不进行注册 * (3)后期可以添加延迟暴露,但是感觉意义不大。 * @since 0.0.8 */ private void registerServiceCenter() { // 注册到配置中心 // 初期简单点,直接循环调用即可 // 循环服务信息 for(ServiceConfig config : this.serviceConfigList) { boolean register = config.register(); final String serviceId = config.id(); if(!register) { LOG.info("[Rpc Server] serviceId: {} register config is false.", serviceId); continue; } for(RpcAddress rpcAddress : registerCenterList) { ChannelHandler registerHandler = ChannelHandlers.objectCodecHandler(new RpcServerRegisterHandler()); LOG.info("[Rpc Server] start register to {}:{}", rpcAddress.address(), rpcAddress.port()); ChannelFuture channelFuture = DefaultNettyClient.newInstance(rpcAddress.address(), rpcAddress.port(),registerHandler).call(); // 直接写入信息 RegisterMessage registerMessage = buildRegisterMessage(config); LOG.info("[Rpc Server] register to service center: {}", registerMessage); channelFuture.channel().writeAndFlush(registerMessage); } } } /** * 构建注册信息配置 * @param config 配置信息 * @return 注册信息 * @since 0.0.6 */ private RegisterMessage buildRegisterMessage(final ServiceConfig config) { final String hostIp = NetUtil.getLocalHost(); ServiceEntry serviceEntry = ServiceEntryBuilder.of(config.id(), hostIp, rpcPort); return RegisterMessages.of(MessageTypeConst.SERVER_REGISTER, serviceEntry); } }

服务端启动的时候,可以指定是否 register 到注册中心。

我们会循环整个注册中心列表,然后把 register=true 的服务,注册到每一个注册中心中去。

注册内容也比较简单,就是将标识,对应的服务端地址信息通知到注册中心。

RpcServerRegisterHandler

这个是服务端关于注册中心的 handler,实现暂时比较简单:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.github.houbb.rpc.server.handler; import com.github.houbb.log.integration.core.Log; import com.github.houbb.log.integration.core.LogFactory; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * 注册中心 * (1)用于和注册中心建立长连接。 * (2)初期设计中服务端不需要做什么事情。 * * 后期可以调整为接收到影响为准,保证请求成功。 * @author binbin.hou * @since 0.0.8 */ public class RpcServerRegisterHandler extends SimpleChannelInboundHandler { private static final Log LOG = LogFactory.getLog(RpcServerRegisterHandler.class); @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { LOG.info("[Rpc Server] received message: {}", msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { LOG.error("[Rpc Server] meet ex", cause); ctx.close(); } }

客户端

DefaultReferenceConfig

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
package com.github.houbb.rpc.client.config.reference.impl; import com.github.houbb.heaven.support.handler.IHandler; import com.github.houbb.heaven.util.guava.Guavas; import com.github.houbb.heaven.util.util.CollectionUtil; import com.github.houbb.log.integration.core.Log; import com.github.houbb.log.integration.core.LogFactory; import com.github.houbb.rpc.client.config.reference.ReferenceConfig; import com.github.houbb.rpc.client.handler.RpcClientHandler; import com.github.houbb.rpc.client.handler.RpcClientRegisterHandler; import com.github.houbb.rpc.client.invoke.InvokeService; import com.github.houbb.rpc.client.invoke.impl.DefaultInvokeService; import com.github.houbb.rpc.client.proxy.ReferenceProxy; import com.github.houbb.rpc.client.proxy.context.ProxyContext; import com.github.houbb.rpc.client.proxy.context.impl.DefaultProxyContext; import com.github.houbb.rpc.common.config.component.RpcAddress; import com.github.houbb.rpc.common.config.component.RpcAddressBuilder; import com.github.houbb.rpc.common.exception.RpcRuntimeException; import com.github.houbb.rpc.common.remote.netty.handler.ChannelHandlers; import com.github.houbb.rpc.common.remote.netty.impl.DefaultNettyClient; import com.github.houbb.rpc.common.rpc.domain.RpcResponse; import com.github.houbb.rpc.common.rpc.domain.impl.RpcResponses; import com.github.houbb.rpc.register.domain.entry.ServiceEntry; import com.github.houbb.rpc.register.domain.entry.impl.ServiceEntryBuilder; import com.github.houbb.rpc.register.domain.message.RegisterMessage; import com.github.houbb.rpc.register.domain.message.impl.RegisterMessages; import com.github.houbb.rpc.register.simple.constant.MessageTypeConst; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import java.util.List; /** * 引用配置类 * * 后期配置: * (1)timeout 调用超时时间 * (2)version 服务版本处理 * (3)callType 调用方式 oneWay/sync/async * (4)check 是否必须要求服务启动。 * * spi: * (1)codec 序列化方式 * (2)netty 网络通讯架构 * (3)load-balance 负载均衡 * (4)失败策略 fail-over/fail-fast * * filter: * (1)路由 * (2)耗时统计 monitor 服务治理 * * 优化思考: * (1)对于唯一的 serviceId,其实其 interface 是固定的,是否可以省去? * @author binbin.hou * @since 0.0.6 * @param <T> 接口泛型 */ public class DefaultReferenceConfig<T> implements ReferenceConfig<T> { private static final Log LOG = LogFactory.getLog(DefaultReferenceConfig.class); /** * 服务唯一标识 * @since 0.0.6 */ private String serviceId; /** * 服务接口 * @since 0.0.6 */ private Class<T> serviceInterface; /** * 服务地址信息 * (1)如果不为空,则直接根据地址获取 * (2)如果为空,则采用自动发现的方式 * * 如果为 subscribe 可以自动发现,然后填充这个字段信息。 * @since 0.0.6 */ private List<RpcAddress> rpcAddresses; /** * 用于写入信息 * (1)client 连接 server 端的 channel future * (2)后期进行 Load-balance 路由等操作。可以放在这里执行。 * @since 0.0.6 */ private List<ChannelFuture> channelFutures; /** * 调用服务管理类 * @since 0.0.6 */ private InvokeService invokeService; /** * 调用超时时间 * @since 0.0.7 */ private long timeout; /** * 是否进行订阅模式 * @since 0.0.8 */ private boolean subscribe; /** * 注册中心列表 * @since 0.0.8 */ private List<RpcAddress> registerCenterList; /** * 注册中心超时时间 * @since 0.0.8 */ private long registerCenterTimeOut; public DefaultReferenceConfig() { // 初始化信息 this.rpcAddresses = Guavas.newArrayList(); this.channelFutures = Guavas.newArrayList(); this.invokeService = new DefaultInvokeService(); // 默认为 60s 超时 this.timeout = 60*1000; this.registerCenterList = Guavas.newArrayList(); this.registerCenterTimeOut = 60*1000; } // 保持不变 /** * 获取对应的引用实现 * (1)处理所有的反射代理信息-方法可以抽离,启动各自独立即可。 * (2)启动对应的长连接 * @return 引用代理类 * @since 0.0.6 */ @Override public T reference() { // 1. 启动 client 端到 server 端的连接信息 // 1.1 为了提升性能,可以将所有的 client=>server 的连接都调整为一个 thread。 // 1.2 初期为了简单,直接使用同步循环的方式。 // 循环连接 List<RpcAddress> rpcAddressList = getRpcAddresses(); for(RpcAddress rpcAddress : rpcAddressList) { final ChannelHandler channelHandler = new RpcClientHandler(invokeService); final ChannelHandler actualChannlHandler = ChannelHandlers.objectCodecHandler(channelHandler); ChannelFuture channelFuture = DefaultNettyClient.newInstance(rpcAddress.address(), rpcAddress.port(), actualChannlHandler).call(); channelFutures.add(channelFuture); } // 2. 接口动态代理 ProxyContext<T> proxyContext = buildReferenceProxyContext(); return ReferenceProxy.newProxyInstance(proxyContext); } @Override public DefaultReferenceConfig<T> timeout(long timeout) { this.timeout = timeout; return this; } @Override public ReferenceConfig<T> subscribe(boolean subscribe) { this.subscribe = subscribe; return this; } @Override public ReferenceConfig<T> registerCenter(String addresses) { this.registerCenterList = RpcAddressBuilder.of(addresses); return this; } /** * 获取 rpc 地址信息列表 * (1)默认直接通过指定的地址获取 * (2)如果指定列表为空,且 * @return rpc 地址信息列表 * @since 0.0.8 */ @SuppressWarnings("unchecked") private List<RpcAddress> getRpcAddresses() { //0. 快速返回 if(CollectionUtil.isNotEmpty(rpcAddresses)) { return rpcAddresses; } //1. 信息检查 registerCenterParamCheck(); //2. 查询服务信息 List<ServiceEntry> serviceEntries = lookUpServiceEntryList(); LOG.info("[Client] register center serviceEntries: {}", serviceEntries); //3. 结果转换 return CollectionUtil.toList(serviceEntries, new IHandler<ServiceEntry, RpcAddress>() { @Override public RpcAddress handle(ServiceEntry serviceEntry) { return new RpcAddress(serviceEntry.ip(), serviceEntry.port(), serviceEntry.weight()); } }); } /** * 注册中心参数检查 * (1)如果可用列表为空,且没有指定自动发现,这个时候服务已经不可用了。 * @since 0.0.8 */ private void registerCenterParamCheck() { if(!subscribe) { LOG.error("[Rpc Client] no available services found for serviceId:{}", serviceId); throw new RpcRuntimeException(); } if(CollectionUtil.isEmpty(registerCenterList)) { LOG.error("[Rpc Client] register center address can't be null!:{}", serviceId); throw new RpcRuntimeException(); } } /** * 查询服务信息列表 * @return 服务明细列表 * @since 0.0.8 */ @SuppressWarnings("unchecked") private List<ServiceEntry> lookUpServiceEntryList() { //1. 连接到注册中心 List<ChannelFuture> channelFutureList = connectRegisterCenter(); //2. 选择一个 // 直接取第一个即可,后续可以使用 load-balance 策略。 ChannelFuture channelFuture = channelFutureList.get(0); //3. 发送查询请求 ServiceEntry serviceEntry = ServiceEntryBuilder.of(serviceId); RegisterMessage registerMessage = RegisterMessages.of(MessageTypeConst.CLIENT_LOOK_UP, serviceEntry); final String seqId = registerMessage.seqId(); invokeService.addRequest(seqId, registerCenterTimeOut); channelFuture.channel().writeAndFlush(registerMessage); //4. 等待查询结果 RpcResponse rpcResponse = invokeService.getResponse(seqId); return (List<ServiceEntry>) RpcResponses.getResult(rpcResponse); } /** * 连接到注册中心 * @return 对应的结果列表 * @since 0.0.8 */ private List<ChannelFuture> connectRegisterCenter() { List<ChannelFuture> futureList = Guavas.newArrayList(registerCenterList.size()); ChannelHandler channelHandler = ChannelHandlers.objectCodecHandler(new RpcClientRegisterHandler(invokeService)); for(RpcAddress rpcAddress : registerCenterList) { final String ip = rpcAddress.address(); final int port = rpcAddress.port(); LOG.info("[Rpc Client] connect to register {}:{} ", ip, port); ChannelFuture channelFuture = DefaultNettyClient .newInstance(ip, port, channelHandler) .call(); futureList.add(channelFuture); } return futureList; } /** * 构建调用上下文 * @return 引用代理上下文 * @since 0.0.6 */ private ProxyContext<T> buildReferenceProxyContext() { DefaultProxyContext<T> proxyContext = new DefaultProxyContext<>(); proxyContext.serviceId(this.serviceId); proxyContext.serviceInterface(this.serviceInterface); proxyContext.channelFutures(this.channelFutures); proxyContext.invokeService(this.invokeService); proxyContext.timeout(this.timeout); return proxyContext; } }

这里客户端启动的时候,会根据是否指定 subscribe 来判断是否需要去注册中心获取服务端地址信息。

测试

注册中心

首先启动注册中心:

  [java]
1
RegisterBs.newInstance().start();

日志如下:

  [plaintext]
1
2
3
4
5
6
7
8
9
10
[DEBUG] [2021-10-05 16:56:33.022] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter. [INFO] [2021-10-05 16:56:33.370] [pool-1-thread-1] [c.g.h.r.c.r.n.i.DefaultNettyServer.run] - [Netty Server] start with port: 8527 and channelHandler: 十月 05, 2021 4:56:34 下午 io.netty.handler.logging.LoggingHandler channelRegistered 信息: [id: 0x63a10deb] REGISTERED 十月 05, 2021 4:56:34 下午 io.netty.handler.logging.LoggingHandler bind 信息: [id: 0x63a10deb] BIND: 0.0.0.0/0.0.0.0:8527 十月 05, 2021 4:56:34 下午 io.netty.handler.logging.LoggingHandler channelActive 信息: [id: 0x63a10deb, L:/0:0:0:0:0:0:0:0:8527] ACTIVE [INFO] [2021-10-05 16:56:34.952] [pool-1-thread-1] [c.g.h.r.c.r.n.i.DefaultNettyServer.run] - [Netty Server] 启动完成,监听【8527】端口

服务端

指定了注册中心的地址,默认为本地 8527 端口。

  [java]
1
2
3
4
5
6
7
public static void main(String[] args) { // 启动服务 DefaultServiceRegistry.getInstance() .register(ServiceIdConst.CALC, new CalculatorServiceImpl()) .registerCenter(ServiceIdConst.REGISTER_CENTER) .expose(); }

启动日志:

  [plaintext]
1
2
3
4
5
6
7
8
9
10
11
[DEBUG] [2021-10-05 16:59:50.633] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter. [INFO] [2021-10-05 16:59:50.703] [main] [c.g.h.r.s.r.i.DefaultServiceRegistry.expose] - server register local finish. [INFO] [2021-10-05 16:59:50.865] [main] [c.g.h.r.s.r.i.DefaultServiceRegistry.expose] - server service start finish. [INFO] [2021-10-05 16:59:50.873] [pool-1-thread-1] [c.g.h.r.c.r.n.i.DefaultNettyServer.run] - [Netty Server] start with port: 9527 and channelHandler: [INFO] [2021-10-05 16:59:50.873] [main] [c.g.h.r.s.r.i.DefaultServiceRegistry.registerServiceCenter] - [Rpc Server] start register to 127.0.0.1:8527 [INFO] [2021-10-05 16:59:50.886] [main] [c.g.h.r.c.r.n.i.DefaultNettyClient.call] - [Netty Client] 开始启动客户端 ... [INFO] [2021-10-05 16:59:51.981] [pool-1-thread-1] [c.g.h.r.c.r.n.i.DefaultNettyServer.run] - [Netty Server] 启动完成,监听【9527】端口 [INFO] [2021-10-05 16:59:51.983] [main] [c.g.h.r.c.r.n.i.DefaultNettyClient.call] - [Netty Client] 启动客户端完成,监听地址 127.0.0.1:8527 [INFO] [2021-10-05 16:59:52.003] [main] [c.g.h.r.s.r.i.DefaultServiceRegistry.registerServiceCenter] - [Rpc Server] register to service center: DefaultRegisterMessage{header=DefaultRegisterMessageHeader{type=1}, body=DefaultServiceEntry{serviceId='calc', description='null', ip='192.168.124.16', port=9527, weight=0}} [INFO] [2021-10-05 16:59:52.014] [main] [c.g.h.r.s.r.i.DefaultServiceRegistry.expose] - server service register finish.

server 对于 register center 而言,也是客户端。

启动的时候,会向注册中心进行一次注册。

此时,注册中心的日志如下:

  [plaintext]
1
2
3
4
[INFO] [2021-10-05 16:59:52.030] [nioEventLoopGroup-2-1] [c.g.h.r.r.s.h.RegisterCenterServerHandler.channelActive] - [Register Server] channel {} connected 00e04cfffe360988-00000954-00000001-108f8fc327e63f2c-d152f0a9 [INFO] [2021-10-05 16:59:52.121] [nioEventLoopGroup-2-1] [c.g.h.r.r.s.h.RegisterCenterServerHandler.channelRead0] - [Register Server] received message type: 1, seqId: 5429a50862f542e4bf451e8624cc7f12 [INFO] [2021-10-05 16:59:52.123] [nioEventLoopGroup-2-1] [c.g.h.r.r.s.s.i.DefaultServerRegisterService.register] - [Register Server] add service: DefaultServiceEntry{serviceId='calc', description='null', ip='192.168.124.16', port=9527, weight=0} [INFO] [2021-10-05 16:59:52.124] [nioEventLoopGroup-2-1] [c.g.h.r.r.s.c.i.DefaultClientRegisterService.notify] - [Register] notify clients is empty for service: calc

客户端

测试代码

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) { // 服务配置信息 ReferenceConfig<CalculatorService> config = new DefaultReferenceConfig<CalculatorService>(); config.serviceId(ServiceIdConst.CALC); config.serviceInterface(CalculatorService.class); // 自动发现服务 config.subscribe(true); config.registerCenter(ServiceIdConst.REGISTER_CENTER); CalculatorService calculatorService = config.reference(); CalculateRequest request = new CalculateRequest(); request.setOne(10); request.setTwo(20); CalculateResponse response = calculatorService.sum(request); System.out.println("响应结果:" + response); }

日志

  [plaintext]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
[DEBUG] [2021-10-05 17:05:52.360] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter. [INFO] [2021-10-05 17:05:52.602] [main] [c.g.h.r.c.c.r.i.DefaultReferenceConfig.connectRegisterCenter] - [Rpc Client] connect to register 127.0.0.1:8527 [INFO] [2021-10-05 17:05:52.610] [main] [c.g.h.r.c.r.n.i.DefaultNettyClient.call] - [Netty Client] 开始启动客户端 [INFO] [2021-10-05 17:05:53.665] [main] [c.g.h.r.c.r.n.i.DefaultNettyClient.call] - [Netty Client] 启动客户端完成,监听地址 127.0.0.1:8527 [INFO] [2021-10-05 17:05:53.676] [main] [c.g.h.r.c.i.i.DefaultInvokeService.addRequest] - [Client] start add request for seqId: 4c8455ae4f574478bf78772024608bc2, timeoutMills: 60000 [INFO] [2021-10-05 17:05:53.688] [main] [c.g.h.r.c.i.i.DefaultInvokeService.getResponse] - [Client] seq 4c8455ae4f574478bf78772024608bc2 对应结果为空,进入等待 [INFO] [2021-10-05 17:05:53.752] [nioEventLoopGroup-2-1] [c.g.h.r.c.h.RpcClientRegisterHandler.channelRead0] - [Client Register] response is :DefaultRpcResponse{seqId='4c8455ae4f574478bf78772024608bc2', error=null, result=[DefaultServiceEntry{serviceId='calc', description='null', ip='192.168.124.16', port=9527, weight=0}]} [INFO] [2021-10-05 17:05:53.753] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] 获取结果信息,seqId: 4c8455ae4f574478bf78772024608bc2, rpcResponse: DefaultRpcResponse{seqId='4c8455ae4f574478bf78772024608bc2', error=null, result=[DefaultServiceEntry{serviceId='calc', description='null', ip='192.168.124.16', port=9527, weight=0}]} [INFO] [2021-10-05 17:05:53.753] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] seqId:4c8455ae4f574478bf78772024608bc2 信息已经放入,通知所有等待方 [INFO] [2021-10-05 17:05:53.755] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] seqId:4c8455ae4f574478bf78772024608bc2 remove from request map [INFO] [2021-10-05 17:05:53.755] [main] [c.g.h.r.c.i.i.DefaultInvokeService.getResponse] - [Client] seq 4c8455ae4f574478bf78772024608bc2 对应结果已经获取: DefaultRpcResponse{seqId='4c8455ae4f574478bf78772024608bc2', error=null, result=[DefaultServiceEntry{serviceId='calc', description='null', ip='192.168.124.16', port=9527, weight=0}]} [INFO] [2021-10-05 17:05:53.759] [main] [c.g.h.r.c.c.r.i.DefaultReferenceConfig.getRpcAddresses] - [Client] register center serviceEntries: [DefaultServiceEntry{serviceId='calc', description='null', ip='192.168.124.16', port=9527, weight=0}] [INFO] [2021-10-05 17:05:53.761] [main] [c.g.h.r.c.r.n.i.DefaultNettyClient.call] - [Netty Client] 开始启动客户端 [INFO] [2021-10-05 17:05:53.773] [main] [c.g.h.r.c.r.n.i.DefaultNettyClient.call] - [Netty Client] 启动客户端完成,监听地址 192.168.124.16:9527 [INFO] [2021-10-05 17:05:53.785] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start call remote with request: DefaultRpcRequest{seqId='81a4687bce6d41bebf79fd3b292d934c', createTime=1633424753779, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]} [INFO] [2021-10-05 17:05:53.785] [main] [c.g.h.r.c.i.i.DefaultInvokeService.addRequest] - [Client] start add request for seqId: 81a4687bce6d41bebf79fd3b292d934c, timeoutMills: 60000 [INFO] [2021-10-05 17:05:53.813] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start call channel id: 00e04cfffe360988-000031fc-00000001-36c33ae099fbc46b-7fefae5a [INFO] [2021-10-05 17:05:53.825] [main] [c.g.h.r.c.i.i.DefaultInvokeService.getResponse] - [Client] seq 81a4687bce6d41bebf79fd3b292d934c 对应结果为空,进入等待 [INFO] [2021-10-05 17:05:53.851] [nioEventLoopGroup-4-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] 获取结果信息,seqId: 81a4687bce6d41bebf79fd3b292d934c, rpcResponse: DefaultRpcResponse{seqId='81a4687bce6d41bebf79fd3b292d934c', error=null, result=CalculateResponse{success=true, sum=30}} [INFO] [2021-10-05 17:05:53.851] [nioEventLoopGroup-4-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] seqId:81a4687bce6d41bebf79fd3b292d934c 信息已经放入,通知所有等待方 [INFO] [2021-10-05 17:05:53.852] [nioEventLoopGroup-4-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] seqId:81a4687bce6d41bebf79fd3b292d934c remove from request map [INFO] [2021-10-05 17:05:53.852] [nioEventLoopGroup-4-1] [c.g.h.r.c.h.RpcClientHandler.channelRead0] - [Client] response is :DefaultRpcResponse{seqId='81a4687bce6d41bebf79fd3b292d934c', error=null, result=CalculateResponse{success=true, sum=30}} [INFO] [2021-10-05 17:05:53.855] [main] [c.g.h.r.c.i.i.DefaultInvokeService.getResponse] - [Client] seq 81a4687bce6d41bebf79fd3b292d934c 对应结果已经获取: DefaultRpcResponse{seqId='81a4687bce6d41bebf79fd3b292d934c', error=null, result=CalculateResponse{success=true, sum=30}} 响应结果:CalculateResponse{success=true, sum=30}

客户端主要分为两个部分:

(1)去注册中心查询对应的服务端地址

(2)根据服务端地址,创建对应的客户端请求代理

不足之处

(1)服务端关闭之后,没有调用 unRegister 方法。

(2)客户端关闭之后,没有调用 unSubscribe 方法。

(3)缺少心跳机制,服务器挂掉无法及时感知。

小结

为了便于大家学习,以上源码已经开源:

https://github.com/houbb/rpc

我是老马,期待与你的下次重逢。