原由是一個朋友問個人一個關於 ServerBootstrap 啓動的問題.
相關 issuejava
他的問題我複述一下:
ServerBootstrap 的綁定流程以下:git
ServerBootstrap.bind -> AbstractBootstrap.bind -> AbstractBootstrap.doBind -> AbstractBootstrap.initAndRegister -> AbstractChannel#AbstractUnsafe.register -> eventLoop.execute( () -> AbstractUnsafe.register0) doBind0() -> channel.eventLoop().execute( () -> channel.bind) -> AbstractUnsafe.bind
在 AbstractUnsafe.register0 中可能會調用 pipeline.fireChannelActive(), 即:github
private void register0(ChannelPromise promise) { try { ... boolean firstRegistration = neverRegistered; doRegister(); ... if (firstRegistration && isActive()) { pipeline.fireChannelActive(); } } catch (Throwable t) { ... } }
而且在 AbstractUnsafe.bind 中也會有 pipeline.fireChannelActive() 的調用, 即:segmentfault
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { ... boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { ... } if (!wasActive && isActive()) { invokeLater(new OneTimeTask() { @Override public void run() { pipeline.fireChannelActive(); } }); } ... }
那麼有沒有可能形成了兩次的 pipeline.fireChannelActive() 調用?promise
個人回答是不會. 爲何呢? 對於直接想知道答案的朋友能夠直接閱讀到最後面的 回答 與 總結 兩節..異步
下面咱們就來根據代碼詳細分析一下.ide
首先, 根據咱們上面所列出的調用流程, 會有 AbstractBootstrap.doBind 的調用, 它的代碼以下:oop
private ChannelFuture doBind(final SocketAddress localAddress) { // 步驟1 final ChannelFuture regFuture = initAndRegister(); ... // 步驟2 if (regFuture.isDone()) { ... doBind0(regFuture, channel, localAddress, promise); ... } else { regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { ... doBind0(regFuture, channel, localAddress, promise); } }); } }
首先在 doBind 中, 執行步驟1, 即調用 initAndRegister 方法, 這個方法會最終調用到AbstractChannel#AbstractUnsafe.register. 而在 AbstractChannel#AbstractUnsafe.register 中, 會經過 eventLoop.execute 的形式將 AbstractUnsafe.register0 的調用提交到任務隊列中(即提交到 eventLoop 線程中, 而當前代碼所在的線程是 main 線程), 即:this
Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { // 當前線程是主線程, 所以這個判斷是 false if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new OneTimeTask() { @Override public void run() { // register0 在 eventLoop 線程中執行. register0(promise); } }); } catch (Throwable t) { ... } } }
接着 AbstractBootstrap.initAndRegister 返回, 回到 AbstractBootstrap.doBind 中, 因而執行到步驟2. 注意, 由於 AbstractUnsafe.register0 是在 eventLoop 中執行的, 所以有可能主線程執行到步驟2 時, AbstractUnsafe.register0 已經執行完畢了, 此時必然有 regFuture.isDone() == true; 但也有可能 AbstractUnsafe.register0 沒有來得及執行, 所以此時 regFuture.isDone() == false. 因此上面的步驟2 考慮到了這兩種狀況, 所以分別針對這兩種狀況作了區分, 即:線程
// 步驟2 if (regFuture.isDone()) { ... doBind0(regFuture, channel, localAddress, promise); ... } else { regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { ... doBind0(regFuture, channel, localAddress, promise); } }); }
通常狀況下, regFuture.isDone() 爲 false, 由於綁定操做是比較費時的, 所以很大概率會執行到 else 分支, 而且 if 分支和 else 分支從結果上說沒有不一樣, 並且 if 分支邏輯還更簡單一些, 所以咱們以 else 分支來分析吧. 在 else 分支中, 會爲 regFuture 設置一個回調監聽器. regFuture 是一個 ChannelFuture
, 而 ChannelFuture
表明了一個 Channel 的異步 IO 的操做結果, 所以這裏 regFuture
表明了 Channel 註冊(register)
的這個異步 IO 的操做結果.
Netty 這裏之因此要爲 regFuture
設置一個回調監聽器, 是爲了保證 register 和 bind 的時序上的正確性: Channel 的註冊必需要發生在 Channel 的綁定以前
.
(關於時序的正確性的問題, 咱們在後面有證實)
接下來咱們來看一下 AbstractUnsafe.register0
方法:
private void register0(ChannelPromise promise) { try { .... // neverRegistered 一開始是 true, 所以 firstRegistration == true boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. // firstRegistration == true, 而 isActive() == false, // 所以不會執行到 pipeline.fireChannelActive() if (firstRegistration && isActive()) { pipeline.fireChannelActive(); } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
注意, 我須要再強調一下, 這裏 AbstractUnsafe.register0 是在 eventLoop 中執行的.
AbstractUnsafe.register0 中會調用 doRegister() 註冊 NioServerSocketChannel, 而後調用 safeSetSuccess() 設置 promise
的狀態爲成功. 而這個 promise
變量是什麼呢? 我將 AbstractBootstrap.doBind 的調用鏈寫詳細一些:
AbstractBootstrap.doBind -> AbstractBootstrap.initAndRegister -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> AbstractChannel#AbstractUnsafe.register -> eventLoop.execute( () -> AbstractUnsafe.register0)
在 SingleThreadEventLoop.register 中會實例化一個 DefaultChannelPromise, 即:
@Override public ChannelFuture register(Channel channel) { return register(channel, new DefaultChannelPromise(channel, this)); }
接着調用重載的 SingleThreadEventLoop.register 方法:
@Override public ChannelFuture register(final Channel channel, final ChannelPromise promise) { if (channel == null) { throw new NullPointerException("channel"); } if (promise == null) { throw new NullPointerException("promise"); } channel.unsafe().register(this, promise); return promise; }
咱們看到, 實例化的 DefaultChannelPromise 最終會以方法返回值的方式返回到調用方, 即返回到 AbstractBootstrap.doBind 中:
final ChannelFuture regFuture = initAndRegister();
所以咱們這裏有一個共識: regFuture 是一個在 SingleThreadEventLoop.register 中實例化的 DefaultChannelPromise
對象.
再回到 SingleThreadEventLoop.register 中, 在這裏會調用 channel.unsafe().register(this, promise), 將 promise 對象傳遞到 AbstractChannel#AbstractUnsafe.register 中, 所以在 AbstractUnsafe.register0 中的 promise
就是 AbstractBootstrap.doBind 中的 regFuture
.
promise == regFuture
很關鍵.
既然咱們已經肯定了 promise
的身份, 那麼調用的 safeSetSuccess(promise); 咱們也知道是幹嗎的了. safeSetSuccess 方法設置一個 Promise 的狀態爲成功態
, 而 Promise 的 成功態
是最終狀態, 即此時 promise.isDone() == true
. 那麼 設置 promise 爲成功態
後, 會發生什麼呢?
還記得不 promise == regFuture
, 而咱們在 AbstractBootstrap.doBind 的 else 分支中設置了一個回調監聽器:
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.executor = channel.eventLoop(); } doBind0(regFuture, channel, localAddress, promise); } });
所以當 safeSetSuccess(promise); 調用時, 根據 Netty 的 Promise/Future 機制, 會觸發上面的 operationComplete 回調, 在回調中調用 doBind0
方法:
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
注意到, 有一個關鍵的地方, 代碼中將 **channel.bind** 的調用放到了 eventLoop 中執行
. doBind0 返回後, 代碼繼續執行 AbstractUnsafe.register0
方法的剩餘部分代碼, 即:
private void register0(ChannelPromise promise) { try { .... safeSetSuccess(promise); // safeSetSuccess 返回後, 繼續執行以下代碼 pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. // firstRegistration == true, 而 isActive() == false, // 所以不會執行到 pipeline.fireChannelActive() if (firstRegistration && isActive()) { pipeline.fireChannelActive(); } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
當 AbstractUnsafe.register0
方法執行完畢後, 才執行到 channel.bind 方法.
而 channel.bind 方法最終會調用到 AbstractChannel#AbstractUnsafe.bind 方法, 源碼以下:
@Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { boolean wasActive = isActive(); logger.info("---wasActive: {}---", wasActive); try { // 調用 NioServerSocketChannel.bind 方法, // 將底層的 Java NIO SocketChannel 綁定到指定的端口. // 當 SocketChannel 綁定到端口後, isActive() 才爲真. doBind(localAddress); } catch (Throwable t) { ... } boolean activeNow = isActive(); logger.info("---activeNow: {}---", activeNow); // 這裏 wasActive == false // isActive() == true if (!wasActive && isActive()) { invokeLater(new OneTimeTask() { @Override public void run() { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); }
上面的代碼中, 調用了 doBind(localAddress) 將底層的 Java NIO SocketChannel 綁定到指定的端口. 而且當 SocketChannel 綁定到端口後, isActive() 才爲真.
所以咱們知道, 若是 SocketChannel 第一次綁定時, 在調用 doBind 前, wasActive == false == isActive()
, 而當調用了 doBind 後, isActive() == true
, 所以第一次綁定端口時, if 判斷成立, 會調用 pipeline.fireChannelActive().
咱們在前的分析中, 直接認定了 Channel 註冊
在 Channel 的綁定
以前完成, 那麼依據是什麼呢?
其實全部的關鍵在於 EventLoop 的任務隊列機制.
不要閒我囉嗦哦. 咱們須要繼續回到 AbstractUnsafe.register0
的調用中(再次強調一下, 在 eventLoop 線程中執行AbstractUnsafe.register0), 這個方法咱們已經分析了, 它會調用 safeSetSuccess(promise), 並由 Netty 的 Promise/Future 機制, 致使了AbstractBootstrap.doBind 中的 regFuture 所設置的回調監聽器的 operationComplete
方法調用, 而 operationComplete
中調用了 AbstractBootstrap.doBind0:
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
在 doBind0 中, 根據 EventLoop 的任務隊列機制, 會使用 eventLoop().execute 將 channel.bind 封裝爲一個 Task, 放到 eventLoop 的 taskQueue 中.
以下用一幅圖表示上面的過程:
點此下載原圖
而當 channel.bind 被調度時, AbstractUnsafe.register0
早就已經調用結束了.
所以因爲 EventLoop 的任務隊列機制, 咱們知道, 在執行 AbstractUnsafe.register0 時, 是在 EventLoop 線程中的, 而 channel.bind 的調用是以 task 的形式添加到 taskQueue 隊列的末尾, 所以必然是有 EventLoop 線程先執行完 AbstractUnsafe.register0 方法後, 纔有機會從 taskQueue 中取出一個 task 來執行, 所以這個機制從根本上保證了 Channel 註冊發生在綁定
以前.
你的疑惑是, AbstractChannel#AbstractUnsafe.register0 中, 可能會調用 pipeline.fireChannelActive(), 即:
private void register0(ChannelPromise promise) { try { ... boolean firstRegistration = neverRegistered; doRegister(); ... if (firstRegistration && isActive()) { pipeline.fireChannelActive(); } } catch (Throwable t) { ... } }
而且在 AbstractChannel#AbstractUnsafe.bind 中也可能會調用到pipeline.fireChannelActive(), 即:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { ... boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { ... } if (!wasActive && isActive()) { invokeLater(new OneTimeTask() { @Override public void run() { pipeline.fireChannelActive(); } }); } ... }
我以爲是 不會
. 由於根據上面咱們分析的結果可知, Netty 的 Promise/Future 與 EventLoop 的任務隊列機制保證了 NioServerSocketChannel 的註冊和 Channel 的綁定的時序: Channel 的註冊必需要發生在 Channel 的綁定以前
, 而當一個 NioServerSocketChannel 沒有綁定到具體的端口前, 它是不活躍的(Inactive)
, 進而在 register0
中, if (firstRegistration && isActive()) 就不成立, 所以就不會執行到 pipeline.fireChannelActive() 了.
而執行完註冊操做後, 在 AbstractChannel#AbstractUnsafe.bind 纔會調用pipeline.fireChannelActive(), 所以最終只有一次 fireChannelActive 調用.
有兩點須要注意的: