Channel提供了3個方法用來實現關閉清理功能:disconnect,close,deregister。本章重點分析這個3個方法的功能的NIO實現。java
disconnect實現: 斷開鏈接promise
disconnect方法的調用棧以下:多線程
1 io.netty.channel.AbstractChannel#disconnect() 2 io.netty.channel.DefaultChannelPipeline#disconnect() 3 io.netty.channel.AbstractChannelHandlerContext#disconnect() 4 io.netty.channel.AbstractChannelHandlerContext#disconnect(io.netty.channel.ChannelPromise) 5 io.netty.channel.AbstractChannelHandlerContext#invokeDisconnect 6 io.netty.channel.DefaultChannelPipeline.HeadContext#disconnect 7 io.netty.channel.AbstractChannel.AbstractUnsafe#disconnect 8 io.netty.channel.socket.nio.NioSocketChannel#doDisconnect 9 io.netty.channel.socket.nio.NioSocketChannel#doClose
disconnect稍微複雜一些, 在io.netty.channel.AbstractChannelHandlerContext#disconnect(io.netty.channel.ChannelPromise)實現中,會根據channel是否支持disconnect操做來決定下一步動做:socket
if (!channel().metadata().hasDisconnect()) { next.invokeClose(promise); } else { next.invokeDisconnect(promise); }
之因此這樣設計,是由於TCP和UDP的disconnect含義是不同的,對TCP來講disconnect就是關閉socket;對UDP來講,它沒有鏈接的概念,默認狀況下經過udp socket發送數據須要指定遠程地址,但若是調用connect以後,就不需指定這個地址,數據報會被髮送到connect指定的地址上,disconnect含義是刪除connect指定的地址,發送數據時必須指定地址。因此在NIO的Channel實現中,TCP的disconnect是調用socket的close方法,UDP的disconnect是調用socket的disconnect方法,下面是兩種不一樣的disconnect實現。ide
//TCP io.netty.channel.socket.nio.NioSocketChannel#doDisconnect @Override protected void doDisconnect() throws Exception { doClose(); } @Override protected void doClose() throws Exception { super.doClose(); javaChannel().close(); } //UDP io.netty.channel.socket.nio.NioDatagramChannel#doDisconnect @Override protected void doDisconnect() throws Exception { javaChannel().disconnect(); }
io.netty.channel.AbstractChannel.AbstractUnsafe#disconnect實現了disconnect的邏輯,先調用doDisconnect方法,這個方法是io.netty.channel.AbstractChannel定義的的抽象方法。若是channel的狀態從active變成inactive,就調用pipeline的fireChannelInactive方法觸發channelInactive事件。oop
close實現: 關閉channelthis
close方法的調用棧:spa
io.netty.channel.AbstractChannel#close() io.netty.channel.DefaultChannelPipeline#close() io.netty.channel.AbstractChannelHandlerContext#close() io.netty.channel.AbstractChannelHandlerContext#close(io.netty.channel.ChannelPromise) io.netty.channel.AbstractChannelHandlerContext#invokeClose io.netty.channel.DefaultChannelPipeline.HeadContext#close io.netty.channel.AbstractChannel.AbstractUnsafe#close(io.netty.channel.ChannelPromise) io.netty.channel.AbstractChannel.AbstractUnsafe#close(final ChannelPromise promise, final Throwable cause ,final ClosedChannelException closeCause, final boolean notify) io.netty.channel.AbstractChannel.AbstractUnsafe#doClose0 io.netty.channel.socket.nio.NioSocketChannel#doClose
close的邏輯實如今io.netty.channel.AbstractChannel.AbstractUnsafe#close(final ChannelPromise promise, final Throwable cause ,final ClosedChannelException closeCause, final boolean notify)中,這個close方法主要實現了一下幾個功能:.net
如下是這個方法的代碼:線程
1 private void close(final ChannelPromise promise, final Throwable cause, 2 final ClosedChannelException closeCause, final boolean notify) { 3 if (!promise.setUncancellable()) { 4 return; 5 } 6 7 if (closeInitiated) { 8 if (closeFuture.isDone()) { 9 // Closed already. 10 safeSetSuccess(promise); 11 } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise. 12 // This means close() was called before so we just register a listener and return 13 closeFuture.addListener(new ChannelFutureListener() { 14 @Override 15 public void operationComplete(ChannelFuture future) throws Exception { 16 promise.setSuccess(); 17 } 18 }); 19 } 20 return; 21 } 22 23 closeInitiated = true; 24 25 final boolean wasActive = isActive(); 26 final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; 27 this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer. 28 Executor closeExecutor = prepareToClose(); 29 if (closeExecutor != null) { 30 closeExecutor.execute(new Runnable() { 31 @Override 32 public void run() { 33 try { 34 // Execute the close. 35 doClose0(promise); 36 } finally { 37 // Call invokeLater so closeAndDeregister is executed in the EventLoop again! 38 invokeLater(new Runnable() { 39 @Override 40 public void run() { 41 if (outboundBuffer != null) { 42 // Fail all the queued messages 43 outboundBuffer.failFlushed(cause, notify); 44 outboundBuffer.close(closeCause); 45 } 46 fireChannelInactiveAndDeregister(wasActive); 47 } 48 }); 49 } 50 } 51 }); 52 } else { 53 try { 54 // Close the channel and fail the queued messages in all cases. 55 doClose0(promise); 56 } finally { 57 if (outboundBuffer != null) { 58 // Fail all the queued messages. 59 outboundBuffer.failFlushed(cause, notify); 60 outboundBuffer.close(closeCause); 61 } 62 } 63 if (inFlush0) { 64 invokeLater(new Runnable() { 65 @Override 66 public void run() { 67 fireChannelInactiveAndDeregister(wasActive); 68 } 69 }); 70 } else { 71 fireChannelInactiveAndDeregister(wasActive); 72 } 73 } 74 }
7-23行,在這個方法被屢次調用的時候,只有一次能夠執行的21行之後的代碼。從代碼看,這一點是用closeInitiated屬性來保證的,但它是一個普通boolean類型的屬性,在多線程狀況下存在可見性問題。事實上一個channel unsafe實例的close方法,只會在一個線程中執行,closeInitiated只在這個方法中使用,所以不存在多線程間的可見性問題。雖然可能在多個不一樣的線程中屢次調用Channel的close方法,可是這個close方法,只會在channel的eventLoop線程中執行。凡是經過io.netty.channel.DefaultChannelPipeline.HeadContext調用的channel unsafe方法,都必定在channel的eventLoop線程中執行。
26,27行,把channel unsafe的outboundBuffer設置爲null, 這樣,在close的過程當中,全部channel的write方法都會經過promise返回錯誤。
28行,prepareToClose默認實現是返回null, 它是一個protected方法,能夠根據須要覆蓋它,用來在關閉以前作一些準備工做,同時指定一個executor,讓接下來的關閉動做都在這個executor中執行。
33-49行,53-72行,這兩段代碼實現的都是功能都是同樣的,不一樣的是33-49行在prepareToClose提供的executor中執行。調用doClose0執行關閉操做,清理outboundBuffer(43,44), 調用fireChannelInactiveAndDeregister(46)觸發channelInactive和channelDeregister事件。63-72行,經過inFlush0屬性檢查當前是否正在進程flush操做,若是是,使用invokerLater確保在當前方法和flush操做完成以後再觸發事件。
doClose0中是真正的關閉操做,它先調用doClose,而後設置promise的返回值:
1 //io.netty.channel.AbstractChannel.AbstractUnsafe#doClose0 2 private void doClose0(ChannelPromise promise) { 3 try { 4 doClose(); 5 closeFuture.setClosed(); 6 safeSetSuccess(promise); 7 } catch (Throwable t) { 8 closeFuture.setClosed(); 9 safeSetFailure(promise, t); 10 } 11 } 12 //io.netty.channel.socket.nio.NioSocketChannel#doClose 13 @Override 14 protected void doClose() throws Exception { 15 super.doClose(); 16 javaChannel().close(); 17 }
fireChannelInactiveAndDeregister是調用deregister實現,也就是說,正常狀況下,調用Channel的close方法以後就會自動完成一個channel最後的清理工做,不須要再調用deregister方法。
1 private void fireChannelInactiveAndDeregister(final boolean wasActive) { 2 deregister(voidPromise(), wasActive && !isActive()); 3 }
deregister實現:從eventLoop中註銷channel
deregister的調用棧:
1 io.netty.channel.AbstractChannel#deregister() 2 io.netty.channel.DefaultChannelPipeline#deregister() 3 io.netty.channel.AbstractChannelHandlerContext#deregister() 4 io.netty.channel.AbstractChannelHandlerContext#deregister(io.netty.channel.ChannelPromise) 5 io.netty.channel.AbstractChannelHandlerContext#invokeDeregister 6 io.netty.channel.DefaultChannelPipeline.HeadContext#deregister 7 io.netty.channel.AbstractChannel.AbstractUnsafe#deregister(io.netty.channel.ChannelPromise) 8 io.netty.channel.AbstractChannel.AbstractUnsafe#deregister(io.netty.channel.ChannelPromise, boolean) 9 io.netty.channel.nio.AbstractNioChannel#doDeregister
deregister的邏輯在中實現io.netty.channel.AbstractChannel.AbstractUnsafe#deregister(final ChannelPromise promise, final boolean fireChannelInactive),這個方法的實現比較簡單,主要就是調用doDeregister方法執行deregister操做,而後觸發channelInactive事件(若是fireChannelInactive參數是true)和channelUnregistered事件。
private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) { if (!promise.setUncancellable()) { return; } if (!registered) { safeSetSuccess(promise); return; } invokeLater(new Runnable() { @Override public void run() { try { doDeregister(); } catch (Throwable t) { logger.warn("Unexpected exception occurred while deregistering a channel.", t); } finally { if (fireChannelInactive) { pipeline.fireChannelInactive(); } if (registered) { registered = false; pipeline.fireChannelUnregistered(); } safeSetSuccess(promise); } } }); }
這裏使用invokeLater執行主要邏輯的目的是爲了保證把當前正在eventLoop隊列中全部任何都執行完以後再執行真正的deregister操做。
doDeregister默認實現是空,什麼都沒作,它是個protected方法。真正的實如今io.netty.channel.nio.AbstractNioChannel中,它只是簡單地調用eventLoop的cancel方法把SocketChannel對應的SelectionKey從Selector中刪除,這樣selector就不會監聽到這個socket上的任何事件了。
1 @Override 2 protected void doDeregister() throws Exception { 3 eventLoop().cancel(selectionKey()); 4 }