本文爲該系列的第三篇文章,設計需求爲:服務端程序和衆多客戶端程序經過 TCP 協議進行通訊,通訊雙方需通訊的消息種類衆多。上一篇文章以一個具體的需求爲例,探討了指定的 Java 消息對象與其相應的二進制數據幀相互轉換的方法。本文仍以該實例爲例,探討該自定義通訊協議的具體工做流程,以及如何以註冊的形式靈活插拔通訊消息對象。java
經過該系列的第二篇文章可知,各個消息對象的編解碼器類均擁有一個靜態工廠方法,用於手動傳入功能位及功能文字描述,進而生成包含這些參數的編解碼器。如此設計,使得全部消息的功能位和文字描述均可以統一管理,下降維護成本。緩存
根據上述需求,可經過 Map 容器管理全部的編解碼器,有以下優勢:併發
通訊消息對象註冊方法以下所示:工具
/** * 消息對象的註冊 * * @param toolkit 消息對象編解碼器容器的工具類 */ private void initialMsg() { saveNormalMsgCodec(new MsgCodecDeviceUnlock(0x10, 0x11, "客戶端解鎖")); saveNormalMsgCodec(new MsgCodecDeviceClear(0x10, 0x13, "客戶端初始化")); saveNormalMsgCodec(new MsgCodecDeviceId(0x10, 0x1B, "客戶端ID設置")); saveNormalMsgCodec(new MsgCodecEmployeeName(0x10, 0x1C, "客戶端別名設置")); ... ... } /** * 將普通消息對象及其回覆消息對象的編解碼器均保存到 HashMap 中 * * @param baseMsgCodec 特定的消息對象編解碼器 */ private void saveNormalMsgCodec(BaseMsgCodec baseMsgCodec) { saveSpecialMsgCodec(baseMsgCodec); baseMsgCodec = new MsgCodecReplyNormal(baseMsgCodec.getMajorMsgId() + 0x10, baseMsgCodec.getSubMsgId(), baseMsgCodec.getDetail()); saveSpecialMsgCodec(baseMsgCodec); } /** * 將消息對象的編解碼器保存到 HashMap 中 * * @param baseMsgCodec 特定的編解碼器 */ private void saveSpecialMsgCodec(BaseMsgCodec baseMsgCodec) { HASH_MAP.put(figureFrameId(baseMsgCodec.getMajorMsgId(), baseMsgCodec.getSubMsgId()), baseMsgCodec); }
上述代碼代表,若是有新的業務需求,須要增刪「插拔」業務消息對象,只需在 initialMsg()
方法中,對相應編解碼器的註冊語句進行增刪便可。優化
saveNormalMsgCodec(BaseMsgCodec)
方法能夠同時註冊特定業務消息對象及其通用回覆消息對象,操做方法清晰、簡潔。this
因此,在啓動該 Java 程序時,只須要在啓動過程當中,執行上述 initialMsg()
方法,便可完成全部業務消息對象的註冊。編碼
由該系列的第一篇文章可知,若是某二進制數據幀所要傳輸的數據體部份內容不多,致使一個幀的大部分容量均被幀頭佔據,致使有效數據的佔比很小,這就產生了巨大的浪費,故數據幀的數據體部分由子幀組成,同一類子幀都可被組裝進同一個數據幀。如此作法,整個通訊鏈路的數據量會明顯減小,IO 負擔也會所以減輕。線程
該需求的實現原理以下所示:設計
/** * 啓動一個Channel的定時任務,用於間隔指定的時間對消息隊列進行輪詢,併發送指定數據幀 * * @param deque 指定的消息發送隊列 * @param channelId 指定 Channel 的序號 */ private void startMessageQueueTask(LinkedBlockingDeque<BaseMsg> deque, Integer channelId) { executorService.scheduleWithFixedDelay(() -> { try { BaseMsg baseMsg = deque.take(); // 從隊列中取出一個消息對象,隊列爲空時阻塞 Thread.sleep(AWAKE_TO_PROCESS_INTERVAL);// 等待極短的時間,保證隊列中緩存儘量多的對象 Channel channel = touchChannel(channelId); // 獲取指定的待發送的 Channel List<ByteBuf> dataList = new ArrayList<>();// 子幀容器 ByteBuf data = baseMsg.subFrameEncode(channel.alloc().buffer());// 編碼一個子幀 dataList.add(data); touchNeedReplyMsg(baseMsg); // 對該子幀設置檢錯重發任務 int length = data.readableBytes(); int flag = baseMsg.combineFrameFlag(); // 獲取消息對象標識 while (true) { BaseMsg subMsg = deque.peek(); // 查看隊列中的第一個消息對象 if (subMsg == null || subMsg.combineFrameFlag() != flag) { break; // 消息對象標識不一樣,即欲生成的主幀幀頭不一樣,不能組合進同一主幀 } data = subMsg.subFrameEncode(channel.alloc().buffer()); if (length + data.readableBytes() > FrameSetting.MAX_DATA_LENGTH) { break; } length += data.readableBytes(); dataList.add(data); // 組合進了同一主幀 deque.poll(); // 從隊列中移除該消息對象 touchNeedReplyMsg(subMsg); } FrameMajorHeader frameHeader = new FrameMajorHeader( baseMsg.getMajorMsgId(), baseMsg.getGroupId(), baseMsg.getDeviceId(), length); // 生成主幀幀頭消息對象 channel.writeAndFlush(new SendableMsgContainer(frameHeader, dataList)); // 送入Channel進行發送 } catch (InterruptedException e) { logger.warn("消息隊列定時發送任務被中斷"); } }, channelId, CommSetting.FRAME_SEND_INTERVAL, TimeUnit.MILLISECONDS); }
由代碼可知,待發送的消息對象均被送入指定的發送隊列進行緩存,某客戶端相應的線程對隊列進行操做,取出消息對象並進行編碼、組裝、發送等。固然,當客戶端數量較多時,上述的線程實現方式可採用 Netty 的 NIO 方式進行優化,以下降系統開銷。code
由上述描述可知,欲發送一個消息對象,只需將該消息對象送入相應的發送隊列便可。
因爲每一個 Java 消息對象均內含相應編解碼器的引用,故可直接對該消息對象進行編碼操做,代碼以下:
public abstract class BaseMsg implements Cloneable { private final BaseMsgCodec msgCodec; ... ... /** * 將 java 消息對象編碼爲 TCP 子幀 * * @param buffer 空白的 TCP 子幀的容器 * @return 保存有 TCP 子幀的容器 */ public ByteBuf subFrameEncode(ByteBuf buffer) { return msgCodec.code(this, buffer); } }
首先根據數據幀的幀頭,便可解析出 FrameMajorHeader
對象,而後便可調用以下方法完成子幀的解析工做。實現原理文章開頭已指出。
/** * TCP 幀解碼爲 Java 消息對象 * * @param head 主幀頭 * @param subMsgId 子幀功能位 * @param data 子幀數據 * @return 已解碼的 Java 對象 */ public BaseMsg decode(FrameMajorHeader head, int subMsgId, byte[] data) { BaseMsgCodec msgCodec = MsgCodecToolkit.getMsgCodec(head.getMsgId(), subMsgId); return msgCodec.decode(head.getGroupId(), head.getDeviceId(), data); }