貓頭鷹的深夜翻譯:使用JAVA CompletableFuture的20例子

前言

這篇博客回顧JAVA8的CompletionStageAPI以及其在JAVA庫中的標準實現CompletableFuture。將會經過幾個例子來展現API的各類行爲。java

由於CompletableFutureCompletionInterface接口的實現,因此咱們首先要了解該接口的契約。它表明某個同步或異步計算的一個階段。你能夠把它理解爲是一個爲了產生有價值最終結果的計算的流水線上的一個單元。這意味着多個ComletionStage指令能夠連接起來從而一個階段的完成能夠觸發下一個階段的執行。面試

除了實現了CompletionStage接口,Completion還繼承了Future,這個接口用於實現一個未開始的異步事件。由於可以顯式的完成Future,因此取名爲CompletableFuture微信

1.新建一個完成的CompletableFuture

這個簡單的示例中建立了一個已經完成的預先設置好結果的CompletableFuture。一般做爲計算的起點階段。app

static void completedFutureExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message");
    assertTrue(cf.isDone());
    assertEquals("message", cf.getNow(null));
}

getNow方法會返回完成後的結果(這裏就是message),若是還未完成,則返回傳入的默認值nulldom

2.運行一個簡單的異步stage

下面的例子解釋瞭如何建立一個異步運行Runnable的stage。異步

static void runAsyncExample() {
    CompletableFuture cf = CompletableFuture.runAsync(() -> {
        assertTrue(Thread.currentThread().isDaemon());
        randomSleep();
    });
    assertFalse(cf.isDone());
    sleepEnough();
    assertTrue(cf.isDone());
}

這個例子想要說明兩個事情:ide

  1. CompletableFuture中以Async爲結尾的方法將會異步執行
  2. 默認狀況下(即指沒有傳入Executor的狀況下),異步執行會使用ForkJoinPool實現,該線程池使用一個後臺線程來執行Runnable任務。注意這只是特定於CompletableFuture實現,其它的CompletableStage實現能夠重寫該默認行爲。

3.將方法做用於前一個Stage

下面的例子引用了第一個例子中已經完成的CompletableFuture,它將引用生成的字符串結果並將該字符串大寫。ui

static void thenApplyExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApply(s -> {
        assertFalse(Thread.currentThread().isDaemon());
        return s.toUpperCase();
    });
    assertEquals("MESSAGE", cf.getNow(null));
}

這裏的關鍵詞是thenApplythis

  1. then是指在當前階段正常執行完成後(正常執行是指沒有拋出異常)進行的操做。在本例中,當前階段已經完成並獲得值message
  2. Apply是指將一個Function做用於以前階段得出的結果

Function是阻塞的,這意味着只有當大寫操做執行完成以後纔會執行getNow()方法。spa

4.異步的的將方法做用於前一個Stage

經過在方法後面添加Async後綴,該CompletableFuture鏈將會異步執行(使用ForkJoinPool.commonPool())

static void thenApplyAsyncExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
        assertTrue(Thread.currentThread().isDaemon());
        randomSleep();
        return s.toUpperCase();
    });
    assertNull(cf.getNow(null));
    assertEquals("MESSAGE", cf.join());
}

使用一個自定義的Executor來異步執行該方法

異步方法的一個好處是能夠提供一個Executor來執行CompletableStage。這個例子展現瞭如何使用一個固定大小的線程池來實現大寫操做。

static ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() {
    int count = 1;
    @Override
    public Thread newThread(Runnable runnable) {
        return new Thread(runnable, "custom-executor-" + count++);
    }
});
static void thenApplyAsyncWithExecutorExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
        assertTrue(Thread.currentThread().getName().startsWith("custom-executor-"));
        assertFalse(Thread.currentThread().isDaemon());
        randomSleep();
        return s.toUpperCase();
    }, executor);
    assertNull(cf.getNow(null));
    assertEquals("MESSAGE", cf.join());
}

6.消費(Consume)前一個Stage的結果

若是下一個Stage接收了當前Stage的結果可是在計算中無需返回值(好比其返回值爲void),那麼它將使用方法thenAccept並傳入一個Consumer接口。

static void thenAcceptExample() {
    StringBuilder result = new StringBuilder();
    CompletableFuture.completedFuture("thenAccept message")
            .thenAccept(s -> result.append(s));
    assertTrue("Result was empty", result.length() > 0);
}

Consumer將會同步執行,因此咱們無需在返回的CompletableFuture上執行join操做。

7.異步執行Comsume

一樣,使用Asyn後綴實現:

