说明

我们上一章的例子中,我们的调用是在客户端启动的时候完成的。

实际使用中,我们希望调用可以有客户端主动发起。

客户端

核心

/**
 * <p> rpc 客户端 </p>
 *
 * <pre> Created: 2019/10/16 11:21 下午  </pre>
 * <pre> Project: rpc  </pre>
 *
 * @author houbinbin
 * @since 0.0.2
 */
public class RpcClient {

    // 和以前保持一致

    /**
     * 开始运行
     */
    public void start() {
        // 启动服务端
        log.info("RPC 服务开始启动客户端");

        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            channelFuture = bootstrap.group(workerGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<Channel>(){
                        @Override
                        protected void initChannel(Channel ch) throws Exception {
                            channelHandler = new RpcClientHandler();
                            ch.pipeline()
                                    .addLast(new LoggingHandler(LogLevel.INFO))
                                    .addLast(new CalculateRequestEncoder())
                                    .addLast(new CalculateResponseDecoder())
                                    .addLast(channelHandler);
                        }
                    })
                    .connect(RpcConstant.ADDRESS, port)
                    .syncUninterruptibly();
            log.info("RPC 服务启动客户端完成,监听端口:" + port);
        } catch (Exception e) {
            log.error("RPC 客户端遇到异常", e);
            throw new RuntimeException(e);
        }
        // 不要关闭线程池!!!
    }

    /**
     * 调用计算
     * @param request 请求信息
     * @return 结果
     * @since 0.0.4
     */
    public CalculateResponse calculate(final CalculateRequest request) {
        // 发送请求
        final Channel channel = channelFuture.channel();
        log.info("RPC 客户端发送请求,request: {}", request);

        // 关闭当前线程,以获取对应的信息
        channel.writeAndFlush(request);
        channel.closeFuture().syncUninterruptibly();

        return channelHandler.getResponse();
    }

}

我们将计算部分的方法单独抽离出来。

RpcClientHandler

客户端处理类实现如下:

/*
 * Copyright (c)  2019. houbinbin Inc.
 * rpc All rights reserved.
 */

package com.github.houbb.rpc.client.handler;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.client.core.RpcClient;
import com.github.houbb.rpc.common.model.CalculateResponse;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * <p> 客户端处理类 </p>
 *
 * <pre> Created: 2019/10/16 11:30 下午  </pre>
 * <pre> Project: rpc  </pre>
 *
 * @author houbinbin
 * @since 0.0.2
 */
public class RpcClientHandler extends SimpleChannelInboundHandler {

    private static final Log log = LogFactory.getLog(RpcClient.class);

    /**
     * 响应信息
     * @since 0.0.4
     */
    private CalculateResponse response;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        CalculateResponse response = (CalculateResponse)msg;

        this.response = response;
        log.info("[Client] response is :{}", response);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // 每次用完要关闭,不然拿不到response,我也不知道为啥(目测得了解netty才行)
        // 个人理解:如果不关闭,则永远会被阻塞。
        ctx.flush();
        ctx.close();
    }

    public CalculateResponse getResponse() {
        return response;
    }

}

CalculatorProxy

计算的代理实现。

package com.github.houbb.rpc.client.proxy;

import com.github.houbb.rpc.client.core.RpcClient;
import com.github.houbb.rpc.common.model.CalculateRequest;
import com.github.houbb.rpc.common.model.CalculateResponse;
import com.github.houbb.rpc.common.service.Calculator;

/**
 * @author binbin.hou
 * @since 0.0.4
 */
public class CalculatorProxy implements Calculator {

    /**
     * rpc 客户端
     */
    private RpcClient rpcClient;

    /**
     * 创建类
     * (1)默认初始化 client 端
     */
    public CalculatorProxy() {
        rpcClient = new RpcClient();
        rpcClient.start();
    }

    @Override
    public CalculateResponse sum(CalculateRequest request) {
        return rpcClient.calculate(request);
    }

}

编码&解码

和以前保持一致。

服务端保持不变。

测试

服务端

服务端启动,日志如下:

[DEBUG] [2021-10-05 12:29:40.307] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2021-10-05 12:29:40.314] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC 服务开始启动服务端
十月 05, 2021 12:29:41 下午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0xb4519e4f] REGISTERED
十月 05, 2021 12:29:41 下午 io.netty.handler.logging.LoggingHandler bind
信息: [id: 0xb4519e4f] BIND: 0.0.0.0/0.0.0.0:9527
十月 05, 2021 12:29:41 下午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0xb4519e4f, L:/0:0:0:0:0:0:0:0:9527] ACTIVE
[INFO] [2021-10-05 12:29:41.832] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC 服务端启动完成,监听【9527】端口

