FutureTask源碼解析(2)——深刻理解FutureTask

前言

系列文章目錄java

有了上一篇對預備知識的瞭解以後,分析源碼就容易多了,本篇咱們就直接來看看FutureTask的源碼。node

本文的源碼基於JDK1.8。編程

Future和Task

在深刻分析源碼以前,咱們再來拎一下FutureTask究竟是幹嗎的。人如其名,FutureTask包含了FutureTask兩部分。segmentfault

咱們上一篇說過,FutureTask實現了RunnableFuture接口,即Runnable接口和Future接口。
其中Runnable接口對應了FutureTask名字中的Task,表明FutureTask本質上也是表徵了一個任務。而Future接口就對應了FutureTask名字中的Future,表示了咱們對於這個任務能夠執行某些操做,例如,判斷任務是否執行完畢,獲取任務的執行結果,取消任務的執行等。安全

因此簡單來講,FutureTask本質上就是一個「Task」,咱們能夠把它當作簡單的Runnable對象來使用。可是它又同時實現了Future接口,所以咱們能夠對它所表明的「Task」進行額外的控制操做。數據結構

Java併發工具類的三板斧

關於Java併發工具類的三板斧,咱們在分析AQS源碼的時候已經說過了,即:多線程

狀態,隊列,CAS

以這三個方面爲切入點來看源碼,有助於咱們快速的看清FutureTask的概貌:併發

狀態

首先是找狀態。框架

在FutureTask中,狀態是由state屬性來表示的,不出所料,它是volatile類型的,確保了不一樣線程對它修改的可見性:異步

private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

state屬性是貫穿整個FutureTask的最核心的屬性,該屬性的值表明了任務在運行過程當中的狀態,隨着任務的執行,狀態將不斷地進行轉變,從上面的定義中能夠看出,總共有7種狀態:包括了1個初始態,2箇中間態和4個終止態。

雖然說狀態有這麼多,可是狀態的轉換路徑卻只有四種:

state of FutureTask

  • 任務的初始狀態都是NEW, 這一點是構造函數保證的,咱們後面分析構造函數的時候再講;
  • 任務的終止狀態有4種:

    • NORMAL:任務正常執行完畢
    • EXCEPTIONAL:任務執行過程當中發生異常
    • CANCELLED:任務被取消
    • INTERRUPTED:任務被中斷
  • 任務的中間狀態有2種:

    • COMPLETING 正在設置任務結果
    • INTERRUPTING 正在中斷運行任務的線程

值得一提的是,任務的中間狀態是一個瞬態,它很是的短暫。並且任務的中間態並不表明任務正在執行,而是任務已經執行完了,正在設置最終的返回結果,因此能夠這麼說:

只要state不處於 NEW 狀態,就說明任務已經執行完畢

注意,這裏的執行完畢是指傳入的Callable對象的call方法執行完畢,或者拋出了異常。因此這裏的COMPLETING的名字顯得有點迷惑性,它並不意味着任務正在執行中,而意味着call方法已經執行完畢,正在設置任務執行的結果。

而將一個任務的狀態設置成終止態只有三種方法:

  • set
  • setException
  • cancel

咱們將在下文的源碼解析中分析這三個方法。

隊列

接着咱們來看隊列,在FutureTask中,隊列的實現是一個單向鏈表,它表示全部等待任務執行完畢的線程的集合。咱們知道,FutureTask實現了Future接口,能夠獲取「Task」的執行結果,那麼若是獲取結果時,任務尚未執行完畢怎麼辦呢?那麼獲取結果的線程就會在一個等待隊列中掛起,直到任務執行完畢被喚醒。這一點有點相似於咱們以前學習的AQS中的sync queue,在下文的分析中,你們能夠本身對照它們的異同點。

咱們前面說過,在併發編程中使用隊列一般是將當前線程包裝成某種類型的數據結構扔到等待隊列中,咱們先來看看隊列中的每個節點是怎麼個結構:

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

可見,相比於AQS的sync queue所使用的雙向鏈表中的Node,這個WaitNode要簡單多了,它只包含了一個記錄線程的thread屬性和指向下一個節點的next屬性。

