netty源碼解解析(4.0)-13 Channel NIO實現: 關閉和清理

  Channel提供了3個方法用來實現關閉清理功能:disconnect,close,deregister。本章重點分析這個3個方法的功能的NIO實現。java

 

  disconnect實現: 斷開鏈接promise

  disconnect方法的調用棧以下:多線程

1 io.netty.channel.AbstractChannel#disconnect()
2 io.netty.channel.DefaultChannelPipeline#disconnect()
3 io.netty.channel.AbstractChannelHandlerContext#disconnect()
4 io.netty.channel.AbstractChannelHandlerContext#disconnect(io.netty.channel.ChannelPromise)
5 io.netty.channel.AbstractChannelHandlerContext#invokeDisconnect
6 io.netty.channel.DefaultChannelPipeline.HeadContext#disconnect
7 io.netty.channel.AbstractChannel.AbstractUnsafe#disconnect
8 io.netty.channel.socket.nio.NioSocketChannel#doDisconnect
9 io.netty.channel.socket.nio.NioSocketChannel#doClose

  disconnect稍微複雜一些, 在io.netty.channel.AbstractChannelHandlerContext#disconnect(io.netty.channel.ChannelPromise)實現中,會根據channel是否支持disconnect操做來決定下一步動做:socket

if (!channel().metadata().hasDisconnect()) {
    next.invokeClose(promise);
} else {
    next.invokeDisconnect(promise);
}   

  之因此這樣設計,是由於TCP和UDP的disconnect含義是不同的,對TCP來講disconnect就是關閉socket;對UDP來講,它沒有鏈接的概念,默認狀況下經過udp socket發送數據須要指定遠程地址,但若是調用connect以後,就不需指定這個地址,數據報會被髮送到connect指定的地址上,disconnect含義是刪除connect指定的地址,發送數據時必須指定地址。因此在NIO的Channel實現中,TCP的disconnect是調用socket的close方法,UDP的disconnect是調用socket的disconnect方法,下面是兩種不一樣的disconnect實現。ide

//TCP io.netty.channel.socket.nio.NioSocketChannel#doDisconnect
@Override
protected void doDisconnect() throws Exception {
    doClose();
}
@Override
protected void doClose() throws Exception {
    super.doClose();
    javaChannel().close();
}
//UDP io.netty.channel.socket.nio.NioDatagramChannel#doDisconnect
@Override
protected void doDisconnect() throws Exception {
    javaChannel().disconnect();
}

  io.netty.channel.AbstractChannel.AbstractUnsafe#disconnect實現了disconnect的邏輯,先調用doDisconnect方法,這個方法是io.netty.channel.AbstractChannel定義的的抽象方法。若是channel的狀態從active變成inactive,就調用pipeline的fireChannelInactive方法觸發channelInactive事件。oop

  

  close實現: 關閉channelthis

  close方法的調用棧:spa

