最近一直在看Netty
相關的內容,也在編寫一個輕量級的RPC
框架來練手,途中發現了Netty
的源碼有不少亮點,某些實現甚至能夠用苛刻來形容。另外,Netty
提供的工具類也是至關優秀,能夠開箱即用。這裏分析一下我的比較喜歡的領域,併發方面的一個Netty
工具模塊 - Promise
。前端
環境版本:java
Netty:4.1.44.Final
JDK1.8
Promise,中文翻譯爲承諾或者許諾,含義是人與人之間,一我的對另外一我的所說的具備必定憧憬的話,通常是能夠實現的。git
io.netty.util.concurrent.Promise
在註釋中只有一句話:特殊的可寫的io.netty.util.concurrent.Future
(Promise
接口是io.netty.util.concurrent.Future
的子接口)。而io.netty.util.concurrent.Future
是java.util.concurrent.Future
的擴展,表示一個異步操做的結果。咱們知道,JDK
併發包中的Future
是不可寫,也沒有提供可監聽的入口(沒有應用觀察者模式),而Promise
很好地彌補了這兩個問題。另外一方面從繼承關係來看,DefaultPromise
是這些接口的最終實現類,因此分析源碼的時候須要把重心放在DefaultPromise
類。通常一個模塊提供的功能都由接口定義,這裏分析一下兩個接口的功能列表:github
io.netty.util.concurrent.Promise
io.netty.util.concurrent.Future
先看io.netty.util.concurrent.Future
接口:shell
public interface Future<V> extends java.util.concurrent.Future<V> { // I/O操做是否執行成功 boolean isSuccess(); // 標記是否能夠經過下面的cancel(boolean mayInterruptIfRunning)取消I/O操做 boolean isCancellable(); // 返回I/O操做的異常實例 - 若是I/O操做自己是成功的,此方法返回null Throwable cause(); // 爲當前Future實例添加監聽Future操做完成的監聽器 - isDone()方法激活以後全部監聽器實例會獲得回調 Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); // 爲當前Future移除監聽Future操做完成的監聽器 Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); // 同步等待Future完成獲得最終結果(成功)或者拋出異常(失敗),響應中斷 Future<V> sync() throws InterruptedException; // 同步等待Future完成獲得最終結果(成功)或者拋出異常(失敗),不響應中斷 Future<V> syncUninterruptibly(); // 等待Future完成,響應中斷 Future<V> await() throws InterruptedException; // 等待Future完成,不響應中斷 Future<V> awaitUninterruptibly(); // 帶超時時限的等待Future完成,響應中斷 boolean await(long timeout, TimeUnit unit) throws InterruptedException; boolean await(long timeoutMillis) throws InterruptedException; // 帶超時時限的等待Future完成,不響應中斷 boolean awaitUninterruptibly(long timeout, TimeUnit unit); boolean awaitUninterruptibly(long timeoutMillis); // 非阻塞立刻返回Future的結果,若是Future未完成,此方法必定返回null;有些場景下若是Future成功獲取到的結果是null則須要二次檢查isDone()方法是否爲true V getNow(); // 取消當前Future實例的執行,若是取消成功會拋出CancellationException異常 @Override boolean cancel(boolean mayInterruptIfRunning); }
sync()
和await()
方法相似,只是sync()
會檢查異常執行的狀況,一旦發現執行異常立刻把異常實例包裝拋出,而await()
方法對異常無感知。數組
接着看io.netty.util.concurrent.Promise
接口:promise
public interface Promise<V> extends Future<V> { // 標記當前Future成功,設置結果,若是設置成功,則通知全部的監聽器,若是Future已經成功或者失敗,則拋出IllegalStateException Promise<V> setSuccess(V result); // 標記當前Future成功,設置結果,若是設置成功,則通知全部的監聽器而且返回true,不然返回false boolean trySuccess(V result); // 標記當前Future失敗,設置結果爲異常實例,若是設置成功,則通知全部的監聽器,若是Future已經成功或者失敗,則拋出IllegalStateException Promise<V> setFailure(Throwable cause); // 標記當前Future失敗,設置結果爲異常實例,若是設置成功,則通知全部的監聽器而且返回true,不然返回false boolean tryFailure(Throwable cause); // 標記當前的Promise實例爲不可取消,設置成功返回true,不然返回false boolean setUncancellable(); // 下面的方法和io.netty.util.concurrent.Future中的方法基本一致,只是修改了返回類型爲Promise @Override Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); @Override Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); @Override Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); @Override Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); @Override Promise<V> await() throws InterruptedException; @Override Promise<V> awaitUninterruptibly(); @Override Promise<V> sync() throws InterruptedException; @Override Promise<V> syncUninterruptibly(); }
到此,Promise
接口的全部功能都分析完畢,接下來從源碼角度詳細分析Promise
的實現。併發
Promise
的實現類爲io.netty.util.concurrent.DefaultPromise
(其實DefaultPromise
還有不少子類,某些實現是爲了定製特定的場景作了擴展),而DefaultPromise
繼承自io.netty.util.concurrent.AbstractFuture
:框架
public abstract class AbstractFuture<V> implements Future<V> { // 永久阻塞等待獲取結果的方法 @Override public V get() throws InterruptedException, ExecutionException { // 調用響應中斷的永久等待方法進行阻塞 await(); // 從永久阻塞中喚醒後,先判斷Future是否執行異常 Throwable cause = cause(); if (cause == null) { // 異常爲空說明執行成功,調用getNow()方法返回結果 return getNow(); } // 異常爲空不爲空,這裏區分特定的取消異常則轉換爲CancellationException拋出 if (cause instanceof CancellationException) { throw (CancellationException) cause; } // 非取消異常的其餘全部異常都被包裝爲執行異常ExecutionException拋出 throw new ExecutionException(cause); } // 帶超時阻塞等待獲取結果的方法 @Override public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { // 調用響應中斷的帶超時時限等待方法進行阻塞 if (await(timeout, unit)) { // 從帶超時時限阻塞中喚醒後,先判斷Future是否執行異常 Throwable cause = cause(); if (cause == null) { // 異常爲空說明執行成功,調用getNow()方法返回結果 return getNow(); } // 異常爲空不爲空,這裏區分特定的取消異常則轉換爲CancellationException拋出 if (cause instanceof CancellationException) { throw (CancellationException) cause; } // 在非等待超時的前提下,非取消異常的其餘全部異常都被包裝爲執行異常ExecutionException拋出 throw new ExecutionException(cause); } // 方法步入此處說明等待超時,則拋出超時異常TimeoutException throw new TimeoutException(); } }
AbstractFuture
僅僅對get()
和get(long timeout, TimeUnit unit)
兩個方法進行了實現,其實這兩處的實現和java.util.concurrent.FutureTask
中的實現方式十分類似。異步
DefaultPromise
的源碼比較多,這裏分開多個部分去閱讀,先看它的屬性和構造函數:
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> { // 正常日誌的日誌句柄,InternalLogger是Netty內部封裝的日誌接口 private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class); // 任務拒絕執行時候的日誌句柄 - Promise須要做爲一個任務提交到線程中執行,若是任務拒絕則使用此日誌句柄打印日誌 private static final InternalLogger rejectedExecutionLogger = InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution"); // 監聽器的最大棧深度,默認值爲8,這個值是防止嵌套回調調用的時候棧深度過大致使內存溢出,後面會舉個例子說明它的用法 private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8, SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8)); // 結果更新器,用於CAS更新結果result的值 @SuppressWarnings("rawtypes") private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result"); // 用於填充result的值,當設置結果result傳入null,Promise執行成功,用這個值去表示成功的結果 private static final Object SUCCESS = new Object(); // 用於填充result的值,表示Promise不能被取消 private static final Object UNCANCELLABLE = new Object(); // CancellationException實例的持有器,用於判斷Promise取消狀態和拋出CancellationException private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace( new CancellationException(), DefaultPromise.class, "cancel(...)")); // CANCELLATION_CAUSE_HOLDER的異常棧信息元素數組 private static final StackTraceElement[] CANCELLATION_STACK = CANCELLATION_CAUSE_HOLDER.cause.getStackTrace(); // 真正的結果對象,使用Object類型,最終有可能爲null、真正的結果實例、SUCCESS、UNCANCELLABLE或者CANCELLATION_CAUSE_HOLDER等等 private volatile Object result; // 事件執行器,這裏暫時不作展開,能夠理解爲單個調度線程 private final EventExecutor executor; // 監聽器集合,多是單個GenericFutureListener實例或者DefaultFutureListeners(監聽器集合)實例 private Object listeners; // 等待獲取結果的線程數量 private short waiters; // 標記是否正在回調監聽器 private boolean notifyingListeners; // 構造函數依賴於EventExecutor public DefaultPromise(EventExecutor executor) { this.executor = checkNotNull(executor, "executor"); } protected DefaultPromise() { // only for subclasses - 這個構造函數預留給子類 executor = null; } // ... 省略其餘代碼 ... // 私有靜態內部類,用於存放Throwable實例,也就是持有異常的緣由實例 private static final class CauseHolder { final Throwable cause; CauseHolder(Throwable cause) { this.cause = cause; } } // 私有靜態內部類,用於覆蓋CancellationException的棧信息爲前面定義的CANCELLATION_STACK,同時覆蓋了toString()返回CancellationException的全類名 private static final class LeanCancellationException extends CancellationException { private static final long serialVersionUID = 2794674970981187807L; @Override public Throwable fillInStackTrace() { setStackTrace(CANCELLATION_STACK); return this; } @Override public String toString() { return CancellationException.class.getName(); } } // ... 省略其餘代碼 ... }
Promise
目前支持兩種類型的監聽器:
GenericFutureListener
:支持泛型的Future
監聽器。GenericProgressiveFutureListener
:它是GenericFutureListener
的子類,支持進度表示和支持泛型的Future
監聽器(有些場景須要多個步驟實現,相似於進度條那樣)。// GenericFutureListener public interface GenericFutureListener<F extends Future<?>> extends EventListener { void operationComplete(F future) throws Exception; } // GenericProgressiveFutureListener public interface GenericProgressiveFutureListener<F extends ProgressiveFuture<?>> extends GenericFutureListener<F> { void operationProgressed(F future, long progress, long total) throws Exception; }
爲了讓Promise
支持多個監聽器,Netty
添加了一個默認修飾符修飾的DefaultFutureListeners
類用於保存監聽器實例數組:
// DefaultFutureListeners final class DefaultFutureListeners { private GenericFutureListener<? extends Future<?>>[] listeners; private int size; private int progressiveSize; // the number of progressive listeners // 這個構造相對特別,是爲了讓Promise中的listeners(Object類型)實例由單個GenericFutureListener實例轉換爲DefaultFutureListeners類型 @SuppressWarnings("unchecked") DefaultFutureListeners(GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) { listeners = new GenericFutureListener[2]; listeners[0] = first; listeners[1] = second; size = 2; if (first instanceof GenericProgressiveFutureListener) { progressiveSize ++; } if (second instanceof GenericProgressiveFutureListener) { progressiveSize ++; } } public void add(GenericFutureListener<? extends Future<?>> l) { GenericFutureListener<? extends Future<?>>[] listeners = this.listeners; final int size = this.size; // 注意這裏,每次擴容數組長度是原來的2倍 if (size == listeners.length) { this.listeners = listeners = Arrays.copyOf(listeners, size << 1); } // 把當前的GenericFutureListener加入數組中 listeners[size] = l; // 監聽器總數量加1 this.size = size + 1; // 若是爲GenericProgressiveFutureListener,則帶進度指示的監聽器總數量加1 if (l instanceof GenericProgressiveFutureListener) { progressiveSize ++; } } public void remove(GenericFutureListener<? extends Future<?>> l) { final GenericFutureListener<? extends Future<?>>[] listeners = this.listeners; int size = this.size; for (int i = 0; i < size; i ++) { if (listeners[i] == l) { // 計算須要須要移動的監聽器的下標 int listenersToMove = size - i - 1; if (listenersToMove > 0) { // listenersToMove後面的元素所有移動到數組的前端 System.arraycopy(listeners, i + 1, listeners, i, listenersToMove); } // 當前監聽器總量的最後一個位置設置爲null,數量減1 listeners[-- size] = null; this.size = size; // 若是監聽器是GenericProgressiveFutureListener,則帶進度指示的監聽器總數量減1 if (l instanceof GenericProgressiveFutureListener) { progressiveSize --; } return; } } } // 返回監聽器實例數組 public GenericFutureListener<? extends Future<?>>[] listeners() { return listeners; } // 返回監聽器總數量 public int size() { return size; } // 返回帶進度指示的監聽器總數量 public int progressiveSize() { return progressiveSize; } }
接下來看DefaultPromise
的剩餘方法實現,筆者以爲DefaultPromise
方法實如今代碼順序上是有必定的藝術的。先看幾個判斷Promise
執行狀態的方法:
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> { // ... 省略其餘代碼 ... @Override public boolean setUncancellable() { // 經過結果更新器CAS更新result爲UNCANCELLABLE,指望舊值爲null,更新值爲UNCANCELLABLE屬性,若是成功則返回true if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) { return true; } Object result = this.result; // 步入這裏說明result當前值不爲null,isDone0()和isCancelled0()都是終態,這裏若是命中終態就返回false //(筆者注:其實能夠這樣認爲,這裏result不能爲null,若是不爲終態,它只能是UNCANCELLABLE屬性實例) return !isDone0(result) || !isCancelled0(result); } @Override public boolean isSuccess() { Object result = this.result; // 若是執行成功,則結果不爲null,同時不爲UNCANCELLABLE,同時不爲CauseHolder類型 //(筆者注:其實能夠這樣認爲,Promise爲成功,則result只能是一個開發者定義的實例或者SUCCESS屬性實例) return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder); } @Override public boolean isCancellable() { // 是否可取消的,result爲null說明Promise處於初始化狀態還沒有執行,則認爲能夠取消 return result == null; } @Override public Throwable cause() { // 經過當前result獲取Throwable實例 return cause0(result); } private Throwable cause0(Object result) { // result非CauseHolder類型,則直接返回null if (!(result instanceof CauseHolder)) { return null; } // 若是result爲CANCELLATION_CAUSE_HOLDER(靜態CancellationException的持有) if (result == CANCELLATION_CAUSE_HOLDER) { // 則新建一個自定義LeanCancellationException實例 CancellationException ce = new LeanCancellationException(); // 若是CAS更新結果result爲LeanCancellationException新實例則返回 if (RESULT_UPDATER.compareAndSet(this, CANCELLATION_CAUSE_HOLDER, new CauseHolder(ce))) { return ce; } // 走到這裏說明了result是非CANCELLATION_CAUSE_HOLDER的自定義CauseHolder實例 result = this.result; } // 兜底返回CauseHolder持有的cause return ((CauseHolder) result).cause; } // 靜態方法,判斷Promise是否爲取消,依據是result必須是CauseHolder類型,同時CauseHolder中的cause必須爲CancellationException類型或者其子類 private static boolean isCancelled0(Object result) { return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException; } // 靜態方法,判斷Promise是否完成,依據是result不爲null同時不爲UNCANCELLABLE屬性實例 private static boolean isDone0(Object result) { return result != null && result != UNCANCELLABLE; } // 判斷Promise實例是否取消 @Override public boolean isCancelled() { return isCancelled0(result); } // 判斷Promise實例是否完成 @Override public boolean isDone() { return isDone0(result); } // ... 省略其餘代碼 ... }
接着看監聽器的添加和移除方法(這其中也包含了通知監聽器的邏輯):
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> { // ... 省略其餘代碼 ... @Override public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { // 入參非空校驗 checkNotNull(listener, "listener"); // 加鎖,鎖定的對象是Promise實例自身 synchronized (this) { // 添加監聽器 addListener0(listener); } // 若是Promise實例已經執行完畢,則通知監聽器進行回調 if (isDone()) { notifyListeners(); } return this; } @Override public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) { // 入參非空校驗 checkNotNull(listeners, "listeners"); // 加鎖,鎖定的對象是Promise實例自身 synchronized (this) { // 遍歷入參數組添加監聽器,有空元素直接跳出 for (GenericFutureListener<? extends Future<? super V>> listener : listeners) { if (listener == null) { break; } addListener0(listener); } } // 若是Promise實例已經執行完畢,則通知監聽器進行回調 if (isDone()) { notifyListeners(); } return this; } @Override public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) { // 入參非空校驗 checkNotNull(listener, "listener"); // 加鎖,鎖定的對象是Promise實例自身 synchronized (this) { // 移除監聽器 removeListener0(listener); } return this; } @Override public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) { // 入參非空校驗 checkNotNull(listeners, "listeners"); // 加鎖,鎖定的對象是Promise實例自身 synchronized (this) { // 遍歷入參數組移除監聽器,有空元素直接跳出 for (GenericFutureListener<? extends Future<? super V>> listener : listeners) { if (listener == null) { break; } removeListener0(listener); } } return this; } private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) { // 若是Promise實例持有listeners爲null,則直接設置爲入參listener if (listeners == null) { listeners = listener; } else if (listeners instanceof DefaultFutureListeners) { // 若是當前Promise實例持有listeners的是DefaultFutureListeners類型,則調用它的add()方法進行添加 ((DefaultFutureListeners) listeners).add(listener); } else { // 步入這裏說明當前Promise實例持有listeners爲單個GenericFutureListener實例,須要轉換爲DefaultFutureListeners實例 listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener); } } private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) { // 若是當前Promise實例持有listeners的是DefaultFutureListeners類型,則調用它的remove()方法進行移除 if (listeners instanceof DefaultFutureListeners) { ((DefaultFutureListeners) listeners).remove(listener); } else if (listeners == listener) { // 若是當前Promise實例持有listeners不爲DefaultFutureListeners類型,也就是單個GenericFutureListener而且和傳入的listener相同, // 則Promise實例持有listeners置爲null listeners = null; } } private void notifyListeners() { EventExecutor executor = executor(); // 當前執行線程是事件循環線程,那麼直接同步調用,簡單來講就是調用notifyListeners()方法的線程和EventExecutor是同一個線程 if (executor.inEventLoop()) { // 下面的ThreadLocal和listenerStackDepth是調用棧深度保護相關,博文會另起一個章節專門講解這個問題,這裏能夠暫時忽略 final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); final int stackDepth = threadLocals.futureListenerStackDepth(); if (stackDepth < MAX_LISTENER_STACK_DEPTH) { threadLocals.setFutureListenerStackDepth(stackDepth + 1); try { notifyListenersNow(); } finally { threadLocals.setFutureListenerStackDepth(stackDepth); } return; } } // 當前執行線程不是事件循環線程,則把notifyListenersNow()包裝爲Runnable實例放到EventExecutor中執行 safeExecute(executor, new Runnable() { @Override public void run() { notifyListenersNow(); } }); } // 使用EventExecutor進行任務執行,execute()方法拋出的異常會使用rejectedExecutionLogger句柄打印 private static void safeExecute(EventExecutor executor, Runnable task) { try { executor.execute(task); } catch (Throwable t) { rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t); } } // 立刻通知全部監聽器進行回調 private void notifyListenersNow() { Object listeners; // 這裏加鎖,在鎖的保護下設置notifyingListeners的值,若是多個線程調用同一個Promise實例的notifyListenersNow()方法 // 命中notifyingListeners的線程能夠直接返回 synchronized (this) { // Only proceed if there are listeners to notify and we are not already notifying listeners. if (notifyingListeners || this.listeners == null) { return; } notifyingListeners = true; // 臨時變量listeners存放瞬時的監聽器實例,方便下一步設置Promise實例的listeners爲null listeners = this.listeners; // 重置當前Promise實例的listeners爲null this.listeners = null; } for (;;) { if (listeners instanceof DefaultFutureListeners) { // 多個監聽器狀況下的通知 notifyListeners0((DefaultFutureListeners) listeners); } else { // 單個監聽器狀況下的通知 notifyListener0(this, (GenericFutureListener<?>) listeners); } synchronized (this) { if (this.listeners == null) { // 這裏由於沒有異常拋出的可能,不用在finally塊中編寫,重置notifyingListeners爲false而且返回跳出循環 notifyingListeners = false; return; } // 臨時變量listeners存放瞬時的監聽器實例,回調操做判斷是基於臨時實例去作 - 這裏可能由另外一個線程更新了listeners的值 listeners = this.listeners; // 重置當前Promise實例的listeners爲null,確保監聽器只會被回調一次,下一次跳出for死循環 this.listeners = null; } } } // 遍歷DefaultFutureListeners中的listeners數組,調用靜態方法notifyListener0() private void notifyListeners0(DefaultFutureListeners listeners) { GenericFutureListener<?>[] a = listeners.listeners(); int size = listeners.size(); for (int i = 0; i < size; i ++) { notifyListener0(this, a[i]); } } // 這個靜態方法是最終監聽器回調的方法,也就是簡單調用GenericFutureListener#operationComplete()傳入的是當前的Promise實例,捕獲一切異常打印warn日誌 @SuppressWarnings({ "unchecked", "rawtypes" }) private static void notifyListener0(Future future, GenericFutureListener l) { try { l.operationComplete(future); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t); } } } }
而後看wait()
和sync()
方法體系:
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> { // ... 省略其餘代碼 ... @Override public Promise<V> await() throws InterruptedException { // 若是Promise執行完畢,直接返回 if (isDone()) { return this; } // 若是當前線程中斷則直接拋出InterruptedException if (Thread.interrupted()) { throw new InterruptedException(toString()); } // 死鎖檢測 checkDeadLock(); // 加鎖,加鎖對象是當前Promise實例 synchronized (this) { // 這裏設置一個死循環,終止條件是isDone()爲true while (!isDone()) { // 等待線程數加1 incWaiters(); try { // 這裏調用的是Object#wait()方法進行阻塞,若是線程被中斷會拋出InterruptedException wait(); } finally { // 解除阻塞後等待線程數減1 decWaiters(); } } } return this; } @Override public Promise<V> awaitUninterruptibly() { // 若是Promise執行完畢,直接返回 if (isDone()) { return this; } // 死鎖檢測 checkDeadLock(); boolean interrupted = false; // 加鎖,加鎖對象是當前Promise實例 synchronized (this) { // 這裏設置一個死循環,終止條件是isDone()爲true while (!isDone()) { // 等待線程數加1 incWaiters(); try { // 這裏調用的是Object#wait()方法進行阻塞,捕獲了InterruptedException異常,若是拋出InterruptedException記錄線程的中斷狀態到interrupted wait(); } catch (InterruptedException e) { // Interrupted while waiting. interrupted = true; } finally { // 解除阻塞後等待線程數減1 decWaiters(); } } } // 若是線程被中斷跳出等待阻塞,則清除線程的中斷標誌位 if (interrupted) { Thread.currentThread().interrupt(); } return this; } // 後面的幾個帶超時時限的wait()方法都是調用await0() @Override public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return await0(unit.toNanos(timeout), true); } @Override public boolean await(long timeoutMillis) throws InterruptedException { return await0(MILLISECONDS.toNanos(timeoutMillis), true); } @Override public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { try { return await0(unit.toNanos(timeout), false); } catch (InterruptedException e) { // Should not be raised at all. throw new InternalError(); } } @Override public boolean awaitUninterruptibly(long timeoutMillis) { try { return await0(MILLISECONDS.toNanos(timeoutMillis), false); } catch (InterruptedException e) { // Should not be raised at all. throw new InternalError(); } } // 檢查死鎖,這裏判斷了等待線程是事件循環線程則直接拋出BlockingOperationException異常 // 簡單來講就是:Promise的執行線程和等待結果的線程,不能是同一個線程,不然依賴會成環 protected void checkDeadLock() { EventExecutor e = executor(); if (e != null && e.inEventLoop()) { throw new BlockingOperationException(toString()); } } @Override public Promise<V> sync() throws InterruptedException { // 同步永久阻塞等待 await(); // 阻塞等待解除,若是執行存在異常,則直接拋出 rethrowIfFailed(); return this; } @Override public Promise<V> syncUninterruptibly() { // 同步永久阻塞等待 - 響應中斷 awaitUninterruptibly(); // 塞等待解除,若是執行存在異常,則直接拋出 rethrowIfFailed(); return this; } // waiters加1,若是超過Short.MAX_VALUE則拋出IllegalStateException private void incWaiters() { if (waiters == Short.MAX_VALUE) { throw new IllegalStateException("too many waiters: " + this); } ++waiters; } // waiters減1 private void decWaiters() { --waiters; } // cause不爲null則拋出 private void rethrowIfFailed() { Throwable cause = cause(); if (cause == null) { return; } PlatformDependent.throwException(cause); } private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException { // 若是Promise執行完畢,直接返回 if (isDone()) { return true; } // 若是超時時限小於0那麼返回isDone()的結果 if (timeoutNanos <= 0) { return isDone(); } // 若是容許中斷,當前線程的中斷標誌位爲true,則拋出InterruptedException if (interruptable && Thread.interrupted()) { throw new InterruptedException(toString()); } // 死鎖檢測 checkDeadLock(); // 記錄當前的納秒時間戳 long startTime = System.nanoTime(); // 等待時間的長度 - 單位爲納秒 long waitTime = timeoutNanos; // 記錄線程是否被中斷 boolean interrupted = false; try { // 死循環 for (;;) { synchronized (this) { // 若是Promise執行完畢,直接返回true - 這一步是先驗判斷,命中了就不須要阻塞等待 if (isDone()) { return true; } // 等待線程數加1 incWaiters(); try { // 這裏調用的是帶超時時限的Object#wait()方法進行阻塞 wait(waitTime / 1000000, (int) (waitTime % 1000000)); } catch (InterruptedException e) { // 線程被中斷而且外部容許中斷,那麼直接拋出InterruptedException if (interruptable) { throw e; } else { // 不然只記錄中斷過的狀態 interrupted = true; } } finally { // 解除阻塞後等待線程數減1 decWaiters(); } } // 解除阻塞後,若是Promise執行完畢,直接返回true if (isDone()) { return true; } else { // 步入這裏說明Promise還沒有執行完畢,則從新計算等待時間間隔的長度數量(修正),若是大於0則進入下一輪循環 waitTime = timeoutNanos - (System.nanoTime() - startTime); if (waitTime <= 0) { return isDone(); } } } } finally { // 若是線程被中斷跳出等待阻塞,則清除線程的中斷標誌位 if (interrupted) { Thread.currentThread().interrupt(); } } } // ... 省略其餘代碼 ... }
最後是幾個設置結果和獲取結果的方法:
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> { // ... 省略其餘代碼 ... @Override public Promise<V> setSuccess(V result) { // 設置成功結果,若是設置成功則返回當前Promise實例 if (setSuccess0(result)) { return this; } // 設置失敗說明了屢次設置,Promise已經執行完畢,則拋出異常 throw new IllegalStateException("complete already: " + this); } @Override public boolean trySuccess(V result) { // 設置成功結果,返回的布爾值表示成功或失敗 return setSuccess0(result); } @Override public Promise<V> setFailure(Throwable cause) { // 設置失敗結果,若是設置成功則返回當前Promise實例 if (setFailure0(cause)) { return this; } // 設置失敗說明了屢次設置,Promise已經執行完畢,則拋出異常 throw new IllegalStateException("complete already: " + this, cause); } @Override public boolean tryFailure(Throwable cause) { // 設置失敗結果,返回的布爾值表示成功或失敗 return setFailure0(cause); } @SuppressWarnings("unchecked") @Override public V getNow() { // 非阻塞獲取結果,若是result是CauseHolder類型、SUCCESS屬性實例或者UNCANCELLABLE實行實例則返回null,不然返回轉換類型後的result值 // 對異常無感知,若是CauseHolder包裹了異常,此方法依然返回null Object result = this.result; if (result instanceof CauseHolder || result == SUCCESS || result == UNCANCELLABLE) { return null; } return (V) result; } @SuppressWarnings("unchecked") @Override public V get() throws InterruptedException, ExecutionException { // 永久阻塞獲取結果 Object result = this.result; // 若是Promise未執行完畢則進行永久阻塞等待 if (!isDone0(result)) { await(); // 更新結果臨時變量 result = this.result; } // result爲SUCCESS屬性實例或者UNCANCELLABLE屬性實例的時候直接返回null if (result == SUCCESS || result == UNCANCELLABLE) { return null; } // 若是result爲CauseHolder類型,則獲取其中持有的cause屬性,也有可能爲null Throwable cause = cause0(result); if (cause == null) { // 執行成功的前提下轉換類型後的result值返回 return (V) result; } // 取消的狀況,拋出CancellationException if (cause instanceof CancellationException) { throw (CancellationException) cause; } // 剩餘的狀況一概封裝爲ExecutionException異常 throw new ExecutionException(cause); } @SuppressWarnings("unchecked") @Override public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { // 帶超時時限的阻塞獲取結果 Object result = this.result; // 若是Promise未執行完畢則進行帶超時時限的阻塞等待 if (!isDone0(result)) { if (!await(timeout, unit)) { // 等待超時直接拋出TimeoutException throw new TimeoutException(); } // 更新結果臨時變量 result = this.result; } // result爲SUCCESS屬性實例或者UNCANCELLABLE屬性實例的時候直接返回null if (result == SUCCESS || result == UNCANCELLABLE) { return null; } // 若是result爲CauseHolder類型,則獲取其中持有的cause屬性,也有可能爲null Throwable cause = cause0(result); if (cause == null) { // 執行成功的前提下轉換類型後的result值返回 return (V) result; } // 取消的狀況,拋出CancellationException if (cause instanceof CancellationException) { throw (CancellationException) cause; } // 剩餘的狀況一概封裝爲ExecutionException異常 throw new ExecutionException(cause); } @Override public boolean cancel(boolean mayInterruptIfRunning) { // CAS更新result爲CANCELLATION_CAUSE_HOLDER,result的指望值必須爲null if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) { // 判斷是否須要進行等待線程的通知 if (checkNotifyWaiters()) { // 通知監聽器進行回調 notifyListeners(); } return true; } return false; } private boolean setSuccess0(V result) { // 設置執行成功的結果,若是入參result爲null,則選用SUCCESS屬性,不然使用result return setValue0(result == null ? SUCCESS : result); } private boolean setFailure0(Throwable cause) { // 設置執行失敗的結果,入參是Throwable類型,封裝爲CauseHolder,存放在CauseHolder實例的cause屬性 return setValue0(new CauseHolder(checkNotNull(cause, "cause"))); } private boolean setValue0(Object objResult) { // CAS更新result爲入參objResult,result的指望值必須爲null或者UNCANCELLABLE才能更新成功 if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { // 判斷是否須要進行等待線程的通知 if (checkNotifyWaiters()) { // 通知監聽器進行回調 notifyListeners(); } return true; } return false; } // 判斷是否須要進行等待線程的通知 - 實際上是判斷是否須要通知監聽器回調 private synchronized boolean checkNotifyWaiters() { // 若是等待線程數量大於0則調用Object#notifyAll()喚醒全部等待線程 if (waiters > 0) { notifyAll(); } // 若是listeners不爲空(也就是存在監聽器)的時候才返回true return listeners != null; } // ... 省略其餘代碼 ... }
要使用Netty
的Promise
模塊,並不須要引入Netty
的全部依賴,這裏只須要引入netty-common
:
<dependency> <groupId>io.netty</groupId> <artifactId>netty-common</artifactId> <version>4.1.44.Final</version> </dependency>
EventExecutor
選取方面,Netty
已經準備了一個GlobalEventExecutor
用於全局事件處理,這裏能夠直接選用(固然也能夠自行實現EventExecutor
或者用EventExecutor
的其餘實現類):
EventExecutor executor = GlobalEventExecutor.INSTANCE; Promise<String> promise = new DefaultPromise<>(executor);
這裏設計一個場景:異步下載一個連接的資源到磁盤上,下載完成以後須要異步通知下載完的磁盤文件路徑,獲得通知以後打印下載結果到控制檯中。
public class PromiseMain { public static void main(String[] args) throws Exception { String url = "http://xxx.yyy.zzz"; EventExecutor executor = GlobalEventExecutor.INSTANCE; Promise<DownloadResult> promise = new DefaultPromise<>(executor); promise.addListener(new DownloadResultListener()); Thread thread = new Thread(() -> { try { System.out.println("開始下載資源,url:" + url); long start = System.currentTimeMillis(); // 模擬下載耗時 Thread.sleep(2000); String location = "C:\\xxx\\yyy\\z.md"; long cost = System.currentTimeMillis() - start; System.out.println(String.format("下載資源成功,url:%s,保存到:%s,耗時:%d ms", url, location, cost)); DownloadResult result = new DownloadResult(); result.setUrl(url); result.setFileDiskLocation(location); result.setCost(cost); // 通知結果 promise.setSuccess(result); } catch (Exception ignore) { } }, "Download-Thread"); thread.start(); Thread.sleep(Long.MAX_VALUE); } @Data private static class DownloadResult { private String url; private String fileDiskLocation; private long cost; } private static class DownloadResultListener implements GenericFutureListener<Future<DownloadResult>> { @Override public void operationComplete(Future<DownloadResult> future) throws Exception { if (future.isSuccess()) { DownloadResult downloadResult = future.getNow(); System.out.println(String.format("下載完成通知,url:%s,文件磁盤路徑:%s,耗時:%d ms", downloadResult.getUrl(), downloadResult.getFileDiskLocation(), downloadResult.getCost())); } } } }
執行後控制檯輸出:
開始下載資源,url:http://xxx.yyy.zzz 下載資源成功,url:http://xxx.yyy.zzz,保存到:C:\xxx\yyy\z.md,耗時:2000 ms 下載完成通知,url:http://xxx.yyy.zzz,文件磁盤路徑:C:\xxx\yyy\z.md,耗時:2000 ms
Promise
適用的場景不少,除了異步通知的場景也能用於同步調用,它在設計上比JUC
的Future
靈活不少,基於Future
擴展出不少新的特性,有須要的能夠單獨引入此依賴直接使用。
有些時候,因爲封裝或者人爲編碼異常等緣由,監聽器的回調可能出現基於多個Promise
造成的鏈(參考Issue-5302,a promise listener chain
),這樣子有可能出現遞歸調用深度過大而致使棧溢出,所以須要設置一個閾值,限制遞歸調用的最大棧深度,這個深度閾值暫且稱爲棧深度保護閾值,默認值是8,能夠經過系統參數io.netty.defaultPromise.maxListenerStackDepth
覆蓋設置。這裏貼出前面提到過的代碼塊:
private void notifyListeners() { EventExecutor executor = executor(); // 事件執行器必須是事件循環類型,也就是executor.inEventLoop()爲true的時候才啓用遞歸棧深度保護 if (executor.inEventLoop()) { // 獲取當前線程綁定的InternalThreadLocalMap實例,這裏相似於ThreadLocal final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); // 獲取當前線程的監聽器調用棧深度 final int stackDepth = threadLocals.futureListenerStackDepth(); // 監聽器調用棧深度若是不超過閾值MAX_LISTENER_STACK_DEPTH if (stackDepth < MAX_LISTENER_STACK_DEPTH) { // 調用notifyListenersNow()前先設置監聽器調用棧深度 + 1 threadLocals.setFutureListenerStackDepth(stackDepth + 1); try { notifyListenersNow(); } finally { // 調用notifyListenersNow()完畢後設置監聽器調用棧深度爲調用前的數值,也就是恢復線程的監聽器調用棧深度 threadLocals.setFutureListenerStackDepth(stackDepth); } return; } } // 若是監聽器調用棧深度超過閾值MAX_LISTENER_STACK_DEPTH,則直接每次通知監聽器當成一個新的異步任務處理 safeExecute(executor, new Runnable() { @Override public void run() { notifyListenersNow(); } }); }
若是咱們想模擬一個例子觸發監聽器調用棧深度保護,那麼只須要想辦法在同一個EventLoop
類型的線程中遞歸調用notifyListeners()
方法便可。
最典型的例子就是在上一個Promise
監聽器回調的方法裏面觸發下一個Promise
的監聽器的setSuccess()
(簡單理解就是套娃),畫個圖理解一下:
測試代碼:
public class PromiseListenerMain { private static final AtomicInteger COUNTER = new AtomicInteger(0); public static void main(String[] args) throws Exception { EventExecutor executor = ImmediateEventExecutor.INSTANCE; // root Promise<String> root = new DefaultPromise<>(executor); Promise<String> p1 = new DefaultPromise<>(executor); Promise<String> p2 = new DefaultPromise<>(executor); Promise<String> p3 = new DefaultPromise<>(executor); Promise<String> p4 = new DefaultPromise<>(executor); Promise<String> p5 = new DefaultPromise<>(executor); Promise<String> p6 = new DefaultPromise<>(executor); Promise<String> p7 = new DefaultPromise<>(executor); Promise<String> p8 = new DefaultPromise<>(executor); Promise<String> p9 = new DefaultPromise<>(executor); Promise<String> p10 = new DefaultPromise<>(executor); p1.addListener(new Listener(p2)); p2.addListener(new Listener(p3)); p3.addListener(new Listener(p4)); p4.addListener(new Listener(p5)); p5.addListener(new Listener(p6)); p6.addListener(new Listener(p7)); p7.addListener(new Listener(p8)); p8.addListener(new Listener(p9)); p9.addListener(new Listener(p10)); root.addListener(new Listener(p1)); root.setSuccess("success"); Thread.sleep(Long.MAX_VALUE); } private static class Listener implements GenericFutureListener<Future<String>> { private final String name; private final Promise<String> promise; public Listener(Promise<String> promise) { this.name = "listener-" + COUNTER.getAndIncrement(); this.promise = promise; } @Override public void operationComplete(Future<String> future) throws Exception { System.out.println(String.format("監聽器[%s]回調成功...", name)); if (null != promise) { promise.setSuccess("success"); } } } }
由於有safeExecute()
兜底執行,上面的全部Promise
都會回調,這裏能夠採用IDEA
的高級斷點功能,在步入斷點的地方添加額外的日誌,輸出以下:
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)執行--- 監聽器[listener-9]回調成功... MAX_LISTENER_STACK_DEPTH(notifyListenersNow)執行--- 監聽器[listener-0]回調成功... MAX_LISTENER_STACK_DEPTH(notifyListenersNow)執行--- 監聽器[listener-1]回調成功... MAX_LISTENER_STACK_DEPTH(notifyListenersNow)執行--- 監聽器[listener-2]回調成功... MAX_LISTENER_STACK_DEPTH(notifyListenersNow)執行--- 監聽器[listener-3]回調成功... MAX_LISTENER_STACK_DEPTH(notifyListenersNow)執行--- 監聽器[listener-4]回調成功... MAX_LISTENER_STACK_DEPTH(notifyListenersNow)執行--- 監聽器[listener-5]回調成功... MAX_LISTENER_STACK_DEPTH(notifyListenersNow)執行--- 監聽器[listener-6]回調成功... safeExecute(notifyListenersNow)執行---------- 監聽器[listener-7]回調成功... safeExecute(notifyListenersNow)執行---------- 監聽器[listener-8]回調成功...
這裏筆者有點疑惑,若是調用棧深度大於8,超出的部分會包裝爲Runnable
實例提交到事件執行器執行,豈不是把遞歸棧溢出的隱患變成了內存溢出的隱患(由於異步任務也有可能積壓,除非拒絕任務提交,那麼具體要看EventExecutor
的實現了)?
Netty
提供的Promise
工具的源碼和使用方式都分析完了,設計理念和代碼都是十分值得借鑑,同時可以開箱即用,能夠在平常編碼中直接引入,減小重複造輪子的勞動和風險。
(本文完 e-a-20200123 c-3-d)