值得一提的是,FutureTask中的這個單向鏈表是當作來使用的,確切來講是當作Treiber棧來使用的,不瞭解Treiber棧是個啥的能夠簡單的把它當作是一個線程安全的棧,它使用CAS來完成入棧出棧操做(想進一步瞭解的話能夠看這篇文章)。爲啥要使用一個線程安全的棧呢,由於同一時刻可能有多個線程都在獲取任務的執行結果,若是任務還在執行過程當中,則這些線程就要被包裝成WaitNode扔到Treiber棧的棧頂,即完成入棧操做,這樣就有可能出現多個線程同時入棧的狀況,所以須要使用CAS操做保證入棧的線程安全,對於出棧的狀況也是同理。

因爲FutureTask中的隊列本質上是一個Treiber棧,那麼使用這個隊列就只須要一個指向棧頂節點的指針就好了,在FutureTask中,就是waiters屬性:

/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

事實上,它就是整個單向鏈表的頭節點。

綜上,FutureTask中所使用的隊列的結構以下:
Treiber stack

CAS操做

CAS操做大多數是用來改變狀態的,在FutureTask中也不例外。咱們通常在靜態代碼塊中初始化須要CAS操做的屬性的偏移量:

// Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = FutureTask.class;
            stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state"));
            runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));
            waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

從這個靜態代碼塊中咱們也能夠看出,CAS操做主要針對3個屬性,包括staterunnerwaiters,說明這3個屬性基本是會被多個線程同時訪問的。其中state屬性表明了任務的狀態,waiters屬性表明了指向棧頂節點的指針,這兩個咱們上面已經分析過了。runner屬性表明了執行FutureTask中的「Task」的線程。爲何須要一個屬性來記錄執行任務的線程呢?這是爲了中斷或者取消任務作準備的,只有知道了執行任務的線程是誰,咱們才能去中斷它。

定義完屬性的偏移量以後,接下來就是CAS操做自己了。在FutureTask,CAS操做最終調用的仍是Unsafe類的compareAndSwapXXX方法,關於這一點,咱們上一篇預備知識中已經講過了,這裏再也不贅述。

核心屬性

前面咱們以java併發編程工具類的「三板斧」爲切入點分析了FutureTask的狀態,隊列和CAS操做,對這個工具類有了初步的認識。接下來,咱們就要開始進入源碼分析了。首先咱們先來看看FutureTask的幾個核心屬性:

