閱讀 JDK 源碼:異步任務 FutureTask

在 Java 中,Runnable 接口表示一個沒有返回結果的任務,而 Callable 接口表示具備返回結果的任務。
在併發編程中,異步執行任務,再獲取任務結果,能夠提升系統的吞吐量。Future 接口應運而生,它表示異步任務的執行結果,並提供了檢查任務是否執行完、取消任務、獲取任務執行結果等功能。FutureTask 是 Future 接口的基本實現,常與線程池實現類 ThreadPoolExecutor 配合使用。java

本文基於 jdk1.8.0_91

1. 繼承體系

繼承體系
RunnableFuture 接口同時實現了 Runnable 接口和 Future 接口,是一種冗餘設計。node

java.util.concurrent.RunnableFuture編程

/**
 * A {@link Future} that is {@link Runnable}. Successful execution of
 * the {@code run} method causes completion of the {@code Future}
 * and allows access to its results.
 * 
 * @see FutureTask
 * @see Executor
 * @since 1.6
 * @author Doug Lea
 * @param <V> The result type returned by this Future's {@code get} method
 */
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

FutureTask 是一個可取消的異步任務,是對 Future 接口的基本實現,具備如下功能:segmentfault

  • 啓動或中斷的任務的執行;
  • 判斷任務是否執行完成;
  • 獲取任務執行完成後的結果。

同時,FutureTask 能夠用於包裝 Callable 或 Runnable 對象。
因爲它實現了 Runnable 接口,能夠提交給 Executor 執行。安全

/**
 * A cancellable asynchronous computation. 
 *
 * @since 1.5
 * @author Doug Lea
 * @param <V> The result type returned by this FutureTask's {@code get} methods
 */
public class FutureTask<V> implements RunnableFuture<V>

java.util.concurrent.Executor併發

/**
 * An object that executes submitted {@link Runnable} tasks.
 *
 * @since 1.5
 * @author Doug Lea
 */
public interface Executor {

    void execute(Runnable command);
}

2. 屬性

java.util.concurrent.FutureTaskless

// The run state of this task, initially NEW.
// 任務的執行狀態,初始爲 NEW。
private volatile int state;

/** The underlying callable; nulled out after running */
// 須要執行的任務,任務執行完後爲空
private Callable<V> callable;

/** The result to return or exception to throw from get() */
// 任務的執行結果,或者任務拋出的異常
private Object outcome; // non-volatile, protected by state reads/writes

/** The thread running the callable; CASed during run() */
// 執行任務的線程
private volatile Thread runner;

/** Treiber stack of waiting threads */
// 指向棧頂的指針,棧結構用於存儲等待任務執行結果的線程
private volatile WaitNode waiters;

