計算機程序的思惟邏輯 (94) - 組合式異步編程

本系列文章經補充和完善,已修訂整理成書《Java編程的邏輯》(馬俊昌著),由機械工業出版社華章分社出版,於2018年1月上市熱銷,讀者好評如潮!各大網店和書店有售,歡迎購買:京東自營連接 html

前面兩節討論了Java 8中的函數式數據處理,那是對38節55節介紹的容器類的加強,它能夠將對集合數據的多個操做以流水線的方式組合在一塊兒。本節繼續討論Java 8的新功能,主要是一個新的類CompletableFuture,它是對65節83節介紹的併發編程的加強,它能夠方便地將多個有必定依賴關係的異步任務以流水線的方式組合在一塊兒,大大簡化多異步任務的開發。java

以前介紹了那麼多併發編程的內容,還有什麼問題不能解決?CompletableFuture到底能解決什麼問題?與以前介紹的內容有什麼關係?具體如何使用?基本原理是什麼?本節進行詳細討論,咱們先來看它要解決的問題。git

異步任務管理

在現代軟件開發中,系統功能愈來愈複雜,管理複雜度的方法就是分而治之,系統的不少功能可能會被切分爲小的服務,對外提供Web API,單獨開發、部署和維護。好比,在一個電商系統中,可能有專門的產品服務、訂單服務、用戶服務、推薦服務、優惠服務、搜索服務等,在對外具體展現一個頁面時,可能要調用多個服務,而多個調用之間可能還有必定的依賴,好比,顯示一個產品頁面,須要調用產品服務,也可能須要調用推薦服務獲取與該產品有關的其餘推薦,還可能須要調用優惠服務獲取該產品相關的促銷優惠,而爲了調用優惠服務,可能須要先調用用戶服務以獲取用戶的會員級別。github

另外,現代軟件常常依賴不少第三方的服務,好比地圖服務、短信服務、天氣服務、匯率服務等,在實現一個具體功能時,可能要訪問多個這樣的服務,這些訪問之間可能存在着必定的依賴關係。編程

爲了提升性能,充分利用系統資源,這些對外部服務的調用通常都應該是異步的、儘可能併發的。咱們在77節介紹過異步任務執行服務,使用ExecutorService能夠方便地提交單個獨立的異步任務,能夠方便地在須要的時候經過Future接口獲取異步任務的結果,但對於多個尤爲是有必定依賴關係的異步任務,這種支持就不夠了。swift

因而,就有了CompletableFuture,它是一個具體的類,實現了兩個接口,一個是Future,另外一個是CompletionStage,Future表示異步任務的結果,而CompletionStage字面意思是完成階段,多個CompletionStage能夠以流水線的方式組合起來,對於其中一個CompletionStage,它有一個計算任務,但可能須要等待其餘一個或多個階段完成才能開始,它完成後,可能會觸發其餘階段開始運行。CompletionStage提供了大量方法,使用它們,能夠方便地響應任務事件,構建任務流水線,實現組合式異步編程。bash

具體怎麼使用呢?下面咱們會逐步說明,CompletableFuture也是一個Future,咱們先來看與Future相似的地方。微信

與Future/FutureTask對比

基本的任務執行服務

咱們先經過示例來簡要回顧下異步任務執行服務和Future,在異步任務執行服務中,用Callable或Runnable表示任務,以Callable爲例,一個模擬的外部任務爲:併發

private static Random rnd = new Random();

static int delayRandom(int min, int max) {
    int milli = max > min ? rnd.nextInt(max - min) : 0;
    try {
        Thread.sleep(min + milli);
    } catch (InterruptedException e) {
    }
    return milli;
}

static Callable<Integer> externalTask = () -> {
    int time = delayRandom(20, 2000);
    return time;
};
複製代碼

externalTask表示外部任務,咱們使用了Lambda表達式,不熟悉能夠參看91節,delayRandom用於模擬延時。app

假定有一個異步任務執行服務,其代碼爲:

private static ExecutorService executor =
        Executors.newFixedThreadPool(10);
複製代碼

經過任務執行服務調用外部服務,通常返回Future,表示異步結果,示例代碼爲:

public static Future<Integer> callExternalService(){
    return executor.submit(externalTask);
}
複製代碼

在主程序中,結合異步任務和本地調用的示例代碼爲:

