在通訊中設計的心跳消息,一般是爲了檢查網絡鏈路是否正常。雖然TCP協議提供keep-alive機制,但須要在鏈路空閒2小時後才觸發檢測,這顯然對業務很是不友好。當存在大量鏈接異常,而服務端卻須要等2個小時後才感知到的時候,有限的系統資源會被逐漸耗盡,最終沒法爲新鏈接請求繼續提供服務。java
要解決此類問題,業界的廣泛作法是在應用層加入心跳機制。心跳消息能夠是單向心跳也能夠是雙向心跳,所謂單向心跳錶示由服務端或者客戶端的其中一方主動發送心跳請求消息,而另外一方返回響應消息(以下圖)。雙向心跳錶示服務端與客戶端相互發送心跳請求和響應。由於不管何種類型,實現方案都是同樣的,本文以單向心跳爲例給你們作講解。git
心跳消息一般是週期性的發送,或者是在鏈路空閒必定時長後觸發。若是經歷幾個週期後都未收到響應,則能夠視爲鏈路異常。此時能夠繼續嘗試發送心跳,也能夠執行告警並斷開鏈接。網絡
在 smart-socket 中咱們提供了現成的心跳插件 HeartPlugin,能夠很方便的實現心跳。本文是假定讀者朋友對 smart-socket 已有了初步的瞭解,因此不會涉及 smart-socket 的基礎使用,重點描述如何在服務中集成心跳插件。session
在HeartPlugin中有三種心跳策略可供選擇,經過選擇不一樣的構造方案肯定。socket
心跳策略肯定好後,下一步就是如何去發送心跳消息,以及如何識別收到的消息是否爲響應消息。在 HeartPlugin 中已經定義了這兩個接口,須要開發人員去實現處理邏輯:ide
public void sendHeartRequest(AioSession session) throws IOException{ WriteBuffer writeBuffer = session.writeBuffer(); byte[] heartBytes = "heart_req".getBytes(); writeBuffer.writeInt(heartBytes.length); writeBuffer.write(heartBytes); writeBuffer.flush(); }
public boolean isHeartMessage(AioSession session, String msg) { //心跳請求消息,返回響應 if("heart_req".equals(msg)){ try { WriteBuffer writeBuffer = session.writeBuffer(); byte[] heartBytes = "heart_rsp".getBytes(); writeBuffer.writeInt(heartBytes.length); writeBuffer.write(heartBytes); writeBuffer.flush(); }catch (Exception e){ } return true; } //是否爲心跳響應消息 return "heart_rsp".equals(msg); }
public class HeartServer { private static final Logger LOGGER = LoggerFactory.getLogger(HeartServer.class); public static void main(String[] args) throws IOException { //定義消息處理器 AbstractMessageProcessor<String> processor = new AbstractMessageProcessor<String>() { @Override public void process0(AioSession<String> session, String msg) { LOGGER.info("收到客戶端:{}消息:{}", session.getSessionID(), msg); } @Override public void stateEvent0(AioSession<String> session, StateMachineEnum stateMachineEnum, Throwable throwable) { switch (stateMachineEnum) { case SESSION_CLOSED: LOGGER.info("客戶端:{} 斷開鏈接", session.getSessionID()); break; } } }; //註冊心跳插件:每隔1秒發送一次心跳請求,5秒內未收到消息超時關閉鏈接 processor.addPlugin(new HeartPlugin<String>(1, 5, TimeUnit.SECONDS) { @Override public void sendHeartRequest(AioSession session) throws IOException { WriteBuffer writeBuffer = session.writeBuffer(); byte[] heartBytes = "heart_req".getBytes(); writeBuffer.writeInt(heartBytes.length); writeBuffer.write(heartBytes); writeBuffer.flush(); } @Override public boolean isHeartMessage(AioSession session, String msg) { //心跳請求消息,返回響應 if ("heart_req".equals(msg)) { try { WriteBuffer writeBuffer = session.writeBuffer(); byte[] heartBytes = "heart_rsp".getBytes(); writeBuffer.writeInt(heartBytes.length); writeBuffer.write(heartBytes); writeBuffer.flush(); } catch (Exception e) { } return true; } //是否爲心跳響應消息 if ("heart_rsp".equals(msg)) { LOGGER.info("收到來自客戶端:{} 的心跳響應消息", session.getSessionID()); return true; } return false; } }); //啓動服務 AioQuickServer<String> server = new AioQuickServer<>(8888, new StringProtocol(), processor); server.start(); } }
public class HeartClient { private static final Logger LOGGER = LoggerFactory.getLogger(HeartClient.class); public static void main(String[] args) throws IOException, ExecutionException, InterruptedException { AbstractMessageProcessor<String> client_1_processor = new AbstractMessageProcessor<String>() { @Override public void process0(AioSession<String> session, String msg) { LOGGER.info("client_1 收到服務端消息:" + msg); } @Override public void stateEvent0(AioSession<String> session, StateMachineEnum stateMachineEnum, Throwable throwable) { LOGGER.info("stateMachineEnum:{}", stateMachineEnum); } }; AioQuickClient<String> client_1 = new AioQuickClient<>("localhost", 8888, new StringProtocol(), client_1_processor); client_1.start(); AbstractMessageProcessor<String> client_2_processor = new AbstractMessageProcessor<String>() { @Override public void process0(AioSession<String> session, String msg) { LOGGER.info("client_2 收到服務端消息:" + msg); try { if ("heart_req".equals(msg)) { WriteBuffer writeBuffer = session.writeBuffer(); byte[] heartBytes = "heart_rsp".getBytes(); writeBuffer.writeInt(heartBytes.length); writeBuffer.write(heartBytes); LOGGER.info("client_2 發送心跳響應消息"); } } catch (Exception e) { e.printStackTrace(); } } @Override public void stateEvent0(AioSession<String> session, StateMachineEnum stateMachineEnum, Throwable throwable) { LOGGER.info("stateMachineEnum:{}", stateMachineEnum); } }; AioQuickClient<String> client_2 = new AioQuickClient<>("localhost", 8888, new StringProtocol(), client_2_processor); client_2.start(); } }
服務端
ui
客戶端
編碼
本文圍繞着心跳原理做了簡單的實踐分享。現實場景中若是對接的設備數量高達幾萬,甚至十幾萬,本文的心跳方案是否依舊適用,歡迎一塊兒交流討論。插件
本文涉及到的示例代碼可從smart-socket倉庫中下載設計