其中 state、runner、waiters 三個屬性在併發時存在爭用,採用 CAS 維護其準確性。異步

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = FutureTask.class;
        stateOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("state"));
        runnerOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("runner"));
        waitersOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("waiters"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

2.1 狀態定義

/**
 * The run state of this task, initially NEW.  The run state
 * transitions to a terminal state only in methods set,
 * setException, and cancel.  During completion, state may take on
 * transient values of COMPLETING (while outcome is being set) or
 * INTERRUPTING (only while interrupting the runner to satisfy a
 * cancel(true)). Transitions from these intermediate to final
 * states use cheaper ordered/lazy writes because values are unique
 * and cannot be further modified.
 *
 * Possible state transitions:
 * NEW -> COMPLETING -> NORMAL
 * NEW -> COMPLETING -> EXCEPTIONAL
 * NEW -> CANCELLED
 * NEW -> INTERRUPTING -> INTERRUPTED
 */
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

FutureTask 中使用 state 表明任務在運行過程當中的狀態。隨着任務的執行,狀態將不斷地進行轉變。async

狀態的說明ide

  • NEW: 新建狀態,任務都從該狀態開始。
  • COMPLETING: 任務正在執行中。
  • NORMAL: 任務正常執行完成。
  • EXCEPTIONAL: 任務執行過程當中拋出了異常。
  • CANCELLED: 任務被取消(不響應中斷)。
  • INTERRUPTING:任務正在被中斷。
  • INTERRUPTED: 任務已經中斷。

狀態轉移過程

NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED

狀態的分類

  • 任務的初始狀態:NEW
  • 任務的中間狀態:COMPLETING、INTERRUPTING
  • 任務的終止狀態:NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTED

2.2 狀態使用

FutureTask 中判斷任務是否已取消、是否已完成,是根據 state 來判斷的。

public boolean isCancelled() {
    return state >= CANCELLED; // CANCELLED、INTERRUPTING、INTERRUPTED
}

public boolean isDone() {
    return state != NEW;
}

能夠看到:

  • 被取消或被中斷的任務(CANCELLED、INTERRUPTING、INTERRUPTED),都視爲已取消。
  • 當任務離開了初始狀態 NEW,就視爲任務已結束。任務的中間態很短暫,並不表明任務正在執行,而是任務已經執行完了,正在設置最終的返回結果。

根據狀態值,FutureTask 能夠保證已經完成的任務不會被再次運行或者被取消。

中間狀態雖然是一個瞬時狀態,在 FutureTask 中用於線程間的通信。例如:

  • 在 FutureTask#run 中檢測到狀態 >= INTERRUPTING,說明其餘線程發起了取消操做,當前線程需等待對方完成中斷。
  • 在 FutureTask#get 中檢測到狀態 <= COMPLETING,說明執行任務的線程還沒有處理完,當前線程需等待對方完成任務。

2.2 棧(Treiber stack)

/** Treiber stack of waiting threads */
private volatile WaitNode waiters; // 棧頂指針

/**
 * Simple linked list nodes to record waiting threads in a Treiber
 * stack.  See other classes such as Phaser and SynchronousQueue
 * for more detailed explanation.
 */
static final class WaitNode {
    volatile Thread thread; // 等待任務執行結果的線程
    volatile WaitNode next; // 棧的下一個節點
    WaitNode() { thread = Thread.currentThread(); }
}

FutureTask 使用鏈表來構造棧(Treiber stack,使用 CAS 保證棧操做的線程安全,參考 java.util.concurrent.SynchronousQueue.TransferStack)。
其中 waiters 是鏈表的頭節點,表明棧頂的指針。

棧的做用
FutureTask 實現了 Future 接口,若是獲取結果時,任務尚未執行完畢,那麼獲取結果的線程就在棧中掛起,直到任務執行完畢被喚醒。

3. 構造函數

賦值任務,設置任務的初始狀態。

/**
 * Creates a {@code FutureTask} that will, upon running, execute the
 * given {@code Callable}.
 *
 * @param  callable the callable task
 * @throws NullPointerException if the callable is null
 */
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

/**
 * Creates a {@code FutureTask} that will, upon running, execute the
 * given {@code Runnable}, and arrange that {@code get} will return the
 * given result on successful completion.
 *
 * @param runnable the runnable task
 * @param result the result to return on successful completion. If
 * you don't need a particular result, consider using
 * constructions of the form:
 * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
 * @throws NullPointerException if the runnable is null
 */
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

值得注意的兩個地方:

  • FutureTask 建立的時候,狀態爲 NEW。
  • 因爲 FutureTask 使用 Callable 表示任務,需用 Executors#callable 方法將 Runnable 轉換爲 Callable。

測試:

@Test
public void executors() throws Exception {
    Callable<String> callable = Executors.callable(new Runnable() {
        @Override
        public void run() {
            System.out.println("run!");
        }
    }, "haha");
    String call = callable.call();
    System.out.println("call = " + call);
}

執行結果:

run!
call = haha

4. Runnable 實現

4.1 FutureTask#run

代碼流程:

  1. 校驗任務是否可執行:任務已執行或其餘線程已獲取執行權,則沒法執行。
  2. 調用 Callable#call 執行任務。
  3. 若任務執行失敗,使用 setException 方法設置異常。
  4. 若任務執行成功,使用 set 方法設置返回結果。
  5. 最後,清除對當前線程的記錄,判斷是否等待中斷。

注意,在任務執行結束後,屬性 runner、callable 都會被清空。

java.util.concurrent.FutureTask#run

public void run() {
    // state != NEW 說明任務已經執行完畢,再也不重複執行
    // 將 runner 屬性設置爲當前線程,若設置失敗說明其餘線程已獲取執行權
    if (state != NEW || 
        !UNSAFE.compareAndSwapObject(this, runnerOffset,  
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call(); // 執行 Callable#call
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex); // 執行失敗,設置異常
            }
            if (ran)
                set(result); // 執行成功,設置結果
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING) // INTERRUPTING、INTERRUPTED
            handlePossibleCancellationInterrupt(s);
    }
}