public static void master() {
    // 執行異步任務
    Future<Integer> asyncRet = callExternalService();

    // 執行其餘任務 ...

    // 獲取異步任務的結果,處理可能的異常
    try {
        Integer ret = asyncRet.get();
        System.out.println(ret);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}
複製代碼

基本的CompletableFuture

使用CompletableFuture能夠實現相似功能,不過,它不支持使用Callable表示異步任務,而支持Runnable和Supplier,Supplier替代Callable表示有返回結果的異步任務,與Callale的區別是,它不能拋出受檢異常,若是會發生異常,能夠拋出運行時異常。

使用Supplier表示異步任務,代碼與Callable相似,替換變量類型便可,即:

static Supplier<Integer> externalTask = () -> {
    int time = delayRandom(20, 2000);
    return time;
};
複製代碼

使用CompletableFuture調用外部服務的代碼能夠爲:

public static Future<Integer> callExternalService(){
    return CompletableFuture.supplyAsync(externalTask, executor);
}
複製代碼

supplyAsync是一個靜態方法,其定義爲:

public static <U> CompletableFuture<U> supplyAsync( Supplier<U> supplier, Executor executor) 複製代碼

它接受兩個參數supplier和executor,內部,它使用executor執行supplier表示的任務,返回一個CompletableFuture,調用後,任務被異步執行,這個方法當即返回。

supplyAsync還有一個不帶executor參數的方法:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) 複製代碼

沒有executor,任務被誰執行呢?與系統環境和配置有關,通常來講,若是可用的CPU核數大於2,會使用Java 7引入的Fork/Join任務執行服務,即ForkJoinPool.commonPool(),該任務執行服務背後的工做線程數通常爲CPU核數減1,即Runtime.getRuntime().availableProcessors()-1,不然,會使用ThreadPerTaskExecutor,它會爲每一個任務建立一個線程。

對於CPU密集型的運算任務,使用Fork/Join任務執行服務是合適的,但對於通常的調用外部服務的異步任務,Fork/Join多是不合適的,由於它的並行度比較低,可能會讓本能夠併發的多任務串行運行,這時,應該提供Executor參數。

後面咱們還會看到不少以Async結尾命名的方法,通常都有兩個版本,一個帶Executor參數,另外一個不帶,其含義是相同的,就再也不重複介紹了。

對於類型爲Runnable的任務,構建CompletableFuture的方法爲:

public static CompletableFuture<Void> runAsync( Runnable runnable) public static CompletableFuture<Void> runAsync( Runnable runnable, Executor executor) 複製代碼

它與supplyAsync是相似的,具體就不贅述了。

CompletableFuture對Future的基本加強

Future有的接口,CompletableFuture都是支持的,不過,CompletableFuture還有一些額外的相關方法,好比:

public T join() public boolean isCompletedExceptionally() public T getNow(T valueIfAbsent) 複製代碼

join與get方法相似,也會等待任務結束,但它不會拋出受檢異常,若是任務異常結束了,join會將異常包裝爲運行時異常CompletionException拋出。

Future有isDone方法檢查任務是否結束了,但不知道任務是正常結束仍是異常結束,isCompletedExceptionally方法能夠判斷任務是不是異常結束了。

getNow與join相似,區別是,若是任務尚未結束,它不會等待,而是會返回傳入的參數valueIfAbsent。

進一步理解Future/CompletableFuture

前面例子都使用了任務執行服務,其實,任務執行服務與異步結果Future不是綁在一塊兒的,能夠本身建立線程返回異步結果,爲進一步理解,咱們看些示例。

使用FutureTask調用外部服務,代碼能夠爲:

public static Future<Integer> callExternalService() {
    FutureTask<Integer> future = new FutureTask<>(externalTask);
    new Thread() {
        public void run() {
            future.run();
        }
    }.start();
    return future;
}
複製代碼

內部本身建立了一個線程,線程調用FutureTask的run方法,咱們在77節分析過FutureTask的代碼,run方法會調用externalTask的call方法,並保存結果或碰到的異常,喚醒等待結果的線程。

使用CompletableFuture,也能夠直接建立線程,並返回異步結果,代碼能夠爲:

public static Future<Integer> callExternalService() {
    CompletableFuture<Integer> future = new CompletableFuture<>();
    new Thread() {
        public void run() {
            try {
                future.complete(externalTask.get());
            } catch (Exception e) {
                future.completeExceptionally(e);
            }
        }
    }.start();
    return future;
}
複製代碼

