Future模式是一個重要的異步併發模式,在JDK有實現。但JDK實現的Future模式功能比較簡單,使用起來比較複雜。Netty在JDK Future基礎上,增強了Future的能力,具體體如今:java
吹了那麼多牛,有一個關鍵問題還沒弄清楚:Future究竟是幹嗎的?io.netty.util.concurrent.Future代碼的第一行註釋簡潔第回答了這個問題:Future就是異步操做的結果。這裏面有三個關鍵字:異步,操做,結果。首先,Future首先是一個「結果」;其次這個結果產生於一個「操做」,操做具體是什麼能夠隨便定義;最後這個操做是"異步"執行的,這就意味着「操做」可能在另外一個線程中併發執行,也可能隨後在同一個線程中執行,何時產生結果是一件不肯定的事。數組
異步調用過程的通常過程是:調用方喚起一個異步操做,在接下來的某個恰當的時間點獲得的異步操做操做的結果。要正確地完成上述步驟,須要解決如下幾個問題:多線程
io.netty.util.concurrent.DefaultPromise是Future的默認實現,以上三個問題的答案都能在這個類的代碼中找到。併發
下面是DefaultPromis及其父類,接口的聲明:異步
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> public abstract class AbstractFuture<V> implements Future<V> public interface Promise<V> extends Future<V> public interface Future<V> extends java.util.concurrent.Future<V>
能夠看出,DefaultPromise派生自AbstractFuture類,並實現了Promise接口。抽象類型AbstractFuture派生自Future, 接口Promise派生自Future。Future派生自JDK的Future接口。ide
和JDK的Future相比,Netty的Future接口增長一些本身的方法:oop
/** 當操做成功時返回true*/ boolean isSuccess(); /**
只有當操做能夠被取消時返回true
*/ boolean isCancellable(); /** 返回操做的異常*/ Throwable cause(); /** 添加一個監聽器到future。當操做完成(成功或失敗都算完成,此事isDone()返回true)時, 會通知這個監聽器。若是添加時操做已經完成,
這個監聽器會當即被通知。*/ Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); /** 和上個方法同樣,能夠同時添加多個監聽器*/ Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); /** 刪除指定的監聽器, 若是這個監聽器還沒被通知的話。*/ Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); /** 功能和上個方法同樣,能夠同時刪除多個監聽器。*/ Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); /** 同步等待直到操做完成。會被打斷。 */ Future<V> sync() throws InterruptedException; /** 同步等着知道操做完成。不會被打斷。 */ Future<V> syncUninterruptibly(); /** 同sync*/ Future<V> await() throws InterruptedException; /** 同synUniterruptibliy*/ Future<V> awaitUninterruptibly(); /** 等待,直到操做完成或超過指定的時間。會被打斷。*/ boolean await(long timeout, TimeUnit unit) throws InterruptedException; /** 同上*/ boolean await(long timeoutMillis) throws InterruptedException; /** 同上,不會被打斷。*/ boolean awaitUninterruptibly(long timeout, TimeUnit unit); /** 同上。*/ boolean awaitUninterruptibly(long timeoutMillis); /** 當即獲得結果,不會阻塞。若是操做沒有完成或沒有成功,返回null*/ V getNow();
Netty的Future最大的特色是增長了Listener被動接收任務完成通知,下面是兩個Listener接口的定義:this
public interface GenericFutureListener<F extends Future<?>> extends EventListener { void operationComplete(F future) throws Exception; } public interface GenericProgressiveFutureListener<F extends ProgressiveFuture<?>> extends GenericFutureListener<F> { void operationProgressed(F future, long progress, long total) throws Exception; }
把一個listener添加到future以後。當異步操做完成以後,listener會被通知一次,同時會回調operationComplete方法。參數future是當前通知的future,這意味這,一個listener能夠被添加到多個future中。spa
當異步操做進度發送變化時,listener會被通知,同時會回調operationProgressed方法。progress是當前進度,total是總進度。progress==total表示操做完成。若是不知道什麼時候完成操做progress=-1。.net
Promise定義的方法:
/** 設置結果。把這個future設置爲success,通知全部的listener,
若是這個future已是success或failed(操做已經完成),會拋出IllegalStateException
*/ Promise<V> setSuccess(V result); /**
同上。只有在操做沒有完成的時候纔會生效,且會返回true */ boolean trySuccess(V result); /** 設置異常。把這個future設置爲failed狀態,通知全部的listener.
若是這個future已經完成,會拋出IllegalStateException */ Promise<V> setFailure(Throwable cause); /** 同上。只有在操做沒有完成時纔會生效,且返回ture */ boolean tryFailure(Throwable cause); /** 設置當前前future的操做不能被取消。這個future沒有完成且能夠設置成功或這個future已經完成,返回true。不然返回false */ boolean setUncancellable();
volatile Object result;
異步操做的結果。能夠經過它的值知道當前future的狀態。
final EventExecutor executor;
通知listener的線程。
Object listeners;
維護添加到當前future的listener對象。
short waiters;
記錄當前真正等待結果的線程數量。
boolean notifyingListeners;
是否正在通知listener,防止多線程併發執行通知操做。
future有4種狀態: 未完成, 未完成-不能取消,完成-成功,完成-失敗。使用isDone()判斷是否完成,它代碼以下:
1 @Override 2 public boolean isDone() { 3 return isDone0(result); 4 } 5 6 private static boolean isDone0(Object result) { 7 return result != null && result != UNCANCELLABLE; 8 }
第7行是判斷當前完成狀態的。result != null 且 result != UNCANCELLABLE,表示處於完成狀態。
result默認是null, 此時future處於未完成狀態。可使用setUncancellable方法把它設置成爲完成-不能取消狀態。
1 @Override 2 public boolean setUncancellable() { 3 if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) { 4 return true; 5 } 6 Object result = this.result; 7 return !isDone0(result) || !isCancelled0(result); 8 }
第3行,使用原子操做設置result的值,只有result==null時才能把result設置成UNCANCELLABLE。當result==UNCANCELLABLE時,不容許取消異步操做。
使用isSuccess方法判斷future是否處於完成-成功狀態。
1 @Override 2 public boolean isSuccess() { 3 Object result = this.result; 4 return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder); 5 }
第4行是完成-成功狀態result的取值:除null, UNCANCELLABLE和CauseHolder對象的任何值。
只有知足isDone() && !isSuccess()時,future處於完成失敗狀態,可使用cause方法獲取異常。
調用setSuccess和trySuccess方法,可以把狀態轉換成完成-成功。
1 @Override 2 public Promise<V> setSuccess(V result) { 3 if (setSuccess0(result)) { 4 notifyListeners(); 5 return this; 6 } 7 throw new IllegalStateException("complete already: " + this); 8 } 9 10 private boolean setSuccess0(V result) { 11 return setValue0(result == null ? SUCCESS : result); 12 }
第3行嘗試把狀態設置成完成-成功狀態。若是能夠,在第4行通知全部的listener。不然第7行拋出錯誤。第11行給出了成功的默認值SUCCESS。trySuccess少了第7行,不會拋出異常。
調用setFailure和tryFailure方法,可以包狀態轉換成完成-失敗狀態。
1 @Override 2 public Promise<V> setFailure(Throwable cause) { 3 if (setFailure0(cause)) { 4 notifyListeners(); 5 return this; 6 } 7 throw new IllegalStateException("complete already: " + this, cause); 8 } 9 10 private boolean setFailure0(Throwable cause) { 11 return setValue0(new CauseHolder(checkNotNull(cause, "cause"))); 12 }
第3行嘗試把專題設置成完成-失敗狀態。若是能夠,在第4行通知全部listener。不然在第7行拋出異常。第11行把異常包裝成CauseHolder對象。tryFailure少了第7行,不會拋出異常。
當異步操做完成時,調用Promise提供的setSuccess和trySuccess設置成功的結果,調用setFailure和tryFailure設置異常結果。不論什麼結果,都會使用setValue0方法保存到result屬性上。
1 private boolean setValue0(Object objResult) { 2 if (RESULT_UPDATER.compareAndSet(this, null, objResult) || 3 RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { 4 checkNotifyWaiters(); 5 return true; 6 } 7 return false; 8 }
第2,3行,使用原子操做設置result的值,只有result==null或result==UNCANCELLABLE時,才能設置成功。若是設置成功,在第4行喚醒全部等待中的線程。可使用get方法獲得result值。若是isSucess()==true, result的值是SUCCESS或異步操做的結果。不然result的值是CauseHolder對象,此時能夠調用cause方法獲得異常對象。
使用get或cause,只有在異步操做完成後才能順利獲得結果。可使用listener,被動等待操做完成通知。
Future的listener是必須實現GenericFutureListener接口,調用方法能夠在operationComplete方法中處理異步操做的結果。
listeners屬性用來保存使用addListener,addListeners方法添加到future的listener。listeners可能使用一個GenericFutureListener對象,也多是一個GenericFutureListener數組。全部添加listener方法都會調用addListener0方法添加listener。
1 private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) { 2 if (listeners == null) { 3 listeners = listener; 4 } else if (listeners instanceof DefaultFutureListeners) { 5 ((DefaultFutureListeners) listeners).add(listener); 6 } else { 7 listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener); 8 } 9 }
這段代碼中使用了一個DefaultFutureListeners類,它內部維護了一個GenericFutureListener數組。
當一次操做完成時,會調用notifyListeners方法通知listeners中全部的listener,並調用listener的operationComplete方法。只有當isDone()==true時纔會調用notifyListeners方法。觸發點在下面的一些方法中:
addListener, addListeners。
setSuccess, trySuccess。
setFailure, tryFailure。
notifyListeners的代碼以下:
1 private void notifyListeners() { 2 EventExecutor executor = executor(); 3 if (executor.inEventLoop()) { 4 final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); 5 final int stackDepth = threadLocals.futureListenerStackDepth(); 6 if (stackDepth < MAX_LISTENER_STACK_DEPTH) { 7 threadLocals.setFutureListenerStackDepth(stackDepth + 1); 8 try { 9 notifyListenersNow(); 10 } finally { 11 threadLocals.setFutureListenerStackDepth(stackDepth); 12 } 13 return; 14 } 15 } 16 17 safeExecute(executor, new Runnable() { 18 @Override 19 public void run() { 20 notifyListenersNow(); 21 } 22 }); 23 }
這段代碼的做用是調用notifyListenersNow。若是當前線程就是executor的線程,在第9行直接調用notifyListenerNow,不然在第20行,把notifyListnerNow放在executor中執行。第4-7行和11行的做用是防止遞歸調用致使線程棧溢出,MAX_LISTENER_STACK_DEPTH就是listener遞歸調用的最大深度。
notifyListenerNow的做用是,確保沒有併發執行notifyListener0或notifyListners0方法,且全部的listener只能被通知一次。
1 private void notifyListenersNow() { 2 Object listeners; 3 synchronized (this) { 4 // Only proceed if there are listeners to notify and we are not already notifying listeners. 5 if (notifyingListeners || this.listeners == null) { 6 return; 7 } 8 notifyingListeners = true; 9 listeners = this.listeners; 10 this.listeners = null; 11 } 12 for (;;) { 13 if (listeners instanceof DefaultFutureListeners) { 14 notifyListeners0((DefaultFutureListeners) listeners); 15 } else { 16 notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners); 17 } 18 synchronized (this) { 19 if (this.listeners == null) { 20 // Nothing can throw from within this method, so setting notifyingListeners back to false does not 21 // need to be in a finally block. 22 notifyingListeners = false; 23 return; 24 } 25 listeners = this.listeners; 26 this.listeners = null; 27 } 28 } 29 }
第3-11行的做用是防止多個線程併發執行11行以後的代碼。
結合第5,9,10行可知, listeners中的全部listener只能被通知一次。
13-17行,通知全部listeners。notifyListener0通知一個listener,notifyListeners0通知全部的listener。
最後,18-27行,檢查在通知listeners的過程當中,是否有新的listener被添加進來。若是有,25,26行獲得全部新添加的listener並清空listeners屬性,13-17行繼續通知新添加的listener。不然,運行22,23行結束通知過程。
1 private void notifyListeners0(DefaultFutureListeners listeners) { 2 GenericFutureListener<?>[] a = listeners.listeners(); 3 int size = listeners.size(); 4 for (int i = 0; i < size; i ++) { 5 notifyListener0(this, a[i]); 6 } 7 } 8 9 @SuppressWarnings({ "unchecked", "rawtypes" }) 10 private static void notifyListener0(Future future, GenericFutureListener l) { 11 try { 12 l.operationComplete(future); 13 } catch (Throwable t) { 14 logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t); 15 } 16 }
1-7行,notifyListeners0對每一個listener調用一次notifyListener0,參數是當前的future。
10-16,調用listener的operationComplete方法,捕獲了全部的異常,確保接下來能夠繼續通知下一個listener。
可使用一系列的await,awaitXXX方法同步等待結果。這些方法能夠分爲: 能被打斷的,不能被打斷的。一直等待的,有超時時間的。await0方法是最複雜的等待實現,全部帶超時時間的await方法都會調用它。
1 private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException { 2 if (isDone()) { 3 return true; 4 } 5 6 if (timeoutNanos <= 0) { 7 return isDone(); 8 } 9 10 if (interruptable && Thread.interrupted()) { 11 throw new InterruptedException(toString()); 12 } 13 14 checkDeadLock(); 15 16 long startTime = System.nanoTime(); 17 long waitTime = timeoutNanos; 18 boolean interrupted = false; 19 try { 20 for (;;) { 21 synchronized (this) { 22 if (isDone()) { 23 return true; 24 } 25 incWaiters(); 26 try { 27 wait(waitTime / 1000000, (int) (waitTime % 1000000)); 28 } catch (InterruptedException e) { 29 if (interruptable) { 30 throw e; 31 } else { 32 interrupted = true; 33 } 34 } finally { 35 decWaiters(); 36 } 37 } 38 if (isDone()) { 39 return true; 40 } else { 41 waitTime = timeoutNanos - (System.nanoTime() - startTime); 42 if (waitTime <= 0) { 43 return isDone(); 44 } 45 } 46 } 47 } finally { 48 if (interrupted) { 49 Thread.currentThread().interrupt(); 50 } 51 } 52 }
這個方法返回的條件有: (1)isDone()==true;(2)容許被打斷(interrupted==true)的狀況下被打斷;(3)已經超時。2-12行分別檢查了這3種狀況。
25,35行管理waiters屬性,這個屬性用來記錄當前正在等待的線程數。inWaiters方法正常狀況下會把waiters加1,當檢查到waiters==Short.MAX_VALUE時會拋出異常,防止過多的線程等待。
27行,調用wait等待,經歷waitTime後超時返回。在等待過程當中,會被setValue0方法調用notifyAll喚醒。
29-33行,處理被打斷的異常,若是運行被打斷,在30行拋出這個異常返回。
38-45行,不論什麼緣由線程被喚醒,檢查是否知足返回條件,若是不知足,繼續循環等待。
沒有超時的wait方法實現要簡單一些,只需判讀返回條件(1)(2)。
若是想要跟蹤異步操做的執行進度,future須要換成DefaultProgressivePromise對象,listener須要換成GenericProgressiveFutureListener類型。DefaultProgressivePromise派生自DefaultPromise同時實現了ProgressivePromise接口。GenericProgressiveFutureListener接口派生自GenericFutureListener接口。
ProgressivePromise定義了setProgress和tryProgress方法用來更新進度,是否是很眼熟,和Promise接口定義返回結果的方法很相似。
ProgressivePromise<V> setProgress(long progress, long total); boolean tryProgress(long progress, long total);
GenericProgressiveFutureListener定義了operationProgressed方法用來處理進度更新通知。
void operationProgressed(F future, long progress, long total) throws Exception;
DefaultProgressivePromise本身只實現了setProgress和tryProgress方法,其它都是複用了DefaultPromise的實現。
1 @Override 2 public ProgressivePromise<V> setProgress(long progress, long total) { 3 if (total < 0) { 4 // total unknown 5 total = -1; // normalize 6 if (progress < 0) { 7 throw new IllegalArgumentException("progress: " + progress + " (expected: >= 0)"); 8 } 9 } else if (progress < 0 || progress > total) { 10 throw new IllegalArgumentException( 11 "progress: " + progress + " (expected: 0 <= progress <= total (" + total + "))"); 12 } 13 14 if (isDone()) { 15 throw new IllegalStateException("complete already"); 16 } 17 18 notifyProgressiveListeners(progress, total); 19 return this; 20 }
3-12行,檢查progress和total的合法性。
14行,如isDone()==true,拋出異常。只有在操做還沒完成的是否更新進度纔有意義。
18行,調用notifyProgressiveListeners觸發進度更新通知,這個方法在DefaultPromise中實現。
notifyProgressiveListeners實現了觸發進度更新通知的主要流程:
1 void notifyProgressiveListeners(final long progress, final long total) { 2 final Object listeners = progressiveListeners(); 3 if (listeners == null) { 4 return; 5 } 6 7 final ProgressiveFuture<V> self = (ProgressiveFuture<V>) this; 8 9 EventExecutor executor = executor(); 10 if (executor.inEventLoop()) { 11 if (listeners instanceof GenericProgressiveFutureListener[]) { 12 notifyProgressiveListeners0( 13 self, (GenericProgressiveFutureListener<?>[]) listeners, progress, total); 14 } else { 15 notifyProgressiveListener0( 16 self, (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners, progress, total); 17 } 18 } else { 19 if (listeners instanceof GenericProgressiveFutureListener[]) { 20 final GenericProgressiveFutureListener<?>[] array = 21 (GenericProgressiveFutureListener<?>[]) listeners; 22 safeExecute(executor, new Runnable() { 23 @Override 24 public void run() { 25 notifyProgressiveListeners0(self, array, progress, total); 26 } 27 }); 28 } else { 29 final GenericProgressiveFutureListener<ProgressiveFuture<V>> l = 30 (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners; 31 safeExecute(executor, new Runnable() { 32 @Override 33 public void run() { 34 notifyProgressiveListener0(self, l, progress, total); 35 } 36 }); 37 } 38 } 39 }
第3行,從listeners中選出GenericProgressiveFutureListener類型的listener。
10-38行。調用notifyProgressiveListeners0, notifyProgressiveListener0通知進度跟新。11-17行,在當前線程中調用。
19-37行,在executor中調用。notifyProgressiveListener0只是簡單地調用listener的operationProgressed方法。notifyProgressiveListeners0是對每一個listener調用一次notifyProgressiveListener0。
和完成通知相比,進度更新通知要更加簡單。進度更新通知沒有處理併發問題,沒有處理棧溢出問題。