今天是520,可是做爲一隻散發着清香的單身汪,趁着夜深人靜,因而我寫下了這篇關於FutureTask源碼分析的文章。碼字不易,關注一波行不行,行文匆匆,若是有寫得不對的地方歡迎在評論區留言指正,祝你們學習快樂。java
FutureTask是提供了一個可取消的異步計算的過程類,FutureTask實現了Future的基本方法,提供了start和cancel操做,能夠查詢計算是否已經完成,而且能夠獲取計算的結果。結果只能夠在計算完成以後獲取,當計算沒有完成的時候get方法會阻塞,一旦計算已經完成,那麼計算就不能再次啓動或是取消。一個FutureTask能夠用來包裝一個Callable或是一個Runnable對象。由於FurtureTask實現了Runnable方法,因此一個FutureTask能夠提交(submit)給一個Excutor執行(excution)。node
FutureTask可用於異步獲取執行結果或取消執行任務的場景。經過傳入Runnable或者Callable的任務給FutureTask,直接調用其run方法或者放入線程池執行,以後能夠在外部經過FutureTask的get方法異步獲取執行結果,所以,FutureTask很是適合用於耗時的計算,主線程能夠在完成本身的任務後,再去獲取結果。另外,FutureTask還能夠確保即便調用了屢次run方法,它都只會執行一次Runnable或者Callable任務,或者經過cancel取消FutureTask的執行等。微信
在獲取任務執行結果時,FutureTask內部維護了一個由WaitNode類實現的簡單鏈表,它保存了全部等待返回數據的線程,並在結果返回以前將這些線程掛起,只有等任務執行完成或者等待超時時纔會喚醒這些線程。markdown
FutureTask內部維護了一個用volatile修飾的int型成員state來表示狀態,其中共有7種狀態,具體以下源碼所示:多線程
// 狀態 private volatile int state; // 新建狀態 private static final int NEW = 0; // 正在執行狀態 private static final int COMPLETING = 1; // 完成狀態 private static final int NORMAL = 2; // 異常狀態 private static final int EXCEPTIONAL = 3; // 取消狀態 private static final int CANCELLED = 4; // 正在中斷狀態 private static final int INTERRUPTING = 5; // 中斷狀態 private static final int INTERRUPTED = 6; 複製代碼
可能發生的狀態轉換:異步
FutureTask提供了兩個構造方法,分別能夠包裝Callable和Runable的對象,並將狀態初始化爲NEW。由於Runable沒有返回,因此包裝Runable時,會將Runable封裝在RunnableAdapter類裏,RunnableAdapter的call方法返回的就是構造方法傳進來的result,具體代碼以下:源碼分析
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; // 將狀態初始化爲NEW this.state = NEW; } public FutureTask(Runnable runnable, V result) { // 將Runable包裝爲RunnableAdapter類 this.callable = Executors.callable(runnable, result); // 將狀態初始化爲NEW this.state = NEW; } 複製代碼
FutureTask使用內部類WaitNode構建了一個簡單的單鏈表用來記錄等待任務執行結果的線程,並將這個類對象實例放在waiters成員變量上,定義以下:學習
// 等待結果的線程 private volatile WaitNode waiters; static final class WaitNode { // 等待的線程 volatile Thread thread; // 下一個節點 volatile WaitNode next; // 構造方法,將當前線程賦值給thread WaitNode() { thread = Thread.currentThread(); } } 複製代碼
run方法是線程執行任務的關鍵方法,具體任務執行邏輯的調用和返回值的獲取及異常捕獲都在這個方法進行,具體過程以下:this
具體源碼以下:spa
/** * 任務執行方法 */ public void run() { // 1.判斷是否爲新建狀態 // 2.將當前線程經過CAS賦值給runner成員變量,保證了任務只會執行一次 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) // 不知足則直接返回 return; try { // 當前任務對應的callable對象 Callable<V> c = callable; // double check state if (c != null && state == NEW) { V result; boolean ran; try { // 執行任務 result = c.call(); ran = true; } catch (Throwable ex) { // 捕獲異常 result = null; ran = false; // 標記異常 setException(ex); } if (ran) // 將結果賦值到outcome成員變量中 set(result); } } finally { // runner必須爲非空,直到任務執行完成 // 以確保任務只執行一次 runner = null; // 必須在runner爲空後從新讀取狀態,以防止中斷被遺漏 int s = state; if (s >= INTERRUPTING) // 若是正在中斷或中斷狀態 // 調用handlePossibleCancellationInterrupt執行線程讓步 // 確保來自cancel(true)的任何中斷僅在運行或runAndReset時才傳遞給任務 handlePossibleCancellationInterrupt(s); } } 複製代碼
任務執行完成結果賦值及成功狀態轉換,並喚醒全部正在等待任務結果的線程,這裏會有NEW -> COMPLETING -> NORMAL的狀態變化,源碼以下:
/** * 任務執行完成結果賦值及成功狀態轉換 */ protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 將結果賦值給outcome outcome = v; // 標記爲成功狀態 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 喚醒並移除全部等待結果的線程 finishCompletion(); } } /** * 喚醒並移除全部等待結果的線程 */ private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { // 將waiters變量置爲null if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { // 循環遍歷單鏈表 for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; // 喚醒等待線程 LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint } 複製代碼
若是call異常,捕獲異常後經過setException方法標記異常,這裏會有NEW -> COMPLETING -> EXCEPTIONAL的狀態變化,源碼以下:
/** * 標記異常 */ protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 將異常信息賦值給outcome outcome = t; // 標記爲異常狀態 UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 喚醒並移除全部等待結果的線程 finishCompletion(); } } 複製代碼
handlePossibleCancellationInterrupt方法執行線程讓步,確保來自cancel(true)的任何中斷僅在運行或runAndReset時才傳遞給任務,源碼以下:
/** * 確保來自cancel(true)的任何中斷僅在運行或runAndReset時才傳遞給任務 */ private void handlePossibleCancellationInterrupt(int s) { // It is possible for our interrupter to stall before getting a // chance to interrupt us. Let's spin-wait patiently. if (s == INTERRUPTING) while (state == INTERRUPTING) // 等待待處理的中斷 Thread.yield(); } 複製代碼
get方法是用來獲取任務執行的結果,有兩個重載的方法,一個帶超時時間,一個不帶超時時間。兩個方法在任務執行完成以前都會將線程掛起,帶超時時間的get方法會在超時後拋出TimeoutException,不帶超時時間的get方法會一直被掛起,兩個方法都會響應中斷並拋出InterruptedException,線程被喚醒後會調用report方法獲取並返回任務執行結果,源碼以下:
/** * 不帶超時時間 */ public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) // 任務還未執行完成 // 掛起當前線程,沒有超時時間 s = awaitDone(false, 0L); // 獲取並返回任務執行結果 return report(s); } /** * 帶超時時間 */ public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) // 任務還未執行完成而且掛起超時,拋出TimeoutException throw new TimeoutException(); // 獲取並返回任務執行結果 return report(s); } 複製代碼
等待任務完成,或者中斷任務或等待超時時停止,源碼以下:
/** * 等待和超時 */ private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; // 等待結果節點 WaitNode q = null; // 標記是否已將當前等待節點加入等待鏈表waiters boolean queued = false; for (;;) { if (Thread.interrupted()) { // 線程中斷則移除當前等待節點 // 並拋出InterruptedException removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { // 當前狀態 > COMPLETING表示任務已經完成或者被取消或者異常或者被中斷 if (q != null) // 將等待節點thread置爲null // 以便等待節點能夠被回收 q.thread = null; return s; } else if (s == COMPLETING) // 任務正在執行 // 線程讓步以待任務執行完成 Thread.yield(); else if (q == null) // 建立當前的等待節點 q = new WaitNode(); else if (!queued) // 將當前等待節點加入到等待鏈表waiters頭部 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { // 等待已經超時,移除節點 removeWaiter(q); return state; } // 線程掛起 LockSupport.parkNanos(this, nanos); } else // 線程掛起 LockSupport.park(this); } } /** * 將超時或者被中斷的等待節點移除 */ private void removeWaiter(WaitNode node) { if (node != null) { node.thread = null; retry: for (;;) { // 從新開始遍歷 for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; if (q.thread != null) // 將thread不爲null的節點做爲先驅節點 pred = q; else if (pred != null) { // 先驅節點不爲null // 將thread爲null的節點移除 pred.next = s; if (pred.thread == null) // 先驅節點的thread爲null // 從新遍歷 continue retry; } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) // 當前節點的thread爲null而且pred爲null // 就將下一個節點s經過CAS修改waiters的頭結點 // 而後從新遍歷 continue retry; } break; } } } 複製代碼
獲取並返回任務執行結果,源碼以下:
/** * 獲取並返回任務執行結果 */ @SuppressWarnings("unchecked") private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) // 若是狀態等於NORMAL表示任務執行完成 // 返回正常結果 return (V)x; if (s >= CANCELLED) // 狀態 >= CANCELLED表示任務被取消或者任務被中斷 // 拋出CancellationException throw new CancellationException(); // 不然任務執行異常 throw new ExecutionException((Throwable)x); } 複製代碼
取消任務執行,源碼以下:
/** * 取消任務執行 * @param mayInterruptIfRunning true:表示中斷任務;false:表示取消任務 */ public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) // 非新建狀態 // 或者修改狀態失敗都返回false return false; try { if (mayInterruptIfRunning) { // 中斷任務 try { Thread t = runner; if (t != null) // 添加中斷標記 t.interrupt(); } finally { // 修改中斷狀態 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { // 喚醒並移除全部等待結果的線程 finishCompletion(); } return true; } 複製代碼
FutureTask是經過LockSupport來阻塞線程、喚醒線程;
對於多線程訪問成員變量waiters、state,都採用CAS來操做。
微信號 : silentao_com