t-io

t-io是一个网络框架,从这一点来说是有点像netty的,但t-io的特点在于,它不仅仅是一个网络框架, 因为它为常见和网络相关的业务(如IM、消息推送、RPC、监控)提供了近乎于现成的解决方案,即开箱即用的API,简单列举如下

  • 一个连接绑定userid(一个userid可以绑定多个连接,用于多端登录,可双向查找)

  • 一个连接绑定token(一个token可以绑定多个连接,用于多端登录,可双向查找)

  • 一个连接绑定群组(用于推送消息、IM中的群聊)

  • 提供了各种各样的发送消息API,这些API都位于Aio.java中,譬如异步发送的都是以send开头的方法

快速入门

官方例子

jar

<dependency>
    <groupId>org.t-io</groupId>
    <artifactId>tio-core</artifactId>
    <version>3.0.0.v20180520-RELEASE</version>
</dependency>

入门代码示例

tio 代码地址

  • 代码文件结构
.
├── HelloClientStarter.java
├── HelloServerStarter.java
├── constant
│   └── Const.java
├── handler
│   ├── HelloClientAioHandler.java
│   └── HelloServerAioHandler.java
└── packet
    └── HelloPacket.java
  • Const.java
public interface Const {

    /**
     * 服务器地址
     */
    String SERVER = "127.0.0.1";

    /**
     * 监听端口
     */
    int PORT = 6789;

    /**
     * 心跳超时时间
     */
    int TIMEOUT = 5000;

}
  • HelloPacket.java
public class HelloPacket extends Packet {

    private static final long serialVersionUID = -172060606924066412L;
    //消息头的长度
    public static final int HEADER_LENGHT = 4;
    public static final String CHARSET = "utf-8";
    private byte[] body;

    /**
     * @return the body
     */
    public byte[] getBody() {
        return body;
    }

    /**
     * @param body the body to set
     */
    public void setBody(byte[] body) {
        this.body = body;
    }

}
  • HelloServerAioHandler.java
import com.github.houbb.tech.validation.tio.packet.HelloPacket;

import org.tio.core.Aio;
import org.tio.core.ChannelContext;
import org.tio.core.GroupContext;
import org.tio.core.exception.AioDecodeException;
import org.tio.core.intf.Packet;
import org.tio.server.intf.ServerAioHandler;

import java.nio.ByteBuffer;

public class HelloServerAioHandler implements ServerAioHandler {

    /**
     * 解码:把接收到的ByteBuffer,解码成应用可以识别的业务消息包
     * 总的消息结构:消息头 + 消息体
     * 消息头结构:    4个字节,存储消息体的长度
     * 消息体结构:   对象的json串的byte[]
     */
    @Override
    public HelloPacket decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws AioDecodeException {
        //提醒:buffer的开始位置并不一定是0,应用需要从buffer.position()开始读取数据
        //收到的数据组不了业务包,则返回null以告诉框架数据不够
        if (readableLength < HelloPacket.HEADER_LENGHT) {
            return null;
        }

        //读取消息体的长度
        int bodyLength = buffer.getInt();

        //数据不正确,则抛出AioDecodeException异常
        if (bodyLength < 0) {
            throw new AioDecodeException("bodyLength [" + bodyLength + "] is not right, remote:" + channelContext.getClientNode());
        }

        //计算本次需要的数据长度
        int neededLength = HelloPacket.HEADER_LENGHT + bodyLength;
        //收到的数据是否足够组包
        int isDataEnough = readableLength - neededLength;
        // 不够消息体长度(剩下的buffe组不了消息体)
        if (isDataEnough < 0) {
            return null;
        } else //组包成功
        {
            HelloPacket imPacket = new HelloPacket();
            if (bodyLength > 0) {
                byte[] dst = new byte[bodyLength];
                buffer.get(dst);
                imPacket.setBody(dst);
            }
            return imPacket;
        }
    }


