Java8 CompletableFuture功能講解

這幾天一直在學習有關CompletableFuture有關的只是,下面的總結非常不錯,故zhuanhtml

Future是Java 5添加的類,用來描述一個異步計算的結果。你可使用isDone方法檢查計算是否完成,或者使用get阻塞住調用線程,直到計算完成返回結果,你也可使用cancel方法中止任務的執行。java

BasicFuture {
        (String[] args) ExecutionExceptionInterruptedException {
            ExecutorService es = Executors.newFixedThreadPool()Future<Integer> f = es.submit(() ->{
                })f.get()}
    }


雖然Future以及相關使用方法提供了異步執行任務的能力,可是對於結果的獲取倒是很不方便,只能經過阻塞或者輪詢的方式獲得任務的結果。阻塞的方式顯然和咱們的異步編程的初衷相違背,輪詢的方式又會耗費無謂的CPU資源,並且也不能及時地獲得計算結果,爲何不能用觀察者設計模式當計算結果完成及時通知監聽者呢?git

不少語言,好比Node.js,採用回調的方式實現異步編程。Java的一些框架,好比Netty,本身擴展了Java的 Future接口,提供了addListener等多個擴展方法:github


ChannelFuture = bootstrap.connect(InetSocketAddress(hostport)).addListener(()
{
    operationComplete(ChannelFuture ) Exception
    {
        (.isSuccess()) {
            }
        {
            }
    }
})



Google guava也提供了通用的擴展Future:ListenableFutureSettableFuture 以及輔助類Futures等,方便異步編程。web

String = ...inFlight.add(name)ListenableFuture<> future = service.query(name)future.addListener(Runnable() {
run() {
        processedCount.incrementAndGet()inFlight.remove(name)lastProcessed.set(name)logger.info(name)}
}executor)


Scala也提供了簡單易用且功能強大的Future/Promise異步編程模式編程

做爲正統的Java類庫,是否是應該作點什麼,增強一下自身庫的功能呢?bootstrap

在Java 8中, 新增長了一個包含50個方法左右的類: CompletableFuture,提供了很是強大的Future的擴展功能,能夠幫助咱們簡化異步編程的複雜性,提供了函數式編程的能力,能夠經過回調的方式處理計算結果,而且提供了轉換和組合CompletableFuture的方法。設計模式

下面咱們就看一看它的功能吧。
api

主動完成計算

CompletableFuture類實現了CompletionStageFuture接口,因此你仍是能夠像之前同樣經過阻塞或者輪詢的方式得到結果,儘管這種方式不推薦使用。oracle

T    ()
T   (timeoutTimeUnit unit)
T   (T valueIfAbsent)
T   ()

getNow有點特殊,若是結果已經計算完則返回結果或者拋出異常,不然返回給定的valueIfAbsent值。
join返回計算的結果或者拋出一個unchecked異常(CompletionException),它和get對拋出的異常的處理有些細微的區別,你能夠運行下面的代碼進行比較:

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
   
int i = 1/0;
   return 100;
});
//future.join();
future.get();


儘管Future能夠表明在另外的線程中執行的一段異步代碼,可是你仍是能夠在自己線程中執行:

public static CompletableFuture<Integer> compute() {
   final CompletableFuture<Integer> future = new CompletableFuture<>();
   return future;
}


上面的代碼中future沒有關聯任何的Callback、線程池、異步任務等,若是客戶端調用future.get就會一致傻等下去。你能夠經過下面的代碼完成一個計算,觸發客戶端的等待:

f.complete(100);

固然你也能夠拋出一個異常,而不是一個成功的計算結果:


f.completeExceptionally(new Exception());

完整的代碼以下:


}


能夠看到咱們並無把f.complete(100);放在另外的線程中去執行,可是在大部分狀況下咱們可能會用一個線程池去執行這些異步任務。CompletableFuture.complete()CompletableFuture.completeExceptionally只能被調用一次。可是咱們有兩個後門方法能夠重設這個值:obtrudeValueobtrudeException,可是使用的時候要當心,由於complete已經觸發了客戶端,有可能致使客戶端會獲得不指望的結果。

建立CompletableFuture對象。

CompletableFuture.completedFuture是一個靜態輔助方法,用來返回一個已經計算好的CompletableFuture


public static <U> CompletableFuture<U> completedFuture(U value)

而如下四個靜態方法用來爲一段異步執行的代碼建立CompletableFuture對象:



CompletableFuture<Void>   (Runnable runnable)
CompletableFuture<Void>  (Runnable runnableExecutor executor)
<> CompletableFuture<>     (Supplier<> supplier)
<> CompletableFuture<>     (Supplier<> supplierExecutor executor)

