Runnable封裝一個異步運行的任務,能夠把它想象成爲一個沒有參數和返回值的異步方法。Callable與Runnable相似,可是有返回值。Callable接口是一個參數化的類型,只有一個方法call。併發
public interface Callable<V> { V call() throws Exception; }
類型參數是返回值的類型。例如,異步
Callable<Integer>表示一個最終返回Integer對象的異步計算。
Future保存異步計算的結果。能夠啓動一個計算,將Future對象交給某個線程,而後忘掉它。Future對象的全部者在結果計算好以後就能夠得到它。
Future接口具備下面的方法:高併發
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
第一個get方法的調用被阻塞,知道計算完成。若是在計算完成以前,第二個get方法的調用超時,拋出一個TimeoutException異常。若是運行該計算的線程被中斷,兩個方法都將拋出InterruptedException。若是計算已經完成,那麼get方法當即返回。源碼分析
若是計算還在進行,isDone方法返回false;若是完成了,則返回true。this
能夠用cancel方法取消該計算。若是計算尚未開始,它被取消且再也不開始。若是計算處於運行之中,那麼若是mayInterrupt參數爲true,它就被中斷。線程
FutureTask包裝器是一種很是便利的機制,同時實現了Future和Runnable接口。code
類圖以下:對象
FutureTask的狀態轉換過程:blog
* NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED
建立一個futureTask對象task 提交task到調度器executor等待調度或者在另一個線程中執行task 等待調度中... 若是此時currentThread調取執行結果task.get(),會有幾種狀況 if task 尚未被executor調度或正在執行中 阻塞當前線程,並加入到一個阻塞鏈表中waitNode else if task被其它Thread取消,並取消成功 或task處於中斷狀態 throw exception else if task執行完畢,返回執行結果,或執行存在異常,返回異常信息 若是此時有另一個線程調用task.get() 執行過程同上
1. Future用於異步獲取執行結果或者取消任務。 2. 在高併發場景下確保任務只執行一次。
Callable<Integer> myComputation = ...; FutureTask<Integer> task = new FutureTask<Integer>(myComputation); Thread t = new Thread(task); t.start(); ... Integer result = task.get(); //獲取結果
/** * The run state of this task, initially NEW. The run state * transitions to a terminal state only in methods set, * setException, and cancel. During completion, state may take on * transient values of COMPLETING (while outcome is being set) or * INTERRUPTING (only while interrupting the runner to satisfy a * cancel(true)). Transitions from these intermediate to final * states use cheaper ordered/lazy writes because values are unique * and cannot be further modified. * * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); }
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } } protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
public class Memoizer<A, V> implements Computable<A, V> { private final ConcurrentMap<A, Future<V>> cache = new ConcurrentMap<A, Future>>(); private final Computable<A, V> c; public Memoizer(Computable<A, V> c) { this.c = c; } public C computer(final A arg) throws InterruptedException { while(true) { Future<V> f = cache.get(arg); if(f == null) { Callable<V> eval = new Callable<V>() { public V call() throws InterruptedException { return c.compute(arg); } }; FutureTask<V> ft = new FutureTask<V>(eval); f = cache.putIfAbsent(arg, ft); if(f == null) { f = ft; ft.run(); } } try { return f.get(); } catch(CancellationException e) { cache.remove(arg, f); } catch(ExecutionException e) { throw launderThrowable(e.getCause()); } } } }