從FutureTask內部類WaitNode深刻淺出分析FutureTask實現原理

最近在看併發包的源碼把本身的理解分享給你們啦,有不正確的地方歡迎你們指正。 FutureTask類中的waiters成員變量保存着調用get方法獲取FutureTask計算結果的線程構成的一個棧。 當FutureTask類run方法沒有執行完時,調用get方法的線程會造成一個阻塞的棧,即waiters。併發

static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
  }

一旦FutureTask類run方法執行完、執行中出現異常或者是調用cancel方法取消執行(能夠通知正在執行 FutureTask類run方法的線程響應中斷,經過設置cancel方法參數mayInterruptIfRunning爲true達到) 時,就必須讓阻塞在waiters棧的全部線程退出阻塞。這個是經過FutureTask類的finishCompletion方法 完成的。源碼以下:this

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    done();
    callable = null;        // to reduce footprint
}

因此,在FutureTask類run方法中的執行完成員變量的callable的call方法時,正常執行或是執行出現異常 調用set設置執行後的結果或是setException設置執行返回出現的異常時,其內部都調用finishCompletion 方法,同理cancel方法內部也調用了finishCompletion方法。FutureTask類run方法源碼以下:.net

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                /*執行callable的call方法出現異常,設置異常,setException內部調用
                   finishCompletion讓阻塞在waiters棧上的全部要獲取run方法執行結
                   果的線程所有中止阻塞,獲得執行時的異常
                */
                setException(ex);
            }
            if (ran)
                /*正常執行完callable的call方法,設置返回結果,set內部調用finishCompletion
                   讓阻塞在waiters棧上的全部要獲取run方法執行結果的線程所有中止阻塞獲得
                   執行的結果
                */
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

如今反過來分析一下,waiters棧是如何構成的。先看一下FutureTask類get方法,就是獲得Callable 的執行結果,咱們前說的全部要獲取run方法執行結果的線程,就是調用 get方法。源碼以下:線程

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        /*FutureTask類run方法執行callable的call方法沒完成,要獲取run方法執行結果的線程阻塞構成
           waiters棧
        */
        s = awaitDone(false, 0L);
    return report(s);
}

FutureTask類有個get變體用於在指定時間沒有獲取結果拋出異常,同get相似這裏不作分析,來看下 awaitDone源碼:code

/**
* Awaits completion or aborts on interrupt or timeout.
*
* [@param](https://my.oschina.net/u/2303379) timed true if use timed waits
* [@param](https://my.oschina.net/u/2303379) nanos time to wait, if timed
* [@return](https://my.oschina.net/u/556800) state upon completion
*/
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
   //自旋
    for (;;) {
        //調用get的線程被中斷,將其對應waiterNode移除,同時拋出異常,interrupted會重置中斷狀態
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        int s = state;
        //FutureTask中run方法執行,即callable任務執行完,返回執行的狀態
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        //FutureTask執行處於中間狀態,獲取結果的線程將cup執行機會讓給真正要執行FutureTask類run方法的線程
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
        //將要獲取結果的線程封裝成對應的WaitNode,用於後面構建阻塞waiters棧
            q = new WaitNode();
        else if (!queued)
            //構建阻塞waiters棧
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            //阻塞當前獲取FutureTask類執行結果的線程
            LockSupport.park(this);
    }
}

上面註解已詳細分析了awaitDone方法。有問題歡迎指正。rem

相關文章
相關標籤/搜索