在Java中比較常見的兩種建立線程的方法:繼承Thread類和實現Runnable接口。可是這兩種方法有個缺點就是沒法獲取線程執行後的結果。因此Java以後提供了Future和Runnable接口,用於實現獲取線程執行結果。下面開始源碼分析:node
一、Callable接口算法
public interface Callable<V> { //返回接口,或者拋出異常 V call() throws Exception; }
二、Future接口併發
public interface Future<V> { /***嘗試取消任務,若是任務已經完成、已取消或其餘緣由沒法取消,則失敗。 ** 一、若是任務還沒開始執行,則該任務不該該運行 ** 二、若是任務已經開始執行,由參數mayInterruptIfRunning來決定執行該任務的線程是否應該被中斷,這只是終止任務的一種嘗試。若mayInterruptIfRunning爲true,則會當即中斷執行任務的線程並返回true,若mayInterruptIfRunning爲false,則會返回true且不會中斷任務執行線程。 ** 三、調用這個方法後,之後對isDone方法調用都返回true。 ** 四、若是這個方法返回true,之後對isCancelled返回true。 ***/ boolean cancel(boolean mayInterruptIfRunning); /** * 判斷任務是否被取消了,若是調用了cance()則返回true */ boolean isCancelled(); /** *若是任務完成,則返回ture *任務完成包含正常終止、異常、取消任務。在這些狀況下都返回true */ boolean isDone(); /** * 線程阻塞,直到任務完成,返回結果 * 若是任務被取消,則引起CancellationException * 若是當前線程被中斷,則引起InterruptedException * 當任務在執行的過程當中出現異常,則拋出ExecutionException */ V get() throws InterruptedException, ExecutionException; /** * 線程阻塞必定時間等待任務完成,並返回任務執行結果,若是則超時則拋出TimeoutException */ V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
三、FutureTask
Future只是一個接口,不能直接用來建立對象,其實現類是FutureTask,JDK1.8修改了FutureTask的實現,JKD1.8再也不依賴AQS來實現,而是經過一個volatile變量state以及CAS操做來實現。FutureTask結構以下所示:ide
public class FutureTask<V> implements RunnableFuture<V> { /* * 當前任務運行狀態 * NEW -> COMPLETING -> NORMAL(正常結束,返回結果) * NEW -> COMPLETING -> EXCEPTIONAL(返回異常結果) * NEW -> CANCELLED(任務被取消,無結果) * NEW -> INTERRUPTING -> INTERRUPTED(任務被打斷,無結果) */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; /** 將要被執行的任務 */ private Callable<V> callable; /** 存放執行結果,用於get()方法獲取結果,也可能用於get()方法拋出異常 */ private Object outcome; // non-volatile, protected by state reads/writes /** 執行任務Callable的線程; */ private volatile Thread runner; /** 棧結構的等待隊列,該節點是棧中最頂層的節點 */ private volatile WaitNode waiters;
爲了後面更好的分析FutureTask的實現,這裏有必要解釋下各個狀態。 NEW:表示是個新的任務或者還沒被執行完的任務。這是初始狀態。 COMPLETING:任務已經執行完成或者執行任務的時候發生異常,可是任務執行結果或者異常緣由尚未保存到outcome字段(outcome字段用來保存任務執行結果,若是發生異常,則用來保存異常緣由)的時候,狀態會從NEW變動到COMPLETING。可是這個狀態會時間會比較短,屬於中間狀態。 NORMAL:任務已經執行完成而且任務執行結果已經保存到outcome字段,狀態會從COMPLETING轉換到NORMAL。這是一個最終態。 EXCEPTIONAL:任務執行發生異常而且異常緣由已經保存到outcome字段中後,狀態會從COMPLETING轉換到EXCEPTIONAL。這是一個最終態。 CANCELLED:任務還沒開始執行或者已經開始執行可是尚未執行完成的時候,用戶調用了cancel(false)方法取消任務且不中斷任務執行線程,這個時候狀態會從NEW轉化爲CANCELLED狀態。這是一個最終態。 INTERRUPTING: 任務還沒開始執行或者已經執行可是尚未執行完成的時候,用戶調用了cancel(true)方法取消任務而且要中斷任務執行線程可是尚未中斷任務執行線程以前,狀態會從NEW轉化爲INTERRUPTING。這是一箇中間狀態。 INTERRUPTED:調用interrupt()中斷任務執行線程以後狀態會從INTERRUPTING轉換到INTERRUPTED。這是一個最終態。 有一點須要注意的是,全部值大於COMPLETING的狀態都表示任務已經執行完成(任務正常執行完成,任務執行異常或者任務被取消)。
3.一、FutureTask構造方法函數
// Callable 構造方法 public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } /** * runnable 構造函數 */ public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
3.二、get()方法阻塞隊列源碼分析
static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
3.三、run方法解析this
//Executor調用執行任務 // public void run() { //狀態若是不是NEW,說明任務或者已經執行過,或者已經被取消,直接返返回,固然若是執行任務的線程runner不爲null,說明任務正在執行 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; //執行任務 try { Callable<V> c = callable; //判斷任務是否爲null,狀態是否爲NEW if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); //暫時存聽任務執行結果 ran = true; } catch (Throwable ex) { result = null; ran = false; //執行失敗 //經過CAS算法設置返回值(COMPLETING)和狀態值(EXCEPTIONAL) setException(ex); } //執行成功經過CAS(UNSAFE)設置返回值(COMPLETING)和狀態值(NORMAL) if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() //將任務runner設置爲null,避免發生併發調用run()方法 runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts //須從新讀取任務狀態,避免不可達(泄漏)的中斷 int s = state; //確保cancle(ture)操做時,運行中的任務能接收到中斷指令 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
3.四、setException(Throwable t)解析線程
//發生異常時,將返回值設置到outcome(=COMPLETING)中,並更新任務狀態(EXCEPTIONAL) protected void setException(Throwable t) { //調用UNSAFE類封裝的CAS算法,設置值 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state //喚醒因等待返回值而阻塞的線程 finishCompletion(); } }
3.五、set(V v)方法解析3d
//任務正常完成,將返回值設置到outcome(=COMPLETING)中,並更新任務狀態(=NORMAL) protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
3.六、 finishCompletion解析rest
//移除全部等待線程併發出信號,調用done(),以及將任務callable清空 private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { //循環喚醒阻塞線程,直到阻塞隊列爲空 for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; //一直到阻塞隊列爲空,跳出循環 if (next == null) break; q.next = null; // unlink to help gc 方便gc在適當的時候回收 q = next; } break; } } done(); callable = null; // to reduce footprint }
3.七、handlePossibleCancellationInterrupt 方法解析
private void handlePossibleCancellationInterrupt(int s) { // It is possible for our interrupter to stall before getting a // chance to interrupt us. Let's spin-wait patiently. //自旋等待cancle(true)結束(中斷結束) if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt // assert state == INTERRUPTED; // We want to clear any interrupt we may have received from // cancel(true). However, it is permissible to use interrupts // as an independent mechanism for a task to communicate with // its caller, and there is no way to clear only the // cancellation interrupt. // // Thread.interrupted(); }
3.八、cancle方法解析
//取消任務執行 public boolean cancel(boolean mayInterruptIfRunning) { //對NEW狀態的任務進行中斷,並根據參數設置state if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) //任務已完成(已發出中斷或已取消) return false; //中斷線程 try { // in case call to interrupt throws exception if (mayInterruptIfRunning) {//cancel(true) try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state //經過CAS算法,更新狀態 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { //喚醒阻塞線程 finishCompletion(); } return true; }
3.9 get方法解析
/** * 獲取執行結果 * @throws CancellationException {@inheritDoc} */ public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) //假如任務尚未執行完,則則塞線程,直至任務執行完成(結果已經存放到對應變量中) s = awaitDone(false, 0L); //返回結果 return report(s); } /** * 獲取任務執行結果,指定時間結束,則超時返回,再也不阻塞 * @throws CancellationException {@inheritDoc} */ public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); }
3.10 awaitDone解析
/** * Awaits completion or aborts on interrupt or timeout. * 如英文註釋:等待任務執行完畢或任務中斷或任務超時 * * @param timed true if use timed waits * @param nanos time to wait, if timed * @return state upon completion */ private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; //循環等待 for (;;) { //線程已經中斷,則移除等待任務 if (Thread.interrupted()) { removeWaiter(q); //移除當前任務後,拋出中斷異常 throw new InterruptedException(); } //任務已經完成,則返回任務狀態,並對當前任務清場處理 int s = state; if (s > COMPLETING) { if (q != null) //任務不爲空,則將執行線程設爲null,避免併發下重複執行 q.thread = null; return s; } //設置結果,很快就能完成,自旋等待 else if (s == COMPLETING) // cannot time out yet Thread.yield(); //任務提早退出 //正在執行或者還沒開始,則構建新的節點 else if (q == null) q = new WaitNode(); //判斷是否入隊,新節點通常在下一次循環入隊列阻塞 else if (!queued) //沒有入隊列,設置q.next=waiters,並將waiters設爲q queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); //假若有超時限制,則判斷是否超時 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { //超時則將任務節點從阻塞隊列中移除,並返回狀態 removeWaiter(q); return state; } //阻塞調用get方法的線程,有超時限制 LockSupport.parkNanos(this, nanos); } else //阻塞調用get方法的線程,無超時限制 LockSupport.park(this); } }
3.11 removeWaiter方法解析
//移除任務節點 private void removeWaiter(WaitNode node) { if (node != null) { node.thread = null; retry: for (;;) { // restart on removeWaiter race for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; if (q.thread != null) pred = q; else if (pred != null) { pred.next = s; if (pred.thread == null) // check for race continue retry; } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } } }