netty源碼解析(4.0)-17 ChannelHandler: IdleStateHandler實現

   io.netty.handler.timeout.IdleStateHandler功能是監測Channel上read, write或者這二者的空閒狀態。當Channel超過了指定的空閒時間時,這個Handler會觸發一個IdleStateEvent事件。html

  在第一次檢測到Channel變成active狀態時向EventExecutor中提交三個延遲任務:git

    ReaderIdleTimeoutTask: 檢測read空閒超時。github

    WriterIdleTimeoutTask: 檢測write空閒超時。promise

    AllIdleTimeoutTask: 檢測全部的空閒超時。ide

  任何一個延遲任務檢測到空閒超時是會觸發一個IdleStateEvent。不管如何,延遲任務都會再次把本身提交到EventExecutor中,等待下次執行。oop

  三個延遲任務對應於三個超時時間,都是能夠獨立設置的:this

 1 public IdleStateHandler(boolean observeOutput,  2             long readerIdleTime, long writerIdleTime, long allIdleTime,  3  TimeUnit unit) {  4         if (unit == null) {  5             throw new NullPointerException("unit");  6  }  7 
 8         this.observeOutput = observeOutput;  9 
10         if (readerIdleTime <= 0) { 11             readerIdleTimeNanos = 0; 12         } else { 13             readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS); 14  } 15         if (writerIdleTime <= 0) { 16             writerIdleTimeNanos = 0; 17         } else { 18             writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS); 19  } 20         if (allIdleTime <= 0) { 21             allIdleTimeNanos = 0; 22         } else { 23             allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS); 24  } 25     }

  這個類繼承自io.netty.channel.ChannelDuplexHandler, 它是一個有狀態的ChannelHandler, 定義了三個狀態:spa

  private byte state; // 0 - none, 1 - initialized, 2 - destroyed.net

  state屬性保存了它的狀態。0:初始狀態,1:已經初始化, 2: 已經銷燬。netty

  這個ChannelHandler被加入到Channel的pipeline中以後,在Channel已經被register到EventLoop中,且處於Active狀態時,會執行一次初始化操做,向EventExecutor提交前面提到的三個延遲任務。這初始化操做在initialize方法中實現。

 1     private void initialize(ChannelHandlerContext ctx) {  2         // Avoid the case where destroy() is called before scheduling timeouts.  3         // See: https://github.com/netty/netty/issues/143
 4         switch (state) {  5         case 1:  6         case 2:  7             return;  8  }  9 
10         state = 1; 11  initOutputChanged(ctx); 12 
13         lastReadTime = lastWriteTime = ticksInNanos(); 14         if (readerIdleTimeNanos > 0) { 15             readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx), 16  readerIdleTimeNanos, TimeUnit.NANOSECONDS); 17  } 18         if (writerIdleTimeNanos > 0) { 19             writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx), 20  writerIdleTimeNanos, TimeUnit.NANOSECONDS); 21  } 22         if (allIdleTimeNanos > 0) { 23             allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx), 24  allIdleTimeNanos, TimeUnit.NANOSECONDS); 25  } 26     }

 

  第4-10行,只有處於初始狀態時才執行後面的操做,避免屢次提交定時任務。

  第11行, 初始化對對Channel的outboundBuffer變化的監視,只有當observeOutput屬性設置爲true時纔開啓這個監視。

  第13-25行,分別提交三個延遲任務。

 

  initialize方法可能在三個地方被調用:

 @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isActive() && ctx.channel().isRegistered()) { // channelActive() event has been fired already, which means this.channelActive() will // not be invoked. We have to initialize here instead.
 initialize(ctx); } else { // channelActive() event has not been fired yet. this.channelActive() will be invoked // and initialization will occur there.
 } } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { // Initialize early if channel is active already.
        if (ctx.channel().isActive()) { initialize(ctx); } super.channelRegistered(ctx); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // This method will be invoked only if this handler was added // before channelActive() event is fired. If a user adds this handler // after the channelActive() event, initialize() will be called by beforeAdd().
 initialize(ctx); super.channelActive(ctx); }

 

  若是在Channel初始化的時候把這個Handler添加到pipeline中,那麼這個Handler的channelActive方法必定會被調用,只須要在channleActive中調用initialize就能夠打了。可是Handler能夠在任什麼時候候被加入到pipleline中。當ChannelHandler被添加到pipeline中時,Channel可能已經被register到EventLoop中,且已經處於Active狀態,這種狀況下,channelRegistered和channelActive方法都不會被調用,因此必須在handlerAdded中調用initialize。若是此時,Channnel已經處於Active狀態,但還沒被註冊到EventLoop,只能在channelRegisted中調用initialize。

  

  初始化完成以後,延遲任務到期執行時會把本身再次提交到EventExecutor中,等待下次執行。同時會檢查是否知足觸發事件的條件,若是是就觸發一條自定義的事件。

  