Async結尾而且沒有指定Executor的方法會使用ForkJoinPool.commonPool()做爲它的線程池執行異步代碼。


runAsync方法也好理解,它以Runnable函數式接口類型爲參數,因此CompletableFuture的計算結果爲空。

supplyAsync方法以Supplier<U>函數式接口類型爲參數,CompletableFuture的計算結果類型爲U

由於方法的參數類型都是函數式接口,因此可使用lambda表達式實現異步任務,好比:


CompletableFuture<String> = CompletableFuture.(() -> {
    })

計算結果完成時的處理

CompletableFuture的計算結果完成,或者拋出異常的時候,咱們能夠執行特定的Action。主要是下面的方法:


CompletableFuture<T>     (BiConsumer<? T? Throwable> action)
CompletableFuture<T>    (BiConsumer<? T? Throwable> action)
CompletableFuture<T>    (BiConsumer<? T? Throwable> actionExecutor executor)
CompletableFuture<T>     (Function<Throwable? T> fn)

能夠看到Action的類型是BiConsumer<? super T,? super Throwable>,它能夠處理正常的計算結果,或者異常狀況。
方法不以Async結尾,意味着Action使用相同的線程執行,而Async可能會使用其它的線程去執行(若是使用相同的線程池,也可能會被同一個線程選中執行)。

注意這幾個方法都會返回CompletableFuture,當Action執行完畢後它的結果返回原始的CompletableFuture的計算結果或者返回異常。


Main {
    Random = Random()= System.()() {
        System..println(){
            Thread.()} (InterruptedException e) {
            RuntimeException(e)}
        System..println(+ (System.() - )/+ ).nextInt()}
    (String[] args) Exception {
        CompletableFuture<Integer> future = CompletableFuture.(Main::)Future<Integer> f = future.whenComplete((ve) -> {
            System..println(v)System..println(e)})System..println(f.get())System..read()}
}

exceptionally方法返回一個新的CompletableFuture,當原始的CompletableFuture拋出異常的時候,就會觸發這個CompletableFuture的計算,調用function計算值,不然若是原始的CompletableFuture正常計算完後,這個新的CompletableFuture也計算完成,它的值和原始的CompletableFuture的計算的值相同。也就是這個exceptionally方法用來處理異常的狀況。

下面一組方法雖然也返回CompletableFuture對象,可是對象的值和原來的CompletableFuture計算的值不一樣。當原先的CompletableFuture的值計算完成或者拋出異常的時候,會觸發這個CompletableFuture對象的計算,結果由BiFunction參數計算而得。所以這組方法兼有whenComplete和轉換的兩個功能。


<> CompletableFuture<>     (BiFunction<? TThrowable? > fn)
<> CompletableFuture<>    (BiFunction<? TThrowable? > fn)
<> CompletableFuture<>    (BiFunction<? TThrowable? > fnExecutor executor)

一樣,不以Async結尾的方法由原來的線程計算,以Async結尾的方法由默認的線程池ForkJoinPool.commonPool()或者指定的線程池executor運行。

轉換

CompletableFuture能夠做爲monad(單子)和functor。因爲回調風格的實現,咱們沒必要由於等待一個計算完成而阻塞着調用線程,而是告訴CompletableFuture當計算完成的時候請執行某個function。並且咱們還能夠將這些操做串聯起來,或者將CompletableFuture組合起來。


<> CompletableFuture<>     (Function<? T? > fn)
<> CompletableFuture<>    (Function<? T? > fn)
<> CompletableFuture<>    (Function<? T? > fnExecutor executor)

這一組函數的功能是當原來的CompletableFuture計算完後,將結果傳遞給函數fn,將fn的結果做爲新的CompletableFuture計算結果。所以它的功能至關於將CompletableFuture<T>轉換成CompletableFuture<U>


這三個函數的區別和上面介紹的同樣,不以Async結尾的方法由原來的線程計算,以Async結尾的方法由默認的線程池ForkJoinPool.commonPool()或者指定的線程池executor運行。Java的CompletableFuture類老是遵循這樣的原則,下面就不一一贅述了。

使用例子以下:

CompletableFuture<Integer> = CompletableFuture.(() -> {
    })CompletableFuture<String> =  .thenApplyAsync(i -> i * ).thenApply(i -> i.toString())System.out.println(f.get())

須要注意的是,這些轉換並非立刻執行的,也不會阻塞,而是在前一個stage完成後繼續執行。

它們與handle方法的區別在於handle方法會處理正常計算值和異常,所以它能夠屏蔽異常,避免異常繼續拋出。而thenApply方法只是用來處理正常值,所以一旦有異常就會拋出。

