瞭解CompletableFuture須要理解java8的函數式接口,不瞭解的同窗能夠先移步:http://www.javashuo.com/article/p-nswjqwms-kw.htmljava
//比較特殊,他入參就是返回值,也就是說他能夠用來執行須要其餘返回值的異步任務。 public static <U> CompletableFuture<U> completedFuture(U value) //無返回值,使用默認線程池 public static CompletableFuture<Void> runAsync(Runnable runnable) //無返回值,使用自定義線程池 public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) //有返回值,使用默認線程池 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) //有返回值,使用自定義線程池 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) // public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
舉例:app
//supplyAsync方法無入參,可是返回一個String對象。此方法使用了默認的線程池執行異步任務 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { //長時間的計算任務 return "·00"; });
下面將經過具體例子來展現各個方法的使用。dom
當原始的CompletableFuture任務執行完後,不論是否成功計算出結果,仍是拋出異常,都會會執行 whenComplete* 或 exceptionally 的方法中的任務。
該操做執行完畢後:異步
BiConsumer<T,U> 函數接口有兩個參數,無返回值。 Function<T,R> 函數接口有一個輸入參數,返回一個結果。 //無Async,同步處理正常計算結果或異常,使用執行任務的那個線程來執行該方法,因此這個方法是同步的。 public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) //有Async,異步處理正常計算結果或異常,使用執行任務的那個線程池中的線程來執行該方法!因此這個方法是異步的。 public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) //有Async,異步處理正常計算結果或異常,使用自定義線程池來執行該方法,因此這個方法是異步的。 public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? superThrowable> action, Executor executor) //處理異常。 public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
注意:當沒有異常拋出來的時候,上面的Throwable參數爲空!函數
計算邏輯: private static Random random = new Random(); private static long time = System.currentTimeMillis(); public static int getMoreData(){ System.out.println("begin to start compute"); try { Thread.sleep(10000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("end to compute, passed:" + System.currentTimeMillis()); return random.nextInt(1000); } public static int throwException(){ System.out.println("準備拋出異常"); try { Thread.sleep(10000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("拋了"); throw new RuntimeException("主動拋出異常"); }
whenComplete: //若是使用這段代碼,則會是和當前線程同步執行 public static void main(String[] args) throws Exception{ CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> getMoreData()); CompletableFuture<Integer> future2 = future.whenComplete((result, excetion) -> { System.out.println("執行到whenComplete了,result:" + result); System.out.println("執行到whenComplete了,exception:" + (excetion == null ? "無異常" : excetion.getClass())); }); System.out.println("執行到最後一段代碼了,future result:" + future.get()); System.out.println("執行到最後一段代碼了,future2 result:" + future2.get()); } > 打印執行結果: begin to start compute end to compute, passed:1551182552193 執行到whenComplete了,result:625 執行到whenComplete了,exception:無異常 執行到最後一段代碼了,future result:625 執行到最後一段代碼了,future2 result:625 >從打印結果可知,whenComplete使用原始的執行的任務的線程,因此能夠當作是同步執行的,而且新的CompletableFuture對象的結果和原始的一致
whenCompleteAsync: //若是使用這段代碼,則會是和當前線程同步執行 public static void main(String[] args) throws Exception{ CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> getMoreData()); future.whenCompleteAsync((result, exception) -> { System.out.println("計算已執行完畢,result:" + result); System.out.println("計算已執行完畢,exception:" + (excetion == null ? "無異常" : excetion.getClass())); }); System.out.println("執行到最後一段代碼了,result:" + future.get()); } > 打印執行結果: begin to start compute end to compute, passed:1551180611064 執行到最後一段代碼了,result:323 執行到whenComplete了,result:323 執行到whenComplete了,exception:無異常 >從打印結果可知,whenCompleteAsync是異步執行的
exceptionally比較複雜,須要經過4個實例才能真正明白:.net
exceptionally實例1: //這段代碼,因爲會拋出異常,會先走whenCompleteAsync,而後再走exceptionally,並且是沒法獲取到返回值的。 public static void main(String[] args) throws Exception{ CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> throwException()); future.whenCompleteAsync((result, exception) -> { System.out.println("計算已執行完畢,result:" + result); System.out.println("計算已執行完畢,exception:" + (exception == null ? "無異常" : exception.getClass())); }).exceptionally(exception -> { System.out.println("計算執行過程當中發生了異常,exception:" + exception.getClass()); return 0; }); System.out.println("執行到最後一段代碼了,future result:" + future.get()); } > 打印執行結果: 準備拋出異常 拋了 計算已執行完畢,result:null 計算已執行完畢,result:null 計算已執行完畢,exception:class java.util.concurrent.CompletionException 計算已執行完畢,exception:class java.util.concurrent.CompletionException 計算執行過程當中發生了異常,exception:class java.util.concurrent.CompletionException 計算執行過程當中發生了異常,exception:class java.util.concurrent.CompletionException Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: 主動拋出異常 at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at me.ele.ecs.eapp.service.impl.Main.main(Main.java:69) Caused by: java.lang.RuntimeException: 主動拋出異常 at me.ele.ecs.eapp.service.impl.Main.throwException(Main.java:37) at me.ele.ecs.eapp.service.impl.Main.lambda$main$0(Main.java:44)
exceptionally實例2: //這裏的打印結果是和上面相似的,但是爲何此次要獲取新的CompletableFuture對象呢?看下面的exceptionally實例3後,再回來對比吧 public static void main(String[] args) throws Exception{ CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> throwException()); CompletableFuture<Integer> future2 = future.whenCompleteAsync((result, exception) -> { System.out.println("計算已執行完畢,result:" + result); System.out.println("計算已執行完畢,exception:" + (exception == null ? "無異常" : exception.getClass())); }); CompletableFuture<Integer> future3 = future2.exceptionally(exception -> { System.out.println("計算執行過程當中發生了異常,exception:" + exception.getClass()); return 0; }); System.out.println("執行到最後一段代碼了,future result:" + future.get()); //由於上面的執行過程當中,已經拋出了異常了,那麼下面的這兩段代碼是沒法執行到的, System.out.println("執行到最後一段代碼了,future2 result:" + future2.get()); System.out.println("執行到最後一段代碼了,future3 result:" + future3.get()); } > 打印執行結果: 準備拋出異常 拋了 計算已執行完畢,result:null 計算已執行完畢,exception:class java.util.concurrent.CompletionException 計算執行過程當中發生了異常,exception:class java.util.concurrent.CompletionException Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: 主動拋出異常 at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at me.ele.ecs.eapp.service.impl.Main.main(Main.java:69) Caused by: java.lang.RuntimeException: 主動拋出異常 at me.ele.ecs.eapp.service.impl.Main.throwException(Main.java:37) at me.ele.ecs.eapp.service.impl.Main.lambda$main$0(Main.java:44)
exceptionally實例3: // public static void main(String[] args) throws Exception{ CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> throwException()); CompletableFuture<Integer> future2 = future.whenCompleteAsync((result, exception) -> { System.out.println("計算已執行完畢,result:" + result); System.out.println("計算已執行完畢,exception:" + (exception == null ? "無異常" : exception.getClass())); }); CompletableFuture<Integer> future3 = future2.exceptionally(exception -> { System.out.println("計算執行過程當中發生了異常,exception:" + exception.getClass()); //這裏的返回值實際其是沒有用處的。由於若是拋出了異常,future的get方法是執行不到的;而若是沒有拋出異常的話,仍是會返回原始的CompletableFuture的值的 //因此這個exceptionally就是僅僅用來處理異常的。 return 0; }); //System.out.println("執行到最後一段代碼了,future result:" + future.get()); //System.out.println("執行到最後一段代碼了,future2 result:" + future2.get()); //和上面實例2惟一的區別就是註釋掉了上面兩段代碼,可是執行結果卻不同了,並且整個main方法都沒有拋出來異常,緣由就在於future和future2是異步執行的,因此是在別的線程拋了異常,而main方法是不會拋出來的。並且在獲取future3的結果時,能夠發現,返回了future3對象自定義的返回值 System.out.println("執行到最後一段代碼了,future3 result:" + future3.get()); } > 打印執行結果: 準備拋出異常 拋了 計算已執行完畢,result:null 計算已執行完畢,exception:class java.util.concurrent.CompletionException 計算執行過程當中發生了異常,exception:class java.util.concurrent.CompletionException 執行到最後一段代碼了,future3 result:0
exceptionally實例4: public static void main(String[] args) throws Exception{ CompletableFuture<Integer> future = CompletableFuture.supplyAsync(Main::getMoreData); CompletableFuture<Integer> future2 = future.whenCompleteAsync((result, exception) -> { System.out.println("計算已執行完畢,result:" + result); System.out.println("計算已執行完畢,exception:" + (exception == null ? "無異常" : exception.getClass())); }); CompletableFuture<Integer> future3 = future2.exceptionally(exception -> { System.out.println("計算執行過程當中發生了異常,exception:" + exception.getClass()); return 0; }); System.out.println("執行到最後一段代碼了,future result:" + future.get()); System.out.println("執行到最後一段代碼了,future2 result:" + future2.get()); //原始的計算邏輯不變,exceptionally返回的新的CompletableFuture對象的結果和原始計算邏輯返回的結果一致。 System.out.println("執行到最後一段代碼了,future3 result:" + future3.get()); } > 打印執行結果: begin to start compute end to compute, passed:1551239497158 getMoreData: 679 執行到最後一段代碼了,future result:679 計算已執行完畢,result:679 計算已執行完畢,exception:無異常 執行到最後一段代碼了,future2 result:679 執行到最後一段代碼了,future3 result:679
和 whenComplete* 方法同樣,都是在任務執行完後,執行該方法的邏輯,可是和 whenComplete* 不一樣的是:
該操做執行完畢後,它返回的新CompletableFuture對象的計算結果是handle*方法的返回值,並非原始計算邏輯的返回值線程
//同步 public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn) //異步,使用原始CompletableFuture的線程 public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn) //異步,使用自定義線程池的線程 public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)
實例: public static void main(String[] args) throws Exception{ CompletableFuture<Integer> future = CompletableFuture.supplyAsync(Main::getMoreData); CompletableFuture<Integer> future2 = future.handleAsync((result, exception) -> { System.out.println("計算已執行完畢,result:" + result); System.out.println("計算已執行完畢,exception:" + (exception == null ? "無異常" : exception.getClass())); return result + 1; }); System.out.println("執行到最後一段代碼了,future result:" + future.get()); System.out.println("執行到最後一段代碼了,future2 result:" + future2.get()); } > 打印執行結果: begin to start compute end to compute, passed:1551243326193 getMoreData: 395 執行到最後一段代碼了,future result:395 計算已執行完畢,result:395 計算已執行完畢,exception:無異常 執行到最後一段代碼了,future2 result:396
thenApply* 能夠鏈接多個CompletableFuture對象,至關於將一個一個的CompletableFuture串聯起來了,第一個CompletableFuture對象的結果會傳遞到下一個對象中,而且下一個CompletableFuture對象的結算結果會做爲上一個對象的CompletableFuture結果,依次類推,也就是說會改變原始CompletableFuture對象的結果。
注:它和 handle 方法有點相似,都會拿到上一個CompletableFuture對象的結果進行計算,可是區別就是thenApply 會改變原始對象的計算結果,而 handle* 並不會**。code
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { return 100; }); //因爲這裏同時鏈接了多個thenApplyAsync,第一個是異步的,第二個是同步的,而且都沒有處理異常,因此異常會直接在執行計算的線程上拋出來。 CompletableFuture<String> f = future.thenApplyAsync(i -> i * 10).thenApply(i -> i.toString()); System.out.println(f.get()); //"1000" }
thenAccept* 返回的新的CompletableFuture對象不返回結果,若是使用get方法,會返回一個null。對象
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(Main::getMoreData); CompletableFuture<Void> future2 = future.thenAccept(result -> { System.out.println("執行到thenAccept了, result:" + result); }); System.out.println("執行到最後一段代碼了,future result:" + future.get()); System.out.println("執行到最後一段代碼了,future2 result: " + future2.get()); } > 打印執行結果: begin to start compute end to compute, passed:1551341977684 getMoreData: 171 執行到thenAccept了, result:171 執行到最後一段代碼了,future result:171 執行到最後一段代碼了,future2 result: null
它和 thenAccept 同樣,都是純消費,可是thenAccept*只能消費一個CompletableFuture對象,而thenAcceptBoth* 能在兩個不一樣的CompletableFuture對象執行完成後,消費他們兩個的計算結果。
並且他僅僅在原始的兩個CompletableFuture對象都計算成功以後,纔開始執行。blog
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor) //runAfterBoth和上面三個的區別就是它不消費原始的CompletableFuture結果 public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(Main::getMoreData); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(Main::getMoreData); future.thenAcceptBothAsync(future2, (x, y) -> { System.out.println("future1 和 future都執行完成了,結果分別是:" + x + "," + y); }); System.out.println("執行到最後一段代碼了,future result:" + future.get()); System.out.println("執行到最後一段代碼了,future2 result: " + future2.get()); } > 打印執行結果: begin to start compute begin to start compute end to compute, passed:1551342475808 getMoreData: 920 執行到最後一段代碼了,future result:920 end to compute, passed:1551342475811 getMoreData: 747 執行到最後一段代碼了,future2 result: 747 future1 和 future都執行完成了,結果分別是:920,747
在原始CompletableFuture執行任務結束後,並且執行指定的任務,不消費,也不產生結果。
public CompletableFuture<Void> thenRun(Runnable action) public CompletableFuture<Void> thenRunAsync(Runnable action) public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(Main::getMoreData); CompletableFuture<Void> future2 = future.thenRunAsync(() -> { System.out.println("future執行完成了"); }); System.out.println("執行到最後一段代碼了,future result:" + future.get()); System.out.println("執行到最後一段代碼了,future2 result:" + future2.get()); } > 打印執行結果: begin to start compute end to compute, passed:1551344347162 getMoreData: 688 執行到最後一段代碼了,future result:688 future執行完成了 執行到最後一段代碼了,future2 result:null
runAfterBoth是當兩個CompletableFuture都計算完成後才執行。
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
runAfterBoth是當兩個CompletableFuture都計算完成後才執行。而 acceptEither* 沒有返回值。
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn) public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn) public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
若是你用過Guava的Future類,你就會知道它的Futures輔助類提供了不少便利方法,用來處理多個Future,而不像Java的CompletableFuture,只提供了allOf、anyOf兩個方法。 好比有這樣一個需求,將多個CompletableFuture組合成一個CompletableFuture,這個組合後的CompletableFuture的計算結果是個List,它包含前面全部的CompletableFuture的計算結果,guava的Futures.allAsList能夠實現這樣的功能,可是對於java CompletableFuture,咱們須要一些輔助方法:
public static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) { CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])); return allDoneFuture.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.<T>toList())); } public static <T> CompletableFuture<Stream<T>> sequence(Stream<CompletableFuture<T>> futures) { List<CompletableFuture<T>> futureList = futures.filter(f -> f != null).collect(Collectors.toList()); return sequence(futureList); }
Java Future轉CompletableFuture:
public static <T> CompletableFuture<T> toCompletable(Future<T> future, Executor executor) { return CompletableFuture.supplyAsync(() -> { try { return future.get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } }, executor); }
本文大量參考了鳥窩的文章,本人只是將實例便於理解。
地址:https://colobu.com/2016/02/29/Java-CompletableFuture/