本文首發於一世流雲的專欄: https://segmentfault.com/blog...
Future模式是Java多線程設計模式中的一種常見模式,它的主要做用就是異步地執行任務,並在須要的時候獲取結果。咱們知道,通常調用一個函數,須要等待函數執行完成,調用線程纔會繼續往下執行,若是是一些計算密集型任務,須要等待的時間可能就會比較長。html
筆者在讀書期間曾參與過一個國家電網的複雜水電系統的聯合優化調度項目,須要在工業上可接受的時間內計算出整個雲南地區近40座大型水電站的日發電計劃。
在電力系統中日發電計劃的制定很是重要,同時又涉及水利學、經濟學、電氣工程、政治政策等諸多複雜約束條件,工業上基本都是經過混合整數規劃、動態規劃再結合其它數學規劃方法建模求解,模型涉及的變量基本都是百萬級以上。
試想一下,這種複雜的計算模型,假設我把它封裝到一個函數中,由調用方進行單線程調用,須要等待多少時間?若是將模型集成到UI,用戶在界面上點擊一下計算,那可能用戶基本就認爲應用假設死崩潰了。
在Java中,一種解決辦法是由調用線程新建一個線程執行該任務,好比下面這樣:java
public void calculate(){ Thread t = new Thread(new Runnable() { @Override public void run() { model.calculate(); } }); t.start(); }
可是,這樣有一個問題,我拿不到計算結果,也不知道任務到底何時計算結束。咱們來看下Future模式是如何來解決的。segmentfault
Future模式,可讓調用方當即返回,而後它本身會在後面慢慢處理,此時調用者拿到的僅僅是一個憑證,調用者能夠先去處理其它任務,在真正須要用到調用結果的場合,再使用憑證去獲取調用結果。這個憑證就是這裏的Future。設計模式
咱們看下時序圖來理解下二者的區別:多線程
傳統的數據獲取方式:
框架
Future模式下的數據獲取:
dom
若是讀者對經濟學有些瞭解,或是瞭解金融衍生品的話,對 Future這個單詞應該不會陌生, Future在經濟學中出現的頻率至關之高,好比關於現金流的折算,其中的終值,英文就是 Future value。常見的金融衍生品,期貨、遠期的英文分別是 Futures、 Financial future。
咱們以前說了,Future模式能夠理解爲一種憑證,拿着該憑證在未來的某個時間點能夠取到我想要的東西,這其實就和期貨、遠期有點相似了,期貨、遠期也是雙方制定協議或合同,而後在約定的某個時間點,拿着合同進行資金或實物的交割。可見,Future模式的命名是頗有深意且很恰當的。異步
在Java多線程基礎之Future模式中,咱們曾經給出過Future模式的通用類關係圖。本章中,我不想教科書般得再貼一遍該圖,而是但願能按部就班地帶領讀者去真正理解Future模式中的各個組件,去思考爲何Future模式的類關係圖是那樣,爲何必定就是那麼幾個組件?ide
首先來思考下,咱們須要執行的是一個任務,那麼在Java中,通常須要實現Runnable
接口,好比像下面這樣:函數
public class Task implements Runnable { @Override public void run() { // do something } }
可是,若是我須要任務的返回結果呢,從Runnable的接口定義來看,並不能知足咱們的要求,Runnable通常僅僅用於定義一個能夠被線程執行的任務,它的run方法沒有返回值:
public interface Runnable { public abstract void run(); }
因而,JDK提供了另外一個接口——Callable
,表示一個具備返回結果的任務:
public interface Callable<V> { V call() throws Exception; }
因此,最終咱們自定義的任務類通常都是實現了Callable接口。如下定義了一個具備複雜計算過程的任務,最終返回一個Double值:
public class ComplexTask implements Callable<Double> { @Override public Double call() { // complex calculating... return ThreadLocalRandom.current().nextDouble(); } }
第一節講到,Future模式可讓調用方獲取任務的一個憑證,以便未來拿着憑證去獲取任務結果,憑證須要具備如下特色:
從以上兩點來看,咱們首先想到的方式就是對Callable任務進行包裝,包裝成一個憑證,而後返回給調用方。
J.U.C提供了Future接口和它的實現類——FutureTask
來知足咱們的需求,咱們能夠像下面這樣對以前定義的ComplexTask包裝:
ComplexTask task = new ComplexTask(); Future<Double> future = new FutureTask<Double>(task);
上面的FutureTask就是真實的「憑證」,Future則是該憑證的接口(從面向對象的角度來說,調用方應面向接口操做)。
Future接口的定義:
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
Future接口很簡單,提供了isCancelled
和isDone
兩個方法監控任務的執行狀態,一個cancel
方法用於取消任務的執行。兩個get
方法用於獲取任務的執行結果,若是任務未執行完成,除非設置超時,不然調用線程將會阻塞。
此外,爲了可以被線程或線程池執行任務,憑證還須要實現Runnable接口,因此J.U.C還提供了一個RunnableFuture
接口,其實就是組合了Runnable和Future接口:
public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
上面提到的FutureTask,其實就是實現了RunnableFuture接口的「憑證」:
public class FutureTask<V> implements RunnableFuture<V> { public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; // ... } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); // ... } }
從構造函數能夠看到,FutureTask既能夠包裝Callable任務,也能夠包裝Runnable任務,但最終都是將Runnable轉換成Callable任務,實際上是一個適配過程。
最終,調用方能夠如下面這種方式使用Future模式,異步地獲取任務的執行結果。
public class Client {
public static void main(String[] args) throws ExecutionException, InterruptedException { ComplexTask task = new ComplexTask(); Future<Double> future = new FutureTask<Double>(task); // time passed... Double result = future.get(); }
}
經過上面的分析,能夠看到,整個Future模式其實就三個核心組件:
在J.U.C提供的Future模式中,最重要的就是FutureTask
類,FutureTask是在JDK1.5時,隨着J.U.C一塊兒引入的,它表明着一個異步任務,這個任務通常提交給Executor執行,固然也能夠由調用方直接調用run方法運行。
既然是任務,就有狀態,FutureTask一共給任務定義了7種狀態:
各個狀態之間的狀態轉換圖以下:
上圖須要注意的是兩點:
JDK1.7以前,FutureTask經過內部類實現了AQS框架來實現功能。 JDK1.7及之後,則改變爲直接經過Unsafe
類CAS操做state
狀態字段來進行同步。
FutureTask在構造時能夠接受Runnable或Callable任務,若是是Runnable,則最終包裝成Callable:
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; }
上述的Executors.callable()
方法咱們在executors框架概述提到過,其實就是對Runnable對象作了適配,返回Callable適配對象——RunnableAdapter:
public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); }
static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
FutureTask的字段定義很是簡單,State標識任務的當前狀態,狀態之間的轉換經過Unsafe來操做,全部操做都基於自旋+CAS完成:
private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; private Callable<V> callable; // 真正的任務 private volatile Thread runner; // 保存正在執行任務的線程 /** * 記錄結果或異常 */ private Object outcome; /** * 無鎖棧(Treiber stack) * 保存等待線程 */ private volatile WaitNode waiters;
注意waiters
這個字段,waiters指向一個「無鎖棧」,該棧保存着全部等待線程,咱們知道當調用FutureTask的get方法時,若是任務沒有完成,則調用線程會被阻塞,其實就是將線程包裝成WaitNode
結點保存到waiters指向的棧中:
static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
FutureTask的運行就是調用了run方法:
public void run() { // 僅當任務爲NEW狀態時, 才能執行任務 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
上述方法,首先判斷當前任務的state是否等於NEW,若是不爲NEW則說明任務或者已經執行過,或者已經被取消,直接返回。
正常執行完成後,會調用set
方法設置任務執行結果:
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
若是任務執行過程當中拋出異常,則調用setException
設置異常信息:
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
cancel方法用於取消任務,參數mayInterruptIfRunning
若是爲true,表示中斷正在執行任務的線程,不然僅僅是將任務狀態置爲CANCELLED :
public boolean cancel(boolean mayInterruptIfRunning) { // 僅NEW狀態下能夠取消任務 if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { if (mayInterruptIfRunning) { // 中斷任務 try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; }
任務取消後,最終調用finishCompletion
方法,釋放全部在棧上等待的線程:
/** * 喚醒棧上的全部等待線程. */ private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null; ) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (; ; ) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); // 鉤子方法 callable = null; // to reduce footprint }
FutureTask能夠經過get方法獲取任務結果,若是須要限時等待,能夠調用get(long timeout, TimeUnit unit)
。
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); // 映射任務執行結果 }
能夠看到,若是當前任務的狀態是NEW或COMPLETING,會調用awaitDone
阻塞線程。不然會認爲任務已經完成,直接經過report
方法映射結果:
/** * 將同步狀態映射爲執行結果. */ private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V) x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable) x); }
report會根據任務的狀態進行映射,若是任務是Normal狀態,說明正常執行完成,則返回任務結果;若是任務被取消(CANCELLED或INTERRUPTED),則拋出CancellationException;其它狀況則拋出ExecutionException。
在ScheduledThreadPoolExecutor一節中,咱們曾經介紹過另外一種FutureTask——ScheduledFutureTask
,ScheduledFutureTask是ScheduledThreadPoolExecutor這個線程池的默認調度任務類。
ScheduledFutureTask在普通FutureTask的基礎上增長了週期執行/延遲執行的功能。經過下面的類圖能夠看到,它實際上是經過繼承FutureTask和Delayed接口來實現週期/延遲功能的。
ScheduledFutureTask(Callable<V> callable, long ns) { super(callable); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); }
ScheduledFutureTask的源碼很是簡單,基本都是委託FutureTask來實現的,關鍵是看下運行任務的方法:
public void run() { boolean periodic = isPeriodic(); // 是不是週期任務 if (!canRunInCurrentRunState(periodic)) // 可否運行任務 cancel(false); else if (!periodic) // 非週期任務:調用FutureTask的run方法運行 ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { // 週期任務:調用FutureTask的runAndReset方法運行 setNextRunTime(); reExecutePeriodic(outerTask); } }
FutureTask的runAndReset
方法與run方法的區別就是當任務正常執行完成後,不會設置任務的最終狀態(即保持NEW狀態),以便任務重複執行:
protected boolean runAndReset() { // 僅NEW狀態的任務能夠執行 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { runner = null; s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW; }
本章咱們從源頭開始,講解了Future模式的前因後果,並以J.U.C中的Future模式爲例,分析了Future模式的組件以及核心實現類——FutureTask
,最後回顧了ScheduledFutureTask中定義的內部異步任務類——ScheduledFutureTask
。
理解Future模式的關鍵就是理解它的兩個核心組件,能夠類比生活中的憑證來理解這一律念,沒必要拘泥於Java多線程設計模式中Future模式的類關係圖。