搞定 CompletableFuture,併發異步編程和編寫串行程序還有什麼區別?大家要的多圖長文

  • 你有一個思想,我有一個思想,咱們交換後,一我的就有兩個思想
  • If you can NOT explain it simply, you do NOT understand it well enough

前言

上一篇文章 不會用Java Future,我懷疑你泡茶沒我快 全面分析了 Future,經過它咱們能夠獲取線程的執行結果,它雖然解決了 Runnable 的 「三無」 短板,可是它自身仍是有短板:html

不能手動完成計算

假設你使用 Future 運行子線程調用遠程 API 來獲取某款產品的最新價格,服務器因爲洪災宕機了,此時若是你想手動結束計算,而是想返回上次緩存中的價格,這是 Future 作不到的前端

調用 get() 方法會阻塞程序

Future 不會通知你它的完成,它提供了一個get()方法,程序調用該方法會阻塞直到結果可用爲止,沒有辦法利用回調函數附加到Future,並在Future的結果可用時自動調用它java

不能鏈式執行

燒水泡茶中,經過構造函數傳參作到多個任務的鏈式執行,萬一有更多的任務,或是任務鏈的執行順序有變,對原有程序的影響都是很是大的react

整合多個 Future 執行結果方式笨重

假設有多個 Future 並行執行,須要在這些任務所有執行完成以後作後續操做,Future 自己是作不到的,須要藉助工具類 Executors 的方法程序員

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
沒有異常處理

Future 一樣沒有提供很好的異常處理方案編程

上一篇文章看 Future 以爲是發現了新天地,這麼一說有感受回到瞭解放前後端

對於 Java 後端的同窗,在 Java1.8 以前想實現異步編程,還想避開上述這些煩惱,ReactiveX 應該是一個常看法決方案(作Android 的應該會有了解)。若是熟悉前端同窗, ES6 Promise(男友的承諾)也解決了異步編程的煩惱緩存

天下語言都在彼此借鑑相應優勢,Java 做爲老牌勁旅天然也要解決上述問題。又是那個男人,併發大師 Doug Lea 憂天下程序員之憂,解天下程序員之困擾,在 Java1.8 版本(Lambda 橫空出世)中,新增了一個併發工具類 CompletableFuture,它的出現,讓人在泡茶過程當中,品嚐到了不同的味道......服務器

幾個重要 Lambda 函數

CompletableFuture 在 Java1.8 的版本中出現,天然也得搭上 Lambda 的順風車,爲了更好的理解 CompletableFuture,這裏我須要先介紹一下幾個 Lambda 函數,咱們只須要關注它們的如下幾點就能夠:多線程

  • 參數接受形式
  • 返回值形式
  • 函數名稱

Runnable

Runnable 咱們已經說過無數次了,無參數,無返回值

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

Function

Function<T, R> 接受一個參數,而且有返回值

@FunctionalInterface
public interface Function<T, R> {
    R apply(T t);
}

Consumer

Consumer<T> 接受一個參數,沒有返回值

@FunctionalInterface
public interface Consumer<T> {   
    void accept(T t);
}

Supplier

Supplier<T> 沒有參數,有一個返回值

@FunctionalInterface
public interface Supplier<T> {
    T get();
}

BiConsumer

BiConsumer<T, U> 接受兩個參數(Bi, 英文單詞詞根,表明兩個的意思),沒有返回值

@FunctionalInterface
public interface BiConsumer<T, U> {
    void accept(T t, U u);

好了,咱們作個小彙總

有些同窗可能有疑問,爲何要關注這幾個函數式接口,由於 CompletableFuture 的函數命名以及其做用都是和這幾個函數式接口高度相關的,一會你就會發現了

前戲作足,終於能夠進入正題了 CompletableFuture

CompletableFuture

類結構

老規矩,先從類結構看起:

實現了 Future 接口

實現了 Future 接口,那就具備 Future 接口的相關特性,請腦補 Future 那少的可憐的 5 個方法,這裏再也不贅述,具體請查看 不會用Java Future,我懷疑你泡茶沒我快

實現了 CompletionStage 接口

CompletionStage 這個接口仍是挺陌生的,中文直譯過來是【竣工階段】,若是將燒水泡茶比喻成一項大工程,他們的竣工階段體現是不同的

