自Java 5開始添加了Future
,用來描述一個異步計算的結果。獲取一個結果時方法較少,要麼經過輪詢isDone()
,確認完成後調用get()
獲取值,要麼調用get()
設置一個超時時間。可是get()
方法會阻塞調用線程,這種阻塞的方式顯然和咱們的異步編程的初衷相違背。如:html
package com.common.future; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class FutureRunnerTest { public static void main(String[] args) throws InterruptedException { ExecutorService es = Executors.newSingleThreadExecutor(); //提交一個 Callable Future<Integer> f = es.submit(() -> { // 長時間的異步計算 Thread.sleep(2000L); System.out.println("長時間的異步計算"); return 100; }); // 輪詢 while (true) { System.out.println("f.isDone"); if (f.isDone()) { try { System.out.println(f.get()); es.shutdown(); break; } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } Thread.sleep(100L); } } }
雖然Future
以及相關使用方法提供了異步執行任務的能力,可是對於結果的獲取倒是很不方便,只能經過阻塞或者輪詢的方式獲得任務的結果。阻塞的方式顯然和咱們的異步編程的初衷相違背,輪詢的方式又會耗費無謂的CPU資源,並且也不能及時地獲得計算結果。java
get()
方法通知你結果。你沒法給 Future 植入一個回調函數,當 Future 結果可用的時候,用該回調函數自動的調用 Future 的結果。CompletableFuture 實現了 Future
和 CompletionStage
接口,而且提供了許多關於建立,鏈式調用和組合多個 Future 的便利方法集,並且有普遍的異常處理支持。web
在該類中提供了四個靜態方法建立CompletableFuture對象:編程
runAsync()
運行異步計算supplyAsync()
運行一個異步任務而且返回結果package com.common.future; import java.util.concurrent.*; public class CompletableFutureRunnerTest { // 建立一個固定大小的線程池子 static ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() { int count = 1; @Override public Thread newThread(Runnable runnable) { return new Thread(runnable, "custom-executor-" + count++); } }); public static void sleep() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws ExecutionException, InterruptedException { // 直接使用new 建立 CompletableFuture newCompletable = new CompletableFuture(); // 經過給定的值 建立一個 已經完成的 future CompletableFuture<String> cf = CompletableFuture.completedFuture("message"); System.out.println(cf.isDone()); System.out.println(cf.getNow(null)); // 使用 runAsync() 運行異步計算 CompletableFuture<Void> completableFuture01 = CompletableFuture.runAsync(() -> { sleep(); System.out.println("runAsync..."); }); CompletableFuture<Void> completableFuture02 = CompletableFuture.runAsync(() -> { sleep(); System.out.println("runAsync..."); }, executor); // 使用 supplyAsync() 運行一個異步任務而且返回結果 CompletableFuture<String> completableFuture03 = CompletableFuture.supplyAsync(() -> { sleep(); System.out.println("supplyAsync03..."); return "hello world"; }); System.out.println(completableFuture03.isDone()); // Block and wait for the future to complete System.out.println(completableFuture03.get()); CompletableFuture<String> completableFuture04 = CompletableFuture.supplyAsync(() -> { sleep(); System.out.println("supplyAsync04..."); return "hello world"; }, executor); System.out.println(completableFuture04.isDone()); System.out.println(completableFuture04.get()); } }
上面示例中的isDone() ,get() 方法都是 繼承於 Future 接口中的,通常狀況下,使用CompletableFuture 不須要調用isDone() 方法判斷是否完成,也不須要調用get 方法獲取異步執行的結果的。緩存
當CompletableFuture的計算結果完成時,有以下三個方法進行處理:異步
CompletableFuture<String> completableFuture03 = CompletableFuture.supplyAsync(() -> { sleep(); System.out.println("supplyAsync03..."); return "hello world"; }); System.out.println(completableFuture03.isDone()); System.out.println(completableFuture03.get()); completableFuture03.whenComplete((t, ex) -> { if (t != null) { System.out.println(t); } else { ex.printStackTrace(); } }); completableFuture03.whenCompleteAsync((t, ex) -> { if (t != null) { System.out.println(t); } else { ex.printStackTrace(); } }); completableFuture03.whenCompleteAsync((t, ex) -> { if (t != null) { System.out.println(t); } else { ex.printStackTrace(); } }, executor);
若是拋出了異常,對異常的處理以下所示,ide
CompletableFuture<Integer> completableFuture05 = CompletableFuture.supplyAsync(() -> { sleep(); return 1 / 0; }); completableFuture05.whenComplete((t, ex) -> { if (t != null) { System.out.println(t); } else { ex.printStackTrace(); } });
這裏首先判斷 t 的值是否爲空,若是爲空直接打印異常的堆棧信息異步編程
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592) at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) Caused by: java.lang.ArithmeticException: / by zero at com.common.future.CompletableFutureRunnerTest.lambda$main$4(CompletableFutureRunnerTest.java:93) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) ... 5 more
還有一個專門處理異常狀況的方法,如 exceptionally,函數
CompletableFuture<Integer> completableFuture05 = CompletableFuture.supplyAsync(() -> { sleep(); return 1 / 0; }); completableFuture05.exceptionally((e) -> { e.printStackTrace(); return 0; }).whenComplete((t, e) -> { if (t != null) { System.out.println(t); } else { e.printStackTrace(); } }).join();
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn); public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn); public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);
這一組函數的功能是當原來的CompletableFuture計算完後,將結果傳遞給函數fn
,將fn
的結果做爲新的CompletableFuture
計算結果。所以它的功能至關於將CompletableFuture<T>
轉換成CompletableFuture<U>
。網站
不以Async
結尾的方法由原來的線程計算,以Async
結尾的方法由默認的線程池ForkJoinPool.commonPool()
或者指定的線程池executor
運行。看下面的例子,
package com.common.future; import java.util.concurrent.CompletableFuture; public class ThenApplyTest { public static void main(String[] args) { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100); String f = future.thenApplyAsync(i -> i * 10).thenApply(i -> i.toString()).join(); System.out.println(f); CompletableFuture.supplyAsync(() -> "hello world".substring(0, 5)) .thenApply(String::length) .whenComplete((r, t) -> { if (t == null) { System.out.println("the length = " + r); } }); } }
若是你不想從你的回調函數中返回任何東西,僅僅想在Future完成後運行一些代碼片斷,你可使用thenAccept()
和 thenRun()
方法,這些方法常常在調用鏈的最末端的最後一個回調函數中使用。
public CompletableFuture<Void> thenAccept(Consumer<? super T> action); public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action); public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor); public CompletableFuture<Void> thenRun(Runnable action); public CompletableFuture<Void> thenRunAsync(Runnable action); public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor);
CompletableFuture.thenAccept()
持有一個Consumer<T>
,返回一個CompletableFuture<Void>
。它能夠訪問CompletableFuture
的結果:
// thenAccept() example CompletableFuture.supplyAsync(() -> { return ProductService.getProductDetail(productId); }).thenAccept(product -> { System.out.println("Got product detail from remote service " + product.getName()) });
thenAcceptBoth
以及相關方法提供了相似的功能,當兩個CompletionStage都正常完成計算的時候,就會執行提供的action,它用來組合另一個異步的結果。
//thenAccept public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action); public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action); public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor);
以下面的使用示例,
CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } return "first"; }).thenAcceptBoth(CompletableFuture.completedFuture("second"), (first, second) -> System.out.println(first + " : " + second)).join();
雖然thenAccept()
能夠訪問CompletableFuture的結果,但thenRun()
不能訪Future的結果,它持有一個Runnable返回CompletableFuture<Void>:
// thenRun() example CompletableFuture.supplyAsync(() -> { // Run some computation }).thenRun(() -> { // Computation Finished. });
假設你想從一個遠程API中獲取一個用戶的詳細信息,一旦用戶信息可用,你想從另一個服務中獲取他的貸款利率。
考慮下如下兩個方法getUserDetail()
和getCreditRating()
的實現:
static CompletableFuture<User> getUsersDetail(String userId) { return CompletableFuture.supplyAsync(() -> { return new User("12312121", "xiaoming", 12); }); } static CompletableFuture<Double> getCreditRating(User user) { return CompletableFuture.supplyAsync(() -> { if (user.getUserId() == null || user.getUserId().equals("")) { return 0.0; } else { return 0.1; } }); }
如今讓咱們弄明白當使用了thenApply()
後是否會達到咱們指望的結果-
// 若是使用 thenApply ,則返回了一個 最終結果是一個嵌套的CompletableFuture。 CompletableFuture<CompletableFuture<Double>> res = getUsersDetail("testUserId").thenApply(user -> getCreditRating(user));
在更早的示例中,Supplier
函數傳入thenApply
將返回一個簡單的值,可是在本例中,將返回一個CompletableFuture。以上示例的最終結果是一個嵌套的CompletableFuture。
若是你想獲取最終的結果給最頂層future,使用 thenCompose()
方法代替-
// 若是你想獲取最終的結果給最頂層future,使用 thenCompose()方法代替- CompletableFuture<Double> result = getUsersDetail("testUserId") .thenCompose(user -> getCreditRating(user));
所以,規則就是-若是你的回調函數返回一個CompletableFuture,可是你想從CompletableFuture鏈中獲取一個直接合並後的結果,這時候你可使用thenCompose()
。
雖然thenCompose()
被用於當一個future依賴另一個future的時候用來組合兩個future。thenCombine()
被用來當兩個獨立的Future
都完成的時候,用來作一些事情。
System.out.println("Retrieving weight."); CompletableFuture<Double> weightInKgFuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return 65.0; }); System.out.println("Retrieving height."); CompletableFuture<Double> heightInCmFuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return 177.8; }); System.out.println("Calculating BMI."); CompletableFuture<Double> combinedFuture = weightInKgFuture .thenCombine(heightInCmFuture, (weightInKg, heightInCm) -> { Double heightInMeter = heightInCm / 100; return weightInKg / (heightInMeter * heightInMeter); }); System.out.println("Your BMI is - " + combinedFuture.get());
當兩個Future都完成的時候,傳給``thenCombine()的回調函數將被調用。
咱們使用thenCompose()
和 thenCombine()
把兩個CompletableFuture組合在一塊兒。如今若是你想組合任意數量的CompletableFuture,應該怎麼作?咱們可使用如下兩個方法組合任意數量的CompletableFuture。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs); public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);
CompletableFuture.allOf
的使用場景是當你有一個列表的獨立future,而且你想在它們都完成後並行的作一些事情。
假設你想下載一個網站的100個不一樣的頁面。你能夠串行的作這個操做,可是這很是消耗時間。所以你想寫一個函數,傳入一個頁面連接,返回一個CompletableFuture,異步的下載頁面內容。
public static CompletableFuture<String> downloadWebPage(String pageLink) { return CompletableFuture.supplyAsync(() -> { // Code to download and return the web page's content try { String html = Jsoup.connect(pageLink).execute().body(); return html; } catch (IOException e) { e.printStackTrace(); } return ""; }); }
如今,當全部的頁面已經下載完畢,你想計算包含關鍵字CompletableFuture
頁面的數量。可使用CompletableFuture.allOf()
達成目的。
List<String> webPageLinks = Arrays.asList("https://my.oschina.net/xinxingegeya/blog/674006", "https://my.oschina.net/xinxingegeya/blog/637773", "https://my.oschina.net/xinxingegeya/blog/2222079"); List<CompletableFuture<String>> pageContentFutures = webPageLinks.stream() .map(webPageLink -> downloadWebPage(webPageLink)) .collect(Collectors.toList()); // Create a combined Future using allOf() CompletableFuture<Void> allFutures = CompletableFuture.allOf(pageContentFutures.toArray(new CompletableFuture[0]));
使用CompletableFuture.allOf()
的問題是它返回CompletableFuture<Void>。可是咱們能夠經過寫一些額外的代碼來獲取全部封裝的CompletableFuture結果。
// When all the Futures are completed, call `future.join()` to get their results and collect the results in a list - CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v -> { return pageContentFutures.stream() .map(pageContentFuture -> pageContentFuture.join()) .collect(Collectors.toList()); });
花一些時間理解下以上代碼片斷。當全部future完成的時候,咱們調用了future.join()
,所以咱們不會在任何地方阻塞。
join()
方法和get()
方法很是相似,這惟一不一樣的地方是若是最頂層的CompletableFuture完成的時候發生了異常,它會拋出一個未經檢查的異常。
如今讓咱們計算包含關鍵字頁面的數量。
// Count the number of web pages having the "CompletableFuture" keyword. CompletableFuture<Long> countFuture = allPageContentsFuture.thenApply(pageContents -> { return pageContents.stream() .filter(pageContent -> pageContent.contains("CompletableFuture")) .count(); }); System.out.println("Number of Web Pages having CompletableFuture keyword - " + countFuture.get());
完整的代碼以下,
package common.future; import org.jsoup.Jsoup; import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; public class MoreCompletableFutureTest { public static CompletableFuture<String> downloadWebPage(String pageLink) { return CompletableFuture.supplyAsync(() -> { // Code to download and return the web page's content try { String html = Jsoup.connect(pageLink).execute().body(); return html; } catch (IOException e) { e.printStackTrace(); } return ""; }); } public static void main(String[] args) throws ExecutionException, InterruptedException { List<String> webPageLinks = Arrays.asList("https://my.oschina.net/xinxingegeya/blog/674006", "https://my.oschina.net/xinxingegeya/blog/637773", "https://my.oschina.net/xinxingegeya/blog/2222079"); List<CompletableFuture<String>> pageContentFutures = webPageLinks.stream() .map(webPageLink -> downloadWebPage(webPageLink)) .collect(Collectors.toList()); // Create a combined Future using allOf() CompletableFuture<Void> allFutures = CompletableFuture.allOf(pageContentFutures.toArray(new CompletableFuture[0])); // When all the Futures are completed, call `future.join()` to get their results and collect the results in a list - CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v -> { return pageContentFutures.stream() .map(pageContentFuture -> pageContentFuture.join()) .collect(Collectors.toList()); }); // Count the number of web pages having the "CompletableFuture" keyword. CompletableFuture<Long> countFuture = allPageContentsFuture.thenApply(pageContents -> { return pageContents.stream() .filter(pageContent -> pageContent.contains("CompletableFuture")) .count(); }); System.out.println("Number of Web Pages having CompletableFuture keyword - " + countFuture.get()); } }
CompletableFuture.anyOf()
和其名字介紹的同樣,當任何一個CompletableFuture完成的時候【相同的結果類型】,返回一個新的CompletableFuture。如下示例:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Result of Future 1"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Result of Future 2"; }); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Result of Future 3"; }); CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3); System.out.println(anyOfFuture.get()); // Result of Future 2
在以上示例中,當三個中的任何一個CompletableFuture完成, anyOfFuture
就會完成。由於future2
的休眠時間最少,所以她最早完成,最終的結果將是future2
的結果。
CompletableFuture.anyOf()
傳入一個Future可變參數,返回CompletableFuture<Object>。CompletableFuture.anyOf()
的問題是若是你的CompletableFuture返回的結果是不一樣類型的,這時候你講會不知道你最終CompletableFuture是什麼類型。
======END======