Java8新特性--使用CompletableFuture構建異步應用

原文連接:http://www.jianshu.com/p/4897ccdcb278    

 Future 接口的侷限性

future接口能夠構建異步應用,但依然有其侷限性。它很難直接表述多個Future 結果之間的依賴性。實際開發中,咱們常常須要達成如下目的:java

  • 將兩個異步計算合併爲一個——這兩個異步計算之間相互獨立,同時第二個又依賴於第
    一個的結果。
  • 等待 Future 集合中的全部任務都完成。
  • 僅等待 Future 集合中最快結束的任務完成(有可能由於它們試圖經過不一樣的方式計算同
    一個值),並返回它的結果。
  • 經過編程方式完成一個 Future 任務的執行(即以手工設定異步操做結果的方式)。
  • 應對 Future 的完成事件(即當 Future 的完成事件發生時會收到通知,並能使用 Future
    計算的結果進行下一步的操做,不僅是簡單地阻塞等待操做的結果)

新的CompletableFuture將使得這些成爲可能。編程

    CompletableFuture

 異步執行

首先,CompletableFuture實現了Future接口,所以你能夠像Future那樣使用它。數組

其次,CompletableFuture並不是必定要交給線程池執行才能實現異步,你能夠像下面這樣實現異步運行。異步

public static void test1() throws Exception{
        CompletableFuture<String> completableFuture=new CompletableFuture();
        new Thread(new Runnable() {
            @Override
            public void run() {
                //模擬執行耗時任務
                System.out.println("task doing...");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //告訴completableFuture任務已經完成
                completableFuture.complete("result");
            }
        }).start();
        //獲取任務結果,若是沒有完成會一直阻塞等待
        String result=completableFuture.get();
        System.out.println("計算結果:"+result);
    }

 錯誤處理

    若是沒有意外,上面發的代碼工做得很正常。可是,若是任務執行過程當中產生了異常會怎樣呢?ide

    很是不幸,這種狀況下你會獲得一個至關糟糕的結果:異常會被限制在執行任務的線程的範圍內,最終會殺死該線程,而這會致使等待 get 方法返回結果的線程永久地被阻塞。客戶端可使用重載版本的 get 方法,它使用一個超時參數來避免發生這樣的狀況。這是一種值得推薦的作法,你應該儘可能在你的代碼中添加超時判斷的邏輯,避免發生相似的問題。使用這種方法至少能防止程序永久地等待下去,超時發生時,程序會獲得通知發生了 Timeout-Exception 。不過,也由於如此,你不能指定執行任務的線程內到底發生了什麼問題。函數

    爲了能獲取任務線程內發生的異常,你須要使用CompletableFuture 的completeExceptionally方法將致使CompletableFuture 內發生問題的異常拋出。這樣,當執行任務發生異常時,調用get()方法的線程將會收到一個 ExecutionException 異常,該異常接收了一個包含失敗緣由的Exception 參數。spa

public static void test2() throws Exception{
        CompletableFuture<String> completableFuture=new CompletableFuture();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    //模擬執行耗時任務
                    System.out.println("task doing...");
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    throw new RuntimeException("拋異常了");
                }catch (Exception e) {
                    //告訴completableFuture任務發生異常了
                    completableFuture.completeExceptionally(e);
                }
            }
        }).start();
        //獲取任務結果,若是沒有完成會一直阻塞等待
        String result=completableFuture.get();
        System.out.println("計算結果:"+result);
    }

  工廠方法

    前面咱們經過編程本身建立 CompletableFuture 對象以及如何獲取返回值,雖然看起來這些操做已經比較方便,但還有進一步提高的空間.CompletableFuture 類自身提供了大量精巧的工廠方法,使用這些方法能更容易地完成整個流程,還不用擔憂實現的細節。supplyAsync 方法接受一個生產者(Supplier)做爲參數,返回一個 CompletableFuture對象。生產者方法會交由 ForkJoinPool池中的某個執行線程( Executor )運行,可是你也可使用 supplyAsync 方法的重載版本,傳遞第二個參數指定線程池執行器執行生產者方法。線程

public static void test3() throws Exception {
        //supplyAsync內部使用ForkJoinPool線程池執行任務
        CompletableFuture<String> completableFuture=CompletableFuture.supplyAsync(()->{
            //模擬執行耗時任務
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //返回結果
            return "result";
        });
        System.out.println("計算結果:"+completableFuture.get());
    }

    allOf 工廠方法接收一個由CompletableFuture 構成的數組,數組中的全部 Completable-Future 對象執行完成以後,它返回一個 CompletableFuture<Void> 對象。這意味着,若是你須要等待多個 CompletableFuture 對象執行完畢,對 allOf 方法返回的CompletableFuture 執行 join 操做能夠等待CompletableFuture執行完成。code

    或者你可能但願只要 CompletableFuture 對象數組中有任何一個執行完畢就再也不等待,在這種狀況下,你可使用一個相似的工廠方法 anyOf 。該方法接收一個 CompletableFuture 對象構成的數組,返回由第一個執行完畢的 CompletableFuture 對象的返回值構成的 CompletableFuture<Object> 。對象

