[Java併發-15] CompletableFuture: 異步編程

前面咱們不止一次提到,用多線程優化性能,其實不過就是將串行操做變成並行操做。若是仔細觀察,你還會發如今串行轉換成並行的過程當中,必定會涉及到異步化,例以下面的示例代碼,如今是串行的,爲了提高性能,咱們得把它們並行化。java

// 如下兩個方法都是耗時操做
doBizA();
doBizB();


//建立兩個子線程去執行就能夠了,兩個操做已經被異步化了。
new Thread(()->doBizA())
  .start();
new Thread(()->doBizB())
  .start();

異步化,是並行方案得以實施的基礎,更深刻地講其實就是:利用多線程優化性能這個核心方案得以實施的基礎。Java 在 1.8 版本提供了 CompletableFuture 來支持異步編程。編程

CompletableFuture 的核心優點

爲了領略 CompletableFuture 異步編程的優點,這裏咱們用 CompletableFuture 從新實現前面曾說起的燒水泡茶程序。首先仍是須要先完成分工方案,在下面的程序中,咱們分了 3 個任務:任務 1 負責洗水壺、燒開水,任務 2 負責洗茶壺、洗茶杯和拿茶葉,任務 3 負責泡茶。其中任務 3 要等待任務 1 和任務 2 都完成後才能開始。這個分工以下圖所示。多線程

燒水泡茶分工方案app

// 任務 1:洗水壺 -> 燒開水
CompletableFuture<Void> f1 = 
  CompletableFuture.runAsync(()->{
  System.out.println("T1: 洗水壺...");
  sleep(1, TimeUnit.SECONDS);

  System.out.println("T1: 燒開水...");
  sleep(15, TimeUnit.SECONDS);
});
// 任務 2:洗茶壺 -> 洗茶杯 -> 拿茶葉
CompletableFuture<String> f2 = 
  CompletableFuture.supplyAsync(()->{
  System.out.println("T2: 洗茶壺...");
  sleep(1, TimeUnit.SECONDS);

  System.out.println("T2: 洗茶杯...");
  sleep(2, TimeUnit.SECONDS);

  System.out.println("T2: 拿茶葉...");
  sleep(1, TimeUnit.SECONDS);
  return " 龍井 ";
});
// 任務 3:任務 1 和任務 2 完成後執行:泡茶
CompletableFuture<String> f3 = 
  f1.thenCombine(f2, (__, tf)->{
    System.out.println("T1: 拿到茶葉:" + tf);
    System.out.println("T1: 泡茶...");
    return " 上茶:" + tf;
  });
// 等待任務 3 執行結果
System.out.println(f3.join());

void sleep(int t, TimeUnit u) {
  try {
    u.sleep(t);
  }catch(InterruptedException e){}
}
// 一次執行結果:
T1: 洗水壺...
T2: 洗茶壺...
T1: 燒開水...
T2: 洗茶杯...
T2: 拿茶葉...
T1: 拿到茶葉: 龍井
T1: 泡茶...
上茶: 龍井

從總體上來看,咱們會發現dom

  1. 無需手工維護線程,沒有繁瑣的手工維護線程的工做,給任務分配線程的工做也不須要咱們關注;
  2. 語義更清晰,例如f3 = f1.thenCombine(f2, ()->{}) 可以清晰地表述「任務 3 要等待任務 1 和任務 2 都完成後才能開始」;
  3. 代碼更簡練而且專一於業務邏輯,幾乎全部代碼都是業務邏輯相關的。

領略 CompletableFuture 異步編程的優點以後,下面咱們詳細介紹 CompletableFuture 的使用。異步

建立 CompletableFuture 對象

建立 CompletableFuture 對象主要靠下面代碼中展現的這 4 個靜態方法,咱們先看前兩個。在燒水泡茶的例子中,咱們已經使用了runAsync(Runnable runnable)supplyAsync(Supplier<U> supplier),它們之間的區別是:Runnable 接口的 run() 方法沒有返回值,而 Supplier 接口的 get() 方法是有返回值的。異步編程