read空閒超時檢查

 1 private final class ReaderIdleTimeoutTask extends AbstractIdleTask {  2  @Override  3         protected void run(ChannelHandlerContext ctx) {  4             long nextDelay = readerIdleTimeNanos;  5             if (!reading) {  6                 nextDelay -= ticksInNanos() - lastReadTime;  7  }  8 
 9             if (nextDelay <= 0) { 10                 // Reader is idle - set a new timeout and notify the callback.
11                 readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS); 12 
13                 boolean first = firstReaderIdleEvent; 14                 firstReaderIdleEvent = false; 15 
16                 try { 17                     IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first); 18  channelIdle(ctx, event); 19                 } catch (Throwable t) { 20  ctx.fireExceptionCaught(t); 21  } 22             } else { 23                 // Read occurred before the timeout - set a new timeout with shorter delay.
24                 readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS); 25  } 26  } 27     }

  4-9行,判斷是否read空閒超時。

  11-21行,read空閒超時,從新把本身提交成延遲任務。

  24行,read沒有空閒超時,從新把本身提交成延遲任務。

  這裏的關鍵是判斷read空閒超時。lastReadTime是最近一次執行read的時間,readerIdleTimeNanos是初始化時設置的空閒超時時間,所以若是readerIdleTimeNanos - (ticksInNanos() - lastReadtime)  <= 0,表示已經read空閒超時了。使人困惑的是第5行,只有在reading==false才檢查進行空閒超時的計算。筆者在<<netty源碼解解析(4.0)-14 Channel NIO實現:讀取數據>>一章中分析過Channel read的實現。一次read操做或觸發多個read和一個readComplete事件,read操做由多個步驟組成。這reading屬性用來表示正在read的狀態。

 1  @Override  2     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  3         if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {  4             reading = true;  5             firstReaderIdleEvent = firstAllIdleEvent = true;  6  }  7  ctx.fireChannelRead(msg);  8  }  9 
10  @Override 11     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 12         if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) { 13             lastReadTime = ticksInNanos(); 14             reading = false; 15  } 16  ctx.fireChannelReadComplete(); 17     }

  3-4行,在設置了讀空閒超時或全部空閒超時的狀況下,會吧reading設置成true,表示當前正處於正在read的狀態。

  12-14行,在設置了讀空閒超時或全部空閒超時的狀況下, 若是當前正處於read狀態,把reading設置成false,同時更新最近一次執行read的時間。

 

