快速开始

Netty-快速开始 演示了一个丢弃版本的 Netty 入门案例。

现在我们来看一个 Echo 版本的例子。

服务端

Handler 实现

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * 自定义的服务端类
 * 1. 注解标识可以被多个 channel 安全的共享。
 * @author binbin.hou
 * @date 2019/3/31
 * @since v1
 */
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 信息读取
     * @param ctx 上下文
     * @param msg 消息
     * @throws Exception if any
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf)msg;
        final String string = byteBuf.toString(CharsetUtil.UTF_8);
        System.out.println("Echo server received: " + string);

        // 将接收到的消息,写给发送者
        ctx.write(byteBuf);
    }

    /**
     * 消息读取完成
     * @param ctx 上下文
     * @throws Exception if any
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //将未处理的消息冲刷到远程
        // 关闭当前 channel
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                .addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * 异常处理
     * 1. 处理异常消息
     * 2. 关闭 channel
     * @param ctx 上下文
     * @param cause 原因
     * @throws Exception if any
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.err.println("异常原因:" + cause.getMessage());

        ctx.close();
    }
}

启动服务器

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * 服务端
 * @author binbin.hou
 * @date 2019/3/31
 * @since v1
 */
public class EchoServer {

    /**
     * 服务器监听端口号
     */
    private static final int PORT = 8081;

    /**
     * @param args 参数
     */
    public static void main(String[] args) throws InterruptedException {
        // 执行的线程池组
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            //启动服务端的类 ServerBootstrap 是启动服务的辅助类,有关socket的参数可以通过ServerBootstrap进行设置。
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap
                    // 指定执行的线程组
                    .group(eventLoopGroup)
                    // 这里指定NioServerSocketChannel类初始化channel用来接受客户端请求。
                    .channel(NioServerSocketChannel.class)
                    // 新的连接创建时,会新增一个 childHandler,并且将这个 handler 放在 pipeline 中。
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 将我们定义的 channelHandler 添加到 pipeline
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            // 同步阻塞线程,直到同步绑定端口完成
            ChannelFuture channelFuture = serverBootstrap
                    // 绑定套接字的地址
                    .bind(PORT)
                    .sync();
            System.out.println("Server started on PORT: " + PORT);
            // 同步阻塞线程,直到获取 closeFuture
            channelFuture.channel().closeFuture().sync();
        } finally {
            // 同步优雅的关闭线程池
            eventLoopGroup.shutdownGracefully().sync();
        }
    }
}

客户端

Handler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.nio.charset.StandardCharsets;

/**
 * @author binbin.hou
 * @date 2019/3/31
 * @since v1
 */
@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final ByteBuf byteBuf = Unpooled.copiedBuffer("Client channel active!", StandardCharsets.UTF_8);
        ctx.writeAndFlush(byteBuf);
    }

    /**
     * 记录已经接受的消息
     * 1. 每次接受数据的时候,都会调用这个方法。
     * 2. 从服务器接收到的数据可能会被分隔为多次接受,即使数据量比较少。
     * 3. 作为一个面向流的协议,TCP 保证了字节数组将会按照服务器发送它们的顺序被接收。
     * @param ctx 上下文
     * @param msg 消息
     * @throws Exception if any
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        final String string = msg.toString(StandardCharsets.UTF_8);
        System.out.println("Client received msg: " + string);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.err.println("Client meet ex: " + cause.getMessage());
        // 关闭当前 channel
        ctx.close();
    }
}

客户端启动函数

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * @author binbin.hou
 * @date 2019/3/31
 * @since v1
 */
public class EchoClient {

    /**
     * 服务器地址
     */
    private static final String HOST = "127.0.0.1";

    /**
     * 服务器 port
     */
    private static final int PORT = 8081;

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new EchoClientHandler());

            ChannelFuture channelFuture = bootstrap.connect(HOST, PORT).sync();
            System.out.println("Client connect to "+HOST+":"+PORT+" success!");
            channelFuture.channel().closeFuture().sync();
        } finally {
            // 同步优雅的关闭线程池。
            eventLoopGroup.shutdownGracefully().sync();
        }
    }
}

测试

启动服务器

日志信息如下

Server started on PORT: 8081

启动客户端

  • 客户端日志
Client connect to 127.0.0.1:8081 success!
Client received msg: Client channel active!

Process finished with exit code 0
  • 服务器日志

与此同时,服务器端也多出一条日志

Echo server received: Client channel active!

疑问

注解 @Sharable的作用?

注解标识当前 Handler 类可以被多个 channel 安全的共享。

为什么二者继承的 Hanlder 不同?

  • SimpleChannelInboundHandler 与 ChannelInboundHandler

你可能会想:为什么我们在客户端使用的是SimpleChannelInboundHandler,而不是在Echo-ServerHandler 中所使用的ChannelInboundHandlerAdapter 呢?

这和两个因素的相互作用有关:业务逻辑如何处理消息以及 Netty 如何管理资源。

在客户端,当channelRead0()方法完成时,你已经有了传入消息,并且已经处理完它了。当该方法返回时,SimpleChannelInboundHandler 负责释放指向保存该消息的ByteBuf 的内存引用。

在 EchoServerHandler 中,你仍然需要将传入消息回送给发送者,而write()操作是异步的,直到channelRead()方法返回后可能仍然没有完成。

为此,EchoServerHandler扩展了ChannelInboundHandlerAdapter,其在这个时间点上不会释放消息。

消息在EchoServerHandler 的channelReadComplete()方法中,当writeAndFlush()方法被调用时被释放。

后续将对消息的资源管理进行详细的介绍。

Handler 中都做了异常的处理,不处理会怎么样?

每个Channel 都拥有一个与之相关联的ChannelPipeline,其持有一个ChannelHandler 的实例链。

在默认的情况下,ChannelHandler 会把对它的方法的调用转发给链中的下一个ChannelHandler。

因此,如果exceptionCaught()方法没有被该链中的某处实现,那么所接收的异常将会被传递到ChannelPipeline 的尾端并被记录。

为此,你的应用程序应该提供至少有一个实现了exceptionCaught()方法的ChannelHandler。

优雅关闭怎么做到的?

学习新技术的方式

  1. 大概了解下这门技术。

  2. 跟着案例手写几遍。理解,然后直到完全记下来。

  3. 不要直接复制。没有用的。像我复制黏贴 java 注解的上面元注解信息,导致记不得怎么写。其实也没有多少东西。

多写几遍,就记住了。

参考资料

《Netty 实战》P54