在这一节中,我们将讲解如何使用EmbeddedChannel 来测试ChannelHandler。

Junit 断言

org.junit.Assert 类提供了很多用于测试的静态方法。

失败的断言将导致一个异常被抛出,并将终止当前正在执行中的测试。

导入这些断言的最高效的方式是通过一个import static 语句来实现:

import static org.junit.Assert.*;

一旦这样做了,就可以直接调用Assert 方法了:

assertEquals(buf.readSlice(3), read);

解码器与编码器

解码器

Netty 提供了丰富的解码器抽象基类,我们可以很容易的实现这些基类来自定义解码器。主要分两类:

  1. 解码字节到消息(ByteToMessageDecoder 和 ReplayingDecoder)

  2. 解码消息到消息(MessageToMessageDecoder)

decoder 负责将“入站”数据从一种格式转换到另一种格式,Netty的解码器是一种 ChannelInboundHandler 的抽象实现。

实践中使用解码器很简单,就是将入站数据转换格式后传递到 ChannelPipeline 中的下一个ChannelInboundHandler 进行处理;这样的处理是很灵活的,我们可以将解码器放在 ChannelPipeline 中,重用逻辑。

编码器

就像decoder一样,Netty 也为你提供了一组类来写 encoder ,当然这些类提供的是与 decoder 完全相反的方法,如下所示:

  1. 编码从消息到字节

  2. 编码从消息到消息

测试入站消息

场景

我们先来编写一个简单的 ByteToMessageDecoder 实现,在有足够的数据可以读取的情况下将产生固定大小的包,如果没有足够的数据可以读取,则会等待下一个数据块并再次检查是否可以产生一个完整包。

如图所示,它可能会占用一个以上的“event”以获取足够的字节产生一个数据包,并将它传递到 ChannelPipeline 中的下一个 ChannelHandler,

测试入站消息

正如可以从图9-2 右侧的帧看到的那样,这个特定的解码器将产生固定为 3 字节大小的帧。

因此,它可能会需要多个事件来提供足够的字节数以产生一个帧。

最终,每个帧都会被传递给 ChannelPipeline 中的下一个ChannelHandler。

该解码器的实现,如代码清单9-1 所示。

编码

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
 * @author binbin.hou
 * @date 2019/5/1
 * @since 0.0.1
 */
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {

    /**
     * 指定帧的长度
     */
    private final int length;

    public FixedLengthFrameDecoder(int length) {
        this.length = length;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 当可读取的字节数大于帧指定的长度,则一直读取,并将内容添加到解码的消息列表中。
        while (in.readableBytes() >= length) {
            ByteBuf byteBuf = in.readBytes(length);
            out.add(byteBuf);
        }
    }

}

单元测试

为了验证我们代码的正确性,我们来编写一个测试用例,测试下我们的代码。

正如我们前面所指出的,即使是在简单的代码中,单元测试也能帮助我们防止在将来代码重构时可能会导致的问题,并且能在问题发生时帮助我们诊断它们。

package com.github.houbb.netty.inaction.chap09;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Assert;
import org.junit.Test;

/**
 * @author binbin.hou
 * @date 2019/5/1
 * @since 0.0.1
 */
public class FixedLengthFrameDecoderTest {

    @Test
    public void inTestOne() {
        // 指定创建一个大小为9的 buffer
        ByteBuf byteBuf = Unpooled.buffer();
        for(int i = 0; i < 9; i++) {
            // 写入信息,此时 writeIndex 会自动加1
            byteBuf.writeByte(i);
        }

        final int frameLength = 3;
        // 复制一个 byteBuffer 的拷贝,二者内容是共享的。(浅拷贝)
        ByteBuf input = byteBuf.duplicate();
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(new FixedLengthFrameDecoder(frameLength));
        // 引用计数+1,并写入到输入流
        Assert.assertTrue(embeddedChannel.writeInbound(input.retain()));
        Assert.assertTrue(embeddedChannel.finish());


        //读取所生成的消息,并且验证是否有3 帧(切片),其中每帧(切片)都为3 字节
        ByteBuf read = embeddedChannel.readInbound();
        // 断言大小为3
        Assert.assertEquals(3, read.capacity());
        //释放当前字节资源,并且将引用计数-1
        read.release();

        read = embeddedChannel.readInbound();
        Assert.assertEquals(3, read.capacity());
        //释放当前字节资源,并且将引用计数-1
        read.release();

        read = embeddedChannel.readInbound();
        Assert.assertEquals(3, read.capacity());
        //释放当前字节资源,并且将引用计数-1
        read.release();

        // 断言已经没有内容了
        Assert.assertNull(embeddedChannel.readInbound());
        // 释放点 buffer
        byteBuf.release();
    }

}
  • 测试2

二者的区别就是在写入的时候,一开始写入2,因为小于指定的3,所以返回 false。

