Java8新的異步編程方式 CompletableFuture(三)

前面兩篇文章已經整理了CompletableFuture大部分的特性,本文會整理完CompletableFuture餘下的特性,以及將它跟RxJava進行比較。java

3.6 Either

Either 表示的是兩個CompletableFuture,當其中任意一個CompletableFuture計算完成的時候就會執行。編程

方法名 描述
acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) 當任意一個CompletableFuture完成的時候,action這個消費者就會被執行。
acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) 當任意一個CompletableFuture完成的時候,action這個消費者就會被執行。使用ForkJoinPool
acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor) 當任意一個CompletableFuture完成的時候,action這個消費者就會被執行。使用指定的線程池
Random random = new Random();

        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{

            try {
                Thread.sleep(random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            return "from future1";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{

            try {
                Thread.sleep(random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            return "from future2";
        });

        CompletableFuture<Void> future =  future1.acceptEither(future2,str->System.out.println("The future is "+str));

        try {
            future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }複製代碼

執行結果:The future is from future1 或者 The future is from future2。
由於future1和future2,執行的順序是隨機的。架構

applyToEither 跟 acceptEither 相似。併發

方法名 描述
applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn) 當任意一個CompletableFuture完成的時候,fn會被執行,它的返回值會看成新的CompletableFuture<U>的計算結果。
applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn) 當任意一個CompletableFuture完成的時候,fn會被執行,它的返回值會看成新的CompletableFuture<U>的計算結果。使用ForkJoinPool
applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor) 當任意一個CompletableFuture完成的時候,fn會被執行,它的返回值會看成新的CompletableFuture<U>的計算結果。使用指定的線程池
Random random = new Random();

        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{

            try {
                Thread.sleep(random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            return "from future1";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{

            try {
                Thread.sleep(random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            return "from future2";
        });

        CompletableFuture<String> future =  future1.applyToEither(future2,str->"The future is "+str);

        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }複製代碼

執行結果也跟上面的程序相似。app

3.7 其餘方法

allOf、anyOf是CompletableFuture的靜態方法。框架

3.7.1 allOf

方法名 描述
allOf(CompletableFuture<?>... cfs) 在全部Future對象完成後結束,並返回一個future。

allOf()方法所返回的CompletableFuture,並不能組合前面多個CompletableFuture的計算結果。因而咱們藉助Java 8的Stream來組合多個future的結果。dom

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "tony");

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "cafei");

        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "aaron");

        CompletableFuture.allOf(future1, future2, future3)
                .thenApply(v ->
                Stream.of(future1, future2, future3)
                        .map(CompletableFuture::join)
                        .collect(Collectors.joining(" ")))
                .thenAccept(System.out::print);複製代碼

執行結果:異步

tony cafei aaron複製代碼

3.7.2 anyOf

方法名 描述
anyOf(CompletableFuture<?>... cfs) 在任何一個Future對象結束後結束,並返回一個future。
Random rand = new Random();
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(rand.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "from future1";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(rand.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "from future2";
        });
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(rand.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "from future3";
        });

        CompletableFuture<Object> future =  CompletableFuture.anyOf(future1,future2,future3);

        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }複製代碼

使用anyOf()時,只要某一個future完成,就結束了。因此執行結果多是"from future1"、"from future2"、"from future3"中的任意一個。async

anyOf 和 acceptEither、applyToEither的區別在於,後二者只能使用在兩個future中,而anyOf可使用在多個future中。異步編程

3.8 CompletableFuture異常處理

CompletableFuture在運行時若是遇到異常,可使用get()並拋出異常進行處理,但這並非一個最好的方法。CompletableFuture自己也提供了幾種方式來處理異常。

3.8.1 exceptionally

方法名 描述
exceptionally(Function fn) 只有當CompletableFuture拋出異常的時候,纔會觸發這個exceptionally的計算,調用function計算值。
CompletableFuture.supplyAsync(() -> "hello world")
                .thenApply(s -> {
                    s = null;
                    int length = s.length();
                    return length;
                }).thenAccept(i -> System.out.println(i))
                .exceptionally(t -> {
                    System.out.println("Unexpected error:" + t);
                    return null;
                });複製代碼

執行結果:

Unexpected error:java.util.concurrent.CompletionException: java.lang.NullPointerException複製代碼

對上面的代碼稍微作了一下修改,修復了空指針的異常。

CompletableFuture.supplyAsync(() -> "hello world")
                .thenApply(s -> {
// s = null;
                    int length = s.length();
                    return length;
                }).thenAccept(i -> System.out.println(i))
                .exceptionally(t -> {
                    System.out.println("Unexpected error:" + t);
                    return null;
                });複製代碼

執行結果:

11複製代碼

3.8.2 whenComplete

whenComplete 在上一篇文章其實已經介紹過了,在這裏跟exceptionally的做用差很少,能夠捕獲任意階段的異常。若是沒有異常的話,就執行action。

CompletableFuture.supplyAsync(() -> "hello world")
                .thenApply(s -> {
                    s = null;
                    int length = s.length();
                    return length;
                }).thenAccept(i -> System.out.println(i))
                .whenComplete((result, throwable) -> {

                    if (throwable != null) {
                       System.out.println("Unexpected error:"+throwable);
                    } else {
                        System.out.println(result);
                    }

                });複製代碼

執行結果:

Unexpected error:java.util.concurrent.CompletionException: java.lang.NullPointerException複製代碼

跟whenComplete類似的方法是handle,handle的用法在上一篇文章中也已經介紹過。

四. CompletableFuture VS Java8 Stream VS RxJava1 & RxJava2

CompletableFuture 有不少特性跟RxJava很像,因此將CompletableFuture、Java 8 Stream和RxJava作一個相互的比較。

composable lazy resuable async cached push back pressure
CompletableFuture 支持 不支持 支持 支持 支持 支持 不支持
Stream 支持 支持 不支持 不支持 不支持 不支持 不支持
Observable(RxJava1) 支持 支持 支持 支持 支持 支持 支持
Observable(RxJava2) 支持 支持 支持 支持 支持 支持 不支持
Flowable(RxJava2) 支持 支持 支持 支持 支持 支持 支持

五. 總結

Java 8提供了一種函數風格的異步和事件驅動編程模型CompletableFuture,它不會形成堵塞。CompletableFuture背後依靠的是fork/join框架來啓動新的線程實現異步與併發。固然,咱們也能經過指定線程池來作這些事情。

CompletableFuture特別是對微服務架構而言,會有很大的做爲。舉一個具體的場景,電商的商品頁面可能會涉及到商品詳情服務、商品評論服務、相關商品推薦服務等等。獲取商品的信息時(/productdetails?productid=xxx),須要調用多個服務來處理這一個請求並返回結果。這裏可能會涉及到併發編程,咱們徹底可使用Java 8的CompletableFuture或者RxJava來實現。

先前的文章:
Java8新的異步編程方式 CompletableFuture(一)
Java8新的異步編程方式 CompletableFuture(二)

相關文章
相關標籤/搜索