Future是Java執行異步任務時的經常使用接口。咱們一般會往ExecutorService中提交一個Callable/Runnable並獲得一個Future對象,Future對象表示異步計算的結果,支持獲取結果,取消計算等操做。在Java提供的Executor框架中,Future的默認實現爲java.util.concurrent.FutureTask。本文針對FutureTask的源碼進行分析與解讀。java
能夠看到,FutureTask實現了RunnableFuture, 而RunnableFuture的JavaDoc對Runnable接口的run方法有了更精確的描述:run方法將該Future設置爲計算的結果,除非計算被取消。node
下面開始針對FutureTask的實現源碼進行解讀。算法
FutureTask內置一個被volatile修飾的state變量。
按照生命週期的階段能夠分爲:併發
這裏先給出自制的狀態流轉圖。
能夠看到NEW爲起始狀態,而NORMAL, EXCEPTIONAL, CANCELLED, INTERRUPTED這些狀態爲終止狀態,而COMPLETING和INTERRUPTING爲中間暫時狀態。框架
public void run() { /* * state爲NEW且對runner變量CAS成功。 * 對state的判斷寫在前面,是一種優化。 */ if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; /* * 是否成功運行。 * 之因此用了這樣一個標誌位,而不是把set方法寫在try中call調用的後一句, * 是爲了避免想捕獲set方法出現的異常。 * * 舉例來講,子類覆蓋了FutureTask的done方法, * set -> finishCompletion -> done會拋出異常, * 然而實際上提交的任務是有正常的結果的。 */ boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { /* * * 要清楚,即使在runner被清爲null後,仍然有可能有線程會進入到run方法的外層try塊。 * 舉例:線程A和B都在執行第一行的if語句讀到state == NEW,線程A成功cas了runner,並執行到此處。 * 在此過程當中線程B都沒拿到CPU時間片。此時線程B一旦拿到時間片就能進到外層try塊。 * * 爲了不線程B重複執行任務,必須在set/setException方法被調用,才能把runner清爲null。 * 這時候其餘線程即使進入到了外層try塊,也必定可以讀到state != NEW,從而避免任務重複執行。 */ runner = null; /* * 由於任務執行過程當中因爲cancel方法的調用,狀態爲INTERRUPTING, * 令當前線程等待INTERRUPTING狀態變爲INTERRUPTED。 * 這是爲了避免想讓中斷操做逃逸出run方法以致於線程在執行後續操做時被中斷。 */ int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } protected void set(V v) { // 經過CAS狀態來確認計算沒有被取消,結果也沒有被設置過。 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } } protected void setException(Throwable t) { // 經過CAS狀態來確認計算沒有被取消,結果也沒有被設置過。 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } } private void finishCompletion() { for (WaitNode q; (q = waiters) != null;) { // 必須將棧頂CAS爲null,不然重讀棧頂並重試。 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { // 遍歷並喚醒棧中節點對應的線程。 for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; // 將next域置爲null,這樣對GC友好。 q.next = null; q = next; } break; } } /* * done方法是暴露給子類的一個鉤子方法。 * * 這個方法在ExecutorCompletionService.QueueingFuture中的override實現是把結果加到阻塞隊列裏。 * CompletionService誰用誰知道,奧祕全在這。 */ done(); /* * callable置爲null主要爲了減小內存開銷, * 更多能夠了解JVM memory footprint相關資料。 */ callable = null; } private void handlePossibleCancellationInterrupt(int s) { /* * 這裏的主要目的就是等調用cancel方法的線程完成中斷。 * * 以防止中斷操做逃逸出run或者runAndReset方法,影響後續操做。 * * 實際上,當前調用cancel方法的線程不必定可以中斷到本線程。 * 有可能cancel方法裏讀到runner是null,甚至有多是其它併發調用run/runAndReset方法的線程。 * 可是也沒辦法判斷另外一個線程在cancel方法中讀到的runner究竟是什麼,因此索性自旋讓出CPU時間片也沒事。 */ if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); /* * 下面的代碼在JDK8中已經被註釋掉了。 * 由於在原來的設計中,是想把cancel方法設置的中斷位給清除的。 * 可是實際上也應該容許調用FutureTask的線程使用中斷做爲線程間通訊的機制, * 這裏沒辦法區分中斷位究竟是不是來自cancel方法的調用。 */ // assert state == INTERRUPTED; // We want to clear any interrupt we may have received from // cancel(true). However, it is permissible to use interrupts // as an independent mechanism for a task to communicate with // its caller, and there is no way to clear only the // cancellation interrupt. // // Thread.interrupted(); }
public V get() throws InterruptedException, ExecutionException { int s = state; // NEW或者COMPLETING。 if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; // 完成賦值 if (s > COMPLETING) { // 若是q已經被初始化了,爲了GC須要清q.thread。 if (q != null) q.thread = null; return s; } // COMPLETING是一個很短暫的狀態,調用Thread.yield指望讓出時間片,以後重試循環。 else if (s == COMPLETING) Thread.yield(); // 初始化節點,重試一次循環。 else if (q == null) q = new WaitNode(); // queued記錄是否已經入棧,此處準備將節點壓棧。 else if (!queued) /* * 這是Treiber Stack算法入棧的邏輯。 * Treiber Stack是一個基於CAS的無鎖併發棧實現, * 更多能夠參考https://en.wikipedia.org/wiki/Treiber_Stack */ queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 若是有時限,判斷是否超時,未超時則park剩下的時間。 else if (timed) { nanos = deadline - System.nanoTime(); // 超時,移除棧中節點。 if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } } /** * 清理用於保存等待線程棧裏的節點。 * 所謂節點無效就是內部的thread爲null, * 通常有如下幾種狀況: * 1. 節點調用get超時。 * 2. 節點調用get中斷。 * 3. 節點調用get拿到task的狀態值(> COMPLETING)。 * * 此方法幹了兩件事情: * 1. 置標記參數node的thread爲null。 * 2. 清理棧中的無效節點。 * * 若是在遍歷過程當中發現有競爭則從新遍歷棧。 */ private void removeWaiter(WaitNode node) { if (node != null) { node.thread = null; retry: for (;;) { // restart on removeWaiter race for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; // 若是當前節點仍有效,則置pred爲當前節點,繼續遍歷。 if (q.thread != null) pred = q; /* * 當前節點已無效且有前驅,則將前驅的後繼置爲當前節點的後繼實現刪除節點。 * 若是前驅節點已無效,則從新遍歷waiters棧。 */ else if (pred != null) { pred.next = s; if (pred.thread == null) continue retry; } /* * 當前節點已無效,且當前節點沒有前驅,則將棧頂置爲當前節點的後繼。 * 失敗的話從新遍歷waiters棧。 */ else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } } } /** * 導出結果。 */ private V report(int s) throws ExecutionException { Object x = outcome; // 正常執行完計算任務。 if (s == NORMAL) return (V)x; // 取消。 if (s >= CANCELLED) throw new CancellationException(); // 執行計算任務時發生異常。 throw new ExecutionException((Throwable)x); }
public boolean cancel(boolean mayInterruptIfRunning) { /* * 在狀態還爲NEW的時候,根據參數中的是否容許傳遞, * 將狀態流轉到INTERRUPTING或者CANCELLED。 */ if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { if (mayInterruptIfRunning) { try { // 中斷runner線程。 Thread t = runner; if (t != null) t.interrupt(); } finally { UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { // 該方法上文已經分析過。 finishCompletion(); } return true; }
至此已經將FutureTask的源碼解讀分析完畢,在讀過源碼以後,我我的認爲JDK8u111的FutureTask源碼存在兩個問題,目前還須要進一步確認。異步
FutureTask的run方法的進入條件是ide
state == NEW && UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
假設有兩個線程A和B調用run方法,線程C調用cancel方法。
時刻1: 線程A和B同時讀到state == NEW。
時刻2: 線程A成功對runner變量CAS進入run方法主體。
時刻3: 線程C調用cancel方法,成功將狀態CAS爲CANCELLED。
時刻4: 線程A調用finally中的runner = null。
時刻5: 線程B開始執行run方法第一句if的後半句,成功將runner變量CAS到線程B。
時刻6: 線程C讀到runner爲線程B,準備對線程B進行interrupt()
時刻7: 線程A調用handlePossibleCancellationInterrupt等待狀態從INTERRUPTING流轉至INTERRUPTED。
時刻8: 線程B被中斷。函數
這裏的問題是,調用cancel方法的線程C中斷的是實質上沒有對callable進行call調用的線程B,而線程A還試圖防止中斷操做逃逸出run方法。
這個東西在Future的JavaDoc上說了很含糊,以下所示:優化
* @param mayInterruptIfRunning {@code true} if the thread executing this * task should be interrupted; otherwise, in-progress tasks are allowed * to complete
上面的狀況到底線程A和B哪一個算是the thread executing this task
說不清。this
經過閱讀源碼,發現FutureTask仍是存在一個隱形的內存佔用問題的,或者按照《Effective Java》上說的應該叫無心識的對象保留。
這個問題就是在FutureTask計算完成後,可能內部用於保存等待線程的棧留有一些已經無用的等待節點。
時刻1: 某線程調用get,已經入等待棧,此時waiters爲該線程對應節點。
時刻2: 有大量線程經過調用get試圖獲取計算結果,get -> awaitDone方法中,通過兩輪循環都讀到狀態是NEW的話,此時它們節點已經被初始化過了,但還沒開始入隊。
時刻3: 有線程調用run方法,經過run -> set -> finishCompletion,將waiters置爲null,並喚醒了已經入棧的那個線程。
時刻4: 調用awaitDone方法的那些線程再試圖入隊的話,後面循環會發現狀態已是NORMAL了,可是waiters棧此時不爲空,並且再也無法被清掉了。
這樣下來,該FutureTask內部可能會留有一些的無效節點。具體會留多少實際上取決於那個瞬間有多少線程準備執行以及多少可以成功CAS。
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);