三個角度搞清CompletableFuture(一)

讓天下沒有難學的javajava

點贊、關注、收藏 編程

1、前言

  • Java開發中多線程編程司空見慣,從開始的Thread、Runnable到Future再到CompletableFuture,JDK爲咱們使用多線程不斷擴展功能。
  • 關於CompletableFuture的介紹、教程一搜一大堆,那爲何還要寫這篇文章呢?教程卻是很多,可是複製粘貼的一大堆,要不就是API的堆疊。難以完整地看下去。
  • 所以本文主要從本身學習的角度來談談CompletableFuture,從三個主線來對CompletableFuture進行展開,讓你從一個清晰的層次來了解CompletableFuture有一個清晰的層次。

2、從Future機制提及

  • Future機制是Java 1.5中的引入的,表明着一個異步計算的結果。關於Future/Callable能夠參考多線程

    juejin.im/post/5e0819… ,保證你瞬間有了層次感。不至於什麼都會,可是好像又挺糊塗。app

  • Future解決了Runnable無返回值的問題,提供了2種get()來獲取結果。異步

public interface Future<V>{
  
  //堵塞獲取
  V get() throws InterruptedException,ExecutionException;
  
  //等待timeout獲取
  V get(long timeout,TimeUnit unit) throws InterruptedException,ExecutionException,TimeoutException;
}
複製代碼

3、CompleteFuture誕生

  • 堵塞獲取結果的方式顯然與異步編程相違背,timeout輪詢的方式既不優雅也沒法及時獲得計算結果。不少語言提供了回調實現異步編程,如Node.js。Java的一些類庫如netty、guava等也擴展了Future來改善異步編程的體驗。官方顯然也不甘落後,在Java 8中新增了CompleteFuture及相關API,大大擴展了Future的能力。

//接受Runnable,無返回值,使用ForkJoinPool.commonPool()線程池
public static CompletableFuture<Void> runAsync(Runnable runnable) //接受Runnable,無返回值,使用指定的executor線程池  public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) //接受Supplier,返回U,使用ForkJoinPool.commonPool()線程池 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) //接受Supplier,返回U,使用指定的executor線程池  public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) 複製代碼
  • 能夠看到CompleteFuture實現了Future、CompletionStage2個接口。Future你們都瞭解,主要是用於實現一個未開始的異步事件。

3.一、新建一個CompletableFuture

  • runAsync無返回值 異步編程

  • supplyAsync有返回值 函數

3.二、CompletionStage

  • Stage是階段的意思。顧名思義,CompletionStage它表明了某個同步或者異步計算的一個階段,最終結果上的一個環節。多個CompletionStage構成了一條流水線,一個環節組裝完成了能夠移交給下一個環節。一個結果可能須要流轉了多個CompletionStage。post

  • 如很常見的一個兌換場景,先扣積分、而後發短信、而後設置我的主頁標識,這其中扣積分、發短信、設置標識分別是一個Stage。學習

  • CompletableFuture鏈式調用背後就是CompletionStage來提供支持,CompletionStage的thenApply().thenApply()...一環扣一環。spa

  • 簡單的CompletionStage,supplyAsync()異步任務執行完後使用thenApply()將結果傳遞給下一個stage。

4、三個角度來剖析CompletableFuture

  • 有了CompletableFuture、CompletionStage這2個前置概念的介紹,下面咱們能夠正式從三個角度來認識CompletableFuture。

4.一、串行關係

  • thenApply
//這是最簡單也是最經常使用的一個方法,CompletableFuture的鏈式調用
  public static void main(String[] args) {

    CompletableFuture<String> completableFuture = CompletableFuture.runAsync(() -> {
      try {
        TimeUnit.SECONDS.sleep(3);
        System.out.println("hello");
      }catch (Exception e){
        e.printStackTrace();
      }
    }).thenApply(s1 -> {
      System.out.println(" big");
      return s1 + "big";
    }).thenApply(s2 -> " world");
  }

//hello big world
複製代碼
  • thenRun

    • 計算完成時候執行一個Runnable,不使用CompletableFuture計算結果,此前的CompletableFuture計算結果也會被忽略,返回CompletableFuture類型。
  • thenAccept

    • thenApply、thenAccept相似,區別在於thenAccept純消費,無返回值,支持鏈式調用。
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
      try {
        TimeUnit.SECONDS.sleep(5);
        return "success";
      } catch (Exception e) {
        e.printStackTrace();
        return "error";
      }
    }).thenAcceptAsync(s->{
      if ("success".equals(s)){
        System.out.println(s);
      }else {
        System.err.println(s);
      }
    });
