FutureTask 源碼分析

FutureTask是一個支持取消行爲的異步任務執行器。該類實現了Future接口的方法。
如:node

  1. 取消任務執行
  2. 查詢任務是否執行完成
  3. 獲取任務執行結果(」get「任務必須得執行完成才能獲取結果,不然會阻塞直至任務完成)。
    注意:一旦任務執行完成或取消任務,則不能執行取消任務或者從新啓動任務。(除非一開始就使用runAndReset模式運行任務)

FutureTask實現了Runnable接口和Future接口,所以FutureTask能夠傳遞到線程對象Thread或Excutor(線程池)來執行。dom

若是在當前線程中須要執行比較耗時的操做,但又不想阻塞當前線程時,能夠把這些做業交給FutureTask,另開一個線程在後臺完成,噹噹前線程未來須要時,就能夠經過FutureTask對象得到後臺做業的計算結果或者執行狀態。異步

示例

public class FutureTaskDemo {

    public static void main(String[] args) throws InterruptedException{
        FutureTask<Integer> ft = new FutureTask<>(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                int num = new Random().nextInt(10);
                TimeUnit.SECONDS.sleep(num);
                return num;
            }
        });
        Thread t = new Thread(ft);
        t.start();
        //這裏能夠作一些其它的事情,跟futureTask任務並行,等須要futureTask的運行結果時,能夠調用get方法獲取。
        try {
            //等待任務執行完成,獲取返回值
            Integer num = ft.get();
            System.out.println(num);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

FutureTask 源碼分析

JDK1.7及以前,FutureTask 經過使用內部類Sync繼承AQS來實現。
內部使用的AQS的共享鎖。
AQS具體實現可參考 AbstractQueuedSynchronizer 源碼分析ide

JDK1.8沒有使用AQS,而是本身實現了一個同步等待隊列,在結果返回以前,全部的線程都被阻塞,存放到等待隊列中。源碼分析

下面咱們來分析下JDK1.8的FutureTask 源碼this

FutureTask 類結構

public class FutureTask<V> implements RunnableFuture<V> {
 
   /**
     * 當前任務的運行狀態。
     *
     * 可能存在的狀態轉換
     * NEW -> COMPLETING -> NORMAL(有正常結果)
     * NEW -> COMPLETING -> EXCEPTIONAL(結果爲異常)
     * NEW -> CANCELLED(無結果)
     * NEW -> INTERRUPTING -> INTERRUPTED(無結果)
     */
    private volatile int state;
    private static final int NEW          = 0;  //初始狀態
    private static final int COMPLETING   = 1;  //結果計算完成或響應中斷到賦值給返回值之間的狀態。
    private static final int NORMAL       = 2;  //任務正常完成,結果被set
    private static final int EXCEPTIONAL  = 3;  //任務拋出異常
    private static final int CANCELLED    = 4;  //任務已被取消
    private static final int INTERRUPTING = 5;  //線程中斷狀態被設置ture,但線程未響應中斷
    private static final int INTERRUPTED  = 6;  //線程已被中斷

    //將要執行的任務
    private Callable<V> callable;
    //用於get()返回的結果,也多是用於get()方法拋出的異常
    private Object outcome; // non-volatile, protected by state reads/writes
    //執行callable的線程,調用FutureTask.run()方法經過CAS設置
    private volatile Thread runner;
    //棧結構的等待隊列,該節點是棧中的最頂層節點。
    private volatile WaitNode waiters;
    ....

FutureTask實現的接口信息以下:spa

RunnableFuture 接口

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

RunnableFuture 接口基礎了Runnable和Future接口.net

Future 接口

public interface Future<V> {
    //取消任務
    boolean cancel(boolean mayInterruptIfRunning);
    //判斷任務是否已經取消
    boolean isCancelled();
    //判斷任務是否結束(執行完成或取消)
    boolean isDone();
    //阻塞式獲取任務執行結果
    V get() throws InterruptedException, ExecutionException;
    //支持超時獲取任務執行結果
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

run 方法

public void run() {
    //保證callable任務只被運行一次
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //執行任務
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        //判斷該任務是否正在響應中斷,若是中斷沒有完成,則等待中斷操做完成
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

1.若是state狀態不爲New或者設置運行線程runner失敗則直接返回false,說明線程已經啓動過,保證任務在同一時刻只被一個線程執行。
2.調用callable.call()方法,若是調用成功則執行set(result)方法,將state狀態設置成NORMAL。若是調用失敗拋出異常則執行setException(ex)方法,將state狀態設置成EXCEPTIONAL,喚醒全部在get()方法上等待的線程。
3.若是當前狀態爲INTERRUPTING(步驟2已CAS失敗),則一直調用Thread.yield()直至狀態不爲INTERRUPTING線程

set方法rest

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}
  1. 首先經過CAS把state的NEW狀態修改爲COMPLETING狀態。
  2. 修改爲功則把v值賦給outcome變量。而後再把state狀態修改爲NORMAL,表示如今能夠獲取返回值。
  3. 最後調用finishCompletion()方法,喚醒等待隊列中的全部節點。

setException 方法

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

同上 set(V v) 方法

  1. 首先經過CAS把state的NEW狀態修改爲COMPLETING狀態。
  2. 修改爲功則把v值賦給outcome變量。而後再把state狀態修改爲EXCEPTIONAL,表示待返回的異常信息設置成功。
  3. 最後調用finishCompletion()方法,喚醒等待隊列中的全部節點。

handlePossibleCancellationInterrupt方法

private void handlePossibleCancellationInterrupt(int s) {
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield(); // wait out pending interrupt
}

該方法是若是正在響應中斷(EXCEPTIONAL),則等待響應中斷結束(INTERRUPTED)。

finishCompletion方法

private void finishCompletion() {
    for (WaitNode q; (q = waiters) != null;) {
        //經過CAS把棧頂的元素置爲null,至關於彈出棧頂元素
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    done();
    callable = null;        // to reduce footprint
}

把棧中的元素一個一個彈出,並經過 LockSupport.unpark(t)喚醒每個節點,通知每一個線程,該任務執行完成(多是執行完成,也可能cancel,異常等)

runAndReset 方法 (可被子類重寫,外部沒法直接調用)

protected boolean runAndReset() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                c.call(); // don't set result
                ran = true;
            } catch (Throwable ex) {
                setException(ex);
            }
        }
    } finally {
        runner = null;
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW;
}

該方法和run方法的區別是,run方法只能被運行一次任務,而該方法能夠屢次運行任務。而runAndReset這個方法不會設置任務的執行結果值,若是該任務成功執行完成後,不修改state的狀態,仍是可運行(NEW)狀態,若是取消任務或出現異常,則不會再次執行。
而只是執行任務完以後,

get方法

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

若是state狀態小於等於COMPLETING,說明任務還沒開始執行或還未執行完成,而後調用awaitDone方法阻塞該調用線程。若是state的狀態大於COMPLETING,則說明任務執行完成,或發生異常、中斷、取消狀態。直接經過report方法返回執行結果。

get(long timeout, TimeUnit unit) 方法

public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

同上面get方法,該get方法支持阻塞等待多長時間,若是超時直接拋出TimeoutException異常。

report方法

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

若是state的狀態爲NORMAL,說明任務正確執行完成,直接返回計算後的值。
若是state的狀態大於等於CANCELLED,說明任務被成功取消執行、或響應中斷,直接返回CancellationException異常
不然返回ExecutionException異常。

awaitDone 方法

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        //若是該線程執行interrupt()方法,則從隊列中移除該節點,並拋出異常
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        int s = state;
        //若是state狀態大於COMPLETING 則說明任務執行完成,或取消
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        //若是state=COMPLETING,則使用yield,由於此狀態的時間特別短,經過yield比掛起響應更快。
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        //構建節點
        else if (q == null)
            q = new WaitNode();
        //把當前節點入棧
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
        //若是須要阻塞指定時間,則使用LockSupport.parkNanos阻塞指定時間
        //若是到指定時間還沒執行完,則從隊列中移除該節點,並返回當前狀態
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            //阻塞當前線程
            else
                LockSupport.park(this);
        }
}