純消費(執行Action)

上面的方法是當計算完成的時候,會生成新的計算結果(thenApplyhandle),或者返回一樣的計算結果whenCompleteCompletableFuture還提供了一種處理結果的方法,只對結果執行Action,而不返回新的計算值,所以計算值爲Void:


CompletableFuture<Void>  (Consumer<? T> action)
CompletableFuture<Void>     (Consumer<? T> action)
CompletableFuture<Void>     (Consumer<? T> actionExecutor executor)

看它的參數類型也就明白了,它們是函數式接口Consumer,這個接口只有輸入,沒有返回值。



CompletableFuture<Integer> = CompletableFuture.(() -> {
    })CompletableFuture<Void> =  .thenAccept(System.::println)System.out.println(f.get())

thenAcceptBoth以及相關方法提供了相似的功能,當兩個CompletionStage都正常完成計算的時候,就會執行提供的action,它用來組合另一個異步的結果。
runAfterBoth是當兩個CompletionStage都正常完成計算的時候,執行一個Runnable,這個Runnable並不使用計算的結果。


<> CompletableFuture<Void>  (CompletionStage<? > otherBiConsumer<? T? > action)
<> CompletableFuture<Void>     (CompletionStage<? > otherBiConsumer<? T? > action)
<> CompletableFuture<Void>     (CompletionStage<? > otherBiConsumer<? T? > actionExecutor executor)
CompletableFuture<Void>     (CompletionStage<?> otherRunnable action)

例子以下:


;

更完全地,下面一組方法當計算完成的時候會執行一個Runnable,與thenAccept不一樣,Runnable並不使用CompletableFuture計算的結果。


CompletableFuture<Void>  (Runnable action)
CompletableFuture<Void>     (Runnable action)
CompletableFuture<Void>     (Runnable actionExecutor executor)

所以先前的CompletableFuture計算的結果被忽略了,這個方法返回CompletableFuture<Void>類型的對象。



CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
   return 100;
});
CompletableFuture<Void> f =  future.thenRun(() -> System.out.println("finished"));
System.out.println(f.get())
;

所以,你能夠根據方法的參數的類型來加速你的記憶。Runnable類型的參數會忽略計算的結果,Consumer是純消費計算結果,BiConsumer會組合另一個CompletionStage純消費,Function會對計算結果作轉換,BiFunction會組合另一個CompletionStage的計算結果作轉換。

組合


<> CompletableFuture<>     (Function<? T? CompletionStage<>> fn)
<> CompletableFuture<>    (Function<? T? CompletionStage<>> fn)
<> CompletableFuture<>    (Function<? T? CompletionStage<>> fnExecutor executor)

這一組方法接受一個Function做爲參數,這個Function的輸入是當前的CompletableFuture的計算值,返回結果將是一個新的CompletableFuture,這個新的CompletableFuture會組合原來的CompletableFuture和函數返回的CompletableFuture。所以它的功能相似:

A +--> B +---> C

記住,thenCompose返回的對象並不一是函數fn返回的對象,若是原來的CompletableFuture尚未計算出來,它就會生成一個新的組合後的CompletableFuture。

例子:


CompletableFuture<Integer> = CompletableFuture.(() -> {
    })CompletableFuture<String> =  .thenCompose( i -> {
    CompletableFuture.(() -> {
        (* ) + })})System.out.println(f.get())

而下面的一組方法thenCombine用來複合另一個CompletionStage的結果。它的功能相似

兩個CompletionStage是並行執行的,它們之間並無前後依賴順序,other並不會等待先前的CompletableFuture執行完畢後再執行。


<> CompletableFuture<>   (CompletionStage<? > otherBiFunction<? T? ? > fn)
<> CompletableFuture<>  (CompletionStage<? > otherBiFunction<? T? ? > fn)
<> CompletableFuture<>  (CompletionStage<? > otherBiFunction<? T? ? > fnExecutor executor)

其實從功能上來說,它們的功能更相似thenAcceptBoth,只不過thenAcceptBoth是純消費,它的函數參數沒有返回值,而thenCombine的函數參數fn有返回值。

CompletableFuture<Integer> = CompletableFuture.(() -> {
    })CompletableFuture<String> = CompletableFuture.(() -> {
    })CompletableFuture<String> =  .thenCombine((xy) -> y + + x)System.out.println(f.get())

Either

thenAcceptBothrunAfterBoth是當兩個CompletableFuture都計算完成,而咱們下面要了解的方法是當任意一個CompletableFuture計算完成的時候就會執行。