複製代碼
  • thenCompose
    • 都是接受一個Function,輸入是當前CompleteableFuture計算值,返回一個新CompletableFuture。即這個新的CompleteableFuture組合原來的CompleteableFuture和函數返回的CompleteableFuture。
    • 一般使用在第二個CompleteableFuture須要使用第一個CompleteableFuture結果做爲輸入狀況下。
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) 複製代碼
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1)
                     .thenCompose(x -> CompletableFuture.supplyAsync(() -> x+1));
複製代碼
 thenApply、thenCompose都接受一個Function的函數式接口,那麼區別呢?
 1.thenApply使用在同步mapping方法。
 2.thenCompose使用在異步mapping方法。
  • 🌰來了
//thenApply
thenApply(Function<? super T,? extends U> fn)

//thenCompose
thenCompose(Function<? super T,? extends CompletableFuture<U>> fn)
  
  
//能夠看出thenApply返回的是一個對象U,而thenCompose返回的是CompletableFuture<U>
//從使用角度是來看,若是你第二個操做也是異步操做那麼使用thenCompose,若是是一個簡單同步操做,那麼使用thenApply,相信實戰幾回你就能明白何時使用thenApply、thenCompose。
複製代碼

4.二、AND 關係

  • thenCombine

    • combine、compose依我六級的水平來看,好像都有組合、結合的意思,那麼區別呢?
    • 主要區別在於thenCombine結合的2個CompletableFuture沒有依賴關係,且第二個CompletableFuture不須要等第一個CompletableFuture執行完成纔開始。
    • thenCombine組合的多個CompletableFuture雖然是獨立的,可是整個流水線是同步的。

    🌰來了

//從上圖能夠看出,thenCombine的2個CompletableFuture不是依賴關係,第二個CompletableFuture比第一個CompletableFuture先執行,最後的BiFunction()組合2個CompletableFuture結果。

//再次強調:整個流水線是同步的
複製代碼
  • thenAcceptBoth
//2個CompletableFuture處理完成後,將結果進行消費。
//與thenCombine相似,第2個CompletableFuture不依賴第1個CompletableFuture執行完成,返回值Void。
複製代碼

  • runAfterBoth
    • 2個CompletableFuture都完成以後,會執行一個Runnable。這點與thenAcceptBoth、thenCombine都相似,可是不一樣點在於runAfterBoth絕不關心任意一個CompletableFuture的返回值,只要CompletableFuture都執行完成它就run,一樣它也沒有返回值。
  • allOf
    • 只是單純等待全部的CompletableFuture執行完成,返回一個 CompletableFuture。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) 複製代碼
public static void main(String[] args) throws Exception {
    long start = System.currentTimeMillis();
    CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
      try {
        TimeUnit.SECONDS.sleep(1);
        System.out.println("i am sleep 1");
      } catch (Exception e) {
        e.printStackTrace();
      }
      return "service 1";
    });
    CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {
      try {
        TimeUnit.SECONDS.sleep(2);
        System.out.println("i am sleep 2");
      } catch (Exception e) {
        e.printStackTrace();
      }
      return "service 2";
    });

    CompletableFuture<String> completableFuture3 = CompletableFuture.supplyAsync(() -> {
      try {
        TimeUnit.SECONDS.sleep(3);
        System.out.println("i am sleep 3");
      } catch (Exception e) {
        e.printStackTrace();
      }
      return "service 3";
    });
    CompletableFuture<Void> completableFuture = CompletableFuture
        .allOf(completableFuture1, completableFuture2, completableFuture3);
    completableFuture.join();
    System.out.println(System.currentTimeMillis() - start);
  }
複製代碼

4.三、OR關係

  • applyToEither
    • CompletableFuture最快的執行完成的結果做爲下一個stage的輸入結果
  • acceptEither
    • 最快CompletableFuture執行完成的時候,action消費就會獲得執行。
  • runAfterEither
    • 任何一個CompletableFuture完成以後,就會執行下一步的Runnable。
  • anyOf
    • 任何一個CompletableFuture完成,anyOf函數就會返回。

小結

  • CompletableFuture大大擴展了Future能力,簡化異步編程的複雜性。
  • 本文主要從AND、OR、串行化三個角度來了解CompletableFuture之間的stage。不一樣的場景須要疊加不一樣的stage。
相關文章
相關標籤/搜索