構建棧鏈表的節點元素,並將該節點入站,同時阻塞當前線程等待運行主任務的線程喚醒該節點。
JDK1.7版本是使用AQS的雙向鏈表隊列實現的。

removeWaiter 方法

private void removeWaiter(WaitNode node) {
    if (node != null) {
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                if (q.thread != null)
                    pred = q;
                else if (pred != null) {
                    pred.next = s;
                    if (pred.thread == null) // check for race
                        continue retry;
                }
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
                    continue retry;
            }
            break;
        }
    }
}

移除棧中的節點元素,須要使用CAS自旋來保障移除成功。

cancel方法

public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}
  1. 根據mayInterruptIfRunning是否爲true,CAS設置狀態爲INTERRUPTING或CANCELLED,設置成功,繼續第二步,不然返回false
  2. 若是mayInterruptIfRunning爲true,調用runner.interupt(),設置狀態爲INTERRUPTED
  3. 喚醒全部在get()方法等待的線程

總結:狀態爲NEW時,cancel和run方法才能夠被運行。

  1. 任務開始運行後,不能在次運行,保證只運行一次(runAndReset 方法除外)
  2. 任務還未開始,或者任務已被運行,但未結束,這兩種狀況下均可以取消; 若是任務已經結束,則不能夠被取消 。
相關文章
相關標籤/搜索