Java 8 CompletableFuture 教程

Java 8 有大量的新特性和加強如 Lambda 表達式StreamsCompletableFuture等。在本篇文章中我將詳細解釋清楚CompletableFuture以及它全部方法的使用。html

什麼是CompletableFuture?

在Java中CompletableFuture用於異步編程,異步編程是編寫非阻塞的代碼,運行的任務在一個單獨的線程,與主線程隔離,而且會通知主線程它的進度,成功或者失敗。java

在這種方式中,主線程不會被阻塞,不須要一直等到子線程完成。主線程能夠並行的執行其餘任務。web

使用這種並行方式,能夠極大的提升程序的性能。express

Future vs CompletableFuture

CompletableFuture 是 Future API的擴展。編程

Future 被用於做爲一個異步計算結果的引用。提供一個 isDone() 方法來檢查計算任務是否完成。當任務完成時,get() 方法用來接收計算任務的結果。api

Callbale和 Future 教程能夠學習更多關於 Future 知識.緩存

Future API 是很是好的 Java 異步編程進階,可是它缺少一些很是重要和有用的特性。oracle

Future 的侷限性

  1. 不能手動完成
    當你寫了一個函數,用於經過一個遠程API獲取一個電子商務產品最新價格。由於這個 API 太耗時,你把它容許在一個獨立的線程中,而且從你的函數中返回一個 Future。如今假設這個API服務宕機了,這時你想經過該產品的最新緩存價格手工完成這個Future 。你會發現沒法這樣作。
  2. Future 的結果在非阻塞的狀況下,不能執行更進一步的操做
    Future 不會通知你它已經完成了,它提供了一個阻塞的 get() 方法通知你結果。你沒法給 Future 植入一個回調函數,當 Future 結果可用的時候,用該回調函數自動的調用 Future 的結果。
  3. 多個 Future 不能串聯在一塊兒組成鏈式調用
    有時候你須要執行一個長時間運行的計算任務,而且當計算任務完成的時候,你須要把它的計算結果發送給另一個長時間運行的計算任務等等。你會發現你沒法使用 Future 建立這樣的一個工做流。
  4. 不能組合多個 Future 的結果
    假設你有10個不一樣的Future,你想並行的運行,而後在它們運行未完成後運行一些函數。你會發現你也沒法使用 Future 這樣作。
  5. 沒有異常處理
    Future API 沒有任務的異常處理結構竟然有如此多的限制,幸虧咱們有CompletableFuture,你可使用 CompletableFuture 達到以上全部目的。

CompletableFuture 實現了 FutureCompletionStage接口,而且提供了許多關於建立,鏈式調用和組合多個 Future 的便利方法集,並且有普遍的異常處理支持。異步

建立 CompletableFuture

1. 簡單的例子
可使用以下無參構造函數簡單的建立 CompletableFuture:async

CompletableFuture<String> completableFuture = new CompletableFuture<String>();

這是一個最簡單的 CompletableFuture,想獲取CompletableFuture 的結果可使用 CompletableFuture.get() 方法:

String result = completableFuture.get()

get() 方法會一直阻塞直到 Future 完成。所以,以上的調用將被永遠阻塞,由於該Future一直不會完成。

你可使用 CompletableFuture.complete() 手工的完成一個 Future:

completableFuture.complete("Future's Result")

全部等待這個 Future 的客戶端都將獲得一個指定的結果,而且 completableFuture.complete() 以後的調用將被忽略。

2. 使用 runAsync() 運行異步計算
若是你想異步的運行一個後臺任務而且不想改任務返回任務東西,這時候可使用 CompletableFuture.runAsync()方法,它持有一個Runnable 對象,並返回 CompletableFuture<Void>

// Run a task specified by a Runnable Object asynchronously.
CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
    @Override
    public void run() {
        // Simulate a long-running Job
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        System.out.println("I'll run in a separate thread than the main thread.");
    }
});

// Block and wait for the future to complete
future.get()

你也能夠以 lambda 表達式的形式傳入 Runnable 對象:

// Using Lambda Expression
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    // Simulate a long-running Job   
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    System.out.println("I'll run in a separate thread than the main thread.");
});

在本文中,我使用lambda表達式會比較頻繁,若是之前你沒有使用過,建議你也多使用lambda 表達式。

3. 使用 supplyAsync() 運行一個異步任務而且返回結果
當任務不須要返回任何東西的時候, CompletableFuture.runAsync() 很是有用。可是若是你的後臺任務須要返回一些結果應該要怎麼樣?

CompletableFuture.supplyAsync() 就是你的選擇。它持有supplier<T> 而且返回CompletableFuture<T>T 是經過調用 傳入的supplier取得的值的類型。

// Run a task specified by a Supplier object asynchronously
CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
    @Override
    public String get() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return "Result of the asynchronous computation";
    }
});

// Block and get the result of the Future
String result = future.get();
System.out.println(result);

Supplier<T> 是一個簡單的函數式接口,表示supplier的結果。它有一個get()方法,該方法能夠寫入你的後臺任務中,而且返回結果。

你可使用lambda表達式使得上面的示例更加簡明:

// Using Lambda Expression
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of the asynchronous computation";
});
一個關於Executor 和Thread Pool筆記
你可能想知道,咱們知道 runAsync() supplyAsync()方法在單獨的線程中執行他們的任務。可是咱們不會永遠只建立一個線程。
CompletableFuture能夠從全局的 ForkJoinPool.commonPool()得到一個線程中執行這些任務。
可是你也能夠建立一個線程池並傳給 runAsync() supplyAsync()方法來讓他們從線程池中獲取一個線程執行它們的任務。
CompletableFuture API 的全部方法都有兩個變體-一個接受 Executor做爲參數,另外一個不這樣:
// Variations of runAsync() and supplyAsync() methods
static CompletableFuture<Void>  runAsync(Runnable runnable)
static CompletableFuture<Void>  runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

建立一個線程池,並傳遞給其中一個方法:

Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of the asynchronous computation";
}, executor);

在 CompletableFuture 轉換和運行

CompletableFuture.get()方法是阻塞的。它會一直等到Future完成而且在完成後返回結果。
可是,這是咱們想要的嗎?對於構建異步系統,咱們應該附上一個回調給CompletableFuture,當Future完成的時候,自動的獲取結果。
若是咱們不想等待結果返回,咱們能夠把須要等待Future完成執行的邏輯寫入到回調函數中。

可使用 thenApply(), thenAccept()thenRun()方法附上一個回調給CompletableFuture。

1. thenApply()
可使用 thenApply() 處理和改變CompletableFuture的結果。持有一個Function<R,T>做爲參數。Function<R,T>是一個簡單的函數式接口,接受一個T類型的參數,產出一個R類型的結果。

// Create a CompletableFuture
CompletableFuture<String> whatsYourNameFuture = CompletableFuture.supplyAsync(() -> {
   try {
       TimeUnit.SECONDS.sleep(1);
   } catch (InterruptedException e) {
       throw new IllegalStateException(e);
   }
   return "Rajeev";
});

// Attach a callback to the Future using thenApply()
CompletableFuture<String> greetingFuture = whatsYourNameFuture.thenApply(name -> {
   return "Hello " + name;
});

// Block and get the result of the future.
System.out.println(greetingFuture.get()); // Hello Rajeev

你也能夠經過附加一系列的thenApply()在回調方法 在CompletableFuture寫一個連續的轉換。這樣的話,結果中的一個 thenApply方法就會傳遞給該系列的另一個 thenApply方法。

CompletableFuture<String> welcomeText = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Rajeev";
}).thenApply(name -> {
    return "Hello " + name;
}).thenApply(greeting -> {
    return greeting + ", Welcome to the CalliCoder Blog";
});

System.out.println(welcomeText.get());
// Prints - Hello Rajeev, Welcome to the CalliCoder Blog

2. thenAccept() 和 thenRun()
若是你不想從你的回調函數中返回任何東西,僅僅想在Future完成後運行一些代碼片斷,你可使用thenAccept() thenRun()方法,這些方法常常在調用鏈的最末端的最後一個回調函數中使用。
CompletableFuture.thenAccept() 持有一個Consumer<T> ,返回一個CompletableFuture<Void>。它能夠訪問CompletableFuture的結果:

// thenAccept() example
CompletableFuture.supplyAsync(() -> {
    return ProductService.getProductDetail(productId);
}).thenAccept(product -> {
    System.out.println("Got product detail from remote service " + product.getName())
});

雖然thenAccept()能夠訪問CompletableFuture的結果,但thenRun()不能訪Future的結果,它持有一個Runnable返回CompletableFuture<Void>:

// thenRun() example
CompletableFuture.supplyAsync(() -> {
    // Run some computation  
}).thenRun(() -> {
    // Computation Finished.
});
異步回調方法的筆記
CompletableFuture提供的全部回調方法都有兩個變體:
`// thenApply() variants
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)`
這些異步回調變體經過在獨立的線程中執行回調任務幫助你進一步執行並行計算。
如下示例:
CompletableFuture.supplyAsync(() -> {
    try {
       TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
      throw new IllegalStateException(e);
    }
    return "Some Result"
}).thenApply(result -> {
    /* 
      Executed in the same thread where the supplyAsync() task is executed
      or in the main thread If the supplyAsync() task completes immediately (Remove sleep() call to verify)
    */
    return "Processed Result"
})

