回顾

大家好,我是老马。

我们前面学习了 5 分钟入门 spring cloud 实战笔记dubbo 2.7 的 3种入门案例实战, 小伙伴肯定已经有了最基本的认识。

工作使用过 rpc 框架的肯定也觉得 so easy,那后面我们就来一起从零实现属于自己的 rpc 框架。

懂得原理,以后哪怕遇到 GRPC 之类的,上手都会变得简单很多。

so easy

知识储备

建议学习的小伙伴有扎实的 java 基础,最好有一定的 rpc 框架使用经验。

建议的基础储备如下:

Java 并发实战学习

TCP/IP 协议学习笔记

Netty 权威指南学习

为了便于大家理解,这个系列采用渐进式开发,希望每一位小伙伴都可以看懂。

let’s go!

服务端的启动

maven 依赖

这里网络包我们使用成熟的 netty,后续有时间将对 netty 进行一下深入学习,此处不做展开。

  [xml]
1
2
3
4
5
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.17.Final</version> </dependency>

其实最基础的实现也可以从 java 的 socket 开始,不过原理的是类似的,netty 在网络通信方面做了很多封装和改良,我们要学会站在巨人的肩膀上。

巨人

服务端核心代码

基于 netty 的服务端,整体部分是固定的,不要求大家死记硬背。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package com.github.houbb.rpc.server.core; import com.github.houbb.log.integration.core.Log; import com.github.houbb.log.integration.core.LogFactory; import com.github.houbb.rpc.server.constant.RpcServerConst; import com.github.houbb.rpc.server.handler.RpcServerHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * rpc 服务端 * @author binbin.hou * @since 0.0.1 */ public class RpcServer extends Thread { private static final Log log = LogFactory.getLog(RpcServer.class); /** * 端口号 */ private final int port; public RpcServer() { this.port = RpcServerConst.DEFAULT_PORT; } public RpcServer(int port) { this.port = port; } @Override public void run() { // 启动服务端 log.info("RPC 服务开始启动服务端"); EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(workerGroup, bossGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new RpcServerHandler()); } }) // 这个参数影响的是还没有被accept 取出的连接 .option(ChannelOption.SO_BACKLOG, 128) // 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。 .childOption(ChannelOption.SO_KEEPALIVE, true); // 绑定端口,开始接收进来的链接 ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly(); log.info("RPC 服务端启动完成,监听【" + port + "】端口"); channelFuture.channel().closeFuture().syncUninterruptibly(); log.info("RPC 服务端关闭完成"); } catch (Exception e) { log.error("RPC 服务异常", e); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }

netty 的 api 设计是老马非常喜欢的风格,我的 90% 的工具类都会采用这种 fluent-api 的风格,写起来非常之流畅。

EventLoopGroup 大家可以理解为是一个连接池,用于提升性能。

NIO 相比于传统的 BIO 性能有着巨大的飞跃,可以参考

java NIO 系列教程

最简单的 Handler

netty 为我们的所有实现,都抽象为了 Handler,我们这里的 RpcServerHandler 非常简单,如下:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.github.houbb.rpc.server.handler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * @author binbin.hou * @since 0.0.1 */ public class RpcServerHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { // do nothing now } }

服务端启动

  [java]
1
2
3
4
5
6
7
/** * 服务启动代码测试 * @param args 参数 */ public static void main(String[] args) { new RpcServer().start(); }

启动日志如下:

  [plaintext]
1
2
[INFO] [2020-12-16 22:46:52.176] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC 服务开始启动服务端 [INFO] [2020-12-16 22:46:54.436] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC 服务端启动完成,监听【9627】端口

客户端

同理,我们可以创建需要调用服务端的客户端。

依赖

maven 依赖也是 netty

  [xml]
1
2
3
4
5
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.17.Final</version> </dependency>

客户端核心实现

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package com.github.houbb.rpc.client.core; import com.github.houbb.log.integration.core.Log; import com.github.houbb.log.integration.core.LogFactory; import com.github.houbb.rpc.client.handler.RpcClientHandler; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; /** * <p> rpc 客户端 </p> * * <pre> Created: 2019/10/16 11:21 下午 </pre> * <pre> Project: rpc </pre> * * @author houbinbin * @since 0.0.2 */ public class RpcClient extends Thread { private static final Log log = LogFactory.getLog(RpcClient.class); /** * 监听端口号 */ private final int port; public RpcClient(int port) { this.port = port; } public RpcClient() { this(9527); } @Override public void run() { // 启动服务端 log.info("RPC 服务开始启动客户端"); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); ChannelFuture channelFuture = bootstrap.group(workerGroup) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .handler(new ChannelInitializer<Channel>(){ @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline() // 添加日志输出 Handler .addLast(new LoggingHandler(LogLevel.INFO)) // 添加自定义 Handler .addLast(new RpcClientHandler()); } }) // 连接到本地的 port 指定端口 .connect("localhost", port) .syncUninterruptibly(); log.info("RPC 服务启动客户端完成,监听端口:" + port); channelFuture.channel().closeFuture().syncUninterruptibly(); log.info("RPC 服务开始客户端已关闭"); } catch (Exception e) { log.error("RPC 客户端遇到异常", e); } finally { workerGroup.shutdownGracefully(); } } }

客户端 Handler

这里 Handler 仅作为演示,实现也非常简单:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/* * Copyright (c) 2019. houbinbin Inc. * rpc All rights reserved. */ package com.github.houbb.rpc.client.handler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * <p> 客户端处理类 </p> * * <pre> Created: 2019/10/16 11:30 下午 </pre> * <pre> Project: rpc </pre> * * @author houbinbin * @since 0.0.2 */ public class RpcClientHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { // do nothing. } }

客户端启动

我们在服务端启动之后,可以启动一下客户端进行连接。

  [java]
1
2
3
4
5
6
7
/** * 服务启动代码测试 * @param args 参数 */ public static void main(String[] args) { new RpcClient().start(); }

启动日志如下:

  [plaintext]
1
2
3
4
5
6
7
8
[INFO] [2020-12-16 22:54:22.332] [Thread-0] [c.g.h.r.c.c.RpcClient.run] - RPC 服务开始启动客户端 十二月 16, 2020 10:54:25 下午 io.netty.handler.logging.LoggingHandler channelRegistered 信息: [id: 0x20efe557] REGISTERED 十二月 16, 2020 10:54:25 下午 io.netty.handler.logging.LoggingHandler connect 信息: [id: 0x20efe557] CONNECT: localhost/127.0.0.1:9527 十二月 16, 2020 10:54:25 下午 io.netty.handler.logging.LoggingHandler channelActive 信息: [id: 0x20efe557, L:/127.0.0.1:53434 - R:localhost/127.0.0.1:9527] ACTIVE [INFO] [2020-12-16 22:54:25.045] [Thread-0] [c.g.h.r.c.c.RpcClient.run] - RPC 服务启动客户端完成,监听端口:9527

小结

本地演示了如何使用 netty 实现 rpc,是不是感觉非常的简单?

是的,实际上这正是 netty 的魅力所在,将网络通讯的细节都隐藏起来了,让我们可以更加专注于业务。

千里之行,始于足下。

小伙伴看了之后,也要自己动手写一下。

为了便于大家学习,所有源码均已开源:

https://github.com/houbb/rpc

希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。

我是老马,期待与你的下次相遇。

参考资料

https://mp.weixin.qq.com/s/r9F8qYw8PIcyjGR2yS0Jzg