t-io
t-io是一个网络框架,从这一点来说是有点像netty的,但t-io的特点在于,它不仅仅是一个网络框架, 因为它为常见和网络相关的业务(如IM、消息推送、RPC、监控)提供了近乎于现成的解决方案,即开箱即用的API,简单列举如下
-
一个连接绑定userid(一个userid可以绑定多个连接,用于多端登录,可双向查找)
-
一个连接绑定token(一个token可以绑定多个连接,用于多端登录,可双向查找)
-
一个连接绑定群组(用于推送消息、IM中的群聊)
-
提供了各种各样的发送消息API,这些API都位于Aio.java中,譬如异步发送的都是以send开头的方法
快速入门
jar
<dependency>
<groupId>org.t-io</groupId>
<artifactId>tio-core</artifactId>
<version>3.0.0.v20180520-RELEASE</version>
</dependency>
入门代码示例
- 代码文件结构
.
├── HelloClientStarter.java
├── HelloServerStarter.java
├── constant
│ └── Const.java
├── handler
│ ├── HelloClientAioHandler.java
│ └── HelloServerAioHandler.java
└── packet
└── HelloPacket.java
- Const.java
public interface Const {
/**
* 服务器地址
*/
String SERVER = "127.0.0.1";
/**
* 监听端口
*/
int PORT = 6789;
/**
* 心跳超时时间
*/
int TIMEOUT = 5000;
}
- HelloPacket.java
public class HelloPacket extends Packet {
private static final long serialVersionUID = -172060606924066412L;
//消息头的长度
public static final int HEADER_LENGHT = 4;
public static final String CHARSET = "utf-8";
private byte[] body;
/**
* @return the body
*/
public byte[] getBody() {
return body;
}
/**
* @param body the body to set
*/
public void setBody(byte[] body) {
this.body = body;
}
}
- HelloServerAioHandler.java
import com.github.houbb.tech.validation.tio.packet.HelloPacket;
import org.tio.core.Aio;
import org.tio.core.ChannelContext;
import org.tio.core.GroupContext;
import org.tio.core.exception.AioDecodeException;
import org.tio.core.intf.Packet;
import org.tio.server.intf.ServerAioHandler;
import java.nio.ByteBuffer;
public class HelloServerAioHandler implements ServerAioHandler {
/**
* 解码:把接收到的ByteBuffer,解码成应用可以识别的业务消息包
* 总的消息结构:消息头 + 消息体
* 消息头结构: 4个字节,存储消息体的长度
* 消息体结构: 对象的json串的byte[]
*/
@Override
public HelloPacket decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws AioDecodeException {
//提醒:buffer的开始位置并不一定是0,应用需要从buffer.position()开始读取数据
//收到的数据组不了业务包,则返回null以告诉框架数据不够
if (readableLength < HelloPacket.HEADER_LENGHT) {
return null;
}
//读取消息体的长度
int bodyLength = buffer.getInt();
//数据不正确,则抛出AioDecodeException异常
if (bodyLength < 0) {
throw new AioDecodeException("bodyLength [" + bodyLength + "] is not right, remote:" + channelContext.getClientNode());
}
//计算本次需要的数据长度
int neededLength = HelloPacket.HEADER_LENGHT + bodyLength;
//收到的数据是否足够组包
int isDataEnough = readableLength - neededLength;
// 不够消息体长度(剩下的buffe组不了消息体)
if (isDataEnough < 0) {
return null;
} else //组包成功
{
HelloPacket imPacket = new HelloPacket();
if (bodyLength > 0) {
byte[] dst = new byte[bodyLength];
buffer.get(dst);
imPacket.setBody(dst);
}
return imPacket;
}
}
/**
* 编码:把业务消息包编码为可以发送的ByteBuffer
* 总的消息结构:消息头 + 消息体
* 消息头结构: 4个字节,存储消息体的长度
* 消息体结构: 对象的json串的byte[]
*/
@Override
public ByteBuffer encode(Packet packet, GroupContext groupContext, ChannelContext channelContext) {
HelloPacket helloPacket = (HelloPacket) packet;
byte[] body = helloPacket.getBody();
int bodyLen = 0;
if (body != null) {
bodyLen = body.length;
}
//bytebuffer的总长度是 = 消息头的长度 + 消息体的长度
int allLen = HelloPacket.HEADER_LENGHT + bodyLen;
//创建一个新的bytebuffer
ByteBuffer buffer = ByteBuffer.allocate(allLen);
//设置字节序
buffer.order(groupContext.getByteOrder());
//写入消息头----消息头的内容就是消息体的长度
buffer.putInt(bodyLen);
//写入消息体
if (body != null) {
buffer.put(body);
}
return buffer;
}
/**
* 处理消息
*/
@Override
public void handler(Packet packet, ChannelContext channelContext) throws Exception {
HelloPacket helloPacket = (HelloPacket) packet;
byte[] body = helloPacket.getBody();
if (body != null) {
String str = new String(body, HelloPacket.CHARSET);
System.out.println("收到消息:" + str);
HelloPacket resppacket = new HelloPacket();
resppacket.setBody(("收到了你的消息,你的消息是:" + str).getBytes(HelloPacket.CHARSET));
Aio.send(channelContext, resppacket);
}
return;
}
}
- HelloServerStarter.java
import com.github.houbb.tech.validation.tio.constant.Const;
import com.github.houbb.tech.validation.tio.handler.HelloServerAioHandler;
import org.tio.server.AioServer;
import org.tio.server.ServerGroupContext;
import org.tio.server.intf.ServerAioHandler;
import org.tio.server.intf.ServerAioListener;
import java.io.IOException;
public class HelloServerStarter {
//handler, 包括编码、解码、消息处理
public static ServerAioHandler aioHandler = new HelloServerAioHandler();
//事件监听器,可以为null,但建议自己实现该接口,可以参考showcase了解些接口
public static ServerAioListener aioListener = null;
//一组连接共用的上下文对象
public static ServerGroupContext serverGroupContext = new ServerGroupContext("hello-tio-server", aioHandler, aioListener);
//aioServer对象
public static AioServer aioServer = new AioServer(serverGroupContext);
//有时候需要绑定ip,不需要则null
public static String serverIp = null;
//监听的端口
public static int serverPort = Const.PORT;
/**
* 启动程序入口
*/
public static void main(String[] args) throws IOException {
serverGroupContext.setHeartbeatTimeout(Const.TIMEOUT);
aioServer.start(serverIp, serverPort);
}
}
- HelloClientAioHandler.java
import com.github.houbb.tech.validation.tio.packet.HelloPacket;
import org.tio.client.intf.ClientAioHandler;
import org.tio.core.ChannelContext;
import org.tio.core.GroupContext;
import org.tio.core.exception.AioDecodeException;
import org.tio.core.intf.Packet;
import java.nio.ByteBuffer;
public class HelloClientAioHandler implements ClientAioHandler {
private static HelloPacket heartbeatPacket = new HelloPacket();
/**
* 解码:把接收到的ByteBuffer,解码成应用可以识别的业务消息包
* 总的消息结构:消息头 + 消息体
* 消息头结构: 4个字节,存储消息体的长度
* 消息体结构: 对象的json串的byte[]
*/
@Override
public HelloPacket decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws AioDecodeException {
//收到的数据组不了业务包,则返回null以告诉框架数据不够
if (readableLength < HelloPacket.HEADER_LENGHT) {
return null;
}
//读取消息体的长度
int bodyLength = buffer.getInt();
//数据不正确,则抛出AioDecodeException异常
if (bodyLength < 0) {
throw new AioDecodeException("bodyLength [" + bodyLength + "] is not right, remote:" + channelContext.getClientNode());
}
//计算本次需要的数据长度
int neededLength = HelloPacket.HEADER_LENGHT + bodyLength;
//收到的数据是否足够组包
int isDataEnough = readableLength - neededLength;
// 不够消息体长度(剩下的buffe组不了消息体)
if (isDataEnough < 0) {
return null;
} else //组包成功
{
HelloPacket imPacket = new HelloPacket();
if (bodyLength > 0) {
byte[] dst = new byte[bodyLength];
buffer.get(dst);
imPacket.setBody(dst);
}
return imPacket;
}
}
/**
* 编码:把业务消息包编码为可以发送的ByteBuffer
* 总的消息结构:消息头 + 消息体
* 消息头结构: 4个字节,存储消息体的长度
* 消息体结构: 对象的json串的byte[]
*/
@Override
public ByteBuffer encode(Packet packet, GroupContext groupContext, ChannelContext channelContext) {
HelloPacket helloPacket = (HelloPacket) packet;
byte[] body = helloPacket.getBody();
int bodyLen = 0;
if (body != null) {
bodyLen = body.length;
}
//bytebuffer的总长度是 = 消息头的长度 + 消息体的长度
int allLen = HelloPacket.HEADER_LENGHT + bodyLen;
//创建一个新的bytebuffer
ByteBuffer buffer = ByteBuffer.allocate(allLen);
//设置字节序
buffer.order(groupContext.getByteOrder());
//写入消息头----消息头的内容就是消息体的长度
buffer.putInt(bodyLen);
//写入消息体
if (body != null) {
buffer.put(body);
}
return buffer;
}
/**
* 处理消息
*/
@Override
public void handler(Packet packet, ChannelContext channelContext) throws Exception {
HelloPacket helloPacket = (HelloPacket) packet;
byte[] body = helloPacket.getBody();
if (body != null) {
String str = new String(body, HelloPacket.CHARSET);
System.out.println("收到消息:" + str);
}
return;
}
/**
* 此方法如果返回null,框架层面则不会发心跳;如果返回非null,框架层面会定时发本方法返回的消息包
*/
@Override
public HelloPacket heartbeatPacket() {
return heartbeatPacket;
}
}
- HelloClientStarter.java
import com.github.houbb.tech.validation.tio.constant.Const;
import com.github.houbb.tech.validation.tio.handler.HelloClientAioHandler;
import com.github.houbb.tech.validation.tio.packet.HelloPacket;
import org.tio.client.AioClient;
import org.tio.client.ClientChannelContext;
import org.tio.client.ClientGroupContext;
import org.tio.client.ReconnConf;
import org.tio.client.intf.ClientAioHandler;
import org.tio.client.intf.ClientAioListener;
import org.tio.core.Aio;
import org.tio.core.Node;
public class HelloClientStarter {
//服务器节点
public static Node serverNode = new Node(Const.SERVER, Const.PORT);
//handler, 包括编码、解码、消息处理
public static ClientAioHandler aioClientHandler = new HelloClientAioHandler();
//事件监听器,可以为null,但建议自己实现该接口,可以参考showcase了解些接口
public static ClientAioListener aioListener = null;
//断链后自动连接的,不想自动连接请设为null
private static ReconnConf reconnConf = new ReconnConf(5000L);
//一组连接共用的上下文对象
public static ClientGroupContext clientGroupContext = new ClientGroupContext(aioClientHandler, aioListener, reconnConf);
public static AioClient aioClient = null;
public static ClientChannelContext clientChannelContext = null;
/**
* 启动程序入口
*/
public static void main(String[] args) throws Exception {
clientGroupContext.setHeartbeatTimeout(Const.TIMEOUT);
aioClient = new AioClient(clientGroupContext);
clientChannelContext = aioClient.connect(serverNode);
//连上后,发条消息玩玩
send();
}
private static void send() throws Exception {
HelloPacket packet = new HelloPacket();
packet.setBody("hello world".getBytes(HelloPacket.CHARSET));
Aio.send(clientChannelContext, packet);
}
}
测试结果
- Client
收到消息:收到了你的消息,你的消息是:hello world
- Server
收到消息:hello world