簡易RPC框架-心跳與重連機制

心跳

就是告訴其它人本身還活着。在簡易RPC框架中,採用的是TCP長鏈接,爲了確保長鏈接有效,就須要客戶端與服務端之間有一種通知機制告知對方的存活狀態。css

如何實現

客戶端發送心跳消息

在狀態空閒的時候定時給服務端發送消息類型爲PING消息。java

服務端接收心跳消息

捕獲通道空閒狀態事件,若是接收客戶端PING消息,則發送PONG消息給服務端。若是在必定時間內沒有收到客戶端的PING消息,則說明客戶端已經不在線,此時關閉通道。git

客戶端管理可用鏈接

因爲服務端會由於長時間接收不到服務端的PING消息而關閉通道,這就致使緩存在客戶端的鏈接的可用性發生變化。須要將不可用的從可用列表中轉移出去,並對不可用鏈接進行處理,好比直接丟棄或者是從新鏈接。github

預備知識

ChannelPipeline與handle的關係。netty中的這些handle和spring mvc中的filter做用是相似的,ChannelPipeline能夠理解成handle的容器,裏面能夠被註冊衆多處理不一樣業務功能的事件處理器,好比:web

  • 編碼
  • 解碼
  • 心跳
  • 權限
  • 加密
  • 解密
  • 業務代碼執行
  • ......

具體實現

空閒狀態處理器

能夠利用netty提供的IdleStateHandler來發送PING-PONG消息。這個處理器主要是捕獲通道超時事件,主要有三類spring

  • 讀超時,必定時間內沒有從通道內讀取到任何數據
  • 寫超時,必定時間內沒有從通道內寫入任何數據
  • 讀寫超時,必定時間內沒有從通道內讀取或者是寫入任何數據

客戶端加入空閒狀態處理器

客戶端捕獲讀寫超時,若是事件觸發就給服務端發送PING消息。緩存

服務端加入空閒狀態處理器

服務端只須要捕獲讀超時便可,當讀超時觸發後就關閉通道。mvc

爲何在空閒狀態才發送心跳消息框架

在正常客戶端與服務端有交互的狀況下,說明雙方都在正常工做不須要額外的心跳來告知對方的存活。只有雙方在必定時間內沒有接收到對方的消息時纔開始採用心跳消息來探測對方的存活,這也是一種提高效率的作法。ide

抽象心跳處理器

建立AbstractHeartbeatHandler,並繼承ChannelInboundHandlerAdapter,服務於客戶端與服務端的心跳處理器。在讀取方法中判斷消息類型:

  • 若是是PING消息就發送PONG消息給客戶端
  • 若是收到的是PONG消息,則直接打印消息說明客戶端已經成功接收到服務端返回的PONG消息
  • 若是是其它類型的消息,則通知下一個處理器處理消息
public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {

        if(!(msg instanceof RpcMessage)){
            channelHandlerContext.fireChannelRead(msg);
            return;
        }
        RpcMessage message=(RpcMessage)msg;

        if(null==message||null==message.getMessageHeader()){
            channelHandlerContext.fireChannelRead(msg);
            return;
        }
        if(message.getMessageHeader().getType()== Constants.MESSAGE_TYPE_HEARTBEAT_PONG){
            logger.info("ClientHeartbeatHandler.channelRead0 ,pong data is:{}",message.getMessageBody());
        }
        else if(message.getMessageHeader().getType()== Constants.MESSAGE_TYPE_HEARTBEAT_PING){
            this.sendPong(channelHandlerContext);
        }
        else {
            channelHandlerContext.fireChannelRead(msg);
        }

    }

空閒狀態事件,能夠根據不一樣的狀態作不一樣的行爲處理,定義三個可重寫事件供客戶端與服務端處理器具體確認處理事件。

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            switch (e.state()) {
                case READER_IDLE:
                    this.handleReaderIdle(ctx);
                    break;
                case WRITER_IDLE:
                    this.handleWriterIdle(ctx);
                    break;
                case ALL_IDLE:
                    this.handleAllIdle(ctx);
                    break;
                default:
                    break;
            }
        }
    }

客戶端心跳處理器

繼承抽象心跳處理器,並重寫事件發送PING消息。

public class ClientHeartbeatHandler extends AbstractHeartbeatHandler {

    @Override
    protected void handleAllIdle(ChannelHandlerContext ctx) {
        this.sendPing(ctx);
    }
}

服務端心跳處理器

繼承抽象心跳處理器,並重寫事件關閉通道。

public class ServerHeartbeatHandler extends AbstractHeartbeatHandler {

    @Override
    protected void handleReaderIdle(ChannelHandlerContext ctx) {
        logger.info("ServerHeartbeatHandler.handleReaderIdle reader timeout ,close channel");
        ctx.close();
    }

}

客戶端ChannelPipeline中加入心跳處理器

好比5秒內未寫入或者讀取通道數據就觸發超時事件。

.addLast(new IdleStateHandler(0, 0, Constants.ALLIDLE_TIME_SECONDS));

服務端ChannelPipeline中加入心跳處理器

好比10秒未接收到通道消息就觸發讀超時事件。

.addLast(new IdleStateHandler(Constants.READER_TIME_SECONDS, 0, 0))

客戶端消息示例

正常狀況下心跳消息顯示以下圖所示,消息的內容能夠根據本身的狀況自行定義。

客戶端下線消息示例

中止客戶端程序,而後服務端讀超時事件觸發,並關閉通道。

客戶端可用鏈接管理

因爲上述的服務端心跳處理器,在觸發讀超時後會關閉通訊管道,這致使客戶端緩存的鏈接狀態會出現不可用的狀況,爲了讓客戶端一直只能取到可用鏈接就須要對從緩存中獲取到的鏈接作狀態判斷,若是可用直接返回,若是不可用則將鏈接從可用列表中刪除而後取下一個可用鏈接。

修改獲取鏈接方法

經過channel的isActive屬性能夠判斷鏈接是否可用,若是不能夠作刪除並從新獲取的操做。

public RpcClientInvoker getInvoker() {
        // ...
        int index = loadbalanceService.index(size);
        RpcClientInvoker invoker= RpcClientInvokerCache.get(index);
        if(invoker.getChannel().isActive()) {
            return invoker;
        }
        else {
            RpcClientInvokerCache.removeHandler(invoker);
            logger.info("invoker is not active,so remove it and get next one");
            return this.getInvoker();
        }
    }

後臺啓動任務處理不可用鏈接

啓動一個每隔5秒執行一次任務的線程,定時取出不可用鏈接,而後重連,並將不可用鏈接刪除。

這裏我處理的重連是直接丟棄原有不可用鏈接,而後從新建立新鏈接。

private static final Logger logger = LoggerFactory.getLogger(RpcClientInvokerManager.class);

    static {
        executorService.schedule(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    List<RpcClientInvoker> notConnectedHandlers = RpcClientInvokerCache.getNotConnectedHandlers();
                    if (!CollectionUtils.isEmpty(notConnectedHandlers)) {
                        for (RpcClientInvoker invoker : notConnectedHandlers) {
                            RpcClientInvokerManager.getInstance(referenceConfig).connect();
                        }
                        RpcClientInvokerCache.clearNotConnectedHandler();
                    }
                }
            }
        }, Constants.RECONNECT_TIME_SECONDS,TimeUnit.SECONDS);

    }

本文源碼

https://github.com/jiangmin168168/jim-framework

文中代碼是依賴上述項目的,若是有不明白的可下載源碼

引用

本文中的圖取自於網格

相關文章
相關標籤/搜索