我们的WebSocket 示例应用程序

场景

为了让示例应用程序展示它的实时功能,我们将通过使用WebSocket 协议来实现一个基于浏览器的聊天应用程序,就像你可能在Facebook 的文本消息功能中见到过的那样。

我们将通过使得多个用户之间可以同时进行相互通信,从而更进一步。

图12-1 说明了该应用程序的逻辑:

(1)客户端发送一个消息;

(2)该消息将被广播到所有其他连接的客户端。

流程

流程

1 客户端/用户连接到服务器,并且是聊天的一部分

2 聊天消息通过 WebSocket 进行交换

3 消息双向发送

4 服务器处理所有的客户端/用户

这正如你所想的聊天室的工作方式:每个人都可以跟其他人聊天。

此例子将仅提供服务器端,浏览器充当客户端,通过访问网页来聊天。正如您接下来要看到的,WebSocket 让这一切变得简单。

添加 WebSocket 支持

在从标准的HTTP或者HTTPS协议切换到WebSocket时,将会使用一种称为升级握手机制。

因此,使用WebSocket的应用程序将始终以HTTP/S作为开始,然后再执行升级。这个升级动作发生的确切时刻特定于应用程序;它可能会发生在启动时,也可能会发生在请求了某个特定的URL之后。

我们的应用程序将采用下面的约定:如果被请求的 URL 以 /ws 结尾,那么我们将会把该协议升级为WebSocket;否则,服务器将使用基本的HTTP/S。

在连接已经升级完成之后,所有数据都将会使用WebSocket 进行传输。

图12-2 说明了该服务器逻辑,一如在Netty 中一样,它由一组ChannelHandler 实现。

我们将会在下一节中,解释用于处理HTTP 以及WebSocket 协议的技术时,描述它们。

流程图

流程图

1 客户端/用户连接到服务器并加入聊天

2 HTTP 请求页面或 WebSocket 升级握手

3 服务器处理所有客户端/用户

4 响应 URI / 的请求,转到 index.html

5 如果访问的是 URI /ws ,处理 WebSocket 升级握手

6 升级握手完成后 ,通过 WebSocket 发送聊天消息

示例代码

