smart-socket實戰:玩轉心跳消息

1、背景

在通訊中設計的心跳消息,一般是爲了檢查網絡鏈路是否正常。雖然TCP協議提供keep-alive機制,但須要在鏈路空閒2小時後才觸發檢測,這顯然對業務很是不友好。當存在大量鏈接異常,而服務端卻須要等2個小時後才感知到的時候,有限的系統資源會被逐漸耗盡,最終沒法爲新鏈接請求繼續提供服務。java

2、原理

要解決此類問題,業界的廣泛作法是在應用層加入心跳機制。心跳消息能夠是單向心跳也能夠是雙向心跳,所謂單向心跳錶示由服務端或者客戶端的其中一方主動發送心跳請求消息,而另外一方返回響應消息(以下圖)。雙向心跳錶示服務端與客戶端相互發送心跳請求和響應。由於不管何種類型,實現方案都是同樣的,本文以單向心跳爲例給你們作講解。git

3、方案

心跳消息一般是週期性的發送,或者是在鏈路空閒必定時長後觸發。若是經歷幾個週期後都未收到響應,則能夠視爲鏈路異常。此時能夠繼續嘗試發送心跳,也能夠執行告警並斷開鏈接。網絡

在 smart-socket 中咱們提供了現成的心跳插件 HeartPlugin,能夠很方便的實現心跳。本文是假定讀者朋友對 smart-socket 已有了初步的瞭解,因此不會涉及 smart-socket 的基礎使用,重點描述如何在服務中集成心跳插件。session

3.1 HeartPlugin插件概述

3.1.1 心跳策略

在HeartPlugin中有三種心跳策略可供選擇,經過選擇不一樣的構造方案肯定。socket

  1. HeartPlugin(int heartRate, TimeUnit timeUnit)
    heartRate 表示心跳消息的發送頻率;timeUnit 表示 heartRate 的數值單位。例如:heartRate=3,timeUnit=TimeUnit.SECONDS,表示每 3秒鐘發送一次心跳。heartRate=2000,timeUnit=TimeUnit.MILLISECONDS,表示每 2秒鐘發送一次心跳。該策略爲週期性發送心跳消息,不管對方是否返回響應。
  2. HeartPlugin(int heartRate, int timeout, TimeUnit unit)
    該構造方法相較前一個多出一個參數:timeout(過時時間),必須大於heartRate。若是在timeout時長內發送的心跳消息都沒有收到響應消息,則視爲鏈路異常而且該鏈路會被關閉,釋放資源。
  3. HeartPlugin(int heartRate, int timeout, TimeUnit timeUnit, TimeoutCallback timeoutCallback)
    該構造方法支持指定超時回調策略 timeoutCallback,其實上一個構造方法就是設置了超時斷鏈策略。若是不知足業務所需,用戶可按需定義。

3.1.2 心跳的識別與觸發

心跳策略肯定好後,下一步就是如何去發送心跳消息,以及如何識別收到的消息是否爲響應消息。在 HeartPlugin 中已經定義了這兩個接口,須要開發人員去實現處理邏輯:ide

  • sendHeartRequest
    發送心跳。HeartPlugin 在判斷某個鏈接須要觸發心跳後,會執行該方法。用戶須要在該方法中實現心跳消息的編碼並輸出數據。
    public void sendHeartRequest(AioSession session) throws IOException{
        WriteBuffer writeBuffer = session.writeBuffer();
        byte[] heartBytes = "heart_req".getBytes();
        writeBuffer.writeInt(heartBytes.length);
        writeBuffer.write(heartBytes);
        writeBuffer.flush();
     }
  • isHeartMessage
    請求消息識別。true:表示本次收到的是心跳消息(請求/響應);false:其餘業務消息,交由MessageProcessor#processor處理。
    public boolean isHeartMessage(AioSession session, String msg) {
        //心跳請求消息,返回響應
        if("heart_req".equals(msg)){
            try {
                WriteBuffer writeBuffer = session.writeBuffer();
                byte[] heartBytes = "heart_rsp".getBytes();
                writeBuffer.writeInt(heartBytes.length);
                writeBuffer.write(heartBytes);
                writeBuffer.flush();
            }catch (Exception e){
            }
            return true;
        }
        //是否爲心跳響應消息
        return "heart_rsp".equals(msg);
    }

3.2 代碼演示

3.2.1 服務端

public class HeartServer {