    /**
     * 编码:把业务消息包编码为可以发送的ByteBuffer
     * 总的消息结构:消息头 + 消息体
     * 消息头结构:    4个字节,存储消息体的长度
     * 消息体结构:   对象的json串的byte[]
     */
    @Override
    public ByteBuffer encode(Packet packet, GroupContext groupContext, ChannelContext channelContext) {
        HelloPacket helloPacket = (HelloPacket) packet;
        byte[] body = helloPacket.getBody();
        int bodyLen = 0;
        if (body != null) {
            bodyLen = body.length;
        }

        //bytebuffer的总长度是 = 消息头的长度 + 消息体的长度
        int allLen = HelloPacket.HEADER_LENGHT + bodyLen;
        //创建一个新的bytebuffer
        ByteBuffer buffer = ByteBuffer.allocate(allLen);
        //设置字节序
        buffer.order(groupContext.getByteOrder());

        //写入消息头----消息头的内容就是消息体的长度
        buffer.putInt(bodyLen);

        //写入消息体
        if (body != null) {
            buffer.put(body);
        }
        return buffer;
    }


    /**
     * 处理消息
     */
    @Override
    public void handler(Packet packet, ChannelContext channelContext) throws Exception {

        HelloPacket helloPacket = (HelloPacket) packet;
        byte[] body = helloPacket.getBody();
        if (body != null) {
            String str = new String(body, HelloPacket.CHARSET);
            System.out.println("收到消息:" + str);

            HelloPacket resppacket = new HelloPacket();
            resppacket.setBody(("收到了你的消息,你的消息是:" + str).getBytes(HelloPacket.CHARSET));
            Aio.send(channelContext, resppacket);
        }
        return;

    }

}
  • HelloServerStarter.java
import com.github.houbb.tech.validation.tio.constant.Const;
import com.github.houbb.tech.validation.tio.handler.HelloServerAioHandler;

import org.tio.server.AioServer;
import org.tio.server.ServerGroupContext;
import org.tio.server.intf.ServerAioHandler;
import org.tio.server.intf.ServerAioListener;

import java.io.IOException;

public class HelloServerStarter {

    //handler, 包括编码、解码、消息处理
    public static ServerAioHandler aioHandler = new HelloServerAioHandler();

    //事件监听器,可以为null,但建议自己实现该接口,可以参考showcase了解些接口
    public static ServerAioListener aioListener = null;

    //一组连接共用的上下文对象
    public static ServerGroupContext serverGroupContext = new ServerGroupContext("hello-tio-server", aioHandler, aioListener);

    //aioServer对象
    public static AioServer aioServer = new AioServer(serverGroupContext);

    //有时候需要绑定ip,不需要则null
    public static String serverIp = null;

    //监听的端口
    public static int serverPort = Const.PORT;

    /**
     * 启动程序入口
     */
    public static void main(String[] args) throws IOException {
        serverGroupContext.setHeartbeatTimeout(Const.TIMEOUT);

        aioServer.start(serverIp, serverPort);
    }
}
  • HelloClientAioHandler.java
import com.github.houbb.tech.validation.tio.packet.HelloPacket;

import org.tio.client.intf.ClientAioHandler;
import org.tio.core.ChannelContext;
import org.tio.core.GroupContext;
import org.tio.core.exception.AioDecodeException;
import org.tio.core.intf.Packet;

import java.nio.ByteBuffer;

public class HelloClientAioHandler implements ClientAioHandler {
    
    private static HelloPacket heartbeatPacket = new HelloPacket();

    /**
     * 解码:把接收到的ByteBuffer,解码成应用可以识别的业务消息包
     * 总的消息结构:消息头 + 消息体
     * 消息头结构:    4个字节,存储消息体的长度
     * 消息体结构:   对象的json串的byte[]
     */
    @Override
    public HelloPacket decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws AioDecodeException {
        //收到的数据组不了业务包,则返回null以告诉框架数据不够
        if (readableLength < HelloPacket.HEADER_LENGHT) {
            return null;
        }

        //读取消息体的长度
        int bodyLength = buffer.getInt();

        //数据不正确,则抛出AioDecodeException异常
        if (bodyLength < 0) {
            throw new AioDecodeException("bodyLength [" + bodyLength + "] is not right, remote:" + channelContext.getClientNode());
        }

        //计算本次需要的数据长度
        int neededLength = HelloPacket.HEADER_LENGHT + bodyLength;
        //收到的数据是否足够组包
        int isDataEnough = readableLength - neededLength;
        // 不够消息体长度(剩下的buffe组不了消息体)
        if (isDataEnough < 0) {
            return null;
        } else //组包成功
        {
            HelloPacket imPacket = new HelloPacket();
            if (bodyLength > 0) {
                byte[] dst = new byte[bodyLength];
                buffer.get(dst);
                imPacket.setBody(dst);
            }
            return imPacket;
        }
    }

