Java併發編程(六) CompletableFuture與CompletionStage使用

一.前言

最近複習了些java線程池和併發的一點東西,正好看到Future,Future是在JDK5中新加入的用來獲取異步執行結果,可是侷限就是除了採用get()阻塞獲取或者採用輪訓isDone(),別無他法,代碼也不夠優雅,JDK8中新特性CompletableFuture與CompletionStage,是對於Future的補充,下面來分享一下;java

這裏只是對這個類方法進行演示說明,使得快速入門,實際使用中有不少場景和靈活使用;併發

二. CompletableFuture

2.1 提交任務執行、獲取執行結果

static CompletableFuture<Void> runAsync(Runnable runnable) 
//1. 返回一個新的CompletableFuture,它在運行給定操做後由運行在 ForkJoinPool.commonPool()中的任務 異步完成。 
 
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) 
//2. 返回一個新的CompletableFuture,它在運行給定操做以後由在給定執行程序中運行的任務異步完成。  

static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) 
//3. 返回一個新的CompletableFuture,它經過在 ForkJoinPool.commonPool()中運行的任務與經過調用給定的供應商得到的值 異步完成。  

static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) 
//4. 返回一個新的CompletableFuture,由給定執行器中運行的任務異步完成,並經過調用給定的供應商得到的值。

說明:異步

  • 前二者用於無返回值的異步執行,後二者用於可得到返回值的異步執行 ;
  • 2、四 可執行特定線程池或自定義線程池執行,不然默認使用ForkJoinPool.commonPool()線程池;
public void test1() throws ExecutionException, InterruptedException {
        //1. 提交一個一部執行的任務,無結果返回值
        CompletableFuture.runAsync(() -> {
            log.info("I have done Nothing");
        });

        CompletableFuture.runAsync(() -> {
            log.info("Me, too");
        },EnvirmentThreadPool.getThreadPool());

        //2. 提交一個一部執行的任務,有結果返回值
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "future 1");
        //若future執行完畢,則future.get();返回執行結果'future 1',若未執行完畢,則返回給定值'111'
        future.complete("111");

        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "future 1",
                EnvirmentThreadPool.getThreadPool());

        //返回一個指定結果的CompletableFuture對象
        CompletableFuture<String> future2 = CompletableFuture.completedFuture("future 2");
    }

2.2 allOf 、 anyOf

static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) 
//5. 返回一個新的CompletableFuture,當全部給定的CompletableFutures完成時,完成。  

static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) 
//6. 返回一個新的CompletableFuture,當任何一個給定的CompletableFutures完成時,完成相同的結果。

說明:函數

  • 前者爲執行完全部提交任務後進行後面的操做,無返回值;
  • 後者用於有返回值的,當任一執行完返回結果時便可進行後續的任務執行;
@Test
    public void test1() throws ExecutionException, InterruptedException {
        //1. 提交一個一部執行的任務,無結果返回值
        CompletableFuture<Void> void1 = CompletableFuture.runAsync(() -> {
            log.info("I have done Nothing");
        });

        CompletableFuture<Void> void2 = CompletableFuture.runAsync(() -> {
            log.info("Me, too");
        }, EnvirmentThreadPool.getThreadPool());

        CompletableFuture.allOf(void1, void2).thenRun(() -> {log.info("success");}).join();



        //2. 提交一個一部執行的任務,有結果返回值
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "future 1");
        
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "future 3",
                EnvirmentThreadPool.getThreadPool());

         //後面須要join(),不然void1和void2將異步執行,這裏不會阻塞,也就拿不到執行結果
         CompletableFuture.allOf(void1, void2).thenRun(() -> {log.info("success");}).join();

        CompletableFuture.anyOf(future,future1).thenAccept((value)-> System.out.println(value)).join(); //輸出結果future 1與future 3不定



    }
  • 二者均須要join()或者get()方法配合使用才能達到同步入參的返回值執行後面的操做,不然將異步執行;工具

