本文爲該系列的第三篇文章,設計需求爲:服務端程序和衆多客戶端程序經過 TCP 協議進行通訊,通訊雙方需通訊的消息種類衆多。上一篇文章以一個具體的需求爲例,探討了指定的 Java 消息對象與其相應的二進制數據幀相互轉換的方法。本文仍以該實例爲例,探討該自定義通訊協議的具體工做流程,以及如何以註冊的形式靈活插拔通訊消息對象。java
經過該系列的第二篇文章可知,各個消息對象的編解碼器類均擁有一個靜態工廠方法,用於手動傳入功能位及功能文字描述,進而生成包含這些參數的編解碼器。如此設計,使得全部消息的功能位和文字描述均可以統一管理,下降維護成本。緩存
根據上述需求,可經過 Map 容器管理全部的編解碼器,有以下優勢:併發
通訊消息對象註冊方法以下所示:工具
/** * 消息對象的註冊 * * @param toolkit 消息對象編解碼器容器的工具類 */
private void initialMsg() {
saveNormalMsgCodec(new MsgCodecDeviceUnlock(0x10, 0x11, "客戶端解鎖"));
saveNormalMsgCodec(new MsgCodecDeviceClear(0x10, 0x13, "客戶端初始化"));
saveNormalMsgCodec(new MsgCodecDeviceId(0x10, 0x1B, "客戶端ID設置"));
saveNormalMsgCodec(new MsgCodecEmployeeName(0x10, 0x1C, "客戶端別名設置"));
... ...
}
/** * 將普通消息對象及其回覆消息對象的編解碼器均保存到 HashMap 中 * * @param baseMsgCodec 特定的消息對象編解碼器 */
private void saveNormalMsgCodec(BaseMsgCodec baseMsgCodec) {
saveSpecialMsgCodec(baseMsgCodec);
baseMsgCodec = new MsgCodecReplyNormal(baseMsgCodec.getMajorMsgId() + 0x10, baseMsgCodec.getSubMsgId(), baseMsgCodec.getDetail());
saveSpecialMsgCodec(baseMsgCodec);
}
/** * 將消息對象的編解碼器保存到 HashMap 中 * * @param baseMsgCodec 特定的編解碼器 */
private void saveSpecialMsgCodec(BaseMsgCodec baseMsgCodec) {
HASH_MAP.put(figureFrameId(baseMsgCodec.getMajorMsgId(), baseMsgCodec.getSubMsgId()), baseMsgCodec);
}
複製代碼
上述代碼代表,若是有新的業務需求,須要增刪「插拔」業務消息對象,只需在 initialMsg()
方法中,對相應編解碼器的註冊語句進行增刪便可。優化
saveNormalMsgCodec(BaseMsgCodec)
方法能夠同時註冊特定業務消息對象及其通用回覆消息對象,操做方法清晰、簡潔。this
因此,在啓動該 Java 程序時,只須要在啓動過程當中,執行上述 initialMsg()
方法,便可完成全部業務消息對象的註冊。編碼
由該系列的第一篇文章可知,若是某二進制數據幀所要傳輸的數據體部份內容不多,致使一個幀的大部分容量均被幀頭佔據,致使有效數據的佔比很小,這就產生了巨大的浪費,故數據幀的數據體部分由子幀組成,同一類子幀都可被組裝進同一個數據幀。如此作法,整個通訊鏈路的數據量會明顯減小,IO 負擔也會所以減輕。spa
該需求的實現原理以下所示:線程
/** * 啓動一個Channel的定時任務,用於間隔指定的時間對消息隊列進行輪詢,併發送指定數據幀 * * @param deque 指定的消息發送隊列 * @param channelId 指定 Channel 的序號 */
private void startMessageQueueTask(LinkedBlockingDeque<BaseMsg> deque, Integer channelId) {
executorService.scheduleWithFixedDelay(() -> {
try {
BaseMsg baseMsg = deque.take(); // 從隊列中取出一個消息對象,隊列爲空時阻塞
Thread.sleep(AWAKE_TO_PROCESS_INTERVAL);// 等待極短的時間,保證隊列中緩存儘量多的對象
Channel channel = touchChannel(channelId); // 獲取指定的待發送的 Channel
List<ByteBuf> dataList = new ArrayList<>();// 子幀容器
ByteBuf data = baseMsg.subFrameEncode(channel.alloc().buffer());// 編碼一個子幀
dataList.add(data);
touchNeedReplyMsg(baseMsg); // 對該子幀設置檢錯重發任務
int length = data.readableBytes();
int flag = baseMsg.combineFrameFlag(); // 獲取消息對象標識
while (true) {
BaseMsg subMsg = deque.peek(); // 查看隊列中的第一個消息對象
if (subMsg == null || subMsg.combineFrameFlag() != flag) {
break; // 消息對象標識不一樣,即欲生成的主幀幀頭不一樣,不能組合進同一主幀
}
data = subMsg.subFrameEncode(channel.alloc().buffer());
if (length + data.readableBytes() > FrameSetting.MAX_DATA_LENGTH) {
break;
}
length += data.readableBytes();
dataList.add(data); // 組合進了同一主幀
deque.poll(); // 從隊列中移除該消息對象
touchNeedReplyMsg(subMsg);
}
FrameMajorHeader frameHeader = new FrameMajorHeader(
baseMsg.getMajorMsgId(),
baseMsg.getGroupId(),
baseMsg.getDeviceId(),
length); // 生成主幀幀頭消息對象
channel.writeAndFlush(new SendableMsgContainer(frameHeader, dataList)); // 送入Channel進行發送
} catch (InterruptedException e) {
logger.warn("消息隊列定時發送任務被中斷");
}
}, channelId, CommSetting.FRAME_SEND_INTERVAL, TimeUnit.MILLISECONDS);
}
複製代碼
由代碼可知,待發送的消息對象均被送入指定的發送隊列進行緩存,某客戶端相應的線程對隊列進行操做,取出消息對象並進行編碼、組裝、發送等。固然,當客戶端數量較多時,上述的線程實現方式可採用 Netty 的 NIO 方式進行優化,以下降系統開銷。設計
由上述描述可知,欲發送一個消息對象,只需將該消息對象送入相應的發送隊列便可。
因爲每一個 Java 消息對象均內含相應編解碼器的引用,故可直接對該消息對象進行編碼操做,代碼以下:
public abstract class BaseMsg implements Cloneable {
private final BaseMsgCodec msgCodec;
... ...
/** * 將 java 消息對象編碼爲 TCP 子幀 * * @param buffer 空白的 TCP 子幀的容器 * @return 保存有 TCP 子幀的容器 */
public ByteBuf subFrameEncode(ByteBuf buffer) {
return msgCodec.code(this, buffer);
}
}
複製代碼
首先根據數據幀的幀頭,便可解析出 FrameMajorHeader
對象,而後便可調用以下方法完成子幀的解析工做。實現原理文章開頭已指出。
/** * TCP 幀解碼爲 Java 消息對象 * * @param head 主幀頭 * @param subMsgId 子幀功能位 * @param data 子幀數據 * @return 已解碼的 Java 對象 */
public BaseMsg decode(FrameMajorHeader head, int subMsgId, byte[] data) {
BaseMsgCodec msgCodec = MsgCodecToolkit.getMsgCodec(head.getMsgId(), subMsgId);
return msgCodec.decode(head.getGroupId(), head.getDeviceId(), data);
}
複製代碼