如何編寫優雅的異步代碼 — CompletableFuture

前言編程

在咱們的意識裏,同步執行的程序都比較符合人們的思惟方式,而異步的東西一般都很差處理。在異步計算的狀況下,以回調錶示的動做每每會分散在代碼中,也可能相互嵌套在內部,若是須要處理其中一個步驟中可能發生的錯誤時,狀況變得更加糟糕。Java 8 引入了不少的新特性,其中就包含了 CompletableFuture 類的引入,這讓咱們編寫清晰可讀的異步代碼變得更加容易,該類功能很是強大,包含了超過 50 多個方法。。。多線程

什麼是 CompletableFuture併發

CompletableFuture 類的設計靈感來自於 Google Guava 的 ListenableFuture 類,它實現了 Future 和 CompletionStage 接口而且新增了許多方法,它支持 lambda,經過回調利用非阻塞方法,提高了異步編程模型。它容許咱們經過在與主應用程序線程不一樣的線程上(也就是異步)運行任務,並向主線程通知任務的進度、完成或失敗,來編寫非阻塞代碼。異步

爲何要引入 CompletableFutureide

Java 的 1.5 版本引入了 Future,你能夠把它簡單的理解爲運算結果的佔位符,它提供了兩個方法來獲取運算結果。異步編程

get():調用該方法線程將會無限期等待運算結果。函數

get(long timeout, TimeUnit unit):調用該方法線程將僅在指定時間 timeout 內等待結果,若是等待超時就會拋出 TimeoutException 異常。源碼分析

Future 可使用 Runnable 或 Callable 實例來完成提交的任務,經過其源碼能夠看出,它存在以下幾個問題:this

阻塞 調用 get() 方法會一直阻塞,直到等待直到計算完成,它沒有提供任何方法能夠在完成時通知,同時也不具備附加回調函數的功能。線程

鏈式調用和結果聚合處理 在不少時候咱們想連接多個 Future 來完成耗時較長的計算,此時須要合併結果並將結果發送到另外一個任務中,該接口很難完成這種處理。

異常處理 Future 沒有提供任何異常處理的方式。

以上這些問題在 CompletableFuture 中都已經解決了,接下來讓咱們看看如何去使用 CompletableFuture。

如何建立 CompletableFuture

最簡單的建立方式就是調用 CompletableFuture.completedFuture(U value) 方法來獲取一個已經完成的 CompletableFuture 對象。

