register 注册中心
上一节我们完成了 register 注册中心的基本实现,本节对客户端和服务端做相应的调整,使二者接入注册中心。
服务端
ServiceRegistry
接口
调整如下:
package com.github.houbb.rpc.server.registry;
/**
* Server-side service registry.
* (1) There is exactly one per application.
* (2) Every service should be exposed through the same protocol.
* Special handling for individual services is not provided for now; it may be added later.
*
* @author binbin.hou
* @since 0.0.6
*/
public interface ServiceRegistry {
// Members that are unchanged from the previous version are omitted here.
/**
* Sets the register center address information.
* @param addresses address information
* @return this
* @since 0.0.8
*/
ServiceRegistry registerCenter(final String addresses);
}
实现
package com.github.houbb.rpc.server.registry.impl;
import com.github.houbb.heaven.util.common.ArgUtil;
import com.github.houbb.heaven.util.guava.Guavas;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.common.config.component.RpcAddress;
import com.github.houbb.rpc.common.config.component.RpcAddressBuilder;
import com.github.houbb.rpc.common.config.protocol.ProtocolConfig;
import com.github.houbb.rpc.common.remote.netty.handler.ChannelHandlers;
import com.github.houbb.rpc.common.remote.netty.impl.DefaultNettyClient;
import com.github.houbb.rpc.common.remote.netty.impl.DefaultNettyServer;
import com.github.houbb.rpc.common.util.NetUtil;
import com.github.houbb.rpc.register.domain.entry.ServiceEntry;
import com.github.houbb.rpc.register.domain.entry.impl.ServiceEntryBuilder;
import com.github.houbb.rpc.register.domain.message.RegisterMessage;
import com.github.houbb.rpc.register.domain.message.impl.RegisterMessages;
import com.github.houbb.rpc.register.simple.constant.MessageTypeConst;
import com.github.houbb.rpc.server.config.service.DefaultServiceConfig;
import com.github.houbb.rpc.server.config.service.ServiceConfig;
import com.github.houbb.rpc.server.handler.RpcServerHandler;
import com.github.houbb.rpc.server.handler.RpcServerRegisterHandler;
import com.github.houbb.rpc.server.registry.ServiceRegistry;
import com.github.houbb.rpc.server.service.impl.DefaultServiceFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import java.util.ArrayList;
import java.util.List;
/**
 * Default server-side implementation of {@link ServiceRegistry}.
 *
 * <p>It collects service configurations, registers them locally, starts the
 * netty server and then sends a register message for every registrable
 * service to each configured register center.</p>
 *
 * @author binbin.hou
 * @since 0.0.6
 */
public class DefaultServiceRegistry implements ServiceRegistry {

    /**
     * Logger.
     * @since 0.0.8
     */
    private static final Log LOG = LogFactory.getLog(DefaultServiceRegistry.class);

    /**
     * Singleton instance.
     * @since 0.0.6
     */
    private static final DefaultServiceRegistry INSTANCE = new DefaultServiceRegistry();

    /**
     * Port the rpc server listens on.
     * @since 0.0.6
     */
    private int rpcPort;

    /**
     * Protocol configuration.
     * (1) Only tcp is implemented by default.
     * (2) web-service/http/https etc. may be added later.
     * @since 0.0.6
     */
    private ProtocolConfig protocolConfig;

    /**
     * Configurations of all services added through {@link #register(String, Object)}.
     * @since 0.0.6
     */
    private List<ServiceConfig> serviceConfigList;

    /**
     * Addresses of the register centers.
     * @since 0.0.8
     */
    private List<RpcAddress> registerCenterList;

    /**
     * Creates the registry with its defaults: port 9527, no services and
     * no register centers.
     */
    private DefaultServiceRegistry() {
        this.rpcPort = 9527;
        this.serviceConfigList = new ArrayList<>();
        this.registerCenterList = Guavas.newArrayList();
    }

    /**
     * Returns the shared registry instance.
     * @return the singleton
     */
    public static DefaultServiceRegistry getInstance() {
        return INSTANCE;
    }