    /**
     * 编码:把业务消息包编码为可以发送的ByteBuffer
     * 总的消息结构:消息头 + 消息体
     * 消息头结构:    4个字节,存储消息体的长度
     * 消息体结构:   对象的json串的byte[]
     */
    @Override
    public ByteBuffer encode(Packet packet, GroupContext groupContext, ChannelContext channelContext) {
        HelloPacket helloPacket = (HelloPacket) packet;
        byte[] body = helloPacket.getBody();
        int bodyLen = 0;
        if (body != null) {
            bodyLen = body.length;
        }

        //bytebuffer的总长度是 = 消息头的长度 + 消息体的长度
        int allLen = HelloPacket.HEADER_LENGHT + bodyLen;
        //创建一个新的bytebuffer
        ByteBuffer buffer = ByteBuffer.allocate(allLen);
        //设置字节序
        buffer.order(groupContext.getByteOrder());

        //写入消息头----消息头的内容就是消息体的长度
        buffer.putInt(bodyLen);

        //写入消息体
        if (body != null) {
            buffer.put(body);
        }
        return buffer;
    }

    /**
     * 处理消息
     */
    @Override
    public void handler(Packet packet, ChannelContext channelContext) throws Exception {
        HelloPacket helloPacket = (HelloPacket) packet;
        byte[] body = helloPacket.getBody();
        if (body != null) {
            String str = new String(body, HelloPacket.CHARSET);
            System.out.println("收到消息:" + str);
        }

        return;
    }

    /**
     * 此方法如果返回null,框架层面则不会发心跳;如果返回非null,框架层面会定时发本方法返回的消息包
     */
    @Override
    public HelloPacket heartbeatPacket() {
        return heartbeatPacket;
    }
}
  • HelloClientStarter.java
import com.github.houbb.tech.validation.tio.constant.Const;
import com.github.houbb.tech.validation.tio.handler.HelloClientAioHandler;
import com.github.houbb.tech.validation.tio.packet.HelloPacket;

import org.tio.client.AioClient;
import org.tio.client.ClientChannelContext;
import org.tio.client.ClientGroupContext;
import org.tio.client.ReconnConf;
import org.tio.client.intf.ClientAioHandler;
import org.tio.client.intf.ClientAioListener;
import org.tio.core.Aio;
import org.tio.core.Node;

public class HelloClientStarter {

    //服务器节点
    public static Node serverNode = new Node(Const.SERVER, Const.PORT);

    //handler, 包括编码、解码、消息处理
    public static ClientAioHandler aioClientHandler = new HelloClientAioHandler();

    //事件监听器,可以为null,但建议自己实现该接口,可以参考showcase了解些接口
    public static ClientAioListener aioListener = null;

    //断链后自动连接的,不想自动连接请设为null
    private static ReconnConf reconnConf = new ReconnConf(5000L);

    //一组连接共用的上下文对象
    public static ClientGroupContext clientGroupContext = new ClientGroupContext(aioClientHandler, aioListener, reconnConf);

    public static AioClient            aioClient            = null;
    public static ClientChannelContext clientChannelContext = null;

    /**
     * 启动程序入口
     */
    public static void main(String[] args) throws Exception {
        clientGroupContext.setHeartbeatTimeout(Const.TIMEOUT);
        aioClient = new AioClient(clientGroupContext);
        clientChannelContext = aioClient.connect(serverNode);
        //连上后,发条消息玩玩
        send();
    }

    private static void send() throws Exception {
        HelloPacket packet = new HelloPacket();
        packet.setBody("hello world".getBytes(HelloPacket.CHARSET));
        Aio.send(clientChannelContext, packet);
    }

}

测试结果

  • Client
收到消息:收到了你的消息,你的消息是:hello world
  • Server
收到消息:hello world