public static void test4() throws Exception {

        CompletableFuture<String> completableFuture1=CompletableFuture.supplyAsync(()->{
            //模擬執行耗時任務
            System.out.println("task1 doing...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //返回結果
            return "result1";
        });

        CompletableFuture<String> completableFuture2=CompletableFuture.supplyAsync(()->{
            //模擬執行耗時任務
            System.out.println("task2 doing...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //返回結果
            return "result2";
        });

        CompletableFuture<Object> anyResult=CompletableFuture.anyOf(completableFuture1,completableFuture2);

        System.out.println("第一個完成的任務結果:"+anyResult.get());

        CompletableFuture<Void> allResult=CompletableFuture.allOf(completableFuture1,completableFuture2);

        //阻塞等待全部任務執行完成
        allResult.join();
        System.out.println("全部任務執行完成");

    }

將兩個CompletableFuture創建聯繫

    一般,咱們會有多個須要獨立運行但又有所依賴的的任務。好比先等用於的訂單處理完畢而後才發送郵件通知客戶。

    thenCompose 方法容許你對兩個異步操做進行流水線,第一個操做完成時,將其結果做爲參數傳遞給第二個操做。你能夠建立兩個CompletableFutures 對象,對第一個 CompletableFuture 對象調用thenCompose ,並向其傳遞一個函數。當第一個CompletableFuture 執行完畢後,它的結果將做爲該函數的參數,這個函數的返回值是以第一個 CompletableFuture 的返回作輸入計算出的第二個 CompletableFuture 對象。

public static void test5() throws Exception {

        CompletableFuture<String> completableFuture1=CompletableFuture.supplyAsync(()->{
            //模擬執行耗時任務
            System.out.println("task1 doing...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //返回結果
            return "result1";
        });

        //等第一個任務完成後,將任務結果傳給參數result,執行後面的任務並返回一個表明任務的completableFuture
        CompletableFuture<String> completableFuture2= completableFuture1.thenCompose(result->CompletableFuture.supplyAsync(()->{
            //模擬執行耗時任務
            System.out.println("task2 doing...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //返回結果
            return "result2";
        }));

        System.out.println(completableFuture2.get());

    }

     另外一種比較常見的狀況是,你須要將兩個徹底不相干的 CompletableFuture 對象的結果整合起來,並且你也不但願等到第一個任務徹底結束纔開始第二項任務。這種狀況,你應該使用 thenCombine 方法,它接收名爲 BiFunction 的第二參數,這個參數定義了當兩個 CompletableFuture 對象完成計算後,結果如何合併。

public static void test6() throws Exception {

        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            //模擬執行耗時任務
            System.out.println("task1 doing...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //返回結果
            return 100;
        });

        //將第一個任務與第二個任務組合一塊兒執行,都執行完成後,將兩個任務的結果合併
        CompletableFuture<Integer> completableFuture2 = completableFuture1.thenCombine(
                //第二個任務
                CompletableFuture.supplyAsync(() -> {
                    //模擬執行耗時任務
                    System.out.println("task2 doing...");
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //返回結果
                    return 2000;
                }),
                //合併函數
                (result1, result2) -> result1 + result2);

        System.out.println(completableFuture2.get());

    }

響應 CompletableFuture 的 completion 事件

       咱們能夠在每一個CompletableFuture 上註冊一個操做,該操做會在 CompletableFuture 完成執行後調用它。CompletableFuture 經過 thenAccept 方法提供了這一功能,它接收CompletableFuture 執行完畢後的返回值作參數。

public static void test7() throws Exception {

        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            //模擬執行耗時任務
            System.out.println("task1 doing...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //返回結果
            return 100;
        });

        //註冊完成事件
        completableFuture1.thenAccept(result->System.out.println("task1 done,result:"+result));

        CompletableFuture<Integer> completableFuture2=
                //第二個任務
                CompletableFuture.supplyAsync(() -> {
                    //模擬執行耗時任務
                    System.out.println("task2 doing...");
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //返回結果
                    return 2000;
                });

        //註冊完成事件
        completableFuture2.thenAccept(result->System.out.println("task2 done,result:"+result));

        //將第一個任務與第二個任務組合一塊兒執行,都執行完成後,將兩個任務的結果合併
        CompletableFuture<Integer> completableFuture3 = completableFuture1.thenCombine(completableFuture2,
                //合併函數
                (result1, result2) -> {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return result1 + result2;
                });

        System.out.println(completableFuture3.get());

    }

 

Notice:固然也不能盲目的使用completableFuture來代替Future,就好比Future中的cancel方法在CompletableFuture中就難以代替。此處還有些須要注意的地方。

相關文章
相關標籤/搜索