  1. 單看線程1 或單看線程 2 就是一種串行關係,作完一步以後作下一步
  2. 一塊兒看線程1 和 線程 2,它們彼此就是並行關係,兩個線程作的事彼此獨立互補干擾
  3. 泡茶就是線程1 和 線程 2 的彙總/組合,也就是線程 1 和 線程 2 都完成以後才能到這個階段(固然也存在線程1 或 線程 2 任意一個線程竣工就能夠開啓下一階段的場景)

因此,CompletionStage 接口的做用就作了這點事,全部函數都用於描述任務的時序關係,總結起來就是這個樣子:

CompletableFuture 既然實現了兩個接口,天然也就會實現相應的方法充分利用其接口特性,咱們走進它的方法來看一看

CompletableFuture 大約有50種不一樣處理串行,並行,組合以及處理錯誤的方法。小弟屏幕不爭氣,方法之多,一個屏幕裝不下,看到這麼多方法,是否是瞬間要直接 收藏——>吃灰 2連走人?別擔憂,咱們按照相應的命名和做用進行分類,分分鐘搞定50多種方法

串行關係

then 直譯【而後】,也就是表示下一步,因此一般是一種串行關係體現, then 後面的單詞(好比 run /apply/accept)就是上面說的函數式接口中的抽象方法名稱了,它的做用和那幾個函數式接口的做用是同樣同樣滴

CompletableFuture<Void> thenRun(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
  
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
  
CompletableFuture<Void> thenAccept(Consumer<? super T> action) 
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
  
<U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)  
<U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
<U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)

聚合 And 關係

combine... with...both...and... 都是要求二者都知足,也就是 and 的關係了

<U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
<U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
<U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

<U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
<U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
<U> CompletableFuture<Void> thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)
  
CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action)
CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)

聚合 Or 關係

Either...or... 表示二者中的一個,天然也就是 Or 的體現了

<U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
<U> CompletableFuture<U> applyToEitherAsync(、CompletionStage<? extends T> other, Function<? super T, U> fn)
<U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor)

CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)

CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)
CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action)
CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor)

異常處理

CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
CompletableFuture<T> exceptionallyAsync(Function<Throwable, ? extends T> fn)
CompletableFuture<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor executor)
        
CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
        
       
<U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
<U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
<U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

這個異常處理看着還挺嚇人的,拿傳統的 try/catch/finally 作個對比也就瞬間秒懂了

whenComplete 和 handle 的區別若是你看接受的參數函數式接口名稱你也就能看出差異了,前者使用Comsumer, 天然也就不會有返回值;後者使用 Function,天然也就會有返回值

這裏並無所有列舉,不過相信不少同窗已經發現了規律:

CompletableFuture 提供的全部回調方法都有兩個異步(Async)變體,都像這樣
// thenApply() 的變體
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
另外,方法的名稱也都與前戲中說的函數式接口徹底匹配,按照這中規律分類以後,這 50 多個方法看起來是否是很輕鬆了呢?

基本方法已經羅列的差很少了,接下來咱們經過一些例子來實際演示一下:

案例演示

建立一個 CompletableFuture 對象

建立一個 CompletableFuture 對象並無什麼稀奇的,依舊是經過構造函數構建

CompletableFuture<String> completableFuture = new CompletableFuture<String>();

這是最簡單的 CompletableFuture 對象建立方式,因爲它實現了 Future 接口,因此天然就能夠經過 get() 方法獲取結果

String result = completableFuture.get();

文章開頭已經說過,get()方法在任務結束以前將一直處在阻塞狀態,因爲上面建立的 Future 沒有返回,因此在這裏調用 get() 將會永久性的堵塞

這時就須要咱們調用 complete() 方法手動的結束一個 Future

completableFuture.complete("Future's Result Here Manually");

這時,全部等待這個 Future 的 client 都會返回手動結束的指定結果

runAsync

使用 runAsync 進行異步計算

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    System.out.println("運行在一個單獨的線程當中");
});

future.get();

因爲使用的是 Runnable 函數式表達式,天然也不會獲取到結果

supplyAsync

