Author: sprinkle_liz
www.jianshu.com/p/1a28e48edd92
The Heartbeat Mechanism in Netty
What Is a Heartbeat

A heartbeat is a special packet that the client and server periodically send to each other over a long-lived TCP connection, to tell the peer that the sender is still online and to keep the TCP connection valid.

Note: heartbeat packets have a second, often overlooked purpose: if a connection stays idle for too long, a firewall or router along the path may silently drop it. Regular heartbeats keep such intermediaries from tearing the connection down.
How to Implement It

The Core Handler —— IdleStateHandler

In Netty, the key to implementing a heartbeat mechanism is IdleStateHandler. How is this handler used? First, look at its constructor:
```java
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    this((long) readerIdleTimeSeconds, (long) writerIdleTimeSeconds, (long) allIdleTimeSeconds, TimeUnit.SECONDS);
}
```
Here is what the three parameters mean:

- readerIdleTimeSeconds: read timeout. An IdleStateEvent with state IdleState.READER_IDLE is fired when no read has been performed on the Channel for this long. Pass 0 to disable the check.
- writerIdleTimeSeconds: write timeout. An IdleStateEvent with state IdleState.WRITER_IDLE is fired when no write has been performed on the Channel for this long. Pass 0 to disable the check.
- allIdleTimeSeconds: all-types timeout. An IdleStateEvent with state IdleState.ALL_IDLE is fired when neither a read nor a write has been performed for this long. Pass 0 to disable the check.

Note: these three parameters are in seconds by default. If you need a different time unit, use the alternative constructor: IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit).
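As a concrete illustration, here is a minimal sketch of installing the handler in a pipeline (the class name IdleInitializer and the 10-second timeout are my own choices for illustration, not from the article's code):

```java
import java.util.concurrent.TimeUnit;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;

public class IdleInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        // Fire READER_IDLE if nothing is read from this channel for 10 seconds;
        // the writer-idle and all-idle checks are disabled (0).
        ch.pipeline().addLast(new IdleStateHandler(10, 0, 0));

        // The same read timeout expressed with an explicit TimeUnit, using the
        // extended constructor mentioned above (observeOutput = false):
        // ch.pipeline().addLast(new IdleStateHandler(false, 10_000, 0, 0, TimeUnit.MILLISECONDS));
    }
}
```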
Before reading the implementation below, it helps to first understand how IdleStateHandler works internally.

The code follows; anything that deserves attention is called out with comments in the code.

Implementing the Heartbeat with IdleStateHandler

The following uses IdleStateHandler to implement the heartbeat. After the client connects to the server, it runs a task in a loop: wait a random number of seconds, then ping the server, i.e. send a heartbeat packet. When the wait exceeds the server's limit, the send will fail, because by then the server will already have closed the connection. The code is as follows:
Client Side

ClientIdleStateTrigger —— the Heartbeat Trigger

ClientIdleStateTrigger is also a handler; it simply overrides userEventTriggered to catch the IdleState.WRITER_IDLE event (no data written to the server within the configured time) and then sends a heartbeat packet to the server.
```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

/**
 * Catches the {@link IdleState#WRITER_IDLE} event (no data written to the
 * server within the configured time) and sends a heartbeat packet to the server.
 */
public class ClientIdleStateTrigger extends ChannelInboundHandlerAdapter {

    public static final String HEART_BEAT = "heart beat!";

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.WRITER_IDLE) {
                // write heartbeat to server
                ctx.writeAndFlush(HEART_BEAT);
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
```
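Note that ClientIdleStateTrigger only receives WRITER_IDLE events if an IdleStateHandler sits in front of it in the pipeline. The client in this article drives its pings from the Pinger handler shown next instead, but if you wanted the trigger-based variant, the wiring would look roughly like this (the initializer name and the 4-second write timeout are assumptions for illustration):

```java
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;

public class TriggerBasedInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        // Fire WRITER_IDLE after 4 seconds without an outbound write ...
        ch.pipeline().addLast(new IdleStateHandler(0, 4, 0));
        // ... and let ClientIdleStateTrigger answer each WRITER_IDLE with a heartbeat.
        ch.pipeline().addLast(new ClientIdleStateTrigger());
        // The codec/framing handlers from ClientHandlersInitializer below would
        // still be needed after these two for the heartbeat String to reach the wire.
    }
}
```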
Pinger —— the Heartbeat Sender
```java
import java.util.Random;
import java.util.concurrent.TimeUnit;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.ScheduledFuture;

/**
 * After the client connects to the server, it runs a task in a loop:
 * wait a random number of seconds, then ping the server, i.e. send a heartbeat packet.
 */
public class Pinger extends ChannelInboundHandlerAdapter {

    private Random random = new Random();
    private int baseRandom = 8;

    private Channel channel;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.channel = ctx.channel();
        ping(ctx.channel());
    }

    private void ping(Channel channel) {
        int second = Math.max(1, random.nextInt(baseRandom));
        System.out.println("next heart beat will send after " + second + "s.");
        ScheduledFuture<?> future = channel.eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                if (channel.isActive()) {
                    System.out.println("sending heart beat to the server...");
                    channel.writeAndFlush(ClientIdleStateTrigger.HEART_BEAT);
                } else {
                    System.err.println("The connection had broken, cancel the task that will send a heart beat.");
                    // The original called channel.closeFuture(), which only returns
                    // the future and closes nothing; channel.close() is what was meant.
                    channel.close();
                    throw new RuntimeException();
                }
            }
        }, second, TimeUnit.SECONDS);

        future.addListener(new GenericFutureListener() {
            @Override
            public void operationComplete(Future future) throws Exception {
                if (future.isSuccess()) {
                    ping(channel);
                }
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Writing to a Channel that is already closed throws an exception,
        // which lands here.
        cause.printStackTrace();
        ctx.close();
    }
}
```
ClientHandlersInitializer —— Assembling the Client-Side Handlers
```java
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import org.springframework.util.Assert;

public class ClientHandlersInitializer extends ChannelInitializer<SocketChannel> {

    // ReconnectHandler is introduced in the reconnection section further below;
    // EchoHandler is a trivial echo handler that the article does not show.
    private ReconnectHandler reconnectHandler;
    private EchoHandler echoHandler;

    public ClientHandlersInitializer(TcpClient tcpClient) {
        Assert.notNull(tcpClient, "TcpClient can not be null.");
        this.reconnectHandler = new ReconnectHandler(tcpClient);
        this.echoHandler = new EchoHandler();
    }

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
        pipeline.addLast(new LengthFieldPrepender(4));
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new Pinger());
    }
}
```
Note: in the handler set above, everything except Pinger is codecs and framing (to deal with TCP packet splitting and sticking) and can be skimmed over.
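To make the framing concrete: LengthFieldPrepender(4) prefixes every outbound message with a 4-byte length, and LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4) reads that prefix, strips it, and delivers exactly one complete frame at a time. Here is a minimal round-trip sketch using Netty's EmbeddedChannel test channel (FramingDemo is my addition, not part of the article's code):

```java
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class FramingDemo {
    public static void main(String[] args) {
        // Sender side: String -> bytes -> 4-byte length prefix + bytes.
        EmbeddedChannel sender = new EmbeddedChannel(
                new LengthFieldPrepender(4), new StringEncoder(CharsetUtil.UTF_8));
        // Receiver side: reassemble one frame, strip the prefix, decode to String.
        EmbeddedChannel receiver = new EmbeddedChannel(
                new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
                new StringDecoder(CharsetUtil.UTF_8));

        sender.writeOutbound("heart beat!");
        // The prepender emits the length header and the payload as separate buffers,
        // which also shows that the frame decoder reassembles split input.
        Object piece;
        while ((piece = sender.readOutbound()) != null) {
            receiver.writeInbound(piece);
        }
        String decoded = receiver.readInbound();
        System.out.println(decoded); // prints: heart beat!
    }
}
```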
TcpClient —— the TCP Client
```java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TcpClient {

    private String host;
    private int port;
    private Bootstrap bootstrap;
    /** Keep the Channel around so data can be sent from places outside any handler. */
    private Channel channel;

    public TcpClient(String host, int port) {
        // RetryPolicy and ExponentialBackOffRetry are introduced in the
        // reconnection section below; this first version does not use them yet.
        this(host, port, new ExponentialBackOffRetry(1000, Integer.MAX_VALUE, 60 * 1000));
    }

    public TcpClient(String host, int port, RetryPolicy retryPolicy) {
        this.host = host;
        this.port = port;
        init();
    }

    /** Request a connection to the remote TCP server. */
    public void connect() {
        synchronized (bootstrap) {
            ChannelFuture future = bootstrap.connect(host, port);
            this.channel = future.channel();
        }
    }

    private void init() {
        EventLoopGroup group = new NioEventLoopGroup();
        // The bootstrap is reusable; it only needs to be initialized once,
        // when the TcpClient is instantiated.
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ClientHandlersInitializer(TcpClient.this));
    }

    public static void main(String[] args) {
        TcpClient tcpClient = new TcpClient("localhost", 2222);
        tcpClient.connect();
    }
}
```
Server Side

ServerIdleStateTrigger —— the Disconnect Trigger
```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

/**
 * Proactively closes a connection when no data at all has been received
 * from the client within the configured time.
 */
public class ServerIdleStateTrigger extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.READER_IDLE) {
                // No upstream data from the client within the configured time:
                // close the connection.
                ctx.disconnect();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
```
ServerBizHandler —— the Server-Side Business Handler
```java
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Prints every packet received from the client straight to the console.
 */
@ChannelHandler.Sharable
public class ServerBizHandler extends SimpleChannelInboundHandler<String> {

    private final String REC_HEART_BEAT = "I had received the heart beat!";

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String data) throws Exception {
        try {
            System.out.println("receive data: " + data);
            // ctx.writeAndFlush(REC_HEART_BEAT);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Established connection with the remote client.");
        // do something
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Disconnected with the remote client.");
        // do something
        ctx.fireChannelInactive();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
```
ServerHandlerInitializer —— Assembling the Server-Side Handlers
```java
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;

/**
 * Initializes all the handlers involved on the server side.
 */
public class ServerHandlerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(5, 0, 0));
        ch.pipeline().addLast("idleStateTrigger", new ServerIdleStateTrigger());
        ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
        ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(4));
        ch.pipeline().addLast("decoder", new StringDecoder());
        ch.pipeline().addLast("encoder", new StringEncoder());
        ch.pipeline().addLast("bizHandler", new ServerBizHandler());
    }
}
```
Note: new IdleStateHandler(5, 0, 0) means that if the server receives no data at all from a client within 5 seconds (heartbeats included), it proactively closes that client's connection. For a heartbeat to keep a connection alive, the client's ping interval must therefore stay below this 5-second read timeout; the Pinger above deliberately waits a random 1 to 7 seconds, so the timeout is sometimes exceeded, which is exactly what the test below demonstrates.
TcpServer —— the Server
```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class TcpServer {

    private int port;
    private ServerHandlerInitializer serverHandlerInitializer;

    public TcpServer(int port) {
        this.port = port;
        this.serverHandlerInitializer = new ServerHandlerInitializer();
    }

    public void start() {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(this.serverHandlerInitializer);
            // Bind the port and start accepting incoming connections.
            ChannelFuture future = bootstrap.bind(port).sync();
            System.out.println("Server start listen at " + port);
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // The original only shut the groups down in the catch block;
            // a finally block releases them on the normal path as well.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 2222;
        new TcpServer(port).start();
    }
}
```
That completes all the code.

Testing

First start the server, then the client. Once both are up, the client console prints logs similar to the following:
(Screenshot: client console log output)
On the server side, the console prints logs similar to the following:
(Screenshot: server console log output)
As the logs show, after the client sends four heartbeat packets, the fifth waits too long; by the time it would actually be sent, the connection has already been closed. The server, after receiving the client's four heartbeat packets, waits in vain for the next one and decisively closes the connection.
An Abnormal Case

During testing, the following situation may occur:
(Screenshot: the exception printed on the console)
The cause: a heartbeat packet is sent even though the connection has already dropped. Although channel.isActive() is checked before each send, the check and the send are not atomic: the connection can be alive at the moment of the check and gone an instant later, just before the packet goes out.

I have not yet found an elegant way to handle this; if you have a good solution, please share it. Much obliged!
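One common mitigation (my addition, not from the original article) is to accept that the isActive() check can never be airtight and instead treat the write itself as the test: register a listener on the write future and react there when the send fails. A minimal sketch (SafeWriter is a hypothetical helper name):

```java
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;

public final class SafeWriter {

    private SafeWriter() {}

    /** Writes a heartbeat and handles failure via the future instead of pre-checking. */
    static void writeHeartBeat(Channel channel) {
        channel.writeAndFlush(ClientIdleStateTrigger.HEART_BEAT)
               .addListener((ChannelFutureListener) future -> {
                   if (!future.isSuccess()) {
                       // The write failed, e.g. because the connection broke between
                       // an isActive() check and the actual send. Clean up here
                       // instead of letting the exception propagate.
                       System.err.println("heartbeat failed: " + future.cause());
                       future.channel().close();
                   }
               });
    }
}
```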
Reconnecting After Disconnection

Reconnection needs little introduction; you all know what it is. Here is just the general idea, followed by the code.

Approach

After the client detects that the connection to the server has dropped, or if it cannot connect in the first place, it retries with a configurable reconnect policy until the connection is re-established or the retries are exhausted.

Detecting a broken connection is done by overriding ChannelInboundHandler#channelInactive: when the connection becomes unavailable, this method is triggered, so the reconnect work only needs to happen there.
Implementation

Note: the code below modifies and extends the heartbeat code above.

Since reconnecting is the client's job, only the client code needs to change.

Retry Policy

RetryPolicy —— the Retry-Policy Interface
```java
public interface RetryPolicy {

    /**
     * Called when an operation has failed for some reason. This method should return
     * true to make another attempt.
     *
     * @param retryCount the number of times retried so far (0 the first time)
     * @return true/false
     */
    boolean allowRetry(int retryCount);

    /**
     * Get the sleep time in ms for the current retry count.
     *
     * @param retryCount current retry count
     * @return the time to sleep
     */
    long getSleepTimeMs(int retryCount);
}
```
ExponentialBackOffRetry —— the Default Reconnect Policy
```java
import java.util.Random;

/**
 * Retry policy that retries a set number of times with an increasing
 * (randomized exponential) sleep time between retries.
 */
public class ExponentialBackOffRetry implements RetryPolicy {

    private static final int MAX_RETRIES_LIMIT = 29;
    private static final int DEFAULT_MAX_SLEEP_MS = Integer.MAX_VALUE;

    private final Random random = new Random();
    private final long baseSleepTimeMs;
    private final int maxRetries;
    private final int maxSleepMs;

    public ExponentialBackOffRetry(int baseSleepTimeMs, int maxRetries) {
        this(baseSleepTimeMs, maxRetries, DEFAULT_MAX_SLEEP_MS);
    }

    public ExponentialBackOffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs) {
        this.maxRetries = maxRetries;
        this.baseSleepTimeMs = baseSleepTimeMs;
        this.maxSleepMs = maxSleepMs;
    }

    @Override
    public boolean allowRetry(int retryCount) {
        return retryCount < maxRetries;
    }

    @Override
    public long getSleepTimeMs(int retryCount) {
        if (retryCount < 0) {
            throw new IllegalArgumentException("retries count must be greater than or equal to 0.");
        }
        if (retryCount > MAX_RETRIES_LIMIT) {
            System.out.println(String.format("maxRetries too large (%d). Pinning to %d", maxRetries, MAX_RETRIES_LIMIT));
            retryCount = MAX_RETRIES_LIMIT;
        }
        // Sleep for the base time multiplied by a random factor in [1, 2^retryCount).
        long sleepMs = baseSleepTimeMs * Math.max(1, random.nextInt(1 << retryCount));
        if (sleepMs > maxSleepMs) {
            System.out.println(String.format("Sleep extension too large (%d). Pinning to %d", sleepMs, maxSleepMs));
            sleepMs = maxSleepMs;
        }
        return sleepMs;
    }
}
```
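To see what these sleep times look like in practice, here is a small sketch (BackOffDemo is my addition, just for illustration; the 1000 ms base and 60 s cap match the defaults used by TcpClient below) that prints a sample schedule:

```java
public class BackOffDemo {
    public static void main(String[] args) {
        RetryPolicy policy = new ExponentialBackOffRetry(1000, Integer.MAX_VALUE, 60 * 1000);
        for (int retry = 0; retry < 10; retry++) {
            // retry 0 always sleeps 1000 ms; retry 7 sleeps anywhere from
            // 1000 ms to 127_000 ms before the 60_000 ms cap kicks in.
            System.out.println("retry " + retry + ": sleep " + policy.getSleepTimeMs(retry) + " ms");
        }
    }
}
```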
ReconnectHandler —— the Reconnect Handler
```java
import java.util.concurrent.TimeUnit;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;

@ChannelHandler.Sharable
public class ReconnectHandler extends ChannelInboundHandlerAdapter {

    private int retries = 0;
    private RetryPolicy retryPolicy;

    private TcpClient tcpClient;

    public ReconnectHandler(TcpClient tcpClient) {
        this.tcpClient = tcpClient;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Successfully established a connection to the server.");
        retries = 0;
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (retries == 0) {
            System.err.println("Lost the TCP connection with the server.");
            ctx.close();
        }

        boolean allowRetry = getRetryPolicy().allowRetry(retries);
        if (allowRetry) {
            long sleepTimeMs = getRetryPolicy().getSleepTimeMs(retries);
            System.out.println(String.format("Try to reconnect to the server after %dms. Retry count: %d.", sleepTimeMs, ++retries));
            final EventLoop eventLoop = ctx.channel().eventLoop();
            eventLoop.schedule(() -> {
                System.out.println("Reconnecting ...");
                tcpClient.connect();
            }, sleepTimeMs, TimeUnit.MILLISECONDS);
        }
        ctx.fireChannelInactive();
    }

    private RetryPolicy getRetryPolicy() {
        if (this.retryPolicy == null) {
            this.retryPolicy = tcpClient.getRetryPolicy();
        }
        return this.retryPolicy;
    }
}
```
ClientHandlersInitializer
Compared with the earlier version, the reconnect handler ReconnectHandler has been added to the pipeline.
```java
// Imports are identical to the earlier version of this class.
public class ClientHandlersInitializer extends ChannelInitializer<SocketChannel> {

    private ReconnectHandler reconnectHandler;
    private EchoHandler echoHandler;

    public ClientHandlersInitializer(TcpClient tcpClient) {
        Assert.notNull(tcpClient, "TcpClient can not be null.");
        this.reconnectHandler = new ReconnectHandler(tcpClient);
        this.echoHandler = new EchoHandler();
    }

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(this.reconnectHandler);
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
        pipeline.addLast(new LengthFieldPrepender(4));
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new Pinger());
    }
}
```
TcpClient
Compared with the earlier version, support for reconnecting and for the retry policy has been added.
```java
// Imports are unchanged from the earlier version, plus io.netty.channel.ChannelFutureListener.
public class TcpClient {

    private String host;
    private int port;
    private Bootstrap bootstrap;
    /** The reconnect policy. */
    private RetryPolicy retryPolicy;
    /** Keep the Channel around so data can be sent from places outside any handler. */
    private Channel channel;

    public TcpClient(String host, int port) {
        this(host, port, new ExponentialBackOffRetry(1000, Integer.MAX_VALUE, 60 * 1000));
    }

    public TcpClient(String host, int port, RetryPolicy retryPolicy) {
        this.host = host;
        this.port = port;
        this.retryPolicy = retryPolicy;
        init();
    }

    /** Request a connection to the remote TCP server. */
    public void connect() {
        synchronized (bootstrap) {
            ChannelFuture future = bootstrap.connect(host, port);
            future.addListener(getConnectionListener());
            this.channel = future.channel();
        }
    }

    public RetryPolicy getRetryPolicy() {
        return retryPolicy;
    }

    private void init() {
        EventLoopGroup group = new NioEventLoopGroup();
        // The bootstrap is reusable; it only needs to be initialized once,
        // when the TcpClient is instantiated.
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ClientHandlersInitializer(TcpClient.this));
    }

    private ChannelFutureListener getConnectionListener() {
        return new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    // A failed connect attempt never fires channelInactive on its own,
                    // so fire it manually to drive the ReconnectHandler.
                    future.channel().pipeline().fireChannelInactive();
                }
            }
        };
    }

    public static void main(String[] args) {
        TcpClient tcpClient = new TcpClient("localhost", 2222);
        tcpClient.connect();
    }
}
```
Testing

Before testing, to avoid the Connection reset by peer exception, tweak Pinger's ping() method slightly by adding an if (second == 5) check (a wait of exactly 5 seconds races with the server's 5-second read timeout), as follows:
```java
private void ping(Channel channel) {
    int second = Math.max(1, random.nextInt(baseRandom));
    if (second == 5) {
        second = 6;
    }
    System.out.println("next heart beat will send after " + second + "s.");
    ScheduledFuture<?> future = channel.eventLoop().schedule(new Runnable() {
        @Override
        public void run() {
            if (channel.isActive()) {
                System.out.println("sending heart beat to the server...");
                channel.writeAndFlush(ClientIdleStateTrigger.HEART_BEAT);
            } else {
                System.err.println("The connection had broken, cancel the task that will send a heart beat.");
                channel.close();
                throw new RuntimeException();
            }
        }
    }, second, TimeUnit.SECONDS);

    future.addListener(new GenericFutureListener() {
        @Override
        public void operationComplete(Future future) throws Exception {
            if (future.isSuccess()) {
                ping(channel);
            }
        }
    });
}
```
Start the Client

First start only the client and watch the console output; you will see logs similar to the following:
(Screenshot: reconnect test —— client console output)
As the log shows, when the client finds it cannot reach the server, it keeps retrying. As the retry count grows, so does the interval between retries, but we do not want it to grow without bound, so a ceiling is needed, e.g. 60 s. As the screenshot shows, when the next computed interval exceeds 60 s, the log prints Sleep extension too large (*). Pinning to 60000 (in ms): the computed sleep time exceeded the threshold, so the actual sleep time was pinned back to the 60 s ceiling.
Start the Server

Now start the server and keep watching the client console output.
(Screenshot: reconnect test —— client console output after the server starts)
As the log shows, the server was started after the 9th failed retry and before the 10th, so the 10th attempt logs Successfully established a connection to the server., i.e. the connection succeeds. After that, because the client still pings the server at irregular intervals, the disconnect-then-reconnect cycle keeps repeating.
Extending It

Different environments may have different reconnect requirements. If you have your own, simply implement the RetryPolicy interface yourself and override the default policy when creating the TcpClient.
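For instance, here is a minimal sketch of a fixed-interval policy (the class name FixedIntervalRetry and its parameters are my own, for illustration):

```java
/** Retries up to maxRetries times, always sleeping the same fixed interval. */
public class FixedIntervalRetry implements RetryPolicy {

    private final long intervalMs;
    private final int maxRetries;

    public FixedIntervalRetry(long intervalMs, int maxRetries) {
        this.intervalMs = intervalMs;
        this.maxRetries = maxRetries;
    }

    @Override
    public boolean allowRetry(int retryCount) {
        return retryCount < maxRetries;
    }

    @Override
    public long getSleepTimeMs(int retryCount) {
        return intervalMs;
    }
}
```

It is then plugged in through TcpClient's three-argument constructor:

```java
TcpClient tcpClient = new TcpClient("localhost", 2222, new FixedIntervalRetry(3000, 10));
tcpClient.connect();
```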
That's it!