這裏使用了CompletableFuture的兩個方法:

public boolean complete(T value) public boolean completeExceptionally(Throwable ex) 複製代碼

這兩個方法顯式設置任務的狀態和結果,complete設置任務成功完成,結果爲value,completeExceptionally設置任務異常結束,異常爲ex。Future接口沒有對應的方法,FutureTask有相關方法但不是public的(是protected)。設置完後,它們都會觸發其餘依賴它們的CompletionStage。具體會觸發什麼呢?咱們接下來再看。

響應結果或異常

使用Future,咱們只能經過get獲取結果,而get可能會須要阻塞等待,而經過CompletionStage,能夠註冊回調函數,當任務完成或異常結束時自動觸發執行,有兩類註冊方法,whenComplete和handle,咱們分別來看下。

whenComplete

whenComplete的聲明爲:

public CompletableFuture<T> whenComplete( BiConsumer<? super T, ? super Throwable> action) 複製代碼

參數action表示回調函數,無論前一個階段是正常結束仍是異常結束,它都會被調用,函數類型是BiConsumer,接受兩個參數,第一個參數是正常結束時的結果值,第二個參數是異常結束時的異常,BiConsumer沒有返回值。whenComplete的返回值仍是CompletableFuture,它不會改變原階段的結果,還能夠在其上繼續調用其餘函數。看個簡單的示例:

CompletableFuture.supplyAsync(externalTask).whenComplete((result, ex) -> {
    if (result != null) {
        System.out.println(result);
    }
    if (ex != null) {
        ex.printStackTrace();
    }
}).join();
複製代碼

result表示前一個階段的結果,ex表示異常,只可能有一個不爲null。

whenComplete註冊的函數具體由誰執行呢?通常而言,這要看註冊時任務的狀態,若是註冊時任務尚未結束,則註冊的函數會由執行任務的線程執行,在該線程執行完任務後執行註冊的函數,若是註冊時任務已經結束了,則由當前線程(即調用註冊函數的線程)執行。

若是不但願當前線程執行,避免可能的同步阻塞,可使用其餘兩個異步註冊方法:

public CompletableFuture<T> whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action) public CompletableFuture<T> whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action, Executor executor) 複製代碼

與前面介紹的以Async結尾的方法同樣,對第一個方法,註冊函數action會由默認的任務執行服務(即ForkJoinPool.commonPool()或ThreadPerTaskExecutor執行),對第二個方法,會由參數中指定的executor執行。

handle

whenComplete只是註冊回調函數,不改變結果,它返回了一個CompletableFuture,但這個CompletableFuture的結果與調用它的CompletableFuture是同樣的,還有一個相似的註冊方法handle,其聲明爲:

public <U> CompletableFuture<U> handle( BiFunction<? super T, Throwable, ? extends U> fn) 複製代碼

回調函數是一個BiFunction,也是接受兩個參數,一個是正常結果,另外一個是異常,但BiFunction有返回值,在handle返回的CompletableFuture中,結果會被BiFunction的返回值替代,即便原來有異常,也會被覆蓋,好比:

String ret =
    CompletableFuture.supplyAsync(()->{
        throw new RuntimeException("test");
    }).handle((result, ex)->{
        return "hello";
    }).join();
System.out.println(ret);
複製代碼

輸出爲"hello"。異步任務拋出了異常,但經過handle方法,改變告終果。

與whenComplete相似,handle也有對應的異步註冊方法handleAsync,具體咱們就不探討了。

exceptionally

whenComplete和handle都是既響應正常完成也響應異常,若是隻對異常感興趣,可使用exceptionally,其聲明爲:

public CompletableFuture<T> exceptionally( Function<Throwable, ? extends T> fn) 複製代碼

它註冊的回調函數是Function,接受的參數爲異常,返回一個值,與handle相似,它也會改變結果,具體就不舉例了。

除了響應結果和異常,使用CompletableFuture,能夠方便地構建有多種依賴關係的任務流,咱們先來看簡單的依賴單一階段的狀況。

構建依賴單一階段的任務流

thenRun

在一個階段正常完成後,執行下一個任務,看個簡單示例:

Runnable taskA = () -> System.out.println("task A");
Runnable taskB = () -> System.out.println("task B");
Runnable taskC = () -> System.out.println("task C");

