這幾天一直在學習有關CompletableFuture有關的只是,下面的總結非常不錯,故zhuanhtml
Future是Java 5添加的類,用來描述一個異步計算的結果。你可使用isDone
方法檢查計算是否完成,或者使用get
阻塞住調用線程,直到計算完成返回結果,你也可使用cancel
方法中止任務的執行。java
BasicFuture { (String[] args) ExecutionExceptionInterruptedException { ExecutorService es = Executors.newFixedThreadPool()Future<Integer> f = es.submit(() ->{ })f.get()} }
雖然Future
以及相關使用方法提供了異步執行任務的能力,可是對於結果的獲取倒是很不方便,只能經過阻塞或者輪詢的方式獲得任務的結果。阻塞的方式顯然和咱們的異步編程的初衷相違背,輪詢的方式又會耗費無謂的CPU資源,並且也不能及時地獲得計算結果,爲何不能用觀察者設計模式當計算結果完成及時通知監聽者呢?git
不少語言,好比Node.js,採用回調的方式實現異步編程。Java的一些框架,好比Netty,本身擴展了Java的 Future
接口,提供了addListener
等多個擴展方法:github
ChannelFuture = bootstrap.connect(InetSocketAddress(hostport)).addListener(() { operationComplete(ChannelFuture ) Exception { (.isSuccess()) { } { } } })
Google guava也提供了通用的擴展Future:ListenableFuture、SettableFuture 以及輔助類Futures等,方便異步編程。web
String = ...inFlight.add(name)ListenableFuture<> future = service.query(name)future.addListener(Runnable() { run() { processedCount.incrementAndGet()inFlight.remove(name)lastProcessed.set(name)logger.info(name)} }executor)
Scala也提供了簡單易用且功能強大的Future/Promise異步編程模式。編程
做爲正統的Java類庫,是否是應該作點什麼,增強一下自身庫的功能呢?bootstrap
在Java 8中, 新增長了一個包含50個方法左右的類: CompletableFuture,提供了很是強大的Future的擴展功能,能夠幫助咱們簡化異步編程的複雜性,提供了函數式編程的能力,能夠經過回調的方式處理計算結果,而且提供了轉換和組合CompletableFuture的方法。設計模式
CompletableFuture類實現了CompletionStage和Future接口,因此你仍是能夠像之前同樣經過阻塞或者輪詢的方式得到結果,儘管這種方式不推薦使用。oracle
T () T (timeoutTimeUnit unit) T (T valueIfAbsent) T ()
getNow
有點特殊,若是結果已經計算完則返回結果或者拋出異常,不然返回給定的valueIfAbsent
值。join
返回計算的結果或者拋出一個unchecked異常(CompletionException),它和get
對拋出的異常的處理有些細微的區別,你能夠運行下面的代碼進行比較:
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 1/0;
return 100;
});
//future.join();
future.get();
儘管Future能夠表明在另外的線程中執行的一段異步代碼,可是你仍是能夠在自己線程中執行:
public static CompletableFuture<Integer> compute() {
final CompletableFuture<Integer> future = new CompletableFuture<>();
return future;
}
上面的代碼中future
沒有關聯任何的Callback
、線程池、異步任務等,若是客戶端調用future.get
就會一致傻等下去。你能夠經過下面的代碼完成一個計算,觸發客戶端的等待:
f.complete(100);
固然你也能夠拋出一個異常,而不是一個成功的計算結果:
f.completeExceptionally(new Exception());
完整的代碼以下:
}
能夠看到咱們並無把f.complete(100);
放在另外的線程中去執行,可是在大部分狀況下咱們可能會用一個線程池去執行這些異步任務。CompletableFuture.complete()
、CompletableFuture.completeExceptionally
只能被調用一次。可是咱們有兩個後門方法能夠重設這個值:obtrudeValue
、obtrudeException
,可是使用的時候要當心,由於complete
已經觸發了客戶端,有可能致使客戶端會獲得不指望的結果。
CompletableFuture.completedFuture
是一個靜態輔助方法,用來返回一個已經計算好的CompletableFuture
。
public static <U> CompletableFuture<U> completedFuture(U value)
而如下四個靜態方法用來爲一段異步執行的代碼建立CompletableFuture
對象:
CompletableFuture<Void> (Runnable runnable) CompletableFuture<Void> (Runnable runnableExecutor executor) <> CompletableFuture<> (Supplier<> supplier) <> CompletableFuture<> (Supplier<> supplierExecutor executor)
以Async
結尾而且沒有指定Executor
的方法會使用ForkJoinPool.commonPool()
做爲它的線程池執行異步代碼。
runAsync
方法也好理解,它以Runnable
函數式接口類型爲參數,因此CompletableFuture
的計算結果爲空。supplyAsync
方法以Supplier<U>
函數式接口類型爲參數,CompletableFuture
的計算結果類型爲U
。
由於方法的參數類型都是函數式接口,因此可使用lambda表達式實現異步任務,好比:
CompletableFuture<String> = CompletableFuture.(() -> { })
當CompletableFuture
的計算結果完成,或者拋出異常的時候,咱們能夠執行特定的Action
。主要是下面的方法:
CompletableFuture<T> (BiConsumer<? T? Throwable> action) CompletableFuture<T> (BiConsumer<? T? Throwable> action) CompletableFuture<T> (BiConsumer<? T? Throwable> actionExecutor executor) CompletableFuture<T> (Function<Throwable? T> fn)
能夠看到Action
的類型是BiConsumer<? super T,? super Throwable>
,它能夠處理正常的計算結果,或者異常狀況。
方法不以Async
結尾,意味着Action
使用相同的線程執行,而Async
可能會使用其它的線程去執行(若是使用相同的線程池,也可能會被同一個線程選中執行)。
注意這幾個方法都會返回CompletableFuture
,當Action
執行完畢後它的結果返回原始的CompletableFuture
的計算結果或者返回異常。
Main { Random = Random()= System.()() { System..println(){ Thread.()} (InterruptedException e) { RuntimeException(e)} System..println(+ (System.() - )/+ ).nextInt()} (String[] args) Exception { CompletableFuture<Integer> future = CompletableFuture.(Main::)Future<Integer> f = future.whenComplete((ve) -> { System..println(v)System..println(e)})System..println(f.get())System..read()} }
exceptionally
方法返回一個新的CompletableFuture,當原始的CompletableFuture拋出異常的時候,就會觸發這個CompletableFuture的計算,調用function計算值,不然若是原始的CompletableFuture正常計算完後,這個新的CompletableFuture也計算完成,它的值和原始的CompletableFuture的計算的值相同。也就是這個exceptionally
方法用來處理異常的狀況。
下面一組方法雖然也返回CompletableFuture對象,可是對象的值和原來的CompletableFuture計算的值不一樣。當原先的CompletableFuture的值計算完成或者拋出異常的時候,會觸發這個CompletableFuture對象的計算,結果由BiFunction
參數計算而得。所以這組方法兼有whenComplete
和轉換的兩個功能。
<> CompletableFuture<> (BiFunction<? TThrowable? > fn) <> CompletableFuture<> (BiFunction<? TThrowable? > fn) <> CompletableFuture<> (BiFunction<? TThrowable? > fnExecutor executor)
一樣,不以Async
結尾的方法由原來的線程計算,以Async
結尾的方法由默認的線程池ForkJoinPool.commonPool()
或者指定的線程池executor
運行。
CompletableFuture
能夠做爲monad(單子)和functor。因爲回調風格的實現,咱們沒必要由於等待一個計算完成而阻塞着調用線程,而是告訴CompletableFuture
當計算完成的時候請執行某個function
。並且咱們還能夠將這些操做串聯起來,或者將CompletableFuture
組合起來。
<> CompletableFuture<> (Function<? T? > fn) <> CompletableFuture<> (Function<? T? > fn) <> CompletableFuture<> (Function<? T? > fnExecutor executor)
這一組函數的功能是當原來的CompletableFuture計算完後,將結果傳遞給函數fn
,將fn
的結果做爲新的CompletableFuture
計算結果。所以它的功能至關於將CompletableFuture<T>
轉換成CompletableFuture<U>
。
這三個函數的區別和上面介紹的同樣,不以Async
結尾的方法由原來的線程計算,以Async
結尾的方法由默認的線程池ForkJoinPool.commonPool()
或者指定的線程池executor
運行。Java的CompletableFuture類老是遵循這樣的原則,下面就不一一贅述了。
使用例子以下:
CompletableFuture<Integer> = CompletableFuture.(() -> { })CompletableFuture<String> = .thenApplyAsync(i -> i * ).thenApply(i -> i.toString())System.out.println(f.get())
須要注意的是,這些轉換並非立刻執行的,也不會阻塞,而是在前一個stage完成後繼續執行。
它們與handle
方法的區別在於handle
方法會處理正常計算值和異常,所以它能夠屏蔽異常,避免異常繼續拋出。而thenApply
方法只是用來處理正常值,所以一旦有異常就會拋出。
上面的方法是當計算完成的時候,會生成新的計算結果(thenApply
, handle
),或者返回一樣的計算結果whenComplete
,CompletableFuture
還提供了一種處理結果的方法,只對結果執行Action
,而不返回新的計算值,所以計算值爲Void
:
CompletableFuture<Void> (Consumer<? T> action) CompletableFuture<Void> (Consumer<? T> action) CompletableFuture<Void> (Consumer<? T> actionExecutor executor)
看它的參數類型也就明白了,它們是函數式接口Consumer
,這個接口只有輸入,沒有返回值。
CompletableFuture<Integer> = CompletableFuture.(() -> { })CompletableFuture<Void> = .thenAccept(System.::println)System.out.println(f.get())
thenAcceptBoth
以及相關方法提供了相似的功能,當兩個CompletionStage都正常完成計算的時候,就會執行提供的action
,它用來組合另一個異步的結果。runAfterBoth
是當兩個CompletionStage都正常完成計算的時候,執行一個Runnable,這個Runnable並不使用計算的結果。
<> CompletableFuture<Void> (CompletionStage<? > otherBiConsumer<? T? > action) <> CompletableFuture<Void> (CompletionStage<? > otherBiConsumer<? T? > action) <> CompletableFuture<Void> (CompletionStage<? > otherBiConsumer<? T? > actionExecutor executor) CompletableFuture<Void> (CompletionStage<?> otherRunnable action)
例子以下:
;
更完全地,下面一組方法當計算完成的時候會執行一個Runnable,與thenAccept
不一樣,Runnable並不使用CompletableFuture計算的結果。
CompletableFuture<Void> (Runnable action) CompletableFuture<Void> (Runnable action) CompletableFuture<Void> (Runnable actionExecutor executor)
所以先前的CompletableFuture計算的結果被忽略了,這個方法返回CompletableFuture<Void>
類型的對象。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { |
所以,你能夠根據方法的參數的類型來加速你的記憶。
Runnable
類型的參數會忽略計算的結果,Consumer
是純消費計算結果,BiConsumer
會組合另一個CompletionStage
純消費,Function
會對計算結果作轉換,BiFunction
會組合另一個CompletionStage
的計算結果作轉換。
<> CompletableFuture<> (Function<? T? CompletionStage<>> fn) <> CompletableFuture<> (Function<? T? CompletionStage<>> fn) <> CompletableFuture<> (Function<? T? CompletionStage<>> fnExecutor executor)
這一組方法接受一個Function做爲參數,這個Function的輸入是當前的CompletableFuture的計算值,返回結果將是一個新的CompletableFuture,這個新的CompletableFuture會組合原來的CompletableFuture和函數返回的CompletableFuture。所以它的功能相似:
A +--> B +---> C
記住,thenCompose
返回的對象並不一是函數fn
返回的對象,若是原來的CompletableFuture
尚未計算出來,它就會生成一個新的組合後的CompletableFuture。
例子:
CompletableFuture<Integer> = CompletableFuture.(() -> { })CompletableFuture<String> = .thenCompose( i -> { CompletableFuture.(() -> { (* ) + })})System.out.println(f.get())
而下面的一組方法thenCombine
用來複合另一個CompletionStage的結果。它的功能相似
兩個CompletionStage是並行執行的,它們之間並無前後依賴順序,other
並不會等待先前的CompletableFuture
執行完畢後再執行。
<> CompletableFuture<> (CompletionStage<? > otherBiFunction<? T? ? > fn) <> CompletableFuture<> (CompletionStage<? > otherBiFunction<? T? ? > fn) <> CompletableFuture<> (CompletionStage<? > otherBiFunction<? T? ? > fnExecutor executor)
其實從功能上來說,它們的功能更相似thenAcceptBoth
,只不過thenAcceptBoth
是純消費,它的函數參數沒有返回值,而thenCombine
的函數參數fn
有返回值。
CompletableFuture<Integer> = CompletableFuture.(() -> { })CompletableFuture<String> = CompletableFuture.(() -> { })CompletableFuture<String> = .thenCombine((xy) -> y + + x)System.out.println(f.get())
thenAcceptBoth
和runAfterBoth
是當兩個CompletableFuture都計算完成,而咱們下面要了解的方法是當任意一個CompletableFuture計算完成的時候就會執行。
CompletableFuture<Void> (CompletionStage<? T> otherConsumer<? T> action) CompletableFuture<Void> (CompletionStage<? T> otherConsumer<? T> action) CompletableFuture<Void> (CompletionStage<? T> otherConsumer<? T> actionExecutor executor) <> CompletableFuture<> (CompletionStage<? T> otherFunction<? T> fn) <> CompletableFuture<> (CompletionStage<? T> otherFunction<? T> fn) <> CompletableFuture<> (CompletionStage<? T> otherFunction<? T> fnExecutor executor)
acceptEither
方法是當任意一個CompletionStage完成的時候,action
這個消費者就會被執行。這個方法返回CompletableFuture<Void>
applyToEither
方法是當任意一個CompletionStage完成的時候,fn
會被執行,它的返回值會看成新的CompletableFuture<U>
的計算結果。
下面這個例子有時會輸出100
,有時候會輸出200
,哪一個Future先完成就會根據它的結果計算。
Random = Random()CompletableFuture<Integer> = CompletableFuture.(() -> { { Thread.(+ .nextInt())} (InterruptedException e) { e.printStackTrace()} })CompletableFuture<Integer> = CompletableFuture.(() -> { { Thread.(+ .nextInt())} (InterruptedException e) { e.printStackTrace()} })CompletableFuture<String> = .applyToEither(i -> i.toString())
allOf
和 anyOf
前面咱們已經介紹了幾個靜態方法:completedFuture
、runAsync
、supplyAsync
,下面介紹的這兩個方法用來組合多個CompletableFuture。
CompletableFuture<Void> (CompletableFuture<?>... cfs) CompletableFuture<Object> (CompletableFuture<?>… cfs)
allOf
方法是當全部的CompletableFuture
都執行完後執行計算。anyOf
方法是當任意一個CompletableFuture
執行完後就會執行計算,計算的結果相同。
下面的代碼運行結果有時是100,有時是"abc"。可是anyOf
和applyToEither
不一樣。anyOf
接受任意多的CompletableFuture可是applyToEither
只是判斷兩個CompletableFuture,anyOf
返回值的計算結果是參數中其中一個CompletableFuture的計算結果,applyToEither
返回值的計算結果倒是要通過fn
處理的。固然還有靜態方法的區別,線程池的選擇等。
Random = Random()CompletableFuture<Integer> = CompletableFuture.(() -> { { Thread.(+ .nextInt())} (InterruptedException e) { e.printStackTrace()} })CompletableFuture<String> = CompletableFuture.(() -> { { Thread.(+ .nextInt())} (InterruptedException e) { e.printStackTrace()} })CompletableFuture<Object> = CompletableFuture.()System.out.println(f.get())
我想經過上面的介紹,應該把CompletableFuture的方法和功能介紹完了(cancel
、isCompletedExceptionally()
、isDone()
以及繼承於Object的方法無需介紹了, toCompletableFuture()
返回CompletableFuture自己),但願你能全面瞭解CompletableFuture強大的功能,並將它應用到Java的異步編程中。若是你有使用它的開源項目,能夠留言分享一下。
若是你用過Guava的Future類,你就會知道它的Futures
輔助類提供了不少便利方法,用來處理多個Future,而不像Java的CompletableFuture,只提供了allOf
、anyOf
兩個方法。 好比有這樣一個需求,將多個CompletableFuture組合成一個CompletableFuture,這個組合後的CompletableFuture的計算結果是個List,它包含前面全部的CompletableFuture的計算結果,guava的Futures.allAsList
能夠實現這樣的功能,可是對於java CompletableFuture,咱們須要一些輔助方法:
<> CompletableFuture<List<>> (List<CompletableFuture<>> futures) { CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(CompletableFuture[futures.size()]))allDoneFuture.thenApply(v -> .stream().map(CompletableFuture::join).collect(Collectors.<>toList()))} <> CompletableFuture<Stream<>> (Stream<CompletableFuture<>> futures) { List<CompletableFuture<>> futureList = futures.filter(f -> f != ).collect(Collectors.toList())(futureList)}
或者Java Future轉CompletableFuture:
<> CompletableFuture<> (Future<> futureExecutor executor) { CompletableFuture.(() -> { { .get()} (InterruptedException | ExecutionException e) { RuntimeException(e)} }executor)}
github有多個項目能夠實現Java CompletableFuture與其它Future (如Guava ListenableFuture)之間的轉換,如spotify/futures-extra、future-converter、scala/scala-java8-compat 等。