在Markdown的語法中,<u>下劃線</u>
中的文字會被解析器加上下劃線,爲了避免影響閱讀,本文中JDK文檔涉及到<U>
都會替換爲<N>
,請各位注意。html
Java 1.8 新增長的 CompletableFuture 類內部是使用 ForkJoinPool 來實現的,CompletableFuture 實現了 Future接口 和 CompletionStage接口。java
在以前的Future的介紹和基本用法一文中,咱們瞭解到 Future 表示異步計算的結果。編程
CompletionStage 表明了一個特定的計算的階段,能夠同步或者異步的被完成。api
CompletionStage 能夠被看做一個計算流水線上的一個單元,最終會產生一個最終結果,這意味着幾個 CompletionStage 能夠串聯起來,一個完成的階段能夠觸發下一階段的執行,接着觸發下一次,直到完成全部階段。bash
Future是Java 5添加的類,用來描述一個異步計算的結果。你可使用isDone方法檢查計算是否完成,或者使用get阻塞住調用線程,直到計算完成返回結果,你也可使用cancel方法中止任務的執行。oracle
雖然Future以及相關使用方法提供了異步執行任務的能力,可是對於結果的獲取倒是很不方便,只能經過阻塞或者輪詢的方式獲得任務的結果。阻塞的方式顯然和咱們的異步編程的初衷相違背,輪詢的方式又會耗費無謂的CPU資源,並且也不能及時地獲得計算結果。異步
package net.ijiangtao.tech.concurrent.jsd.future.completable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/** * java5 future * * @author ijiangtao * @create 2019-07-22 9:40 **/
public class Java5Future {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//經過 while 循環等待異步計算處理成功
ExecutorService pool = Executors.newFixedThreadPool(10);
Future<Integer> f1 = pool.submit(() -> {
// 長時間的異步計算 ……
Thread.sleep(1);
// 而後返回結果
return 1001;
});
while (!f1.isDone())
System.out.println("is not done yet");
;
System.out.println("while isDone,result=" + f1.get());
//經過阻塞的方式等待異步處理成功
Future<Integer> f2 = pool.submit(() -> {
// 長時間的異步計算 ……
Thread.sleep(1);
// 而後返回結果
return 1002;
});
System.out.println("after blocking,result=" + f2.get());
}
}
複製代碼
前面咱們提到 CompletableFuture 爲咱們提供了異步計算的實現,而這些實現都是經過它的方法實現的。async
若是你打開它的文檔CompletableFuture-Java8Docs,你會發現CompletableFuture提供了將近60個方法。雖然方法不少,若是你仔細看的話,你會這些方法其中不少都是有類似性的。ide
只要你熟練掌握了這些方法,就可以駕輕就熟地使用 CompletableFuture 來進行異步計算了。異步編程
Java的CompletableFuture類老是遵循這樣的原則:
下面再也不一一贅述了。
返回值 | 方法名及參數 | 方法說明 |
---|---|---|
static CompletableFuture | supplyAsync(Supplier supplier) | Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() with the value obtained by calling the given Supplier. |
static CompletableFuture | supplyAsync(Supplier supplier, Executor executor) | Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier. |
supplyAsync方法以Supplier函數式接口類型爲參數,CompletableFuture的計算結果類型爲U。由於該方法的參數類型都是函數式接口,因此可使用lambda表達式實現異步任務。後面講解其餘方法的時候,會舉例子。
runAsync方法也好理解,它以Runnable函數式接口類型爲參數,因此CompletableFuture的計算結果也爲空(Runnable的run方法返回值爲空)。這裏就再也不一一介紹了,感興趣的小夥伴能夠查看API文檔。
返回值 | 方法名及參數 | 方法說明 |
---|---|---|
T | get() | Waits if necessary for this future to complete, and then returns its result. |
T | get(long timeout, TimeUnit unit) | Waits if necessary for at most the given time for this future to complete, and then returns its result, if available. |
T | getNow(T valueIfAbsent) | Returns the result value (or throws any encountered exception) if completed, else returns the given valueIfAbsent. |
T | join() | Returns the result value when complete, or throws an (unchecked) exception if completed exceptionally. |
你能夠像使用Future同樣使用CompletableFuture,來進行阻塞式的計算(雖然不推薦使用)。其中getNow方法比較特殊:結果已經計算完則返回結果或者拋出異常,不然返回給定的valueIfAbsent值。
join和get方法均可以阻塞到計算完成而後得到返回結果,但二者的處理流程有所不一樣,能夠參考下面一段代碼來比較二者處理異常的不一樣之處:
public static void main(String[] args) {
try {
new CompletableFutureDemo().test2();
new CompletableFutureDemo().test3();
} catch (Exception e) {
e.printStackTrace();
}
}
public void test2() throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 1 / 0;
return 100;
});
future.join();
}
public void test3() throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 1 / 0;
return 100;
});
future.get();
}
複製代碼
輸出以下:
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.ArithmeticException: / by zero
at net.ijiangtao.tech.concurrent.jsd.future.completable.CompletableFutureDemo.lambda$test2$0(CompletableFutureDemo.java:32)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
... 5 more
複製代碼
返回值 | 方法名及參數 | 方法說明 |
---|---|---|
CompletableFuture | thenApply(Function<? super T,? extends U> fn) | Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied function. |
CompletableFuture | thenApplyAsync(Function<? super T,? extends U> fn) | Returns a new CompletionStage that, when this stage completes normally, is executed using this stage's default asynchronous execution facility, with this stage's result as the argument to the supplied function. |
CompletableFuture | thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) | Returns a new CompletionStage that, when this stage completes normally, is executed using the supplied Executor, with this stage's result as the argument to the supplied function. |
使用CompletableFuture,咱們沒必要由於等待一個計算完成而阻塞着調用線程,而是告訴CompletableFuture當計算完成的時候請執行某個function。並且咱們還能夠將這些操做串聯起來,或者將CompletableFuture組合起來。
這一組函數的功能是當原來的CompletableFuture計算完後,將結果傳遞給函數fn,將fn的結果做爲新的CompletableFuture計算結果。所以它的功能至關於將CompletableFuture 轉換成 CompletableFuture 。
請看下面的例子:
public static void main(String[] args) throws Exception {
try {
// new CompletableFutureDemo().test1();
} catch (Exception e) {
e.printStackTrace();
}
try {
//new CompletableFutureDemo().test2();
//new CompletableFutureDemo().test3();
} catch (Exception e) {
e.printStackTrace();
}
new CompletableFutureDemo().test4();
}
public void test4() throws Exception {
// Create a CompletableFuture
CompletableFuture<Integer> calculateFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("1");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
System.out.println("2");
return 1 + 2;
});
// Attach a callback to the Future using thenApply()
CompletableFuture<String> resultFuture = calculateFuture.thenApply(number -> {
System.out.println("3");
return "1 + 2 is " + number;
});
// Block and get the result of the future.
System.out.println(resultFuture.get());
}
複製代碼
輸出結果爲:
1
2
3
1 + 2 is 3
複製代碼
返回值 | 方法名及參數 | 方法說明 |
---|---|---|
CompletableFuture | thenAccept(Consumer<? super T> action) | Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied action. |
CompletableFuture | thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) | Returns a new CompletionStage that, when this and the other given stage both complete normally, is executed with the two results as arguments to the supplied action. |
篇幅緣由,Async和Executor方法再也不列舉。
只對結果執行Action,而不返回新的計算值,所以計算值爲Void。這就好像生產者生產了消息,消費者消費消息之後再也不進行消息的生產同樣,所以thenAccept是對計算結果的純消費。
例如以下方法:
public void test5() throws Exception {
// thenAccept() example
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
return "ijiangtao";
}).thenAccept(name -> {
System.out.println("Hi, " + name);
});
System.out.println(future.get());
}
複製代碼
thenAccept的get返回爲null:
Hi, ijiangtao
null
複製代碼
thenAcceptBoth能夠消費二者(生產和消費)的結果,下面提供一個例子:
public void test6() throws Exception {
CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
}).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "ijiangtao";
}), (s1, s2) -> {
System.out.println(s1 + " " + s2);
});
while (true){
}
}
複製代碼
輸出以下:
hello ijiangtao
複製代碼
返回值 | 方法名及參數 | 方法說明 |
---|---|---|
<U,V> CompletableFuture | thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) | Returns a new CompletionStage that, when this and the other given stage both complete normally, is executed with the two results as arguments to the supplied function. |
<U,V> CompletableFuture | thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) | Returns a new CompletionStage that, when this and the other given stage complete normally, is executed using this stage's default asynchronous execution facility, with the two results as arguments to the supplied function. |
從功能上來說, thenCombine的功能更相似thenAcceptBoth,只不過thenAcceptBoth是純消費,它的函數參數沒有返回值,而thenCombine的函數參數fn有返回值。
public void test7() throws Exception {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
return 1+2;
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "1+2 is";
});
CompletableFuture<String> f = future1.thenCombine(future2, (x, y) -> y + " " + x);
System.out.println(f.get()); // 輸出:1+2 is 3
}
複製代碼
返回值 | 方法名及參數 | 方法說明 |
---|---|---|
CompletableFuture | thenCompose(Function<? super T,? extends CompletionStage> fn) | Returns a new CompletionStage that, when this stage completes normally, is executed with this stage as the argument to the supplied function. |
因爲篇幅緣由,Async和Executor方法再也不列舉。
thenCompose方法接受一個Function做爲參數,這個Function的輸入是當前的CompletableFuture的計算值,返回結果將是一個新的CompletableFuture。
假如你須要將兩個CompletableFutures相互整合,若是使用thenApply,則結果會是嵌套的CompletableFuture:
CompletableFuture<String> getUsersName(Long id) {
return CompletableFuture.supplyAsync(() -> {
return "ijiangtao";
});
}
CompletableFuture<Integer> getUserAge(String userName) {
return CompletableFuture.supplyAsync(() -> {
return 20;
});
}
public void test8(Long id) throws Exception {
CompletableFuture<CompletableFuture<Integer>> result1 = getUsersName(id)
.thenApply(userName -> getUserAge(userName));
}
複製代碼
這時候可使用thenCompose來得到第二個計算的CompletableFuture:
public void test9(Long id) throws Exception {
CompletableFuture<Integer> result2 = getUsersName(id)
.thenCompose(userName -> getUserAge(userName));
}
複製代碼
返回值 | 方法名及參數 | 方法說明 |
---|---|---|
CompletableFuture | whenComplete(BiConsumer<? super T,? super Throwable> action) | Returns a new CompletionStage with the same result or exception as this stage, that executes the given action when this stage completes. |
CompletableFuture | whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) | Returns a new CompletionStage with the same result or exception as this stage, that executes the given action using this stage's default asynchronous execution facility when this stage completes. |
CompletableFuture | whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) | Returns a new CompletionStage with the same result or exception as this stage, that executes the given action using the supplied Executor when this stage completes. |
當CompletableFuture的計算結果完成,或者拋出異常的時候,咱們能夠執行特定的Action。whenComplete的參數Action的類型是BiConsumer<? super T,? super Throwable>,它能夠處理正常的計算結果,或者異常狀況。注意這幾個方法都會返回CompletableFuture,當Action執行完畢後它的結果返回原始的CompletableFuture的計算結果或者返回異常。
whenComplete方法不以Async結尾,意味着Action使用相同的線程執行,而以Async結尾的方法可能會使用其它的線程去執行(若是使用相同的線程池,也可能會被同一個線程選中執行)。
下面演示一下異常狀況:
public void test10() throws Exception {
String result = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (1 == 1) {
throw new RuntimeException("an RuntimeException");
}
return "s1";
}).whenComplete((s, t) -> {
System.out.println("whenComplete s:"+s);
System.out.println("whenComplete exception:"+t.getMessage());
}).exceptionally(e -> {
System.out.println("exceptionally exception:"+e.getMessage());
return "hello ijiangtao";
}).join();
System.out.println(result);
}
複製代碼
輸出:
whenComplete s:null
whenComplete exception:java.lang.RuntimeException: an RuntimeException
exceptionally exception:java.lang.RuntimeException: an RuntimeException
hello ijiangtao
複製代碼
Java5新增的Future類,能夠實現阻塞式的異步計算,但這種阻塞的方式顯然和咱們的異步編程的初衷相違背。爲了解決這個問題,JDK吸取了Guava的設計思想,加入了Future的諸多擴展功能造成了CompletableFuture。
本文重點介紹了CompletableFuture的不一樣類型的API,掌握了這些API對於使用非阻塞的函數式異步編程進行平常開發很是有幫助,同時也爲下面深刻了解異步編程的各類原理和特性打下了良好的基礎。
CompletableFuture - javase 8 docs
CompletableFuture - Guide To CompletableFuture
CompletableFuture - Java CompletableFuture Tutorial with Examples
CompletableFuture - Java 8: Writing asynchronous code with CompletableFuture
《寫給大忙人的JavaSE9核心技術》- 10.2 異步計算
CompletableFuture - 經過實例理解 JDK8 的 CompletableFuture
CompletableFuture - CompletableFuture 詳解
CompletableFuture - 使用 Java 8 的 CompletableFuture 實現函數式的回調
CompletableFuture - Java CompletableFuture 詳解
CompletableFuture - [譯]20個使用 Java CompletableFuture的例子