CompletableFuture.runAsync(taskA)
    .thenRun(taskB)
    .thenRun(taskC)
    .join();
複製代碼

這裏,有三個異步任務taskA, taskB和taskC,經過thenRun天然地描述了它們的依賴關係,thenRun是同步版本,有對應的異步版本thenRunAsync:

public CompletableFuture<Void> thenRunAsync(Runnable action) public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor) 複製代碼

在thenRun構建的任務流中,只有前一個階段沒有異常結束,下一個階段的任務纔會執行,若是前一個階段發生了異常,全部後續階段都不會運行,結果會被設爲相同的異常,調用join會拋出運行時異常CompletionException。

thenRun指定的下一個任務類型是Runnable,它不須要前一個階段的結果做爲參數,也沒有返回值,因此,在thenRun返回的CompletableFuture中,結果類型爲Void,即沒有結果。

thenAccept/thenApply

若是下一個任務須要前一個階段的結果做爲參數,可使用thenAccept或thenApply方法:

public CompletableFuture<Void> thenAccept( Consumer<? super T> action) public <U> CompletableFuture<U> thenApply( Function<? super T,? extends U> fn) 複製代碼

thenAccept的任務類型是Consumer,它接受前一個階段的結果做爲參數,沒有返回值。thenApply的任務類型是Function,接受前一個階段的結果做爲參數,返回一個新的值,這個值會成爲thenApply返回的CompletableFuture的結果值。看個簡單示例:

Supplier<String> taskA = () -> "hello";
Function<String, String> taskB = (t) -> t.toUpperCase();
Consumer<String> taskC = (t) -> System.out.println("consume: " + t);

CompletableFuture.supplyAsync(taskA)
    .thenApply(taskB)
    .thenAccept(taskC)
    .join();
複製代碼

taskA的結果是"hello",傳遞給了taskB,taskB轉換結果爲"HELLO",再把結果給taskC,taskC進行了輸出,因此輸出爲:

consume: HELLO
複製代碼

CompletableFuture中有不少名稱帶有run, accept或apply的方法,它們通常與任務的類型相對應,run與Runnable對應,accept與Consumer對應,apply與Function對應,後續就不贅述了。

thenCompose

與thenApply相似,還有一個方法thenCompose,聲明爲:

public <U> CompletableFuture<U> thenCompose( Function<? super T, ? extends CompletionStage<U>> fn) 複製代碼

這個任務類型也是Function,也是接受前一個階段的結果,返回一個新的結果,不過,這個轉換函數fn的返回值類型是CompletionStage,也就是說,它的返回值也是一個階段,若是使用thenApply,結果就會變爲CompletableFuture<CompletableFuture<U>>,而使用thenCompose,會直接返回fn返回的CompletionStage,thenCompose與thenApply的區別,就如同Stream API中flatMap與map的區別,看個簡單的示例:

Supplier<String> taskA = () -> "hello";
Function<String, CompletableFuture<String>> taskB = (t) ->
    CompletableFuture.supplyAsync(() -> t.toUpperCase());
Consumer<String> taskC = (t) -> System.out.println("consume: " + t);

CompletableFuture.supplyAsync(taskA)
    .thenCompose(taskB)
    .thenAccept(taskC)
    .join();
複製代碼

以上代碼中,taskB是一個轉換函數,但它本身也執行了異步任務,返回類型也是CompletableFuture,因此使用了thenCompose。

構建依賴兩個階段的任務流

依賴兩個都完成

thenRun, thenAccept, thenApply和thenCompose用於在一個階段完成後執行另外一個任務,CompletableFuture還有一些方法用於在兩個階段都完成後執行另外一個任務,方法是:

public CompletableFuture<Void> runAfterBoth( CompletionStage<?> other, Runnable action public <U,V> CompletableFuture<V> thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) public <U> CompletableFuture<Void> thenAcceptBoth( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) 複製代碼

runAfterBoth對應的任務類型是Runnable,thenCombine對應的任務類型是BiFunction,接受前兩個階段的結果做爲參數,返回一個結果,thenAcceptBoth對應的任務類型是BiConsumer,接受前兩個階段的結果做爲參數,但不返回結果。它們都有對應的異步和帶Executor參數的版本,用於指定下一個任務由誰執行,具體就不贅述了。當前階段和參數指定的另外一個階段other沒有依賴關係,併發執行,當兩個都執行結束後,開始執行指定的另外一個任務。