    /**
     * Sets the rpc server port.
     * @param port port number, checked with {@code ArgUtil.positive}
     * @return this
     */
    @Override
    public ServiceRegistry port(int port) {
        ArgUtil.positive(port, "port");

        this.rpcPort = port;
        return this;
    }

    /**
     * Registers a service implementation.
     * (1) Mainly used for later service invocation.
     * (2) Looking an implementation up by id is simple because the id is unique:
     * it either exists or an exception is thrown.
     * (3) How a method is resolved from a
     * {@link com.github.houbb.rpc.common.rpc.domain.RpcRequest}:
     *
     * 3.1 get the unique implementation by serviceId
     * 3.2 get the method by name + parameter types via {@link Class#getMethod(String, Class[])}
     * 3.3 run it via {@link java.lang.reflect.Method#invoke(Object, Object...)}
     *
     * @param serviceId service identifier
     * @param serviceImpl service implementation
     * @return this
     * @since 0.0.6
     */
    @Override
    @SuppressWarnings("unchecked")
    public synchronized DefaultServiceRegistry register(final String serviceId, final Object serviceImpl) {
        ArgUtil.notEmpty(serviceId, "serviceId");
        ArgUtil.notNull(serviceImpl, "serviceImpl");

        final ServiceConfig config = new DefaultServiceConfig();
        //TODO: let the user decide whether the service is exposed to the register center
        config.id(serviceId).reference(serviceImpl).register(true);

        this.serviceConfigList.add(config);
        return this;
    }

    /**
     * Exposes the registered services in three steps: local registration,
     * netty server start, register center registration.
     * @return this
     */
    @Override
    public ServiceRegistry expose() {
        // Step 1: make every service available locally.
        DefaultServiceFactory.getInstance()
                .registerServicesLocal(serviceConfigList);
        LOG.info("server register local finish.");

        // Step 2: start the netty server on the configured port.
        final RpcServerHandler rpcServerHandler = new RpcServerHandler();
        final ChannelHandler serverHandler = ChannelHandlers.objectCodecHandler(rpcServerHandler);
        DefaultNettyServer.newInstance(rpcPort, serverHandler).asyncRun();
        LOG.info("server service start finish.");

        // Step 3: announce the services to the register centers.
        this.registerServiceCenter();
        LOG.info("server service register finish.");

        return this;
    }

    /**
     * Replaces the register center list with the addresses built from the given string.
     * @param addresses address information, converted by {@code RpcAddressBuilder.of}
     * @return this
     */
    @Override
    public ServiceRegistry registerCenter(String addresses) {
        this.registerCenterList = RpcAddressBuilder.of(addresses);
        return this;
    }

    /**
     * Registers the services to the register centers.
     * (1) Every service is sent to every register center in the list.
     * (2) Services whose register flag is false are skipped.
     * (3) Delayed exposure could be added later, but seems of little value.
     * @since 0.0.8
     */
    private void registerServiceCenter() {
        // Kept simple for now: plain nested iteration, one connection per service and center.
        for (ServiceConfig serviceConfig : this.serviceConfigList) {
            final boolean registrable = serviceConfig.register();
            final String serviceId = serviceConfig.id();

            if (registrable) {
                for (RpcAddress center : registerCenterList) {
                    registerToCenter(serviceConfig, center);
                }
            } else {
                LOG.info("[Rpc Server] serviceId: {} register config is false.",
                        serviceId);
            }
        }
    }

    /**
     * Connects to one register center and writes the register message of one service.
     * @param serviceConfig service to announce
     * @param center register center address
     */
    private void registerToCenter(final ServiceConfig serviceConfig, final RpcAddress center) {
        final ChannelHandler centerHandler = ChannelHandlers.objectCodecHandler(new RpcServerRegisterHandler());
        LOG.info("[Rpc Server] start register to {}:{}", center.address(),
                center.port());
        final ChannelFuture connection = DefaultNettyClient
                .newInstance(center.address(), center.port(), centerHandler)
                .call();

        // The message is written directly; no response is awaited.
        final RegisterMessage message = buildRegisterMessage(serviceConfig);
        LOG.info("[Rpc Server] register to service center: {}", message);
        connection.channel().writeAndFlush(message);
    }

