併發之異步計算任務FutureTask源碼jdk1.8解讀

FutureTask介紹 FutureTask是一種能夠取消的異步的計算任務。它的計算是經過Callable實現的,能夠把它理解爲是能夠返回結果的Runnable。java

使用FutureTask的優點有:node

能夠獲取線程執行後的返回結果; 提供了超時控制功能。 它實現了Runnable接口和Future接口:算法

QQ20170406-230647.png 什麼是異步計算呢?也就是說,在讓該任務執行時,不須要一直等待其運行結束返回結果,而是能夠先去處理其餘的事情,而後再獲取返回結果。例如你想下載一個很大的文件,這時很耗時的操做,不必一直等待着文件下載完,你能夠先去吃個飯,而後再回來看下文件是否下載完成,若是下載完成就可使用了,不然還須要繼續等待。編程

FutureTask的實現 FutureTask的狀態 FutureTask內部有這樣幾種狀態:安全

private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;
複製代碼

看名字應該很好理解了,當建立一個FutureTask對象是,初始的狀態是NEW,在運行時狀態會轉換,有4中狀態的轉換過程:bash

NEW -> COMPLETING -> NORMAL:正常執行並返回; NEW -> COMPLETING -> EXCEPTIONAL:執行過程當中出現了異常; NEW -> CANCELLED;執行前被取消; NEW -> INTERRUPTING -> INTERRUPTED:取消時被中斷。 使用FutureTask 下面看一下具體的使用過程:多線程

public class FutureTaskTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        FutureTask<Integer> future = new FutureTask<>(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                int result = 0;
                for (int i = 0; i < 100; i++) {
                    result += i;
                }
                return result;
            }
        });
        executor.execute(future);
        System.out.println(future.get());
    }
}
複製代碼

FutureTask內部結構併發