static void thenAcceptAsyncExample() {
    StringBuilder result = new StringBuilder();
    CompletableFuture cf = CompletableFuture.completedFuture("thenAcceptAsync message")
            .thenAcceptAsync(s -> result.append(s));
    cf.join();
    assertTrue("Result was empty", result.length() > 0);
}

8.計算出現異常時

咱們如今來模擬一個出現異常的場景。爲了簡潔性,咱們仍是將一個字符串大寫,可是咱們會模擬延時進行該操做。咱們會使用thenApplyAsyn(Function, Executor),第一個參數是大寫轉化方法,第二個參數是一個延時executor,它會延時一秒鐘再將操做提交給ForkJoinPool

static void completeExceptionallyExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
            CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
    CompletableFuture exceptionHandler = cf.handle((s, th) -> { return (th != null) ? "message upon cancel" : ""; });
    cf.completeExceptionally(new RuntimeException("completed exceptionally"));
    assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());
    try {
        cf.join();
        fail("Should have thrown an exception");
    } catch(CompletionException ex) { // just for testing
        assertEquals("completed exceptionally", ex.getCause().getMessage());
    }
    assertEquals("message upon cancel", exceptionHandler.join());
}
  1. 首先,咱們新建了一個已經完成並帶有返回值messageCompletableFuture對象。而後咱們調用thenApplyAsync方法,該方法會返回一個新的CompletableFuture。這個方法用異步的方式執行大寫操做。這裏還展現瞭如何使用delayedExecutor(timeout, timeUnit)方法來延時異步操做。
  2. 而後咱們建立了一個handler stage,exceptionHandler,這個階段會處理一切異常並返回另外一個消息message upon cancel
  3. 最後,咱們顯式的完成第二個階段並拋出異常,它會致使進行大寫操做的階段拋出CompletionException。它還會觸發handler階段。
API補充:
<U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)
返回一個新的CompletionStage,不管以前的Stage是否正常運行完畢。傳入的參數包括上一個階段的結果和拋出異常。

9.取消計算

和計算時異常處理很類似,咱們能夠經過Future接口中的cancel(boolean mayInterruptIfRunning)來取消計算。

static void cancelExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
            CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
    CompletableFuture cf2 = cf.exceptionally(throwable -> "canceled message");
    assertTrue("Was not canceled", cf.cancel(true));
    assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());
    assertEquals("canceled message", cf2.join());
}
API補充
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
返回一個新的CompletableFuture,若是出現異常,則爲該方法中執行的結果,不然就是正常執行的結果。

10.將Function做用於兩個已完成Stage的結果之一

下面的例子建立了一個CompletableFuture對象並將Function做用於已完成的兩個Stage中的任意一個(沒有保證哪個將會傳遞給Function)。這兩個階段分別以下:一個將字符串大寫,另外一個小寫。

static void applyToEitherExample() {
    String original = "Message";
    CompletableFuture cf1 = CompletableFuture.completedFuture(original)
            .thenApplyAsync(s -> delayedUpperCase(s));
    CompletableFuture cf2 = cf1.applyToEither(
            CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
            s -> s + " from applyToEither");
    assertTrue(cf2.join().endsWith(" from applyToEither"));
}
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T,U> fn)
返回一個全新的CompletableFuture,包含着this或是other操做完成以後,在兩者中的任意一個執行fn

11.消費兩個階段的任意一個結果

和前一個例子相似,將Function替換爲Consumer

static void acceptEitherExample() {
    String original = "Message";
    StringBuilder result = new StringBuilder();
    CompletableFuture cf = CompletableFuture.completedFuture(original)
            .thenApplyAsync(s -> delayedUpperCase(s))
            .acceptEither(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
                    s -> result.append(s).append("acceptEither"));
    cf.join();
    assertTrue("Result was empty", result.toString().endsWith("acceptEither"));
}

12.在兩個階段都完成後運行Runnable

注意這裏的兩個Stage都是同步運行的,第一個stage將字符串轉化爲大寫以後,第二個stage將其轉化爲小寫。

static void runAfterBothExample() {
    String original = "Message";
    StringBuilder result = new StringBuilder();
    CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).runAfterBoth(
            CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
            () -> result.append("done"));
    assertTrue("Result was empty", result.length() > 0);
}

13.用Biconsumer接收兩個stage的結果

Biconsumer支持同時對兩個Stage的結果進行操做。

static void thenAcceptBothExample() {
    String original = "Message";
    StringBuilder result = new StringBuilder();
    CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).thenAcceptBoth(
            CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
            (s1, s2) -> result.append(s1 + s2));
    assertEquals("MESSAGEmessage", result.toString());
}