io.netty.channel.AbstractChannel#close()
io.netty.channel.DefaultChannelPipeline#close()
io.netty.channel.AbstractChannelHandlerContext#close()
io.netty.channel.AbstractChannelHandlerContext#close(io.netty.channel.ChannelPromise)
io.netty.channel.AbstractChannelHandlerContext#invokeClose
io.netty.channel.DefaultChannelPipeline.HeadContext#close
io.netty.channel.AbstractChannel.AbstractUnsafe#close(io.netty.channel.ChannelPromise)
io.netty.channel.AbstractChannel.AbstractUnsafe#close(final ChannelPromise promise, final Throwable cause ,final ClosedChannelException closeCause, final boolean notify)
io.netty.channel.AbstractChannel.AbstractUnsafe#doClose0
io.netty.channel.socket.nio.NioSocketChannel#doClose  

  close的邏輯實如今io.netty.channel.AbstractChannel.AbstractUnsafe#close(final ChannelPromise promise, final Throwable cause ,final ClosedChannelException closeCause, final boolean notify)中,這個close方法主要實現了一下幾個功能:.net

  1. 確保在多線程環境下,屢次調用close和一次調用的影響一致,而且能夠經過promis獲得一樣的結果。
  2. 保證在執行close的過程當中,不能向channel寫數據。
  3. 調用doClose0執行執真正的close操做。
  4. 調用deregister對channel作最後的清理工做,並觸發channelInactive, channelUnregistered事件。

  如下是這個方法的代碼:線程

 1 private void close(final ChannelPromise promise, final Throwable cause,
 2                    final ClosedChannelException closeCause, final boolean notify) {
 3     if (!promise.setUncancellable()) {
 4         return;
 5     }
 6 
 7     if (closeInitiated) {
 8         if (closeFuture.isDone()) {
 9             // Closed already.
10             safeSetSuccess(promise);
11         } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
12             // This means close() was called before so we just register a listener and return
13             closeFuture.addListener(new ChannelFutureListener() {
14                 @Override
15                 public void operationComplete(ChannelFuture future) throws Exception {
16                     promise.setSuccess();
17                 }
18             });
19         }
20         return;
21     }
22 
23     closeInitiated = true;
24 
25     final boolean wasActive = isActive();
26     final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
27     this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
28     Executor closeExecutor = prepareToClose();
29     if (closeExecutor != null) {
30         closeExecutor.execute(new Runnable() {
31             @Override
32             public void run() {
33                 try {
34                     // Execute the close.
35                     doClose0(promise);
36                 } finally {
37                     // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
38                     invokeLater(new Runnable() {
39                         @Override
40                         public void run() {
41                             if (outboundBuffer != null) {
42                                 // Fail all the queued messages
43                                 outboundBuffer.failFlushed(cause, notify);
44                                 outboundBuffer.close(closeCause);
45                             }
46                             fireChannelInactiveAndDeregister(wasActive);
47                         }
48                     });
49                 }
50             }
51         });
52     } else {
53         try {
54             // Close the channel and fail the queued messages in all cases.
55             doClose0(promise);
56         } finally {
57             if (outboundBuffer != null) {
58                 // Fail all the queued messages.
59                 outboundBuffer.failFlushed(cause, notify);
60                 outboundBuffer.close(closeCause);
61             }
62         }
63         if (inFlush0) {
64             invokeLater(new Runnable() {
65                 @Override
66                 public void run() {
67                     fireChannelInactiveAndDeregister(wasActive);
68                 }
69             });
70         } else {
71             fireChannelInactiveAndDeregister(wasActive);
72         }
73     }
74 }

   7-23行,在這個方法被屢次調用的時候,只有一次能夠執行的21行之後的代碼。從代碼看,這一點是用closeInitiated屬性來保證的,但它是一個普通boolean類型的屬性,在多線程狀況下存在可見性問題。事實上一個channel unsafe實例的close方法,只會在一個線程中執行,closeInitiated只在這個方法中使用,所以不存在多線程間的可見性問題。雖然可能在多個不一樣的線程中屢次調用Channel的close方法,可是這個close方法,只會在channel的eventLoop線程中執行。凡是經過io.netty.channel.DefaultChannelPipeline.HeadContext調用的channel unsafe方法,都必定在channel的eventLoop線程中執行。

  26,27行,把channel unsafe的outboundBuffer設置爲null,  這樣,在close的過程當中,全部channel的write方法都會經過promise返回錯誤。

  28行,prepareToClose默認實現是返回null, 它是一個protected方法,能夠根據須要覆蓋它,用來在關閉以前作一些準備工做,同時指定一個executor,讓接下來的關閉動做都在這個executor中執行。

  33-49行,53-72行,這兩段代碼實現的都是功能都是同樣的,不一樣的是33-49行在prepareToClose提供的executor中執行。調用doClose0執行關閉操做,清理outboundBuffer(43,44),  調用fireChannelInactiveAndDeregister(46)觸發channelInactive和channelDeregister事件。63-72行,經過inFlush0屬性檢查當前是否正在進程flush操做,若是是,使用invokerLater確保在當前方法和flush操做完成以後再觸發事件。

  doClose0中是真正的關閉操做,它先調用doClose,而後設置promise的返回值:

 1 //io.netty.channel.AbstractChannel.AbstractUnsafe#doClose0
 2 private void doClose0(ChannelPromise promise) {
 3     try {
 4         doClose();
 5         closeFuture.setClosed();
 6         safeSetSuccess(promise);
 7     } catch (Throwable t) {
 8         closeFuture.setClosed();
 9         safeSetFailure(promise, t);
10     }
11 }
12 //io.netty.channel.socket.nio.NioSocketChannel#doClose
13 @Override
14 protected void doClose() throws Exception {
15     super.doClose();
16     javaChannel().close();
17 }

   fireChannelInactiveAndDeregister是調用deregister實現,也就是說,正常狀況下,調用Channel的close方法以後就會自動完成一個channel最後的清理工做,不須要再調用deregister方法。

1 private void fireChannelInactiveAndDeregister(final boolean wasActive) {
2     deregister(voidPromise(), wasActive && !isActive());
3 }

 

  deregister實現:從eventLoop中註銷channel

  deregister的調用棧:

1 io.netty.channel.AbstractChannel#deregister()
2 io.netty.channel.DefaultChannelPipeline#deregister()
3 io.netty.channel.AbstractChannelHandlerContext#deregister()
4 io.netty.channel.AbstractChannelHandlerContext#deregister(io.netty.channel.ChannelPromise)
5 io.netty.channel.AbstractChannelHandlerContext#invokeDeregister
6 io.netty.channel.DefaultChannelPipeline.HeadContext#deregister
7 io.netty.channel.AbstractChannel.AbstractUnsafe#deregister(io.netty.channel.ChannelPromise)
8 io.netty.channel.AbstractChannel.AbstractUnsafe#deregister(io.netty.channel.ChannelPromise, boolean)
9 io.netty.channel.nio.AbstractNioChannel#doDeregister

   deregister的邏輯在中實現io.netty.channel.AbstractChannel.AbstractUnsafe#deregister(final ChannelPromise promise, final boolean fireChannelInactive),這個方法的實現比較簡單,主要就是調用doDeregister方法執行deregister操做,而後觸發channelInactive事件(若是fireChannelInactive參數是true)和channelUnregistered事件。

private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
    if (!promise.setUncancellable()) {
        return;
    }
    if (!registered) {
        safeSetSuccess(promise);
        return;
    }
    invokeLater(new Runnable() {
        @Override
        public void run() {
            try {
                doDeregister();
            } catch (Throwable t) {
                logger.warn("Unexpected exception occurred while deregistering a channel.", t);
            } finally {
                if (fireChannelInactive) {
                    pipeline.fireChannelInactive();
                }
                if (registered) {
                    registered = false;
                    pipeline.fireChannelUnregistered();
                }
                safeSetSuccess(promise);
            }
        }
    });
}

  這裏使用invokeLater執行主要邏輯的目的是爲了保證把當前正在eventLoop隊列中全部任何都執行完以後再執行真正的deregister操做。

  doDeregister默認實現是空,什麼都沒作,它是個protected方法。真正的實如今io.netty.channel.nio.AbstractNioChannel中,它只是簡單地調用eventLoop的cancel方法把SocketChannel對應的SelectionKey從Selector中刪除,這樣selector就不會監聽到這個socket上的任何事件了。

1 @Override
2 protected void doDeregister() throws Exception {
3     eventLoop().cancel(selectionKey());
4 }
相關文章
相關標籤/搜索