JUC源碼分析-線程池篇(二)FutureTask

JUC源碼分析-線程池篇(二)FutureTask

JDK5 以後提供了 Callable 和 Future 接口,經過它們就能夠在任務執行完畢以後獲得任務的執行結果。本文從源代碼角度分析下具體的實現原理。html

1. 接口介紹

FutureTask 類結構

1.1 Callable 接口

對於須要執行的任務須要實現 Callable 接口,Callable 接口定義以下:java

public interface Callable<V> {
    V call() throws Exception;
}

能夠看到 Callable 是個泛型接口,泛型 V 就是要 call() 方法返回的類型。Callable 接口和 Runnable 接口很像,均可以被另一個線程執行,可是正如前面所說的,Runnable 不會返回數據也不能拋出異常。node

1.2 Future 接口

Future 接口表明異步計算的結果,經過 Future 接口提供的方法能夠查看異步計算是否執行完成,或者等待執行結果並獲取執行結果,同時還能夠取消執行。Future 接口的定義以下:算法

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
  • cancel(): 用來取消異步任務的執行。若是異步任務已經完成或者已經被取消,或者因爲某些緣由不能取消,則會返回 false。若是任務尚未被執行,則會返回 true 而且異步任務不會被執行。若是任務已經開始執行了可是尚未執行完成,若 mayInterruptIfRunning 爲 true,則會當即中斷執行任務的線程並返回 true,若 mayInterruptIfRunning 爲 false,則會返回 true 且不會中斷任務執行線程。
  • isCanceled(): 判斷任務是否被取消,若是任務在結束(正常執行結束或者執行異常結束)前被取消則返回 true,不然返回 false。
  • isDone(): 判斷任務是否已經完成,若是完成則返回 true,不然返回 false。須要注意的是:任務執行過程當中發生異常、任務被取消也屬於任務已完成,也會返回 true。
  • get(): 獲取任務執行結果,若是任務還沒完成則會阻塞等待直到任務執行完成。若是任務被取消則會拋出 CancellationException 異常,若是任務執行過程發生異常則會拋出 ExecutionException 異常,若是阻塞等待過程當中被中斷則會拋出 InterruptedException 異常。
  • get(long timeout,Timeunit unit): 帶超時時間的 get() 版本,若是阻塞等待過程當中超時則會拋出 TimeoutException 異常。

1.3 FutureTask

Future 只是一個接口,不能直接用來建立對象,FutureTask 是 Future 的實現類,FutureTask 的結構以下:編程

FutureTask -> FutureRunnable -> Future/Runnable

能夠看到,FutureTask 實現了 RunnableFuture 接口,則 RunnableFuture 接口繼承了 Runnable 接口和 Future 接口,因此 FutureTask 既能當作一個 Runnable 直接被 Thread 執行,也能做爲 Future 用來獲得 Callable 的計算結果。segmentfault

2. FutureTask 源碼分析

2.1 FutureTask 生命週期

private volatile int state;
private static final int NEW          = 0;  // 初始狀態
private static final int COMPLETING   = 1;  // 任務已經執行完(正常或者異常),準備賦值結果
private static final int NORMAL       = 2;  // 任務已經正常執行完,並已將任務返回值賦值到結果
private static final int EXCEPTIONAL  = 3;  // 任務執行失敗,並將異常賦值到結果
private static final int CANCELLED    = 4;  // 取消
private static final int INTERRUPTING = 5;  // 準備嘗試中斷執行任務的線程
private static final int INTERRUPTED  = 6;  // 對執行任務的線程進行中斷(未必中斷到)

FutureTask 狀態變化如圖:安全

FutureTask狀態

  • NEW: 表示是個新的任務或者還沒被執行完的任務。這是初始狀態。
  • COMPLETING: 任務已經執行完成或者執行任務的時候發生異常,可是任務執行結果或者異常緣由尚未保存到 outcome 字段(outcome字段用來保存任務執行結果,若是發生異常,則用來保存異常緣由)的時候,狀態會從NEW變動到 COMPLETING。可是這個狀態會時間會比較短,屬於中間狀態。
  • NORMAL: 任務已經執行完成而且任務執行結果已經保存到 outcome 字段,狀態會從 COMPLETING 轉換到 NORMAL。這是一個最終態。
  • EXCEPTIONAL: 任務執行發生異常而且異常緣由已經保存到 outcome 字段中後,狀態會從 COMPLETING 轉換到 EXCEPTIONAL。這是一個最終態。
  • CANCELLED: 任務還沒開始執行或者已經開始執行可是尚未執行完成的時候,用戶調用了 cancel(false)方法取消任務且不中斷任務執行線程,這個時候狀態會從 NEW 轉化爲 CANCELLED 狀態。這是一個最終態。
  • INTERRUPTING: 任務還沒開始執行或者已經執行可是尚未執行完成的時候,用戶調用了 cancel(true)方法取消任務而且要中斷任務執行線程可是尚未中斷任務執行線程以前,狀態會從 NEW 轉化爲 INTERRUPTING。這是一箇中間狀態。
  • INTERRUPTED: 調用 interrupt() 中斷任務執行線程以後狀態會從 INTERRUPTING 轉換到 INTERRUPTED。這是一個最終態。