客户端

客户端主动调用,可以更加灵活。

调用

/**
 * 服务启动代码测试
 * @param args 参数
 */
public static void main(String[] args) {
    Calculator calculator = new CalculatorProxy();
    CalculateRequest request = new CalculateRequest();
    request.setOne(5);
    request.setTwo(6);
    CalculateResponse response = calculator.sum(request);
    System.out.println("rpc call result: " + response);
}

日志如下:

[DEBUG] [2021-10-05 12:30:36.172] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2021-10-05 12:30:36.182] [main] [c.g.h.r.c.c.RpcClient.start] - RPC 服务开始启动客户端
十月 05, 2021 12:30:38 下午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0x7dbd673d] REGISTERED
十月 05, 2021 12:30:38 下午 io.netty.handler.logging.LoggingHandler connect
信息: [id: 0x7dbd673d] CONNECT: /127.0.0.1:9527
[INFO] [2021-10-05 12:30:38.054] [main] [c.g.h.r.c.c.RpcClient.start] - RPC 服务启动客户端完成,监听端口:9527
[INFO] [2021-10-05 12:30:38.058] [main] [c.g.h.r.c.c.RpcClient.calculate] - RPC 客户端发送请求,request: CalculateRequest{one=5, two=6}
十月 05, 2021 12:30:38 下午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0x7dbd673d, L:/127.0.0.1:60689 - R:/127.0.0.1:9527] ACTIVE
十月 05, 2021 12:30:38 下午 io.netty.handler.logging.LoggingHandler write
信息: [id: 0x7dbd673d, L:/127.0.0.1:60689 - R:/127.0.0.1:9527] WRITE: 8B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05 00 00 00 06                         |........        |
+--------+-------------------------------------------------+----------------+
十月 05, 2021 12:30:38 下午 io.netty.handler.logging.LoggingHandler flush
信息: [id: 0x7dbd673d, L:/127.0.0.1:60689 - R:/127.0.0.1:9527] FLUSH
十月 05, 2021 12:30:38 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0x7dbd673d, L:/127.0.0.1:60689 - R:/127.0.0.1:9527] READ: 5B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 00 00 00 0b                                  |.....           |
+--------+-------------------------------------------------+----------------+
十月 05, 2021 12:30:38 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0x7dbd673d, L:/127.0.0.1:60689 - R:/127.0.0.1:9527] READ COMPLETE
十月 05, 2021 12:30:38 下午 io.netty.handler.logging.LoggingHandler flush
信息: [id: 0x7dbd673d, L:/127.0.0.1:60689 - R:/127.0.0.1:9527] FLUSH
十月 05, 2021 12:30:38 下午 io.netty.handler.logging.LoggingHandler close
信息: [id: 0x7dbd673d, L:/127.0.0.1:60689 - R:/127.0.0.1:9527] CLOSE
十月 05, 2021 12:30:38 下午 io.netty.handler.logging.LoggingHandler channelInactive
信息: [id: 0x7dbd673d, L:/127.0.0.1:60689 ! R:/127.0.0.1:9527] INACTIVE
十月 05, 2021 12:30:38 下午 io.netty.handler.logging.LoggingHandler channelUnregistered
信息: [id: 0x7dbd673d, L:/127.0.0.1:60689 ! R:/127.0.0.1:9527] UNREGISTERED
[INFO] [2021-10-05 12:30:38.186] [nioEventLoopGroup-2-1] [c.g.h.r.c.c.RpcClient.channelRead0] - [Client] response is :CalculateResponse{success=true, sum=11}
rpc call result: CalculateResponse{success=true, sum=11}

编译报错问题

idea 开源社区版本,2020.03 发现编译会报错:

java: Workaround: to make project compile with the current annotation processor implementation, start JPS with VM option: -Djps.track.ap.dependencies=false
  When run from IDE, the option can be set in "Compiler Settings | build process VM options"

原因

升级到idea 2020.3 版本后,出现无法在编译阶段解析,尤其在处理一些注解,类似lombok这类的。

ps:这个应该是社区版本的 BUG。

解决方案

在 setting –> Compiler 中的vm选项加入

-Djps.track.ap.dependencies=false

或者排除掉 lombok 相关的依赖包。

小结

为了便于大家学习,以上源码已经开源:

https://github.com/houbb/rpc

我是老马,期待与你的下次重逢。