Java8的CompletableFuture詳解

Java5中,Future以及相關使用方法提供了異步執行任務的能力,但對於結果的獲取卻不是很方便,只能經過get()方法阻塞住調用線程直至計算完成返回結果或者isDone()方法輪詢的方式獲得任務結果,也能夠用cancel方法來中止任務的執行,阻塞的方式與咱們理解的異步編程實際上是相違背的,而輪詢又會耗無謂的CPU資源,並且還不能及時獲得計算結果,爲何不能用觀察者設計模式當計算結果完成及時通知監聽者呢?java

不少語言像Node.js,採用回調的方式實現異步編程。Java的一些框架像Netty,本身擴展Java的Future接口,提供了addListener等多個擴展方法:編程

ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
        future.addListener((channelFuture) -> {
            if (channelFuture.isSuccess()) {
                // SUCCESS
            } else {
                // FAILURE
            }
        });

guava裏面也提供了通用的擴展Future: ListenableFuture\SettableFuture以及輔助類Futures等,方便異步編程。bootstrap

做爲正統Java類庫,是否是應該加點什麼特性,能夠增強一下自身庫的功能?設計模式

Java8裏面新增長了一個包含50個方法左右的類:CompletableFuture。app

CompletableFuture類實現了CompletionStage和Future接口。提供了很是強大的Future的擴展功能,能夠幫助簡化異步編程的複雜性,提供了函數式編程能力,能夠經過回調的方式計算處理結果,而且提供了轉換和組織CompletableFuture的方法。框架

CompletableFuture 類實現了CompletionStage和Future接口,因此仍是能夠像之前同樣經過阻塞或輪詢的方式得到結果。儘管這種方式不推薦使用。異步

public T get()
public T get(long timeout, TimeUnit unit)
public T getNow(T valueIfAbsent)
public T join()

其中的getNow有點特殊,若是結果已經計算完則返回結果或拋異常,不然返回給定的valueIfAbsent的值。 join返回計算的結果或拋出一個uncheckd異常。函數式編程

CompletionStage是一個接口,從命名上看得知是一個完成的階段,它裏面的方法也標明是在某個運行階段獲得告終果以後要作的事情。異步編程

進行變換

public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);

以Async結尾的方法都是能夠異步執行的,若是指定了線程池,會在指定的線程池中執行,若是沒有指定,默認會在ForkJoinPool.commonPool()中執行,下文中將會有好多相似的,都不詳細解釋了。關鍵的入參只有一個Function,它是函數式接口,因此使用Lambda表示起來會更加優雅。它的入參是上一個階段計算後的結果,返回值是通過轉化後結果。函數

例如:

@Test
    public void thenApply() {
        String result = CompletableFuture.supplyAsync(() -> "hello").thenApply(s -> s + " world").join();
        System.out.println(result); // hello world
    }

進行消耗

public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

thenAccept是針對結果進行消耗,由於他的入參是Consumer,有入參無返回值。

例如:

@Test
public void thenAccept(){    
       CompletableFuture.supplyAsync(() -> "hello").thenAccept(s -> System.out.println(s + " world"));
}

對上一步的計算結果不關心,執行下一個操做

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);

thenRun它的入參是一個Runnable的實例,表示當獲得上一步的結果時的操做。

例如:

@Test
    public void thenRun(){
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }).thenRun(() -> System.out.println("hello world")); // hello world
        while (true){}
    }

結合兩個CompletionStage的結果,進行轉化後返回

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);

它須要原來的處理返回值,而且other表明的CompletionStage也要返回值以後,利用這兩個返回值,進行轉換後返回指定類型的值。 例如:

@Test
    public void thenCombine() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "world";
        }), (s1, s2) -> s1 + " " + s2).join();
        System.out.println(result); // hello world
    }

結合兩個CompletionStage的結果,進行消耗

public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action,     Executor executor);