    /**
     * Builds the register message of a service.
     * @param config service configuration
     * @return message of type {@code SERVER_REGISTER} carrying the service id,
     *         the local host and the rpc port
     * @since 0.0.6
     */
    private RegisterMessage buildRegisterMessage(final ServiceConfig config) {
        final String localHost = NetUtil.getLocalHost();
        final ServiceEntry entry = ServiceEntryBuilder.of(config.id(), localHost, rpcPort);

        return RegisterMessages.of(MessageTypeConst.SERVER_REGISTER, entry);
    }

}
服务端启动的时候,可以指定是否 register 到注册中心。
我们会循环整个注册中心列表,然后把 register=true 的服务,注册到每一个注册中心中去。
注册内容也比较简单,就是将服务标识以及对应的服务端地址信息(ip 和端口)通知到注册中心。
RpcServerRegisterHandler
这个是服务端关于注册中心的 handler,实现暂时比较简单:
package com.github.houbb.rpc.server.handler;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * Server-side handler for the connection to the register center.
 * (1) Used for the long-lived connection between the server and the register center.
 * (2) In this first design the server has nothing to do with inbound messages,
 * so they are only logged.
 *
 * Later this may be changed to wait for the register center's response, so that
 * a successful registration can be confirmed.
 *
 * <p>The handler is parameterised with {@code Object} instead of extending the raw
 * type: it still accepts every inbound message, but no longer relies on a raw
 * generic superclass.</p>
 *
 * @author binbin.hou
 * @since 0.0.8
 */
public class RpcServerRegisterHandler extends SimpleChannelInboundHandler<Object> {

    private static final Log LOG = LogFactory.getLog(RpcServerRegisterHandler.class);

