java8 CompletableFuture入門 使用教程 詳解全部方法 附實例

概覽

  1. CompletableFuture是java8引入的新類,該類實現了 Future 接口和 CompletionStage 接口,封裝了future、forkjoin相關類來執行異步,因此你仍是能夠像之前同樣經過阻塞(get)或者輪詢的方式得到結果,儘管這種方式不推薦使用。
  2. CompletionStage 接口表明異步計算中的 不一樣階段,以及如何 組合 這些計算階段。
  3. CompletableStage 接口中有 50 多個方法,能夠對 CompletableStage 進行組合、計算,方法看似不少,但能夠按功能對其分類,大多數方法都有 3 種變體:
    1. 不帶 Async 方法:同步方法
    2. 帶 Async,只有一個參數:異步方法,使用默認的 ForkJoinPool.commonPool() 獲取線程池
    3. 帶 Async,有兩個參數:異步方法,且使用第二個參數指定的 ExecutorService 線程池

瞭解CompletableFuture須要理解java8的函數式接口,不瞭解的同窗能夠先移步:http://www.javashuo.com/article/p-nswjqwms-kw.htmljava

建立CompletableFuture對象

//比較特殊,他入參就是返回值,也就是說他能夠用來執行須要其餘返回值的異步任務。
public static <U> CompletableFuture<U> completedFuture(U value)

//無返回值,使用默認線程池
public static CompletableFuture<Void> 	runAsync(Runnable runnable)

//無返回值,使用自定義線程池
public static CompletableFuture<Void> 	runAsync(Runnable runnable, Executor executor)

//有返回值,使用默認線程池
public static <U> CompletableFuture<U> 	supplyAsync(Supplier<U> supplier)

//有返回值,使用自定義線程池
public static <U> CompletableFuture<U> 	supplyAsync(Supplier<U> supplier, Executor executor)

//
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

舉例:app

//supplyAsync方法無入參,可是返回一個String對象。此方法使用了默認的線程池執行異步任務
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    //長時間的計算任務
    return "·00";
});

在建立完CompletableFuture對象而且執行任務以後,還能夠對結果或者異常等進行額外的操做或任務

下面將經過具體例子來展現各個方法的使用。dom

1.whenComplete* 和 exceptionally 方法

當原始的CompletableFuture任務執行完後,不論是否成功計算出結果,仍是拋出異常,都會會執行 whenComplete* 或 exceptionally 的方法中的任務
該操做執行完畢後:異步

  • 會返回一個新的CompletableFuture對象!!!
  • 使用whenComplete*方法時,返回的新的CompletableFuture對象的返回結果和原始的CompletableFuture對象計算結果相同
  • 使用 exceptionally方法時,若是原始計算邏輯拋出異常,那麼返回的 新的CompletableFuture對象 的返回結果由該方法的return值決定;若是原始計算邏輯沒有拋出異常,那麼返回的 新的CompletableFuture對象 的返回結果和原始計算邏輯返回的結果一致。有點繞,先不明白不要緊,下面會有四個exceptionally實例解釋這段話。
BiConsumer<T,U> 函數接口有兩個參數,無返回值。
Function<T,R> 函數接口有一個輸入參數,返回一個結果。

//無Async,同步處理正常計算結果或異常,使用執行任務的那個線程來執行該方法,因此這個方法是同步的。
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
//有Async,異步處理正常計算結果或異常,使用執行任務的那個線程池中的線程來執行該方法!因此這個方法是異步的。
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
//有Async,異步處理正常計算結果或異常,使用自定義線程池來執行該方法,因此這個方法是異步的。
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? superThrowable> action, Executor executor)
//處理異常。
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

注意:當沒有異常拋出來的時候,上面的Throwable參數爲空!函數

舉例:

計算邏輯:
    private static Random random = new Random();
    private static long time = System.currentTimeMillis();

    public static int getMoreData(){
        System.out.println("begin to start compute");
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println("end to compute, passed:" + System.currentTimeMillis());
        return random.nextInt(1000);
    }

    public static int throwException(){
        System.out.println("準備拋出異常");
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println("拋了");
        throw new RuntimeException("主動拋出異常");
    }
