能夠看到,FutureTask實現了RunnableFuture, 而RunnableFuture的JavaDoc對Runnable接口的run方法有了更精確的描述:run方法將該Future設置爲計算的結果,除非計算被取消。node
public void run() { /* * state爲NEW且對runner變量CAS成功。 * 對state的判斷寫在前面,是一種優化。 */ if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; /* * 是否成功運行。 * 之因此用了這樣一個標誌位,而不是把set方法寫在try中call調用的後一句, * 是爲了避免想捕獲set方法出現的異常。 * * 舉例來講,子類覆蓋了FutureTask的done方法, * set -> finishCompletion -> done會拋出異常, * 然而實際上提交的任務是有正常的結果的。 */ boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { /* * * 要清楚,即使在runner被清爲null後,仍然有可能有線程會進入到run方法的外層try塊。 * 舉例:線程A和B都在執行第一行的if語句讀到state == NEW,線程A成功cas了runner,並執行到此處。 * 在此過程當中線程B都沒拿到CPU時間片。此時線程B一旦拿到時間片就能進到外層try塊。 * * 爲了不線程B重複執行任務,必須在set/setException方法被調用,才能把runner清爲null。 * 這時候其餘線程即使進入到了外層try塊,也必定可以讀到state != NEW,從而避免任務重複執行。 */ runner = null; /* * 由於任務執行過程當中因爲cancel方法的調用,狀態爲INTERRUPTING, * 令當前線程等待INTERRUPTING狀態變爲INTERRUPTED。 * 這是爲了避免想讓中斷操做逃逸出run方法以致於線程在執行後續操做時被中斷。 */ int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } protected void set(V v) { // 經過CAS狀態來確認計算沒有被取消,結果也沒有被設置過。 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } } protected void setException(Throwable t) { // 經過CAS狀態來確認計算沒有被取消,結果也沒有被設置過。 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } } private void finishCompletion() { for (WaitNode q; (q = waiters) != null;) { // 必須將棧頂CAS爲null,不然重讀棧頂並重試。 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { // 遍歷並喚醒棧中節點對應的線程。 for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; // 將next域置爲null,這樣對GC友好。 q.next = null; q = next; } break; } } /* * done方法是暴露給子類的一個鉤子方法。 * * 這個方法在ExecutorCompletionService.QueueingFuture中的override實現是把結果加到阻塞隊列裏。 * CompletionService誰用誰知道,奧祕全在這。 */ done(); /* * callable置爲null主要爲了減小內存開銷, * 更多能夠了解JVM memory footprint相關資料。 */ callable = null; } private void handlePossibleCancellationInterrupt(int s) { /* * 這裏的主要目的就是等調用cancel方法的線程完成中斷。 * * 以防止中斷操做逃逸出run或者runAndReset方法,影響後續操做。 * * 實際上,當前調用cancel方法的線程不必定可以中斷到本線程。 * 有可能cancel方法裏讀到runner是null,甚至有多是其它併發調用run/runAndReset方法的線程。 * 可是也沒辦法判斷另外一個線程在cancel方法中讀到的runner究竟是什麼,因此索性自旋讓出CPU時間片也沒事。 */ if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); /* * 下面的代碼在JDK8中已經被註釋掉了。 * 由於在原來的設計中,是想把cancel方法設置的中斷位給清除的。 * 可是實際上也應該容許調用FutureTask的線程使用中斷做爲線程間通訊的機制, * 這裏沒辦法區分中斷位究竟是不是來自cancel方法的調用。 */ // assert state == INTERRUPTED; // We want to clear any interrupt we may have received from // cancel(true). However, it is permissible to use interrupts // as an independent mechanism for a task to communicate with // its caller, and there is no way to clear only the // cancellation interrupt. // // Thread.interrupted(); }
public V get() throws InterruptedException, ExecutionException { int s = state; // NEW或者COMPLETING。 if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; // 完成賦值 if (s > COMPLETING) { // 若是q已經被初始化了,爲了GC須要清q.thread。 if (q != null) q.thread = null; return s; } // COMPLETING是一個很短暫的狀態,調用Thread.yield指望讓出時間片,以後重試循環。 else if (s == COMPLETING) Thread.yield(); // 初始化節點,重試一次循環。 else if (q == null) q = new WaitNode(); // queued記錄是否已經入棧,此處準備將節點壓棧。 else if (!queued) /* * 這是Treiber Stack算法入棧的邏輯。 * Treiber Stack是一個基於CAS的無鎖併發棧實現, * 更多能夠參考https://en.wikipedia.org/wiki/Treiber_Stack */ queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 若是有時限,判斷是否超時,未超時則park剩下的時間。 else if (timed) { nanos = deadline - System.nanoTime(); // 超時,移除棧中節點。 if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } } /** * 清理用於保存等待線程棧裏的節點。 * 所謂節點無效就是內部的thread爲null, * 通常有如下幾種狀況: * 1. 節點調用get超時。 * 2. 節點調用get中斷。 * 3. 節點調用get拿到task的狀態值(> COMPLETING)。 * * 此方法幹了兩件事情: * 1. 置標記參數node的thread爲null。 * 2. 清理棧中的無效節點。 * * 若是在遍歷過程當中發現有競爭則從新遍歷棧。 */ private void removeWaiter(WaitNode node) { if (node != null) { node.thread = null; retry: for (;;) { // restart on removeWaiter race for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; // 若是當前節點仍有效,則置pred爲當前節點,繼續遍歷。 if (q.thread != null) pred = q; /* * 當前節點已無效且有前驅,則將前驅的後繼置爲當前節點的後繼實現刪除節點。 * 若是前驅節點已無效,則從新遍歷waiters棧。 */ else if (pred != null) { pred.next = s; if (pred.thread == null) continue retry; } /* * 當前節點已無效,且當前節點沒有前驅,則將棧頂置爲當前節點的後繼。 * 失敗的話從新遍歷waiters棧。 */ else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } } } /** * 導出結果。 */ private V report(int s) throws ExecutionException { Object x = outcome; // 正常執行完計算任務。 if (s == NORMAL) return (V)x; // 取消。 if (s >= CANCELLED) throw new CancellationException(); // 執行計算任務時發生異常。 throw new ExecutionException((Throwable)x); }
public boolean cancel(boolean mayInterruptIfRunning) { /* * 在狀態還爲NEW的時候,根據參數中的是否容許傳遞, * 將狀態流轉到INTERRUPTING或者CANCELLED。 */ if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { if (mayInterruptIfRunning) { try { // 中斷runner線程。 Thread t = runner; if (t != null) t.interrupt(); } finally { UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { // 該方法上文已經分析過。 finishCompletion(); } return true; }
state == NEW && UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
時刻1: 線程A和B同時讀到state == NEW。
時刻2: 線程A成功對runner變量CAS進入run方法主體。
時刻3: 線程C調用cancel方法,成功將狀態CAS爲CANCELLED。
時刻4: 線程A調用finally中的runner = null。
時刻5: 線程B開始執行run方法第一句if的後半句,成功將runner變量CAS到線程B。
時刻6: 線程C讀到runner爲線程B,準備對線程B進行interrupt()
時刻7: 線程A調用handlePossibleCancellationInterrupt等待狀態從INTERRUPTING流轉至INTERRUPTED。
時刻8: 線程B被中斷。函數
* @param mayInterruptIfRunning {@code true} if the thread executing this * task should be interrupted; otherwise, in-progress tasks are allowed * to complete
上面的狀況到底線程A和B哪一個算是the thread executing this task
經過閱讀源碼,發現FutureTask仍是存在一個隱形的內存佔用問題的,或者按照《Effective Java》上說的應該叫無心識的對象保留。
時刻1: 某線程調用get,已經入等待棧,此時waiters爲該線程對應節點。
時刻2: 有大量線程經過調用get試圖獲取計算結果,get -> awaitDone方法中,通過兩輪循環都讀到狀態是NEW的話,此時它們節點已經被初始化過了,但還沒開始入隊。
時刻3: 有線程調用run方法,經過run -> set -> finishCompletion,將waiters置爲null,並喚醒了已經入棧的那個線程。
時刻4: 調用awaitDone方法的那些線程再試圖入隊的話,後面循環會發現狀態已是NORMAL了,可是waiters棧此時不爲空,並且再也無法被清掉了。
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);