Netty 权威指南-05-拆包和粘包
TCP为什么会粘包/拆包
我们知道,TCP是以一种流的方式来进行网络转播的,当tcp三次握手建立通信后,客户端服务端之间就建立了一种通讯管道,我们可以想象成自来水管道,流出来的水是连城一片的,是没有分界线的。
TCP底层并不了解上层的业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分。
所以对于我们应用层而言。
我们直观是发送一个个连续完整TCP数据包的,而在底层就可能会出现将一个完整的TCP拆分成多个包发送或者将多个包封装成一个大的数据包发送。
这就是所谓的TCP粘包和拆包。
当发生TCP粘包/拆包会发生什么情况
我们举一个简单例子说明:
客户端向服务端发送两个数据包:第一个内容为 123;第二个内容为456。
服务端接受一个数据并做相应的业务处理(这里就是打印接受数据加一个逗号)。
那么服务端输出结果将会出现下面四种情况
服务端响应结果 | 结论 |
---|---|
123,456, | 正常接收,没有发生粘包和拆包 |
123456, | 异常接收,发生tcp粘包 |
123,4,56, | 异常接收,发生tcp拆包 |
12,3456, | 异常接收,发生tcp拆包和粘包 |
如何解决
主流的协议解决方案可以归纳如下:
(1) 消息定长,例如每个报文的大小固定为20个字节,如果不够,空位补空格;
(2) 在包尾增加回车换行符进行切割;
(3) 将消息分为消息头和消息体,消息头中包含表示消息总长度的字段;
(4) 更复杂的应用层协议。
对于之前描述的案例,在这里我们就可以采取方案1和方案3。
以方案1为例:我们每次发送的TCP包只有三个数字,那么我将报文设置为3个字节大小的,此时,服务器就会以三个字节为基准来接受包,以此来解决站包拆包问题。
未考虑拆包/粘包的案例
服务端
- PackTimeServerHandler.java
import java.nio.charset.StandardCharsets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class PackTimeServerHandler extends ChannelInboundHandlerAdapter {
private static final String NEW_LINE = System.getProperty("line.separator");
private int count = 0;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 读取收到的信息
ByteBuf byteBuf = (ByteBuf)msg;
byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes);
// 获取内容,移除掉换行符号
String body = new String(bytes, 0, bytes.length-NEW_LINE.length(), StandardCharsets.UTF_8);
count++;
System.out.println("Server revice body from client : " + body + ", count is " +count);
// 回写到 client 端时间。
long currentTime = System.currentTimeMillis();
String currentTimeStr = currentTime+""+NEW_LINE;
ByteBuf timeBuffer = Unpooled.copiedBuffer(currentTimeStr.getBytes());
ctx.writeAndFlush(timeBuffer);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
- DefaultServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class DefaultServer {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
ChannelFuture channelFuture = serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new PackTimeServerHandler())
.bind(8888)
.syncUninterruptibly();
// 优雅关闭
channelFuture.channel().closeFuture().syncUninterruptibly();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
客户端
- PackTimeClientHandler.java
import java.nio.charset.StandardCharsets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class PackTimeClientHandler extends ChannelInboundHandlerAdapter {
private static final String NEW_LINE = System.getProperty("line.separator");
private static final String QUERY_INFO = "query time " + NEW_LINE;
private int count = 0;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 写入到 channel 中
ByteBuf byteBuf;
for(int i = 0 ; i () {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
// 添加解码器
.addLast(new LineBasedFrameDecoder(1024))
.addLast(new StringDecoder())
.addLast(new FixPackTimeServerHandler());
}
})
- FixPackTimeServerHandler.java
public class FixPackTimeServerHandler extends ChannelInboundHandlerAdapter {
private static final String NEW_LINE = System.getProperty("line.separator");
private int count = 0;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 读取收到的信息
String body = (String)msg;
count++;
System.out.println("Server revice body from client : " + body + ", count is " +count);
// 回写到 client 端时间。
long currentTime = System.currentTimeMillis();
String currentTimeStr = currentTime+""+NEW_LINE;
ByteBuf timeBuffer = Unpooled.copiedBuffer(currentTimeStr.getBytes());
ctx.writeAndFlush(timeBuffer);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
客户端
- FixDefaultClient
核心调整
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new LineBasedFrameDecoder(1024))
.addLast(new StringDecoder())
.addLast(new FixPackTimeClientHandler());
}
})
- FixPackTimeClientHandler.java
public class FixPackTimeClientHandler extends ChannelInboundHandlerAdapter {
private static final String NEW_LINE = System.getProperty("line.separator");
private static final String QUERY_INFO = "query time " + NEW_LINE;
private int count = 0;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 写入到 channel 中
ByteBuf message;
for(int i = 0 ; i () {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new DelimiterBasedFrameDecoder(1024,
Unpooled.copiedBuffer("$".getBytes())))
.addLast(new StringDecoder())
.addLast(new DelimiterTimeServerHandler());
}
})
其中:
DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer("$".getBytes())
我们这里指定使用分隔符的方式,去处理我们的拆包/黏包问题。
客户端
- DelimiterTimeClientHandler.java
package com.github.houbb.netty.learn.four.pack.delimiter;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author 老马啸西风
* @since 1.0.0
*/
public class DelimiterTimeClientHandler extends ChannelInboundHandlerAdapter {
private AtomicInteger atomicInteger = new AtomicInteger(0);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// channel 激活是,调用 server 端 20 次
String queryInfo = "ask for time $";
ByteBuf byteBuf;
for(int i = 0; i () {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new DelimiterBasedFrameDecoder(1024,
Unpooled.copiedBuffer("$".getBytes())))
.addLast(new StringDecoder())
.addLast(new DelimiterTimeClientHandler());
}
})
测试验证
(1)启动服务端
(2)启动客户端
- 服务端日志
Server receive from client ask for time , count 1
Server receive from client ask for time , count 2
Server receive from client ask for time , count 3
Server receive from client ask for time , count 4
Server receive from client ask for time , count 5
Server receive from client ask for time , count 6
Server receive from client ask for time , count 7
Server receive from client ask for time , count 8
Server receive from client ask for time , count 9
Server receive from client ask for time , count 10
Server receive from client ask for time , count 11
Server receive from client ask for time , count 12
Server receive from client ask for time , count 13
Server receive from client ask for time , count 14
Server receive from client ask for time , count 15
Server receive from client ask for time , count 16
Server receive from client ask for time , count 17
Server receive from client ask for time , count 18
Server receive from client ask for time , count 19
Server receive from client ask for time , count 20
- 客户端日志
Client receive from server 1568961677739, count 1
Client receive from server 1568961677741, count 2
Client receive from server 1568961677742, count 3
Client receive from server 1568961677742, count 4
Client receive from server 1568961677743, count 5
Client receive from server 1568961677743, count 6
Client receive from server 1568961677743, count 7
Client receive from server 1568961677743, count 8
Client receive from server 1568961677744, count 9
Client receive from server 1568961677744, count 10
Client receive from server 1568961677744, count 11
Client receive from server 1568961677745, count 12
Client receive from server 1568961677745, count 13
Client receive from server 1568961677745, count 14
Client receive from server 1568961677745, count 15
Client receive from server 1568961677746, count 16
Client receive from server 1568961677746, count 17
Client receive from server 1568961677746, count 18
Client receive from server 1568961677746, count 19
Client receive from server 1568961677747, count 20
定长解决方案
方案说明
有时候直接指定长度,根据长度进行截取也是一种常见的方式。
Netty 解决方案
Netty 中提供了类 FixedLengthFrameDecoder
netty 设计的有优点
netty 的这种泳道式设计,使得后期的拓展变得非常简单。
而且提供了大量丰富而强大的类库,极大的降低了重复开发的成本。
服务端
- FixedLengthServerHandler.java
非常简单,直接输出。
public class FixedLengthServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 直接打印信息
String info = (String)msg;
System.out.println(info);
}
}
- 服务器 handler 指定
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new FixedLengthFrameDecoder(20))
.addLast(new StringDecoder())
.addLast(new FixedLengthServerHandler());
}
})
测试
直接使用命令行 telnet localhost 8888
然后输入信息,打印得到
123
asdfasdf
asd
fasdf
asdfasdf
12a
小结
本节的内容相对比较简单,但是确实非常的使用。
这一节主要是参考《Netty 权威指南》中的个人学习笔记,下一节将分析一下 netty 服务端的启动源码。
希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。
我是老马,期待与你的下次相遇。
参考资料
《Netty 权威指南》
- other