Future是咱們在使用java實現異步時最經常使用到的一個類,咱們能夠向線程池提交一個Callable,並經過future對象獲取執行結果。本篇文章主要講述了JUC中FutureTask中的一些實現原理。使用的jdk版本是1.7。java
Future是一個接口,它定義了5個方法:異步
boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
簡單說明一下接口定義ide
寫個簡單demo:測試
public class FutureDemo { public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); Future future = executorService.submit(new Callable<Object>() { @Override public Object call() throws Exception { Long start = System.currentTimeMillis(); while (true) { Long current = System.currentTimeMillis(); if ((current - start) > 1000) { return 1; } } } }); try { Integer result = (Integer)future.get(); System.out.println(result); }catch (Exception e){ e.printStackTrace(); } } }
這裏模擬了1s鐘的CPU空轉,當執行future.get()的時候,主線程阻塞了大約一秒後得到結果。this
固然咱們也可使用get(long timeout, TimeUnit unit)spa
try { Integer result = (Integer) future.get(500, TimeUnit.MILLISECONDS);
System.out.println(result); } catch (Exception e) { e.printStackTrace(); }
因爲在500ms內沒有結果返回,因此拋出異常,打印異常堆棧以下線程
固然,若是咱們把超時時間設置的長一些,仍是能夠獲得預期的結果的。翻譯
下面咱們介紹一下FutureTask內部的一些實現機制。下文從如下幾點敘述:3d
首先咱們看一下FutureTask的繼承結構:rest
FutureTask實現了RunnableFuture接口,而RunnableFuture繼承了Runnable和Future,也就是說FutureTask既是Runnable,也是Future。
FutureTask內部定義瞭如下變量,以及它們的含義以下
private static final int NEW = 0; //任務新建和執行中 private static final int COMPLETING = 1; //任務將要執行完畢 private static final int NORMAL = 2; //任務正常執行結束 private static final int EXCEPTIONAL = 3; //任務異常 private static final int CANCELLED = 4; //任務取消 private static final int INTERRUPTING = 5; //任務線程即將被中斷 private static final int INTERRUPTED = 6; //任務線程已中斷
後三個字段是配合Unsafe類作CAS操做使用的。
FutureTask中使用state表示任務狀態,state值變動的由CAS操做保證原子性。
FutureTask對象初始化時,在構造器中把state置爲爲NEW,以後狀態的變動依據具體執行狀況來定。
例如任務執行正常結束前,state會被設置成COMPLETING,表明任務即將完成,接下來很快就會被設置爲NARMAL或者EXCEPTIONAL,這取決於調用Runnable中的call()方法是否拋出了異常。有異常則後者,反以前者。
任務提交後、任務結束前取消任務,那麼有可能變爲CANCELLED或者INTERRUPTED。在調用cancel方法時,若是傳入false表示不中斷線程,state會被置爲CANCELLED,反之state先被變爲INTERRUPTING,後變爲INTERRUPTED。
總結下,FutureTask的狀態流轉過程,能夠出現如下四種狀況:
1. 任務正常執行並返回。 NEW -> COMPLETING -> NORMAL
2. 執行中出現異常。NEW -> COMPLETING -> EXCEPTIONAL
3. 任務執行過程當中被取消,而且不響應中斷。NEW -> CANCELLED
4. 任務執行過程當中被取消,而且響應中斷。 NEW -> INTERRUPTING -> INTERRUPTED
接下來咱們一塊兒扒一扒FutureTask的源碼。咱們先看一下任務線程是怎麼執行的。當任務被提交到線程池後,會執行futureTask的run()方法。
1 public void run()
public void run() {
// 校驗任務狀態 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable;
// double check if (c != null && state == NEW) { V result; boolean ran; try {
//執行業務代碼 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally {
// 重置runner runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
翻譯一下,這個方法經歷瞭如下幾步
咱們繼續往下看,setException(Throwable t)和set(V v) 具體是怎麼作的
protected void set(V v) { // state狀態 NEW->COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; // COMPLETING -> NORMAL 到達穩定狀態 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 一些結束工做 finishCompletion(); } }
protected void setException(Throwable t) { // state狀態 NEW->COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; // COMPLETING -> EXCEPTIONAL 到達穩定狀態 UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 一些結束工做 finishCompletion(); } }
code中的註釋已經寫的很清楚,故不翻譯了。狀態變動的原子性由unsafe對象提供的CAS操做保證。FutureTask的outcome變量存儲執行結果或者異常對象,會由主線程返回。
2 get()和get(long timeout, TimeUnit unit)
任務由線程池提供的線程執行,那麼這時候主線程則會阻塞,直到任務線程喚醒它們。咱們經過get(long timeout, TimeUnit unit)方法看看是怎麼作的
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); }
get的源碼很簡潔,首先校驗參數,而後根據state狀態判斷是否超時,若是超時則異常,不超時則調用report(s)去獲取最終結果。
當 s<= COMPLETING時,代表任務仍然在執行且沒有被取消。若是它爲true,那麼走到awaitDone方法。
awaitDone是futureTask實現阻塞的關鍵方法,咱們重點關注一下它的實現原理。
/** * 等待任務執行完畢,若是任務取消或者超時則中止 * @param timed 爲true表示設置超時時間 * @param nanos 超時時間 * @return 任務完成時的狀態 * @throws InterruptedException */ private int awaitDone(boolean timed, long nanos) throws InterruptedException { // 任務截止時間 final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; // 自旋 for (;;) { if (Thread.interrupted()) { //線程中斷則移除等待線程,並拋出異常 removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { // 任務可能已經完成或者被取消了 if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // 可能任務線程被阻塞了,主線程讓出CPU Thread.yield(); else if (q == null) // 等待線程節點爲空,則初始化新節點並關聯當前線程 q = new WaitNode(); else if (!queued) // 等待線程入隊列,成功則queued=true queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { //已經超時的話,移除等待節點 removeWaiter(q); return state; } // 未超時,將當前線程掛起指定時間 LockSupport.parkNanos(this, nanos); } else // timed=false時會走到這裏,掛起當前線程 LockSupport.park(this); } }
註釋裏也很清楚的寫明瞭每一步的做用,咱們以設置超時時間爲例,總結一下過程
當線程被掛起以後,若是任務線程執行完畢,就會喚醒等待線程哦。這一步就是在finishCompletion裏面作的,前面已經提到這個方法。咱們再看看這個方法具體作了哪些事吧~
/** * 移除並喚醒全部等待線程,執行done,置空callable * nulls out callable. */ private void finishCompletion() { //遍歷等待節點 for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; //喚醒等待線程 LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; // unlink to help gc q.next = null; q = next; } break; } } //模板方法,能夠被覆蓋 done(); //清空callable callable = null; }
由代碼和註釋能夠看出來,這個方法的做用主要在於喚醒等待線程。由前文可知,當任務正常結束或者異常時,都會調用finishCompletion去喚醒等待線程。這個時候,等待線程就能夠醒來,開開心心的得到結果啦。
最後咱們看一下任務取消
3 public boolean cancel(boolean mayInterruptIfRunning)
注意,取消操做不必定會起做用,這裏咱們先貼個demo
1 public class FutureDemo { 2 public static void main(String[] args) { 3 ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); 4 // 預建立線程 5 executorService.prestartCoreThread(); 6 7 Future future = executorService.submit(new Callable<Object>() { 8 @Override 9 public Object call() { 10 System.out.println("start to run callable"); 11 Long start = System.currentTimeMillis(); 12 while (true) { 13 Long current = System.currentTimeMillis(); 14 if ((current - start) > 1000) { 15 System.out.println("當前任務執行已經超過1s"); 16 return 1; 17 } 18 } 19 } 20 }); 21 22 System.out.println(future.cancel(false)); 23 24 try { 25 Thread.currentThread().sleep(3000); 26 executorService.shutdown(); 27 } catch (Exception e) { 28 //NO OP 29 } 30 } 31 }
咱們屢次測試後發現,出現了2種打印結果,如圖
結果1
結果2
第一種是任務壓根沒取消,第二種則是任務壓根沒提交成功。
方法簽名註釋告訴咱們,取消操做是可能會失敗的,若是當前任務已經結束或者已經取消,則當前取消操做會失敗。若是任務還沒有開始,那麼任務不會被執行。這就解釋了出現上圖結果2的狀況。咱們仍是從源碼去分析cancel()究竟作了哪些事。
public boolean cancel(boolean mayInterruptIfRunning) { if (state != NEW) return false; if (mayInterruptIfRunning) { if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING)) return false; Thread t = runner; if (t != null) t.interrupt(); UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state } else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)) return false; finishCompletion(); return true; }
執行邏輯以下
可見,cancel()方法改變了futureTask的狀態位,若是傳入的是false而且業務邏輯已經開始執行,當前任務是不會被終止的,而是會繼續執行,直到異常或者執行完畢。若是傳入的是true,會調用當前線程的interrupt()方法,把中斷標誌位設爲true。
事實上,除非線程本身中止本身的任務,或者退出JVM,是沒有其餘方法徹底終止一個線程的任務的。mayInterruptIfRunning=true,經過但願當前線程能夠響應中斷的方式來結束任務。當任務被取消後,會被封裝爲CancellationException拋出。
總結一下,futureTask中的任務狀態由變量state表示,任務狀態都基於state判斷。而futureTask的阻塞則是經過自旋+掛起線程實現。理解FutureTask的內部實現機制,咱們使用Future時才能更加駕輕就熟。文中摻雜着筆者的我的理解,若是有不正之處,還望讀者多多指正
做者:mayday芋頭