14.將Bifunction同時做用於兩個階段的結果

若是CompletableFuture想要合併兩個階段的結果而且返回值,咱們可使用方法thenCombine。這裏的計算流都是同步的,因此最後的getNow()方法會得到最終結果,即大寫操做和小寫操做的結果的拼接。

static void thenCombineExample() {
    String original = "Message";
    CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))
            .thenCombine(CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)),
                    (s1, s2) -> s1 + s2);
    assertEquals("MESSAGEmessage", cf.getNow(null));
}

15.異步將Bifunction同時做用於兩個階段的結果

和以前的例子相似,只是這裏用了不一樣的方法:即兩個階段的操做都是異步的。那麼thenCombine也會異步執行,及時它沒有Async後綴。

static void thenCombineAsyncExample() {
    String original = "Message";
    CompletableFuture cf = CompletableFuture.completedFuture(original)
            .thenApplyAsync(s -> delayedUpperCase(s))
            .thenCombine(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
                    (s1, s2) -> s1 + s2);
    assertEquals("MESSAGEmessage", cf.join());
}

16.Compose CompletableFuture

咱們可使用thenCompose來完成前兩個例子中的操做。

static void thenComposeExample() {
    String original = "Message";
    CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))
            .thenCompose(upper -> CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s))
                    .thenApply(s -> upper + s));
    assertEquals("MESSAGEmessage", cf.join());
}

17.當多個階段中有有何一個完成,即新建一個完成階段

static void anyOfExample() {
    StringBuilder result = new StringBuilder();
    List messages = Arrays.asList("a", "b", "c");
    List<CompletableFuture> futures = messages.stream()
            .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
            .collect(Collectors.toList());
    CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((res, th) -> {
        if(th == null) {
            assertTrue(isUpperCase((String) res));
            result.append(res);
        }
    });
    assertTrue("Result was empty", result.length() > 0);
}

18.當全部的階段完成,新建一個完成階段

static void allOfExample() {
    StringBuilder result = new StringBuilder();
    List messages = Arrays.asList("a", "b", "c");
    List<CompletableFuture> futures = messages.stream()
            .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
            .collect(Collectors.toList());
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
        .whenComplete((v, th) -> {
            futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
            result.append("done");
        });
    assertTrue("Result was empty", result.length() > 0);
}

19.當全部階段完成之後,新建一個異步完成階段

static void allOfAsyncExample() {
    StringBuilder result = new StringBuilder();
    List messages = Arrays.asList("a", "b", "c");
    List<CompletableFuture> futures = messages.stream()
            .map(msg -> CompletableFuture.completedFuture(msg).thenApplyAsync(s -> delayedUpperCase(s)))
            .collect(Collectors.toList());
    CompletableFuture allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
            .whenComplete((v, th) -> {
                futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
                result.append("done");
            });
    allOf.join();
    assertTrue("Result was empty", result.length() > 0);
}

20.真實場景

下面展現了一個實踐CompletableFuture的場景:

  1. 先經過調用cars()方法異步得到Car列表。它將會返回一個CompletionStage<List<Car>>cars()方法應當使用一個遠程的REST端點來實現。
  2. 咱們將該Stage和另外一個Stage組合,另外一個Stage會經過調用rating(manufactureId)來異步獲取每輛車的評分。
  3. 當全部的Car對象都填入評分後,咱們調用allOf()來進入最終Stage,它將在這兩個階段完成後執行
  4. 在最終Stage上使用whenComplete(),打印出車輛的評分。
cars().thenCompose(cars -> {
    List<CompletionStage> updatedCars = cars.stream()
            .map(car -> rating(car.manufacturerId).thenApply(r -> {
                car.setRating(r);
                return car;
            })).collect(Collectors.toList());
    CompletableFuture done = CompletableFuture
            .allOf(updatedCars.toArray(new CompletableFuture[updatedCars.size()]));
    return done.thenApply(v -> updatedCars.stream().map(CompletionStage::toCompletableFuture)
            .map(CompletableFuture::join).collect(Collectors.toList()));
}).whenComplete((cars, th) -> {
    if (th == null) {
        cars.forEach(System.out::println);
    } else {
        throw new RuntimeException(th);
    }
}).toCompletableFuture().join();

參考資料

Java CompletableFuture 詳解
Guide To CompletableFuture

clipboard.png
想要了解更多開發技術,面試教程以及互聯網公司內推,歡迎關注個人微信公衆號!將會不按期的發放福利哦~

相關文章
相關標籤/搜索