客戶端和服務端的鏈接屬於socket鏈接,也屬於長鏈接,每每會存在客戶端在鏈接了服務端以後就沒有任何操做了,但仍是佔用了一個鏈接;當愈來愈多相似的客戶端出現就會浪費不少鏈接,netty中能夠經過心跳檢測來找出必定程度(自定義規則判斷哪些鏈接是無效連接)的無效連接並斷開鏈接,保存真正活躍的鏈接。
我理解的心跳檢測應該是客戶端/服務端定時發送一個數據包給服務端/客戶端,檢測對方是否有響應; 若是是存活的鏈接,在必定的時間內應該會收到響應回來的數據包; 若是在必定時間內仍是收不到接收方的響應的話,就能夠當作是掛機,能夠斷開此鏈接; 若是檢測到了掉線以後還能夠進行重連;
idleStateHandler在通道註冊以後會開啓一個定時任務,定時去檢測通道中後續是否還有進行數據傳輸,若是在規定的時間內沒有進行數據傳輸則會觸發對應的超時事件,使用者能夠根據對應的事件自定義規則來判別當前鏈接是不是活躍,是否須要關閉鏈接等來進行操做。 通常idleStateHandler觸發的事件IdleStateEvent會在心跳handler中的userEventTriggered方法中捕獲到對應的超時事件。
IdleStateHandler的繼承關係:經過ChannelDuplexHandler類繼承ChannelInboundHandler和實現ChannelOutboundHandler來實現對入站和出站的重寫和監控java
IdleStateHandler初始化:爲0表明不監控git
/** * @observeOutput 觀察輸出 * @readerIdleTime 讀超時時間 自定義時間內檢測Channel通道有沒有讀取到數據,爲0表明不監控 * @writerIdleTime 寫超時時間 自定義時間內檢測Channel通道有沒有write數據,爲0表明不監控 * @allIdleTime 總超時時間 自定義時間內檢測Channel通道有沒有讀/寫數據,爲0表明不監控 * @unit 時間單位 */ public IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) { if (unit == null) { throw new NullPointerException("unit"); } this.observeOutput = observeOutput; // 初始化讀取空閒時間,最小值爲0 if (readerIdleTime <= 0) { readerIdleTimeNanos = 0; } else { // 定義讀取超時時間爲自定義設置時間 readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS); } if (writerIdleTime <= 0) { writerIdleTimeNanos = 0; } else { // 設置寫超時時間 writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS); } if (allIdleTime <= 0) { allIdleTimeNanos = 0; } else { // 設置總超時時間 allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS); } }
IdleStateHandler和channel通道的關聯:經過類圖能夠得知,idleStateHandler能夠重寫入站和出站的方法,不過只是經過channelRead和write方法來記錄閱讀的時間等不作其餘操做github
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 初始化檢測器,開啓定時任務 initialize(ctx); super.channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // 在通道非活躍狀態的時候銷燬定時任務 destroy(); super.channelInactive(ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 若是設置的讀超時時間大於0則設置是否讀操做爲true if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) { reading = true; firstReaderIdleEvent = firstAllIdleEvent = true; } // 記錄時間和標識標誌以後就直接fire當前read到下一個ChannelHandler處理類中 ctx.fireChannelRead(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // 當讀取完畢時,設置是否正在讀取爲false,設置最後讀取時間爲系統當前時間 if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) { lastReadTime = ticksInNanos(); reading = false; } // 一樣直接fire掉,跳入到下一個handler中 ctx.fireChannelReadComplete(); } // idleStateHandler重寫的write方法 @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { // Allow writing with void promise if handler is only configured for read timeout events. if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) { ctx.write(msg, promise.unvoid()).addListener(writeListener); } else { ctx.write(msg, promise); } } // ChannelOutboundHandler提供的write接口方法 /** * Called once a write operation is made. The write operation will write the messages through the * {@link ChannelPipeline}. Those are then ready to be flushed to the actual {@link Channel} once * {@link Channel#flush()} is called * 執行一次寫操做;寫操做經過ChannelPipeline來傳輸信息;最後經過channel的flush()方法來刷新 * * @param ctx the {@link ChannelHandlerContext} for which the write operation is made 實際寫操做者 * @param msg the message to write 寫的消息 * @param promise the {@link ChannelPromise} to notify once the operation completes 在操做完成時當即通知(相似future異步通知) * @throws Exception thrown if an error occurs */ void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
定時器初始化/銷燬:將使用者自定義的超時時間設置爲延遲定時任務bootstrap
初始化: private void initialize(ChannelHandlerContext ctx) { // Avoid the case where destroy() is called before scheduling timeouts. // See: https://github.com/netty/netty/issues/143 // state; // 0 - none, 1 - 初始化, 2 - 銷燬 switch (state) { case 1: case 2: return; } state = 1; initOutputChanged(ctx); // 最後讀取時間 lastReadTime = lastWriteTime = ticksInNanos(); // 若是設置的空閒時間大於0則開啓定時任務進行監控 if (readerIdleTimeNanos > 0) { readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx), readerIdleTimeNanos, TimeUnit.NANOSECONDS); } // 寫超時時間 if (writerIdleTimeNanos > 0) { writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx), writerIdleTimeNanos, TimeUnit.NANOSECONDS); } // 總超時時間 if (allIdleTimeNanos > 0) { allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx), allIdleTimeNanos, TimeUnit.NANOSECONDS); } }
銷燬: /** * @readerIdleTimeout ==> ScheduledFuture類 * @writerIdleTimeout ==> ScheduledFuture類 */ private void destroy() { // 設置銷燬狀態 state = 2; // 銷燬線程 // ScheduledFuture if (readerIdleTimeout != null) { readerIdleTimeout.cancel(false); readerIdleTimeout = null; } if (writerIdleTimeout != null) { writerIdleTimeout.cancel(false); writerIdleTimeout = null; } if (allIdleTimeout != null) { allIdleTimeout.cancel(false); allIdleTimeout = null; } }
開啓定時任務promise
ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) { return ctx.executor().schedule(task, delay, unit); }
讀/寫定時任務:定時任務啓動的時候,經過設置的超時時間和上一次觸發channelRead的時間進行相減比較來判斷是否超時了網絡
// 僅分析讀取超時時間定時任務,寫超時差很少就是觸發的時間不同,比對的變量換成了設置的寫超時時間 private final class ReaderIdleTimeoutTask extends AbstractIdleTask { ReaderIdleTimeoutTask(ChannelHandlerContext ctx) { super(ctx); } @Override protected void run(ChannelHandlerContext ctx) { // readerIdleTimeNanos:初始化IdleStateHandler設置的讀取超時時間 long nextDelay = readerIdleTimeNanos; // 若是沒有任何讀取操做 if (!reading) { // 判斷是否有超時 // nextDelay = nextDelay-(ticksInNanos() - lastReadTime) // 即設置的超時時間減去距離上一次讀取的時間 nextDelay -= ticksInNanos() - lastReadTime; } // 若是小於等於0 則觸發讀取超時事件,設置新的延遲時間 if (nextDelay <= 0) { // Reader is idle - set a new timeout and notify the callback. readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS); // 標記爲第一次 boolean first = firstReaderIdleEvent; // 設置成非第一次 firstReaderIdleEvent = false; try { IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first); channelIdle(ctx, event); } catch (Throwable t) { ctx.fireExceptionCaught(t); } } else { // Read occurred before the timeout - set a new timeout with shorter delay. // 超時的時候發生的讀取事件,則從新延遲執行 readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS); } } }
項目結構異步
├─src │ ├─main │ │ ├─java │ │ │ └─com │ │ │ └─hetangyuese │ │ │ └─netty │ │ │ ├─client │ │ │ │ MyChannelFutureListener.java │ │ │ │ MyClient05.java │ │ │ │ MyClientChannelHandler.java │ │ │ │ MyClientChannelInitializer.java │ │ │ │ │ │ │ └─server │ │ │ │ MyServer05.java │ │ │ │ MyServerChannelInitializer.java │ │ │ │ MyServerHandler.java │ │ │ │ │ │ │ └─decoder
在ChannelPipeline中註冊IdleStateHandlersocket
public class MyServerChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline(). addLast(new StringEncoder(Charset.forName("GBK"))) .addLast(new StringDecoder(Charset.forName("GBK"))) .addLast(new LoggingHandler(LogLevel.INFO)) // 設置讀取超時時間爲5秒,寫超時和總超時爲0即不作監控 .addLast(new IdleStateHandler(5, 0, 0)) .addLast(new MyServerHandler()); } }
服務端處理handler(其中userEventTriggered爲接收心跳任務觸發的事件,此次作了計數三次觸發讀空閒超時則斷開鏈接)ide
public class MyServerHandler extends ChannelInboundHandlerAdapter { private AtomicInteger count = new AtomicInteger(1); /** * 心跳檢測機制會進入 * @param ctx * @param evt * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { System.out.println("心跳檢測觸發了事件, object: , time: " + evt + new Date().toLocaleString()); super.userEventTriggered(ctx, evt); if (evt instanceof IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt; // 客戶端鏈接應該是請求 write if (e.state() == IdleState.READER_IDLE) { System.out.println("服務端監測到了讀取超時"); count.incrementAndGet(); if (count.get() > 3) { System.out.println("客戶端還在?? 已經3次檢測沒有訪問了,我要斷開了哦!!!"); ctx.channel().close(); } } else if (e.state() == IdleState.WRITER_IDLE) { // 若是一直有交互則會發送writer_idle System.out.println("服務端收到了寫入超時"); } else { System.out.println("服務端收到了All_idle"); } } else { super.userEventTriggered(ctx,evt); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("myServerHandler is active, time: " + new Date().toLocaleString()); ctx.writeAndFlush("成功鏈接服務端, 當前時間:" + new Date().toLocaleString()); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("服務端與客戶端斷開了鏈接, time: " + new Date().toLocaleString()); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("myServerHandler 收到了客戶端的信息 msg:" + msg + ", time: " + new Date().toLocaleString()); ctx.writeAndFlush("您好,客戶端,我是服務端"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
運行結果oop
myServer is start time: 2019-11-8 14:47:16 myServerHandler is active, time: 2019-11-8 14:47:18 十一月 08, 2019 2:47:18 下午 io.netty.handler.logging.LoggingHandler channelRegistered 信息: [id: 0x8d924d06, L:/127.0.0.1:9001 - R:/127.0.0.1:62195] REGISTERED 十一月 08, 2019 2:47:18 下午 io.netty.handler.logging.LoggingHandler channelActive 信息: [id: 0x8d924d06, L:/127.0.0.1:9001 - R:/127.0.0.1:62195] ACTIVE 十一月 08, 2019 2:47:18 下午 io.netty.handler.logging.LoggingHandler write 信息: [id: 0x8d924d06, L:/127.0.0.1:9001 - R:/127.0.0.1:62195] WRITE: 成功鏈接服務端, 當前時間:2019-11-8 14:47:18 十一月 08, 2019 2:47:18 下午 io.netty.handler.logging.LoggingHandler flush 信息: [id: 0x8d924d06, L:/127.0.0.1:9001 - R:/127.0.0.1:62195] FLUSH 心跳檢測觸發了事件, object: , time: io.netty.handler.timeout.IdleStateEvent@a3a0212019-11-8 14:47:23 服務端監測到了讀取超時 心跳檢測觸發了事件, object: , time: io.netty.handler.timeout.IdleStateEvent@18692702019-11-8 14:47:28 服務端監測到了讀取超時 心跳檢測觸發了事件, object: , time: io.netty.handler.timeout.IdleStateEvent@18692702019-11-8 14:47:33 服務端監測到了讀取超時 客戶端還在?? 已經3次檢測沒有訪問了,我要斷開了哦!!! 十一月 08, 2019 2:47:33 下午 io.netty.handler.logging.LoggingHandler close 信息: [id: 0x8d924d06, L:/127.0.0.1:9001 - R:/127.0.0.1:62195] CLOSE 十一月 08, 2019 2:47:33 下午 io.netty.handler.logging.LoggingHandler channelInactive 信息: [id: 0x8d924d06, L:/127.0.0.1:9001 ! R:/127.0.0.1:62195] INACTIVE 服務端與客戶端斷開了鏈接, time: 2019-11-8 14:47:33 十一月 08, 2019 2:47:33 下午 io.netty.handler.logging.LoggingHandler channelUnregistered 信息: [id: 0x8d924d06, L:/127.0.0.1:9001 ! R:/127.0.0.1:62195] UNREGISTERED
出現斷線重連的狀況:
ChannelFutureListener進行重連
public class MyChannelFutureListener implements ChannelFutureListener { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { System.out.println("當前已鏈接"); return; } System.out.println("啓動鏈接客戶端失敗,開始重連"); final EventLoop loop = future.channel().eventLoop(); loop.schedule(new Runnable() { @Override public void run() { try { MyClient05.reConnection(); System.out.println("客戶端重連成功"); } catch (Exception e){ e.printStackTrace(); } } }, 1L, TimeUnit.SECONDS); } }
client啓動 不知道爲啥想要測試listener的效果時,connect的sync()不能帶,是因爲同步阻塞的緣由?
public void start() { EventLoopGroup group = new NioEventLoopGroup(); try { bootstrap = getBootstrap(); bootstrap.group(group) .option(ChannelOption.AUTO_READ, true) .option(ChannelOption.TCP_NODELAY, true) .channel(NioSocketChannel.class) .handler(new MyClientChannelInitializer()); // ChannelFuture future = bootstrap.connect(new InetSocketAddress(ip, port)).sync(); ChannelFuture future = bootstrap.connect(new InetSocketAddress(ip, port)); // 增長監聽 future.addListener(new MyChannelFutureListener()); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } }
直接啓動客戶端,能夠看到listener輸出:啓動鏈接客戶端失敗,開始重連
channelInactive進行重連(我是直接new了一個線程去重連)
@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("客戶端斷開了鏈接, time: " + new Date().toLocaleString()); new Thread(new Runnable() { @Override public void run() { try { new MyClient05("127.0.0.1", 9001).start(); System.out.println("客戶端從新鏈接了服務端 time:" + new Date().toLocaleString()); } catch (Exception e) { e.printStackTrace(); } } }).start(); }
測試方法:
1.能夠直接經過上面的心跳機制斷開鏈接後,客戶端的channelInactive檢測到斷開會自動執行重連
測試結果:
服務端: ------ 心跳檢測觸發了事件, object: , time: io.netty.handler.timeout.IdleStateEvent@ab81552019-11-8 16:43:12 服務端監測到了讀取超時 心跳檢測觸發了事件, object: , time: io.netty.handler.timeout.IdleStateEvent@15192622019-11-8 16:43:17 服務端監測到了讀取超時 心跳檢測觸發了事件, object: , time: io.netty.handler.timeout.IdleStateEvent@15192622019-11-8 16:43:22 服務端監測到了讀取超時 客戶端還在?? 已經3次檢測沒有訪問了,我要斷開了哦!!! 十一月 08, 2019 4:43:22 下午 io.netty.handler.logging.LoggingHandler close 信息: [id: 0x6bfc0d90, L:/192.168.0.118:9001 - R:/192.168.0.118:51031] CLOSE 十一月 08, 2019 4:43:22 下午 io.netty.handler.logging.LoggingHandler channelInactive 信息: [id: 0x6bfc0d90, L:/192.168.0.118:9001 ! R:/192.168.0.118:51031] INACTIVE 十一月 08, 2019 4:43:22 下午 io.netty.handler.logging.LoggingHandler channelUnregistered 信息: [id: 0x6bfc0d90, L:/192.168.0.118:9001 ! R:/192.168.0.118:51031] UNREGISTERED 服務端與客戶端斷開了鏈接, time: 2019-11-8 16:43:22 十一月 08, 2019 4:43:22 下午 io.netty.handler.logging.LoggingHandler channelRegistered 信息: [id: 0xbcb2ec62, L:/127.0.0.1:9001 - R:/127.0.0.1:51061] REGISTERED 十一月 08, 2019 4:43:22 下午 io.netty.handler.logging.LoggingHandler channelActive 信息: [id: 0xbcb2ec62, L:/127.0.0.1:9001 - R:/127.0.0.1:51061] ACTIVE myServerHandler is active, time: 2019-11-8 16:43:22 十一月 08, 2019 4:43:22 下午 io.netty.handler.logging.LoggingHandler write 信息: [id: 0xbcb2ec62, L:/127.0.0.1:9001 - R:/127.0.0.1:51061] WRITE: 成功鏈接服務端, 當前時間:2019-11-8 16:43:22 十一月 08, 2019 4:43:22 下午 io.netty.handler.logging.LoggingHandler flush 信息: [id: 0xbcb2ec62, L:/127.0.0.1:9001 - R:/127.0.0.1:51061] FLUSH 十一月 08, 2019 4:43:26 下午 io.netty.handler.logging.LoggingHandler channelReadComplete ------------------------------------------------------------------------------------- 客戶端: ------- 當前已鏈接 客戶端與服務端創建了鏈接 time: 2019-11-8 16:43:07 客戶端接收到了服務的響應的數據 msg: 成功鏈接服務端, 當前時間:2019-11-8 16:43:07, time: 2019-11-8 16:43:07 客戶端斷開了鏈接, time: 2019-11-8 16:43:22 當前已鏈接 客戶端與服務端創建了鏈接 time: 2019-11-8 16:43:22 客戶端接收到了服務的響應的數據 msg: 成功鏈接服務端, 當前時間:2019-11-8 16:43:22, time: 2019-11-8 16:43:22