看個簡單的示例,任務A和B執行結束後,執行任務C合併結果,代碼爲:

Supplier<String> taskA = () -> "taskA";
CompletableFuture<String> taskB = CompletableFuture.supplyAsync(() -> "taskB");
BiFunction<String, String, String> taskC = (a, b) -> a + "," + b;

String ret = CompletableFuture.supplyAsync(taskA)
        .thenCombineAsync(taskB, taskC)
        .join();
System.out.println(ret);
複製代碼

輸出爲:

taskA,taskB
複製代碼

依賴兩個階段中的一個

前面的方法要求兩個階段都完成後才執行下一個任務,若是隻須要其中任意一個階段完成,可使用下面的方法:

public CompletableFuture<Void> runAfterEither( CompletionStage<?> other, Runnable action) public <U> CompletableFuture<U> applyToEither( CompletionStage<? extends T> other, Function<? super T, U> fn) public CompletableFuture<Void> acceptEither( CompletionStage<? extends T> other, Consumer<? super T> action) 複製代碼

它們都有對應的異步和帶Executor參數的版本,用於指定下一個任務由誰執行,具體就不贅述了。當前階段和參數指定的另外一個階段other沒有依賴關係,併發執行,只要當其中一個執行完了,就會啓動參數指定的另外一個任務,具體就不贅述了。

構建依賴多個階段的任務流

若是依賴的階段不止兩個,可使用以下方法:

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) 複製代碼

它們是靜態方法,基於多個CompletableFuture構建了一個新的CompletableFuture。

對於allOf,當全部子CompletableFuture都完成時,它才完成,若是有的CompletableFuture異常結束了,則新的CompletableFuture的結果也是異常,不過,它並不會由於有異常就提早結束,而是會等待全部階段結束,若是有多個階段異常結束,新的CompletableFuture中保存的異常是最後一個的。新的CompletableFuture會持有異常結果,但不會保存正常結束的結果,若是須要,能夠從每一個階段中獲取。看個簡單的示例:

CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {
    delayRandom(100, 1000);
    return "helloA";
}, executor);

CompletableFuture<Void> taskB = CompletableFuture.runAsync(() -> {
    delayRandom(2000, 3000);
}, executor);

CompletableFuture<Void> taskC = CompletableFuture.runAsync(() -> {
    delayRandom(30, 100);
    throw new RuntimeException("task C exception");
}, executor);

CompletableFuture.allOf(taskA, taskB, taskC).whenComplete((result, ex) -> {
    if (ex != null) {
        System.out.println(ex.getMessage());
    }
    if (!taskA.isCompletedExceptionally()) {
        System.out.println("task A " + taskA.join());
    }
});
複製代碼

taskC會首先異常結束,但新構建的CompletableFuture會等待其餘兩個結束,都結束後,能夠經過子階段(如taskA)的方法檢查子階段的狀態和結果。

對於anyOf返回的CompletableFuture,當第一個子CompletableFuture完成或異常結束時,它相應地完成或異常結束,結果與第一個結束的子CompletableFuture同樣,具體就不舉例了。

小結

本節介紹了Java 8中的組合式異步編程CompletableFuture:

  • 它是對Future的加強,但能夠響應結果或異常事件,有不少方法構建異步任務流
  • 根據任務由誰執行,通常有三類對應方法,名稱不帶Async的方法由當前線程或前一個階段的線程執行,帶Async但沒有指定Executor的方法由默認Excecutor執行(ForkJoinPool.commonPool()或ThreadPerTaskExecutor),帶Async且指定Executor參數的方法由指定的Executor執行
  • 根據任務類型,通常也有三類對應方法,名稱帶run的對應Runnable,帶accept的對應Consumer,帶apply的對應Function

使用CompletableFuture,能夠簡潔天然地表達多個異步任務之間的依賴關係和執行流程,大大簡化代碼,提升可讀性。

下一節,咱們探討Java 8對日期和時間API的加強。

(與其餘章節同樣,本節全部代碼位於 github.com/swiftma/pro…,位於包shuo.laoma.java8.c94下)


未完待續,查看最新文章,敬請關注微信公衆號「老馬說編程」(掃描下方二維碼),從入門到高級,深刻淺出,老馬和你一塊兒探索Java編程及計算機技術的本質。用心原創,保留全部版權。

相關文章
相關標籤/搜索