在以上示例中,在thenApply()中的任務和在supplyAsync()中的任務執行在相同的線程中。任何supplyAsync()當即執行完成,那就是執行在主線程中(嘗試刪除sleep測試下)。
爲了控制執行回調任務的線程,你可使用異步回調。若是你使用thenApplyAsync()回調,將從ForkJoinPool.commonPool()獲取不一樣的線程執行。

CompletableFuture.supplyAsync(() -> {
    return "Some Result"
}).thenApplyAsync(result -> {
    // Executed in a different thread from ForkJoinPool.commonPool()
    return "Processed Result"
})

此外,若是你傳入一個ExecutorthenApplyAsync()回調中,,任務將從Executor線程池獲取一個線程執行。

Executor executor = Executors.newFixedThreadPool(2);
CompletableFuture.supplyAsync(() -> {
    return "Some result"
}).thenApplyAsync(result -> {
    // Executed in a thread obtained from the executor
    return "Processed Result"
}, executor);

組合兩個CompletableFuture

1. 使用 thenCompose() 組合兩個獨立的future
假設你想從一個遠程API中獲取一個用戶的詳細信息,一旦用戶信息可用,你想從另一個服務中獲取他的貸方。
考慮下如下兩個方法getUserDetail() getCreditRating()的實現:

CompletableFuture<User> getUsersDetail(String userId) {
    return CompletableFuture.supplyAsync(() -> {
        UserService.getUserDetails(userId);
    });    
}

CompletableFuture<Double> getCreditRating(User user) {
    return CompletableFuture.supplyAsync(() -> {
        CreditRatingService.getCreditRating(user);
    });
}

如今讓咱們弄明白當使用了thenApply()後是否會達到咱們指望的結果-

CompletableFuture<CompletableFuture<Double>> result = getUserDetail(userId)
.thenApply(user -> getCreditRating(user));

在更早的示例中,Supplier函數傳入thenApply將返回一個簡單的值,可是在本例中,將返回一個CompletableFuture。以上示例的最終結果是一個嵌套的CompletableFuture。
若是你想獲取最終的結果給最頂層future,使用 thenCompose()方法代替-

CompletableFuture<Double> result = getUserDetail(userId)
.thenCompose(user -> getCreditRating(user));

所以,規則就是-若是你的回調函數返回一個CompletableFuture,可是你想從CompletableFuture鏈中獲取一個直接合並後的結果,這時候你可使用thenCompose()

2. 使用thenCombine()組合兩個獨立的 future
雖然thenCompose()被用於當一個future依賴另一個future的時候用來組合兩個future。thenCombine()被用來當兩個獨立的Future都完成的時候,用來作一些事情。

System.out.println("Retrieving weight.");
CompletableFuture<Double> weightInKgFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return 65.0;
});

System.out.println("Retrieving height.");
CompletableFuture<Double> heightInCmFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return 177.8;
});

System.out.println("Calculating BMI.");
CompletableFuture<Double> combinedFuture = weightInKgFuture
        .thenCombine(heightInCmFuture, (weightInKg, heightInCm) -> {
    Double heightInMeter = heightInCm/100;
    return weightInKg/(heightInMeter*heightInMeter);
});

System.out.println("Your BMI is - " + combinedFuture.get());

當兩個Future都完成的時候,傳給``thenCombine()的回調函數將被調用。

組合多個CompletableFuture

咱們使用thenCompose() thenCombine()把兩個CompletableFuture組合在一塊兒。如今若是你想組合任意數量的CompletableFuture,應該怎麼作?咱們可使用如下兩個方法組合任意數量的CompletableFuture。

static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

1. CompletableFuture.allOf()
CompletableFuture.allOf的使用場景是當你一個列表的獨立future,而且你想在它們都完成後並行的作一些事情。

假設你想下載一個網站的100個不一樣的頁面。你能夠串行的作這個操做,可是這很是消耗時間。所以你想寫一個函數,傳入一個頁面連接,返回一個CompletableFuture,異步的下載頁面內容。

CompletableFuture<String> downloadWebPage(String pageLink) {
    return CompletableFuture.supplyAsync(() -> {
        // Code to download and return the web page's content
    });
}