后面读取二者是完全一致的。

    @Test
    public void inTestTwo() {
        //1. 初始化
        ByteBuf byteBuf = Unpooled.buffer(9);
        for(int i = 0; i < 9; i++) {
            byteBuf.writeByte(i);
        }

        //2. 设置输入流
        ByteBuf input = byteBuf.duplicate();
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(new FixedLengthFrameDecoder(3));
        Assert.assertFalse(embeddedChannel.writeInbound(input.readBytes(2)));
        Assert.assertTrue(embeddedChannel.writeInbound(input.readBytes(7)));
        Assert.assertTrue(embeddedChannel.finish());


        // 读取验证
        ByteBuf read = embeddedChannel.readInbound();
        Assert.assertEquals(3, read.capacity());
        read.release();

        read = embeddedChannel.readInbound();
        Assert.assertEquals(3, read.capacity());
        read.release();

        read = embeddedChannel.readInbound();
        Assert.assertEquals(3, read.capacity());
        read.release();

        read = embeddedChannel.readInbound();
        Assert.assertNull(read);
    }

测试出站消息

入站与出站

二者是类似的。

写入入站,对应读取入站。

写入出站,对应读取出站。

简介

测试出站消息的处理过程和刚才所看到的类似。

在下面的例子中,我们将会展示如何使用EmbeddedChannel 来测试一个编码器形式的ChannelOutboundHandler,编码器是一种将一种消息格式转换为另一种的组件。

你将在下一章中非常详细地学习编码器和解码器,所以现在我们只需要简单地提及我们正在测试的处理器—AbsIntegerEncoder,它是Netty MessageToMessageEncoder 的一个特殊化的实现,用于将负值整数转换为绝对值。

该示例将会按照下列方式工作:

  1. 持有AbsIntegerEncoder 的EmbeddedChannel 将会以4 字节的负整数的形式写出站数据;

  2. 编码器将从传入的ByteBuf 中读取每个负整数,并将会调用Math.abs()方法来获取其绝对值;

  3. 编码器将会把每个负整数的绝对值写到ChannelPipeline 中。

图9-3 展示了该逻辑。

测试出站消息

绝对值整数编码器

package com.github.houbb.netty.inaction.chap09;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;

import java.util.List;

/**
 * 整数绝对值编码器
 * @author binbin.hou
 * @date 2019/5/1
 * @since 0.0.1
 */
public class AbsIntegerEncoder extends MessageToMessageEncoder<ByteBuf> {
    @Override
    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        // 一个整形是4个字节
        while(msg.readableBytes() >= 4) {
            int value = msg.readInt();
            //将绝对值设置进入编码信息列表
            out.add(Math.abs(value));
        }
    }
}

测试代码

注意:此处写入的时候为 writeInt() 和编码器一一对应。

package com.github.houbb.netty.inaction.chap09;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Assert;
import org.junit.Test;

/**
 * @author binbin.hou
 * @date 2019/5/1
 * @since 0.0.1
 */
public class AbsIntegerEncoderTest {

    @Test
    public void outTest() {
        //1. 初始化 buffer
        ByteBuf byteBuf = Unpooled.buffer();
        for(int i = 1; i < 10; i++) {
            // 写入负数
            byteBuf.writeInt(i*(-1));
        }

        //2. 初始化 channel
        // 通过 api 可以知道,channel 可以指定多个编码/解码器。优秀的设计!
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(new AbsIntegerEncoder());
        Assert.assertTrue(embeddedChannel.writeOutbound(byteBuf));
        Assert.assertTrue(embeddedChannel.finish());

        //3. 读取测试
        for(int i = 1; i < 10; i++) {
            // 读取的时候,每4个字节会被转化为对应的int信息流
            int data = embeddedChannel.readOutbound();
            Assert.assertEquals(i, data);
        }
    }

}

参考资料

《Netty in Action》 P137

api

Unpooled

Unpooled

  • public static ByteBuf buffer(int initialCapacity)

Creates a new big-endian Java heap buffer with the specified capacity, which expands its capacity boundlessly on demand.

The new buffer’s readerIndex and writerIndex are 0.

ByteBuf

  • public abstract ByteBuf retain()

Description copied from interface: ReferenceCounted Increases the reference count by 1.

  • readSlice
    /**
     * Returns a new slice of this buffer's sub-region starting at the current
     * {@code readerIndex} and increases the {@code readerIndex} by the size
     * of the new slice (= {@code length}).
     * <p>
     * Also be aware that this method will NOT call {@link #retain()} and so the
     * reference count will NOT be increased.
     *
     * @param length the size of the new slice
     *
     * @return the newly created slice
     *
     * @throws IndexOutOfBoundsException
     *         if {@code length} is greater than {@code this.readableBytes}
     */
    public abstract ByteBuf readSlice(int length);

channel

  • readInBound()

Return received data from this {@link Channel}