/**
         * 在這裏重點對比一下下面的方法,下面舉例用allOf,實際allOf與anyOf用法相同,下面提到的join(),換成get()也一樣;
         */
        //1.allOf沒有join()配合使用,allOf後面的結果集若沒有執行完畢則直接執行log.info();結果集繼續異步執行,完成後當前異步線程繼續執行thenAccept()方法;
        CompletableFuture.allOf(future, future1).thenAccept((value) -> {
            System.out.println(value);
        });
        //2.allOf配合join()使用,線程將會阻塞等待結果集執行完畢獲取結果後執行thenAccept();
        CompletableFuture.allOf(future, future1).thenAccept((value) -> {
            System.out.println(value);
        }).join();

        CompletableFuture.allOf(future, future1).thenAccept((value) -> {
            System.out.println(value);
        }).get();

        CompletableFuture.allOf(future, future1).thenAccept((value) -> {
            System.out.println(value);
        }).get(2000, TimeUnit.MILLISECONDS);

        log.info("do nothing");

2.3 異步結果的處理

<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) 
//7.返回一個新的CompletionStage,當此階段正常完成時,將以該階段的結果做爲所提供函數的參數執行。 
 
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) 
//8. 返回一個新的CompletionStage,當該階段正常完成時,將使用此階段的默認異步執行工具執行此階段的結果做爲所提供函數的參數。
  
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) 
//9. 返回一個新的CompletionStage,當此階段正常完成時,將使用提供的執行程序執行此階段的結果做爲提供函數的參數。
CompletableFuture<Void> thenAccept(Consumer<? super T> action) 
//10. 返回一個新的CompletionStage,當此階段正常完成時,將以該階段的結果做爲提供的操做的參數執行。
  
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) 
//11. 返回一個新的CompletionStage,當此階段正常完成時,將使用此階段的默認異步執行工具執行,此階段的結果做爲提供的操做的參數。  

CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) 
//12. 返回一個新的CompletionStage,當此階段正常完成時,將使用提供的執行程序執行此階段的結果做爲提供的操做的參數。

 說明: 上面兩塊方法,都用於獲取前一步執行結果後,對其進行接下來的操做,前者用於又返回值的,後者用於無返回值的結果消耗;入參都支持lamdba,可指定線程池;默認爲ForkJoinPool.commonPool();spa

@Test
    public void test3(){
        CompletableFuture.supplyAsync(() -> "future 1").thenAccept(s -> log.info(s));
        CompletableFuture.supplyAsync(() -> "future 1").thenAcceptAsync(s -> log.info(s),EnvirmentThreadPool.getThreadPool());
        
        
        String c = CompletableFuture.supplyAsync(() -> "future 1").thenApply(a -> a).join();
        try {
            String d = CompletableFuture.supplyAsync(() -> "future 1").thenApply(a -> a).get();
        } catch (InterruptedException e) {
            
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        
    }

2.4. 獲取執行結果get(),join()

public T join() {
        Object r;
        return reportJoin((r = result) == null ? waitingGet(false) : r);
    }

    public T get() throws InterruptedException, ExecutionException {
        Object r;
        return reportGet((r = result) == null ? waitingGet(true) : r);
    }

上面的代碼示例有演示,join()沒有異常拋出,而get()方法拋出了2個異常,從異常能夠看出,兩者都是阻塞的等待獲取執行結果,但get()的阻塞能夠被interrupt(),而join()不能夠;兩者對於異常的處理也不相同;.net

get()方法咱們能夠看到異常拋出的位置在68行,也就是在調用get()方法的時候拋出的異常,同時異常中顯示異常位置在66行;get()須要處理檢查異常,必須try..catch..處理或者拋出到外層;線程

下面使用join():3d

此處異常拋出的位置在66行;join()沒有強制處理異常;code

2.5 任務串行執行,不處理上一步執行結果

CompletableFuture<Void> thenRun(Runnable action) 
//13. 返回一個新的CompletionStage,當此階段正常完成時,執行給定的操做。  

CompletableFuture<Void> thenRunAsync(Runnable action) 
//14. 返回一個新的CompletionStage,當此階段正常完成時,使用此階段的默認異步執行工具執行給定的操做。 
 
CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor) 
//15. 返回一個新的CompletionStage,當此階段正常完成時,使用提供的執行程序執行給定的操做。

