編程老司機帶你玩轉 CompletableFuture 異步編程

本文從實例出發,介紹 CompletableFuture 基本用法。不過講的再多,不如親自上手練習一下。因此建議各位小夥伴看完,上機練習一把,快速掌握 CompletableFuturehtml

我的博文地址:sourl.cn/s5MbCmjava

全文摘要:編程

  • Future VS CompletableFuture
  • CompletableFuture 基本用法

0x00. 前言

一些業務場景咱們須要使用多線程異步執行任務,加快任務執行速度。 Java 提供 Runnable Future<V> 兩個接口用來實現異步任務邏輯。多線程

雖然 Future<V> 能夠獲取任務執行結果,可是獲取方式十方不變。咱們不得不使用Future#get 阻塞調用線程,或者使用輪詢方式判斷 Future#isDone 任務是否結束,再獲取結果。併發

這兩種處理方式都不是很優雅,JDK8 以前併發類庫沒有提供相關的異步回調實現方式。沒辦法,咱們只好藉助第三方類庫,如 Guava,擴展 Future,增長支持回調功能。相關代碼以下:app

雖然這種方式加強了 Java 異步編程能力,可是仍是沒法解決多個異步任務須要相互依賴的場景。異步

舉一個生活上的例子,假如咱們須要出去旅遊,須要完成三個任務:ide

  • 任務一:訂購航班
  • 任務二:訂購酒店
  • 任務三:訂購租車服務

很顯然任務一和任務二沒有相關性,能夠單獨執行。可是任務三必須等待任務一與任務二結束以後,才能訂購租車服務。異步編程

爲了使任務三時執行時能獲取到任務一與任務二執行結果,咱們還須要藉助 CountDownLatch函數

0x01. CompletableFuture

JDK8 以後,Java 新增一個功能十分強大的類:CompletableFuture。單獨使用這個類就能夠輕鬆的完成上面的需求:

你們能夠先不用管 CompletableFuture 相關 API,下面將會具體講解。

對比 Future<V>CompletableFuture 優勢在於:

  • 不須要手工分配線程,JDK 自動分配
  • 代碼語義清晰,異步任務鏈式調用
  • 支持編排異步任務

怎麼樣,是否是功能很強大?接下來抓穩了,小黑哥要發車了。

1.1 方法一覽

首先來經過 IDE 查看下這個類提供的方法:

稍微數一下,這個類總共有 50 多個方法,個人天。。。

不過也不要怕,小黑哥幫大家概括好了,跟着小黑哥的節奏,帶大家掌握 CompletableFuture

若圖片不清晰,能夠關注『程序通事』,回覆:『233』,獲取該思惟導圖

1.2 建立 CompletableFuture 實例

建立 CompletableFuture 對象實例咱們可使用以下幾個方法:

第一個方法建立一個具備默認結果的 CompletableFuture,這個沒啥好講。咱們重點講述下下面四個異步方法。

前兩個方法 runAsync 不支持返回值,而 supplyAsync能夠支持返回結果。

這個兩個方法默認將會使用公共的 ForkJoinPool 線程池執行,這個線程池默認線程數是 CPU 的核數。

能夠設置 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 來設置 ForkJoinPool 線程池的線程數

使用共享線程池將會有個弊端,一旦有任務被阻塞,將會形成其餘任務沒機會執行。因此強烈建議使用後兩個方法,根據任務類型不一樣,主動建立線程池,進行資源隔離,避免互相干擾。

1.3 設置任務結果

CompletableFuture 提供如下方法,能夠主動設置任務結果。

boolean complete(T value) boolean completeExceptionally(Throwable ex) 複製代碼

第一個方法,主動設置 CompletableFuture 任務執行結果,若返回 true,表示設置成功。若是返回 false,設置失敗,這是由於任務已經執行結束,已經有了執行結果。

示例代碼以下:

// 執行異步任務
CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
  System.out.println("cf 任務執行開始");
  sleep(10, TimeUnit.SECONDS);
  System.out.println("cf 任務執行結束");
  return "樓下小黑哥";
});
//
Executors.newSingleThreadScheduledExecutor().execute(() -> {
  sleep(5, TimeUnit.SECONDS);
  System.out.println("主動設置 cf 任務結果");
  // 設置任務結果,因爲 cf 任務未執行結束,結果返回 true
  cf.complete("程序通事");
});
// 因爲 cf 未執行結束,將會被阻塞。5 秒後,另一個線程主動設置任務結果
System.out.println("get:" + cf.get());
// 等待 cf 任務執行結束
sleep(10, TimeUnit.SECONDS);
// 因爲已經設置任務結果,cf 執行結束任務結果將會被拋棄
System.out.println("get:" + cf.get());
/*** * cf 任務執行開始 * 主動設置 cf 任務結果 * get:程序通事 * cf 任務執行結束 * get:程序通事 */
複製代碼