    /**
     * Logs every message received from the register center.
     * @param ctx channel context
     * @param msg inbound message
     * @throws Exception declared by the overridden method; nothing is thrown explicitly here
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        LOG.info("[Rpc Server] received message: {}", msg);
    }

    /**
     * Logs the failure and closes the channel.
     * @param ctx channel context
     * @param cause the exception that was caught
     * @throws Exception declared by the overridden method; nothing is thrown explicitly here
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LOG.error("[Rpc Server] meet ex", cause);
        ctx.close();
    }

}
客户端
DefaultReferenceConfig
package com.github.houbb.rpc.client.config.reference.impl;
import com.github.houbb.heaven.support.handler.IHandler;
import com.github.houbb.heaven.util.guava.Guavas;
import com.github.houbb.heaven.util.util.CollectionUtil;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.client.config.reference.ReferenceConfig;
import com.github.houbb.rpc.client.handler.RpcClientHandler;
import com.github.houbb.rpc.client.handler.RpcClientRegisterHandler;
import com.github.houbb.rpc.client.invoke.InvokeService;
import com.github.houbb.rpc.client.invoke.impl.DefaultInvokeService;
import com.github.houbb.rpc.client.proxy.ReferenceProxy;
import com.github.houbb.rpc.client.proxy.context.ProxyContext;
import com.github.houbb.rpc.client.proxy.context.impl.DefaultProxyContext;
import com.github.houbb.rpc.common.config.component.RpcAddress;
import com.github.houbb.rpc.common.config.component.RpcAddressBuilder;
import com.github.houbb.rpc.common.exception.RpcRuntimeException;
import com.github.houbb.rpc.common.remote.netty.handler.ChannelHandlers;
import com.github.houbb.rpc.common.remote.netty.impl.DefaultNettyClient;
import com.github.houbb.rpc.common.rpc.domain.RpcResponse;
import com.github.houbb.rpc.common.rpc.domain.impl.RpcResponses;
import com.github.houbb.rpc.register.domain.entry.ServiceEntry;
import com.github.houbb.rpc.register.domain.entry.impl.ServiceEntryBuilder;
import com.github.houbb.rpc.register.domain.message.RegisterMessage;
import com.github.houbb.rpc.register.domain.message.impl.RegisterMessages;
import com.github.houbb.rpc.register.simple.constant.MessageTypeConst;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import java.util.List;
/**
* 引用配置类
*
* 后期配置:
* (1)timeout 调用超时时间
* (2)version 服务版本处理
* (3)callType 调用方式 oneWay/sync/async
* (4)check 是否必须要求服务启动。
*
* spi:
* (1)codec 序列化方式
* (2)netty 网络通讯架构
* (3)load-balance 负载均衡
* (4)失败策略 fail-over/fail-fast
*
* filter:
* (1)路由
* (2)耗时统计 monitor 服务治理
*
* 优化思考:
* (1)对于唯一的 serviceId,其实其 interface 是固定的,是否可以省去?
* @author binbin.hou
* @since 0.0.6
* @param <T> 接口泛型
*/
public class DefaultReferenceConfig<T> implements ReferenceConfig<T> {
private static final Log LOG = LogFactory.getLog(DefaultReferenceConfig.class);
/**
* 服务唯一标识
* @since 0.0.6
*/
private String serviceId;
/**
* 服务接口
* @since 0.0.6
*/
private Class<T> serviceInterface;
/**
* 服务地址信息
* (1)如果不为空,则直接根据地址获取
* (2)如果为空,则采用自动发现的方式
*
* 如果为 subscribe 可以自动发现,然后填充这个字段信息。
* @since 0.0.6
*/
private List<RpcAddress> rpcAddresses;
/**
* 用于写入信息
* (1)client 连接 server 端的 channel future
* (2)后期进行 Load-balance 路由等操作。可以放在这里执行。
* @since 0.0.6
*/
private List<ChannelFuture> channelFutures;
/**
* 调用服务管理类
* @since 0.0.6
*/
private InvokeService invokeService;
/**
* 调用超时时间
* @since 0.0.7
*/
private long timeout;
/**
* 是否进行订阅模式
* @since 0.0.8
*/
private boolean subscribe;
/**
* 注册中心列表
* @since 0.0.8
*/
private List<RpcAddress> registerCenterList;
/**
* 注册中心超时时间
* @since 0.0.8
*/
private long registerCenterTimeOut;
public DefaultReferenceConfig() {
// 初始化信息
this.rpcAddresses = Guavas.newArrayList();
this.channelFutures = Guavas.newArrayList();
this.invokeService = new DefaultInvokeService();
// 默认为 60s 超时
this.timeout = 60*1000;
this.registerCenterList = Guavas.newArrayList();
this.registerCenterTimeOut = 60*1000;
}
// 保持不变
/**
* 获取对应的引用实现
* (1)处理所有的反射代理信息-方法可以抽离,启动各自独立即可。
* (2)启动对应的长连接
* @return 引用代理类
* @since 0.0.6
*/
@Override
public T reference() {
// 1. 启动 client 端到 server 端的连接信息
// 1.1 为了提升性能,可以将所有的 client=>server 的连接都调整为一个 thread。
// 1.2 初期为了简单,直接使用同步循环的方式。
// 循环连接
List<RpcAddress> rpcAddressList = getRpcAddresses();
for(RpcAddress rpcAddress : rpcAddressList) {
final ChannelHandler channelHandler = new RpcClientHandler(invokeService);
final ChannelHandler actualChannlHandler = ChannelHandlers.objectCodecHandler(channelHandler);
ChannelFuture channelFuture = DefaultNettyClient.newInstance(rpcAddress.address(), rpcAddress.port(), actualChannlHandler).call();
channelFutures.add(channelFuture);
}
// 2. 接口动态代理
ProxyContext<T> proxyContext = buildReferenceProxyContext();
return ReferenceProxy.newProxyInstance(proxyContext);
}
@Override
public DefaultReferenceConfig<T> timeout(long timeout) {
this.timeout = timeout;
return this;
}
@Override
public ReferenceConfig<T> subscribe(boolean subscribe) {
this.subscribe = subscribe;
return this;
}
@Override
public ReferenceConfig<T> registerCenter(String addresses) {
this.registerCenterList = RpcAddressBuilder.of(addresses);
return this;
}
/**
* 获取 rpc 地址信息列表
* (1)默认直接通过指定的地址获取
* (2)如果指定列表为空,且
* @return rpc 地址信息列表
* @since 0.0.8
*/
@SuppressWarnings("unchecked")
private List<RpcAddress> getRpcAddresses() {
//0. 快速返回
if(CollectionUtil.isNotEmpty(rpcAddresses)) {
return rpcAddresses;
}
//1. 信息检查
registerCenterParamCheck();
//2. 查询服务信息
List<ServiceEntry> serviceEntries = lookUpServiceEntryList();
LOG.info("[Client] register center serviceEntries: {}", serviceEntries);
//3. 结果转换
return CollectionUtil.toList(serviceEntries, new IHandler<ServiceEntry, RpcAddress>() {
@Override
public RpcAddress handle(ServiceEntry serviceEntry) {
return new RpcAddress(serviceEntry.ip(),
serviceEntry.port(), serviceEntry.weight());
}
});
}
/**
* 注册中心参数检查
* (1)如果可用列表为空,且没有指定自动发现,这个时候服务已经不可用了。
* @since 0.0.8
*/
private void registerCenterParamCheck() {
if(!subscribe) {
LOG.error("[Rpc Client] no available services found for serviceId:{}",
serviceId);
throw new RpcRuntimeException();
}
if(CollectionUtil.isEmpty(registerCenterList)) {
LOG.error("[Rpc Client] register center address can't be null!:{}",
serviceId);
throw new RpcRuntimeException();
}
}
/**
* 查询服务信息列表
* @return 服务明细列表
* @since 0.0.8
*/
@SuppressWarnings("unchecked")
private List<ServiceEntry> lookUpServiceEntryList() {
//1. 连接到注册中心
List<ChannelFuture> channelFutureList = connectRegisterCenter();
//2. 选择一个
// 直接取第一个即可,后续可以使用 load-balance 策略。
ChannelFuture channelFuture = channelFutureList.get(0);
//3. 发送查询请求
ServiceEntry serviceEntry = ServiceEntryBuilder.of(serviceId);
RegisterMessage registerMessage = RegisterMessages.of(MessageTypeConst.CLIENT_LOOK_UP, serviceEntry);
final String seqId = registerMessage.seqId();
invokeService.addRequest(seqId, registerCenterTimeOut);
channelFuture.channel().writeAndFlush(registerMessage);
//4. 等待查询结果
RpcResponse rpcResponse = invokeService.getResponse(seqId);
return (List<ServiceEntry>) RpcResponses.getResult(rpcResponse);
}
/**
* 连接到注册中心
* @return 对应的结果列表
* @since 0.0.8
*/
private List<ChannelFuture> connectRegisterCenter() {
List<ChannelFuture> futureList = Guavas.newArrayList(registerCenterList.size());
ChannelHandler channelHandler = ChannelHandlers.objectCodecHandler(new RpcClientRegisterHandler(invokeService));
for(RpcAddress rpcAddress : registerCenterList) {
final String ip = rpcAddress.address();
final int port = rpcAddress.port();
LOG.info("[Rpc Client] connect to register {}:{} ",
ip, port);
ChannelFuture channelFuture = DefaultNettyClient
.newInstance(ip, port, channelHandler)
.call();
futureList.add(channelFuture);
}
return futureList;
}
/**
* 构建调用上下文
* @return 引用代理上下文
* @since 0.0.6
*/
private ProxyContext<T> buildReferenceProxyContext() {
DefaultProxyContext<T> proxyContext = new DefaultProxyContext<>();
proxyContext.serviceId(this.serviceId);
proxyContext.serviceInterface(this.serviceInterface);
proxyContext.channelFutures(this.channelFutures);
proxyContext.invokeService(this.invokeService);
proxyContext.timeout(this.timeout);
return proxyContext;
}
}
这里客户端启动的时候,会根据是否指定 subscribe 来判断是否需要去注册中心获取服务端地址信息。
测试
注册中心
首先启动注册中心:
// Start the register center; no options are overridden here.
RegisterBs.newInstance().start();
日志如下:
[DEBUG] [2021-10-05 16:56:33.022] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2021-10-05 16:56:33.370] [pool-1-thread-1] [c.g.h.r.c.r.n.i.DefaultNettyServer.run] - [Netty Server] start with port: 8527 and channelHandler:
十月 05, 2021 4:56:34 下午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0x63a10deb] REGISTERED
十月 05, 2021 4:56:34 下午 io.netty.handler.logging.LoggingHandler bind
信息: [id: 0x63a10deb] BIND: 0.0.0.0/0.0.0.0:8527
十月 05, 2021 4:56:34 下午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0x63a10deb, L:/0:0:0:0:0:0:0:0:8527] ACTIVE
[INFO] [2021-10-05 16:56:34.952] [pool-1-thread-1] [c.g.h.r.c.r.n.i.DefaultNettyServer.run] - [Netty Server] 启动完成,监听【8527】端口
服务端
指定了注册中心的地址,默认为本地 8527 端口。
/**
 * Server demo entry point: registers the calculator service, points the
 * registry at the register center and exposes the service.
 * @param args unused
 */
public static void main(String[] args) {
    final DefaultServiceRegistry registry = DefaultServiceRegistry.getInstance();

    // Each call configures the same singleton, so they can be issued one by one.
    registry.register(ServiceIdConst.CALC, new CalculatorServiceImpl());
    registry.registerCenter(ServiceIdConst.REGISTER_CENTER);

    // Start the service.
    registry.expose();
}
启动日志:
[DEBUG] [2021-10-05 16:59:50.633] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2021-10-05 16:59:50.703] [main] [c.g.h.r.s.r.i.DefaultServiceRegistry.expose] - server register local finish.
[INFO] [2021-10-05 16:59:50.865] [main] [c.g.h.r.s.r.i.DefaultServiceRegistry.expose] - server service start finish.
[INFO] [2021-10-05 16:59:50.873] [pool-1-thread-1] [c.g.h.r.c.r.n.i.DefaultNettyServer.run] - [Netty Server] start with port: 9527 and channelHandler:
[INFO] [2021-10-05 16:59:50.873] [main] [c.g.h.r.s.r.i.DefaultServiceRegistry.registerServiceCenter] - [Rpc Server] start register to 127.0.0.1:8527
[INFO] [2021-10-05 16:59:50.886] [main] [c.g.h.r.c.r.n.i.DefaultNettyClient.call] - [Netty Client] 开始启动客户端
...
[INFO] [2021-10-05 16:59:51.981] [pool-1-thread-1] [c.g.h.r.c.r.n.i.DefaultNettyServer.run] - [Netty Server] 启动完成,监听【9527】端口
[INFO] [2021-10-05 16:59:51.983] [main] [c.g.h.r.c.r.n.i.DefaultNettyClient.call] - [Netty Client] 启动客户端完成,监听地址 127.0.0.1:8527
[INFO] [2021-10-05 16:59:52.003] [main] [c.g.h.r.s.r.i.DefaultServiceRegistry.registerServiceCenter] - [Rpc Server] register to service center: DefaultRegisterMessage{header=DefaultRegisterMessageHeader{type=1}, body=DefaultServiceEntry{serviceId='calc', description='null', ip='192.168.124.16', port=9527, weight=0}}
[INFO] [2021-10-05 16:59:52.014] [main] [c.g.h.r.s.r.i.DefaultServiceRegistry.expose] - server service register finish.
server 对于 register center 而言,也是客户端。
启动的时候,会向注册中心进行一次注册。
此时,注册中心的日志如下:
[INFO] [2021-10-05 16:59:52.030] [nioEventLoopGroup-2-1] [c.g.h.r.r.s.h.RegisterCenterServerHandler.channelActive] - [Register Server] channel {} connected 00e04cfffe360988-00000954-00000001-108f8fc327e63f2c-d152f0a9
[INFO] [2021-10-05 16:59:52.121] [nioEventLoopGroup-2-1] [c.g.h.r.r.s.h.RegisterCenterServerHandler.channelRead0] - [Register Server] received message type: 1, seqId: 5429a50862f542e4bf451e8624cc7f12
[INFO] [2021-10-05 16:59:52.123] [nioEventLoopGroup-2-1] [c.g.h.r.r.s.s.i.DefaultServerRegisterService.register] - [Register Server] add service: DefaultServiceEntry{serviceId='calc', description='null', ip='192.168.124.16', port=9527, weight=0}
[INFO] [2021-10-05 16:59:52.124] [nioEventLoopGroup-2-1] [c.g.h.r.r.s.c.i.DefaultClientRegisterService.notify] - [Register] notify clients is empty for service: calc
客户端
测试代码
/**
 * Client demo entry point: discovers the calculator service through the
 * register center, calls {@code sum} and prints the response.
 * @param args unused
 */