說明: 顯而易見,執行完上一個任務後執行下一個,支持異步執行,而且指定線程池;

2.6 合併2個結果

<U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) 
//16. 返回一個新的CompletionStage,當這個和另外一個給定的階段都正常完成時,兩個結果做爲提供函數的參數執行。
  
<U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) 
//17. 返回一個新的CompletionStage,當這個和另外一個給定階段正常完成時,將使用此階段的默認異步執行工具執行,其中兩個結果做爲提供函數的參數。  

<U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) 
//18. 返回一個新的CompletionStage,當這個和另外一個給定階段正常完成時,使用提供的執行器執行,其中兩個結果做爲提供的函數的參數。
@Test
    public void test4(){
        CompletableFuture<Integer> integerCompletableFuture =
                CompletableFuture.supplyAsync(() -> 3)
                .thenCombine(CompletableFuture.supplyAsync(() -> 8), (one, two) -> one + two);
        System.out.println(integerCompletableFuture.join());
    }

 說明: 上面是合併2個串行結果並返回值,下面的對應的是無返回值的方法

<U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) 
//19. 返回一個新的CompletionStage,當這個和另外一個給定的階段都正常完成時,兩個結果做爲提供的操做的參數被執行。  
<U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) 
//20. 返回一個新的CompletionStage,當這個和另外一個給定階段正常完成時,將使用此階段的默認異步執行工具執行,其中兩個結果做爲提供的操做的參數。  
<U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor) 
//21. 返回一個新的CompletionStage,當這個和另外一個給定階段正常完成時,使用提供的執行器執行,其中兩個結果做爲提供的函數的參數。
@Test
    public void test4(){
            CompletableFuture.supplyAsync(() -> 3)
                    .thenAcceptBoth(CompletableFuture.supplyAsync(() -> 8), (one, two) -> System.out.println(one + two));
    }

輸出11;

2.7 

CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action) 
//22. 返回一個新的CompletionStage,當這個和另外一個給定的階段都正常完成時,執行給定的動做。  
CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action) 
//23. 返回一個新的CompletionStage,當這個和另外一個給定階段正常完成時,使用此階段的默認異步執行工具執行給定的操做。  
CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor) 
//24. 返回一個新CompletionStage,當這和其餘特定階段正常完成,使用附帶的執行見執行給定的動做CompletionStage覆蓋特殊的完成規則的文檔。  

CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action) 
//25. 返回一個新的CompletionStage,當這個或另外一個給定階段正常完成時,執行給定的操做。  
CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action) 
//26. 返回一個新的CompletionStage,當這個或另外一個給定階段正常完成時,使用此階段的默認異步執行工具執行給定的操做。  
CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor) 
//27. 返回一個新的CompletionStage,當這個或另外一個給定階段正常完成時,使用提供的執行器執行給定的操做。

說明: 前3個方法用於2個任務完成以後串行執行第三個任務,後3個方法用於只要前2個任務有一個完成就開始執行第三個任務,都不關心前面的執行結果;

CompletableFuture.supplyAsync(() -> 3)
                .runAfterBoth(CompletableFuture.supplyAsync(()->9),
                        ()->System.out.println()
                );

        CompletableFuture.supplyAsync(() -> 3)
                .runAfterEither(CompletableFuture.supplyAsync(()->9),
                        ()->System.out.println()
                );

2.8

CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) 
//28. 返回與此階段相同的結果或異常的新的CompletionStage,當此階段完成時,使用結果(或 null若是沒有))和此階段的異常(或 null若是沒有))執行給定的操做。  
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) 
//29. 返回一個與此階段相同結果或異常的新CompletionStage,當此階段完成時,執行給定操做將使用此階段的默認異步執行工具執行給定操做,結果(或 null若是沒有))和異常(或 null若是沒有)這個階段做爲參數。  
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) 
//30. 返回與此階段相同的結果或異常的新的CompletionStage,當此階段完成時,使用提供的執行者執行給定的操做,若是沒有,則使用結果(或 null若是沒有))和異常(或 null若是沒有))做爲論據。

 說明; 當前者任務完成時,將執行結果和異常做爲參數傳入方法,因爲後續處理;

CompletableFuture.supplyAsync(() -> 3).whenComplete((r,throwable)->System.out.println(r));

2.9

<U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn) 
//31. 返回一個新的CompletionStage,當這個階段正常完成時,這個階段將做爲提供函數的參數執行。  
<U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn) 
//32. 返回一個新的CompletionStage,當此階段正常完成時,將使用此階段的默認異步執行工具執行,此階段做爲提供的函數的參數。  
<U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor) 
//33. 返回一個新的CompletionStage,當此階段正常完成時,將使用提供的執行程序執行此階段的結果做爲提供函數的參數。

 說明: 將前一執行結果做爲參數傳入後面的任務中;

CompletableFuture.supplyAsync(() -> 3).thenCompose(i-> CompletableFuture.supplyAsync(()->i));

2.10

<U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn) 
//34. 返回一個新的CompletionStage,當此階段正常或異常完成時,將使用此階段的結果和異常做爲所提供函數的參數執行。  
<U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn) 
//35. 返回一個新的CompletionStage,當該階段完成正常或異常時,將使用此階段的默認異步執行工具執行,此階段的結果和異常做爲提供函數的參數。  
<U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor) 
//36. 返回一個新的CompletionStage,當此階段完成正常或異常時,將使用提供的執行程序執行此階段的結果和異常做爲提供的函數的參數。

與2.9不一樣的是,此處將前者執行結果包括異常做爲參數入參執行後續的方法;可與thenApply方法進行對比;多了對異常的處理

CompletableFuture.supplyAsync(() -> 3).thenApply(r->r+1);
        CompletableFuture.supplyAsync(() -> 3).handle((r,throwable)-> r+2);
        CompletableFuture<String> handle = CompletableFuture.supplyAsync(() -> 3).handle((r, throwable) -> throwable.getMessage());

2.11 獲取執行結果

boolean complete(T value) 
//若是不是已經完成,將返回的值 get()種相關方法爲給定值。  
boolean completeExceptionally(Throwable ex) 
//若是還沒有完成,則調用 get()和相關方法來拋出給定的異常。  
T get() 
//等待這個將來完成的必要,而後返回結果。  
T get(long timeout, TimeUnit unit) 
//若是有必要等待這個將來完成的給定時間,而後返回其結果(若是有的話)。  
T getNow(T valueIfAbsent) 
//若是已完成,則返回結果值(或拋出任何遇到的異常),不然返回給定的值IfAbsent。  
T join() 
//返回結果值,若是完成異常,則返回(未檢查)異常。

2.12 

boolean completeExceptionally(Throwable ex) 
//若是還沒有完成,則調用 get()和相關方法來拋出給定的異常。  
CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn) 
//返回一個新的CompletableFuture,當CompletableFuture完成時完成,結果是異常觸發此CompletableFuture的完成特殊功能的給定功能; 不然,若是此CompletableFuture正常完成,則返回的CompletableFuture也會以相同的值正常完成。  
boolean isCompletedExceptionally() 
//若是此CompletableFuture以任何方式完成異常完成,則返回 true 。
CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> {

            try {
                Thread.sleep(30000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            throw new RuntimeException("hahahahh");
        });
        String join = result.exceptionally(throwable -> "123").join();
        System.out.println(join);
        System.out.println(result.isCompletedExceptionally());
        System.out.println(result.completeExceptionally(new RuntimeException()));
        result.get();
相關文章
相關標籤/搜索