能夠看到 NEW 爲起始狀態,而 NORMAL, EXCEPTIONAL, CANCELLED, INTERRUPTED 這些狀態爲終止狀態,而 COMPLETING 和 INTERRUPTING 爲中間暫時狀態。爲何要引入 COMPLETING 和 INTERRUPTING 爲兩個中間狀態呢?COMPLETING -> INTERRUPTED 中間有賦值 outcome=v 的過程,INTERRUPTING -> INTERRUPTED 有 t.interrupt 過程。這個過程要保證沒有其它線程干擾。(我的理解)併發

有一點須要注意的是,全部值大於 COMPLETING 的狀態都表示任務已經執行完成(任務正常執行完成,任務執行異常或者任務被取消)。異步

2.2 內部結構

// 內部封裝的 Callable 對象。若是是 Runnable 則會經過 Executors#callable 封裝成 Callable
private Callable<V> callable;
private Object outcome;             // 用於保存計算結果或者異常信息。
private volatile Thread runner;     // 用來運行 Callable 的線程
private volatile WaitNode waiters;  // FutureTask中用了 Trieber Stack 來保存等待的線程
                                    // 這個隊列使用 Treiber stack(能夠理解爲基於 CAS 的無鎖的棧,先進後出)

FutureTask 用了 Treiber Stack 來維護等待任務完成的線程,在 FutureTask 的任務完成/取消/異常後在 finishCompletion 鉤子方法中會喚醒棧中等待的線程。函數

2.3 構造函數

先從 FutureTask 的構造函數看起:

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

能夠看到,waiters 初始值爲 null,初始狀態爲 NEW。

通常發起任務線程跟執行任務線程一般狀況下都不會是同一個線程,在任務執行線程(通常爲線程池)執行任務的時候,任務發起線程能夠查看任務執行狀態、獲取任務執行結果、取消任務等等操做,接下來分析下這些操做。

2.4 任務執行線程

2.4.1 核心方法 run

executor.submit(task) 將任務提交到線程池中後,毫無疑問 FutureTask 會對原有的 run 方法進行包裝,當 run 方法執行完成後設置結果值,並喚醒全部等待的線程。

/**
 * run 方法執行有兩個條件:1. state=NEW; 2. runner=null
 * 1. 執行前 state=NEW & runner=null
 * 2. 執行中 state=NEW & runner=Thread.currentThread()
 * 3. 執行後 state!=NEW & runner=null,根據是否有異常執行 set(result) 或 setException(ex),
 *    set/setException 都會更新 state 狀態,以後線程的狀態就不是 NEW
 * 所以,多個線程同時調用 run 方法的狀況 callable 也只會執行一次
 */