這裏須要注意一點,一旦 complete 設置成功,CompletableFuture 返回結果就不會被更改,即便後續 CompletableFuture 任務執行結束。

第二個方法,給 CompletableFuture 設置異常對象。若設置成功,若是調用 get 等方法獲取結果,將會拋錯。

示例代碼以下:

// 執行異步任務
CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
    System.out.println("cf 任務執行開始");
    sleep(10, TimeUnit.SECONDS);
    System.out.println("cf 任務執行結束");
    return "樓下小黑哥";
});
//
Executors.newSingleThreadScheduledExecutor().execute(() -> {
    sleep(5, TimeUnit.SECONDS);
    System.out.println("主動設置 cf 異常");
    // 設置任務結果,因爲 cf 任務未執行結束,結果返回 true
    cf.completeExceptionally(new RuntimeException("啊,掛了"));
});
// 因爲 cf 未執行結束,前 5 秒將會被阻塞。後續程序拋出異常,結束
System.out.println("get:" + cf.get());
/*** * cf 任務執行開始 * 主動設置 cf 異常 * java.util.concurrent.ExecutionException: java.lang.RuntimeException: 啊,掛了 * ...... */
複製代碼

1.4 CompletionStage

CompletableFuture 分別實現兩個接口 FutureCompletionStage

Future 接口你們都比較熟悉,這裏主要講講 CompletionStage

CompletableFuture 大部分方法來自CompletionStage 接口,正是由於這個接口,CompletableFuture纔有如從強大功能。

想要理解 CompletionStage 接口,咱們須要先了解任務的時序關係的。咱們能夠將任務時序關係分爲如下幾種:

  • 串行執行關係
  • 並行執行關係
  • AND 匯聚關係
  • OR 匯聚關係

1.5 串行執行關係

任務串行執行,下一個任務必須等待上一個任務完成才能夠繼續執行。

CompletionStage 有四組接口能夠描述串行這種關係,分別爲:

thenApply 方法須要傳入核心參數爲 Function<T,R>類型。這個類核心方法爲:

R apply(T t) 複製代碼

因此這個接口將會把上一個任務返回結果當作入參,執行結束將會返回結果。

thenAccept 方法須要傳入參數對象爲 Consumer<T>類型,這個類核心方法爲:

void accept(T t) 複製代碼

返回值 void 能夠看出,這個方法不支持返回結果,可是須要將上一個任務執行結果當作參數傳入。

thenRun 方法須要傳入參數對象爲 Runnable 類型,這個類你們應該都比較熟悉,核心方法既不支持傳入參數,也不會返回執行結果。

thenCompose 方法做用與 thenApply 同樣,只不過 thenCompose 須要返回新的 CompletionStage。這麼理解比較抽象,能夠集合代碼一塊兒理解。

方法中帶有 Async ,表明能夠異步執行,這個系列還有重載方法,能夠傳入自定義的線程池,上圖未展現,讀者只能夠自行查看 API。

最後咱們經過代碼展現 thenApply 使用方式:

CompletableFuture<String> cf
        = CompletableFuture.supplyAsync(() -> "hello,樓下小黑哥")// 1
        .thenApply(s -> s + "@程序通事") // 2
        .thenApply(String::toUpperCase); // 3
System.out.println(cf.join());
// 輸出結果 HELLO,樓下小黑哥@程序通事
複製代碼

這段代碼比較簡單,首先咱們開啓一個異步任務,接着串行執行後續兩個任務。任務 2 須要等待任務1 執行完成,任務 3 須要等待任務 2。

上面方法,你們須要記住了 Function<T,R>Consumer<T>Runnable 三者區別,根據場景選擇使用。

1.6 AND 匯聚關係

AND 匯聚關係表明全部任務完成以後,才能進行下一個任務。

如上所示,只有任務 A 與任務 B 都完成以後,任務 C 纔會開始執行。

CompletionStage 有如下接口描述這種關係。

thenCombine 方法核心參數 BiFunction ,做用與 Function同樣,只不過 BiFunction 能夠接受兩個參數,而 Function 只能接受一個參數。

thenAcceptBoth 方法核心參數BiConsumer 做用也與 Consumer同樣,不過其須要接受兩個參數。

runAfterBoth 方法核心參數最簡單,上面已經介紹過,再也不介紹。