前兩個方法和後兩個方法的區別在於:後兩個方法能夠指定線程池參數。函數

默認狀況下 CompletableFuture 會使用公共的 ForkJoinPool 線程池,這個線程池默認建立的線程數是 CPU 的核數(也能夠經過 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 來設置 ForkJoinPool 線程池的線程數)。若是全部 CompletableFuture 共享一個線程池,那麼一旦有任務執行一些很慢的 I/O 操做,就會致使線程池中全部線程都阻塞在 I/O 操做上,從而形成線程飢餓,進而影響整個系統的性能。因此,強烈建議你要根據不一樣的業務類型建立不一樣的線程池,以免互相干擾性能

// 使用默認線程池
static CompletableFuture<Void> 
  runAsync(Runnable runnable)
static <U> CompletableFuture<U> 
  supplyAsync(Supplier<U> supplier)
// 能夠指定線程池  
static CompletableFuture<Void> 
  runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> 
  supplyAsync(Supplier<U> supplier, Executor executor)

建立完 CompletableFuture 對象以後,會自動地異步執行 runnable.run() 方法或者 supplier.get() 方法,對於一個異步操做,你須要關注兩個問題:一個是異步操做何時結束,另外一個是如何獲取異步操做的執行結果。由於 CompletableFuture 類實現了 Future 接口,因此這兩個問題你均可以經過 Future 接口來解決。另外,CompletableFuture 類還實現了 CompletionStage 接口,這個接口內容實在是太豐富了,在 1.8 版本里有 40 個方法,這些方法咱們該如何理解呢?優化

理解 CompletionStage 接口

能夠站在分工的角度類比一下工做流。任務是有時序關係的,好比有串行關係、並行關係、匯聚關係等。這樣說可能有點抽象,這裏還舉前面燒水泡茶的例子,其中洗水壺和燒開水就是串行關係,洗水壺、燒開水和洗茶壺、洗茶杯這兩組任務之間就是並行關係,而燒開水、拿茶葉和泡茶就是匯聚關係。

圖示123

CompletionStage 接口能夠清晰地描述任務之間的這種時序關係,例如前面提到的
f3 = f1.thenCombine(f2, ()->{}) 描述的就是一種匯聚關係。燒水泡茶程序中的匯聚關係是一種 AND 聚合關係,這裏的 AND 指的是全部依賴的任務(燒開水和拿茶葉)都完成後纔開始執行當前任務(泡茶)。既然有 AND 聚合關係,那就必定還有 OR 聚合關係,所謂 OR 指的是依賴的任務只要有一個完成就能夠執行當前任務。

最後就是異常,CompletionStage 接口也能夠方便地描述異常處理。

下面咱們就來一一介紹,CompletionStage 接口如何描述串行關係、AND 聚合關係、OR 聚合關係以及異常處理。

1. 描述串行關係

CompletionStage 接口裏面描述串行關係,主要是 thenApply、thenAccept、thenRun 和 thenCompose 這四個系列的接口。

thenApply 系列函數裏參數 fn 的類型是接口 Function<T, R>,這個接口裏與 CompletionStage 相關的方法是R apply(T t),這個方法既能接收參數也支持返回值,因此 thenApply 系列方法返回的是CompletionStage<R>

而 thenAccept 系列方法裏參數 consumer 的類型是接口Consumer<T>,這個接口裏與 CompletionStage 相關的方法是void accept(T t),這個方法雖然支持參數,但卻不支持回值,因此 thenAccept 系列方法返回的是CompletionStage<Void>

thenRun 系列方法裏 action 的參數是 Runnable,因此 action 既不能接收參數也不支持返回值,因此 thenRun 系列方法返回的也是CompletionStage<Void>

這些方法裏面 Async 表明的是異步執行 fn、consumer 或者 action。其中,須要你注意的是 thenCompose 系列方法,這個系列的方法會新建立出一個子流程,最終結果和 thenApply 系列是相同的。

CompletionStage<R> thenApply(fn);
CompletionStage<R> thenApplyAsync(fn);
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);
CompletionStage<Void> thenRun(action);
CompletionStage<Void> thenRunAsync(action);
CompletionStage<R> thenCompose(fn);
CompletionStage<R> thenComposeAsync(fn);

