基於 Netty 的可插拔業務通訊協議的實現「3」業務註冊及實際工做流程

本文爲該系列的第三篇文章,設計需求爲:服務端程序和衆多客戶端程序經過 TCP 協議進行通訊,通訊雙方需通訊的消息種類衆多。上一篇文章以一個具體的需求爲例,探討了指定的 Java 消息對象與其相應的二進制數據幀相互轉換的方法。本文仍以該實例爲例,探討該自定義通訊協議的具體工做流程,以及如何以註冊的形式靈活插拔通訊消息對象。java

1. 以註冊的形式實現通訊消息對象的統一管理

經過該系列的第二篇文章可知,各個消息對象的編解碼器類均擁有一個靜態工廠方法,用於手動傳入功能位及功能文字描述,進而生成包含這些參數的編解碼器。如此設計,使得全部消息的功能位和文字描述均可以統一管理,下降維護成本。緩存

根據上述需求,可經過 Map 容器管理全部的編解碼器,有以下優勢:併發

  1. 進行消息對象生成操做時,可直接使用相應編解碼器的消息對象靜態建立方法。
  2. 進行消息對象的編碼操做時,已擁有該 Java 消息對象,便可知道消息對象的功能位,據此可獲取相應的編解碼器;或者,每一個 Java 消息對象均內含相應編解碼器的引用,故可直接對該消息對象進行編碼操做。
  3. 進行二進制數據幀的解碼操做時,數據幀中已包含了消息的功能位,據此可獲取相應的編解碼器,然後能夠對該數據幀進行解析,生成相應的 Java 消息對象。

通訊消息對象註冊方法以下所示:工具

/** * 消息對象的註冊 * * @param toolkit 消息對象編解碼器容器的工具類 */
private void initialMsg() {
    saveNormalMsgCodec(new MsgCodecDeviceUnlock(0x10, 0x11, "客戶端解鎖"));
    saveNormalMsgCodec(new MsgCodecDeviceClear(0x10, 0x13, "客戶端初始化"));
    saveNormalMsgCodec(new MsgCodecDeviceId(0x10, 0x1B, "客戶端ID設置"));
    saveNormalMsgCodec(new MsgCodecEmployeeName(0x10, 0x1C, "客戶端別名設置"));
    ... ...
}

/** * 將普通消息對象及其回覆消息對象的編解碼器均保存到 HashMap 中 * * @param baseMsgCodec 特定的消息對象編解碼器 */
private void saveNormalMsgCodec(BaseMsgCodec baseMsgCodec) {
    saveSpecialMsgCodec(baseMsgCodec);
    baseMsgCodec = new MsgCodecReplyNormal(baseMsgCodec.getMajorMsgId() + 0x10, baseMsgCodec.getSubMsgId(), baseMsgCodec.getDetail());
    saveSpecialMsgCodec(baseMsgCodec);
}

/** * 將消息對象的編解碼器保存到 HashMap 中 * * @param baseMsgCodec 特定的編解碼器 */
private void saveSpecialMsgCodec(BaseMsgCodec baseMsgCodec) {
    HASH_MAP.put(figureFrameId(baseMsgCodec.getMajorMsgId(), baseMsgCodec.getSubMsgId()), baseMsgCodec);
}
複製代碼

上述代碼代表,若是有新的業務需求,須要增刪「插拔」業務消息對象,只需在 initialMsg() 方法中,對相應編解碼器的註冊語句進行增刪便可。優化

saveNormalMsgCodec(BaseMsgCodec) 方法能夠同時註冊特定業務消息對象及其通用回覆消息對象,操做方法清晰、簡潔。this

因此,在啓動該 Java 程序時,只須要在啓動過程當中,執行上述 initialMsg() 方法,便可完成全部業務消息對象的註冊。編碼

2. 多個消息對象自由組合進同一個數據幀的實現原理

由該系列的第一篇文章可知,若是某二進制數據幀所要傳輸的數據體部份內容不多,致使一個幀的大部分容量均被幀頭佔據,致使有效數據的佔比很小,這就產生了巨大的浪費,故數據幀的數據體部分由子幀組成,同一類子幀都可被組裝進同一個數據幀。如此作法,整個通訊鏈路的數據量會明顯減小,IO 負擔也會所以減輕。spa

該需求的實現原理以下所示:線程