服务器

  • WsServer.java
  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.github.houbb.netty.inaction.chap12.ws; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import java.net.InetSocketAddress; /** * @author binbin.hou * @since 1.0.0 */ public class WsServer { public static void main(String[] args) { final int port = 8989; ServerBootstrap serverBootstrap = new ServerBootstrap(); EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new WsServerInitializer()); ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(port)).syncUninterruptibly(); if(!channelFuture.isSuccess()) { channelFuture.cause().printStackTrace(); } else { System.out.println("Server listen on port: " + port); } channelFuture.channel().closeFuture().syncUninterruptibly(); //close group bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
  • WsServerInitializer.java
  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.github.houbb.netty.inaction.chap12.ws; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; /** * @author binbin.hou * @since 1.0.0 */ public class WsServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // webSocket 是 http 协议的升级 pipeline.addLast(new HttpServerCodec()) //避免大文件写入 oom .addLast(new ChunkedWriteHandler()) //使用对象聚合 .addLast(new HttpObjectAggregator(64*1024)); //ws://server:port/context_path //ws://localhost:9999/ws //参数指的是contex_path pipeline.addLast(new WebSocketServerProtocolHandler("/ws")) // 自定义的 frame 文本处理 handler .addLast(new TextWebSocketFrameHandler()); } }
  • TextWebSocketFrameHandler
  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.github.houbb.netty.inaction.chap12.ws; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import java.time.LocalDateTime; /** * @author binbin.hou * @since 1.0.0 */ public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { //1. 读取客户端的内容 Channel channel = ctx.channel(); System.out.println(channel.remoteAddress()+": " + msg.text()); //2. 服务端反馈内容 ctx.writeAndFlush(new TextWebSocketFrame("Server: " + LocalDateTime.now())); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("[客户端加入] " + ctx.channel().id().asLongText()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.println("[客户端移除] " + ctx.channel().id().asLongText()); } }

客户端

  [html]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
<!DOCTYPE html> <html> <head> <meta charset="UTF-8"> <title>Socket</title> <script type="text/javascript"> var websocket; //如果浏览器支持WebSocket if(window.WebSocket){ websocket = new WebSocket("ws://localhost:8989/ws"); //获得WebSocket对象 //当有消息过来的时候触发 websocket.onmessage = function(event){ var respMessage = document.getElementById("respMessage"); respMessage.value = respMessage.value + "\n" + event.data; } //连接关闭的时候触发 websocket.onclose = function(event){ var respMessage = document.getElementById("respMessage"); respMessage.value = respMessage.value + "\n断开连接"; } //连接打开的时候触发 websocket.onopen = function(event){ var respMessage = document.getElementById("respMessage"); respMessage.value = "建立连接"; } }else{ alert("浏览器不支持WebSocket"); } function sendMsg(msg) { //发送消息 if(window.WebSocket){ if(websocket.readyState == WebSocket.OPEN) { //如果WebSocket是打开状态 websocket.send(msg); //send()发送消息 } }else{ return; } } </script> </head> <body> <form onsubmit="return false"> <textarea style="width: 300px; height: 200px;" name="message"></textarea> <input type="button" onclick="sendMsg(this.form.message.value)" value="发送"><br> <h3>信息</h3> <textarea style="width: 300px; height: 200px;" id="respMessage"></textarea> <input type="button" value="清空" onclick="javascript:document.getElementById('respMessage').value = ''"> </form> </body> </html>

测试

  • 页面打开 index.html

可以打开多个,在输入框发送信息,可以接收到服务器的反馈。

关闭代表移除当前连接。

  • 服务端
  [plaintext]
1
2
3
4
5
6
7
8
Server listen on port: 8989 [客户端加入] d89c67fffe99d9d7-00004978-00000001-089a4fe674c43c6c-f4e6a557 /0:0:0:0:0:0:0:1:59205: 啊啊啊 /0:0:0:0:0:0:0:1:59205: 啊啊啊,hello [客户端加入] d89c67fffe99d9d7-00004978-00000002-21719b08d4c491ad-9373573f /0:0:0:0:0:0:0:1:59214: 222 [客户端移除] d89c67fffe99d9d7-00004978-00000001-089a4fe674c43c6c-f4e6a557 [客户端移除] d89c67fffe99d9d7-00004978-00000002-21719b08d4c491ad-9373573f

核心内容讲解

WEBSOCKET 帧

WebSocket 以帧的方式传输数据,每一帧代表消息的一部分。一个完整的消息可能会包含许多帧

由IETF 发布的WebSocket RFC,定义了6 种帧,Netty 为它们每种都提供了一个POJO 实现。

表12-1 列出了这些帧类型,并描述了它们的用法。

  [plaintext]
1
2
3
4
5
6
7
帧 类 型 描 述 BinaryWebSocketFrame 包含了二进制数据 TextWebSocketFrame 包含了文本数据 ContinuationWebSocketFrame 包含属于上一个BinaryWebSocketFrame或TextWebSocketFrame 的文本数据或者二进制数据 CloseWebSocketFrame 表示一个CLOSE 请求,包含一个关闭的状态码和关闭的原因 PingWebSocketFrame 请求传输一个PongWebSocketFrame PongWebSocketFrame 作为一个对于PingWebSocketFrame 的响应被发送

我们的聊天应用程序将使用下面几种帧类型:

  • CloseWebSocketFrame;

  • PingWebSocketFrame;

  • PongWebSocketFrame;

  • TextWebSocketFrame。

TextWebSocketFrame 是我们唯一真正需要处理的帧类型。

为了符合WebSocket RFC,Netty 提供了 WebSocketServerProtocolHandler 来处理其他类型的帧。

Handler 及其职责

  [plaintext]
1
2
3
4
5
6
7
ChannelHandler 职 责 HttpServerCodec 将字节解码为HttpRequest、HttpContent 和LastHttpContent。并将HttpRequest、HttpContent 和LastHttpContent 编码为字节 ChunkedWriteHandler 块写入一个文件的内容 HttpObjectAggregator 将一个HttpMessage 和跟随它的多个HttpContent 聚合为单个FullHttpRequest 或者FullHttpResponse(取决于它是被用来处理请求还是响应)。安装了这个之后,ChannelPipeline 中的下一个ChannelHandler 将只会收到完整的HTTP 请求或响应 HttpRequestHandler 处理FullHttpRequest(那些不发送到/ws URI 的请求) WebSocketServerProtocolHandler 按照WebSocket 规范的要求,处理WebSocket 升级握手、PingWebSocketFrame 、PongWebSocketFrame、CloseWebSocketFrame TextWebSocketFrameHandler 处理TextWebSocketFrame 和握手完成事件

Netty 的WebSocketServerProtocolHandler 处理了所有委托管理的WebSocket 帧类型以及升级握手本身。

如果握手成功,那么所需的ChannelHandler 将会被添加到ChannelPipeline中,而那些不再需要的ChannelHandler 则将会被移除。

当WebSocket 协议升级完成之后,WebSocketServerProtocolHandler 将会把HttpRequestDecoder 替换为WebSocketFrameDecoder,把HttpResponseEncoder 替换为WebSocketFrameEncoder。

为了性能最大化,它将移除任何不再被WebSocket 连接所需要的ChannelHandler。

参考资料

《Netty in Action》 P185

  • other

Netty笔记之六:Netty对websocket的支持

Netty对WebSocket的支持(五)