/**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * transient values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate to final
     * states use cheaper ordered/lazy writes because values are unique
     * and cannot be further modified.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;

能夠看出,FutureTask的核心屬性只有5個:

  • state
  • callable
  • outcome
  • runner
  • waiters

關於 state waiters runner三個屬性咱們上面已經解釋過了。剩下的callable屬性表明了要執行的任務自己,即FutureTask中的「Task」部分,爲Callable類型,這裏之因此用Callable而不用Runnable是由於FutureTask實現了Future接口,須要獲取任務的執行結果。outcome屬性表明了任務的執行結果或者拋出的異常,爲Object類型,也就是說outcome能夠是任意類型的對象,因此當咱們將正常的執行結果返回給調用者時,須要進行強制類型轉換,返回由Callable定義的V類型。這5個屬性綜合起來就完成了整個FutureTask的工做,使用關係以下:

  • 任務本尊:callable
  • 任務的執行者:runner
  • 任務的結果:outcome
  • 獲取任務的結果:state + outcome + waiters
  • 中斷或者取消任務:state + runner + waiters

構造函數

介紹完核心屬性以後,咱們來看看FutureTask的構造函數:

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

FutureTask共有2個構造函數,這2個構造函數一個是直接傳入Callable對象, 一個是傳入一個Runnable對象和一個指定的result, 而後經過Executors工具類將它適配成callable對象, 因此這兩個構造函數的本質是同樣的:

  1. 用傳入的參數初始化callable成員變量
  2. 將FutureTask的狀態設爲NEW

(關於將Runnable對象適配成Callable對象的方法Executors.callable(runnable, result)咱們在上一篇預備知識中已經講過了,不記得的同窗能夠倒回去再看一下)

接口實現

上一篇咱們提過,FutureTask實現了RunnableFuture接口:

public class FutureTask<V> implements RunnableFuture<V> {
    ...
}

所以,它必須實現Runnable和Future接口的全部方法。

Runnable接口實現

要實現Runnable接口, 就得覆寫run方法, 咱們看看FutureTask的run方法幹了點啥:

public void run() {
    if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

首先咱們看到,在run方法的一開始,就檢查當前狀態是否是New, 而且使用CAS操做將runner屬性設置位當前線程,即記錄執行任務的線程。compareAndSwapObject的用法在上一篇預備知識中已經介紹過了,這裏再也不贅述。可見,runner屬性是在運行時被初始化的。

接下來,咱們就調用Callable對象的call方法來執行任務,若是任務執行成功,就使用set(result)設置結果,不然,用setException(ex)設置拋出的異常。

咱們先來看看set(result)方法:

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

這個方法一開始經過CAS操做將state屬性由原來的NEW狀態修改成COMPLETING狀態,咱們在一開始介紹state狀態的時候說過,COMPLETING是一個很是短暫的中間態,表示正在設置執行的結果。

狀態設置成功後,咱們就把任務執行結果賦值給outcome, 而後直接把state狀態設置成NORMAL,注意,這裏是直接設置,沒有先比較再設置的操做,因爲state屬性被設置成volatile, 結合咱們上一篇預備知識的介紹,這裏putOrderedInt應當和putIntVolatile是等價的,保證了state狀態對其餘線程的可見性。

在這以後,咱們調用了 finishCompletion()來完成執行結果的設置。

接下來咱們再來看看發生了異常的版本setException(ex)

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

可見,除了將outcome屬性賦值爲異常對象,以及將state的終止狀態修改成EXCEPTIONAL,其他都和set方法相似。在方法的最後,都調用了 finishCompletion()來完成執行結果的設置。那麼咱們就來看看 finishCompletion()幹了點啥:

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}

這個方法事實上完成了一個「善後」工做。咱們先來看看if條件語句中的CAS操做:

UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)

該方法是將waiters屬性的值由原值設置爲null, 咱們知道,waiters屬性指向了Treiber棧的棧頂節點,能夠說是表明了整個Treiber棧,將該值設爲null的目的就是清空整個棧。若是設置不成功,則if語句塊不會被執行,又進行下一輪for循環,而下一輪for循環的判斷條件又是waiters!=null ,由此咱們知道,雖然最外層的for循環乍一看好像是什麼遍歷節點的操做,其實只是爲了確保waiters屬性被成功設置成null,本質上至關於一個自旋操做。

將waiters屬性設置成null之後,接下了 for (;;)死循環纔是真正的遍歷節點,能夠看出,循環內部就是一個普通的遍歷鏈表的操做,咱們前面講屬性的時候說過,Treiber棧裏面存放的WaitNode表明了當前等待任務執行結束的線程,這個循環的做用也正是遍歷鏈表中全部等待的線程,並喚醒他們。

將Treiber棧中全部掛起的線程都喚醒後,下面就是執行done方法:

/**
 * Protected method invoked when this task transitions to state
 * {@code isDone} (whether normally or via cancellation). The
 * default implementation does nothing.  Subclasses may override
 * this method to invoke completion callbacks or perform
 * bookkeeping. Note that you can query status inside the
 * implementation of this method to determine whether this task
 * has been cancelled.
 */
protected void done() { }

這個方法是一個空方法,從註釋上看,它是提供給子類覆寫的,以實現一些任務執行結束前的額外操做。

done方法以後就是callable屬性的清理了(callable = null)。

至此,整個run方法分析完了。

真的嗎???

並無!別忘了run方法最後還有一個finally塊呢:

finally {
    // runner must be non-null until state is settled to
    // prevent concurrent calls to run()
    runner = null;
    // state must be re-read after nulling runner to prevent
    // leaked interrupts
    int s = state;
    if (s >= INTERRUPTING)
        handlePossibleCancellationInterrupt(s);
}

在finally塊中,咱們將runner屬性置爲null,而且檢查有沒有遺漏的中斷,若是發現s >= INTERRUPTING, 說明執行任務的線程有可能被中斷了,由於s >= INTERRUPTING 只有兩種可能,state狀態爲INTERRUPTINGINTERRUPTED