/** * 啓動一個Channel的定時任務,用於間隔指定的時間對消息隊列進行輪詢,併發送指定數據幀 * * @param deque 指定的消息發送隊列 * @param channelId 指定 Channel 的序號 */
private void startMessageQueueTask(LinkedBlockingDeque<BaseMsg> deque, Integer channelId) {
    executorService.scheduleWithFixedDelay(() -> {
        try {
            BaseMsg baseMsg = deque.take();         // 從隊列中取出一個消息對象,隊列爲空時阻塞
            Thread.sleep(AWAKE_TO_PROCESS_INTERVAL);// 等待極短的時間,保證隊列中緩存儘量多的對象
            Channel channel = touchChannel(channelId); // 獲取指定的待發送的 Channel
            List<ByteBuf> dataList = new ArrayList<>();// 子幀容器
            ByteBuf data = baseMsg.subFrameEncode(channel.alloc().buffer());// 編碼一個子幀
            dataList.add(data);
            touchNeedReplyMsg(baseMsg);            // 對該子幀設置檢錯重發任務
            int length = data.readableBytes();
            int flag = baseMsg.combineFrameFlag(); // 獲取消息對象標識
            while (true) {
                BaseMsg subMsg = deque.peek();     // 查看隊列中的第一個消息對象
                if (subMsg == null || subMsg.combineFrameFlag() != flag) {
                    break; // 消息對象標識不一樣,即欲生成的主幀幀頭不一樣,不能組合進同一主幀
                }
                data = subMsg.subFrameEncode(channel.alloc().buffer());
                if (length + data.readableBytes() > FrameSetting.MAX_DATA_LENGTH) {
                    break;
                }
                length += data.readableBytes();
                dataList.add(data);                // 組合進了同一主幀
                deque.poll();                      // 從隊列中移除該消息對象
                touchNeedReplyMsg(subMsg);
            }
            FrameMajorHeader frameHeader = new FrameMajorHeader(
                    baseMsg.getMajorMsgId(),
                    baseMsg.getGroupId(),
                    baseMsg.getDeviceId(),
                    length);                       // 生成主幀幀頭消息對象
            channel.writeAndFlush(new SendableMsgContainer(frameHeader, dataList)); // 送入Channel進行發送
        } catch (InterruptedException e) {
            logger.warn("消息隊列定時發送任務被中斷");
        }
    }, channelId, CommSetting.FRAME_SEND_INTERVAL, TimeUnit.MILLISECONDS);
}
複製代碼

由代碼可知,待發送的消息對象均被送入指定的發送隊列進行緩存,某客戶端相應的線程對隊列進行操做,取出消息對象並進行編碼、組裝、發送等。固然,當客戶端數量較多時,上述的線程實現方式可採用 Netty 的 NIO 方式進行優化,以下降系統開銷。設計

由上述描述可知,欲發送一個消息對象,只需將該消息對象送入相應的發送隊列便可。

3. 實際業務消息對象的編解碼

3.1 消息對象的編碼方式

因爲每一個 Java 消息對象均內含相應編解碼器的引用,故可直接對該消息對象進行編碼操做,代碼以下:

public abstract class BaseMsg implements Cloneable {
    private final BaseMsgCodec msgCodec;
    ... ...

    /** * 將 java 消息對象編碼爲 TCP 子幀 * * @param buffer 空白的 TCP 子幀的容器 * @return 保存有 TCP 子幀的容器 */
    public ByteBuf subFrameEncode(ByteBuf buffer) {
        return msgCodec.code(this, buffer);
    }
}
複製代碼

3.2 消息對象的解碼方式

首先根據數據幀的幀頭,便可解析出 FrameMajorHeader 對象,而後便可調用以下方法完成子幀的解析工做。實現原理文章開頭已指出。

/** * TCP 幀解碼爲 Java 消息對象 * * @param head 主幀頭 * @param subMsgId 子幀功能位 * @param data 子幀數據 * @return 已解碼的 Java 對象 */
public BaseMsg decode(FrameMajorHeader head, int subMsgId, byte[] data) {
    BaseMsgCodec msgCodec = MsgCodecToolkit.getMsgCodec(head.getMsgId(), subMsgId);
    return msgCodec.decode(head.getGroupId(), head.getDeviceId(), data);
}
複製代碼
相關文章
相關標籤/搜索