@Test
public void testSimpleCompletableFuture() {
    CompletableFuture<String> completableFuture = CompletableFuture.completedFuture("Hello mghio");
    assertTrue(completableFuture.isDone());
    try {
        assertEquals("Hello mghio", completableFuture.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

須要注意的是當咱們對不完整的 CompleteableFuture 調用 get 方法的話,會因爲 Future 未完成,所以 get 調用將永遠阻塞,此時可使用 CompletableFuture.complete 方法手動完成 Future。

任務異步處理

當咱們想讓程序在後臺異步執行任務而不關心任務的處理結果時,可使用 runAsync 方法,該方法接收一個 Runnable 類型的參數返回 CompletableFuture。

@Test
public void testCompletableFutureRunAsync() {
    AtomicInteger variable = new AtomicInteger(0);
    CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> process(variable));
    runAsync.join();
    assertEquals(100, variable.get());
}

public void process(AtomicInteger variable) {
    System.out.println(Thread.currentThread() + " Process...");
    variable.set(100);
}

若是咱們想讓任務在後臺異步執行並且須要獲取任務的處理結果時,可使用 supplyAsync 方法,該方法接收一個 Supplier類型的參數返回一個 CompletableFuture。

@Test
public void testCompletableFutureSupplyAsync() {
    CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(this::process);
    try {
        assertEquals("Hello mghio", supplyAsync.get()); // Blocking
    } catch (ExecutionException | InterruptedException e) {
        e.printStackTrace();
    }
}

public String process() {
    return "Hello mghio";
}

看到這裏你可能會有個問題,上面執行 runAsync 和 supplyAsync 任務的線程是從哪裏來的、誰建立的呢?實際上它和 Java 8 中的 parallelStream 相似, CompletableFuture 也是從全局 ForkJoinPool.commonPool() 得到的線程中執行這些任務的。同時,上面的兩個方法也提供了自定義線程池去執行任務,其實你若是去了解過 CompletableFuture 的源碼的話,你會發現其 API 中的全部方法都有個重載的版本,有或沒有自定義 Executor 執行器。

@Test
public void testCompletableFutureSupplyAsyncWithExecutor() {
    ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
    CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(this::process, newFixedThreadPool);
    try {
        assertEquals("Hello mghio", supplyAsync.get()); // Blocking
    } catch (ExecutionException | InterruptedException e) {
        e.printStackTrace();
    }
}

public String process() {
    return "Hello mghio";
}

鏈式調用和結果聚合處理

咱們知道 CompletableFuture 的 get() 方法會一直阻塞直到獲取到結果,CompletableFuture 提供了 thenApply、thenAccept 和 thenRun 等方法來避免這種狀況,並且咱們還能夠添加任務完成後的回調通知。這幾個方法的使用場景以下:

thenApply 當咱們若是要在從 Future 接收值後任務以前運行自定義的業務代碼,而後要爲此任務返回一些值時,則可使用該方法

thenAccept 若是咱們但願在從 Future 接收到一些值後執行任務以前運行自定義的業務代碼而不關心返回結果值時,則可使用該方法

thenRun 若是咱們想在 Future 完成後運行自定義的業務代碼,而且不想爲此返回任何值時,則可使用該方法

@Test
public void testCompletableFutureThenApply() {
    Integer notificationId = CompletableFuture.supplyAsync(this::thenApplyProcess)
        .thenApply(this::thenApplyNotify) // Non Blocking
        .join();
    assertEquals(new Integer(1), notificationId);
}

@Test
public void testCompletableFutureThenAccept() {
    CompletableFuture.supplyAsync(this::processVariable)
        .thenAccept(this::thenAcceptNotify) // Non Blocking
        .join();
    assertEquals(100, variable.get());
}

@Test
public void testCompletableFutureThenRun() {
    CompletableFuture.supplyAsync(this::processVariable)
        .thenRun(this::thenRunNotify)
        .join();
    assertEquals(100, variable.get());
}

private String processVariable() {
    variable.set(100);
    return "success";
}

private void thenRunNotify() {
    System.out.println("thenRun completed notify ....");
}

private Integer thenApplyNotify(Integer integer) {
    return integer;
}

private void thenAcceptNotify(String s) {
    System.out.println(
    String.format("Thread %s completed notify ....", Thread.currentThread().getName()));
}

public Integer thenApplyProcess() {
    return 1;
}

若是有大量的異步計算,那麼咱們能夠繼續將值從一個回調傳遞到另外一個回調中去,也就是使用鏈式調用方式,使用方式很簡單。

@Test
public void testCompletableFutureThenApplyAccept() {
    CompletableFuture.supplyAsync(this::findAccountNumber)
        .thenApply(this::calculateBalance)
        .thenApply(this::notifyBalance)
        .thenAccept((i) -> notifyByEmail()).join();
}

private void notifyByEmail() {
    // business code
    System.out.println("send notify by email ...");
}

private Double notifyBalance(Double d) {
    // business code
    System.out.println(String.format("your balance is $%s", d));
    return 9527D;
}

private Double calculateBalance(Object o) {
    // business code
    return 9527D;
}

private Double findAccountNumber() {
    // business code
    return 9527D;
}

比較細心的朋友可能注意到在全部前面的幾個方法示例中,全部方法都是在同一線程上執行的。若是咱們但願這些任務在單獨的線程上運行時,那麼咱們可使用這些方法對應的異步版本。

@Test
public void testCompletableFutureApplyAsync() {
    ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
    ScheduledExecutorService newSingleThreadScheduledExecutor = Executors
        .newSingleThreadScheduledExecutor();
    CompletableFuture<Double> completableFuture =
        CompletableFuture
            .supplyAsync(this::findAccountNumber,
                newFixedThreadPool) // 從線程池 newFixedThreadPool 獲取線程執行任務
            .thenApplyAsync(this::calculateBalance,
                newSingleThreadScheduledExecutor)
            .thenApplyAsync(this::notifyBalance);
    Double balance = completableFuture.join();
    assertEquals(9527D, balance);
}

執行結果處理

thenCompose 方法適合有依賴性的任務處理,好比一個計算帳戶餘額的業務:首先咱們要先找到賬號,而後爲該賬戶計算餘額,而後計算完成後再發送通知。全部這些任務都是依賴前一個任務的返回 CompletableFuture 結果,此時咱們須要使用 thenCompose 方法,其實有點相似於 Java 8 流的 flatMap 操做。

@Test
public void testCompletableFutureThenCompose() {
    Double balance = this.doFindAccountNumber()
        .thenCompose(this::doCalculateBalance)
        .thenCompose(this::doSendNotifyBalance).join();
    assertEquals(9527D, balance);
}

private CompletableFuture<Double> doSendNotifyBalance(Double aDouble) {
    sleepSeconds(2);
    // business code
    System.out.println(String.format("%s doSendNotifyBalance ....", Thread.currentThread().getName()));
    return CompletableFuture.completedFuture(9527D);
}

private CompletableFuture<Double> doCalculateBalance(Double d) {
    sleepSeconds(2);
    // business code
    System.out.println(String.format("%s doCalculateBalance ....", Thread.currentThread().getName()));
    return CompletableFuture.completedFuture(9527D);
}

private CompletableFuture<Double> doFindAccountNumber() {
    sleepSeconds(2);
    // business code
    System.out.println(String.format("%s doFindAccountNumber ....", Thread.currentThread().getName()));
    return CompletableFuture.completedFuture(9527D);
}

private void sleepSeconds(int timeout) {
    try {
        TimeUnit.SECONDS.sleep(timeout);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

thenCombine 方法主要是用於合併多個獨立任務的處理結果。假設咱們須要查找一我的的姓名和住址,則可使用不一樣的任務來分別獲取,而後要得到這我的的完整信息(姓名 + 住址),則須要合併這兩種方法的結果,那麼咱們可使用 thenCombine 方法。

@Test
public void testCompletableFutureThenCombine() {
    CompletableFuture<String> thenCombine = this.findName().thenCombine(this.findAddress(), (name, address) -> name + address);
    String personInfo = thenCombine.join();
    assertEquals("mghio Shanghai, China", personInfo);
}

private CompletableFuture<String> findAddress() {
    return CompletableFuture.supplyAsync(() -> {
        sleepSeconds(2);
        // business code
        return "Shanghai, China";
    });
}

private CompletableFuture<String> findName() {
    return CompletableFuture.supplyAsync(() -> {
        sleepSeconds(2);
        // business code
        return "mghio ";
    });
}

等待多個任務執行完成

在許多狀況下,咱們但願並行運行多個任務,並在全部任務完成後再進行一些處理。假設咱們要查找 3 個不一樣用戶的姓名並將結果合併。此時就可使用 CompletableFuture 的靜態方法 allOf,該方法會等待全部任務完成,須要注意的是該方法它不會返回全部任務的合併結果,所以咱們必須手動組合任務的執行結果。

@Test
public void testCompletableFutureAllof() {
    List<CompletableFuture<String>> list = Lists.newArrayListWithCapacity(4);
    IntStream.range(0, 3).forEach(num -> list.add(findName(num)));

    CompletableFuture<Void> allFuture = CompletableFuture
        .allOf(list.toArray(new CompletableFuture[0]));

    CompletableFuture<List<String>> allFutureList = allFuture
        .thenApply(val -> list.stream().map(CompletableFuture::join).collect(Collectors.toList()));

    CompletableFuture<String> futureHavingAllValues = allFutureList
        .thenApply(fn -> String.join("", fn));

    String result = futureHavingAllValues.join();
    assertEquals("mghio0mghio1mghio2", result);
}

private CompletableFuture<String> findName(int num) {
    return CompletableFuture.supplyAsync(() -> {
        sleepSeconds(2);
        // business code
        return "mghio" + num;
    });
}

異常處理

在多線程中程序異常其實不太好處理,可是幸運的是在 CompletableFuture 中給咱們提供了很方便的異常處理方式,在咱們上面的例子代碼中:

@Test
public void testCompletableFutureThenCompose() {
    Double balance = this.doFindAccountNumber()
        .thenCompose(this::doCalculateBalance)
        .thenCompose(this::doSendNotifyBalance).join();
}

在上面的代碼中,三個方法 doFindAccountNumber、doCalculateBalance 和 doSendNotifyBalance 只要任意一個發生異常了,則以後調用的方法將不會運行。CompletableFuture 提供了三種處理異常的方式,分別是 exceptionally、handle 和 whenComplete 方法。

第一種方式是使用 exceptionally 方法處理異常,若是前面的方法失敗併發生異常,則會調用異常回調。

@Test
public void testCompletableFutureExceptionally() {
    CompletableFuture<Double> thenApply = CompletableFuture.supplyAsync(this::findAccountNumber)
        .thenApply(this::calculateBalance)
        .thenApply(this::notifyBalance)
        .exceptionally(ex -> {
            System.out.println("Exception " + ex.getMessage());
            return 0D;
        });
    Double join = thenApply.join();
    assertEquals(9527D, join);
}

第二種方式是使用 handle 方法處理異常,使用該方式處理異常比上面的 exceptionally 方式更爲靈活,咱們能夠同時獲取到異常對象和當前的處理結果。

@Test
public void testCompletableFutureHandle() {
    CompletableFuture.supplyAsync(this::findAccountNumber)
        .thenApply(this::calculateBalance)
        .thenApply(this::notifyBalance)
        .handle((ok, ex) -> {
            System.out.println("最終要運行的代碼...");
            if (ok != null) {
            System.out.println("No Exception !!");
            } else {
            System.out.println("Exception " + ex.getMessage());
            return -1D;
            }
            return ok;
        });
}

第三種是使用 whenComplete 方法處理異常。

@Test
public void testCompletableFutureWhenComplete() {
    CompletableFuture.supplyAsync(this::findAccountNumber)
        .thenApply(this::calculateBalance)
        .thenApply(this::notifyBalance)
        .whenComplete((result, ex) -> {
            System.out.println("result = " + result + ", ex = " + ex);
            System.out.println("最終要運行的代碼...");
        });
}

總結

在本文中,簡要的介紹了 CompletableFuture 類的部分方法和使用方式,這個類的方法不少同時提供的功能也很是強大,在異步編程中使用的比較多,熟悉了基本的使用方法以後要深刻了解仍是要深刻源碼分析其實現原理。

相關文章
相關標籤/搜索