因爲在通訊層的網絡鏈接的不可靠性,好比:網絡閃斷,網絡抖動等,常常會出現鏈接斷開。這樣對於使用長鏈接的應用而言,當忽然高流量衝擊勢必會形成進行網絡鏈接,從而產生網絡堵塞,應用響應速度降低,延遲上升,用戶體驗較差。segmentfault
在通訊層的高可用設計中,須要保活長鏈接的網絡,保證通訊可以正常。通常有兩種設計方式:網絡
本文主要介紹使用netty時應用層如何作鏈接保活,提升應用的可用性。框架
TCP協議層面提供了KeepAlive的機制保證鏈接的活躍,可是其有不少劣勢:tcp
因爲以上的緣由,絕大多數的框架、應用處理鏈接的保活性都是在應用層處理。目前的主流方案是心跳檢測,斷線重連。ide
心跳檢測機制:客戶端每隔一段時間發送PING消息給服務端,服務端接受到後回覆PONG消息。客戶端若是在必定時間內沒有收到PONG響應,則認爲鏈接斷開,服務端若是在必定時間內沒有收到來自客戶端的PING請求,則認爲鏈接已經斷開。經過這種來回的PING-PONG消息機制偵測鏈接的活躍性。oop
netty自己也提供了IdleStateHandler用於檢測鏈接閒置,該Handler能夠檢測鏈接未發生讀寫事件而觸發相應事件。操作系統
首先編寫客戶端心跳檢測的Handler:線程
/** * 心跳檢測: * 1. client發送"PING"消息 * * @author huaijin */ public class ClientHeartBeatHandler extends ChannelHandlerAdapter { /** * PING消息 */ private static final String PING = "0"; /** * PONG消息 */ private static final String PONG = "1"; /** * 分隔符 */ private static final String SPLIT = "$_"; /** * 讀取到服務端響應,若是是PONG響應,則打印。若是是非PONG響應,則傳遞至下一個Handler * * @param ctx 處理上下文 * @param msg 消息 * @throws Exception * @author huaijin */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (PONG.equals(msg)) { System.out.println("from heart bean: " + msg); } else { ctx.fireChannelRead(msg); } } /** * 處理觸發的事件,若是是{@link IdleStateEvent},則判斷是讀或者是寫。若是是du,則斷開鏈接; * 若是是寫,則發送PING消息 * * @param ctx 處理上下文 * @param evt 事件 * @throws Exception * @author huaijin */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent idleStateEvent = (IdleStateEvent) evt; switch (idleStateEvent.state()) { case WRITER_IDLE: sendPing(ctx); break; case READER_IDLE: System.out.println("client close connection."); closeConnection(ctx); break; case ALL_IDLE: closeConnection(ctx); break; default: break; } } } /** * 發送PING消息 * * @param ctx 上下文 * @author huaijin */ private void sendPing(ChannelHandlerContext ctx) { System.out.println("send heart beat: " + PING); ctx.writeAndFlush(Unpooled.copiedBuffer((PING + SPLIT).getBytes())); } /** * 關閉鏈接 * * @param ctx * @author huaijin */ private void closeConnection(ChannelHandlerContext ctx) { ctx.disconnect(); ctx.close(); } }
而後再編寫服務單心跳檢測Handler:設計
/** * 心跳檢測: * 1. server端接受到"PING",返回"PONG"消息 * * @author huaijin */ public class ServerHeartBeatHandler extends ChannelHandlerAdapter { /** * PONG消息 */ private static final String PONG = "1"; /** * PING消息 */ private static final String PING = "0"; /** * 消息分隔符 */ private static final String SPLIT = "$_"; /** * 若是是PING消息,則相應PONG。若是非,則傳遞至下個Handler * * @param ctx 上下文 * @param msg 消息 * @throws Exception * @author huaijin */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (PING.equals(msg)) { System.out.println("from heart beat: " + msg); sendPong(ctx); } else { ctx.fireChannelRead(msg); } } /** * 處理觸發事件,若是是讀事件,則關閉鏈接 * * @param ctx 上下文 * @param evt 事件 * @throws Exception * @author huaijin */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent idleStateEvent = (IdleStateEvent) evt; if (idleStateEvent.state() == READER_IDLE) { System.out.println("server close connection."); closeConnection(ctx); } } } /** * 發送PONG消息 * * @param ctx 上下文 * @author huaijin */ private void sendPong(ChannelHandlerContext ctx) { System.out.println("send heart bean: " + PONG); ctx.writeAndFlush(Unpooled.copiedBuffer((PONG + SPLIT).getBytes())); } /** * 關閉鏈接 * * @param ctx 上下文 * @author huaijin */ private void closeConnection(ChannelHandlerContext ctx) { ctx.disconnect(); ctx.close(); } }
經過以上的ClientHeartbeatHandler和ServerHeartBeatHandler和netty自己提供的IdleStateHandler可以完成心跳檢測。netty
Note:
可是IdleStateHandler中有未讀和未寫的事件設置,這裏須要很是着重注意。客戶端的爲讀時間最好設置爲服務端的未寫時間的兩倍,服務端的未讀時間最好設置爲客戶端的未寫時間的兩倍。
小心跳檢測發現鏈接斷開後,爲了保證通訊層的可用性,仍然須要從新鏈接,保證通訊的可靠。對於短線重連通常有兩種設計方式比較常見:
這裏咱們首先看下第一種實現方式,netty中當Bootstrap執行connect操做後,會得到ChannelFuture對象,在該對象上執行close事件的監聽,若是發生了close則提交重連操做。
public void connect(int port, String host) throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new IdleStateHandler( 10, 5, 10)); ch.pipeline().addLast(new ClientHeartBeatHandler()); ch.pipeline().addLast(new EchoClientHandler()); } }); ChannelFuture f = b.connect(host, port).sync(); // 監聽channel上的close事件 f.channel().closeFuture().sync(); } finally { // 提交重連操做 executor.execute(() -> { try { System.out.println("reconnection to: " + "127.0.0.1:8080"); connect(8080, "127.0.0.1"); } catch (InterruptedException e) { e.printStackTrace(); } }); } } public static void main(String[] args) throws InterruptedException { new EchoClient().connect(8080, "127.0.0.1"); Thread.currentThread().join(); }
可是該種方式對於應用而言,須要每一個鏈接都有重連的線程,這樣對於資源消耗比較大。建議採用第二種狀況,使用額外的單線程輪循全部的鏈接,檢測其是否活躍。該種方式在開源框架中有應用。
timerExecutor.scheduleAtFixedRate(new Runnable() { @Override public void run() { reconnect(); } }, SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.SECONDS);
使用Java的定時線程池,定時執行重連操做。在重連操做中將檢測鏈接的活躍性,若是非活躍,則執行重連。