写大型数据
因为网络饱和的可能性,如何在异步框架中高效地写大块的数据是一个特殊的问题。
由于写操作是非阻塞的,所以即使没有写出所有的数据,写操作也会在完成时返回并通知ChannelFuture。
当这种情况发生时,如果仍然不停地写入,就有内存耗尽的风险。
所以在写大型数据时,需要准备好处理到远程节点的连接是慢速连接的情况,这种情况会导致内存释放的延迟。
让我们考虑下将一个文件内容写出到网络的情况。
在我们讨论传输(见4.2 节)的过程中,提到了NIO 的零拷贝特性,这种特性消除了将文件的内容从文件系统移动到网络栈的复制过程。所有的这一切都发生在Netty 的核心中,所以应用程序所有需要做的就是使用一个FileRegion 接口的实现,
其在Netty 的API 文档中的定义是:“通过支持零拷贝的文件传输的Channel 来发送的文件区域。”
示例代码
代码清单11-11 展示了如何通过从FileInputStream创建一个DefaultFileRegion,并将其写入Channel。从而利用零拷贝特性来传输一个文件的内容。
import io.netty.channel.*;
import java.io.File;
import java.io.FileInputStream;
/**
* @author binbin.hou
* @since 1.0.0
*/
public class CommonInit extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
final File file = new File("1.txt");
FileInputStream fileInputStream = new FileInputStream(file);
FileRegion fileRegion = new DefaultFileRegion(fileInputStream.getChannel(), 0, file.length());
// 直接写入
ch.writeAndFlush(fileRegion)
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if(!future.isSuccess()) {
future.cause().printStackTrace();
}
}
});
}
}
当然这种方式非常的原始,可能导致 OOM。
这个示例只适用于文件内容的直接传输,不包括应用程序对数据的任何处理。
ChunkedWriteHandler 类
在需要将数据从文件系统复制到用户内存中时,可以使用 ChunkedWriteHandler,它支持异步写大型数据流,而又不会导致大量的内存消耗。
核心 API
关键是interface ChunkedInput,其中类型参数B 是readChunk()方法返回的类型。
Netty 预置了该接口的4 个实现,如表11-7 中所列出的。每个都代表了一个将由Chunked-WriteHandler 处理的不定长度的数据流。
ChunkedFile 从文件中逐块获取数据,当你的平台不支持零拷贝或者你需要转换数据时使用
ChunkedNioFile 和ChunkedFile 类似,只是它使用了FileChannel
ChunkedStream 从InputStream 中逐块传输内容
ChunkedNioStream 从ReadableByteChannel 中逐块传输内容
示例代码
代码清单11-12 说明了ChunkedStream 的用法,它是实践中最常用的实现。
所示的类使用了一个File 以及一个SslContext 进行实例化。
当initChannel()方法被调用时,它将使用所示的ChannelHandler 链初始化该Channel。
当Channel 的状态变为活动的时,WriteStreamHandler 将会逐块地把来自文件中的数据作为 ChunkedStream 写入。
数据在传输之前将会由SslHandler 加密。
- 流程梳理
-
添加 ssl 加密
-
添加 ChunkedWriteHandler 处理大文件
-
在 channel.active 的时候,通过 ChunkedStream 写入数据。
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedStream;
import io.netty.handler.stream.ChunkedWriteHandler;
import javax.net.ssl.SSLEngine;
import java.io.File;
import java.io.FileInputStream;
/**
* @author binbin.hou
* @since 1.0.0
*/
public class ChunkedChannelInit extends ChannelInitializer<Channel> {
/**
* 待传递的文件
*/
private final File file;
/**
* SSL 上下文
*/
private final SslContext sslContext;
public ChunkedChannelInit(File file, SslContext sslContext) {
this.file = file;
this.sslContext = sslContext;
}
@Override
protected void initChannel(Channel ch) throws Exception {
//1. 构造 ssl
SSLEngine sslEngine = sslContext.newEngine(ch.alloc());
SslHandler sslHandler = new SslHandler(sslEngine, false);
//2. 构建 pipeline
ch.pipeline().addLast(sslHandler)
.addLast(new ChunkedWriteHandler())
.addLast(new WriteStreamHandler());
}
class WriteStreamHandler extends ChannelInboundHandlerAdapter {
/**
* 当 channel active 的时候,执行数据写入
* @param ctx 上下文
* @throws Exception 异常
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
// 使用 ChunkedStream 来进行写入
ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));
}
}
}
逐块输入
要使用你自己的ChunkedInput 实现,请在ChannelPipeline 中安装一个ChunkedWriteHandler。
在本节中,我们讨论了如何通过使用零拷贝特性来高效地传输文件,以及如何通过使用ChunkedWriteHandler 来写大型数据而又不必冒着导致OutOfMemoryError 的风险。
下一节中,我们将仔细研究几种序列化POJO 的方法。
参考资料
《Netty in Action》 P172