    private static final Logger LOGGER = LoggerFactory.getLogger(HeartServer.class);

    public static void main(String[] args) throws IOException {
        //定義消息處理器
        AbstractMessageProcessor<String> processor = new AbstractMessageProcessor<String>() {
            @Override
            public void process0(AioSession<String> session, String msg) {
                LOGGER.info("收到客戶端:{}消息:{}", session.getSessionID(), msg);
            }

            @Override
            public void stateEvent0(AioSession<String> session, StateMachineEnum stateMachineEnum, Throwable throwable) {
                switch (stateMachineEnum) {
                    case SESSION_CLOSED:
                        LOGGER.info("客戶端:{} 斷開鏈接", session.getSessionID());
                        break;
                }
            }
        };

        //註冊心跳插件:每隔1秒發送一次心跳請求,5秒內未收到消息超時關閉鏈接
        processor.addPlugin(new HeartPlugin<String>(1, 5, TimeUnit.SECONDS) {
            @Override
            public void sendHeartRequest(AioSession session) throws IOException {
                WriteBuffer writeBuffer = session.writeBuffer();
                byte[] heartBytes = "heart_req".getBytes();
                writeBuffer.writeInt(heartBytes.length);
                writeBuffer.write(heartBytes);
                writeBuffer.flush();
            }

            @Override
            public boolean isHeartMessage(AioSession session, String msg) {
                //心跳請求消息,返回響應
                if ("heart_req".equals(msg)) {
                    try {
                        WriteBuffer writeBuffer = session.writeBuffer();
                        byte[] heartBytes = "heart_rsp".getBytes();
                        writeBuffer.writeInt(heartBytes.length);
                        writeBuffer.write(heartBytes);
                        writeBuffer.flush();
                    } catch (Exception e) {
                    }
                    return true;
                }
                //是否爲心跳響應消息
                if ("heart_rsp".equals(msg)) {
                    LOGGER.info("收到來自客戶端:{} 的心跳響應消息", session.getSessionID());
                    return true;
                }
                return false;
            }
        });

        //啓動服務
        AioQuickServer<String> server = new AioQuickServer<>(8888, new StringProtocol(), processor);
        server.start();
    }
}

3.2.2 客戶端

  • client_1:接受服務端的心跳消息,不作任何迴應
  • client_2:及時響應服務端的心跳消息
public class HeartClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(HeartClient.class);


    public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
        AbstractMessageProcessor<String> client_1_processor = new AbstractMessageProcessor<String>() {
            @Override
            public void process0(AioSession<String> session, String msg) {
                LOGGER.info("client_1 收到服務端消息:" + msg);
            }

            @Override
            public void stateEvent0(AioSession<String> session, StateMachineEnum stateMachineEnum, Throwable throwable) {
                LOGGER.info("stateMachineEnum:{}", stateMachineEnum);
            }
        };
        AioQuickClient<String> client_1 = new AioQuickClient<>("localhost", 8888, new StringProtocol(), client_1_processor);
        client_1.start();

        AbstractMessageProcessor<String> client_2_processor = new AbstractMessageProcessor<String>() {
            @Override
            public void process0(AioSession<String> session, String msg) {
                LOGGER.info("client_2 收到服務端消息:" + msg);
                try {
                    if ("heart_req".equals(msg)) {
                        WriteBuffer writeBuffer = session.writeBuffer();
                        byte[] heartBytes = "heart_rsp".getBytes();
                        writeBuffer.writeInt(heartBytes.length);
                        writeBuffer.write(heartBytes);
                        LOGGER.info("client_2 發送心跳響應消息");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void stateEvent0(AioSession<String> session, StateMachineEnum stateMachineEnum, Throwable throwable) {
                LOGGER.info("stateMachineEnum:{}", stateMachineEnum);
            }
        };
        AioQuickClient<String> client_2 = new AioQuickClient<>("localhost", 8888, new StringProtocol(), client_2_processor);
        client_2.start();
    }
}

3.2.3觀察控制檯

服務端
ui

客戶端
編碼

總結

本文圍繞着心跳原理做了簡單的實踐分享。現實場景中若是對接的設備數量高達幾萬,甚至十幾萬,本文的心跳方案是否依舊適用,歡迎一塊兒交流討論。插件

本文涉及到的示例代碼可從smart-socket倉庫中下載設計

相關文章
相關標籤/搜索