有的同窗可能就要問了,咱前面已經執行過的set方法或者setException方法不是已經將state狀態設置成NORMAL或者EXCEPTIONAL了嗎?怎麼會出現INTERRUPTING或者INTERRUPTED狀態呢?別忘了,我們在多線程的環境中,在當前線程執行run方法的同時,有可能其餘線程取消了任務的執行,此時其餘線程就可能對state狀態進行改寫,這也就是咱們在設置終止狀態的時候用putOrderedInt方法,而沒有用CAS操做的緣由——咱們沒法確信在設置state前是處於COMPLETING中間態仍是INTERRUPTING中間態。

關於任務取消的操做,咱們後面講Future接口的實現的時候再講,回到如今的問題,咱們來看看handlePossibleCancellationInterrupt方法幹了點啥:

/**
 * Ensures that any interrupt from a possible cancel(true) is only
 * delivered to a task while in run or runAndReset.
 */
private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us.  Let's spin-wait patiently.
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield(); // wait out pending interrupt
}

可見該方法是一個自旋操做,若是當前的state狀態是INTERRUPTING,咱們在原地自旋,直到state狀態轉換成終止態。

至此,run方法的分析就真的結束了。咱們來總結一下:

run方法重點作了如下幾件事:

  1. 將runner屬性設置成當前正在執行run方法的線程
  2. 調用callable成員變量的call方法來執行任務
  3. 設置執行結果outcome, 若是執行成功, 則outcome保存的就是執行結果;若是執行過程當中發生了異常, 則outcome中保存的就是異常,設置結果以前,先將state狀態設爲中間態
  4. 對outcome的賦值完成後,設置state狀態爲終止態(NORMAL或者EXCEPTIONAL)
  5. 喚醒Treiber棧中全部等待的線程
  6. 善後清理(waiters, callable,runner設爲null)
  7. 檢查是否有遺漏的中斷,若是有,等待中斷狀態完成。

這裏再插一句,咱們前面說「state只要不是NEW狀態,就說明任務已經執行完成了」就體如今這裏,由於run方法中,咱們是在c.call()執行完畢或者拋出了異常以後纔開始設置中間態和終止態的。

Future接口的實現

Future接口一共定義了5個方法,咱們一個個來看:

cancel(boolean mayInterruptIfRunning)

既然上面在分析run方法的最後,咱們提到了任務可能被別的線程取消,那咱們就趁熱打鐵,看看怎麼取消一個任務的執行:

public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

還記得咱們上一篇在介紹Future接口的時候對cancel方法的說明嗎?

關於cancel方法,這裏要補充說幾點:
首先有如下三種狀況之一的,cancel操做必定是失敗的:

  1. 任務已經執行完成了
  2. 任務已經被取消過了
  3. 任務由於某種緣由不能被取消

其它狀況下,cancel操做將返回true。值得注意的是,cancel操做返回true並不表明任務真的就是被取消了,這取決於發動cancel狀態時,任務所處的狀態:

  1. 若是發起cancel時任務尚未開始運行,則隨後任務就不會被執行;
  2. 若是發起cancel時任務已經在運行了,則這時就須要看mayInterruptIfRunning參數了:

    • 若是mayInterruptIfRunning 爲true, 則當前在執行的任務會被中斷
    • 若是mayInterruptIfRunning 爲false, 則能夠容許正在執行的任務繼續運行,直到它執行完

咱們來看看FutureTask是怎麼實現cancel方法的這幾個規範的:

首先,對於「任務已經執行完成了或者任務已經被取消過了,則cancel操做必定是失敗的(返回false)」這兩條,是經過簡單的判斷state值是否爲NEW實現的,由於咱們前面說過了,只要state不爲NEW,說明任務已經執行完畢了。從代碼中能夠看出,只要state不爲NEW,則直接返回false。

若是state仍是NEW狀態,咱們再往下看:

UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)

這一段是根據mayInterruptIfRunning的值將state的狀態由NEW設置成INTERRUPTING或者CANCELLED,當這一操做也成功以後,就能夠執行後面的try語句了,但不管怎麼,該方法最後都返回了true

咱們再接着看try塊幹了點啥:

try {    // in case call to interrupt throws exception
    if (mayInterruptIfRunning) {
        try {
            Thread t = runner;
            if (t != null)
                t.interrupt();
        } finally { // final state
            UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
        }
    }
} finally {
    finishCompletion();
}