經過下面的示例代碼,你能夠看一下 thenApply() 方法是如何使用的。首先經過 supplyAsync() 啓動一個異步流程,以後是兩個串行操做,總體看起來仍是挺簡單的。不過,雖然這是一個異步流程,但任務①②③倒是串行執行的,②依賴①的執行結果,③依賴②的執行結果。

CompletableFuture<String> f0 = 
  CompletableFuture.supplyAsync(
    () -> "Hello World")      //①
  .thenApply(s -> s + " QQ")  //②
  .thenApply(String::toUpperCase);//③

System.out.println(f0.join());
// 輸出結果
HELLO WORLD QQ

2. 描述 AND 匯聚關係

CompletionStage 接口裏面描述 AND 匯聚關係,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口,這些接口的區別也是源自 fn、consumer、action 這三個核心參數不一樣。

CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> runAfterBoth(other, action);
CompletionStage<Void> runAfterBothAsync(other, action);

3. 描述 OR 匯聚關係

CompletionStage 接口裏面描述 OR 匯聚關係,主要是 applyToEither、acceptEither 和 runAfterEither 系列的接口,這些接口的區別也是源自 fn、consumer、action 這三個核心參數不一樣。

CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);
CompletableFuture<String> f1 = 
  CompletableFuture.supplyAsync(()->{
    int t = getRandom(5, 10);
    sleep(t, TimeUnit.SECONDS);
    return String.valueOf(t);
});

CompletableFuture<String> f2 = 
  CompletableFuture.supplyAsync(()->{
    int t = getRandom(5, 10);
    sleep(t, TimeUnit.SECONDS);
    return String.valueOf(t);
});

CompletableFuture<String> f3 = 
  f1.applyToEither(f2,s -> s);

System.out.println(f3.join());

4. 異常處理

雖然上面咱們提到的 fn、consumer、action 它們的核心方法都不容許拋出可檢查異常,可是卻沒法限制它們拋出運行時異常 ,例以下面的代碼,執行

CompletableFuture<Integer> 
  f0 = CompletableFuture.
    .supplyAsync(()->(7/0))
    .thenApply(r->r*10);
System.out.println(f0.join());

CompletionStage 接口給咱們提供的方案很是簡單,比 try{}catch{}還要簡單,下面是相關的方法,使用這些方法進行異常處理和串行操做是同樣的,都支持鏈式編程方式。

CompletionStage exceptionally(fn);
CompletionStage<R> whenComplete(consumer);
CompletionStage<R> whenCompleteAsync(consumer);
CompletionStage<R> handle(fn);
CompletionStage<R> handleAsync(fn);

下面的示例代碼展現瞭如何使用 exceptionally() 方法來處理異常,exceptionally() 的使用很是相似於 try{}catch{}中的 catch{},可是因爲支持鏈式編程方式,因此相對更簡單。

whenComplete() 和 handle() 系列方法就相似於 try{}finally{}中的 finally{},不管是否發生異常都會執行 whenComplete() 中的回調函數 consumer 和 handle() 中的回調函數 fn。

whenComplete() 和 handle() 的區別在於 whenComplete() 不支持返回結果,而 handle() 是支持返回結果的。

CompletableFuture<Integer> 
  f0 = CompletableFuture
    .supplyAsync(()->7/0))
    .thenApply(r->r*10)
    .exceptionally(e->0);
System.out.println(f0.join());

總結

不過最近幾年,伴隨着 ReactiveX 的發展(Java 語言的實現版本是 RxJava),回調地獄已經被完美解決了,Java 語言也開始官方支持異步編程:在 1.8 版本提供了 CompletableFuture,在 Java 9 版本則提供了更加完備的 Flow API,異步編程目前已經徹底工業化。

CompletableFuture 已經可以知足簡單的異步編程需求,若是你對異步編程感興趣,能夠重點關注 RxJava 這個項目,利用 RxJava,即使在 Java 1.6 版本也能享受異步編程的樂趣。

相關文章
相關標籤/搜索