whenComplete:

    //若是使用這段代碼,則會是和當前線程同步執行
    public static void main(String[] args) throws Exception{
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> getMoreData());

        CompletableFuture<Integer> future2 = future.whenComplete((result, excetion) -> {
            System.out.println("執行到whenComplete了,result:" + result);
            System.out.println("執行到whenComplete了,exception:" + (excetion == null ? "無異常" : excetion.getClass()));
        });
        System.out.println("執行到最後一段代碼了,future result:" + future.get());
        System.out.println("執行到最後一段代碼了,future2 result:" + future2.get());
    }

> 打印執行結果:  
begin to start compute  
end to compute, passed:1551182552193  
執行到whenComplete了,result:625  
執行到whenComplete了,exception:無異常  
執行到最後一段代碼了,future result:625  
執行到最後一段代碼了,future2 result:625  

>從打印結果可知,whenComplete使用原始的執行的任務的線程,因此能夠當作是同步執行的,而且新的CompletableFuture對象的結果和原始的一致
whenCompleteAsync:

    //若是使用這段代碼,則會是和當前線程同步執行
    public static void main(String[] args) throws Exception{
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> getMoreData());

        future.whenCompleteAsync((result, exception) -> {
            System.out.println("計算已執行完畢,result:" + result);
            System.out.println("計算已執行完畢,exception:" + (excetion == null ? "無異常" : excetion.getClass()));
        });
        System.out.println("執行到最後一段代碼了,result:" + future.get());
    }

> 打印執行結果:  
begin to start compute  
end to compute, passed:1551180611064  
執行到最後一段代碼了,result:323  
執行到whenComplete了,result:323  
執行到whenComplete了,exception:無異常  

>從打印結果可知,whenCompleteAsync是異步執行的

exceptionally比較複雜,須要經過4個實例才能真正明白:.net

exceptionally實例1:

    //這段代碼,因爲會拋出異常,會先走whenCompleteAsync,而後再走exceptionally,並且是沒法獲取到返回值的。
    public static void main(String[] args) throws Exception{

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> throwException());

        future.whenCompleteAsync((result, exception) -> {
            System.out.println("計算已執行完畢,result:" + result);
            System.out.println("計算已執行完畢,exception:" + (exception == null ? "無異常" : exception.getClass()));
        }).exceptionally(exception -> {
            System.out.println("計算執行過程當中發生了異常,exception:" + exception.getClass());
            return 0;
        });

       System.out.println("執行到最後一段代碼了,future result:" + future.get());
    }

> 打印執行結果:  
準備拋出異常
拋了
計算已執行完畢,result:null
計算已執行完畢,result:null
計算已執行完畢,exception:class java.util.concurrent.CompletionException
計算已執行完畢,exception:class java.util.concurrent.CompletionException
計算執行過程當中發生了異常,exception:class java.util.concurrent.CompletionException
計算執行過程當中發生了異常,exception:class java.util.concurrent.CompletionException
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: 主動拋出異常
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
	at me.ele.ecs.eapp.service.impl.Main.main(Main.java:69)
Caused by: java.lang.RuntimeException: 主動拋出異常
	at me.ele.ecs.eapp.service.impl.Main.throwException(Main.java:37)
	at me.ele.ecs.eapp.service.impl.Main.lambda$main$0(Main.java:44)