public void run() {
    // 1. state爲 NEW 且對 runner 變量 CAS 成功。 對 state 的判斷寫在前面,是一種優化。 
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            // 2. 是否成功運行的標誌位,而不是把 set 方法寫在 try 中,是爲了避免想捕獲 set 的異常。
            //    好比:子類覆寫 FutureTask#done 方法(set->finishCompletion >done) 拋出異常,
            //    然而實際上提交的任務是有正常的結果的。
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // 3. 若是A/B兩個線程同時執行到步驟①,當A線程從新將runner設置爲 null 時,B線程是否可能會從新執行呢?
        //    實際上A線程一旦已經調用 set/setException 方法,整個流程已經結束了,因此不可能重複執行
        runner = null;

        // 4. 等待調用 cancel(true) 的線程完成中斷,防止中斷操做逃逸出 run 或者 runAndReset 方法
        //    以致於線程在執行後續操做時被中斷
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

總結: run 方法是對 callable 進行了一層包裝,在執行完成後設置返回結果,並喚醒全部等待的線程。

  1. 判斷當前任務的 state 不爲 NEW 則說明任務或者已經執行過,或者已經被取消,直接返回。
  2. 若是狀態爲 NEW 則接着會經過 unsafe 類把任務執行線程引用 CAS 的保存在 runner 字段中,有且只有一個線程能成功,至關於一個獨佔鎖。
  3. 若是任務執行完成,則調用 set 或 setException 方法,同時喚醒全部的等待線程。

2.4.2 設置結果 set/setException

// 設置返回結果,喚醒等待線程。思考爲何須要 COMPLETING 這個中間狀態
protected void set(V v) {
    // 可能任務已經取消,state 的狀態就再也不是 NEW
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

總結: 根據前面的分析,不論是任務執行異常仍是任務正常執行完畢,或者取消任務,最後都會調用 finishCompletion() 方法,該方法實現以下:

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        // 1. 必須將棧頂 CAS 爲 null,不然重讀棧頂並重試。
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            // 2. 遍歷並喚醒棧中節點對應的線程。
            for (;;) {
                Thread t = q.thread;        // 從頭節點開始喚醒全部的等待線程
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);  // 喚醒等待線程
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    // 3. ExecutorCompletionService#QueueingFuture 中把結果加到阻塞隊列裏。
    //    CompletionService 誰用誰知道,奧祕全在這。
    done();     // 子類覆蓋

    // 4. callable 置爲 null 主要爲了減小內存開銷,更多能夠了解 JVM memory footprint 相關資料
    callable = null;        // to reduce footprint
}

總結: 這個方法的實現比較簡單,依次遍歷 waiters 鏈表,喚醒節點中的線程,而後把 callable 置空。被喚醒的線程會各自從 awaitDone() 方法中的 LockSupport.park() 阻塞中返回,而後會進行新一輪的循環。在新一輪的循環中會返回執行結果或者更確切的說是返回任務的狀態。

2.5 任務調用線程

2.5.1 獲取執行結果 get

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)    // NEW 或 COMPLETING
        s = awaitDone(false, 0L);
    return report(s);
}

public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException(); // 超時了,任務還未執行結束
    return report(s);
}

總結: 若是任務還未執行結束就調用 awaitDone 阻塞當前線程,這裏的結束包括任務正常執行完畢,任務執行異常,任務被取消。一旦任務執行結束則調用 report() 返回結果。

// report 根據任務最終的狀態,返回結果或拋出異常
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)    // 1. 正常執行完計算任務
        return (V)x;
    if (s >= CANCELLED) // 2. 取消
        throw new CancellationException();
    throw new ExecutionException((Throwable)x); // 3. 異常
}

2.5.2 等待任務結束 awaitDone

當調用 get() 獲取任務結果可是任務還沒執行完成的時候,調用線程會調用 awaitDone() 方法進行阻塞等待,該方法定義以下:

// 阻塞等待線程執行完成(正常、異常、取消)後返回
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        // 1. 調用 get 的線程是否被其餘線程中斷
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        // 2. 任務已經執行完成,不管是正常、異常、取消
        if (s > COMPLETING) {   // 已經執行完成
            if (q != null)      // help GC
                q.thread = null;
            return s;
        // 3. 結果正在賦值,讓出 CPU 等待
        } else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        // 4. 初始化節點,重試一次循環
        else if (q == null)
            q = new WaitNode();
        // 5. queued 記錄是否已經入棧,此處準備將節點壓棧
        //    節點入隊失敗,自旋直至成功
        else if (!queued)
            // 這是 Treiber Stack算法入棧的邏輯。 Treiber Stack 是一個基於 CAS 的無鎖併發棧實現
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
        // 6. 掛起線程
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);    // 超時直接刪除節點後返回
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this); // 只有任務取消(cancel)和任務執行結束(run),才能喚醒線程
                                    // 此時再一次for循環時state的狀態確定大於COMPLETING
    }
}

總結: awaitDone 有如下幾種狀況:

  1. 調用 get 的線程是否被其餘線程中斷,直接拋出 InterruptedException。
  2. 任務結束返回結果,這裏的結果是指任務的狀態。只要大於 COMPLETING 就表示任務結束。
  3. 調用 get 的線程自旋入隊,並將線程掛起,等待喚醒。只有 finishCompletion 都會喚醒線程,而這個方法只有在任務取消 cancel 或任務執行結束 run 方法中才會調用。固然若是任務超時,直接就返回了,這個狀態可能爲 NEW。

2.5.3 刪除節點 removeWaiter

