永順大牛寫的系列教程《源碼之下無祕密 ── 作最好的 Netty 源碼分析教程》是目前我讀過最好的netty源碼分析文章。但不知道什麼緣由,做者在寫到第三章的時候停更了。所以,我想嘗試憑着我的的理解,續寫後邊幾個章節。java
永順前輩已經寫完章節有以下:git
續寫章節:github
本文使用的netty版本爲4.1.33編程
Netty內部的io.netty.util.concurrent.Future<V> 繼承自java.util.concurrent.Future<V>,而Promise<V>是前者的一個特殊實現。
bootstrap
Java併發編程包下提供了Future<V>接口。Future在異步編程中表示該異步操做的結果,經過Future<V>的內部方法能夠實現狀態檢查、取消執行、獲取執行結果等操做。內部的方法以下:segmentfault
// 嘗試取消執行 boolean cancel(boolean mayInterruptIfRunning); // 是否已經被取消執行 boolean isCancelled(); // 是否已經執行完畢 boolean isDone(); // 阻塞獲取執行結果 V get() throws InterruptedException, ExecutionException; // 阻塞獲取執行結果或超時後返回 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
原生的Future<V>功能比較有限,Netty擴展了Future<V>並增長了如下方法:數組
// 判斷是否執行成功 boolean isSuccess(); // 判斷是否能夠取消執行 boolean isCancellable();
Throwable cause();
// 增長回調方法 Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); // 增長多個回調方法 Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); // 刪除回調方法 Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); // 刪除多個回調方法 Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 阻塞等待,且若是失敗拋出異常 Future<V> sync() throws InterruptedException; // 同上,區別是不可中斷阻塞等待過程 Future<V> syncUninterruptibly(); // 阻塞等待 Future<V> await() throws InterruptedException; // 同上,區別是不可中斷阻塞等待過程 Future<V> awaitUninterruptibly();
Promise<V>接口繼續繼承了Future<V>,並增長若干個設置狀態並回調的方法:promise
// 設置成功狀態並回調 Promise<V> setSuccess(V result); boolean trySuccess(V result); // 設置失敗狀態並回調 Promise<V> setFailure(Throwable cause); boolean tryFailure(Throwable cause); // 設置爲不可取消狀態 boolean setUncancellable();
可見,Promise<V>做爲一個特殊的Future<V>,只是增長了一些狀態設置方法。因此它經常使用於傳入I/O業務代碼中,用於I/O結束後設置成功(或失敗)狀態,並回調方法。緩存
以客戶端鏈接的註冊過程爲例,調用鏈路以下:服務器
io.netty.bootstrap.Bootstrap.connect() --> io.netty.bootstrap.Bootstrap.doResolveAndConnect() ---->io.netty.bootstrap.AbstractBootstrap.initAndRegister() ------>io.netty.channel.MultithreadEventLoopGroup.register() -------->io.netty.channel.SingleThreadEventLoop.register()
一直跟蹤到SingleThreadEventLoop中,會看到這段代碼:
@Override public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); }
此處新建了一個DefaultChannelPromise,構造函數傳入了當前的channel以及當前所在的線程this。從第一節的類圖咱們知道,DefaultChannelPromise同時實現了Future和Promise,具備上述提到的全部方法。
而後繼續將該promise傳遞進另一個register方法中:
@Override public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; }
在該register方法中,繼續將promise傳遞到Unsafe的register方法中,而當即返回了以ChannelFuture的形式返回了該promise。顯然這裏是一個異步回調處理:上層的業務能夠拿到返回的ChannelFuture阻塞等待結果或者設置回調方法,而繼續往下傳的Promise能夠用於設置執行狀態而且回調設置的方法。
咱們繼續往下debug能夠看到:
// io.netty.channel.AbstractChannel.AbstractUnsafe.java @Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { // 若是已經註冊過,則置爲失敗 promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { // 若是線程類型不兼容,則置爲失敗 promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); // 出現異常狀況置promise爲失敗 safeSetFailure(promise, t); } } } private void register0(ChannelPromise promise) { try { // 註冊以前,先將promise置爲不可取消轉態 if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; pipeline.invokeHandlerAddedIfNeeded(); // promise置爲成功 safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); // 出現異常狀況置promise爲失敗 safeSetFailure(promise, t); } }
可見,底層的I/O操做成功與否均可以經過Promise設置狀態,並使得外層的ChannelFuture能夠感知獲得I/O操做的結果。
咱們再來看看被返回的ChannelFuture的用途:
// io.netty.bootstrap.AbstractBootstrap.java final ChannelFuture initAndRegister() { //... ChannelFuture regFuture = config().group().register(channel); // 若是異常不爲null,則意味着底層的I/O已經失敗,而且promise設置了失敗異常 if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
這裏經過檢查失敗異常棧是否爲空,能夠提早檢查到I/O是否失敗。繼續回溯,還能夠看到:
// io.netty.bootstrap.AbstractBootstrap.java private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // 若是註冊已經成功 ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // 若是註冊還沒有完成 // ... } }
此處,經過ChannelFuture#isDone()方法能夠知道底層的註冊是否完成,若是完成,則繼續進行bind操做。
可是由於註冊是個異步操做,若是此時註冊可能還沒完成,那就會進入以下邏輯:
// io.netty.bootstrap.AbstractBootstrap.java //... else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; }
這裏新建了一個新的PendingRegistrationPromise,併爲原來的ChannelFuture對象添加了一個回調方法,並在回調中更改PendingRegistrationPromise的狀態,並且PendingRegistrationPromise會繼續被傳遞到上層。當底層的Promise狀態被設置而且回調,就會進入該回調方法。從而將I/O狀態繼續向外傳遞。
咱們已經瞭解清楚了Promise和Future的異步模型。再來看看底層是如何實現的。以最經常使用的DefaultChannelPromise爲例,內部很是簡單,咱們主要看它的父類DefaultPromise:
// result字段的原子更新器 @SuppressWarnings("rawtypes") private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result"); // 緩存執行結果的字段 private volatile Object result; // promise所在的線程 private final EventExecutor executor; // 一個或者多個回調方法 private Object listeners; // 阻塞線程數量計數器 private short waiters;
以設置成功狀態爲例(setSuccess):
@Override public Promise<V> setSuccess(V result) { if (setSuccess0(result)) { // 調用回調方法 notifyListeners(); return this; } throw new IllegalStateException("complete already: " + this); } private boolean setSuccess0(V result) { return setValue0(result == null ? SUCCESS : result); } private boolean setValue0(Object objResult) { // 原子修改result字段爲objResult if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { checkNotifyWaiters(); return true; } return false; } private synchronized void checkNotifyWaiters() { if (waiters > 0) { // 若是有其餘線程在等待該promise的結果,則喚醒他們 notifyAll(); } }
設置promise的狀態其實就是原子地修改result字段爲傳入的執行結果。值得注意的是,result字段帶有volatile關鍵字來確保多線程之間的可見性。另外,設置完畢狀態後,會嘗試喚醒全部在阻塞等待該promise返回結果的線程。
其餘設置狀態方法再也不贅言,基本上大同小異。
上文提到其餘線程會阻塞等待該promise返回結果,具體實現以sync方法爲例:
@Override public Promise<V> sync() throws InterruptedException { // 阻塞等待 await(); // 若是有異常則拋出 rethrowIfFailed(); return this; } @Override public Promise<V> await() throws InterruptedException { if (isDone()) { // 若是已經完成,直接返回 return this; } // 能夠被中斷 if (Thread.interrupted()) { throw new InterruptedException(toString()); } //檢查死循環 checkDeadLock(); synchronized (this) { while (!isDone()) { // 遞增計數器(用於記錄有多少個線程在等待該promise返回結果) incWaiters(); try { // 阻塞等待結果 wait(); } finally { // 遞減計數器 decWaiters(); } } } return this; }
全部調用sync方法的線程,都會被阻塞,直到promise被設置爲成功或者失敗。這也解釋了爲什麼Netty客戶端或者服務端啓動的時候通常都會調用sync方法,本質上都是阻塞當前線程而異步地等待I/O結果返回,以下:
Bootstrap bootstrap = new Bootstrap(); ChannelFuture future = bootstrap.group(new NioEventLoopGroup(10)) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 管道中添加基於換行符分割字符串的解析器 ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); // 管道中添加字符串編碼解碼器 ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8"))); ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8"))); // 管道中添加服務端處理邏輯 ch.pipeline().addLast(new MyClientEchoHandler()); } }).connect("127.0.0.1", 9898).sync(); future.channel().closeFuture().sync();
@Override public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { checkNotNull(listener, "listener"); synchronized (this) { // 添加回調方法 addListener0(listener); } if (isDone()) { // 若是I/O操做已經結束,直接觸發回調 notifyListeners(); } return this; } private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) { if (listeners == null) { // 只有一個回調方法直接賦值 listeners = listener; } else if (listeners instanceof DefaultFutureListeners) { // 將回調方法添加到DefaultFutureListeners內部維護的listeners數組中 ((DefaultFutureListeners) listeners).add(listener); } else { // 若是有多個回調方法,新建一個DefaultFutureListeners以保存更多的回調方法 listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener); } }
從上邊能夠看到,添加回調方法完成以後,會當即檢查promise是否已經完成;若是promise已經完成,則立刻調用回調方法。
Netty的Promise和Future機制是基於Java併發包下的Future<V>開發的。其中Future支持阻塞等待、添加回調方法、判斷執行狀態等,而Promise主要是支持狀態設置相關方法。當底層I/O操做經過Promise改變執行狀態,咱們能夠經過同步等待的Future當即獲得結果。
所以,就像永順大牛標題所言,在Netty的異步模型裏,Promise和Future就像是雙子星通常緊密相連。但我以爲這二者更像是量子糾纏裏的兩個電子,由於改變其中一個方的狀態,另一方可以立刻感知。
至此,Promise和Future的核心原理已經分析完畢。