JDK5 以後提供了 Callable 和 Future 接口,經過它們就能夠在任務執行完畢以後獲得任務的執行結果。本文從源代碼角度分析下具體的實現原理。html
對於須要執行的任務須要實現 Callable 接口,Callable 接口定義以下:java
public interface Callable<V> { V call() throws Exception; }
能夠看到 Callable 是個泛型接口,泛型 V 就是要 call() 方法返回的類型。Callable 接口和 Runnable 接口很像,均可以被另一個線程執行,可是正如前面所說的,Runnable 不會返回數據也不能拋出異常。node
Future 接口表明異步計算的結果,經過 Future 接口提供的方法能夠查看異步計算是否執行完成,或者等待執行結果並獲取執行結果,同時還能夠取消執行。Future 接口的定義以下:算法
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
Future 只是一個接口,不能直接用來建立對象,FutureTask 是 Future 的實現類,FutureTask 的結構以下:編程
FutureTask -> FutureRunnable -> Future/Runnable
能夠看到,FutureTask 實現了 RunnableFuture 接口,則 RunnableFuture 接口繼承了 Runnable 接口和 Future 接口,因此 FutureTask 既能當作一個 Runnable 直接被 Thread 執行,也能做爲 Future 用來獲得 Callable 的計算結果。segmentfault
private volatile int state; private static final int NEW = 0; // 初始狀態 private static final int COMPLETING = 1; // 任務已經執行完(正常或者異常),準備賦值結果 private static final int NORMAL = 2; // 任務已經正常執行完,並已將任務返回值賦值到結果 private static final int EXCEPTIONAL = 3; // 任務執行失敗,並將異常賦值到結果 private static final int CANCELLED = 4; // 取消 private static final int INTERRUPTING = 5; // 準備嘗試中斷執行任務的線程 private static final int INTERRUPTED = 6; // 對執行任務的線程進行中斷(未必中斷到)
FutureTask 狀態變化如圖:安全
能夠看到 NEW 爲起始狀態,而 NORMAL, EXCEPTIONAL, CANCELLED, INTERRUPTED 這些狀態爲終止狀態,而 COMPLETING 和 INTERRUPTING 爲中間暫時狀態。爲何要引入 COMPLETING 和 INTERRUPTING 爲兩個中間狀態呢?COMPLETING -> INTERRUPTED 中間有賦值 outcome=v 的過程,INTERRUPTING -> INTERRUPTED 有 t.interrupt 過程。這個過程要保證沒有其它線程干擾。(我的理解)併發
有一點須要注意的是,全部值大於 COMPLETING 的狀態都表示任務已經執行完成(任務正常執行完成,任務執行異常或者任務被取消)。異步
// 內部封裝的 Callable 對象。若是是 Runnable 則會經過 Executors#callable 封裝成 Callable private Callable<V> callable; private Object outcome; // 用於保存計算結果或者異常信息。 private volatile Thread runner; // 用來運行 Callable 的線程 private volatile WaitNode waiters; // FutureTask中用了 Trieber Stack 來保存等待的線程 // 這個隊列使用 Treiber stack(能夠理解爲基於 CAS 的無鎖的棧,先進後出)
FutureTask 用了 Treiber Stack 來維護等待任務完成的線程,在 FutureTask 的任務完成/取消/異常後在 finishCompletion 鉤子方法中會喚醒棧中等待的線程。函數
先從 FutureTask 的構造函數看起:
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
能夠看到,waiters 初始值爲 null,初始狀態爲 NEW。
通常發起任務線程跟執行任務線程一般狀況下都不會是同一個線程,在任務執行線程(通常爲線程池)執行任務的時候,任務發起線程能夠查看任務執行狀態、獲取任務執行結果、取消任務等等操做,接下來分析下這些操做。
executor.submit(task) 將任務提交到線程池中後,毫無疑問 FutureTask 會對原有的 run 方法進行包裝,當 run 方法執行完成後設置結果值,並喚醒全部等待的線程。
/** * run 方法執行有兩個條件:1. state=NEW; 2. runner=null * 1. 執行前 state=NEW & runner=null * 2. 執行中 state=NEW & runner=Thread.currentThread() * 3. 執行後 state!=NEW & runner=null,根據是否有異常執行 set(result) 或 setException(ex), * set/setException 都會更新 state 狀態,以後線程的狀態就不是 NEW * 所以,多個線程同時調用 run 方法的狀況 callable 也只會執行一次 */ public void run() { // 1. state爲 NEW 且對 runner 變量 CAS 成功。 對 state 的判斷寫在前面,是一種優化。 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; // 2. 是否成功運行的標誌位,而不是把 set 方法寫在 try 中,是爲了避免想捕獲 set 的異常。 // 好比:子類覆寫 FutureTask#done 方法(set->finishCompletion >done) 拋出異常, // 然而實際上提交的任務是有正常的結果的。 boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // 3. 若是A/B兩個線程同時執行到步驟①,當A線程從新將runner設置爲 null 時,B線程是否可能會從新執行呢? // 實際上A線程一旦已經調用 set/setException 方法,整個流程已經結束了,因此不可能重複執行 runner = null; // 4. 等待調用 cancel(true) 的線程完成中斷,防止中斷操做逃逸出 run 或者 runAndReset 方法 // 以致於線程在執行後續操做時被中斷 int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
總結: run 方法是對 callable 進行了一層包裝,在執行完成後設置返回結果,並喚醒全部等待的線程。
// 設置返回結果,喚醒等待線程。思考爲何須要 COMPLETING 這個中間狀態 protected void set(V v) { // 可能任務已經取消,state 的狀態就再也不是 NEW if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
總結: 根據前面的分析,不論是任務執行異常仍是任務正常執行完畢,或者取消任務,最後都會調用 finishCompletion() 方法,該方法實現以下:
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { // 1. 必須將棧頂 CAS 爲 null,不然重讀棧頂並重試。 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { // 2. 遍歷並喚醒棧中節點對應的線程。 for (;;) { Thread t = q.thread; // 從頭節點開始喚醒全部的等待線程 if (t != null) { q.thread = null; LockSupport.unpark(t); // 喚醒等待線程 } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } // 3. ExecutorCompletionService#QueueingFuture 中把結果加到阻塞隊列裏。 // CompletionService 誰用誰知道,奧祕全在這。 done(); // 子類覆蓋 // 4. callable 置爲 null 主要爲了減小內存開銷,更多能夠了解 JVM memory footprint 相關資料 callable = null; // to reduce footprint }
總結: 這個方法的實現比較簡單,依次遍歷 waiters 鏈表,喚醒節點中的線程,而後把 callable 置空。被喚醒的線程會各自從 awaitDone() 方法中的 LockSupport.park() 阻塞中返回,而後會進行新一輪的循環。在新一輪的循環中會返回執行結果或者更確切的說是返回任務的狀態。
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) // NEW 或 COMPLETING s = awaitDone(false, 0L); return report(s); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); // 超時了,任務還未執行結束 return report(s); }
總結: 若是任務還未執行結束就調用 awaitDone 阻塞當前線程,這裏的結束包括任務正常執行完畢,任務執行異常,任務被取消。一旦任務執行結束則調用 report() 返回結果。
// report 根據任務最終的狀態,返回結果或拋出異常 private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) // 1. 正常執行完計算任務 return (V)x; if (s >= CANCELLED) // 2. 取消 throw new CancellationException(); throw new ExecutionException((Throwable)x); // 3. 異常 }
當調用 get() 獲取任務結果可是任務還沒執行完成的時候,調用線程會調用 awaitDone() 方法進行阻塞等待,該方法定義以下:
// 阻塞等待線程執行完成(正常、異常、取消)後返回 private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { // 1. 調用 get 的線程是否被其餘線程中斷 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; // 2. 任務已經執行完成,不管是正常、異常、取消 if (s > COMPLETING) { // 已經執行完成 if (q != null) // help GC q.thread = null; return s; // 3. 結果正在賦值,讓出 CPU 等待 } else if (s == COMPLETING) // cannot time out yet Thread.yield(); // 4. 初始化節點,重試一次循環 else if (q == null) q = new WaitNode(); // 5. queued 記錄是否已經入棧,此處準備將節點壓棧 // 節點入隊失敗,自旋直至成功 else if (!queued) // 這是 Treiber Stack算法入棧的邏輯。 Treiber Stack 是一個基於 CAS 的無鎖併發棧實現 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 6. 掛起線程 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); // 超時直接刪除節點後返回 return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); // 只有任務取消(cancel)和任務執行結束(run),才能喚醒線程 // 此時再一次for循環時state的狀態確定大於COMPLETING } }
總結: awaitDone 有如下幾種狀況:
/** * 清理用於保存等待線程棧裏的無效節點,所謂節點無效就是內部的 thread 爲 null(類比 ThreadLocalMap) * * 通常有如下幾種狀況: * 1. 節點調用 get 超時。 * 2. 節點調用 get 中斷。 * 3. 節點調用 get 拿到 task 的狀態值(> COMPLETING)。 * * 此方法幹了兩件事情: * 1. 置標記參數 node 的 thread 爲 null * 2. 清理棧中的無效節點 * * 若是在遍歷過程當中發現有競爭則從新遍歷棧。 */ private void removeWaiter(WaitNode node) { if (node != null) { node.thread = null; retry: for (;;) { // restart on removeWaiter race // pre(前繼節點) -> current(當前節點) -> next(後繼節點),對應下面的 pre -> q -> s // 1.1 q.thread!=null 則更新 pre 節點繼續遍歷 // 1.2 pre!=null && q.thread==null 時 current 不時頭節點,直接刪除 current // 1.3 pre==null && q.thread==null 時 current 是頭節點,更新頭節點爲 next for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; // 1.1 保存當前節點 q 的前繼節點 pre if (q.thread != null) pred = q; // 1.2 q 節點已經失效,此時根據 q 是不是頭節點分兩種狀況 // q 不是頭節點,直接刪除,爲何不須要原子性操做? else if (pred != null) { pred.next = s; // 踢除當前節點 q if (pred.thread == null) // check for race continue retry; // 1.3 q 是頭節點且失效了,原子性更新頭節點 } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } } }
總結: 刪除節點時,執行到 1.2 時直接刪除節點,沒有使用 CAS 原子性操做,難道是線程安全的,仍是說即便出現不一致的狀況,也不影響最終的結果?
/** * mayInterruptIfRunning:false 時不容許在線程運行時中斷,true 時容許運行時中斷線程,但不保證必定會中斷線程。 * 1. true 時,將狀態修改爲 INTERRUPTING,執行 thread.interrupt() * 2. false 時,將狀態修改爲 CANCELLED */ public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); // interrupt 不必定能中斷線程 } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); // 喚醒全部等待線程 } return true; }
總結: cancel 方法會作下面幾件事:
修改任務的狀態。須要注意的是參數名稱既然叫 mayInterruptIfRunning 則說明可能沒法經過 interrupt 中止任務。要看業務線程對 interrupt 是否響應。以下面的代碼將 while 條件改成 true 後,調用 cancel 根本沒法中斷任務。
// !Thread.interrupted() 改成 true 後沒法中斷任務 Future<?> future = executorService.submit(() -> { while (!Thread.interrupted()) System.out.println("1"); }); future.cancel(true);
調用 finishCompletion 喚醒全部的等待任務結果的線程。
與 JDK6 版本不一樣,JDK7 的 FutureTask 再也不基於 AQS 來構建,而是在內部採用簡單的 Treiber Stack 來保存等待線程。
executor.submit(task1).cancel(true); executor.submit(task2);
看上面的代碼,雖然中斷的是 task1,但可能 task2 獲得中斷信號。參考 FutureTask jdk6不一樣的區別
(1) 線程池使用FutureTask
線程池使用 FutureTask 的時候若是拒絕策略設置爲了 DiscardPolicy 和 DiscardOldestPolicy 而且在被拒絕的任務的 Future 對象上調用無參 get 方法那麼調用線程會一直被阻塞。因此當使用 Future 的時候,儘可能使用帶超時時間的 get 方法。參考線程池使用FutureTask時候須要注意的一點事
Treiber Stack 在 R. Kent Treiber 在 1986 年的論文 Systems Programming: Coping with Parallelism 中首次出現。它是一種無鎖併發棧,其無鎖的特性是基於 CAS 原子操做實現的。具體實現參考《Java併發編程實戰》一書的 15.4.1 小結中的實現。
參考:
天天用心記錄一點點。內容也許不重要,但習慣很重要!