CompletableFuture<Void>  (CompletionStage<? T> otherConsumer<? T> action)
CompletableFuture<Void>     (CompletionStage<? T> otherConsumer<? T> action)
CompletableFuture<Void>     (CompletionStage<? T> otherConsumer<? T> actionExecutor executor)
<> CompletableFuture<>    (CompletionStage<? T> otherFunction<? T> fn)
<> CompletableFuture<>    (CompletionStage<? T> otherFunction<? T> fn)
<> CompletableFuture<>    (CompletionStage<? T> otherFunction<? T> fnExecutor executor)

acceptEither方法是當任意一個CompletionStage完成的時候,action這個消費者就會被執行。這個方法返回CompletableFuture<Void>

applyToEither方法是當任意一個CompletionStage完成的時候,fn會被執行,它的返回值會看成新的CompletableFuture<U>的計算結果。

下面這個例子有時會輸出100,有時候會輸出200,哪一個Future先完成就會根據它的結果計算。


Random = Random()CompletableFuture<Integer> = CompletableFuture.(() -> {
    {
        Thread.(+ .nextInt())} (InterruptedException e) {
        e.printStackTrace()}
    })CompletableFuture<Integer> = CompletableFuture.(() -> {
    {
        Thread.(+ .nextInt())} (InterruptedException e) {
        e.printStackTrace()}
    })CompletableFuture<String> =  .applyToEither(i -> i.toString())

輔助方法 allOf 和 anyOf

前面咱們已經介紹了幾個靜態方法:completedFuturerunAsyncsupplyAsync,下面介紹的這兩個方法用來組合多個CompletableFuture。


CompletableFuture<Void>       (CompletableFuture<?>... cfs)
CompletableFuture<Object>    (CompletableFuture<?>… cfs)


allOf方法是當全部的CompletableFuture都執行完後執行計算。

anyOf方法是當任意一個CompletableFuture執行完後就會執行計算,計算的結果相同。

下面的代碼運行結果有時是100,有時是"abc"。可是anyOfapplyToEither不一樣。anyOf接受任意多的CompletableFuture可是applyToEither只是判斷兩個CompletableFuture,anyOf返回值的計算結果是參數中其中一個CompletableFuture的計算結果,applyToEither返回值的計算結果倒是要通過fn處理的。固然還有靜態方法的區別,線程池的選擇等。

Random = Random()CompletableFuture<Integer> = CompletableFuture.(() -> {
    {
        Thread.(+ .nextInt())} (InterruptedException e) {
        e.printStackTrace()}
    })CompletableFuture<String> = CompletableFuture.(() -> {
    {
        Thread.(+ .nextInt())} (InterruptedException e) {
        e.printStackTrace()}
    })CompletableFuture<Object> =  CompletableFuture.()System.out.println(f.get())

我想經過上面的介紹,應該把CompletableFuture的方法和功能介紹完了(cancelisCompletedExceptionally()isDone()以及繼承於Object的方法無需介紹了, toCompletableFuture()返回CompletableFuture自己),但願你能全面瞭解CompletableFuture強大的功能,並將它應用到Java的異步編程中。若是你有使用它的開源項目,能夠留言分享一下。

更進一步

若是你用過Guava的Future類,你就會知道它的Futures輔助類提供了不少便利方法,用來處理多個Future,而不像Java的CompletableFuture,只提供了allOfanyOf兩個方法。 好比有這樣一個需求,將多個CompletableFuture組合成一個CompletableFuture,這個組合後的CompletableFuture的計算結果是個List,它包含前面全部的CompletableFuture的計算結果,guava的Futures.allAsList能夠實現這樣的功能,可是對於java CompletableFuture,咱們須要一些輔助方法:

<> CompletableFuture<List<>> (List<CompletableFuture<>> futures) {
    CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(CompletableFuture[futures.size()]))allDoneFuture.thenApply(v -> .stream().map(CompletableFuture::join).collect(Collectors.<>toList()))}
<> CompletableFuture<Stream<>> (Stream<CompletableFuture<>> futures) {
    List<CompletableFuture<>> futureList = futures.filter(f -> f != ).collect(Collectors.toList())(futureList)}

或者Java Future轉CompletableFuture:


<> CompletableFuture<> (Future<> futureExecutor executor) {
    CompletableFuture.(() -> {
        {
            .get()} (InterruptedException | ExecutionException e) {
            RuntimeException(e)}
    }executor)}

github有多個項目能夠實現Java CompletableFuture與其它Future (如Guava ListenableFuture)之間的轉換,如spotify/futures-extrafuture-converterscala/scala-java8-compat 等。

參考文檔

  1. Java 8: Definitive guide to CompletableFuture

  2. https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html

  3. https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html

相關文章
相關標籤/搜索