它須要原來的處理返回值,而且other表明的CompletionStage也要返回值以後,利用這兩個返回值,進行消耗。

例如:

@Test
    public void thenAcceptBoth() {
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "world";
        }), (s1, s2) -> System.out.println(s1 + " " + s2)); // hello world
        while (true){}
    }

在兩個CompletionStage都運行完執行

public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);

不關心這兩個CompletionStage的結果,只關心這兩個CompletionStage執行完畢,以後在進行操做(Runnable)。

例如:

@Test
    public void runAfterBoth(){
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).runAfterBothAsync(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s2";
        }), () -> System.out.println("hello world")); // hello world
        while (true){}
    }

兩個CompletionStage,誰計算的快,我就用那個CompletionStage的結果進行下一步的轉化操做

public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);

咱們現實開發場景中,總會碰到有兩種渠道完成同一個事情,因此就能夠調用這個方法,找一個最快的結果進行處理。

例如:

@Test
    public void applyToEither() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).applyToEither(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello world";
        }), s -> s).join();
        System.out.println(result); // hello world
    }

兩個CompletionStage,誰計算的快,我就用那個CompletionStage的結果進行下一步的消耗操做

public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);

例如:

@Test
    public void acceptEither() {
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).acceptEither(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello world";
        }), System.out::println); // hello world
        while (true){}
    }

兩個CompletionStage,任何一個完成了都會執行下一步的操做(Runnable)

public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);

例如:

@Test
    public void runAfterEither() {
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).runAfterEither(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s2";
        }), () -> System.out.println("hello world")); // hello world
        while (true) {
        }
    }

當運行時出現了異常,能夠經過exceptionally進行補償

public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);

例如:

@Test
    public void exceptionally() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (1 == 1) {
                throw new RuntimeException("測試一下異常狀況");
            }
            return "s1";
        }).exceptionally(e -> {
            System.out.println(e.getMessage()); // java.lang.RuntimeException: 測試一下異常狀況
            return "hello world";
        }).join();
        System.out.println(result); // hello world
    }

當運行完成時,對結果的記錄。這裏的完成時有兩種狀況,一種是正常執行,返回值。另一種是遇到異常拋出形成程序的中斷。這裏爲何要說成記錄,由於這幾個方法都會返回CompletableFuture,當Action執行完畢後它的結果返回原始的CompletableFuture的計算結果或者返回異常。因此不會對結果產生任何的做用

public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,Executor executor);

例如:

@Test
    public void whenComplete() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (1 == 1) {
                throw new RuntimeException("測試一下異常狀況");
            }
            return "s1";
        }).whenComplete((s, t) -> {
            System.out.println(s);
            System.out.println(t.getMessage());
        }).exceptionally(e -> {
            System.out.println(e.getMessage());
            return "hello world";
        }).join();
        System.out.println(result);
    }

結果:

null
java.lang.RuntimeException: 測試一下異常狀況
java.lang.RuntimeException: 測試一下異常狀況
hello world

運行完成時,對結果的處理。這裏的完成時有兩種狀況,一種是正常執行,返回值。另一種是遇到異常拋出形成程序的中斷

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

例如: 出現異常時

@Test
    public void handle() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //出現異常
            if (1 == 1) {
                throw new RuntimeException("測試一下異常狀況");
            }
            return "s1";
        }).handle((s, t) -> {
            if (t != null) {
                return "hello world";
            }
            return s;
        }).join();
        System.out.println(result); // hello world
    }

未出現異常時

@Test
    public void handle() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).handle((s, t) -> {
            if (t != null) {
                return "hello world";
            }
            return s;
        }).join();
        System.out.println(result); // s1
    }

上面就是CompletionStage接口中方法的使用實例,CompletableFuture一樣也一樣實現了Future,因此也一樣可使用get進行阻塞獲取值,總的來講,CompletableFuture使用起來仍是比較爽的,看起來也比較優雅一點。

相關文章
相關標籤/搜索