write空閒超時檢查

 1     private final class WriterIdleTimeoutTask extends AbstractIdleTask {  2 
 3  @Override  4         protected void run(ChannelHandlerContext ctx) {  5 
 6             long lastWriteTime = IdleStateHandler.this.lastWriteTime;  7             long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);  8             if (nextDelay <= 0) {  9                 // Writer is idle - set a new timeout and notify the callback.
10                 writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS); 11 
12                 boolean first = firstWriterIdleEvent; 13                 firstWriterIdleEvent = false; 14 
15                 try { 16                     if (hasOutputChanged(ctx, first)) { 17                         return; 18  } 19 
20                     IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first); 21  channelIdle(ctx, event); 22                 } catch (Throwable t) { 23  ctx.fireExceptionCaught(t); 24  } 25             } else { 26                 // Write occurred before the timeout - set a new timeout with shorter delay.
27                 writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS); 28  } 29  } 30     }

 

  6-8行,檢查write空閒超時,和檢查read空閒超時相似。

  12-21行,若是write空閒超時,且outboundBuffer中的數據沒有變化, 觸發write空閒超時事件。

  這裏調用了hasOutputChanged方法檢查outboundBuffer中的數據是否有變化。筆者在<<netty源碼解解析(4.0)-15 Channel NIO實現:寫數據>>中分write實現時,已經講過,每一個Channel都以一個outboundBuffer, write的數據會先序列化成Byte流追加到outboundBuffer中,而後再從outboundBuffer中順序讀出Byte流執行真正的write操做。在Handler的write方法沒有被調用的狀況下,若是outboundBuffer中有數據,且數據發送了變化,表示正在執行真正的write操做,反之則意味着Channel處於不可寫的狀態,沒法執行真正的write操做。write空閒超時事件只會在write空閒超時且沒有執行真正write操做的時候纔會觸發。另外,這個檢查有個開關屬性,只有observeOutput==true時纔會檢查。

  

  AllIdleTimeoutTask的實現和WriterIdleTimeoutTask相似,只不過檢查超時的條件有些差異:read和write任何一個空閒超時都算超時。

 

ReadTimeoutHandler實現

  ReadTimeoutHandler繼承了IdleStateHandler類,它的功能是在觸發read空閒超時事件時觸發一個ReadTimeoutException異常,同時關閉Channel。 

 @Override protected final void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception { assert evt.state() == IdleState.READER_IDLE; readTimedOut(ctx); } /** * Is called when a read timeout was detected. */
    protected void readTimedOut(ChannelHandlerContext ctx) throws Exception { if (!closed) { ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE); ctx.close(); closed = true; } }

 

 

WriteTimeoutHandler實現

  WriteTimeoutHandler繼承了ChannelOutboundHandlerAdapter,它的功能是在觸發監視Channel的write調用超時,若是超時則關閉掉這個Channel。和ReadTimeoutHandler不一樣,它監控的不是空閒超時,而是Channel的write方法返回的Promise超時。

  首先在write時候,爲每一個Promise添加一個監控超時的延遲任務:

 @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { scheduleTimeout(ctx, promise); ctx.write(msg, promise); } private void scheduleTimeout(final ChannelHandlerContext ctx, final ChannelPromise promise) { // Schedule a timeout.
        final WriteTimeoutTask task = new WriteTimeoutTask(ctx, promise); task.scheduledFuture = ctx.executor().schedule(task, timeoutNanos, TimeUnit.NANOSECONDS); if (!task.scheduledFuture.isDone()) { addWriteTimeoutTask(task); // Cancel the scheduled timeout if the flush promise is complete.
 promise.addListener(task); } }

   而後,若是延遲任務執行的時候檢查到Promise超時,就觸發一個WriteTimeoutException異常,而後關閉掉這個Channel。

protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception { if (!closed) { ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE); ctx.close(); closed = true; } }

   WriteTimeoutTask類同時實現了Runnable和ChannelFutureListener接口,超時後會調用run方法。

 1  @Override  2         public void run() {  3             // Was not written yet so issue a write timeout  4             // The promise itself will be failed with a ClosedChannelException once the close() was issued  5             // See https://github.com/netty/netty/issues/2159
 6             if (!promise.isDone()) {  7                 try {  8  writeTimedOut(ctx);  9                 } catch (Throwable t) { 10  ctx.fireExceptionCaught(t); 11  } 12  } 13             removeWriteTimeoutTask(this); 14         }

  7-10行,promise沒有完成,觸發WriteTimeoutException或其餘異常。

      13行,write已經完成,刪除當前的WriteTimeoutTask對象。

    若是promise已經完成, 會調用operationComplete方法, 清理掉當前的WriteTimeoutTask對象。

 @Override public void operationComplete(ChannelFuture future) throws Exception { // scheduledFuture has already be set when reaching here
            scheduledFuture.cancel(false); removeWriteTimeoutTask(this); }

   

  

原文出處:https://www.cnblogs.com/brandonli/p/11252760.html

相關文章
相關標籤/搜索