這篇文章詳細講解java8中CompletableFuture的特性,方法以及實例.java
在java8之前,咱們使用java的多線程編程,通常是經過Runnable中的run方法來完成,這種方式,有個很明顯的缺點,就是,沒有返回值,這時候,你們可能會去嘗試使用Callable中的call方法,而後用Future返回結果,以下:編程
public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newSingleThreadExecutor(); Future<String> stringFuture = executor.submit(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(2000); return "async thread"; } }); Thread.sleep(1000); System.out.println("main thread"); System.out.println(stringFuture.get()); }
經過觀察控制檯,咱們發現先打印 main thread ,一秒後打印 async thread,彷佛能知足咱們的需求,但仔細想咱們發現一個問題,當調用future的get()方法時,當前主線程是堵塞的,這好像並非咱們想看到的,另外一種獲取返回結果的方式是先輪詢,能夠調用isDone,等完成再獲取,但這也不能讓咱們滿意.多線程
無論怎麼看,這種用法看起來並不優雅,起碼從視覺上就有些醜陋,並且某些場景沒法使用,好比說,app
1.不少個異步線程執行時間可能不一致,個人主線程業務不能一直等着,這時候我可能會想要只等最快的線程執行完或者最重要的那個任務執行完,亦或者我只等1秒鐘,至於沒返回結果的線程我就用默認值代替.dom
2.我兩個異步任務之間執行獨立,可是第二個依賴第一個的執行結果.異步
java8的CompletableFuture,就在這混亂且不完美的多線程江湖中閃亮登場了.CompletableFuture讓Future的功能和使用場景獲得極大的完善和擴展,提供了函數式編程能力,使代碼更加美觀優雅,並且能夠經過回調的方式計算處理結果,對異常處理也有了更好的處理手段.async
CompletableFuture源碼中有四個靜態方法用來執行異步任務:ide
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..} public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..} public static CompletableFuture<Void> runAsync(Runnable runnable){..} public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor){..}
若是有多線程的基礎知識,咱們很容易看出,run開頭的兩個方法,用於執行沒有返回值的任務,由於它的入參是Runnable對象,而supply開頭的方法顯然是執行有返回值的任務了,至於方法的入參,若是沒有傳入Executor對象將會使用ForkJoinPool.commonPool() 做爲它的線程池執行異步代碼.在實際使用中,通常咱們使用本身建立的線程池對象來做爲參數傳入使用,這樣速度會快些.函數式編程
執行異步任務的方式也很簡單,只須要使用上述方法就能夠了:函數
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { //....執行任務 return "hello";}, executor)
接下來看一下獲取執行結果的幾個方法.
V get(); V get(long timeout,Timeout unit);
T getNow(T defaultValue);
T join();
上面兩個方法是Future中的實現方式,get()會堵塞當前的線程,這就形成了一個問題,若是執行線程遲遲沒有返回數據,get()會一直等待下去,所以,第二個get()方法能夠設置等待的時間.
getNow()方法比較有意思,表示當有了返回結果時會返回結果,若是異步線程拋了異常會返回本身設置的默認值.
接下來以一些場景的實例來介紹一下CompletableFuture中其餘一些經常使用的方法.
public CompletionStage<Void> thenAccept(Consumer<? super T> action); public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action); public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
功能:當前任務正常完成之後執行,當前任務的執行結果能夠做爲下一任務的輸入參數,無返回值.
場景:執行任務A,同時異步執行任務B,待任務B正常返回以後,用B的返回值執行任務C,任務C無返回值
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "任務A"); CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> "任務B"); CompletableFuture<String> futureC = futureB.thenApply(b -> { System.out.println("執行任務C."); System.out.println("參數:" + b);//參數:任務B return "a"; });
public CompletionStage<Void> thenRun(Runnable action); public CompletionStage<Void> thenRunAsync(Runnable action); public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
功能:對不關心上一步的計算結果,執行下一個操做
場景:執行任務A,任務A執行完之後,執行任務B,任務B不接受任務A的返回值(無論A有沒有返回值),也無返回值
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "任務A");
futureA.thenRun(() -> System.out.println("執行任務B"));
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
功能:當前任務正常完成之後執行,當前任務的執行的結果會做爲下一任務的輸入參數,有返回值
場景:多個任務串聯執行,下一個任務的執行依賴上一個任務的結果,每一個任務都有輸入和輸出
實例1:異步執行任務A,當任務A完成時使用A的返回結果resultA做爲入參進行任務B的處理,可實現任意多個任務的串聯執行
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "hello"); CompletableFuture<String> futureB = futureA.thenApply(s->s + " world"); CompletableFuture<String> future3 = futureB.thenApply(String::toUpperCase); System.out.println(future3.join());
上面的代碼,咱們固然能夠先調用future.join()先獲得任務A的返回值,而後再拿返回值作入參去執行任務B,而thenApply的存在就在於幫我簡化了這一步,咱們沒必要由於等待一個計算完成而一直阻塞着調用線程,而是告訴CompletableFuture你啥時候執行完就啥時候進行下一步. 就把多個任務串聯起來了.
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
功能:結合兩個CompletionStage的結果,進行轉化後返回
場景:須要根據商品id查詢商品的當前價格,分兩步,查詢商品的原始價格和折扣,這兩個查詢相互獨立,當都查出來的時候用原始價格乘折扣,算出當前價格. 使用方法:thenCombine(..)
CompletableFuture<Double> futurePrice = CompletableFuture.supplyAsync(() -> 100d); CompletableFuture<Double> futureDiscount = CompletableFuture.supplyAsync(() -> 0.8); CompletableFuture<Double> futureResult = futurePrice.thenCombine(futureDiscount, (price, discount) -> price * discount); System.out.println("最終價格爲:" + futureResult.join()); //最終價格爲:80.0
thenCombine(..)是結合兩個任務的返回值進行轉化後再返回,那若是不須要返回呢,那就須要thenAcceptBoth(..),同理,若是連兩個任務的返回值也不關心呢,那就須要runAfterBoth了,若是理解了上面三個方法,thenApply,thenAccept,thenRun,這裏就不須要單獨再提這兩個方法了,只在這裏提一下.
public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn) public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn) public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)
功能:這個方法接收的輸入是當前的CompletableFuture的計算值,返回結果將是一個新的CompletableFuture
這個方法和thenApply很是像,都是接受上一個任務的結果做爲入參,執行本身的操做,而後返回.那具體有什麼區別呢?
thenApply():它的功能至關於將CompletableFuture<T>轉換成CompletableFuture<U>,改變的是同一個CompletableFuture中的泛型類型
thenCompose():用來鏈接兩個CompletableFuture,返回值是一個新的CompletableFuture
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "hello"); CompletableFuture<String> futureB = futureA.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "world")); CompletableFuture<String> future3 = futureB.thenCompose(s -> CompletableFuture.supplyAsync(s::toUpperCase)); System.out.println(future3.join());
這段代碼實現的和上面thenApply同樣的效果,在實際使用中,我並無很清楚兩個在使用上的區別,若是有大佬,跪求告知.
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn); public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn); public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
功能:執行兩個CompletionStage的結果,那個先執行完了,就是用哪一個的返回值進行下一步操做
場景:假設查詢商品a,有兩種方式,A和B,可是A和B的執行速度不同,咱們但願哪一個先返回就用那個的返回值.
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "經過方式A獲取商品a"; }); CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "經過方式B獲取商品a"; }); CompletableFuture<String> futureC = futureA.applyToEither(futureB, product -> "結果:" + product);
System.out.println(futureC.join()); //結果:經過方式A獲取商品a
一樣的道理,applyToEither的兄弟方法還有acceptEither(),runAfterEither(),我想不須要我解釋你也知道該怎麼用了.
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
功能:當運行出現異常時,調用該方法可進行一些補償操做,如設置默認值.
場景:異步執行任務A獲取結果,若是任務A執行過程當中拋出異常,則使用默認值100返回.
CompletableFuture<String> futureA = CompletableFuture. supplyAsync(() -> "執行結果:" + (100 / 0)) .thenApply(s -> "futureA result:" + s) .exceptionally(e -> { System.out.println(e.getMessage()); //java.lang.ArithmeticException: / by zero return "futureA result: 100"; }); CompletableFuture<String> futureB = CompletableFuture. supplyAsync(() -> "執行結果:" + 50) .thenApply(s -> "futureB result:" + s) .exceptionally(e -> "futureB result: 100"); System.out.println(futureA.join());//futureA result: 100 System.out.println(futureB.join());//futureB result:執行結果:50
上面代碼展現了正常流程和出現異常的狀況,能夠理解成catch,根據返回值能夠體會下.
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action); public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action); public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,Executor executor);
功能:當CompletableFuture的計算結果完成,或者拋出異常的時候,均可以進入whenComplete方法執行,舉個栗子
CompletableFuture<String> futureA = CompletableFuture. supplyAsync(() -> "執行結果:" + (100 / 0)) .thenApply(s -> "apply result:" + s) .whenComplete((s, e) -> { if (s != null) { System.out.println(s);//未執行 } if (e == null) { System.out.println(s);//未執行 } else { System.out.println(e.getMessage());//java.lang.ArithmeticException: / by zero } }) .exceptionally(e -> { System.out.println("ex"+e.getMessage()); //ex:java.lang.ArithmeticException: / by zero
return "futureA result: 100"; });
System.out.println(futureA.join());//futureA result: 100
根據控制檯,咱們能夠看出執行流程是這樣,supplyAsync->whenComplete->exceptionally,能夠看出並無進入thenApply執行,緣由也顯而易見,在supplyAsync中出現了異常,thenApply只有當正常返回時纔會去執行.而whenComplete不論是否正常執行,還要注意一點,whenComplete是沒有返回值的.
上面代碼咱們使用了函數式的編程風格而且先調用whenComplete再調用exceptionally,若是咱們先調用exceptionally,再調用whenComplete會發生什麼呢,咱們看一下:
CompletableFuture<String> futureA = CompletableFuture. supplyAsync(() -> "執行結果:" + (100 / 0)) .thenApply(s -> "apply result:" + s) .exceptionally(e -> { System.out.println("ex:"+e.getMessage()); //ex:java.lang.ArithmeticException: / by zero return "futureA result: 100"; }) .whenComplete((s, e) -> { if (e == null) { System.out.println(s);//futureA result: 100 } else { System.out.println(e.getMessage());//未執行 } }) ; System.out.println(futureA.join());//futureA result: 100
代碼先執行了exceptionally後執行whenComplete,能夠發現,因爲在exceptionally中對異常進行了處理,並返回了默認值,whenComplete中接收到的結果是一個正常的結果,被exceptionally美化過的結果,這一點須要留意一下.
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn); public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn); public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
功能:當CompletableFuture的計算結果完成,或者拋出異常的時候,能夠經過handle方法對結果進行處理
CompletableFuture<String> futureA = CompletableFuture. supplyAsync(() -> "執行結果:" + (100 / 0)) .thenApply(s -> "apply result:" + s) .exceptionally(e -> { System.out.println("ex:" + e.getMessage()); //java.lang.ArithmeticException: / by zero return "futureA result: 100"; }) .handle((s, e) -> { if (e == null) { System.out.println(s);//futureA result: 100 } else { System.out.println(e.getMessage());//未執行 } return "handle result:" + (s == null ? "500" : s); }); System.out.println(futureA.join());//handle result:futureA result: 100
經過控制檯,咱們能夠看出,最後打印的是handle result:futureA result: 100,執行exceptionally後對異常進行了"美化",返回了默認值,那麼handle獲得的就是一個正常的返回,咱們再試下,先調用handle再調用exceptionally的狀況.
CompletableFuture<String> futureA = CompletableFuture. supplyAsync(() -> "執行結果:" + (100 / 0)) .thenApply(s -> "apply result:" + s) .handle((s, e) -> { if (e == null) { System.out.println(s);//未執行 } else { System.out.println(e.getMessage());//java.lang.ArithmeticException: / by zero } return "handle result:" + (s == null ? "500" : s); }) .exceptionally(e -> { System.out.println("ex:" + e.getMessage()); //未執行 return "futureA result: 100"; }); System.out.println(futureA.join());//handle result:500
根據控制檯輸出,能夠看到先執行handle,打印了異常信息,並對接過設置了默認值500,exceptionally並無執行,由於它獲得的是handle返回給它的值,由此咱們大概推測handle和whenComplete的區別
1.都是對結果進行處理,handle有返回值,whenComplete沒有返回值
2.因爲1的存在,使得handle多了一個特性,可在handle裏實現exceptionally的功能
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
allOf:當全部的CompletableFuture
都執行完後執行計算
anyOf:最快的那個CompletableFuture執行完以後執行計算
場景二:查詢一個商品詳情,須要分別去查商品信息,賣家信息,庫存信息,訂單信息等,這些查詢相互獨立,在不一樣的服務上,假設每一個查詢都須要一到兩秒鐘,要求整體查詢時間小於2秒.
public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(4); long start = System.currentTimeMillis(); CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000 + RandomUtils.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } return "商品詳情"; },executorService); CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000 + RandomUtils.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } return "賣家信息"; },executorService); CompletableFuture<String> futureC = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000 + RandomUtils.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } return "庫存信息"; },executorService); CompletableFuture<String> futureD = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000 + RandomUtils.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } return "訂單信息"; },executorService); CompletableFuture<Void> allFuture = CompletableFuture.allOf(futureA, futureB, futureC, futureD); allFuture.join(); System.out.println(futureA.join() + futureB.join() + futureC.join() + futureD.join()); System.out.println("總耗時:" + (System.currentTimeMillis() - start)); }
參考資料:
https://colobu.com/2016/02/29/Java-CompletableFuture/#Either