public class FutureTask<V> implements RunnableFuture<V> {
    
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** 執行callable的線程 **/
    private volatile Thread runner;
    /**
     * Treiber stack of waiting threads
     * 使用Treiber算法實現的無阻塞的Stack,
     * 用於存放等待的線程
     */
    private volatile WaitNode waiters;
    
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }
    
    public V get() throws InterruptedException, ExecutionException {
        ...
    }
    
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        ...
    }
    
    ...
    ```
這裏的waiters理解爲一個stack,由於在調用get方法時任務可能尚未執行完,這時須要將調用get方法的線程放入waiters中。

最重要的兩個get方法,用於獲取返回結果,第二種提供了超時控制功能。

FutureTask構造方法
FutureTask有兩個構造方法:

複製代碼

public FutureTask(Callable callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }異步

第二種構造方法傳入一個Runnable對象和一個返回值對象,由於Runnable是沒有返回值的,因此要經過result參數在執行完以後返回結果。

run方法
FutureTask實現了Runnable接口,因此須要實現run方法,代碼以下:

複製代碼

public void run() { /* * 首先判斷狀態,若是不是初始狀態,說明任務已經被執行或取消; * runner是FutureTask的一個屬性,用於保存執行任務的線程, * 若是不爲空則表示已經有線程正在執行,這裏用CAS來設置,失敗則返回。 */ if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable c = callable; // 只有初始狀態纔會執行 if (c != null && state == NEW) { V result; boolean ran; try { // 執行任務 result = c.call(); // 若是沒出現異常,則說明執行成功了 ran = true; } catch (Throwable ex) { result = null; ran = false; // 設置異常 setException(ex); } // 若是執行成功,則設置返回結果 if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() // 不管是否執行成功,把runner設置爲null runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; // 若是被中斷,則說明調用的cancel(true), // 這裏要保證在cancel方法中把state設置爲INTERRUPTED // 不然可能在cancel方法中還沒執行中斷,形成中斷的泄露 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }ide

總結一下run方法的執行過程

只有state爲NEW的時候才執行任務;
執行前要設置runner爲當前線程,使用CAS來設置是爲了防止競爭;
若是任務執行成功,任務狀態從NEW轉換爲COMPLETING,若是執行正常,設置最終狀態爲NORMAL;若是執行中出現了異常,設置最終狀態爲EXCEPTIONAL;
喚醒並刪除Treiber Stack中的全部節點;
若是調用了cancel(true)方法進行了中斷,要確保在run方法執行結束前的狀態是INTERRUPTED。
這裏涉及到3個比較重要的方法:setException,set和handlePossibleCancellationInterrupt。

setException方法
複製代碼

protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }

若是在執行過程當中(也就是調用call方法時)出現了異常,則要把狀態先設置爲COMPLETING,若是成功,設置outcome = t,outcome對象是Object類型的,用來保存異常或者返回結果對象,也就是說,在正常的執行過程當中(沒有異常,沒有調用cancel方法),outcome保存着返回結果對象,會被返回,若是出現了異常或者中斷,則不會返回並拋出異常,這個在介紹report方法時會講到。

接着設置狀態爲EXCEPTIONAL,這也是最終的狀態。

finishCompletion方法稍後再分析。

set複製代碼

protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }

很簡單,與setException相似,只不過這裏的outcome是返回結果對象,狀態先設置爲COMPLETING,而後再設置爲MORMAL。

handlePossibleCancellationInterrupt方法
複製代碼

private void handlePossibleCancellationInterrupt(int s) { // It is possible for our interrupter to stall before getting a // chance to interrupt us. Let's spin-wait patiently. if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt // assert state == INTERRUPTED; // We want to clear any interrupt we may have received from // cancel(true). However, it is permissible to use interrupts // as an independent mechanism for a task to communicate with // its caller, and there is no way to clear only the // cancellation interrupt. // // Thread.interrupted(); }

handlePossibleCancellationInterrupt方法要確保cancel(true)產生的中斷髮生在run或runAndReset方法執行的過程當中。這裏會循環的調用Thread.yield()來確保狀態在cancel方法中被設置爲INTERRUPTED。

這裏不可以清除中斷標記,由於不能肯定中斷必定來自於cancel方法。

finishCompletion方法
```
private void finishCompletion() {
    // assert state > COMPLETING;
    // 執行該方法時state必須大於COMPLETING
    // 逐個喚醒waiters中的線程
    for (WaitNode q; (q = waiters) != null;) {
        // 設置棧頂節點爲null
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                // 喚醒線程
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                // 若是next爲空,說明棧空了,跳出循環
                WaitNode next = q.next;
                if (next == null)
                    break;
                // 方便gc回收
                q.next = null; // unlink to help gc
                // 從新設置棧頂node
                q = next;
            }
            break;
        }
    }
    // 鉤子方法
    done();
    callable = null;        // to reduce footprint
}
```
在調用get方法時,若是任務尚未執行結束,則會阻塞調用的線程,而後把調用的線程放入waiters中,這時,若是任務執行完畢,也就是調用了finishCompletion方法,waiters會依次出棧並逐個喚醒對應的線程。

由此能夠想到,WaitNode必定是在get方法中被添加到棧中的,下面來看下get方法的實現。

get方法
```
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}
public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}
```
這兩個方法相似,首先判斷狀態,若是s <= COMPLETING,說明任務已經執行完畢,但set方法或setException方法還未執行結束(還未設置狀態爲NORMAL或EXCEPTIONAL),這時須要將當前線程添加到waiters中並阻塞。

第二種get提供了超時功能,若是在規定時間內任務還未執行完畢或者狀態仍是COMPLETING,則獲取結果超時,拋出TimeoutException。而第一種get會一直阻塞直到state > COMPLETING。

awaitDone方法
awaitDone方法的工做是根據狀態來判斷是否可以返回結果,若是任務還未執行完畢,要添加到waiters中並阻塞,不然返回狀態。代碼以下:

```
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    // 計算到期時間
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        // 若是被中斷,刪除節點,拋出異常
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        int s = state;
        // 若是任務執行完畢而且設置了最終狀態或者被取消,則返回。
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        // s == COMPLETING時經過Thread.yield();讓步其餘線程執行,
        // 主要是爲了讓狀態改變
        else if (s == COMPLETING) // 讓步其餘線程,若是由其餘線程繼續搶佔執行計算結果
            Thread.yield();
        // 建立一個WaitNode
        else if (q == null)
            q = new WaitNode();
        // CAS設置棧頂節點,若是該節點已經設置了,則這裏再也不執行設置棧頂。
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        // 若是設置了超時,則計算是否已經到了開始設置的到期時間
        else if (timed) {
            nanos = deadline - System.nanoTime();
            // 若是已經到了到期時間,刪除節點,返回狀態
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            // 阻塞到到期時間
            LockSupport.parkNanos(this, nanos);
        }
        // 若是沒有設置超時,會一直阻塞,直到被中斷或者被喚醒
        else
            LockSupport.park(this);
    }
}
```
removeWaiter方法
```
private void removeWaiter(WaitNode node) {
    if (node != null) {
        // 將thread設置爲null是由於下面要根據thread是否爲null判斷是否要把node移出
        node.thread = null;
        // 這裏自旋保證刪除成功
        retry:
        for (;;) {          // restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                // q.thread != null說明該q節點不須要移除
                if (q.thread != null)
                    pred = q;
                // 若是q.thread == null,且pred != null,須要刪除q節點
                else if (pred != null) {
                    // 刪除q節點
                    pred.next = s;
                    // pred.thread == null時說明在併發狀況下被其餘線程修改了;
                    // 返回第一個for循環重試
                    if (pred.thread == null) // check for race
                        continue retry;
                }
                // 若是q.thread != null且pred == null,說明q是棧頂節點
                // 設置棧頂元素爲s節點,若是失敗則返回重試
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                      q, s))
                    continue retry;
            }
            break;
        }
    }
}
```
cancel方法
cancel方法用於取消任務,這裏可能有兩種狀況,一種是任務已經執行了,另外一種是還未執行,代碼以下:
```
public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        // mayInterruptIfRunning參數表示是否要進行中斷
        if (mayInterruptIfRunning) {
            try {
                // runner保存着當前執行任務的線程
                Thread t = runner;
                // 中斷線程
                if (t != null)
                    t.interrupt();
            } finally { // final state
                // 設置最終狀態爲INTERRUPTED
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}
```
第一個if判斷可能有些很差理解,其實等價於以下代碼:
```
if (!state == NEW ||
          !UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED))
