- 你有一個思想,我有一個思想,咱們交換後,一我的就有兩個思想
- If you can NOT explain it simply, you do NOT understand it well enough
建立線程有幾種方式?這個問題的答案應該是能夠脫口而出的吧html
但這兩種方式建立的線程是屬於」三wu產品「:java
class MyThread implements Runnable{ @Override public void run() { log.info("my thread"); } }
Runnable 接口是 JDK1.0 的核心產物編程
/** * @since JDK1.0 */ @FunctionalInterface public interface Runnable { public abstract void run(); }
用着 「三wu產品」 老是有一些弊端,其中沒辦法拿到返回值是最讓人不能忍的,因而 Callable 就誕生了安全
又是 Doug Lea 大師,又是 Java 1.5 這個神奇的版本多線程
/** * @see Executor * @since 1.5 * @author Doug Lea * @param <V> the result type of method {@code call} */ @FunctionalInterface public interface Callable<V> { V call() throws Exception; }
Callable 是一個泛型接口,裏面只有一個 call()
方法,該方法能夠返回泛型值 V ,使用起來就像這樣:併發
Callable<String> callable = () -> { // Perform some computation Thread.sleep(2000); return "Return some result"; };
兩者都是函數式接口,裏面都僅有一個方法,使用上又是如此類似,除了有無返回值,Runnable 與 Callable 就點差異嗎?異步
兩個接口都是用於多線程執行任務的,但他們仍是有很明顯的差異的ide
先從執行機制上來看,Runnable 你太清楚了,它既能夠用在 Thread 類中,也能夠用在 ExecutorService 類中配合線程池的使用;Bu~~~~t, Callable 只能在 ExecutorService 中使用,你翻遍 Thread 類,也找不到Callable 的身影函數
Runnable 接口中的 run 方法簽名上沒有 throws ,天然也就沒辦法向上傳播受檢異常;而 Callable 的 call() 方法簽名卻有 throws,因此它能夠處理受檢異常;高併發
因此概括起來看主要有這幾處不一樣點:
總體差異雖然不大,可是這點差異,卻具備重大意義
返回值和處理異常很好理解,另外,在實際工做中,咱們一般要使用線程池來管理線程(緣由已經在 爲何要使用線程池? 中明確說明),因此咱們就來看看 ExecutorService 中是如何使用兩者的
先來看一下 ExecutorService 類圖
我將上圖標記的方法單獨放在此處
void execute(Runnable command); <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task);
能夠看到,使用ExecutorService 的 execute()
方法依舊得不到返回值,而 submit()
方法清一色的返回 Future
類型的返回值
細心的朋友可能已經發現, submit() 方法已經在 CountDownLatch 和 CyclicBarrier 傻傻的分不清楚? 文章中屢次使用了,只不過咱們沒有獲取其返回值罷了,那麼
咱們帶着這些疑問一點點來看
Future 又是一個接口,裏面只有五個方法:
從方法名稱上相信你已經能看出這些方法的做用
// 取消任務 boolean cancel(boolean mayInterruptIfRunning); // 獲取任務執行結果 V get() throws InterruptedException, ExecutionException; // 獲取任務執行結果,帶有超時時間限制 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; // 判斷任務是否已經取消 boolean isCancelled(); // 判斷任務是否已經結束 boolean isDone();
鋪墊了這麼多,看到這你也許有些亂了,我們趕忙看一個例子,演示一下幾個方法的做用
@Slf4j public class FutureAndCallableExample { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executorService = Executors.newSingleThreadExecutor(); // 使用 Callable ,能夠獲取返回值 Callable<String> callable = () -> { log.info("進入 Callable 的 call 方法"); // 模擬子線程任務,在此睡眠 2s, // 小細節:因爲 call 方法會拋出 Exception,這裏不用像使用 Runnable 的run 方法那樣 try/catch 了 Thread.sleep(5000); return "Hello from Callable"; }; log.info("提交 Callable 到線程池"); Future<String> future = executorService.submit(callable); log.info("主線程繼續執行"); log.info("主線程等待獲取 Future 結果"); // Future.get() blocks until the result is available String result = future.get(); log.info("主線程獲取到 Future 結果: {}", result); executorService.shutdown(); } }
程序運行結果以下:
若是你運行上述示例代碼,主線程調用 future.get() 方法會阻塞本身,直到子任務完成。咱們也可使用 Future 方法提供的 isDone
方法,它能夠用來檢查 task 是否已經完成了,咱們將上面程序作點小修改:
// 若是子線程沒有結束,則睡眠 1s 從新檢查 while(!future.isDone()) { System.out.println("Task is still not done..."); Thread.sleep(1000); }
來看運行結果:
若是子程序運行時間過長,或者其餘緣由,咱們想 cancel 子程序的運行,則咱們可使用 Future 提供的 cancel 方法,繼續對程序作一些修改
while(!future.isDone()) { System.out.println("子線程任務尚未結束..."); Thread.sleep(1000); double elapsedTimeInSec = (System.nanoTime() - startTime)/1000000000.0; // 若是程序運行時間大於 1s,則取消子線程的運行 if(elapsedTimeInSec > 1) { future.cancel(true); } }
來看運行結果:
爲何調用 cancel 方法程序會出現 CancellationException 呢? 是由於調用 get() 方法時,明確說明了:
調用 get() 方法時,若是計算結果被取消了,則拋出 CancellationException (具體緣由,你會在下面的源碼分析中看到)
有異常不處理是很是不專業的,因此咱們須要進一步修改程序,以更友好的方式處理異常
// 經過 isCancelled 方法判斷程序是否被取消,若是被取消,則打印日誌,若是沒被取消,則正常調用 get() 方法 if (!future.isCancelled()){ log.info("子線程任務已完成"); String result = future.get(); log.info("主線程獲取到 Future 結果: {}", result); }else { log.warn("子線程任務被取消"); }
查看程序運行結果:
相信到這裏你已經對 Future
的幾個方法有了基本的使用印象,但 Future
是接口,其實使用 ExecutorService.submit()
方法返回的一直都是 Future
的實現類 FutureTask
接下來咱們就進入這個核心實現類一探究竟
一樣先來看類結構
public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
很神奇的一個接口,FutureTask
實現了 RunnableFuture
接口,而 RunnableFuture
接口又分別實現了 Runnable
和 Future
接口,因此能夠推斷出 FutureTask
具備這兩種接口的特性:
Runnable
特性,因此能夠用在 ExecutorService
中配合線程池使用Future
特性,因此能夠從中獲取到執行結果若是你完整的看過 AQS 相關分析的文章,你也許會發現,閱讀 Java 併發工具類源碼,咱們無非就是要關注如下這三點:
- 狀態 (代碼邏輯的主要控制) - 隊列 (等待排隊隊列) - CAS (安全的set 值)
腦海中牢記這三點,我們開始看 FutureTask 源碼,看一下它是如何圍繞這三點實現相應的邏輯的
文章開頭已經提到,實現 Runnable 接口形式建立的線程並不能獲取到返回值,而實現 Callable 的才能夠,因此 FutureTask 想要獲取返回值,一定是和 Callable 有聯繫的,這個推斷一點都沒錯,從構造方法中就能夠看出來:
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
即使在 FutureTask 構造方法中傳入的是 Runnable 形式的線程,該構造方法也會經過 Executors.callable
工廠方法將其轉換爲 Callable 類型:
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
可是 FutureTask 實現的是 Runnable 接口,也就是隻能重寫 run() 方法,run() 方法又沒有返回值,那問題來了:
- FutureTask 是怎樣在 run() 方法中獲取返回值的?
- 它將返回值放到哪裏了?
- get() 方法又是怎樣拿到這個返回值的呢?
咱們來看一下 run() 方法(關鍵代碼都已標記註釋)
public void run() { // 若是狀態不是 NEW,說明任務已經執行過或者已經被取消,直接返回 // 若是狀態是 NEW,則嘗試把執行線程保存在 runnerOffset(runner字段),若是賦值失敗,則直接返回 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { // 獲取構造函數傳入的 Callable 值 Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { // 正常調用 Callable 的 call 方法就能夠獲取到返回值 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; // 保存 call 方法拋出的異常 setException(ex); } if (ran) // 保存 call 方法的執行結果 set(result); } } finally { runner = null; int s = state; // 若是任務被中斷,則執行中斷處理 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
run()
方法沒有返回值,至於 run()
方法是如何將 call()
方法的返回結果和異常都保存起來的呢?其實很是簡單, 就是經過 set(result) 保存正常程序運行結果,或經過 setException(ex) 保存程序異常信息
/** The result to return or exception to throw from get() */ private Object outcome; // non-volatile, protected by state reads/writes // 保存異常結果 protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } } // 保存正常結果 protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
setException
和 set
方法很是類似,都是將異常或者結果保存在 Object
類型的 outcome
變量中,outcome
是成員變量,就要考慮線程安全,因此他們要經過 CAS方式設置 outcome 變量的值,既然是在 CAS 成功後 更改 outcome 的值,這也就是 outcome 沒有被 volatile
修飾的緣由所在。
保存正常結果值(set方法)與保存異常結果值(setException方法)兩個方法代碼邏輯,惟一的不一樣就是 CAS 傳入的 state 不一樣。咱們上面提到,state 多數用於控制代碼邏輯,FutureTask 也是這樣,因此要搞清代碼邏輯,咱們須要先對 state 的狀態變化有所瞭解
/* * * Possible state transitions: * NEW -> COMPLETING -> NORMAL //執行過程順利完成 * NEW -> COMPLETING -> EXCEPTIONAL //執行過程出現異常 * NEW -> CANCELLED // 執行過程當中被取消 * NEW -> INTERRUPTING -> INTERRUPTED //執行過程當中,線程被中斷 */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
7種狀態,千萬別慌,整個狀態流轉其實只有四種線路
FutureTask 對象被建立出來,state 的狀態就是 NEW 狀態,從上面的構造函數中你應該已經發現了,四個最終狀態 NORMAL ,EXCEPTIONAL , CANCELLED , INTERRUPTED 也都很好理解,兩個中間狀態稍稍有點讓人困惑:
總的來講,這兩個中間狀態都表示一種瞬時狀態,咱們將幾種狀態圖形化展現一下:
咱們知道了 run() 方法是如何保存結果的,以及知道了將正常結果/異常結果保存到了 outcome 變量裏,那就須要看一下 FutureTask 是如何經過 get() 方法獲取結果的:
public V get() throws InterruptedException, ExecutionException { int s = state; // 若是 state 還沒到 set outcome 結果的時候,則調用 awaitDone() 方法阻塞本身 if (s <= COMPLETING) s = awaitDone(false, 0L); // 返回結果 return report(s); }
awaitDone 方法是 FutureTask 最核心的一個方法
// get 方法支持超時限制,若是沒有傳入超時時間,則接受的參數是 false 和 0L // 有等待就會有隊列排隊或者可響應中斷,從方法簽名上看有 InterruptedException,說明該方法這是能夠被中斷的 private int awaitDone(boolean timed, long nanos) throws InterruptedException { // 計算等待截止時間 final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { // 若是當前線程被中斷,若是是,則在等待對立中刪除該節點,並拋出 InterruptedException if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; // 狀態大於 COMPLETING 說明已經達到某個最終狀態(正常結束/異常結束/取消) // 把 thread 只爲空,並返回結果 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } // 若是是COMPLETING 狀態(中間狀態),表示任務已結束,但 outcome 賦值還沒結束,這時主動讓出執行權,讓其餘線程優先執行(只是發出這個信號,至因而否別的線程執行必定會執行但是不必定的) else if (s == COMPLETING) // cannot time out yet Thread.yield(); // 等待節點爲空 else if (q == null) // 將當前線程構造節點 q = new WaitNode(); // 若是尚未入隊列,則把當前節點加入waiters首節點並替換原來waiters else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 若是設置超時時間 else if (timed) { nanos = deadline - System.nanoTime(); // 時間到,則再也不等待結果 if (nanos <= 0L) { removeWaiter(q); return state; } // 阻塞等待特定時間 LockSupport.parkNanos(this, nanos); } else // 掛起當前線程,知道被其餘線程喚醒 LockSupport.park(this); } }
總的來講,進入這個方法,一般會經歷三輪循環
q == null
, 這時候會新建一個節點 q, 第一輪循環結束。!queue
,這個時候會把第一輪循環中生成的節點的 next 指針指向waiters,而後CAS的把節點q 替換waiters, 也就是把新生成的節點添加到waiters 中的首節點。若是替換成功,queued=true。第二輪循環結束。對於第二輪循環,你們可能稍稍有點迷糊,咱們前面說過,有阻塞,就會排隊,有排隊天然就有隊列,FutureTask 內部一樣維護了一個隊列
/** Treiber stack of waiting threads */ private volatile WaitNode waiters;
說是等待隊列,其實就是一個 Treiber 類型 stack,既然是 stack, 那就像手槍的彈夾同樣(腦補一會兒彈放入彈夾的情形),後進先出,因此剛剛說的第二輪循環,會把新生成的節點添加到 waiters stack 的首節點
若是程序運行正常,一般調用 get() 方法,會將當前線程掛起,那誰來喚醒呢?天然是 run() 方法運行完會喚醒,設置返回結果(set方法)/異常的方法(setException方法) 兩個方法中都會調用 finishCompletion 方法,該方法就會喚醒等待隊列中的線程
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; // 喚醒等待隊列中的線程 LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
將一個任務的狀態設置成終止態只有三種方法:
前兩種方法已經分析完,接下來咱們就看一下 cancel
方法
查看 Future cancel(),該方法註釋上明確說明三種 cancel 操做必定失敗的情形
其它狀況下,cancel操做將返回true。值得注意的是,cancel操做返回 true 並不表明任務真的就是被取消, 這取決於發動cancel狀態時,任務所處的狀態
若是發起cancel時任務已經在運行了,則這時就須要看 mayInterruptIfRunning
參數了:
有了這些鋪墊,看一下 cancel 代碼的邏輯就秒懂了
public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception // 須要中斷任務執行線程 if (mayInterruptIfRunning) { try { Thread t = runner; // 中斷線程 if (t != null) t.interrupt(); } finally { // final state // 修改成最終狀態 INTERRUPTED UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { // 喚醒等待中的線程 finishCompletion(); } return true; }
核心方法終於分析完了,到這我們喝口茶休息一下吧
我是想說,使用 FutureTask 來演練燒水泡茶經典程序
如上圖:
最終泡茶
讓我心算一下,若是串行總共須要 20 分鐘,但很顯然在燒開水期間,咱們能夠洗茶壺/洗茶杯/拿茶葉
這樣總共須要 16 分鐘,節約了 4分鐘時間,燒水泡茶尚且如此,在如今高併發的時代,4分鐘能夠作的事太多了,學會使用 Future 優化程序是必然(其實優化程序就是尋找關鍵路徑,關鍵路徑找到了,非關鍵路徑的任務一般就能夠和關鍵路徑的內容並行執行了)
@Slf4j public class MakeTeaExample { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(2); // 建立線程1的FutureTask FutureTask<String> ft1 = new FutureTask<String>(new T1Task()); // 建立線程2的FutureTask FutureTask<String> ft2 = new FutureTask<String>(new T2Task()); executorService.submit(ft1); executorService.submit(ft2); log.info(ft1.get() + ft2.get()); log.info("開始泡茶"); executorService.shutdown(); } static class T1Task implements Callable<String> { @Override public String call() throws Exception { log.info("T1:洗水壺..."); TimeUnit.SECONDS.sleep(1); log.info("T1:燒開水..."); TimeUnit.SECONDS.sleep(15); return "T1:開水已備好"; } } static class T2Task implements Callable<String> { @Override public String call() throws Exception { log.info("T2:洗茶壺..."); TimeUnit.SECONDS.sleep(1); log.info("T2:洗茶杯..."); TimeUnit.SECONDS.sleep(2); log.info("T2:拿茶葉..."); TimeUnit.SECONDS.sleep(1); return "T2:福鼎白茶拿到了"; } } }
上面的程序是主線程等待兩個 FutureTask 的執行結果,線程1 燒開水時間更長,線程1但願在水燒開的那一剎那就能夠拿到茶葉直接泡茶,怎麼半呢?
那隻須要在線程 1 的FutureTask 中獲取 線程 2 FutureTask 的返回結果就能夠了,咱們稍稍修改一下程序:
@Slf4j public class MakeTeaExample1 { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(2); // 建立線程2的FutureTask FutureTask<String> ft2 = new FutureTask<String>(new T2Task()); // 建立線程1的FutureTask FutureTask<String> ft1 = new FutureTask<String>(new T1Task(ft2)); executorService.submit(ft1); executorService.submit(ft2); executorService.shutdown(); } static class T1Task implements Callable<String> { private FutureTask<String> ft2; public T1Task(FutureTask<String> ft2) { this.ft2 = ft2; } @Override public String call() throws Exception { log.info("T1:洗水壺..."); TimeUnit.SECONDS.sleep(1); log.info("T1:燒開水..."); TimeUnit.SECONDS.sleep(15); String t2Result = ft2.get(); log.info("T1 拿到T2的 {}, 開始泡茶", t2Result); return "T1: 上茶!!!"; } } static class T2Task implements Callable<String> { @Override public String call() throws Exception { log.info("T2:洗茶壺..."); TimeUnit.SECONDS.sleep(1); log.info("T2:洗茶杯..."); TimeUnit.SECONDS.sleep(2); log.info("T2:拿茶葉..."); TimeUnit.SECONDS.sleep(1); return "福鼎白茶"; } } }
來看程序運行結果:
知道這個變化後咱們再回頭看 ExecutorService 的三個 submit 方法:
<T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); <T> Future<T> submit(Callable<T> task);
第一種方法,逐層代碼查看到這裏:
你會發現,和咱們改造燒水泡茶的程序思惟是類似的,能夠傳進去一個 result,result 至關於主線程和子線程之間的橋樑,經過它主子線程能夠共享數據
第二個方法參數是 Runnable 類型參數,即使調用 get() 方法也是返回 null,因此僅是能夠用來斷言任務已經結束了,相似 Thread.join()
第三個方法參數是 Callable 類型參數,經過get() 方法能夠明確獲取 call() 方法的返回值
到這裏,關於 Future 的整塊講解就結束了,仍是須要簡單消化一下的
若是熟悉 Javascript 的朋友,Future 的特性和 Javascript 的 Promise 是相似的,私下開玩笑一般將其比喻成男友的承諾
迴歸到Java,咱們從 JDK 的演變歷史,談及 Callable 的誕生,它彌補了 Runnable 沒有返回值的空缺,經過簡單的 demo 瞭解 Callable 與 Future 的使用。 FutureTask 又是 Future接口的核心實現類,經過閱讀源碼瞭解了整個實現邏輯,最後結合FutureTask 和線程池演示燒水泡茶程序,相信到這裏,你已經能夠輕鬆獲取線程結果了
燒水泡茶是很是簡單的,若是更復雜業務邏輯,以這種方式使用 Future 一定會帶來很大的會亂(程序結束沒辦法主動通知,Future 的連接和整合都須要手動操做)爲了解決這個短板,沒錯,又是那個男人 Doug Lea, CompletableFuture
工具類在 Java1.8 的版本出現了,搭配 Lambda 的使用,讓咱們編寫異步程序也像寫串行代碼那樣簡單,縱享絲滑
接下來咱們就瞭解一下 CompletableFuture
的使用
日拱一兵 | 原創