rpc 是基于 netty 实现的 java rpc 框架,类似于 dubbo。
主要用于个人学习,由渐入深,理解 rpc 的底层实现原理。
特性
-
基于 netty4 的客户端调用服务端
-
p2p 调用
-
serial 序列化支持
-
timeout 超时处理
-
register center 注册中心
-
load balance 负载均衡
-
callType 支持 oneway sync 等调用方式
-
fail 支持 failOver failFast 等失败处理策略
-
generic 支持泛化调用
-
gracefully 优雅关闭
-
rpcInterceptor 拦截器
-
filter 过滤器
-
check 客户端启动检测服务是否可用
-
heartbeat 服务端心跳
解决的问题
RPC 主要是为了解决的两个问题:
(1)解决分布式系统中,服务之间的调用问题。
(2)远程调用时,要能够像本地调用一样方便,让调用者感知不到远程调用的逻辑。
这一节我们来学习下如何基于 websocket 实现最简单的 rpc 调用,后续会实现基于 netty4 的版本。
上一篇代码基于 socket 的实现非常简单,但是对于实际生产,一般使用 netty。
至于 netty 的优点可以参考:
https://houbb.github.io/2019/05/10/netty-definitive-gudie-04-why-netty
上一篇我们实现了服务端的实现,这一节来一起看一下 client 客户端代码实现。
代码实现
RpcClient
/*
* Copyright (c) 2019. houbinbin Inc.
* rpc All rights reserved.
*/
package com.github.houbb.rpc.client.core;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.client.handler.RpcClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
* rpc 客户端
*
* Created: 2019/10/16 11:21 下午
* Project: rpc
*
* @author houbinbin
* @since 0.0.2
*/
public class RpcClient extends Thread {
private static final Log log = LogFactory.getLog(RpcClient.class);
/**
* 监听端口号
*/
private final int port;
public RpcClient(int port) {
this.port = port;
}
public RpcClient() {
this(9527);
}
@Override
public void run() {
// 启动服务端
log.info("RPC 服务开始启动客户端");
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
ChannelFuture channelFuture = bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer(){
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
.addLast(new LoggingHandler(LogLevel.INFO))
.addLast(new RpcClientHandler());
}
})
.connect("localhost", port)
.syncUninterruptibly();
log.info("RPC 服务启动客户端完成,监听端口:" + port);
channelFuture.channel().closeFuture().syncUninterruptibly();
log.info("RPC 服务开始客户端已关闭");
} catch (Exception e) {
log.error("RPC 客户端遇到异常", e);
} finally {
workerGroup.shutdownGracefully();
}
}
}
java 从零开始手写 RPC (01) 基于 socket 实现
java 从零开始手写 RPC (02)-netty4 实现客户端和服务端
写完了客户端和服务端,那么如何实现客户端和服务端的调用呢?
我们上一章的例子中,我们的调用是在客户端启动的时候完成的。
实际使用中,我们希望调用可以有客户端主动发起。
客户端
核心
/**
* rpc 客户端
*
* Created: 2019/10/16 11:21 下午
* Project: rpc
*
* @author houbinbin
* @since 0.0.2
*/
public class RpcClient {
// 和以前保持一致
/**
* 开始运行
*/
public void start() {
// 启动服务端
log.info("RPC 服务开始启动客户端");
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
channelFuture = bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer(){
@Override
protected void initChannel(Channel ch) throws Exception {
channelHandler = new RpcClientHandler();
ch.pipeline()
.addLast(new LoggingHandler(LogLevel.INFO))
.addLast(new CalculateRequestEncoder())
.addLast(new CalculateResponseDecoder())
.addLast(channelHandler);
}
})
.connect(RpcConstant.ADDRESS, port)
.syncUninterruptibly();
log.info("RPC 服务启动客户端完成,监听端口:" + port);
} catch (Exception e) {
log.error("RPC 客户端遇到异常", e);
throw new RuntimeException(e);
}
// 不要关闭线程池!!!
}
/**
* 调用计算
* @param request 请求信息
* @return 结果
* @since 0.0.4
*/
public CalculateResponse calculate(final CalculateRequest request) {
// 发送请求
final Channel channel = channelFuture.channel();
log.info("RPC 客户端发送请求,request: {}", request);
// 关闭当前线程,以获取对应的信息
channel.writeAndFlush(request);
channel.closeFuture().syncUninterruptibly();
return channelHandler.getResponse();
}
}