如今,當全部的頁面已經下載完畢,你想計算包含關鍵字CompletableFuture頁面的數量。可使用CompletableFuture.allOf()達成目的。

List<String> webPageLinks = Arrays.asList(...)    // A list of 100 web page links

// Download contents of all the web pages asynchronously
List<CompletableFuture<String>> pageContentFutures = webPageLinks.stream()
        .map(webPageLink -> downloadWebPage(webPageLink))
        .collect(Collectors.toList());


// Create a combined Future using allOf()
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
        pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()])
);

使用CompletableFuture.allOf()的問題是它返回CompletableFuture<Void>。可是咱們能夠經過寫一些額外的代碼來獲取全部封裝的CompletableFuture結果。

// When all the Futures are completed, call `future.join()` to get their results and collect the results in a list -
CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v -> {
   return pageContentFutures.stream()
           .map(pageContentFuture -> pageContentFuture.join())
           .collect(Collectors.toList());
});

花一些時間理解下以上代碼片斷。當全部future完成的時候,咱們調用了future.join(),所以咱們不會在任何地方阻塞。

join()方法和get()方法很是相似,這惟一不一樣的地方是若是最頂層的CompletableFuture完成的時候發生了異常,它會拋出一個未經檢查的異常。

如今讓咱們計算包含關鍵字頁面的數量。

// Count the number of web pages having the "CompletableFuture" keyword.
CompletableFuture<Long> countFuture = allPageContentsFuture.thenApply(pageContents -> {
    return pageContents.stream()
            .filter(pageContent -> pageContent.contains("CompletableFuture"))
            .count();
});

System.out.println("Number of Web Pages having CompletableFuture keyword - " + 
        countFuture.get());

2. CompletableFuture.anyOf()

CompletableFuture.anyOf()和其名字介紹的同樣,當任何一個CompletableFuture完成的時候【相同的結果類型】,返回一個新的CompletableFuture。如下示例:

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 1";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 2";
});

CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 3";
});

CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);

System.out.println(anyOfFuture.get()); // Result of Future 2

在以上示例中,當三個中的任何一個CompletableFuture完成, anyOfFuture就會完成。由於future2的休眠時間最少,所以她最早完成,最終的結果將是future2的結果。

CompletableFuture.anyOf()傳入一個Future可變參數,返回CompletableFuture<Object>。CompletableFuture.anyOf()的問題是若是你的CompletableFuture返回的結果是不一樣類型的,這時候你講會不知道你最終CompletableFuture是什麼類型。

CompletableFuture 異常處理

咱們探尋了怎樣建立CompletableFuture,轉換它們,並組合多個CompletableFuture。如今讓咱們弄明白當發生錯誤的時候咱們應該怎麼作。

首先讓咱們明白在一個回調鏈中錯誤是怎麼傳遞的。思考下如下回調鏈:

CompletableFuture.supplyAsync(() -> {
    // Code which might throw an exception
    return "Some result";
}).thenApply(result -> {
    return "processed result";
}).thenApply(result -> {
    return "result after further processing";
}).thenAccept(result -> {
    // do something with the final result
});

若是在原始的supplyAsync()任務中發生一個錯誤,這時候沒有任何thenApply會被調用而且future將以一個異常結束。若是在第一個thenApply發生錯誤,這時候第二個和第三個將不會被調用,一樣的,future將以異常結束。

1. 使用 exceptionally() 回調處理異常
exceptionally()回調給你一個從原始Future中生成的錯誤恢復的機會。你能夠在這裏記錄這個異常並返回一個默認值。

Integer age = -1;

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    if(age < 0) {
        throw new IllegalArgumentException("Age can not be negative");
    }
    if(age > 18) {
        return "Adult";
    } else {
        return "Child";
    }
}).exceptionally(ex -> {
    System.out.println("Oops! We have an exception - " + ex.getMessage());
    return "Unknown!";
});

System.out.println("Maturity : " + maturityFuture.get());

2. 使用 handle() 方法處理異常
API提供了一個更通用的方法 - handle()從異常恢復,不管一個異常是否發生它都會被調用。

Integer age = -1;

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    if(age < 0) {
        throw new IllegalArgumentException("Age can not be negative");
    }
    if(age > 18) {
        return "Adult";
    } else {
        return "Child";
    }
}).handle((res, ex) -> {
    if(ex != null) {
        System.out.println("Oops! We have an exception - " + ex.getMessage());
        return "Unknown!";
    }
    return res;
});

System.out.println("Maturity : " + maturityFuture.get());

若是異常發生,res參數將是 null,不然,ex將是 null。

相關文章
相關標籤/搜索