/**
 * 清理用於保存等待線程棧裏的無效節點,所謂節點無效就是內部的 thread 爲 null(類比 ThreadLocalMap)
 *   
 * 通常有如下幾種狀況:
 * 1. 節點調用 get 超時。
 * 2. 節點調用 get 中斷。
 * 3. 節點調用 get 拿到 task 的狀態值(> COMPLETING)。
 *
 * 此方法幹了兩件事情:
 * 1. 置標記參數 node 的 thread 爲 null
 * 2. 清理棧中的無效節點
 *
 * 若是在遍歷過程當中發現有競爭則從新遍歷棧。
 */
private void removeWaiter(WaitNode node) {
    if (node != null) {
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            // pre(前繼節點) -> current(當前節點) -> next(後繼節點),對應下面的 pre -> q -> s
            // 1.1 q.thread!=null 則更新 pre 節點繼續遍歷
            // 1.2 pre!=null && q.thread==null 時 current 不時頭節點,直接刪除 current
            // 1.3 pre==null && q.thread==null 時 current 是頭節點,更新頭節點爲 next
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                // 1.1 保存當前節點 q 的前繼節點 pre
                if (q.thread != null)
                    pred = q;
                // 1.2 q 節點已經失效,此時根據 q 是不是頭節點分兩種狀況
                //     q 不是頭節點,直接刪除,爲何不須要原子性操做?
                else if (pred != null) {
                    pred.next = s;              // 踢除當前節點 q
                    if (pred.thread == null)    // check for race
                        continue retry;
                // 1.3 q 是頭節點且失效了,原子性更新頭節點
                } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
                    continue retry;
            }
            break;
        }
    }
}

總結: 刪除節點時,執行到 1.2 時直接刪除節點,沒有使用 CAS 原子性操做,難道是線程安全的,仍是說即便出現不一致的狀況,也不影響最終的結果?

2.5.4 取消任務 cancel

/**
 * mayInterruptIfRunning:false 時不容許在線程運行時中斷,true 時容許運行時中斷線程,但不保證必定會中斷線程。
 * 1. true 時,將狀態修改爲 INTERRUPTING,執行 thread.interrupt()
 * 2. false 時,將狀態修改爲 CANCELLED
 */
public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();  // interrupt 不必定能中斷線程
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion(); // 喚醒全部等待線程
    }
    return true;
}

總結: cancel 方法會作下面幾件事:

  1. 修改任務的狀態。須要注意的是參數名稱既然叫 mayInterruptIfRunning 則說明可能沒法經過 interrupt 中止任務。要看業務線程對 interrupt 是否響應。以下面的代碼將 while 條件改成 true 後,調用 cancel 根本沒法中斷任務。

    // !Thread.interrupted() 改成 true 後沒法中斷任務
    Future<?> future = executorService.submit(() -> { 
        while (!Thread.interrupted()) System.out.println("1"); 
    });
    future.cancel(true);
  2. 調用 finishCompletion 喚醒全部的等待任務結果的線程。

3. Other

3.1 FutureTask 版本說明

與 JDK6 版本不一樣,JDK7 的 FutureTask 再也不基於 AQS 來構建,而是在內部採用簡單的 Treiber Stack 來保存等待線程。

executor.submit(task1).cancel(true);  
executor.submit(task2);

看上面的代碼,雖然中斷的是 task1,但可能 task2 獲得中斷信號。參考 FutureTask jdk6不一樣的區別

3.2 FutureTask 問題

參考FutureTask存在的問題

3.3 FutureTask 使用注意事項

(1) 線程池使用FutureTask

線程池使用 FutureTask 的時候若是拒絕策略設置爲了 DiscardPolicy 和 DiscardOldestPolicy 而且在被拒絕的任務的 Future 對象上調用無參 get 方法那麼調用線程會一直被阻塞。因此當使用 Future 的時候,儘可能使用帶超時時間的 get 方法。參考線程池使用FutureTask時候須要注意的一點事

3.4 Treiber Stack

Treiber Stack 在 R. Kent Treiber 在 1986 年的論文 Systems Programming: Coping with Parallelism 中首次出現。它是一種無鎖併發棧,其無鎖的特性是基於 CAS 原子操做實現的。具體實現參考《Java併發編程實戰》一書的 15.4.1 小結中的實現。

  1. Treiber Stack介紹
  2. Treiber Stack簡單分析
  3. Synchronized方式和CAS方式實現線程安全性能思考

參考:

  1. 本文轉載至 《深刻學習 FutureTask》:http://www.importnew.com/25286.html
  2. 《FutureTask源碼解讀》:http://www.javashuo.com/article/p-hiuyxojq-ka.html

天天用心記錄一點點。內容也許不重要,但習慣很重要!

相關文章
相關標籤/搜索