4.1.1 FutureTask#set

任務執行成功以後,調用該方法。
用於設置任務狀態、設置任務執行結果、喚醒棧中等待任務執行結果的線程。

java.util.concurrent.FutureTask#set

/**
 * Sets the result of this future to the given value unless
 * this future has already been set or has been cancelled.
 *
 * <p>This method is invoked internally by the {@link #run} method
 * upon successful completion of the computation.
 *
 * @param v the value
 */
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // state: NEW -> COMPLETING
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state: COMPLETING -> NORMAL
        finishCompletion();
    }
}

狀態變化:NEW -> COMPLETING -> NORMAL

因爲 state 屬性是 volatile,這裏 putOrderedInt 和 putIntVolatile 是等價的,保證可見性。

爲何這裏使用 lazySet 而沒有用 CAS :

  • 在併發狀況下,只有一個線程執行 CAS 將 state 從 NEW 修改成 COMPLETING 會成功,其餘線程均失敗。
  • 所以隨後只有一個線程繼續修改 state 爲 NORMAL,不存在爭用,無需使用 CAS。

4.1.2 FutureTask#setException

任務執行發生異常,調用該方法。
除了設置任務狀態不一樣,其餘與 FutureTask#set 相同。

狀態變化:NEW -> COMPLETING -> EXCEPTIONAL

java.util.concurrent.FutureTask#setException

/**
 * Causes this future to report an {@link ExecutionException}
 * with the given throwable as its cause, unless this future has
 * already been set or has been cancelled.
 *
 * <p>This method is invoked internally by the {@link #run} method
 * upon failure of the computation.
 *
 * @param t the cause of failure
 */
protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // state: NEW -> COMPLETING
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state: COMPLETING -> EXCEPTIONAL 
        finishCompletion();
    }
}

4.1.3 FutureTask#finishCompletion

執行完畢,喚醒等待線程。

java.util.concurrent.FutureTask#finishCompletion

/**
 * Removes and signals all waiting threads, invokes done(), and
 * nulls out callable.
 */
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { 
        // CAS 將 waiters 屬性置空:1. CAS 成功,遍歷鏈表喚醒全部節點;2. CAS 失敗,從新讀取 waiters
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t); // 喚醒節點上的線程
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc // 出棧
                q = next;
            }
            break;
        }
    }

    done(); // 預留方法

    callable = null;        // to reduce footprint
}

4.1.4 FutureTask#handlePossibleCancellationInterrupt

在 FutureTask#cancel 方法中,會先將 state 設爲 INTERRUPTING,再中斷 runner 線程,最後將 state 設爲 INTERRUPTED。

因此在 FutureTask#run 的 finally 塊中若是檢查到 state == INTERRUPTING,說明其餘線程發起了 cancel(true) 操做,這裏須要等待其餘線程中斷當前線程。直到檢測到 state != INTERRUPTING,說明其餘線程已完成中斷當前線程操做。

java.util.concurrent.FutureTask#handlePossibleCancellationInterrupt

/**
 * Ensures that any interrupt from a possible cancel(true) is only
 * delivered to a task while in run or runAndReset.
 */
private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us.  Let's spin-wait patiently.
    if (s == INTERRUPTING)
        while (state == INTERRUPTING) // 其餘線程中斷當前線程以後,會設置 state 爲 INTERRUPTED,使這裏結束循環
            Thread.yield(); // wait out pending interrupt

    // assert state == INTERRUPTED;

    // We want to clear any interrupt we may have received from
    // cancel(true).  However, it is permissible to use interrupts
    // as an independent mechanism for a task to communicate with
    // its caller, and there is no way to clear only the
    // cancellation interrupt.
    //
    // Thread.interrupted();
}

4.2 FutureTask#runAndReset

支持週期性執行任務:

  • 執行任務成功,不用返回任務結果,也不用改變任務狀態(保持爲 NEW),下次能夠再次執行任務。
  • 執行任務失敗,則設置異常結果,並修改任務狀態(不爲 NEW),下次沒法再次執行任務。
  • 取消執行任務,則等待其餘線程中斷當前線程,並修改任務狀態(不爲 NEW),下次沒法再次執行任務。
