經過直接繼承Thread, 實現Runnable接口來建立線程。但這兩種方式都有一種缺陷:在執行完任務以後沒法得到執行結果。html
若是須要得到執行結果,就必須經過共享變量或者使用線程通訊的方式來達到效果,這樣使用起來比較麻煩,而jdk中Callable和Future,經過他們能夠在任務執行完畢以後獲得任務執行結果。先看看他們之間的組織關係:java
Callable:node
public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
源碼可知,它也是個一個接口,在他裏面也只是申明一個方法,只不過這個方法爲call(),call方法返回的就是該泛型傳遞進來的V類型,他怎麼使用呢?就是結合以前的ExecuteService:數據結構
<T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task);
第一個submit方法裏面的參數類型就是Callable。多線程
Future:併發
Future就是對於具體的Runnable或者Callable任務的執行進度的查看,取消,查詢是否完成,獲取結果(正確完成時的結果,或異常)。必要時能夠經過get方法獲取執行的結果,該方法會阻塞直到任務返回結果,或經過指定阻塞時間的版本。dom
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
其中cancel()方法用來取消任務,若是取消任務成功則返回true, 若是取消任務失敗則返回false。 參數mayInterruptIfRunning表示是否容許取消真在執行去沒有執行完畢的任務,若是設置true, 則表示能夠取消正在執行過程的任務。 當任務已經完成,或者已經被取消過了,或者由於別的緣由不能取消, 則返回false。 當取消時,該任務尚未開始執行,則該任務不會執行,而且老是返回true。ide
FutureTask:this
public class FutureTask<V> implements RunnableFuture<V>
FutureTask類實現了RunnableFuture接口,看一下RunnableFuture接口的定義:spa
public interface RunnableFuture<V> extends Runnable, Future<V>
RunnableFuture接口接觸了Runnable接口和Future接口, 而FutureTask實現了RunnableFuture接口,因此它既可做爲Runnable被線程執行,也能夠做爲Future獲得Callable的返回值。
構造器定義:
public FutureTask(Callable<V> callable) public FutureTask(Runnable runnable, V result) {
再來看看第二個構造器中的參數怎麼變身Callable的:
this.callable = Executors.callable(runnable, result);
調用Executors.callable方法:
public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); }
簡單實現Callable:
static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
流程:
下面結合完整具體流程走一下FutureTask過程,並解析源碼,草圖以下:
實例代碼以下:
1 public class Test { 2 public static void main(String[] args) { 3 //第一種方式 4 ExecutorService executor = Executors.newCachedThreadPool(); 5 Task task = new Task(); 6 FutureTask<Integer> futureTask = new FutureTask<Integer>(task); 7 executor.submit(futureTask); 8 executor.shutdown(); 15 16 try { 17 Thread.sleep(1000); 18 } catch (InterruptedException e1) { 19 e1.printStackTrace(); 20 } 21 22 System.out.println("主線程在執行任務"); 23 24 try { 25 System.out.println("task運行結果"+futureTask.get()); 26 } catch (InterruptedException e) { 27 e.printStackTrace(); 28 } catch (ExecutionException e) { 29 e.printStackTrace(); 30 } 31 32 System.out.println("全部任務執行完畢"); 33 } 34 } 35 class Task implements Callable<Integer>{ 36 @Override 37 public Integer call() throws Exception { 38 System.out.println("子線程在進行計算"); 39 Thread.sleep(3000); 40 int sum = 0; 41 for(int i=0;i<100;i++) 42 sum += i; 43 return sum; 44 } 45 }
分析過程以前,先準備前準備知識,首先看一下FutureTask內部狀態,以及之間的轉變:
private volatile int state; // volatile 內存可見性 private static final int NEW = 0; //該狀態爲new FutureTask()時設定,同時也表示內部成員callable已經成功賦值,一直到worker thread完成FutureTask中run(). private static final int COMPLETING = 1; //該狀態位worker thread完成task時設定的中間狀態,處於該狀態下,說明worker thread 真正準備設置result. private static final int NORMAL = 2; //當設置result結果完成後,FutureTask處於該狀態,表明過程結果,該狀態爲最終狀態final state,(正確完成的最終狀態) private static final int EXCEPTIONAL = 3; // 同上,只不過task執行過程出現異常,此時結果設值爲exception,也是final state private static final int CANCELLED = 4; //final state, 代表task被cancel(task尚未執行就被cancel的狀態). private static final int INTERRUPTING = 5; // 中間狀態,task運行過程當中被interrupt時,設置的中間狀態; private static final int INTERRUPTED = 6; // final state, 中斷完畢的最終狀態,幾種狀況,下面具體分析。
下面是狀態之間的轉變,貫穿主線:
* Possible state transitions: 1* NEW -> COMPLETING -> NORMAL 2* NEW -> COMPLETING -> EXCEPTIONAL 3* NEW -> CANCELLED 4* NEW -> INTERRUPTING -> INTERRUPTED */
其餘重要的變量:
/** The underlying callable; nulled out after running */ private Callable<V> callable; // 具體run運行時會調用其方法call(),並得到結果,結果時置爲null. /** The result to return or exception to throw from get() */ private Object outcome; // non-volatile, protected by state reads/writes 不必爲votaile,由於其是伴隨state 進行讀寫,而state是FutureTask的主導因素。 /** The thread running the callable; CASed during run() */ private volatile Thread runner; //具體的worker thread. /** Treiber stack of waiting threads */ private volatile WaitNode waiters; //Treiber stack 併發stack數據結構,用於存放阻塞在該futuretask#get方法的線程。
OK,構造new FutureTask開始:
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; //底層callable賦值 this.state = NEW; // 初始狀態NEW,同時也標誌了callable的賦值,可見性 }
ThreadPoolExecutor.submit(Runnable),ThreadPoolExecutor裏面具體細節請見這裏,這裏就假設它直接new a thread來處理該任務了,由於FutureTask爲Runnable的子類,因此worker thread調用該類的run()方法:
public void run() {
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) //狀態檢測,和當前worker Thread的cas原子賦值,有一個不成立,就直接返回。什麼狀況下還沒run()呢?就不是NEW狀態了呢? return; //caller調用cancel了,此時狀態爲Interrupting,也說明了上面的cancel方法說明,task沒運行時,就interrupt,task得不到運行。老是返回 try { //true。
//再來看看這裏worker thread賦值爲何要用cas操做,有競爭racing? 競爭哪裏來?難道threadPoolExecutor線程池多個線程可能搶同一個 Callable<V> c = callable; //任務?不可能 1:線程數 < coreThreadPool 時, 直接new thread, 2 : 大於 coreThreadpool時,放在blockingqueue裏,取的話只能一 if (c != null && state == NEW) { //線程。能想到就是caller那邊了,即多callers(多線程)提交同一FutureTask. V result; //多線程同時提交同一FutureTask,確保該FutureTask的run()只被調用一次, boolean ran; try { result = c.call(); //此處的if,1:當state == NEW(task沒完成,中斷) 而且 worker Thread爲null時,纔會獲得運行 ran = true; // 2: task已經完成了 或者 該任務已經有worker thread來執行時,直接返回不會運行。 } catch (Throwable ex) { //調用callable的call方法 result = null; //執行task時有異常 ran = false; //附異常 setException(ex); } if (ran) //正常完成,則賦值 set(result); } } finally { //注意!!什麼這裏吧runner置爲null,此時run()方法還沒運行完呢啊!如今置爲null,不怕併發調用run()嗎?注意此時state已經變化了(Comple runner = null; //teing或者interrupting了,run()一開始state != NEW 直接return,不會運行。能夠說經過state和 worker thread來一塊兒控制併發調用run int s = state; //必須再讀一次,防止worker thread == null後,遺漏的interrup信號,底下具體分析中斷的狀況。 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); //若是caller中斷信號有的話,則處理該interrupt. } //另外該任務是一致性任務,即state只要不爲NEW,該任務就不會在運行,運行結束或cancel後,就不能在運行了,由於state狀態在那不變哦! }
請看下例子,三個提交線程(提交同一個FutureTask):
public class Test { public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); Task task = new Task(); CountDownLatch latch = new CountDownLatch(1); FutureTask<Integer> futureTask = new FutureTask<Integer>(task); for (int i = 0 ; i < 3; i++) { new Thread(new Submit(executor, futureTask, latch)).start(); } try { Thread.sleep(3000); latch.countDown(); Thread.sleep(20000); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println("全部任務執行完畢"); executor.shutdown(); } } class Submit implements Runnable { private CountDownLatch latch ; private ExecutorService es ; private FutureTask<Integer> task; public Submit(ExecutorService es, FutureTask<Integer> task, CountDownLatch latch) { this.latch = latch; this.es = es; this.task = task; } public void run() { try { latch.await(); Future<?> future = (Future<?>) es.submit(task); System.out.println("Thread name : " + Thread.currentThread().getName() + "go!"); future.get(3000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e1) { e1.printStackTrace(); } catch (TimeoutException e2) { System.err.println("Thread name : " + Thread.currentThread().getName() + " " + e2); } } } class Task implements Callable<Integer>{ public Integer call() throws Exception { System.out.println("thread name : " + Thread.currentThread().getName() + "do the work!"); Thread.sleep(6000); int sum = 0; for(int i=0;i<100;i++) sum += i; return sum; } }
顯示以下:
Thread name : Thread-1go! Thread name : Thread-0go! Thread name : Thread-2go! thread name : pool-1-thread-1do the work! Thread name : Thread-1 java.util.concurrent.TimeoutException 全部任務執行完畢
結果很顯然,同一個任務屢次提交(併發提交),FutureTask保證只是啓一個線程來運行。
想運行屢次(只要不cancel,和throw exception,由於他set(result),正常運行結束,state仍是new),用這個:
protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { runner = null; s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW; }
再來看看setException()和set(result):
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // cas原子操做,失敗直接返回,成功的前提以前的狀態必須爲NEW. outcome = v; //可能和什麼衝突呢? caller已經cancel該task,狀態位Interrupting或者Interrpted(此次Interrupted表明interrupt完成,這set() UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state // 不是在worker thread中調用的嘛,怎麼intterupt都完成了,怎麼worker thread還在運行呢?worker thread運行的代碼中沒有響 finishCompletion(); //應interrupt的代碼。因此客戶端cancel操做,對運行中的worker thread,並不必定讓它停下來,不過此時即便運行完畢,也不能賦值。 } } //new -> Completing-> NORMAL 或者NEW ->Interrupting->Intterrpted.
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; //同上,不過附異常。 UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } //new ->completing ->exception 或者 同上 }
finishCompletion()等會細聊,主要是沒說到get()阻塞呢!看看caller端線程調用cancel()和workerThread的handlePossibleCancellationInterrupt(int s)協調:
public boolean cancel(boolean mayInterruptIfRunning) { if (state != NEW) return false; //1:已經cancel(cancelled,Interruping, Interrupted)過了 2:正常完成 Completing(Completed) 3:異常完成completing(exception) 直接返回false; if (mayInterruptIfRunning) { // flag : worker thread 已經啓動運行了,是否能夠中斷 if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING)) //再次檢查state狀態,完成的話(上面的三種),直接返回false; return false; Thread t = runner; if (t != null) // t == null對應Future task還沒啓動, 跳過thread.interrupt(),直接由interrpting -> interrupted,成功的話 t.interrupt(); //調用worker thread的 interrupt() //mayInterrptIfRunning 爲true ,interrupt 狀態轉變 new -> interrupting -> interrupted. UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state } else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)) //mayInterruptIfRunning 爲false,interrupt成功的 狀態轉變 new -> Cancelled return false; finishCompletion(); return true; }
由上面可知,客戶端cancel()中很多cas操做,主要來自兩方面的racing, 1:線程池worker Thread的完成(異常,正常)狀態設置; 2:同一futuretask,不一樣客戶端線程callers的cancel操做。
private void handlePossibleCancellationInterrupt(int s) { // It is possible for our interrupter to stall before getting a // chance to interrupt us. Let's spin-wait patiently. if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt // assert state == INTERRUPTED; // We want to clear any interrupt we may have received from // cancel(true). However, it is permissible to use interrupts // as an independent mechanism for a task to communicate with // its caller, and there is no way to clear only the // cancellation interrupt. // // Thread.interrupted(); }
當state處於Interrupting, 即caller即將調用worker thread.interrupt(), 因此worker thread自旋會,等會interrupt方法的調用,保留interrupt標誌。
再來看看get()和帶參數的get(timeout):
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) //結果未設定的狀況下 s = awaitDone(false, 0L); //無條件等待 return report(s); } /** * @throws CancellationException {@inheritDoc} */ public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) //等到timeout時間內,沒完成,throws TimeoutException throw new TimeoutException(); return report(s); }
awaitDone():
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; //計算出等待時間。 WaitNode q = null; boolean queued = false; //是否加入阻塞隊列的標誌 for (;;) { if (Thread.interrupted()) { //阻塞該caller線程以前,caller線程被中斷,直接throw 異常 removeWaiter(q); //在阻塞隊列中移除該線程的封裝node.此處無心義 throw new InterruptedException(); } int s = state; //讀取state,阻塞前 recheck一下 是否完成? if (s > COMPLETING) { //完成了,直接返回,不要阻塞了 if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); //等會,快完成了。 else if (q == null) q = new WaitNode(); //當前阻塞線程鏈表的簡單封裝 else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); //設爲當前FutureTask阻塞鏈表(stack結構)的棧頂。 else if (timed) { nanos = deadline - System.nanoTime(); //計算當前要阻塞的等待時間 if (nanos <= 0L) { removeWaiter(q); //小於0 直接返回,當前REMOVEWaiter無心義,並無加入stack中。 return state; } LockSupport.parkNanos(this, nanos);本地native方法,阻塞當前線程。 } else LockSupport.park(this); //無時間條件阻塞 } }
無時間限制阻塞,有時間阻塞(阻塞時間大於task完成時間)會等到任務完成而給通知,喚醒該線程,即finishCompletion();而有時間阻塞(阻塞時間在task完成之間就已經結束的)會經過for()退出(退出前,刪除等待隊列中的節點)。
WaiterNode定義:
static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } //當前阻塞線程的引用 }
結合awaitDone()中的新阻塞節點加入順序,其定位stack結構(Treiber stack);
removeWaiter():
private void removeWaiter(WaitNode node) { if (node != null) { node.thread = null; retry: for (;;) { // restart on removeWaiter race for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; if (q.thread != null) pred = q; else if (pred != null) { pred.next = s; if (pred.thread == null) // 檢測競爭 continue retry; //發生重試 } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } } }
finishCompletion():
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { 獲得棧頂阻塞節點的引用 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; //取得阻塞的線程引用, if (t != null) { q.thread = null; LockSupport.unpark(t);//解阻塞 } WaitNode next = q.next; //遍歷解阻塞線程 if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
其實,前面的分析可知,多個caller線程併發提交同一個FutureTask, 而且所謂調用get()阻塞的話(阻塞在該FutureTask上),實際上也就一個caller線程阻塞,其餘線程在調用該FutureTask的run()開始條件檢查時,就直接return了,實際狀況:三個併發線程提交同一個future task,對應生成三份FutureTask(不一樣於以前),三份FutureTask中對應三分Callable,而這三份Callable含有相同的FutureTask(所謂的相同任務) ,向ThreadPoolExecutor.submit(Runnable)實際上提交了三份Runnable(即生成的三分FutureTask), FutureTask實現了Runnable接口, 而後ThreadPoolExecutor生成三個線程來執行這所謂的三個任務,這三個任務run()中都是調用對應內部的callable的call(), 而callable的call方法調用的是他們共同引用的FutureTask(同一個對像)的run()方法,而run方法, 咱們上面解析過了,經過cas和狀態檢測,只運行一個worker thread 調用run()(見上),另外兩個線程直接從共同底層FutureTask的run方法開始直接返回。
暈了?從頭再來看看提交的過程:
1:submit(FutureTask(Runnable)):AbstractExecutorService
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); }
2:生成三個FutureTask(其中runnable就是同一個底層FutureTask任務):
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
3:調用Executors.callable():
public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); }
static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { //直接調用底層同一個FutureTask的run(); task.run(); return result; } }
即三次提交,生成三份FutureTask,每份FutureTask調用Executors.callable()爲本身底層的callable賦值,而Executors.callable方法生成簡單的Callable實現,其中call(),調用底層共同FutureTask的run(), 也就受共同futureTask內部狀態(state, runThread)限制。因此,阻塞在底層共同FutureTask阻塞隊列中的只有一個線程,看下例:
public class Test { public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); Task task = new Task(); int waitTime = 4000; CountDownLatch latch = new CountDownLatch(1); FutureTask<Integer> futureTask = new FutureTask<Integer>(task); for (int i = 0 ; i < 3; i++) { new Thread(new Submit(executor, futureTask, latch, waitTime)).start(); } try { Thread.sleep(3000); latch.countDown(); Thread.sleep(8000); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println("全部任務執行完畢"); executor.shutdown(); } } class Submit implements Runnable { private CountDownLatch latch ; private ExecutorService es ; private FutureTask<Integer> task; private int waitTime ; public Submit(ExecutorService es, FutureTask<Integer> task, CountDownLatch latch, int waitTime) { this.latch = latch; this.es = es; this.task = task; this.waitTime = waitTime; } public void run() { try { latch.await(); Future<?> future = es.submit(task); System.out.println("Thread name : " + Thread.currentThread().getName() + " go!"); waitTime = new Random().nextInt(waitTime); System.out.println("Thread name : " + Thread.currentThread().getName() + " , The wait time : = " + waitTime ); future.get(waitTime, TimeUnit.MILLISECONDS); System.out.println("Thread name : " + Thread.currentThread().getName() + " run over!"); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e1) { e1.printStackTrace(); } catch (TimeoutException e2) { System.err.println("Thread name : " + Thread.currentThread().getName() + " " + e2); } } } class Task implements Callable<Integer>{ public Integer call() throws Exception { System.out.println("thread name : " + Thread.currentThread().getName() + " do the work!"); Thread.sleep(4000); int sum = 0; for(int i=0;i<20;i++) sum += i; return sum; } } class Task1 implements Runnable{ int sum = 0; @Override public void run() { System.out.println("Thread Name : " + Thread.currentThread().getName() + "do the work!"); try { Thread.sleep(6000); for(int i=0;i<100;i++) sum += i; } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
顯示結果:
Thread name : Thread-2 go! Thread name : Thread-0 go! Thread name : Thread-0 , The wait time : = 2738 thread name : pool-1-thread-1 do the work! Thread name : Thread-1 go! Thread name : Thread-2 , The wait time : = 284 Thread name : Thread-1 , The wait time : = 678 Thread name : Thread-2 run over! Thread name : Thread-0 run over! Thread name : Thread-1 java.util.concurrent.TimeoutException 全部任務執行完畢
三個線程都是阻塞一段時間,可是隻有一個超時,另外兩個運行完畢,(他兩實際工做那部分沒運行,處理各自FutureTask那部分代碼,因此只能看到線程池只有一個線程處理底層FutureTask);
但,若是直接併發提交Callable,或者Runnable,線程池會啓動三個線程來分別處理這三個不一樣任務,朋友能夠自行試驗demo下。而FutureTask是本身的自身的限制。
後話,通常調用ThreadPoolExecutor.submit()提交的是Callable<T>和Runnable, 返回的Future<T>, Future<?>(返回Null,或者不要求返回值),提交FutureTask用不着,因此實際中不會碰見這種狀況。
另外,本文源碼基於jdk1.7,與網上1.7以前源碼不一樣(1.6經過AQS實現)。