咱們知道,runner屬性中存放的是當前正在執行任務的線程,所以,這個try塊的目的就是中斷當前正在執行任務的線程,最後將state的狀態設爲INTERRUPTED,固然,中斷操做完成後,還須要經過finishCompletion()來喚醒全部在Treiber棧中等待的線程。

咱們如今總結一下,cancel方法實際上完成如下兩種狀態轉換之一:

  1. NEW -> CANCELLED (對應於mayInterruptIfRunning=false)
  2. NEW -> INTERRUPTING -> INTERRUPTED (對應於mayInterruptIfRunning=true)

對於第一條路徑,雖然說cancel方法最終返回了true,但它只是簡單的把state狀態設爲CANCELLED,並不會中斷線程的執行。可是這樣帶來的後果是,任務即便執行完畢了,也沒法設置任務的執行結果,由於前面分析run方法的時候咱們知道,設置任務結果有一箇中間態,而這個中間態的設置,是以當前state狀態爲NEW爲前提的。

對於第二條路徑,則會中斷執行任務的線程,咱們在倒回上面的run方法看看:

public void run() {
    if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

雖然第二條路徑中斷了當前正在執行的線程,可是,響不響應這個中斷是由執行任務的線程本身決定的,更具體的說,這取決於c.call()方法內部是否對中斷進行了響應,是否將中斷異常拋出。

那call方法中是怎麼處理中斷的呢?從上面的代碼中能夠看出,catch語句處理了全部的Throwable的異常,這天然也包括了中斷異常。

然而,值得一提的是,即便這裏進入了catch (Throwable ex){}代碼塊,setException(ex)的操做必定是失敗的,由於在咱們取消任務執行的線程中,咱們已經先把state狀態設爲INTERRUPTING了,而setException(ex)的操做要求設置前線程的狀態爲NEW。因此這裏響應cancel方法所形成的中斷最大的意義不是爲了對中斷進行處理,而是簡單的中止任務線程的執行,節省CPU資源。

那讀者可能會問了,既然這個setException(ex)的操做必定是失敗的,那放在這裏有什麼用呢?事實上,這個setException(ex)是用來處理任務本身在正常執行過程當中產生的異常的,在咱們沒有主動去cancel任務時,任務的state狀態在執行過程當中就會始終是NEW,若是任務此時本身發生了異常,則這個異常就會被setException(ex)方法成功的記錄到outcome中。

反正不管如何,run方法最終都會進入finally塊,而這時候它會發現s >= INTERRUPTING,若是檢測發現s = INTERRUPTING,說明cancel方法尚未執行到中斷當前線程的地方,那就等待它將state狀態設置成INTERRUPTED。到這裏,對cancel方法的分析就和上面對run方法的分析對接上了。

cancel方法到這裏就分析完了,若是你一條條的去對照Future接口對於cancel方法的規範,它每一條都是實現了的,而它實現的核心機理,就是對state的當前狀態的判斷和設置。因而可知,state屬性是貫穿整個FutureTask的最核心的屬性。

isCancelled()

說完了cancel,咱們再來看看 isCancelled()方法,相較而言,它就簡單多了:

public boolean isCancelled() {
    return state >= CANCELLED;
}

那麼state >= CANCELLED 包含了那些狀態呢,它包括了: CANCELLED INTERRUPTING INTERRUPTED

咱們再來回憶下上一篇講的Future接口對於isCancelled()方法的規範:

該方法用於判斷任務是否被取消了。若是一個任務在正常執行完成以前被Cancel掉了, 則返回true

再對比state的狀態圖:

isCancelled
可見選取這三個狀態做爲判斷依據是很合理的, 由於只有調用了cancel方法,纔會使state狀態進入這三種狀態。

isDone()

與 isCancelled方法相似,isDone方法也是簡單地經過state狀態來判斷。

public boolean isDone() {
    return state != NEW;
}

關於這一點,其實咱們以前已經說過了,只要state狀態不是NEW,則任務已經執行完畢了,由於state狀態不存在相似「任務正在執行中」這種狀態,即便是短暫的中間態,也是發生在任務已經執行完畢,正在設置任務結果的時候。

get()

最後咱們來看看獲取執行結果的get方法,先來看看無參的版本:

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

該方法其實很簡單,當任務尚未執行完畢或者正在設置執行結果時,咱們就使用awaitDone方法等待任務進入終止態,注意,awaitDone的返回值是任務的狀態,而不是任務的結果。任務進入終止態以後,咱們就根據任務的執行結果來返回計算結果或者拋出異常。

咱們先來看看等待任務完成的awaitDone方法,該方法是獲取任務結果最核心的方法,它完成了獲取結果,掛起線程,響應中斷等諸多操做:

private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

在具體分析它的源碼以前,有一點咱們先特別說明一下,FutureTask中會涉及到兩類線程,一類是執行任務的線程,它只有一個,FutureTask的run方法就由該線程來執行;一類是獲取任務執行結果的線程,它能夠有多個,這些線程能夠併發執行,每個線程都是獨立的,均可以調用get方法來獲取任務的執行結果。若是任務尚未執行完,則這些線程就須要進入Treiber棧中掛起,直到任務執行結束,或者等待的線程自身被中斷。

理清了這一點後,咱們再來詳細看看awaitDone方法。能夠看出,該方法的大框架是一個自旋操做,咱們一段一段來看:

for (;;) {
    if (Thread.interrupted()) {
        removeWaiter(q);
        throw new InterruptedException();
    }
    // ...
}

首先一開始,咱們先檢測當前線程是否被中斷了,這是由於get方法是阻塞式的,若是等待的任務尚未執行完,則調用get方法的線程會被扔到Treiber棧中掛起等待,直到任務執行完畢。可是,若是任務遲遲沒有執行完畢,則咱們也有可能直接中斷在Treiber棧中的線程,以中止等待。

當檢測到線程被中斷後,咱們調用了removeWaiter:

private void removeWaiter(WaitNode node) {
    if (node != null) {
        ...
    }
}

removeWaiter的做用是將參數中的node從等待隊列(即Treiber棧)中移除。若是此時線程尚未進入Treiber棧,則 q=null,那麼removeWaiter(q)啥也不幹。在這以後,咱們就直接拋出了InterruptedException異常。

接着往下看:

for (;;) {
    /*if (Thread.interrupted()) {
        removeWaiter(q);
        throw new InterruptedException();
    }*/
    int s = state;
    if (s > COMPLETING) {
        if (q != null)
            q.thread = null;
        return s;
    }
    else if (s == COMPLETING) // cannot time out yet
        Thread.yield();
    else if (q == null)
        q = new WaitNode();
    else if (!queued)
        queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                             q.next = waiters, q);
    else if (timed) {
        nanos = deadline - System.nanoTime();
        if (nanos <= 0L) {
            removeWaiter(q);
            return state;
        }
        LockSupport.parkNanos(this, nanos);
    }
    else
        LockSupport.park(this);
}
  • 若是任務已經進入終止態(s > COMPLETING),咱們就直接返回任務的狀態;
  • 不然,若是任務正在設置執行結果(s == COMPLETING),咱們就讓出當前線程的CPU資源繼續等待
  • 不然,就說明任務尚未執行,或者任務正在執行過程當中,那麼這時,若是q如今還爲null, 說明當前線程尚未進入等待隊列,因而咱們新建了一個WaitNode, WaitNode的構造函數咱們以前已經看過了,就是生成了一個記錄了當前線程的節點;
  • 若是q不爲null,說明表明當前線程的WaitNode已經被建立出來了,則接下來若是queued=false,表示當前線程尚未入隊,因此咱們執行了:
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);