exceptionally實例2:

    //這裏的打印結果是和上面相似的,但是爲何此次要獲取新的CompletableFuture對象呢?看下面的exceptionally實例3後,再回來對比吧
    public static void main(String[] args) throws Exception{

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> throwException());

        CompletableFuture<Integer> future2 = future.whenCompleteAsync((result, exception) -> {
            System.out.println("計算已執行完畢,result:" + result);
            System.out.println("計算已執行完畢,exception:" + (exception == null ? "無異常" : exception.getClass()));
        });

        CompletableFuture<Integer> future3 = future2.exceptionally(exception -> {
            System.out.println("計算執行過程當中發生了異常,exception:" + exception.getClass());
            return 0;
        });

        System.out.println("執行到最後一段代碼了,future result:" + future.get());
       //由於上面的執行過程當中,已經拋出了異常了,那麼下面的這兩段代碼是沒法執行到的,
        System.out.println("執行到最後一段代碼了,future2 result:" + future2.get());
        System.out.println("執行到最後一段代碼了,future3 result:" + future3.get());
    }

> 打印執行結果:  
準備拋出異常
拋了
計算已執行完畢,result:null
計算已執行完畢,exception:class java.util.concurrent.CompletionException
計算執行過程當中發生了異常,exception:class java.util.concurrent.CompletionException
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: 主動拋出異常
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
	at me.ele.ecs.eapp.service.impl.Main.main(Main.java:69)
Caused by: java.lang.RuntimeException: 主動拋出異常
	at me.ele.ecs.eapp.service.impl.Main.throwException(Main.java:37)
	at me.ele.ecs.eapp.service.impl.Main.lambda$main$0(Main.java:44)
exceptionally實例3:

    //
    public static void main(String[] args) throws Exception{

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> throwException());

        CompletableFuture<Integer> future2 = future.whenCompleteAsync((result, exception) -> {
            System.out.println("計算已執行完畢,result:" + result);
            System.out.println("計算已執行完畢,exception:" + (exception == null ? "無異常" : exception.getClass()));
        });

        CompletableFuture<Integer> future3 = future2.exceptionally(exception -> {
            System.out.println("計算執行過程當中發生了異常,exception:" + exception.getClass());
            //這裏的返回值實際其是沒有用處的。由於若是拋出了異常,future的get方法是執行不到的;而若是沒有拋出異常的話,仍是會返回原始的CompletableFuture的值的
            //因此這個exceptionally就是僅僅用來處理異常的。
            return 0;
        });

        //System.out.println("執行到最後一段代碼了,future result:" + future.get());
        //System.out.println("執行到最後一段代碼了,future2 result:" + future2.get());
        //和上面實例2惟一的區別就是註釋掉了上面兩段代碼,可是執行結果卻不同了,並且整個main方法都沒有拋出來異常,緣由就在於future和future2是異步執行的,因此是在別的線程拋了異常,而main方法是不會拋出來的。並且在獲取future3的結果時,能夠發現,返回了future3對象自定義的返回值
        System.out.println("執行到最後一段代碼了,future3 result:" + future3.get());
    }

> 打印執行結果:  
準備拋出異常
拋了
計算已執行完畢,result:null
計算已執行完畢,exception:class java.util.concurrent.CompletionException
計算執行過程當中發生了異常,exception:class java.util.concurrent.CompletionException
執行到最後一段代碼了,future3 result:0
exceptionally實例4:

    public static void main(String[] args) throws Exception{

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(Main::getMoreData);

        CompletableFuture<Integer> future2 = future.whenCompleteAsync((result, exception) -> {
            System.out.println("計算已執行完畢,result:" + result);
            System.out.println("計算已執行完畢,exception:" + (exception == null ? "無異常" : exception.getClass()));
        });

        CompletableFuture<Integer> future3 = future2.exceptionally(exception -> {
            System.out.println("計算執行過程當中發生了異常,exception:" + exception.getClass());
            return 0;
        });

        System.out.println("執行到最後一段代碼了,future result:" + future.get());
        System.out.println("執行到最後一段代碼了,future2 result:" + future2.get());
        //原始的計算邏輯不變,exceptionally返回的新的CompletableFuture對象的結果和原始計算邏輯返回的結果一致。
        System.out.println("執行到最後一段代碼了,future3 result:" + future3.get());
    }

> 打印執行結果:  
begin to start compute
end to compute, passed:1551239497158
getMoreData: 679
執行到最後一段代碼了,future result:679
計算已執行完畢,result:679
計算已執行完畢,exception:無異常
執行到最後一段代碼了,future2 result:679
執行到最後一段代碼了,future3 result:679

