就是告訴其它人本身還活着。在簡易RPC框架中,採用的是TCP長鏈接,爲了確保長鏈接有效,就須要客戶端與服務端之間有一種通知機制告知對方的存活狀態。css
在狀態空閒的時候定時給服務端發送消息類型爲PING消息。java
捕獲通道空閒狀態事件,若是接收客戶端PING消息,則發送PONG消息給服務端。若是在必定時間內沒有收到客戶端的PING消息,則說明客戶端已經不在線,此時關閉通道。git
因爲服務端會由於長時間接收不到服務端的PING消息而關閉通道,這就致使緩存在客戶端的鏈接的可用性發生變化。須要將不可用的從可用列表中轉移出去,並對不可用鏈接進行處理,好比直接丟棄或者是從新鏈接。github
ChannelPipeline與handle的關係。netty中的這些handle和spring mvc中的filter做用是相似的,ChannelPipeline能夠理解成handle的容器,裏面能夠被註冊衆多處理不一樣業務功能的事件處理器,好比:web
能夠利用netty提供的IdleStateHandler來發送PING-PONG消息。這個處理器主要是捕獲通道超時事件,主要有三類spring
客戶端捕獲讀寫超時,若是事件觸發就給服務端發送PING消息。緩存
服務端只須要捕獲讀超時便可,當讀超時觸發後就關閉通道。mvc
爲何在空閒狀態才發送心跳消息框架
在正常客戶端與服務端有交互的狀況下,說明雙方都在正常工做不須要額外的心跳來告知對方的存活。只有雙方在必定時間內沒有接收到對方的消息時纔開始採用心跳消息來探測對方的存活,這也是一種提高效率的作法。ide
建立AbstractHeartbeatHandler,並繼承ChannelInboundHandlerAdapter,服務於客戶端與服務端的心跳處理器。在讀取方法中判斷消息類型:
public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception { if(!(msg instanceof RpcMessage)){ channelHandlerContext.fireChannelRead(msg); return; } RpcMessage message=(RpcMessage)msg; if(null==message||null==message.getMessageHeader()){ channelHandlerContext.fireChannelRead(msg); return; } if(message.getMessageHeader().getType()== Constants.MESSAGE_TYPE_HEARTBEAT_PONG){ logger.info("ClientHeartbeatHandler.channelRead0 ,pong data is:{}",message.getMessageBody()); } else if(message.getMessageHeader().getType()== Constants.MESSAGE_TYPE_HEARTBEAT_PING){ this.sendPong(channelHandlerContext); } else { channelHandlerContext.fireChannelRead(msg); } }
空閒狀態事件,能夠根據不一樣的狀態作不一樣的行爲處理,定義三個可重寫事件供客戶端與服務端處理器具體確認處理事件。
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt; switch (e.state()) { case READER_IDLE: this.handleReaderIdle(ctx); break; case WRITER_IDLE: this.handleWriterIdle(ctx); break; case ALL_IDLE: this.handleAllIdle(ctx); break; default: break; } } }
繼承抽象心跳處理器,並重寫事件發送PING消息。
public class ClientHeartbeatHandler extends AbstractHeartbeatHandler { @Override protected void handleAllIdle(ChannelHandlerContext ctx) { this.sendPing(ctx); } }
繼承抽象心跳處理器,並重寫事件關閉通道。
public class ServerHeartbeatHandler extends AbstractHeartbeatHandler { @Override protected void handleReaderIdle(ChannelHandlerContext ctx) { logger.info("ServerHeartbeatHandler.handleReaderIdle reader timeout ,close channel"); ctx.close(); } }
好比5秒內未寫入或者讀取通道數據就觸發超時事件。
.addLast(new IdleStateHandler(0, 0, Constants.ALLIDLE_TIME_SECONDS));
好比10秒未接收到通道消息就觸發讀超時事件。
.addLast(new IdleStateHandler(Constants.READER_TIME_SECONDS, 0, 0))
正常狀況下心跳消息顯示以下圖所示,消息的內容能夠根據本身的狀況自行定義。
中止客戶端程序,而後服務端讀超時事件觸發,並關閉通道。
因爲上述的服務端心跳處理器,在觸發讀超時後會關閉通訊管道,這致使客戶端緩存的鏈接狀態會出現不可用的狀況,爲了讓客戶端一直只能取到可用鏈接就須要對從緩存中獲取到的鏈接作狀態判斷,若是可用直接返回,若是不可用則將鏈接從可用列表中刪除而後取下一個可用鏈接。
經過channel的isActive屬性能夠判斷鏈接是否可用,若是不能夠作刪除並從新獲取的操做。
public RpcClientInvoker getInvoker() { // ... int index = loadbalanceService.index(size); RpcClientInvoker invoker= RpcClientInvokerCache.get(index); if(invoker.getChannel().isActive()) { return invoker; } else { RpcClientInvokerCache.removeHandler(invoker); logger.info("invoker is not active,so remove it and get next one"); return this.getInvoker(); } }
啓動一個每隔5秒執行一次任務的線程,定時取出不可用鏈接,而後重連,並將不可用鏈接刪除。
這裏我處理的重連是直接丟棄原有不可用鏈接,而後從新建立新鏈接。
private static final Logger logger = LoggerFactory.getLogger(RpcClientInvokerManager.class); static { executorService.schedule(new Runnable() { @Override public void run() { while (true) { List<RpcClientInvoker> notConnectedHandlers = RpcClientInvokerCache.getNotConnectedHandlers(); if (!CollectionUtils.isEmpty(notConnectedHandlers)) { for (RpcClientInvoker invoker : notConnectedHandlers) { RpcClientInvokerManager.getInstance(referenceConfig).connect(); } RpcClientInvokerCache.clearNotConnectedHandler(); } } } }, Constants.RECONNECT_TIME_SECONDS,TimeUnit.SECONDS); }
https://github.com/jiangmin168168/jim-framework
文中代碼是依賴上述項目的,若是有不明白的可下載源碼
本文中的圖取自於網格