這行代碼的做用是經過CAS操做將新建的q節點添加到waiters鏈表的頭節點以前,其實就是Treiber棧的入棧操做,寫的仍是很簡潔的,一行代碼就搞定了,若是你們仍是以爲暈乎,下面是它等價的僞代碼:

q.next = waiters; //當前節點的next指向目前的棧頂元素
//若是棧頂節點在這個過程當中沒有變,即沒有發生併發入棧的狀況
if(waiters的值仍是上面q.next所使用的waiters值){ 
    waiters = q; //修改棧頂的指針,指向剛剛入棧的節點
}

這個CAS操做就是爲了保證同一時刻若是有多個線程在同時入棧,則只有一個可以操做成功,也即Treiber棧的規範。

若是以上的條件都不知足,則再接下來由於如今是不帶超時機制的get,timed爲false,則else if代碼塊跳過,而後來到最後一個else, 把當前線程掛起,此時線程就處於阻塞等待的狀態。

至此,在任務沒有執行完畢的狀況下,獲取任務執行結果的線程就會在Treiber棧中被LockSupport.park(this)掛起了。

那麼這個掛起的線程何時會被喚醒呢?有兩種狀況:

  1. 任務執行完畢了,在finishCompletion方法中會喚醒全部在Treiber棧中等待的線程
  2. 等待的線程自身由於被中斷等緣由而被喚醒。

咱們接下來就繼續看看線程被喚醒後的狀況,此時,線程將回到for(;;)循環的開頭,繼續下一輪循環:

for (;;) {
    if (Thread.interrupted()) {
        removeWaiter(q);
        throw new InterruptedException();
    }

    int s = state;
    if (s > COMPLETING) {
        if (q != null)
            q.thread = null;
        return s;
    }
    else if (s == COMPLETING) // cannot time out yet
        Thread.yield();
    else if (q == null)
        q = new WaitNode();
    else if (!queued)
        queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                             q.next = waiters, q);
    else if (timed) {
        nanos = deadline - System.nanoTime();
        if (nanos <= 0L) {
            removeWaiter(q);
            return state;
        }
        LockSupport.parkNanos(this, nanos);
    }
    else
        LockSupport.park(this); // 掛起的線程從這裏被喚醒
}

首先天然仍是檢測中斷,所不一樣的是,此時q已經不爲null了,所以在有中斷髮生的狀況下,在拋出中斷以前,多了一步removeWaiter(q)操做,該操做是將當前線程從等待的Treiber棧中移除,相比入棧操做,這個出棧操做要複雜一點,這取決於節點是否位於棧頂。下面咱們來仔細分析這個出棧操做:

private void removeWaiter(WaitNode node) {
    if (node != null) {
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                if (q.thread != null)
                    pred = q;
                else if (pred != null) {
                    pred.next = s;
                    if (pred.thread == null) // check for race
                        continue retry;
                }
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
                    continue retry;
            }
            break;
        }
    }
}

首先,咱們把要出棧的WaitNode的thread屬性設置爲null, 這至關於一個標記,是咱們後面在waiters鏈表中定位該節點的依據。

(1) 要移除的節點就在棧頂

咱們先來看看該節點就位於棧頂的狀況,這說明在該節點入棧後,並無別的線程再入棧了。因爲一開始咱們就將該節點的thread屬性設爲了null,所以,前面的q.thread != nullpred != null都不知足,咱們直接進入到最後一個else if 分支:

else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
    continue retry;

這一段是棧頂節點出棧的操做,和入棧相似,採用了CAS比較,將棧頂元素設置成原棧頂節點的下一個節點。

值得注意的是,當CAS操做不成功時,程序會回到retry處重來,但即便CAS操做成功了,程序依舊會遍歷完整個鏈表,找尋node.thread == null 的節點,並將它們一併從鏈表中剔除。

(2) 要移除的節點不在棧頂

當要移除的節點不在棧頂時,咱們會一直遍歷整個鏈表,直到找到q.thread == null的節點,找到以後,咱們將進入

else if (pred != null) {
    pred.next = s;
    if (pred.thread == null) // check for race
        continue retry;
}

這是由於節點不在棧頂,則其必然是有前驅節點pred的,這時,咱們只是簡單的讓前驅節點指向當前節點的下一個節點,從而將目標節點從鏈表中剔除。

注意,後面多加的那個if判斷是頗有必要的,由於removeWaiter方法並無加鎖,因此可能有多個線程在同時執行,WaitNode的兩個成員變量threadnext都被設置成volatile,這保證了它們的可見性,若是咱們在這時發現了pred.thread == null,那就意味着它已經被另外一個線程標記了,將在另外一個線程中被拿出waiters鏈表,而咱們當前目標節點的原後繼節點如今是接在這個pred節點上的,所以,若是pred已經被其餘線程標記爲要拿出去的節點,咱們如今這個線程再繼續日後遍歷就沒有什麼意義了,因此這時就調到retry處,從頭再遍歷。

