FutureTask源碼分析

FutureTask介紹

FutureTask是一種可取消的異步計算任務。它實現了Future接口,表明了異步任務的返回結果。從而FutureTask能夠啓動和取消異步計算任務、查詢異步計算任務是否完成和獲取異步計算任務的返回結果。只有到異步計算任務結束時才能獲取返回結果,當異步計算任務還未結束時調用get方法會使線程阻塞。一旦異步計算任務完成,計算任務不能從新啓動或者取消,除非調用了runAndReset。node

clipboard.png

FutureTask實現了RunnableFuture,RunnableFuture結合了Future和Runnable。安全

FutureTask原理分析

在ThreadPoolExecutor分析中咱們沒有看它的父類AbstractExecutorService,其中有一個方法submit,返回一個Future,說明該方法能夠獲取異步任務的返回結果。該方法有三個重載,能夠接收Runnable和Callable,Callable是能夠返回結果的一個Runnable,而Callable就是FutureTask的一個重要的變量。異步

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

FutureTask的一些變量和狀態

/**
 * The run state of this task, initially NEW.  The run state
 * transitions to a terminal state only in methods set,
 * setException, and cancel.  During completion, state may take on
 * transient values of COMPLETING (while outcome is being set) or
 * INTERRUPTING (only while interrupting the runner to satisfy a
 * cancel(true)). Transitions from these intermediate to final
 * states use cheaper ordered/lazy writes because values are unique
 * and cannot be further modified.
 *
 * Possible state transitions:
 * NEW -> COMPLETING -> NORMAL
 * NEW -> COMPLETING -> EXCEPTIONAL
 * NEW -> CANCELLED
 * NEW -> INTERRUPTING -> INTERRUPTED
 */
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

/** The underlying callable; nulled out after running */
//一個能夠返回結果的任務
private Callable<V> callable;
/** The result to return or exception to throw from get() */
//包裝返回結果或者異常,沒有被volatile修飾,狀態保護讀寫安全
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
//運行線程
private volatile Thread runner;
/** Treiber stack of waiting threads */
//單鏈表,是一個線程的棧的結構
private volatile WaitNode waiters;

FutureTask有7中狀態,介紹一下狀態之間的轉換:
NEW -> COMPLETING -> NORMAL:任務正常執行;
NEW -> COMPLETING -> EXCEPTIONAL:任務發生異常;
NEW -> CANCELLED:任務被取消;
NEW -> INTERRUPTING -> INTERRUPTED:任務被中斷;oop

run方法

public void run() {
    //若是state不爲NEW,說明任務已經在執行或者取消
    //若是設置運行線程失敗,說明任務已經有運行線程搶在前面
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        //NEW狀態才能夠執行
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //執行任務
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                //設置異常信息
                setException(ex);
            }
            if (ran)
                //設置任務運行結果
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        //將運行線程清空,在state被更改以前要保證runner非空,這樣能包裝run方法不被屢次執行
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        //中斷處理
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

set、setException和handlePossibleCancellationInterrupt

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

當執行時發生異常,調用setException,首先將state設置爲COMPLETING,設置成功後將outcome設置爲異常,而後將state設置爲EXCEPTIONAL。this

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

當callable執行成功並返回,調用set,首先將state設置爲COMPLETING,設置成功後將結果設置爲outcome,而後設置state爲NORMAL。spa

finally中若是state爲中斷,調用handlePossibleCancellationInterrupt:線程

private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us.  Let's spin-wait patiently.
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield(); // wait out pending interrupt

    // assert state == INTERRUPTED;

    // We want to clear any interrupt we may have received from
    // cancel(true).  However, it is permissible to use interrupts
    // as an independent mechanism for a task to communicate with
    // its caller, and there is no way to clear only the
    // cancellation interrupt.
    //
    // Thread.interrupted();
}

若是狀態一直是INTERRUPTING,稍稍等待。rest

finishCompletion和get

在上面set和setException中最後都調用了finishCompletion方法:code

private void finishCompletion() {
    // assert state > COMPLETING;
    //該方法必須在state > COMPLETING時調用
    //從頭至尾喚醒WaitNode中阻塞的線程
    for (WaitNode q; (q = waiters) != null;) {
        //設置棧頂爲空
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                //喚醒線程
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                //若是next爲空,break
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}

在調用get方法時,若是任務還在執行,線程會阻塞,FutureTask會將阻塞的線程放入waiters單鏈表。等待任務結束時被喚醒,咱們繼續看get方法:接口

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        //若是任務還在執行,阻塞當前線程,放入waiters單鏈表
        s = awaitDone(false, 0L);
    return report(s);
}

awaitDone

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        //若是線程被中斷,移除當前node,拋出異常
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        //若是任務完成或者被取消,直接返回
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        //若是任務正在執行,線程等待一下
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        //若是q爲空,新建一個node
        else if (q == null)
            q = new WaitNode();
        //若是還未入列,嘗試將新建的node放入鏈表
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        //若是設置了超時且超時了
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                //超時,移除node
                removeWaiter(q);
                return state;
            }
            //阻塞線程
            LockSupport.parkNanos(this, nanos);
        }
        //阻塞當前線程
        else
            LockSupport.park(this);
    }
}

removeWaiter

private void removeWaiter(WaitNode node) {
    if (node != null) {
        //設置節點的線程爲空,作刪除標記
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                //thread不爲空,continue
                if (q.thread != null)
                    pred = q;
                //thread爲空且pred不爲空
                else if (pred != null) {
                    //刪除q
                    pred.next = s;
                    //檢查一下pred的thread,若是被其餘線程修改,retry outer loop
                    if (pred.thread == null) // check for race
                        continue retry;
                }
                //thread爲空且pred爲空說明q爲棧頂,將q.next設置爲棧頂,失敗則retry
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                      q, s))
                    continue retry;
            }
            break;
        }
    }
}

report方法

get方法最後調用了report方法:

private V report(int s) throws ExecutionException {
    Object x = outcome;
    //NORMAL表示任務執行正常,返回結果
    if (s == NORMAL)
        return (V)x;
    //任務被取消,拋出異常
    if (s >= CANCELLED)
        throw new CancellationException();
    //其餘狀況只有可能發生異常,拋出該異常
    throw new ExecutionException((Throwable)x);
}

cancel方法

最後看一下cancel方法:

public boolean cancel(boolean mayInterruptIfRunning) {
    //當state不爲NEW說明任務已經開始,不能被取消,返回false
    //當設置state失敗時,返回false
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                //中斷線程
                if (t != null)
                    t.interrupt();
            } finally { // final state
                //設置任務爲INTERRUPTED
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}
相關文章
相關標籤/搜索