/**
 * designed for use with tasks that intrinsically execute more    // 設計用來支持定時任務
 * than once.
 *
 * @return {@code true} if successfully run and reset
 */
protected boolean runAndReset() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                c.call(); // don't set result
                ran = true;
            } catch (Throwable ex) {
                setException(ex); // 修改 state: NEW -> COMPLETING -> EXCEPTIONAL
            }
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW; // 返回 true 則容許下次再執行 runAndReset
}

5. Future 實現

5.1 Future#get

獲取任務執行的結果:

  • 若是任務未完成(NEW、COMPLETING),取結果的線程會阻塞(或自旋)。
  • 若是任務執行出錯(EXCEPTIONAL),拋出 ExecutionException
  • 若是任務被取消了(CANCELLED、INTERRUPTING、INTERRUPTED),拋出 CancellationException
  • 若是線程等待被中斷,拋出 InterruptedException

java.util.concurrent.FutureTask#get()

/**
 * @throws CancellationException {@inheritDoc}
 */
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L); // 自旋或阻塞等待任務完成
    return report(s);             // 獲取任務執行結果或拋出異常
}

5.1.1 FutureTask#awaitDone

等待任務完成(任務執行完成、任務執行出現異常、任務取消執行),若當前線程發生中斷、超時則中止等待。

在自旋中進行判斷:

  • 若當前線程已中斷,則將節點出棧,拋出 InterruptedException。
  • 若 state > COMPLETING,說明任務已經完成,返回當前 state。
  • 若 state == COMPLETING,說明任務即將完成,當前線程繼續自旋。
  • 若 state < COMPLETING,須要將當前線程入棧等待:

    • 無超時時間,一直等待直到被其餘線程喚醒(FutureTask#run 或 FutureTask#cancel)或發生中斷(Thread#interrupt);
    • 有超時時間,阻塞直到超時、被喚醒、發生中斷。若已超時,將節點出棧,返回 state。

java.util.concurrent.FutureTask#awaitDone

/**
 * Awaits completion or aborts on interrupt or timeout.
 *
 * @param timed true if use timed waits
 * @param nanos time to wait, if timed
 * @return state upon completion
 */
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) { // 檢查並清除中斷狀態
            removeWaiter(q);        // 已中斷,將節點出棧
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) { // 其餘線程已完成任務,結束等待
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();   // 建立節點,設置 q.thread
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q); // 節點 q 入棧,做爲新的頭節點 waiters
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);  // 已超時,將節點出棧
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this); // 進入阻塞,由 FutureTask#run 或 FutureTask#cancel 來喚醒(內部均調用 FutureTask#finishCompletion)
    }
}

5.1.2 FutureTask#report

當前線程等待完畢,獲取任務的執行結果,或者拋出異常。

java.util.concurrent.FutureTask#report

/**
 * Returns result or throws exception for completed task.
 *
 * @param s completed state value
 */
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED) // CANCELLED、INTERRUPTING、INTERRUPTED
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

5.2 Future#get(timeout, unit)

在必定的時間以內,等待獲取任務執行的結果。

/**
 * @throws CancellationException {@inheritDoc}
 */
public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException(); // 等待超時了,任務尚未執行完,則拋出 TimeoutException
    return report(s);
}

5.3 Future#cancel

嘗試取消任務的執行:

  • 若是任務已完成或已取消,則取消操做會失敗,返回 false。
  • 若是任務還未執行,則取消操做會成功,返回 true。
  • 若是任務正在執行,方法的參數就會指示線程是否須要中斷:

    • mayInterruptIfRunning 爲 true,則當前正在執行的任務會被中斷;
    • mayInterruptIfRunning 爲 false,則容許正在執行的任務繼續運行,直到它執行完。

狀態變化:
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED

public boolean cancel(boolean mayInterruptIfRunning) {
    // 若是任務尚未啓動(NEW),則修改任務狀態(INTERRUPTING or CANCELLED),修改爲功則進入下一步
    // 若是任務狀態不是 NEW,則直接返回。說明任務已完結(已完成、已取消、出現異常),沒法取消,返回 false
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception 
        // 進入這裏,說明任務狀態爲 INTERRUPTING or CANCELLED
        // mayInterruptIfRunning 爲 true 說明須要中斷執行任務的線程,爲 false 容許任務繼續執行完
        if (mayInterruptIfRunning) { 
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                // 只有一個線程會執行到這裏,無需使用 CAS
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // INTERRUPTING -> INTERRUPTED 
            }
        }
    } finally {
        finishCompletion(); // 喚醒等待線程
    }
    return true;
}

