t-io 入門篇(三)即時消息發送demo學習

前言

     t-io做者在開源其框架的同時還附帶了幾個demo,如:簡單的hello world、im等。接下來這篇博客將會圍繞tio-examples-im-simple-client、tio-examples-im-simple-server展開分析和學習。java

demo項目結構

    im-simple分紅三個maven子項目:nginx

  • tio-examples-im-simple-client就是客戶端聊天工具項目
  • tio-examples-im-simple-common保存了一些公用的類型定義和utils,
  • tio-examples-im-simple-server包含了聊天系統的服務端和一套附帶了nginx的網頁聊天工具。

   整片博客是圍繞着網頁版聊天工具而寫的。web

聊天系統

  1. 概述

      首先介紹一下這個聊天系統demo大概交互的邏輯是怎麼樣子的,以下圖:算法

  • demo中web聊天系統使用了websocket協議來和服務端進行通訊。
  • demo中數據交互不管是瀏覽器端、仍是服務端數據傳輸方式都使用了proto-buf(這個讓我眼前一亮,以前只據說過客戶端和服務器端採用proto-buf,demo裏面居然使用了js的proto-buf,由於以前接觸js這一塊很少,第一次看到就給跪了)。

    2.代碼分析 apache

  • 服務端初始化demo:
..
// 定義handler,全部的請求數據所有都由這個handler來處理,decode/encode/handler等等
// 若是您作web開發必定知道dispatcher的概念,這個handler會將數據解碼,而後將數據分發給對應的handler處理業務
aioHandler = new ImServerAioHandler();
// listenr 能夠在鏈接上、接收到消息、發送消息後等等回調其內部方法
aioListener = new ImServerAioListener();
// 服務端上下文初始化
serverGroupContext = new ServerGroupContext<>(aioHandler, aioListener);

serverGroupContext.setEncodeCareWithChannelContext(true);		
aioServer = new AioServer<>(serverGroupContext);
aioServer.start(ip, port);
..

從初始化的幾句代碼中能夠看出ImServerAioHandler 是聊天系統中的重中之重,咱們來看一下里面的代碼瀏覽器

/..
// 握手請求
handlerMap.put(Command.COMMAND_HANDSHAKE_REQ, new HandshakeReqHandler());
// 受權請求
handlerMap.put(Command.COMMAND_AUTH_REQ, new AuthReqHandler());
// 聊天請求
handlerMap.put(Command.COMMAND_CHAT_REQ, new ChatReqHandler());
// 加入羣組請求
handlerMap.put(Command.COMMAND_JOIN_GROUP_REQ, new JoinReqHandler());
// 心跳請求
handlerMap.put(Command.COMMAND_HEARTBEAT_REQ, new HeartbeatReqHandler());
// 關閉鏈接請求
handlerMap.put(Command.COMMAND_CLOSE_REQ, new CloseReqHandler());
/..

/.. 在這裏經過解析消息頭以後獲取下一步調用哪一個handler來處理業務
public Object handler(ImPacket packet, ChannelContext<ImSessionContext, ImPacket, Object> channelContext) throws Exception
	{
		Command command = packet.getCommand();
		ImBsHandlerIntf handler = handlerMap.get(command);
		if (handler != null)
		{
			Object obj = handler.handler(packet, channelContext);
			CommandStat.getCount(command).handled.incrementAndGet();
			return obj;
		} else
		{
			CommandStat.getCount(command).handled.incrementAndGet();
			log.warn("找不到對應的命令碼[{}]處理類", command);
			return null;
		}

	}

/..

固然,上面的其實都是業務代碼,尚未很明顯的看到框架代碼,接下來咱們看看怎樣發送一條聊天的消息給一個羣組。服務器

  • ChatReqHandler.java是處理髮送聊天消息的類:
// protobuf反序列化消息體         
        ChatReqBody chatReqBody = ChatReqBody.parseFrom(packet.getBody());
        // demo這裏寫死了一些信息,消息發送者
		Integer fromId = 111;
		String fromNick = "test";
        // 把消息發送給誰
		Integer toId = chatReqBody.getToId();
		String toNick = chatReqBody.getToNick();
		String toGroup = chatReqBody.getGroup();
        // 其實demo的代碼這裏寫的不太好,若是chatReqBody==null,前面就報錯了
		if (chatReqBody != null)
		{
            // 構建消息體響應類
			ChatRespBody.Builder builder = ChatRespBody.newBuilder();
			builder.setType(chatReqBody.getType());
			builder.setText(chatReqBody.getText());
			builder.setFromId(fromId);
			builder.setFromNick(fromNick);
			builder.setToId(toId);
			builder.setToNick(toNick);
			builder.setGroup(toGroup);
			builder.setTime(SystemTimer.currentTimeMillis());
            //一樣protobuf序列化
			ChatRespBody chatRespBody = builder.build();
			byte[] bodybyte = chatRespBody.toByteArray();
            
            //組建即時聊天響應包
			ImPacket respPacket = new ImPacket();
			respPacket.setCommand(Command.COMMAND_CHAT_RESP);

			respPacket.setBody(bodybyte);
            // 若是是對羣組發送,直接調用Aio.sendToGroup便可(框架代碼)
			if (Objects.equals(ChatType.CHAT_TYPE_PUBLIC, chatReqBody.getType()))
			{
				Aio.sendToGroup(channelContext.getGroupContext(), toGroup, respPacket);
			} else if (Objects.equals(ChatType.CHAT_TYPE_PRIVATE, chatReqBody.getType()))
			{   // 若是是對單我的發送,也有Aio.sendToUser方法(框架代碼)
				if (toId != null)
				{
					Aio.sendToUser(channelContext.getGroupContext(), toId + "", respPacket);
				}
			}
		}

     看看,真正使用到框架的代碼實際上就一行:Aio.sendToXXX靜態方法。websocket

  • 那也許會有人問,框架是怎麼知道這個group裏面有哪些鏈接啊?框架怎麼知道會有哪些鏈接的user呢?

     注意看衆多handler中,其中有一個JoinReqHandler是在客戶端調用加入羣組命令的時候執行的:框架

public Object handler(ImPacket packet, ChannelContext<ImSessionContext, ImPacket, Object> channelContext) throws Exception
{
// ..
// 鏈接上下文綁定羣組
Aio.bindGroup(channelContext, group);
// ..

}

     一個簡單的bindGroup便可把這個鏈接綁定到一個組裏面,從而經過Aio.sendToGroup便可輕鬆的給一個組發送消息。而在AuthReqHandler中一樣調用了Aio.bindUser方法來綁定有用戶鏈接(這個不貼代碼了,貼多了界面太長,讓人不想看)。socket

     保存user和group,做者使用的是一個帶了讀寫鎖的本身封裝的DualHashBidiMap (apache工具包裏面的,雙向map,能夠經過key獲取value,能夠經過value獲取key)。

     總結t-io在聊天系統中的使用方法

  • 建立了一個server端
  • 客戶端在鑑權的時候Aio.bindUser
  • 客戶端在加入羣組的時候Aio.bindGroup
  • 客戶端在發送消息時Aio.sendToUser/Aio.sendToGroup便可
  • 固然,只要你實現一下ClientAioHandler的heartbeatPacket方法,框架自己給實現了自動心跳檢測,重連也只須要在初始化鏈接上下文時傳入ReconnConf便可。
  • 後面我本身設計config-server的時候,給客戶端推送一組服務IP過去也是至關容易的事情啦。
  • 棋牌類遊戲交互也相似了,一桌四我的,一個的操做,發送到服務端作算法邏輯校驗後,同時推送給其餘三我的,好像這樣實現起來也不復雜了。而須要關心的僅僅只是server自己的集羣問題了。

    demo中我最感興趣的和我學到的

  • 框架提供的user和group概念很方便
  • ByteBuffer消息頭的定義方法和解析,demo裏面定義消息頭定義得很節省空間
    ByteBuffer buffer = ByteBuffer.allocate(allLen);
    buffer.order(groupContext.getByteOrder());
    //這裏比較有迷惑性,這個version並不是隨便亂定,而是須要根據後面是否使用壓縮、序號同步等等標記出來的二進制
    buffer.put(ImPacket.VERSION);
    buffer.put((byte) packet.getCommand().getNumber());
    buffer.put(isCompress ? (byte)1 : (byte)0);
    buffer.putInt(packet.getSynSeq());
    buffer.putShort((short)bodyLen);

    這裏注意,我差點被做者繞進去了,ImPacket.VERSION比較有迷惑性

    //這是客戶端解析第一個字節的定義,不是瀏覽器解析規則
    //其實做者定義得version本身規定的只佔用4個比特位,如:0B00001111,後四位都是用來真正標示版本號
    //實際上前面的幾位是另外分出來標記是否壓縮,是否同步序號等等
    byte version = ImPacket.decodeVersion(firstbyte);
    boolean isCompress = ImPacket.decodeCompress(firstbyte);
    boolean hasSynSeq = ImPacket.decodeHasSynSeq(firstbyte);
    boolean is4ByteLength = ImPacket.decode4ByteLength(firstbyte);

     

  • 消息點對點發送和消息的羣組發送性能和穩定性還有待我作作測試,後續在全面瞭解完t-io框架後準備弄一份關於性能測試的報告出來試試。

     同時感謝t-io做者對個人指導 !

相關文章
相關標籤/搜索