使用 runAsync 是沒有返回結果的,咱們想獲取異步計算的返回結果須要使用 supplyAsync() 方法

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            log.info("運行在一個單獨的線程當中");
            return "我有返回值";
        });

        log.info(future.get());

因爲使用的是 Supplier 函數式表達式,天然能夠得到返回結果

咱們已經屢次說過,get() 方法在Future 計算完成以前會一直處在 blocking 狀態下,對於真正的異步處理,咱們但願的是能夠經過傳入回調函數,在Future 結束時自動調用該回調函數,這樣,咱們就不用等待結果

CompletableFuture<String> comboText = CompletableFuture.supplyAsync(() -> {
          //能夠註釋掉作快速返回 start
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            log.info("👍");
          //能夠註釋掉作快速返回 end
            return "贊";
        })
                .thenApply(first -> {
                    log.info("在看");
                    return first + ", 在看";
                })
                .thenApply(second -> second + ", 轉發");

        log.info("三連有沒有?");
        log.info(comboText.get());

對 thenApply 的調用並無阻塞程序打印log,也就是前面說的經過回調通知機制, 這裏你看到 thenApply 使用的是supplyAsync所用的線程,若是將supplyAsync 作快速返回,咱們再來看一下運行結果:

thenApply 此時使用的是主線程,因此:

串行的後續操做並不必定會和前序操做使用同一個線程

thenAccept

若是你不想從回調函數中返回任何結果,那可使用 thenAccept

final CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(
                // 模擬遠端API調用,這裏只返回了一個構造的對象
                () -> Product.builder().id(12345L).name("頸椎/腰椎治療儀").build())
                .thenAccept(product -> {
                    log.info("獲取到遠程API產品名稱 " + product.getName());
                });
        voidCompletableFuture.get();

thenRun

thenAccept 能夠從回調函數中獲取前序執行的結果,但thenRun 卻不能夠,由於它的回調函數式表達式定義中沒有任何參數

CompletableFuture.supplyAsync(() -> {
    //前序操做
}).thenRun(() -> {
    //串行的後需操做,無參數也無返回值
});

咱們前面一樣說過了,每一個提供回調方法的函數都有兩個異步(Async)變體,異步就是另外起一個線程

CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
            log.info("前序操做");
            return "前需操做結果";
        }).thenApplyAsync(result -> {
            log.info("後續操做");
            return "後續操做結果";
        });

到這裏,相信你串行的操做你已經很是熟練了

thenCompose

平常的任務中,一般定義的方法都會返回 CompletableFuture 類型,這樣會給後續操做留有更多的餘地,假若有這樣的業務(X唄是否是都有這樣的業務呢?):

//獲取用戶信息詳情
    CompletableFuture<User> getUsersDetail(String userId) {
        return CompletableFuture.supplyAsync(() -> User.builder().id(12345L).name("日拱一兵").build());
    }

    //獲取用戶信用評級
    CompletableFuture<Double> getCreditRating(User user) {
        return CompletableFuture.supplyAsync(() -> CreditRating.builder().rating(7.5).build().getRating());
    }

這時,若是咱們仍是使用 thenApply() 方法來描述串行關係,返回的結果就會發生 CompletableFuture 的嵌套

CompletableFuture<CompletableFuture<Double>> result = completableFutureCompose.getUsersDetail(12345L)
                .thenApply(user -> completableFutureCompose.getCreditRating(user));

顯然這不是咱們想要的,若是想「拍平」 返回結果,thenCompose 方法就派上用場了

CompletableFuture<Double> result = completableFutureCompose.getUsersDetail(12345L)
                .thenCompose(user -> completableFutureCompose.getCreditRating(user));

這個和 Lambda 的map 和 flatMap 的道理是同樣同樣滴

thenCombine

若是要聚合兩個獨立 Future 的結果,那麼 thenCombine 就會派上用場了

CompletableFuture<Double> weightFuture = CompletableFuture.supplyAsync(() -> 65.0);
        CompletableFuture<Double> heightFuture = CompletableFuture.supplyAsync(() -> 183.8);
        
        CompletableFuture<Double> combinedFuture = weightFuture
                .thenCombine(heightFuture, (weight, height) -> {
                    Double heightInMeter = height/100;
                    return weight/(heightInMeter*heightInMeter);
                });

        log.info("身體BMI指標 - " + combinedFuture.get());