這三組方法只能完成兩個任務 AND 匯聚關係,若是須要完成多個任務匯聚關係,須要使用 CompletableFuture#allOf,不過這裏須要注意,這個方法是不支持返回任務結果。

AND 匯聚關係相關示例代碼,開頭已經使用過了,這裏再粘貼一下,方便你們理解:

1.7 OR 匯聚關係

有 AND 匯聚關係,固然也存在 OR 匯聚關係。OR 匯聚關係表明只要多個任務中任一任務完成,就能夠接着接着執行下一任務。

CompletionStage 有如下接口描述這種關係:

前面三組接口方法傳參與 AND 匯聚關係一致,這裏也再也不詳細解釋了。

固然 OR 匯聚關係可使用 CompletableFuture#anyOf 執行多個任務。

下面示例代碼展現如何使用 applyToEither 完成 OR 關係。

CompletableFuture<String> cf
        = CompletableFuture.supplyAsync(() -> {
    sleep(5, TimeUnit.SECONDS);
    return "hello,樓下小黑哥";
});// 1

CompletableFuture<String> cf2 = cf.supplyAsync(() -> {
    sleep(3, TimeUnit.SECONDS);
    return "hello,程序通事";
});
// 執行 OR 關係
CompletableFuture<String> cf3 = cf2.applyToEither(cf, s -> s);

// 輸出結果,因爲 cf2 只休眠 3 秒,優先執行完畢
System.out.println(cf2.join());
// 結果:hello,程序通事
複製代碼

1.8 異常處理

CompletableFuture 方法執行過程若產生異常,當調用 getjoin獲取任務結果纔會拋出異常。

上面代碼咱們顯示使用 try..catch 處理上面的異常。不過這種方式不太優雅,CompletionStage 提供幾個方法,能夠優雅處理異常。

exceptionally 使用方式相似於 try..catchcatch代碼塊中異常處理。

whenCompletehandle 方法就相似於 try..catch..finanllyfinally 代碼塊。不管是否發生異常,都將會執行的。這兩個方法區別在於 handle 支持返回結果。

下面示例代碼展現 handle 用法:

CompletableFuture<Integer>
        f0 = CompletableFuture.supplyAsync(() -> (7 / 0))
        .thenApply(r -> r * 10)
        .handle((integer, throwable) -> {
            // 若是異常存在,打印異常,而且返回默認值
            if (throwable != null) {
                throwable.printStackTrace();
                return 0;
            } else {
                // 若是
                return integer;
            }
        });


System.out.println(f0.join());
/** *java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero * ..... * * 0 */
複製代碼

0x02. 總結

JDK8 提供 CompletableFuture 功能很是強大,能夠編排異步任務,完成串行執行,並行執行,AND 匯聚關係,OR 匯聚關係。

不過這個類方法實在太多,且方法還須要傳入各類函數式接口,新手剛開始使用會直接會被弄懵逼。這裏幫你們在總結一下三類核心參數的做用

  • Function 這類函數接口既支持接收參數,也支持返回值
  • Consumer 這類接口函數只支持接受參數,不支持返回值
  • Runnable 這類接口不支持接受參數,也不支持返回值

搞清楚函數參數做用之後,而後根據串行,AND 匯聚關係,OR 匯聚關係概括一下相關方法,這樣就比較好理解了

最後再貼一下,文章開頭的思惟導圖,但願對你有幫助。

0x03. 幫助文檔

  1. 極客時間-併發編程專欄
  2. colobu.com/2016/02/29/…
  3. www.ibm.com/developerwo…

最後說一句(求關注)

CompletableFuture 很早以前就有關注,本覺得跟 Future同樣,使用挺簡單,誰知道學的時候才發現好難。各類 API 方法看的頭有點大。

後來看到極客時間-『併發編程』專欄使用概括方式分類 CompletableFuture 各類方法,一會兒就看懂了。所這篇文章也參考這種概括方式。

這篇文章找資料,整理一個星期,幸虧今天順利產出。

看在小黑哥寫的這麼辛苦的份上,點個關注吧,賞個讚唄。別下次必定啊,大哥!寫文章很辛苦的,須要來點正反饋。

才疏學淺,不免會有紕漏,若是你發現了錯誤的地方,還請你留言給我指出來,我對其加以修改。

感謝您的閱讀,我堅持原創,十分歡迎並感謝您的關注~

歡迎關注個人公衆號:程序通事,得到平常乾貨推送。若是您對個人專題內容感興趣,也能夠關注個人博客:studyidea.cn

相關文章
相關標籤/搜索