爲了防止無良網站的爬蟲抓取文章,特此標識,轉載請註明文章出處。LaplaceDemon/ShiJiaqi。html
http://www.javashuo.com/article/p-bffkjxau-ba.htmljava
Future基礎知識:Java併發(6)帶返回結果的任務執行api
Guava的Future:Guava Future併發
Netty的Future:Netty Future與Promiseoracle
CompletableFuture是JDK8提供的Future加強類。CompletableFuture異步任務執行線程池,默認是把異步任務都放在ForkJoinPool中執行。app
官方文檔:dom
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html異步
https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/CompletableFuture.html函數
CompletableFuture接口提供了很是多的方法用於編排異步任務基本每一個方法都有兩套方法,Async版本的函數與非Async版本的函數。post
若方法不以Async結尾,意味着Action使用相同的線程執行,而Async可能會使用其它的線程去執行(若是使用相同的線程池,也可能會被同一個線程選中執行)。
public static <U> CompletableFuture<U> completedFuture(U value); // 執行異步任務 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier); public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor); // 執行異步任務 public static CompletableFuture<Void> runAsync(Runnable runnable); public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
例:建立一個已經有結果值的CompletableFuture。
// 建立 CompletableFuture<String> future = CompletableFuture.completedFuture("a future value");
例:異步執行帶返回值的異步任務。
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{ System.out.println("帶有返回值的異步任務"); return "a future value"; });
例:異步執行不帶返回值的異步任務
CompletableFuture<Void> future = CompletableFuture.runAsync(()->{ System.out.println("不帶返回值的異步任務"); });
public T get() throws InterruptedException, ExecutionException; public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; public T getNow(T valueIfAbsent); public T join();
說明:
get方法:阻塞獲取CompletableFuture的結果值,另外能夠設置該方法的阻塞時間。
getNow方法:若是結果已經計算完則返回結果或者拋出異常,不然返回給定的valueIfAbsent值。
join方法:返回計算的結果或者拋出一個unchecked異常(CompletionException)。
例:獲取Future的結果值。
// 使用get { CompletableFuture<String> future = CompletableFuture.completedFuture("a future value"); String string = future.get(); System.out.println(string); } // 使用join { CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{ return "haha"; }); String join = future.join(); System.out.println(join); }
完成完一個任務後繼續執行一個異步任務。
// thenRun 處理 Runnable public CompletableFuture<Void> thenRun(Runnable action); public CompletableFuture<Void> thenRunAsync(Runnable action); public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor); // thenAccept 處理 Consumer public CompletableFuture<Void> thenAccept(Consumer<? super T> action); public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action); public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor); // thenApply 處理 Function public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn); public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn); public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor); // handle 處理 BiFunction public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn); public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn); public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor);
例:thenRun方法。執行異步任務,執行完後再接一個異步任務。
CompletableFuture<Void> future = CompletableFuture.runAsync(()->{ System.out.println("不帶返回值的異步任務"); }).thenRun(()->{ System.out.println("前一個future後,再異步執行任務。"); });
例:執行異步任務,並將結果給下一個異步任務,最後再返回結果值。
// 轉換 CompletableFuture<String> future0 = CompletableFuture.supplyAsync(()->{ return "a future value"; }).thenApplyAsync((String str)->{ return str.length(); }); // 返回值 Integer join = future1.join(); System.out.println(join);
例:執行異步任務,並將結果給下一個異步任務,最後不返回結果值。
// 消費 CompletableFuture<String> future0 = CompletableFuture.completedFuture("a future value"); CompletableFuture<Void> future1 = future0.thenAcceptAsync((String str)->{ System.out.println("沒有返回值。消費了字符串:" + str); }); future1.join();
接着上一個CompletableFuture的結果執行一個異步任務,最新的異步任務返回一個新的CompletableFuture。
具體方法:
public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn); public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn); public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor);
例:CompletableFuture後跟一個新的一步方法,產生新的CompletableFuture。
CompletableFuture<String> future = CompletableFuture .completedFuture(10) .thenComposeAsync((Integer x)->{ return CompletableFuture.supplyAsync(()-> x.toString()); // 新的ComplatableFuture } ); // 獲取結果 String r = future.join(); System.err.println(r); // 打印:hello10000
爲兩個CompletableFuture的結果值提供一個函數算子,將結果值計算出來。
[Task0] ---\ ==>(fn)-->[Task] [Task1] ---/
具體方法:
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn); public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn); public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor);
例:定義兩個Future組合起來的算子,並計算。
CompletableFuture<Integer> f0 = CompletableFuture.completedFuture(10000); CompletableFuture<String> f1 = CompletableFuture.completedFuture("hello"); CompletableFuture<String> future = f0.thenCombine(f1, (Integer i, String str)-> (str + i)); // 定義算子 // 獲取結果值 String r = future.join(); System.out.println(r); // 打印:hello10000
執行兩個異步任務,並將兩個任務的計算結果獲取後,再執行一個異步任務,最後再返回值。
將兩個future的結果值傳給第三個算子。
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action); public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action); public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor);
例:
CompletableFuture<Integer> f0 = CompletableFuture.supplyAsync(()->{ try { Thread.sleep(1000*5); } catch (InterruptedException e) { e.printStackTrace(); } return 2; }); CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(()->{ try { Thread.sleep(1000*2); } catch (InterruptedException e) { e.printStackTrace(); } return 3; }); // 提供一個異步算子。使用future的計算結果。 CompletableFuture<Void> f= f0.thenAcceptBothAsync(f1,(Integer x,Integer y) -> { System.out.println("兩個future都完成,才計算算子。"); System.out.println(x*y); }); f.join();
等待以前的兩個異步任務都結束,再執行Action。
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action);
例:
CompletableFuture<Integer> f0 = CompletableFuture.supplyAsync(()->{ try { Thread.sleep(1000*5); } catch (InterruptedException e) { e.printStackTrace(); } return 2; }); CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(()->{ try { Thread.sleep(1000*2); } catch (InterruptedException e) { e.printStackTrace(); } return 3; });// 提供一個異步算子。使用future的計算結果。 CompletableFuture<Void> f = f0.runAfterBothAsync(f1, ()->{ System.out.println("兩個future都完成,再執行該任務。"); }); f.join();
兩個異步任務,任意一個CompletableFuture獲取獲得結果值,則執行該方法指定的Runnable 任務。
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
例:
CompletableFuture<Integer> f0 = CompletableFuture.supplyAsync(()->{ try { Thread.sleep(1000*5); } catch (InterruptedException e) { e.printStackTrace(); } return 0; }); CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(()->{ try { Thread.sleep(1000*2); } catch (InterruptedException e) { e.printStackTrace(); } return 1; }); CompletableFuture<Void> f = f0.runAfterEither(f1, ()->{ System.out.println("有一個任務完成了"); }); f.join();
兩個異步任務,任意一個CompletableFuture獲取獲得結果值,則執行該方法指定的Function任務。
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn); public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn); public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor);
例:
CompletableFuture<Integer> f0 = CompletableFuture.supplyAsync(()->{ try { Thread.sleep(1000*5); } catch (InterruptedException e) { e.printStackTrace(); } return 0; }); CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(()->{ try { Thread.sleep(1000*2); } catch (InterruptedException e) { e.printStackTrace(); } return 1; }); CompletableFuture<String> f = f0.applyToEither(f1, (Integer i)-> "task:" + i); String r = f.join(); System.out.println(r); // 打印:task:1
兩個異步任務,任意一個CompletableFuture獲取獲得結果值,則執行該方法指定的Consumer任務。
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action); public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action); public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor);
例:
CompletableFuture<Integer> f0 = CompletableFuture.supplyAsync(()->{ try { Thread.sleep(1000*5); } catch (InterruptedException e) { e.printStackTrace(); } return 0; }); CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(()->{ try { Thread.sleep(1000*2); } catch (InterruptedException e) { e.printStackTrace(); } return 1; }); CompletableFuture<Void> f = f0.acceptEither(f1, (Integer i)->{ System.out.println("task:" + i); // 打印:task:1 }); f.join();
將多個CompletableFuture組合爲一個CompletableFuture,任意一個CompletableFuture有告終果,則該方法的返回值也會獲得結果。
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);
例:三個CompletableFuture執行完一個就獲得一個新的CompletableFuture。
CompletableFuture<Integer> f0 = CompletableFuture.supplyAsync(()->{ try { Thread.sleep(1000*1); } catch (InterruptedException e) { e.printStackTrace(); } return 0; }); CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(()->{ try { Thread.sleep(1000*2); } catch (InterruptedException e) { e.printStackTrace(); } return 1; }); CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(()->{ try { Thread.sleep(1000*3); } catch (InterruptedException e) { e.printStackTrace(); } return 2; }); long t0 = System.currentTimeMillis(); CompletableFuture<Object> f = CompletableFuture.anyOf(f0, f1, f2); Object r = f.join(); System.out.println(r); // 打印:0 long t1 = System.currentTimeMillis(); System.out.println((t1-t0)/1000); // 打印:1
將多個CompletableFuture組合爲一個CompletableFuture,全部CompletableFuture有告終果,則該方法的返回值也會獲得結果。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
例:三個CompletableFuture都執行完,就獲得一個新的CompletableFuture。
CompletableFuture<Integer> f0 = CompletableFuture.supplyAsync(()->{ try { Thread.sleep(1000*1); } catch (InterruptedException e) { e.printStackTrace(); } return 0; }); CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(()->{ try { Thread.sleep(1000*2); } catch (InterruptedException e) { e.printStackTrace(); } return 1; }); CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(()->{ try { Thread.sleep(1000*3); } catch (InterruptedException e) { e.printStackTrace(); } return 2; }); long t0 = System.currentTimeMillis(); CompletableFuture<Void> f = CompletableFuture.allOf(f0, f1, f2); f.join(); long t1 = System.currentTimeMillis(); System.out.println((t1-t0)/1000); // 打印:3
當一系列的任務計算結果完成或者拋出異常的時候,咱們能夠執行指定的任務。
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action); public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action); public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor);
例:任務完成後,執行最後的任務,而且能夠獲取最後任務以前的結果值。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{ try { Thread.sleep(5*1000); } catch (InterruptedException e) { throw new RuntimeException(e); } return new Random().nextInt(1000); }).whenComplete((Integer i, Throwable t)->{ System.out.println("任務結果值:" + i); }); // 阻塞 future.get();
使用CompletableFuture編排異步任務在處理異常的時候,有幾種方式:
1. 在異步任務中使用try...catch...處理異常。
2. 使用whenComplate方法接收異常。
3. 使用exceptionally方法接收異常。
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn);
例:
CompletableFuture<String> f = CompletableFuture.supplyAsync(()->{ return 100/0; }) .exceptionally(ex -> { ex.printStackTrace(); return 0; }).thenApply((Integer i)-> "run:" + i.toString()); String r = f.join(); System.out.println(r);
打印:
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(Unknown Source) at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source) at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) Caused by: java.lang.ArithmeticException: / by zero at test.java/test.TestCompletableFuture.lambda$0(TestCompletableFuture.java:292) ... 5 more run:0
爲了防止無良網站的爬蟲抓取文章,特此標識,轉載請註明文章出處。LaplaceDemon/ShiJiaqi。