這篇博客回顧JAVA8的CompletionStage
API以及其在JAVA庫中的標準實現CompletableFuture
。將會經過幾個例子來展現API的各類行爲。java
由於CompletableFuture
是CompletionInterface
接口的實現,因此咱們首先要了解該接口的契約。它表明某個同步或異步計算的一個階段。你能夠把它理解爲是一個爲了產生有價值最終結果的計算的流水線上的一個單元。這意味着多個ComletionStage
指令能夠連接起來從而一個階段的完成能夠觸發下一個階段的執行。面試
除了實現了CompletionStage
接口,Completion
還繼承了Future
,這個接口用於實現一個未開始的異步事件。由於可以顯式的完成Future
,因此取名爲CompletableFuture
。微信
這個簡單的示例中建立了一個已經完成的預先設置好結果的CompletableFuture
。一般做爲計算的起點階段。app
static void completedFutureExample() { CompletableFuture cf = CompletableFuture.completedFuture("message"); assertTrue(cf.isDone()); assertEquals("message", cf.getNow(null)); }
getNow
方法會返回完成後的結果(這裏就是message),若是還未完成,則返回傳入的默認值null
。dom
下面的例子解釋瞭如何建立一個異步運行Runnable
的stage。異步
static void runAsyncExample() { CompletableFuture cf = CompletableFuture.runAsync(() -> { assertTrue(Thread.currentThread().isDaemon()); randomSleep(); }); assertFalse(cf.isDone()); sleepEnough(); assertTrue(cf.isDone()); }
這個例子想要說明兩個事情:ide
CompletableFuture
中以Async
爲結尾的方法將會異步執行Executor
的狀況下),異步執行會使用ForkJoinPool
實現,該線程池使用一個後臺線程來執行Runnable
任務。注意這只是特定於CompletableFuture
實現,其它的CompletableStage
實現能夠重寫該默認行爲。下面的例子引用了第一個例子中已經完成的CompletableFuture
,它將引用生成的字符串結果並將該字符串大寫。ui
static void thenApplyExample() { CompletableFuture cf = CompletableFuture.completedFuture("message").thenApply(s -> { assertFalse(Thread.currentThread().isDaemon()); return s.toUpperCase(); }); assertEquals("MESSAGE", cf.getNow(null)); }
這裏的關鍵詞是thenApply
:this
then
是指在當前階段正常執行完成後(正常執行是指沒有拋出異常)進行的操做。在本例中,當前階段已經完成並獲得值message
。Apply
是指將一個Function
做用於以前階段得出的結果Function
是阻塞的,這意味着只有當大寫操做執行完成以後纔會執行getNow()
方法。spa
經過在方法後面添加Async
後綴,該CompletableFuture
鏈將會異步執行(使用ForkJoinPool.commonPool())
static void thenApplyAsyncExample() { CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> { assertTrue(Thread.currentThread().isDaemon()); randomSleep(); return s.toUpperCase(); }); assertNull(cf.getNow(null)); assertEquals("MESSAGE", cf.join()); }
異步方法的一個好處是能夠提供一個Executor
來執行CompletableStage
。這個例子展現瞭如何使用一個固定大小的線程池來實現大寫操做。
static ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() { int count = 1; @Override public Thread newThread(Runnable runnable) { return new Thread(runnable, "custom-executor-" + count++); } }); static void thenApplyAsyncWithExecutorExample() { CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> { assertTrue(Thread.currentThread().getName().startsWith("custom-executor-")); assertFalse(Thread.currentThread().isDaemon()); randomSleep(); return s.toUpperCase(); }, executor); assertNull(cf.getNow(null)); assertEquals("MESSAGE", cf.join()); }
若是下一個Stage接收了當前Stage的結果可是在計算中無需返回值(好比其返回值爲void),那麼它將使用方法thenAccept
並傳入一個Consumer
接口。
static void thenAcceptExample() { StringBuilder result = new StringBuilder(); CompletableFuture.completedFuture("thenAccept message") .thenAccept(s -> result.append(s)); assertTrue("Result was empty", result.length() > 0); }
Consumer
將會同步執行,因此咱們無需在返回的CompletableFuture
上執行join操做。
一樣,使用Asyn後綴實現:
static void thenAcceptAsyncExample() { StringBuilder result = new StringBuilder(); CompletableFuture cf = CompletableFuture.completedFuture("thenAcceptAsync message") .thenAcceptAsync(s -> result.append(s)); cf.join(); assertTrue("Result was empty", result.length() > 0); }
咱們如今來模擬一個出現異常的場景。爲了簡潔性,咱們仍是將一個字符串大寫,可是咱們會模擬延時進行該操做。咱們會使用thenApplyAsyn(Function, Executor)
,第一個參數是大寫轉化方法,第二個參數是一個延時executor,它會延時一秒鐘再將操做提交給ForkJoinPool
。
static void completeExceptionallyExample() { CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase, CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)); CompletableFuture exceptionHandler = cf.handle((s, th) -> { return (th != null) ? "message upon cancel" : ""; }); cf.completeExceptionally(new RuntimeException("completed exceptionally")); assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally()); try { cf.join(); fail("Should have thrown an exception"); } catch(CompletionException ex) { // just for testing assertEquals("completed exceptionally", ex.getCause().getMessage()); } assertEquals("message upon cancel", exceptionHandler.join()); }
message
的CompletableFuture
對象。而後咱們調用thenApplyAsync
方法,該方法會返回一個新的CompletableFuture
。這個方法用異步的方式執行大寫操做。這裏還展現瞭如何使用delayedExecutor(timeout, timeUnit)
方法來延時異步操做。exceptionHandler
,這個階段會處理一切異常並返回另外一個消息message upon cancel
。CompletionException
。它還會觸發handler
階段。API補充:
<U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)
返回一個新的CompletionStage,不管以前的Stage是否正常運行完畢。傳入的參數包括上一個階段的結果和拋出異常。
和計算時異常處理很類似,咱們能夠經過Future
接口中的cancel(boolean mayInterruptIfRunning)
來取消計算。
static void cancelExample() { CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase, CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)); CompletableFuture cf2 = cf.exceptionally(throwable -> "canceled message"); assertTrue("Was not canceled", cf.cancel(true)); assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally()); assertEquals("canceled message", cf2.join()); }
API補充
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
返回一個新的CompletableFuture,若是出現異常,則爲該方法中執行的結果,不然就是正常執行的結果。
下面的例子建立了一個CompletableFuture
對象並將Function
做用於已完成的兩個Stage中的任意一個(沒有保證哪個將會傳遞給Function)。這兩個階段分別以下:一個將字符串大寫,另外一個小寫。
static void applyToEitherExample() { String original = "Message"; CompletableFuture cf1 = CompletableFuture.completedFuture(original) .thenApplyAsync(s -> delayedUpperCase(s)); CompletableFuture cf2 = cf1.applyToEither( CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)), s -> s + " from applyToEither"); assertTrue(cf2.join().endsWith(" from applyToEither")); }
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T,U> fn)
返回一個全新的CompletableFuture,包含着this或是other操做完成以後,在兩者中的任意一個執行fn
和前一個例子相似,將Function
替換爲Consumer
static void acceptEitherExample() { String original = "Message"; StringBuilder result = new StringBuilder(); CompletableFuture cf = CompletableFuture.completedFuture(original) .thenApplyAsync(s -> delayedUpperCase(s)) .acceptEither(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)), s -> result.append(s).append("acceptEither")); cf.join(); assertTrue("Result was empty", result.toString().endsWith("acceptEither")); }
注意這裏的兩個Stage都是同步運行的,第一個stage將字符串轉化爲大寫以後,第二個stage將其轉化爲小寫。
static void runAfterBothExample() { String original = "Message"; StringBuilder result = new StringBuilder(); CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).runAfterBoth( CompletableFuture.completedFuture(original).thenApply(String::toLowerCase), () -> result.append("done")); assertTrue("Result was empty", result.length() > 0); }
Biconsumer支持同時對兩個Stage的結果進行操做。
static void thenAcceptBothExample() { String original = "Message"; StringBuilder result = new StringBuilder(); CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).thenAcceptBoth( CompletableFuture.completedFuture(original).thenApply(String::toLowerCase), (s1, s2) -> result.append(s1 + s2)); assertEquals("MESSAGEmessage", result.toString()); }
若是CompletableFuture
想要合併兩個階段的結果而且返回值,咱們可使用方法thenCombine
。這裏的計算流都是同步的,因此最後的getNow()
方法會得到最終結果,即大寫操做和小寫操做的結果的拼接。
static void thenCombineExample() { String original = "Message"; CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s)) .thenCombine(CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)), (s1, s2) -> s1 + s2); assertEquals("MESSAGEmessage", cf.getNow(null)); }
和以前的例子相似,只是這裏用了不一樣的方法:即兩個階段的操做都是異步的。那麼thenCombine
也會異步執行,及時它沒有Async後綴。
static void thenCombineAsyncExample() { String original = "Message"; CompletableFuture cf = CompletableFuture.completedFuture(original) .thenApplyAsync(s -> delayedUpperCase(s)) .thenCombine(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)), (s1, s2) -> s1 + s2); assertEquals("MESSAGEmessage", cf.join()); }
咱們可使用thenCompose
來完成前兩個例子中的操做。
static void thenComposeExample() { String original = "Message"; CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s)) .thenCompose(upper -> CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)) .thenApply(s -> upper + s)); assertEquals("MESSAGEmessage", cf.join()); }
static void anyOfExample() { StringBuilder result = new StringBuilder(); List messages = Arrays.asList("a", "b", "c"); List<CompletableFuture> futures = messages.stream() .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s))) .collect(Collectors.toList()); CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((res, th) -> { if(th == null) { assertTrue(isUpperCase((String) res)); result.append(res); } }); assertTrue("Result was empty", result.length() > 0); }
static void allOfExample() { StringBuilder result = new StringBuilder(); List messages = Arrays.asList("a", "b", "c"); List<CompletableFuture> futures = messages.stream() .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s))) .collect(Collectors.toList()); CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])) .whenComplete((v, th) -> { futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null)))); result.append("done"); }); assertTrue("Result was empty", result.length() > 0); }
static void allOfAsyncExample() { StringBuilder result = new StringBuilder(); List messages = Arrays.asList("a", "b", "c"); List<CompletableFuture> futures = messages.stream() .map(msg -> CompletableFuture.completedFuture(msg).thenApplyAsync(s -> delayedUpperCase(s))) .collect(Collectors.toList()); CompletableFuture allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])) .whenComplete((v, th) -> { futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null)))); result.append("done"); }); allOf.join(); assertTrue("Result was empty", result.length() > 0); }
下面展現了一個實踐CompletableFuture的場景:
cars()
方法異步得到Car
列表。它將會返回一個CompletionStage<List<Car>>
。cars()
方法應當使用一個遠程的REST端點來實現。rating(manufactureId)
來異步獲取每輛車的評分。allOf()
來進入最終Stage,它將在這兩個階段完成後執行whenComplete()
,打印出車輛的評分。cars().thenCompose(cars -> { List<CompletionStage> updatedCars = cars.stream() .map(car -> rating(car.manufacturerId).thenApply(r -> { car.setRating(r); return car; })).collect(Collectors.toList()); CompletableFuture done = CompletableFuture .allOf(updatedCars.toArray(new CompletableFuture[updatedCars.size()])); return done.thenApply(v -> updatedCars.stream().map(CompletionStage::toCompletableFuture) .map(CompletableFuture::join).collect(Collectors.toList())); }).whenComplete((cars, th) -> { if (th == null) { cars.forEach(System.out::println); } else { throw new RuntimeException(th); } }).toCompletableFuture().join();
Java CompletableFuture 詳解
Guide To CompletableFuture
想要了解更多開發技術,面試教程以及互聯網公司內推,歡迎關注個人微信公衆號!將會不按期的發放福利哦~