固然這裏多數時處理兩個 Future 的關係,若是超過兩個Future,如何處理他們的一些聚合關係呢?

allOf | anyOf

相信你看到方法的簽名,你已經明白他的用處了,這裏就再也不介紹了

static CompletableFuture<Void>     allOf(CompletableFuture<?>... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

接下來就是異常的處理了

exceptionally

Integer age = -1;

        CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
            if( age < 0 ) {
                throw new IllegalArgumentException("何方神聖?");
            }
            if(age > 18) {
                return "你們都是成年人";
            } else {
                return "未成年禁止入內";
            }
        }).thenApply((str) -> {
            log.info("遊戲開始");
            return str;
        }).exceptionally(ex -> {
            log.info("必有蹊蹺,來者" + ex.getMessage());
            return "Unknown!";
        });

        log.info(maturityFuture.get());

exceptionally 就至關於 catch,出現異常,將會跳過 thenApply 的後續操做,直接捕獲異常,進行一場處理

handle

用多線程,良好的習慣是使用 try/finally 範式,handle 就能夠起到 finally 的做用,對上述程序作一個小小的更改, handle 接受兩個參數,一個是正常返回值,一個是異常

注意:handle的寫法也算是範式的一種
Integer age = -1;

        CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
            if( age < 0 ) {
                throw new IllegalArgumentException("何方神聖?");
            }
            if(age > 18) {
                return "你們都是成年人";
            } else {
                return "未成年禁止入內";
            }
        }).thenApply((str) -> {
            log.info("遊戲開始");
            return str;
        }).handle((res, ex) -> {
            if(ex != null) {
                log.info("必有蹊蹺,來者" + ex.getMessage());
                return "Unknown!";
            }
            return res;
        });

        log.info(maturityFuture.get());

到這裏,關於 CompletableFuture 的基本使用你已經瞭解的差很少了,不知道你是否注意,咱們前面說的帶有 Sync 的方法是單獨起一個線程來執行,可是咱們並無建立線程,這是怎麼實現的呢?

細心的朋友若是仔細看每一個變種函數的第三個方法也許會發現裏面都有一個 Executor 類型的參數,用於指定線程池,由於實際業務中咱們是嚴謹手動建立線程的,這在 我會手動建立線程,爲何要使用線程池?文章中明確說明過;若是沒有指定線程池,那天然就會有一個默認的線程池,也就是 ForkJoinPool

private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

ForkJoinPool 的線程數默認是 CPU 的核心數。可是,在前序文章中明確說明過:

不要全部業務共用一個線程池,由於,一旦有任務執行一些很慢的 I/O 操做,就會致使線程池中全部線程都阻塞在 I/O 操做上,從而形成線程飢餓,進而影響整個系統的性能

總結

CompletableFuture 的方法並無所有介紹徹底,也不必所有介紹,相信你們按照這個思路來理解 CompletableFuture 也不會有什麼大問題了,剩下的就交給實踐/時間以及本身的體會了

後記

你覺得 JDK1.8 CompletableFuture 已經很完美了是否是,但追去完美的道路上永無止境,Java 9 對CompletableFuture 又作了部分升級和改造

  1. 添加了新的工廠方法
  2. 支持延遲和超時處理

    orTimeout()
    completeOnTimeout()
  3. 改進了對子類的支持

詳情能夠查看: Java 9 CompletableFuture API Improvements. 怎樣快速的切換不一樣 Java 版原本嚐鮮?SDKMAN 統一靈活管理多版本Java 這篇文章的方法送給你

最後我們再泡一壺茶,感覺一下新變化吧

靈魂追問

  1. 據說 ForkJoinPool 線程池效率更高,爲何呢?
  2. 若是批量處理異步程序,有什麼可用的方案嗎?

參考

  1. Java 併發編程實戰
  2. Java 併發編程的藝術
  3. Java 併發編程之美
  4. https://www.baeldung.com/java...
  5. https://www.callicoder.com/ja...

日拱一兵 | 原創

相關文章
相關標籤/搜索