public static void main(String[] args) {
    // Service reference configuration.
    final ReferenceConfig<CalculatorService> referenceConfig = new DefaultReferenceConfig<>();
    referenceConfig.serviceId(ServiceIdConst.CALC);
    referenceConfig.serviceInterface(CalculatorService.class);

    // Discover the service automatically through the register center.
    referenceConfig.subscribe(true);
    referenceConfig.registerCenter(ServiceIdConst.REGISTER_CENTER);

    final CalculatorService calculator = referenceConfig.reference();

    final CalculateRequest sumRequest = new CalculateRequest();
    sumRequest.setOne(10);
    sumRequest.setTwo(20);

    final CalculateResponse sumResponse = calculator.sum(sumRequest);
    System.out.println("响应结果:" + sumResponse);
}
日志
[DEBUG] [2021-10-05 17:05:52.360] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2021-10-05 17:05:52.602] [main] [c.g.h.r.c.c.r.i.DefaultReferenceConfig.connectRegisterCenter] - [Rpc Client] connect to register 127.0.0.1:8527
[INFO] [2021-10-05 17:05:52.610] [main] [c.g.h.r.c.r.n.i.DefaultNettyClient.call] - [Netty Client] 开始启动客户端
[INFO] [2021-10-05 17:05:53.665] [main] [c.g.h.r.c.r.n.i.DefaultNettyClient.call] - [Netty Client] 启动客户端完成,监听地址 127.0.0.1:8527
[INFO] [2021-10-05 17:05:53.676] [main] [c.g.h.r.c.i.i.DefaultInvokeService.addRequest] - [Client] start add request for seqId: 4c8455ae4f574478bf78772024608bc2, timeoutMills: 60000
[INFO] [2021-10-05 17:05:53.688] [main] [c.g.h.r.c.i.i.DefaultInvokeService.getResponse] - [Client] seq 4c8455ae4f574478bf78772024608bc2 对应结果为空,进入等待
[INFO] [2021-10-05 17:05:53.752] [nioEventLoopGroup-2-1] [c.g.h.r.c.h.RpcClientRegisterHandler.channelRead0] - [Client Register] response is :DefaultRpcResponse{seqId='4c8455ae4f574478bf78772024608bc2', error=null, result=[DefaultServiceEntry{serviceId='calc', description='null', ip='192.168.124.16', port=9527, weight=0}]}
[INFO] [2021-10-05 17:05:53.753] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] 获取结果信息,seqId: 4c8455ae4f574478bf78772024608bc2, rpcResponse: DefaultRpcResponse{seqId='4c8455ae4f574478bf78772024608bc2', error=null, result=[DefaultServiceEntry{serviceId='calc', description='null', ip='192.168.124.16', port=9527, weight=0}]}
[INFO] [2021-10-05 17:05:53.753] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] seqId:4c8455ae4f574478bf78772024608bc2 信息已经放入,通知所有等待方
[INFO] [2021-10-05 17:05:53.755] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] seqId:4c8455ae4f574478bf78772024608bc2 remove from request map
[INFO] [2021-10-05 17:05:53.755] [main] [c.g.h.r.c.i.i.DefaultInvokeService.getResponse] - [Client] seq 4c8455ae4f574478bf78772024608bc2 对应结果已经获取: DefaultRpcResponse{seqId='4c8455ae4f574478bf78772024608bc2', error=null, result=[DefaultServiceEntry{serviceId='calc', description='null', ip='192.168.124.16', port=9527, weight=0}]}
[INFO] [2021-10-05 17:05:53.759] [main] [c.g.h.r.c.c.r.i.DefaultReferenceConfig.getRpcAddresses] - [Client] register center serviceEntries: [DefaultServiceEntry{serviceId='calc', description='null', ip='192.168.124.16', port=9527, weight=0}]
[INFO] [2021-10-05 17:05:53.761] [main] [c.g.h.r.c.r.n.i.DefaultNettyClient.call] - [Netty Client] 开始启动客户端
[INFO] [2021-10-05 17:05:53.773] [main] [c.g.h.r.c.r.n.i.DefaultNettyClient.call] - [Netty Client] 启动客户端完成,监听地址 192.168.124.16:9527
[INFO] [2021-10-05 17:05:53.785] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start call remote with request: DefaultRpcRequest{seqId='81a4687bce6d41bebf79fd3b292d934c', createTime=1633424753779, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]}
[INFO] [2021-10-05 17:05:53.785] [main] [c.g.h.r.c.i.i.DefaultInvokeService.addRequest] - [Client] start add request for seqId: 81a4687bce6d41bebf79fd3b292d934c, timeoutMills: 60000
[INFO] [2021-10-05 17:05:53.813] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start call channel id: 00e04cfffe360988-000031fc-00000001-36c33ae099fbc46b-7fefae5a
[INFO] [2021-10-05 17:05:53.825] [main] [c.g.h.r.c.i.i.DefaultInvokeService.getResponse] - [Client] seq 81a4687bce6d41bebf79fd3b292d934c 对应结果为空,进入等待
[INFO] [2021-10-05 17:05:53.851] [nioEventLoopGroup-4-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] 获取结果信息,seqId: 81a4687bce6d41bebf79fd3b292d934c, rpcResponse: DefaultRpcResponse{seqId='81a4687bce6d41bebf79fd3b292d934c', error=null, result=CalculateResponse{success=true, sum=30}}
[INFO] [2021-10-05 17:05:53.851] [nioEventLoopGroup-4-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] seqId:81a4687bce6d41bebf79fd3b292d934c 信息已经放入,通知所有等待方
[INFO] [2021-10-05 17:05:53.852] [nioEventLoopGroup-4-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] seqId:81a4687bce6d41bebf79fd3b292d934c remove from request map
[INFO] [2021-10-05 17:05:53.852] [nioEventLoopGroup-4-1] [c.g.h.r.c.h.RpcClientHandler.channelRead0] - [Client] response is :DefaultRpcResponse{seqId='81a4687bce6d41bebf79fd3b292d934c', error=null, result=CalculateResponse{success=true, sum=30}}
[INFO] [2021-10-05 17:05:53.855] [main] [c.g.h.r.c.i.i.DefaultInvokeService.getResponse] - [Client] seq 81a4687bce6d41bebf79fd3b292d934c 对应结果已经获取: DefaultRpcResponse{seqId='81a4687bce6d41bebf79fd3b292d934c', error=null, result=CalculateResponse{success=true, sum=30}}
响应结果:CalculateResponse{success=true, sum=30}
客户端主要分为两个部分:
(1)去注册中心查询对应的服务端地址
(2)根据服务端地址,创建对应的客户端请求代理
不足之处
(1)服务端关闭之后,没有调用 unRegister 方法。
(2)客户端关闭之后,没有调用 unSubscribe 方法。
(3)缺少心跳机制,服务器挂掉无法及时感知。
小结
为了便于大家学习,以上源码已经开源:
我是老马,期待与你的下次重逢。