t-io

t-io是一个网络框架,从这一点来说是有点像netty的,但t-io的特点在于,它不仅仅是一个网络框架, 因为它为常见和网络相关的业务(如IM、消息推送、RPC、监控)提供了近乎于现成的解决方案,即开箱即用的API,简单列举如下

  • 一个连接绑定userid(一个userid可以绑定多个连接,用于多端登录,可双向查找)

  • 一个连接绑定token(一个token可以绑定多个连接,用于多端登录,可双向查找)

  • 一个连接绑定群组(用于推送消息、IM中的群聊)

  • 提供了各种各样的发送消息API,这些API都位于Aio.java中,譬如异步发送的都是以send开头的方法

快速入门

官方例子

jar

  [xml]
1
2
3
4
5
<dependency> <groupId>org.t-io</groupId> <artifactId>tio-core</artifactId> <version>3.0.0.v20180520-RELEASE</version> </dependency>

入门代码示例

tio 代码地址

  • 代码文件结构
  [plaintext]
1
2
3
4
5
6
7
8
9
10
. ├── HelloClientStarter.java ├── HelloServerStarter.java ├── constant │   └── Const.java ├── handler │   ├── HelloClientAioHandler.java │   └── HelloServerAioHandler.java └── packet └── HelloPacket.java
  • Const.java
  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public interface Const { /** * 服务器地址 */ String SERVER = "127.0.0.1"; /** * 监听端口 */ int PORT = 6789; /** * 心跳超时时间 */ int TIMEOUT = 5000; }
  • HelloPacket.java
  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class HelloPacket extends Packet { private static final long serialVersionUID = -172060606924066412L; //消息头的长度 public static final int HEADER_LENGHT = 4; public static final String CHARSET = "utf-8"; private byte[] body; /** * @return the body */ public byte[] getBody() { return body; } /** * @param body the body to set */ public void setBody(byte[] body) { this.body = body; } }
  • HelloServerAioHandler.java
  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import com.github.houbb.tech.validation.tio.packet.HelloPacket; import org.tio.core.Aio; import org.tio.core.ChannelContext; import org.tio.core.GroupContext; import org.tio.core.exception.AioDecodeException; import org.tio.core.intf.Packet; import org.tio.server.intf.ServerAioHandler; import java.nio.ByteBuffer; public class HelloServerAioHandler implements ServerAioHandler { /** * 解码:把接收到的ByteBuffer,解码成应用可以识别的业务消息包 * 总的消息结构:消息头 + 消息体 * 消息头结构: 4个字节,存储消息体的长度 * 消息体结构: 对象的json串的byte[] */ @Override public HelloPacket decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws AioDecodeException { //提醒:buffer的开始位置并不一定是0,应用需要从buffer.position()开始读取数据 //收到的数据组不了业务包,则返回null以告诉框架数据不够 if (readableLength < HelloPacket.HEADER_LENGHT) { return null; } //读取消息体的长度 int bodyLength = buffer.getInt(); //数据不正确,则抛出AioDecodeException异常 if (bodyLength < 0) { throw new AioDecodeException("bodyLength [" + bodyLength + "] is not right, remote:" + channelContext.getClientNode()); } //计算本次需要的数据长度 int neededLength = HelloPacket.HEADER_LENGHT + bodyLength; //收到的数据是否足够组包 int isDataEnough = readableLength - neededLength; // 不够消息体长度(剩下的buffe组不了消息体) if (isDataEnough < 0) { return null; } else //组包成功 { HelloPacket imPacket = new HelloPacket(); if (bodyLength > 0) { byte[] dst = new byte[bodyLength]; buffer.get(dst); imPacket.setBody(dst); } return imPacket; } } /** * 编码:把业务消息包编码为可以发送的ByteBuffer * 总的消息结构:消息头 + 消息体 * 消息头结构: 4个字节,存储消息体的长度 * 消息体结构: 对象的json串的byte[] */ @Override public ByteBuffer encode(Packet packet, GroupContext groupContext, ChannelContext channelContext) { HelloPacket helloPacket = (HelloPacket) packet; byte[] body = helloPacket.getBody(); int bodyLen = 0; if (body != null) { bodyLen = body.length; } //bytebuffer的总长度是 = 消息头的长度 + 消息体的长度 int allLen = HelloPacket.HEADER_LENGHT + bodyLen; //创建一个新的bytebuffer ByteBuffer buffer = ByteBuffer.allocate(allLen); //设置字节序 buffer.order(groupContext.getByteOrder()); //写入消息头----消息头的内容就是消息体的长度 buffer.putInt(bodyLen); //写入消息体 if (body != null) { buffer.put(body); } return buffer; } /** * 处理消息 */ @Override public void handler(Packet packet, ChannelContext channelContext) throws Exception { HelloPacket helloPacket = (HelloPacket) packet; byte[] body = helloPacket.getBody(); if (body != null) { String str = new String(body, HelloPacket.CHARSET); System.out.println("收到消息:" + str); HelloPacket resppacket = new HelloPacket(); resppacket.setBody(("收到了你的消息,你的消息是:" + str).getBytes(HelloPacket.CHARSET)); Aio.send(channelContext, resppacket); } return; } }
  • HelloServerStarter.java
  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import com.github.houbb.tech.validation.tio.constant.Const; import com.github.houbb.tech.validation.tio.handler.HelloServerAioHandler; import org.tio.server.AioServer; import org.tio.server.ServerGroupContext; import org.tio.server.intf.ServerAioHandler; import org.tio.server.intf.ServerAioListener; import java.io.IOException; public class HelloServerStarter { //handler, 包括编码、解码、消息处理 public static ServerAioHandler aioHandler = new HelloServerAioHandler(); //事件监听器,可以为null,但建议自己实现该接口,可以参考showcase了解些接口 public static ServerAioListener aioListener = null; //一组连接共用的上下文对象 public static ServerGroupContext serverGroupContext = new ServerGroupContext("hello-tio-server", aioHandler, aioListener); //aioServer对象 public static AioServer aioServer = new AioServer(serverGroupContext); //有时候需要绑定ip,不需要则null public static String serverIp = null; //监听的端口 public static int serverPort = Const.PORT; /** * 启动程序入口 */ public static void main(String[] args) throws IOException { serverGroupContext.setHeartbeatTimeout(Const.TIMEOUT); aioServer.start(serverIp, serverPort); } }
  • HelloClientAioHandler.java
  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import com.github.houbb.tech.validation.tio.packet.HelloPacket; import org.tio.client.intf.ClientAioHandler; import org.tio.core.ChannelContext; import org.tio.core.GroupContext; import org.tio.core.exception.AioDecodeException; import org.tio.core.intf.Packet; import java.nio.ByteBuffer; public class HelloClientAioHandler implements ClientAioHandler { private static HelloPacket heartbeatPacket = new HelloPacket(); /** * 解码:把接收到的ByteBuffer,解码成应用可以识别的业务消息包 * 总的消息结构:消息头 + 消息体 * 消息头结构: 4个字节,存储消息体的长度 * 消息体结构: 对象的json串的byte[] */ @Override public HelloPacket decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws AioDecodeException { //收到的数据组不了业务包,则返回null以告诉框架数据不够 if (readableLength < HelloPacket.HEADER_LENGHT) { return null; } //读取消息体的长度 int bodyLength = buffer.getInt(); //数据不正确,则抛出AioDecodeException异常 if (bodyLength < 0) { throw new AioDecodeException("bodyLength [" + bodyLength + "] is not right, remote:" + channelContext.getClientNode()); } //计算本次需要的数据长度 int neededLength = HelloPacket.HEADER_LENGHT + bodyLength; //收到的数据是否足够组包 int isDataEnough = readableLength - neededLength; // 不够消息体长度(剩下的buffe组不了消息体) if (isDataEnough < 0) { return null; } else //组包成功 { HelloPacket imPacket = new HelloPacket(); if (bodyLength > 0) { byte[] dst = new byte[bodyLength]; buffer.get(dst); imPacket.setBody(dst); } return imPacket; } } /** * 编码:把业务消息包编码为可以发送的ByteBuffer * 总的消息结构:消息头 + 消息体 * 消息头结构: 4个字节,存储消息体的长度 * 消息体结构: 对象的json串的byte[] */ @Override public ByteBuffer encode(Packet packet, GroupContext groupContext, ChannelContext channelContext) { HelloPacket helloPacket = (HelloPacket) packet; byte[] body = helloPacket.getBody(); int bodyLen = 0; if (body != null) { bodyLen = body.length; } //bytebuffer的总长度是 = 消息头的长度 + 消息体的长度 int allLen = HelloPacket.HEADER_LENGHT + bodyLen; //创建一个新的bytebuffer ByteBuffer buffer = ByteBuffer.allocate(allLen); //设置字节序 buffer.order(groupContext.getByteOrder()); //写入消息头----消息头的内容就是消息体的长度 buffer.putInt(bodyLen); //写入消息体 if (body != null) { buffer.put(body); } return buffer; } /** * 处理消息 */ @Override public void handler(Packet packet, ChannelContext channelContext) throws Exception { HelloPacket helloPacket = (HelloPacket) packet; byte[] body = helloPacket.getBody(); if (body != null) { String str = new String(body, HelloPacket.CHARSET); System.out.println("收到消息:" + str); } return; } /** * 此方法如果返回null,框架层面则不会发心跳;如果返回非null,框架层面会定时发本方法返回的消息包 */ @Override public HelloPacket heartbeatPacket() { return heartbeatPacket; } }
  • HelloClientStarter.java
  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import com.github.houbb.tech.validation.tio.constant.Const; import com.github.houbb.tech.validation.tio.handler.HelloClientAioHandler; import com.github.houbb.tech.validation.tio.packet.HelloPacket; import org.tio.client.AioClient; import org.tio.client.ClientChannelContext; import org.tio.client.ClientGroupContext; import org.tio.client.ReconnConf; import org.tio.client.intf.ClientAioHandler; import org.tio.client.intf.ClientAioListener; import org.tio.core.Aio; import org.tio.core.Node; public class HelloClientStarter { //服务器节点 public static Node serverNode = new Node(Const.SERVER, Const.PORT); //handler, 包括编码、解码、消息处理 public static ClientAioHandler aioClientHandler = new HelloClientAioHandler(); //事件监听器,可以为null,但建议自己实现该接口,可以参考showcase了解些接口 public static ClientAioListener aioListener = null; //断链后自动连接的,不想自动连接请设为null private static ReconnConf reconnConf = new ReconnConf(5000L); //一组连接共用的上下文对象 public static ClientGroupContext clientGroupContext = new ClientGroupContext(aioClientHandler, aioListener, reconnConf); public static AioClient aioClient = null; public static ClientChannelContext clientChannelContext = null; /** * 启动程序入口 */ public static void main(String[] args) throws Exception { clientGroupContext.setHeartbeatTimeout(Const.TIMEOUT); aioClient = new AioClient(clientGroupContext); clientChannelContext = aioClient.connect(serverNode); //连上后,发条消息玩玩 send(); } private static void send() throws Exception { HelloPacket packet = new HelloPacket(); packet.setBody("hello world".getBytes(HelloPacket.CHARSET)); Aio.send(clientChannelContext, packet); } }

测试结果

  • Client
  [plaintext]
1
收到消息:收到了你的消息,你的消息是:hello world
  • Server
  [plaintext]
1
收到消息:hello world