FutureTask是一個支持取消行爲的異步任務執行器。該類實現了Future接口的方法。
如:node
FutureTask實現了Runnable接口和Future接口,所以FutureTask能夠傳遞到線程對象Thread或Excutor(線程池)來執行。dom
若是在當前線程中須要執行比較耗時的操做,但又不想阻塞當前線程時,能夠把這些做業交給FutureTask,另開一個線程在後臺完成,噹噹前線程未來須要時,就能夠經過FutureTask對象得到後臺做業的計算結果或者執行狀態。異步
public class FutureTaskDemo { public static void main(String[] args) throws InterruptedException{ FutureTask<Integer> ft = new FutureTask<>(new Callable<Integer>() { @Override public Integer call() throws Exception { int num = new Random().nextInt(10); TimeUnit.SECONDS.sleep(num); return num; } }); Thread t = new Thread(ft); t.start(); //這裏能夠作一些其它的事情,跟futureTask任務並行,等須要futureTask的運行結果時,能夠調用get方法獲取。 try { //等待任務執行完成,獲取返回值 Integer num = ft.get(); System.out.println(num); } catch (Exception e) { e.printStackTrace(); } } }
JDK1.7及以前,FutureTask 經過使用內部類Sync繼承AQS來實現。
內部使用的AQS的共享鎖。
AQS具體實現可參考 AbstractQueuedSynchronizer 源碼分析ide
JDK1.8沒有使用AQS,而是本身實現了一個同步等待隊列,在結果返回以前,全部的線程都被阻塞,存放到等待隊列中。源碼分析
下面咱們來分析下JDK1.8的FutureTask 源碼this
public class FutureTask<V> implements RunnableFuture<V> { /** * 當前任務的運行狀態。 * * 可能存在的狀態轉換 * NEW -> COMPLETING -> NORMAL(有正常結果) * NEW -> COMPLETING -> EXCEPTIONAL(結果爲異常) * NEW -> CANCELLED(無結果) * NEW -> INTERRUPTING -> INTERRUPTED(無結果) */ private volatile int state; private static final int NEW = 0; //初始狀態 private static final int COMPLETING = 1; //結果計算完成或響應中斷到賦值給返回值之間的狀態。 private static final int NORMAL = 2; //任務正常完成,結果被set private static final int EXCEPTIONAL = 3; //任務拋出異常 private static final int CANCELLED = 4; //任務已被取消 private static final int INTERRUPTING = 5; //線程中斷狀態被設置ture,但線程未響應中斷 private static final int INTERRUPTED = 6; //線程已被中斷 //將要執行的任務 private Callable<V> callable; //用於get()返回的結果,也多是用於get()方法拋出的異常 private Object outcome; // non-volatile, protected by state reads/writes //執行callable的線程,調用FutureTask.run()方法經過CAS設置 private volatile Thread runner; //棧結構的等待隊列,該節點是棧中的最頂層節點。 private volatile WaitNode waiters; ....
FutureTask實現的接口信息以下:spa
public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
RunnableFuture 接口基礎了Runnable和Future接口.net
public interface Future<V> { //取消任務 boolean cancel(boolean mayInterruptIfRunning); //判斷任務是否已經取消 boolean isCancelled(); //判斷任務是否結束(執行完成或取消) boolean isDone(); //阻塞式獲取任務執行結果 V get() throws InterruptedException, ExecutionException; //支持超時獲取任務執行結果 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
public void run() { //保證callable任務只被運行一次 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { //執行任務 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { runner = null; int s = state; //判斷該任務是否正在響應中斷,若是中斷沒有完成,則等待中斷操做完成 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
1.若是state狀態不爲New或者設置運行線程runner失敗則直接返回false,說明線程已經啓動過,保證任務在同一時刻只被一個線程執行。
2.調用callable.call()方法,若是調用成功則執行set(result)方法,將state狀態設置成NORMAL。若是調用失敗拋出異常則執行setException(ex)方法,將state狀態設置成EXCEPTIONAL,喚醒全部在get()方法上等待的線程。
3.若是當前狀態爲INTERRUPTING(步驟2已CAS失敗),則一直調用Thread.yield()直至狀態不爲INTERRUPTING線程
set方法rest
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
同上 set(V v) 方法
private void handlePossibleCancellationInterrupt(int s) { if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt }
該方法是若是正在響應中斷(EXCEPTIONAL),則等待響應中斷結束(INTERRUPTED)。
private void finishCompletion() { for (WaitNode q; (q = waiters) != null;) { //經過CAS把棧頂的元素置爲null,至關於彈出棧頂元素 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
把棧中的元素一個一個彈出,並經過 LockSupport.unpark(t)喚醒每個節點,通知每一個線程,該任務執行完成(多是執行完成,也可能cancel,異常等)
protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { runner = null; s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW; }
該方法和run方法的區別是,run方法只能被運行一次任務,而該方法能夠屢次運行任務。而runAndReset這個方法不會設置任務的執行結果值,若是該任務成功執行完成後,不修改state的狀態,仍是可運行(NEW)狀態,若是取消任務或出現異常,則不會再次執行。
而只是執行任務完以後,
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
若是state狀態小於等於COMPLETING,說明任務還沒開始執行或還未執行完成,而後調用awaitDone方法阻塞該調用線程。若是state的狀態大於COMPLETING,則說明任務執行完成,或發生異常、中斷、取消狀態。直接經過report方法返回執行結果。
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); }
同上面get方法,該get方法支持阻塞等待多長時間,若是超時直接拋出TimeoutException異常。
private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
若是state的狀態爲NORMAL,說明任務正確執行完成,直接返回計算後的值。
若是state的狀態大於等於CANCELLED,說明任務被成功取消執行、或響應中斷,直接返回CancellationException異常
不然返回ExecutionException異常。
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { //若是該線程執行interrupt()方法,則從隊列中移除該節點,並拋出異常 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; //若是state狀態大於COMPLETING 則說明任務執行完成,或取消 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } //若是state=COMPLETING,則使用yield,由於此狀態的時間特別短,經過yield比掛起響應更快。 else if (s == COMPLETING) // cannot time out yet Thread.yield(); //構建節點 else if (q == null) q = new WaitNode(); //把當前節點入棧 else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); //若是須要阻塞指定時間,則使用LockSupport.parkNanos阻塞指定時間 //若是到指定時間還沒執行完,則從隊列中移除該節點,並返回當前狀態 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } //阻塞當前線程 else LockSupport.park(this); } }
構建棧鏈表的節點元素,並將該節點入站,同時阻塞當前線程等待運行主任務的線程喚醒該節點。
JDK1.7版本是使用AQS的雙向鏈表隊列實現的。
private void removeWaiter(WaitNode node) { if (node != null) { node.thread = null; retry: for (;;) { // restart on removeWaiter race for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; if (q.thread != null) pred = q; else if (pred != null) { pred.next = s; if (pred.thread == null) // check for race continue retry; } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } } }
移除棧中的節點元素,須要使用CAS自旋來保障移除成功。
public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; }
總結:狀態爲NEW時,cancel和run方法才能夠被運行。