2.handle* 方法

和 whenComplete* 方法同樣,都是在任務執行完後,執行該方法的邏輯,可是和 whenComplete* 不一樣的是:
該操做執行完畢後,它返回的新CompletableFuture對象的計算結果是handle*方法的返回值,並非原始計算邏輯的返回值線程

//同步
public <U> CompletableFuture<U> 	handle(BiFunction<? super T,Throwable,? extends U> fn)
//異步,使用原始CompletableFuture的線程
public <U> CompletableFuture<U> 	handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
//異步,使用自定義線程池的線程
public <U> CompletableFuture<U> 	handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)
實例:

    public static void main(String[] args) throws Exception{

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(Main::getMoreData);

        CompletableFuture<Integer> future2 = future.handleAsync((result, exception) -> {
            System.out.println("計算已執行完畢,result:" + result);
            System.out.println("計算已執行完畢,exception:" + (exception == null ? "無異常" : exception.getClass()));
            return result + 1;
        });

        System.out.println("執行到最後一段代碼了,future result:" + future.get());
        System.out.println("執行到最後一段代碼了,future2 result:" + future2.get());
    }

> 打印執行結果:  
begin to start compute
end to compute, passed:1551243326193
getMoreData: 395
執行到最後一段代碼了,future result:395
計算已執行完畢,result:395
計算已執行完畢,exception:無異常
執行到最後一段代碼了,future2 result:396

3.thenApply* 方法:鏈接

thenApply* 能夠鏈接多個CompletableFuture對象,至關於將一個一個的CompletableFuture串聯起來了,第一個CompletableFuture對象的結果會傳遞到下一個對象中,而且下一個CompletableFuture對象的結算結果會做爲上一個對象的CompletableFuture結果,依次類推,也就是說會改變原始CompletableFuture對象的結果。
注:它和 handle 方法有點相似,都會拿到上一個CompletableFuture對象的結果進行計算,可是區別就是thenApply 會改變原始對象的計算結果,而 handle* 並不會**。code

public <U> CompletableFuture<U> 	thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> 	thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> 	thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        return 100;
        });
       //因爲這裏同時鏈接了多個thenApplyAsync,第一個是異步的,第二個是同步的,而且都沒有處理異常,因此異常會直接在執行計算的線程上拋出來。
       CompletableFuture<String> f =  future.thenApplyAsync(i -> i * 10).thenApply(i -> i.toString());
       System.out.println(f.get()); //"1000"
}

4. thenAccept* 方法:純消費一個CompletableFuture對象的結果

thenAccept* 返回的新的CompletableFuture對象不返回結果,若是使用get方法,會返回一個null。對象

public CompletableFuture<Void> 	thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> 	thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> 	thenAcceptAsync(Consumer<? super T> action, Executor executor)
public static void main(String[] args) throws ExecutionException, InterruptedException {
         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(Main::getMoreData);

        CompletableFuture<Void> future2 = future.thenAccept(result -> {
            System.out.println("執行到thenAccept了, result:" + result);
        });

        System.out.println("執行到最後一段代碼了,future result:" + future.get());
        System.out.println("執行到最後一段代碼了,future2 result: " + future2.get());
}
> 打印執行結果:
begin to start compute
end to compute, passed:1551341977684
getMoreData: 171
執行到thenAccept了, result:171
執行到最後一段代碼了,future result:171
執行到最後一段代碼了,future2 result: null

5. thenAcceptBoth* 方法:在兩個CompletableFuture對象的執行完後執行。

它和 thenAccept 同樣,都是純消費,可是thenAccept*只能消費一個CompletableFuture對象,而thenAcceptBoth* 能在兩個不一樣的CompletableFuture對象執行完成後,消費他們兩個的計算結果。
並且他僅僅在原始的兩個CompletableFuture對象都計算成功以後,纔開始執行blog

public <U> CompletableFuture<Void> 	thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> 	thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> 	thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)

//runAfterBoth和上面三個的區別就是它不消費原始的CompletableFuture結果
public     CompletableFuture<Void> 	runAfterBoth(CompletionStage<?> other,  Runnable action)
public static void main(String[] args) throws ExecutionException, InterruptedException {
         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(Main::getMoreData);

        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(Main::getMoreData);

        future.thenAcceptBothAsync(future2, (x, y) -> {
            System.out.println("future1 和 future都執行完成了,結果分別是:" + x + "," + y);
        });

        System.out.println("執行到最後一段代碼了,future result:" + future.get());
        System.out.println("執行到最後一段代碼了,future2 result: " + future2.get());
}
> 打印執行結果:
begin to start compute
begin to start compute
end to compute, passed:1551342475808
getMoreData: 920
執行到最後一段代碼了,future result:920
end to compute, passed:1551342475811
getMoreData: 747
執行到最後一段代碼了,future2 result: 747
future1 和 future都執行完成了,結果分別是:920,747

6. thenRun* 方法:不消費CompletableFuture對象的結果,執行一個新任務。

在原始CompletableFuture執行任務結束後,並且執行指定的任務,不消費,也不產生結果。

public CompletableFuture<Void> 	thenRun(Runnable action)
public CompletableFuture<Void> 	thenRunAsync(Runnable action)
public CompletableFuture<Void> 	thenRunAsync(Runnable action, Executor executor)
public static void main(String[] args) throws ExecutionException, InterruptedException {
         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(Main::getMoreData);

        CompletableFuture<Void> future2 = future.thenRunAsync(() -> {
            System.out.println("future執行完成了");
        });

        System.out.println("執行到最後一段代碼了,future result:" + future.get());
        System.out.println("執行到最後一段代碼了,future2 result:" + future2.get());
}
> 打印執行結果:
begin to start compute
end to compute, passed:1551344347162
getMoreData: 688
執行到最後一段代碼了,future result:688
future執行完成了
執行到最後一段代碼了,future2 result:null

7.acceptEither* :當任意一個CompletableFuture計算完成的時候就會執行,它沒有返回值。

runAfterBoth是當兩個CompletableFuture都計算完成後才執行。

public CompletableFuture<Void> 	acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> 	acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> 	acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)

8.applyToEither*:當任意一個CompletableFuture計算完成的時候就會執行,它有返回值。

runAfterBoth是當兩個CompletableFuture都計算完成後才執行。而 acceptEither* 沒有返回值。

public <U> CompletableFuture<U> 	applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U> 	applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U> 	applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)

輔助方法 :allOf 和 anyOf

public static CompletableFuture<Void> 	    allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> 	anyOf(CompletableFuture<?>... cfs)

更進一步

若是你用過Guava的Future類,你就會知道它的Futures輔助類提供了不少便利方法,用來處理多個Future,而不像Java的CompletableFuture,只提供了allOf、anyOf兩個方法。 好比有這樣一個需求,將多個CompletableFuture組合成一個CompletableFuture,這個組合後的CompletableFuture的計算結果是個List,它包含前面全部的CompletableFuture的計算結果,guava的Futures.allAsList能夠實現這樣的功能,可是對於java CompletableFuture,咱們須要一些輔助方法:

public static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
       CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
       return allDoneFuture.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.<T>toList()));
   }
public static <T> CompletableFuture<Stream<T>> sequence(Stream<CompletableFuture<T>> futures) {
       List<CompletableFuture<T>> futureList = futures.filter(f -> f != null).collect(Collectors.toList());
       return sequence(futureList);
   }

Java Future轉CompletableFuture:

public static <T> CompletableFuture<T> toCompletable(Future<T> future, Executor executor) {
    return CompletableFuture.supplyAsync(() -> {
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }, executor);
}

本文大量參考了鳥窩的文章,本人只是將實例便於理解。
地址:https://colobu.com/2016/02/29/Java-CompletableFuture/

相關文章
相關標籤/搜索