做爲Java 8 Concurrency API改進而引入,本文是CompletableFuture類的功能和用例的介紹。同時在Java 9 也有對CompletableFuture有一些改進,以後再進入講解。設計模式
Future異步計算很難操做,一般咱們但願將任何計算邏輯視爲一系列步驟。可是在異步計算的狀況下,表示爲回調的方法每每分散在代碼中或者深深地嵌套在彼此內部。可是當咱們須要處理其中一個步驟中可能發生的錯誤時,狀況可能會變得更復雜。併發
Futrue接口是Java 5中做爲異步計算而新增的,但它沒有任何方法去進行計算組合或者處理可能出現的錯誤。框架
在Java 8中,引入了CompletableFuture類。與Future接口一塊兒,它還實現了CompletionStage接口。此接口定義了可與其餘Future組合成異步計算契約。異步
CompletableFuture同時是一個組合和一個框架,具備大約50種不一樣的構成,結合,執行異步計算步驟和處理錯誤。函數
如此龐大的API可能會使人難以招架,下文將調一些重要的作重點介紹。this
首先,CompletableFuture類實現Future接口,所以你能夠將其用做Future實現,但須要額外的完成實現邏輯。線程
例如,你可使用無構參構造函數建立此類的實例,而後使用complete
方法完成。消費者可使用get方法來阻塞當前線程,直到get()
結果。設計
在下面的示例中,咱們有一個建立CompletableFuture實例的方法,而後在另外一個線程中計算並當即返回Future。code
計算完成後,該方法經過將結果提供給完整方法來完成Future:對象
public Future<String> calculateAsync() throws InterruptedException { CompletableFuture<String> completableFuture = new CompletableFuture<>(); Executors.newCachedThreadPool().submit(() -> { Thread.sleep(500); completableFuture.complete("Hello"); return null; }); return completableFuture; }
爲了分離計算,咱們使用了Executor API ,這種建立和完成CompletableFuture的方法能夠與任何併發包(包括原始線程)一塊兒使用。
請注意,該calculateAsync
方法返回一個Future
實例。
咱們只是調用方法,接收Future實例並在咱們準備阻塞結果時調用它的get方法。
另請注意,get方法拋出一些已檢查的異常,即ExecutionException(封裝計算期間發生的異常)和InterruptedException(表示執行方法的線程被中斷的異常):
Future<String> completableFuture = calculateAsync(); // ... String result = completableFuture.get(); assertEquals("Hello", result);
若是你已經知道計算的結果,也能夠用變成同步的方式來返回結果。
Future<String> completableFuture = CompletableFuture.completedFuture("Hello"); // ... String result = completableFuture.get(); assertEquals("Hello", result);
做爲在某些場景中,你可能但願取消Future任務的執行。
假設咱們沒有找到結果並決定徹底取消異步執行任務。這能夠經過Future的取消方法完成。此方法mayInterruptIfRunning
,但在CompletableFuture的狀況下,它沒有任何效果,由於中斷不用於控制CompletableFuture的處理。
這是異步方法的修改版本:
public Future<String> calculateAsyncWithCancellation() throws InterruptedException { CompletableFuture<String> completableFuture = new CompletableFuture<>(); Executors.newCachedThreadPool().submit(() -> { Thread.sleep(500); completableFuture.cancel(false); return null; }); return completableFuture; }
當咱們使用Future.get()方法阻塞結果時,cancel()
表示取消執行,它將拋出CancellationException:
Future<String> future = calculateAsyncWithCancellation(); future.get(); // CancellationException
上面的代碼很簡單,下面介紹幾個 static 方法,它們使用任務來實例化一個 CompletableFuture 實例。
CompletableFuture.runAsync(Runnable runnable); CompletableFuture.runAsync(Runnable runnable, Executor executor); CompletableFuture.supplyAsync(Supplier<U> supplier); CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)
靜態方法runAsync
和supplyAsync
容許咱們相應地從Runnable和Supplier功能類型中建立CompletableFuture實例。
該Runnable的接口是在線程使用舊的接口,它不容許返回值。
Supplier接口是一個不具備參數,並返回參數化類型的一個值的單個方法的通用功能接口。
這容許將Supplier的實例做爲lambda表達式提供,該表達式執行計算並返回結果:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello"); // ... assertEquals("Hello", future.get());
在兩個任務任務A,任務B中,若是既不須要任務A的值也不想在任務B中引用,那麼你能夠將Runnable lambda 傳遞給thenRun()
方法。在下面的示例中,在調用future.get()方法以後,咱們只需在控制檯中打印一行:
模板
CompletableFuture.runAsync(() -> {}).thenRun(() -> {}); CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {});
thenRun(Runnable runnable)
,任務 A 執行完執行 B,而且 B 不須要 A 的結果。thenRun(Runnable runnable)
,任務 A 執行完執行 B,會返回resultA
,可是 B 不須要 A 的結果。實戰
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<Void> future = completableFuture .thenRun(() -> System.out.println("Computation finished.")); future.get();
在兩個任務任務A,任務B中,若是你不須要在Future中有返回值,則能夠用 thenAccept
方法接收將計算結果傳遞給它。最後的future.get()調用返回Void類型的實例。
模板
CompletableFuture.runAsync(() -> {}).thenAccept(resultA -> {}); CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {});
runAsync
不會有返回值,第二個方法thenAccept
,接收到的resultA值爲null,同時任務B也不會有返回結果supplyAsync
有返回值,同時任務B不會有返回結果。實戰
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<Void> future = completableFuture .thenAccept(s -> System.out.println("Computation returned: " + s)); future.get();
在兩個任務任務A,任務B中,任務B想要任務A計算的結果,能夠用thenApply
方法來接受一個函數實例,用它來處理結果,並返回一個Future函數的返回值:
模板
CompletableFuture.runAsync(() -> {}).thenApply(resultA -> "resultB"); CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB");
實戰
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<String> future = completableFuture .thenApply(s -> s + " World"); assertEquals("Hello World", future.get());
固然,多個任務的狀況下,若是任務 B 後面還有任務 C,往下繼續調用 .thenXxx() 便可。
接下來會有一個頗有趣的設計模式;
CompletableFuture API 的最佳場景是可以在一系列計算步驟中組合CompletableFuture實例。
這種組合結果自己就是CompletableFuture,容許進一步再續組合。這種方法在函數式語言中無處不在,一般被稱爲monadic設計模式
。
簡單說,Monad就是一種設計模式,表示將一個運算過程,經過函數拆解成互相鏈接的多個步驟。你只要提供下一步運算所需的函數,整個運算就會自動進行下去。
在下面的示例中,咱們使用thenCompose方法按順序組合兩個Futures。
請注意,此方法採用返回CompletableFuture實例的函數。該函數的參數是先前計算步驟的結果。這容許咱們在下一個CompletableFuture的lambda中使用這個值:
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello") .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World")); assertEquals("Hello World", completableFuture.get());
該thenCompose方法連同thenApply同樣實現告終果的合併計算。可是他們的內部形式是不同的,它們與Java 8中可用的Stream和Optional類的map和flatMap方法是有着相似的設計思路在裏面的。
兩個方法都接收一個CompletableFuture並將其應用於計算結果,但thenCompose(flatMap)方法接收一個函數,該函數返回相同類型的另外一個CompletableFuture對象。此功能結構容許將這些類的實例繼續進行組合計算。
取兩個任務的結果
若是要執行兩個獨立的任務,並對其結果執行某些操做,能夠用Future的thenCombine方法:
模板
CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA"); CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> "resultB"); cfA.thenAcceptBoth(cfB, (resultA, resultB) -> {}); cfA.thenCombine(cfB, (resultA, resultB) -> "result A + B");
實戰
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello") .thenCombine(CompletableFuture.supplyAsync( () -> " World"), (s1, s2) -> s1 + s2)); assertEquals("Hello World", completableFuture.get());
更簡單的狀況是,當你想要使用兩個Future結果時,但不須要將任何結果值進行返回時,能夠用thenAcceptBoth
,它表示後續的處理不須要返回值,而 thenCombine 表示須要返回值:
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello") .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> System.out.println(s1 + s2));
在前面的部分中,咱們展現了關於thenApply()和thenCompose()的示例。這兩個API都是使用的CompletableFuture調用,但這兩個API的使用是不一樣的。
此方法用於處理先前調用的結果。可是,要記住的一個關鍵點是返回類型是轉換泛型中的類型,是同一個CompletableFuture。
所以,當咱們想要轉換CompletableFuture 調用的結果時,效果是這樣的 :
CompletableFuture<Integer> finalResult = compute().thenApply(s-> s + 1);
該thenCompose()方法相似於thenApply()在都返回一個新的計算結果。可是,thenCompose()使用前一個Future做爲參數。它會直接使結果變新的Future,而不是咱們在thenApply()中到的嵌套Future,而是用來鏈接兩個CompletableFuture,是生成一個新的CompletableFuture:
CompletableFuture<Integer> computeAnother(Integer i){ return CompletableFuture.supplyAsync(() -> 10 + i); } CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);
所以,若是想要繼續嵌套連接CompletableFuture 方法,那麼最好使用thenCompose()。
當咱們須要並行執行多個任務時,咱們一般但願等待全部它們執行,而後處理它們的組合結果。
該CompletableFuture.allOf
靜態方法容許等待全部的完成任務:
API
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs){...}
實戰
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Beautiful"); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "World"); CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2, future3); // ... combinedFuture.get(); assertTrue(future1.isDone()); assertTrue(future2.isDone()); assertTrue(future3.isDone());
請注意,CompletableFuture.allOf()的返回類型是CompletableFuture <Void>。這種方法的侷限性在於它不會返回全部任務的綜合結果。相反,你必須手動從Futures獲取結果。幸運的是,CompletableFuture.join()方法和Java 8 Streams API能夠解決:
String combined = Stream.of(future1, future2, future3) .map(CompletableFuture::join) .collect(Collectors.joining(" ")); assertEquals("Hello Beautiful World", combined);
CompletableFuture 提供了 join() 方法,它的功能和 get() 方法是同樣的,都是阻塞獲取值,它們的區別在於 join() 拋出的是 unchecked Exception。這使得它能夠在Stream.map()方法中用做方法引用。
說到這裏,咱們順便來講下 CompletableFuture 的異常處理。這裏咱們要介紹兩個方法:
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn); public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
看下代碼
CompletableFuture.supplyAsync(() -> "resultA") .thenApply(resultA -> resultA + " resultB") .thenApply(resultB -> resultB + " resultC") .thenApply(resultC -> resultC + " resultD");
上面的代碼中,任務 A、B、C、D 依次執行,若是任務 A 拋出異常(固然上面的代碼不會拋出異常),那麼後面的任務都得不到執行。若是任務 C 拋出異常,那麼任務 D 得不到執行。
那麼咱們怎麼處理異常呢?看下面的代碼,咱們在任務 A 中拋出異常,並對其進行處理:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { throw new RuntimeException(); }) .exceptionally(ex -> "errorResultA") .thenApply(resultA -> resultA + " resultB") .thenApply(resultB -> resultB + " resultC") .thenApply(resultC -> resultC + " resultD"); System.out.println(future.join());
上面的代碼中,任務 A 拋出異常,而後經過 .exceptionally()
方法處理了異常,並返回新的結果,這個新的結果將傳遞給任務 B。因此最終的輸出結果是:
errorResultA resultB resultC resultD
String name = null; // ... CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { if (name == null) { throw new RuntimeException("Computation error!"); } return "Hello, " + name; })}).handle((s, t) -> s != null ? s : "Hello, Stranger!"); assertEquals("Hello, Stranger!", completableFuture.get());
固然,它們也能夠都爲 null,由於若是它做用的那個 CompletableFuture 實例沒有返回值的時候,s 就是 null。
CompletableFuture類中的API的大多數方法都有兩個帶有Async後綴的附加修飾。這些方法表示用於異步線程。
沒有Async後綴的方法使用調用線程運行下一個執行線程階段。不帶Async方法使用ForkJoinPool.commonPool()線程池的fork / join實現運算任務。帶有Async方法使用傳遞式的Executor任務去運行。
下面附帶一個案例,能夠看到有thenApplyAsync方法。在程序內部,線程被包裝到ForkJoinTask實例中。這樣能夠進一步並行化你的計算並更有效地使用系統資源。
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<String> future = completableFuture .thenApplyAsync(s -> s + " World"); assertEquals("Hello World", future.get());
在Java 9中, CompletableFuture API經過如下更改獲得了進一步加強:
引入了新的實例API:
還有一些靜態實用方法:
最後,爲了解決超時問題,Java 9又引入了兩個新功能:
在本文中,咱們描述了CompletableFuture類的方法和典型用例。