FutureTask詳解

1 基本概念

1.1 Callable與Future

Runnable封裝一個異步運行的任務,能夠把它想象成爲一個沒有參數和返回值的異步方法。Callable與Runnable相似,可是有返回值。Callable接口是一個參數化的類型,只有一個方法call。併發

public interface Callable<V> {
    V call() throws Exception;
}

類型參數是返回值的類型。例如,異步

Callable<Integer>表示一個最終返回Integer對象的異步計算。

Future保存異步計算的結果。能夠啓動一個計算,將Future對象交給某個線程,而後忘掉它。Future對象的全部者在結果計算好以後就能夠得到它。
Future接口具備下面的方法:高併發

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

第一個get方法的調用被阻塞,知道計算完成。若是在計算完成以前,第二個get方法的調用超時,拋出一個TimeoutException異常。若是運行該計算的線程被中斷,兩個方法都將拋出InterruptedException。若是計算已經完成,那麼get方法當即返回。源碼分析

若是計算還在進行,isDone方法返回false;若是完成了,則返回true。this

能夠用cancel方法取消該計算。若是計算尚未開始,它被取消且再也不開始。若是計算處於運行之中,那麼若是mayInterrupt參數爲true,它就被中斷。線程

1.2 FutureTask

FutureTask包裝器是一種很是便利的機制,同時實現了Future和Runnable接口。code

類圖以下:對象

FutureTask的狀態轉換過程:blog

* NEW -> COMPLETING -> NORMAL
 * NEW -> COMPLETING -> EXCEPTIONAL
 * NEW -> CANCELLED
 * NEW -> INTERRUPTING -> INTERRUPTED

1.3 FutureTask的執行過程

建立一個futureTask對象task
提交task到調度器executor等待調度或者在另一個線程中執行task

等待調度中...

若是此時currentThread調取執行結果task.get(),會有幾種狀況
if task 尚未被executor調度或正在執行中
    阻塞當前線程,並加入到一個阻塞鏈表中waitNode
else if task被其它Thread取消,並取消成功 或task處於中斷狀態
    throw exception
else if task執行完畢,返回執行結果,或執行存在異常,返回異常信息
    
        
若是此時有另一個線程調用task.get()
    
執行過程同上

2 應用場景

1. Future用於異步獲取執行結果或者取消任務。
2. 在高併發場景下確保任務只執行一次。

3 基本例子

Callable<Integer> myComputation = ...;
FutureTask<Integer> task = new FutureTask<Integer>(myComputation);
Thread t = new Thread(task);
t.start();
...
Integer result = task.get(); //獲取結果

4 FutureTask源碼分析

4.1 核心狀態

/**
 * The run state of this task, initially NEW.  The run state
 * transitions to a terminal state only in methods set,
 * setException, and cancel.  During completion, state may take on
 * transient values of COMPLETING (while outcome is being set) or
 * INTERRUPTING (only while interrupting the runner to satisfy a
 * cancel(true)). Transitions from these intermediate to final
 * states use cheaper ordered/lazy writes because values are unique
 * and cannot be further modified.
 * 
 * Possible state transitions:
 * NEW -> COMPLETING -> NORMAL
 * NEW -> COMPLETING -> EXCEPTIONAL
 * NEW -> CANCELLED
 * NEW -> INTERRUPTING -> INTERRUPTED
 */
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

4.2 建立FutureTask

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}
    
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

4.3 獲取執行結果

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

4.4 執行方法

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

4.5 設置狀態

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

5 高級示例

public class Memoizer<A, V> implements Computable<A, V> {
    private final ConcurrentMap<A, Future<V>> cache = new ConcurrentMap<A, Future>>();
    private final Computable<A, V> c;
    
    public Memoizer(Computable<A, V> c) {
        this.c = c;
    }
    
    public C computer(final A arg) throws InterruptedException {
        while(true) {
            Future<V> f = cache.get(arg);
            if(f == null) {
                Callable<V> eval = new Callable<V>() {
                    public V call() throws InterruptedException {
                        return c.compute(arg);
                    }
                };
                FutureTask<V> ft = new FutureTask<V>(eval);
                f = cache.putIfAbsent(arg, ft);
                if(f == null) {
                    f = ft;
                    ft.run();
                }
            }
            
            try {
                return f.get();
            } catch(CancellationException e) {
                cache.remove(arg, f);
            } catch(ExecutionException e) {
                throw launderThrowable(e.getCause());
            }
        }
    }
}
相關文章
相關標籤/搜索