原文:https://my.oschina.net/u/2984386/blog/1630300java
-
背景介紹
t-io是一款國產開源的網絡編程框架,主要是特色:簡單,易上手,AIP封裝通俗易懂,適合通常企業簡易即時通信工具開發。宣傳性能也不錯:百萬TCP長鏈接,不過我的也沒測試過,因此想試一試看看。本文檔主要記錄了簡單羣組聊天的實現,同時記錄下學習t-io的過程。其實 http://t-io.org/#/ 中有比較完整的Demo,本文也主要是參考其中。 編程
-
服務端
啓動類:網絡
package com.dooper.server; import org.tio.server.AioServer; import org.tio.server.ServerGroupContext; import org.tio.server.intf.ServerAioListener; import com.dooper.common.packet.Constant; import com.dooper.server.handler.MyServerAioHandler; public class ServerStarter { public static MyServerAioHandler aioHandler = new MyServerAioHandler(); public static ServerAioListener aioListener = null; public static ServerGroupContext serverGroupContext = new ServerGroupContext(aioHandler, aioListener); public static AioServer aioServer = new AioServer(serverGroupContext); public static String serverIp = null; public static int serverPort = Constant.PORT; public static void main(String[] args) throws Exception{ serverGroupContext.setHeartbeatTimeout(Constant.TIMEOUT); aioServer.start(serverIp, serverPort); } }
消息處理框架
消息處理中有綁定組的步驟,實際不該該在此處,應該是有額外的處理類來處理羣組綁定,此處由於懶,直接寫在裏面了。ide
package com.dooper.server.handler; import java.nio.ByteBuffer; import org.tio.core.Aio; import org.tio.core.ChannelContext; import org.tio.core.GroupContext; import org.tio.core.exception.AioDecodeException; import org.tio.core.intf.Packet; import org.tio.server.intf.ServerAioHandler; import com.dooper.common.packet.MyPacket; /** * server * * */ public class MyServerAioHandler implements ServerAioHandler{ @Override public Packet decode(ByteBuffer buffer, ChannelContext chanelContext) throws AioDecodeException { int readableLength = buffer.limit() - buffer.position(); if(readableLength < MyPacket.HEADER_LENGHT){ return null; } int bodyLength = buffer.getInt(); if(bodyLength<0){ throw new AioDecodeException("bodyLength ["+bodyLength+"] is not rigth,remote"+chanelContext.getClientNode()); } int neededLength = MyPacket.HEADER_LENGHT+bodyLength; int isDataEnough = readableLength - neededLength; if(isDataEnough < 0){ return null; }else{ MyPacket packet = new MyPacket(); if(bodyLength > 0){ byte[] dst = new byte[bodyLength]; buffer.get(dst); packet.setBody(dst); } return packet; } } @Override public ByteBuffer encode(Packet packet, GroupContext groupContext, ChannelContext channelContext) { MyPacket myPacket = (MyPacket)packet; byte[] body = myPacket.getBody(); int bodyLen = 0; if(body != null){ bodyLen = body.length; } int allLen = MyPacket.HEADER_LENGHT + bodyLen; ByteBuffer buffer = ByteBuffer.allocate(allLen); buffer.order(groupContext.getByteOrder()); buffer.putInt(bodyLen); if(body != null){ buffer.put(body); } return buffer; } @Override public void handler(Packet packet, ChannelContext channelContext) throws Exception { MyPacket myPacket = (MyPacket)packet; byte[] body = myPacket.getBody(); if(body != null){ String str = new String(body,MyPacket.CHARSET); System.out.println("客戶端發送的消息:"+str); Aio.bindGroup(channelContext, "group1"); GroupHandler gh = new GroupHandler(); gh.handler(myPacket, channelContext); } return; } }
自定義的羣組消息處理工具
package com.dooper.server.handler; import org.tio.core.Aio; import org.tio.core.ChannelContext; import org.tio.core.intf.Packet; import com.dooper.common.packet.MyPacket; public class GroupHandler extends MsgHandler{ @Override public void handler(Packet packet, ChannelContext channelContext) throws Exception { MyPacket myPacket = (MyPacket)packet; byte[] body = myPacket.getBody(); if(body!=null){ MyPacket mp = new MyPacket(); System.out.println("服務端收到消息:"+new String(body,MyPacket.CHARSET)); mp.setBody((channelContext.getClientNode()+":"+new String(body,MyPacket.CHARSET)).getBytes(MyPacket.CHARSET)); Aio.sendToGroup(channelContext.getGroupContext(), "group1", mp); } } }
-
客戶端
啓動類oop
package com.dooper.client; import java.util.Scanner; import org.tio.client.AioClient; import org.tio.client.ClientChannelContext; import org.tio.client.ClientGroupContext; import org.tio.client.ReconnConf; import org.tio.client.intf.ClientAioHandler; import org.tio.client.intf.ClientAioListener; import org.tio.core.Aio; import org.tio.core.Node; import com.dooper.common.packet.Constant; import com.dooper.common.packet.MyPacket; public class MyClientStarter { public static Node serverNode = new Node(Constant.SERVER,Constant.PORT); public static ClientAioHandler aioClientHandler = new MyClientAioHandler(); public static ClientAioListener aioListener = null; private static ReconnConf reconnConf = new ReconnConf(5000L); private static ClientGroupContext clientGroupContext = new ClientGroupContext(aioClientHandler, aioListener,reconnConf); public static AioClient aioClient = null; public static ClientChannelContext clientChannelContext = null; public static void main(String[] args) throws Exception{ clientGroupContext.setHeartbeatTimeout(Constant.TIMEOUT); aioClient = new AioClient(clientGroupContext); clientChannelContext = aioClient.connect(serverNode); Scanner sc = new Scanner(System.in); System.out.println("請發送羣組消息:"); String line = sc.nextLine(); // 這個就是用戶輸入的數據 while (true) { if ("exit".equalsIgnoreCase(line)) { System.out.println("Thanks for using! bye bye."); break; } else{ sendGroup(line); } line = sc.nextLine(); // 這個就是用戶輸入的數據 } // send(); sc.close(); } public static void send() throws Exception{ MyPacket packet = new MyPacket(); packet.setBody("hello world".getBytes(MyPacket.CHARSET)); Aio.send(clientChannelContext, packet); } public static void sendGroup(String msg) throws Exception{ Aio.bindGroup(clientChannelContext, "group1"); MyPacket packet = new MyPacket(); packet.setBody(msg.getBytes(MyPacket.CHARSET)); Aio.sendToGroup(clientGroupContext, "group1", packet); } }
消息處理類性能
package com.dooper.client; import java.nio.ByteBuffer; import org.tio.client.intf.ClientAioHandler; import org.tio.core.ChannelContext; import org.tio.core.GroupContext; import org.tio.core.exception.AioDecodeException; import org.tio.core.intf.Packet; import com.dooper.common.packet.MyPacket; public class MyClientAioHandler implements ClientAioHandler { private static MyPacket heartbeatPacket = new MyPacket(); /** * ���룺 */ @Override public Packet decode(ByteBuffer buffer, ChannelContext channelContext) throws AioDecodeException { int readableLength = buffer.limit() - buffer.position(); if(readableLength < MyPacket.HEADER_LENGHT){ return null; } int bodyLength = buffer.getInt(); if(bodyLength < 0){ throw new AioDecodeException("bodyLength ["+bodyLength+"] is not right,remote:"+channelContext.getClientNode()); } int neededLength = MyPacket.HEADER_LENGHT + bodyLength; int isDataEnough = readableLength - neededLength; if(isDataEnough < 0){ return null; }else{ MyPacket myPacket = new MyPacket(); if(bodyLength > 0){ byte[] dst = new byte[bodyLength]; buffer.get(dst); myPacket.setBody(dst); } return myPacket; } } /** * ���룺 */ @Override public ByteBuffer encode(Packet packet, GroupContext groupContext, ChannelContext channelContext) { MyPacket myPacket = (MyPacket)packet; byte[] body = myPacket.getBody(); int bodyLen = 0; if(body != null){ bodyLen = body.length; } int allLen = MyPacket.HEADER_LENGHT +bodyLen; ByteBuffer buffer = ByteBuffer.allocate(allLen); buffer.order(groupContext.getByteOrder()); buffer.putInt(bodyLen); if(body != null){ buffer.put(body); } return buffer; } @Override public void handler(Packet packet, ChannelContext channelContext) throws Exception { MyPacket myPacket = (MyPacket)packet; byte[] body = myPacket.getBody(); if(body!=null){ String str = new String(body,MyPacket.CHARSET); System.out.println(str); } return ; } @Override public Packet heartbeatPacket() { return heartbeatPacket; } }