若是pred節點沒有被其餘線程標記,那咱們就接着往下遍歷,直到整個鏈表遍歷完。

至此,將節點從waiters鏈表中移除的removeWaiter操做咱們就分析完了,咱們總結一下該方法:

在該方法中,會傳入一個須要移除的節點,咱們會將這個節點的thread屬性設置成null,以標記該節點。而後不管如何,咱們會遍歷整個鏈表,清除那些被標記的節點(只是簡單的將節點從鏈表中剔除)。若是要清除的節點就位於棧頂,則還須要注意從新設置waiters的值,指向新的棧頂節點。因此能夠看出,雖然說removeWaiter方法傳入了須要剔除的節點,可是事實上它可能剔除的不止是傳入的節點,而是全部已經被標記了的節點,這樣不只清除操做容易了些(不須要專門去定位傳入的node在哪裏),並且提高了效率(能夠同時清除全部已經被標記的節點)。

咱們再回到awaitDone方法裏:

private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q); // 剛剛分析到這裏了,咱們接着往下看
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

若是線程不是由於中斷被喚醒,則會繼續往下執行,此時會再次獲取當前的state狀態。所不一樣的是,此時q已經不爲null, queued已經爲true了,因此已經不須要將當前節點再入waiters棧了。

至此咱們知道,除非被中斷,不然get方法會在原地自旋等待(用的是Thread.yield,對應於s == COMPLETING)或者直接掛起(對應任務尚未執行完的狀況),直到任務執行完成。而咱們前面分析run方法和cancel方法的時候知道,在run方法結束後,或者cancel方法取消完成後,都會調用finishCompletion()來喚醒掛起的線程,使它們得以進入下一輪循環,獲取任務執行結果。

最後,等awaitDone函數返回後,get方法返回了report(s),以根據任務的狀態,彙報執行結果:

@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

可見,report方法很是簡單,它根據當前state狀態,返回正常執行的結果,或者拋出指定的異常。

至此,get方法就分析結束了。

值得注意的是,awaitDone方法和get方法都沒有加鎖,這在多個線程同時執行get方法的時候會不會產生線程安全問題呢?經過查看方法內部的參數咱們知道,整個方法內部用的大多數是局部變量,所以不會產生線程安全問題,對於全局的共享變量waiters的修改時,也使用了CAS操做,保證了線程安全,而state變量自己是volatile的,保證了讀取時的可見性,所以整個方法調用雖然沒有加鎖,它仍然是線程安全的。

get(long timeout, TimeUnit unit)

最後咱們來看看帶超時版本的get方法:

public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

它和上面不帶超時時間的get方法很相似,只是在awaitDone方法中多了超時檢測:

else if (timed) {
    nanos = deadline - System.nanoTime();
    if (nanos <= 0L) {
        removeWaiter(q);
        return state;
    }
    LockSupport.parkNanos(this, nanos);
}

即,若是指定的超時時間到了,則直接返回,若是返回時,任務尚未進入終止狀態,則直接拋出TimeoutException異常,不然就像get()方法同樣,正常的返回執行結果。

總結

FutureTask實現了Runnable和Future接口,它表示了一個帶有任務狀態和任務結果的任務,它的各類操做都是圍繞着任務的狀態展開的,值得注意的是,在全部的7個任務狀態中,只要不是NEW狀態,就表示任務已經執行完畢或者再也不執行了,並無表示「任務正在執行中」的狀態。

除了表明了任務的Callable對象、表明任務執行結果的outcome屬性,FutureTask還包含了一個表明全部等待任務結束的線程的Treiber棧,這一點其實和各類鎖的等待隊列特別像,即若是拿不到鎖,則當前線程就會被扔進等待隊列中;這裏則是若是任務尚未執行結束,則全部等待任務執行完畢的線程就會被扔進Treiber棧中,直到任務執行完畢了,纔會被喚醒。

FutureTask雖然爲咱們提供了獲取任務執行結果的途徑,遺憾的是,在獲取任務結果時,若是任務尚未執行完成,則當前線程會自旋或者掛起等待,這和咱們實現異步的初衷是相違背的,咱們後面將繼續介紹另外一個同步工具類CompletableFuture, 它解決了這個問題。

(完)

系列文章目錄

相關文章
相關標籤/搜索