```
若是狀態不是NEW,或者設置狀態爲INTERRUPTING或CANCELLED失敗,則取消失敗,返回false。

簡單來講有一下兩種狀況:

若是當前任務尚未執行,那麼state == NEW,那麼會嘗試設置狀態,若是設置狀態失敗會返回false,表示取消失敗;
若是當前任務已經被執行了,那麼state > NEW,也就是!state == NEW爲true,直接返回false。
也就是說,若是任務一旦開始執行了(state != NEW),那麼就不能被取消。

若是mayInterruptIfRunning爲true,要中斷當前執行任務的線程。

report方法
get方法在調用awaitDone方法後,會調用report方法進行返回:
```
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}
```
很簡單,能夠看到有3中執行狀況:

若是s == NORMAL爲true,說明是正常執行結束,那麼根據上述的分析,在正常執行結束時outcome存放的是返回結果,把outcome返回;
若是s >= CANCELLED爲true,說明是被取消了,拋出CancellationException;
若是s < CANCELLED,那麼狀態只能是是EXCEPTIONAL,表示在執行過程當中出現了異常,拋出ExecutionException。
runAndReset方法
該方法和run方法相似,區別在於這個方法不會設置任務的執行結果值,因此在正常執行時,不會修改state,除非發生了異常或者中斷,最後返回是否正確的執行並復位:
```
protected boolean runAndReset() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                // 不獲取和設置返回值
                c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } // 是否正確的執行並復位 return ran && s == NEW; } ``` 總結 本文分析了FutureTask的執行過程和獲取返回值的過程,要注意如下幾個地方: FutureTask是線程安全的,在多線程下任務也只會被執行一次; 注意在執行時各類狀態的切換; get方法調用時,若是任務沒有結束,要阻塞當前線程,法阻塞的線程會保存在一個Treiber Stack中; get方法超時功能若是超時未獲取成功,會拋出TimeoutException; 注意在取消時的線程中斷,在run方法中必定要保證結束時的狀態是INTERRUPTED,不然在cancel方法中可能沒有執行interrupt,形成中斷的泄露。 部分摘自: 《JDK1.8源碼》 http://www.ideabuffer.cn/2017/04/06/FutureTask%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90/ 《java併發編程的藝術》複製代碼
相關文章
相關標籤/搜索