6. 示例

使用三個線程依次執行:提交任務、等待任務、取消任務。
觀察執行結果,理解併發狀況下多個線程之間如何使用 Future 進行交互。

/**
 * 三個線程依次執行:提交任務、等待任務、取消任務
 * 在任務未執行完的時候,取消任務。
 * 
 * @author Sumkor
 * @since 2021/4/28
 */
@Test
public void cancel() throws InterruptedException {
    // 定義任務
    FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
        @Override
        public String call() throws Exception {
            Thread.sleep(10000);
            return "哦豁";
        }
    });

    CountDownLatch submitGate = new CountDownLatch(1); // 等待任務提交
    CountDownLatch endGate = new CountDownLatch(3);    // 等待線程執行完

    // 提交任務
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                submitGate.countDown();

                System.out.println(Thread.currentThread().getName() + " 執行任務開始");
                futureTask.run();
                System.out.println(Thread.currentThread().getName() + " 執行任務結束");
            } finally {
                endGate.countDown();
            }
        }
    }).start();

    // 等待任務
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                submitGate.await();
                Thread.sleep(1000);// 等待 futureTask.run() 執行一段時間後再獲取結果

                System.out.println(Thread.currentThread().getName() + " 獲取任務結果開始");
                String result = futureTask.get();
                System.out.println(Thread.currentThread().getName() + " 獲取任務結果結束 " + result);
            } catch (Exception e) {
                System.out.println(Thread.currentThread().getName() + " 獲取任務結果失敗 " + e.getMessage());
                e.printStackTrace();
            } finally {
                endGate.countDown();
            }
        }
    }).start();

    // 取消任務
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                submitGate.await();
                Thread.sleep(2000);// 等待 futureTask.get() 執行一段時間後再取消任務

                System.out.println(Thread.currentThread().getName() + " 取消任務開始");
                boolean cancel = futureTask.cancel(true);
                System.out.println(Thread.currentThread().getName() + " 取消任務結束 " + cancel);
            } catch (Exception e) {
                System.out.println(Thread.currentThread().getName() + " 取消任務失敗 " + e.getMessage());
                e.printStackTrace();
            } finally {
                endGate.countDown();
            }
        }
    }).start();

    endGate.await();
}

執行結果:

Thread-0 執行任務開始
Thread-1 獲取任務結果開始
Thread-2 取消任務開始
Thread-2 取消任務結束 true
Thread-0 執行任務結束
Thread-1 獲取任務結果失敗 null
java.util.concurrent.CancellationException
    at java.util.concurrent.FutureTask.report(FutureTask.java:121)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at com.sumkor.pool.FutureTest$6.run(FutureTest.java:129)
    at java.lang.Thread.run(Thread.java:745)

說明:

  • 線程 A 啓動任務一段時間後,線程 B 來獲取任務結果,進入等待。
  • 隨後線程 C 取消任務,將線程 A 中斷(線程 A 不會拋異常,由於 FutureTask#cancel 先一步修改了 state 致使 FutureTask#setException 中 CAS 失敗)。
  • 此時線程 B 在等待中被喚醒(由線程 C 喚醒,檢查到 state 爲 INTERRUPTED)並拋出異常 CancellationException。

7. 總結

  • FutureTask 實現了 Runnable 和 Future 接口,是一個可取消的異步任務。
  • FutureTask 中的任務具備 7 種狀態,多個線程之間經過該狀態來操做任務,如判斷任務是否已完成、取消任務、獲取任務結果。
  • FutureTask 中只要任務不是 NEW 狀態,就表示任務已經執行完畢或者再也不執行了,並無表示「任務正在執行中」的狀態。
  • FutureTask 中使用鏈表和 CAS 機制構建一個併發安全的棧,用於存儲等待獲取任務結果的線程。
  • FutureTask 在等待獲取任務結果時,依舊會阻塞主線程,違背了異步的初衷。JDK 8 引入了 CompletableFuture,利用回調機制來作到異步獲取任務結果。

做者:Sumkor
連接:https://segmentfault.com/a/11...

相關文章
相關標籤/搜索