A Future計算的結果。 提供方法來檢查計算是否完成,等待其完成,並檢索計算結果。 結果只能在計算完成後使用方法get進行檢索,若有必要,阻塞,直到準備就緒。 取消由cancel方法執行。 提供其餘方法來肯定任務是否正常完成或被取消。 計算完成後,不能取消計算。 若是您想使用Future ,以便不可撤銷,但不提供可用的結果,則能夠聲明Future<?>表格的類型,並返回null做爲基礎任務的結果。 node
public interface Future<V> { //嘗試取消執行此任務。若是任務已經完成,已經被取消或因爲某些其餘緣由而沒法取消,則此嘗試將失敗。 //若是成功,而且在調用 cancel 時此任務還沒有開始,則該任務永遠沒法運行。 //若是任務已經開始,則 mayInterruptIfRunning 參數肯定是否應中斷執行該任務的線程以嘗試中止該任務。 //mayInterruptIfRunning == true, 表示中斷執行中的線程,false 表示讓線程正常完成 boolean cancel(boolean mayInterruptIfRunning); //若是此任務在正常完成以前被取消,則返回true。 boolean isCancelled(); //若是此任務完成,則返回true。完成多是因爲正常終止,異常或取消引發的,在全部這些狀況下,此方法都將返回true。 boolean isDone(); //必要時等待計算完成,而後檢索其結果 V get() throws InterruptedException, ExecutionException; //必要時最多等待給定時間以完成計算,而後檢索其結果(若是有)。 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
Future是一個接口,提供了方法來檢測當前的任務是否已經結束,還能夠等待任務結束而且拿到一個結果,經過調用Future的get()方法能夠當任務結束後返回一個結果值,若是工做沒有結束,則會阻塞當前線程,直到任務執行完畢;能夠經過調用cancel()方法來中止一個任務,若是任務已經中止,則cancel()方法會返回true;若是任務已經完成或者已經中止了或者這個任務沒法中止,則cancel()會返回一個false。當一個任務被成功中止後,他沒法再次執行。isDone()和isCancel()方法能夠判斷當前工做是否完成和是否取消。 多線程
一個場景,咱們要學習作飯,那麼咱們須要準備廚具和食材,廚具經過電子商務網購,食材去菜市場挑選。那麼可使用多線程來併發進行,即咱們能夠先網購下單,在等待快遞員送貨過來的這段時間去菜市場買食材,節省時間,提升效率。併發
public class FutureTest { public static void main(String[] args) throws InterruptedException { long startTime = System.currentTimeMillis(); OnlineShopping shopping = new OnlineShopping(); shopping.start(); Thread.sleep(2000);//等待送貨執行完 System.out.println("第二步:食材到位"); shopping.join();//阻塞訂單直到快遞送到獲得廚具 System.out.println("第三步:開始廚藝"); System.out.println("總共用時:" + (System.currentTimeMillis() - startTime) + "ms"); } static class OnlineShopping extends Thread { @Override public void run() { System.out.println("第一步:下單"); System.out.println("第一步:等待送貨"); try { Thread.sleep(5000);//送貨中 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第一步:快遞送到"); } } } //======結果====== 第一步:下單 第一步:等待送貨 第二步:食材到位 第一步:快遞送到 第三步:開始廚藝 總共用時:5003ms
public class FutureTest { public static void main(String[] args) throws Exception { long startTime = System.currentTimeMillis(); Callable<String> shopping = () ->{ System.out.println("第一步:下單"); System.out.println("第一步:等待送貨"); Thread.sleep(5000);//快遞員送貨中 System.out.println("第一步:快遞送到"); return "廚具到達"; }; FutureTask<String> task = new FutureTask<>(shopping); new Thread(task).start(); Thread.sleep(2000);//保證下單操做執行到「等待送貨」中 System.out.println("第二步:食材到位"); if (!task.isDone()) { // 聯繫快遞員,詢問是否到貨 System.out.println("第三步:廚具還沒到,心情好就等着(心情很差就調用cancel方法取消訂單)"); } String chuju = task.get();//獲得廚具 System.out.println("第三步:開始廚藝"); System.out.println("總共用時:" + (System.currentTimeMillis() - startTime) + "ms"); } } //======結果====== 第一步:下單 第一步:等待送貨 第二步:食材到位 第三步:廚具還沒到,心情好就等着(心情很差就調用cancel方法取消訂單) 第一步:快遞送到 第三步:開始廚藝 總共用時:5048ms
使用Future模式的三部曲:app
public interface Callable<V> { V call() throws Exception; }
public class FutureTask<V> implements RunnableFuture<V>{ public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // 確保callable的可見性 } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // 確保callable的可見性 } } public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
public void run() { // 1. 若是 state != NEW 說明 run 方法已經運行過,直接 return // 2. 若是 state == NEW && CAS 競爭 設置 runner 失敗,說明已經有別的線程在運行,直接 return // NEW 的狀態由構造方法初始化,runner 是運行該 Callable 的線程 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable;// 這裏的callable是從構造方法裏面傳人的 if (c != null && state == NEW) { V result; boolean ran;// 標識符 try { result = c.call();//得到結果 ran = true; } catch (Throwable ex) {//異常 result = null; ran = false; setException(ex); } if (ran)//成功沒有異常,設置返回值 set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() //在狀態設置以前,runner必須是非空的,以防止對run()的併發調用 runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts //爲防止泄漏中斷,必須在空runner以後將狀態設置爲重複讀 int s = state; // 若是最終狀態 >= INTERRUPTING,則處理中斷 // cancel 方法會經過參數 mayInterruptIfRunning 來設置 state 的值 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
private volatile int state;//狀態,volatile讓狀態可見性 private static final int NEW = 0;//構造方法建立時的狀態 private static final int COMPLETING = 1;//這是一箇中間態,完成時和出現異常時有使用到 private static final int NORMAL = 2;//完成運行時的最終狀態 private static final int EXCEPTIONAL = 3;//異常時的最終狀態 private static final int CANCELLED = 4;//已取消 private static final int INTERRUPTING = 5;//中斷中 private static final int INTERRUPTED = 6;//已中斷 可能的 state 轉換: NEW -> COMPLETING -> NORMAL NEW -> COMPLETING -> EXCEPTIONAL NEW -> CANCELLED NEW -> INTERRUPTING -> INTERRUPTED
private Object outcome;//經過get方法得到的返回值 //設置返回值,狀態NEW -> COMPLETING -> NORMAL protected void set(V v) { // 這裏爲何要用 CAS 由於可能會和 cancel 方法產生競爭。 // 若是競爭失敗,說明取消競爭成功,在 cancel 方法承擔喚醒的工做,因此直接跳過。 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//NEW -> COMPLETING // 競爭成功 outcome = v;//outcome爲返回結果 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最終狀態爲NORMAL,COMPLETING -> NORMAL finishCompletion(); } }
//狀態:NEW -> COMPLETING -> EXCEPTIONAL protected void setException(Throwable t) { // 這裏爲何要用 CAS 由於可能會和 cancel 方法產生競爭。 // 若是競爭失敗,說明取消競爭成功,在 cancel 方法承擔喚醒的工做,因此直接跳過。 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//NEW -> COMPLETING // 競爭成功 outcome = t; // outcome 爲一個 Throwable // 把最終狀態改成 EXCEPTIONAL UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state,COMPLETING -> EXCEPTIONAL finishCompletion(); } }
//刪除當前線程並喚醒全部等待線程,調用done(),並取消進行中的方法 private void finishCompletion() { // assert state > COMPLETING; //從 waiters 末尾開始遍歷,for 自旋直到 CAS 成功。 for (WaitNode q; (q = waiters) != null;) { // 使用 CAS 把 waiters 設置爲 null,和 awaitDone 和 removeWatier 方法競爭 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { // 自旋喚醒全部線程 for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t);// 喚醒線程 } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } // 這個一個空方法,主要用於繼承重寫,這也是模板模式的體現 done(); callable = null; // to reduce footprint } protected void done() { } //===========例子============== //ExecutorCompletionService 的做用就是把線程池的執行結果放到一個已完成隊列中,方便獲取執行結果,其內部主要經過一個 FutureTask 的實現類 QueueingFuture 來實現這個功能: private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); }//done方法是FutureTask方法的重寫。FutureTask在完成時會執行done方法,把task放入已完成隊列completionQueue。 private final Future<V> task; }
public V get() throws InterruptedException, ExecutionException { int s = state;//獲得狀態 if (s <= COMPLETING)//狀態未完成,把獲取結果的線程放入等待鏈表,而後阻塞,直至被中斷、完成或出現異常。 s = awaitDone(false, 0L); return report(s);//返回結果 } private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L;//用於計時 WaitNode q = null; boolean queued = false; for (;;) {//自旋 //若是已經被中斷,則removeWaiter,拋出中斷異常 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) {//task已經結束 if (q != null) q.thread = null; return s; } //第二個線程進來,正在運行,發現前面有等待節點,則讓出cpu else if (s == COMPLETING) // cannot time out yet Thread.yield(); // 第一次遍歷,初始化 WaitNode,第一個節點進來時進行,第一個線程狀態是new else if (q == null) q = new WaitNode(); // 是否已入隊,沒有則把WaitNode接到末尾,第一個線程第二次遍歷時運行下面代碼 else if (!queued) // 和 finishCompletion 和 removeWaiter 競爭 // 1. finishCompletion競爭成功,說明state已經 > COMPLETING則下次循環就會退出 // 2. removeWaiter競爭成功,說明waiters變化了,下一次循環再次競爭 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 若是使用了計時,則判斷是否超時,若是超時則移出WaitNode並當即返回無需等待結果,不然阻塞 nanos else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else //阻塞,直到被喚醒(正常完成 || 異常 || 中斷) LockSupport.park(this); } } //根據awaitDone返回狀態返回結果或拋出異常 private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL)//正常 return (V)x; if (s >= CANCELLED)//取消 throw new CancellationException(); // task 執行過程當中出現異常 throw new ExecutionException((Throwable)x); }
/** * Tries to unlink a timed-out or interrupted wait node to avoid * accumulating garbage. Internal nodes are simply unspliced * without CAS since it is harmless if they are traversed anyway * by releasers. To avoid effects of unsplicing from already * removed nodes, the list is retraversed in case of an apparent * race. This is slow when there are a lot of nodes, but we don't * expect lists to be long enough to outweigh higher-overhead * schemes. *嘗試取消連接超時或中斷的等待節點以免堆積垃圾。內部節點的拼接沒有CAS, *由於這對釋放者不管如何遍歷都沒有影響。 爲了不已刪除節點節點未拼接的影響, *若是出現明顯的競爭,則從新遍歷列表。 當節點不少時會很慢,可是咱們不 *指望列表足夠長以抵消較高的開銷計劃。 */ private void removeWaiter(WaitNode node) { if (node != null) { node.thread = null; retry: for (;;) { // restart on removeWaiter race //遍歷整個鏈表 for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; //把q看成前一個節點,遍歷下一個節點 if (q.thread != null) pred = q; // q.thread == null && pred != null,表示當前節點不是第一個節點,是一箇中間節點 // 這裏沒有使用 CAS,若是出現多個線程同時遍歷,前一個節點變爲null,則從新從頭遍歷 // 爲何沒有使用 CAS 由於做者的想法是這個鏈表不會太長,因此咱們使用時不該該使這個鏈表太長 // 操做:把下一個節點鏈接到前一個節點的後面 else if (pred != null) { pred.next = s;//把s鏈接到pred後面 if (pred.thread == null) // check for race continue retry; } // q.thread == null && pred == null,表示第一個節點的 thread == null, // 這裏使用 CAS,由於可能多個線程在操做 // 操做:把下一個節點設置爲末尾節點,若是競爭失敗則從新從頭遍歷 else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } } }
public boolean isDone() { return state != NEW; }
建立Thread,把FutureTask實例放入構造方法中,start開啓線程less
參考:https://www.jianshu.com/p/414cc2f0002c異步