最近在看併發包的源碼把本身的理解分享給你們啦,有不正確的地方歡迎你們指正。 FutureTask類中的waiters成員變量保存着調用get方法獲取FutureTask計算結果的線程構成的一個棧。 當FutureTask類run方法沒有執行完時,調用get方法的線程會造成一個阻塞的棧,即waiters。併發
static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
一旦FutureTask類run方法執行完、執行中出現異常或者是調用cancel方法取消執行(能夠通知正在執行 FutureTask類run方法的線程響應中斷,經過設置cancel方法參數mayInterruptIfRunning爲true達到) 時,就必須讓阻塞在waiters棧的全部線程退出阻塞。這個是經過FutureTask類的finishCompletion方法 完成的。源碼以下:this
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
因此,在FutureTask類run方法中的執行完成員變量的callable的call方法時,正常執行或是執行出現異常 調用set設置執行後的結果或是setException設置執行返回出現的異常時,其內部都調用finishCompletion 方法,同理cancel方法內部也調用了finishCompletion方法。FutureTask類run方法源碼以下:.net
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; /*執行callable的call方法出現異常,設置異常,setException內部調用 finishCompletion讓阻塞在waiters棧上的全部要獲取run方法執行結 果的線程所有中止阻塞,獲得執行時的異常 */ setException(ex); } if (ran) /*正常執行完callable的call方法,設置返回結果,set內部調用finishCompletion 讓阻塞在waiters棧上的全部要獲取run方法執行結果的線程所有中止阻塞獲得 執行的結果 */ set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
如今反過來分析一下,waiters棧是如何構成的。先看一下FutureTask類get方法,就是獲得Callable 的執行結果,咱們前說的全部要獲取run方法執行結果的線程,就是調用 get方法。源碼以下:線程
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) /*FutureTask類run方法執行callable的call方法沒完成,要獲取run方法執行結果的線程阻塞構成 waiters棧 */ s = awaitDone(false, 0L); return report(s); }
FutureTask類有個get變體用於在指定時間沒有獲取結果拋出異常,同get相似這裏不作分析,來看下 awaitDone源碼:code
/** * Awaits completion or aborts on interrupt or timeout. * * [@param](https://my.oschina.net/u/2303379) timed true if use timed waits * [@param](https://my.oschina.net/u/2303379) nanos time to wait, if timed * [@return](https://my.oschina.net/u/556800) state upon completion */ private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; //自旋 for (;;) { //調用get的線程被中斷,將其對應waiterNode移除,同時拋出異常,interrupted會重置中斷狀態 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; //FutureTask中run方法執行,即callable任務執行完,返回執行的狀態 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } //FutureTask執行處於中間狀態,獲取結果的線程將cup執行機會讓給真正要執行FutureTask類run方法的線程 else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) //將要獲取結果的線程封裝成對應的WaitNode,用於後面構建阻塞waiters棧 q = new WaitNode(); else if (!queued) //構建阻塞waiters棧 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else //阻塞當前獲取FutureTask類執行結果的線程 LockSupport.park(this); } }
上面註解已詳細分析了awaitDone方法。有問題歡迎指正。rem