異步技巧之CompletableFuture

若是喜歡微信閱讀,想了解更多java知識,系統設計,分佈式中間件等能夠關注個人微信號: java喝咖啡,固然還有更多福利等着你。java

1.Future接口

1.1 什麼是Future?

在jdk的官方的註解中寫道git

A {@code Future} represents the result of an asynchronous
 * computation.  Methods are provided to check if the computation is
 * complete, to wait for its completion, and to retrieve the result of
 * the computation.
複製代碼

在上面的註釋中咱們能知道Future用來表明異步的結果,而且提供了檢查計算完成,等待完成,檢索結果完成等方法。簡而言之就是提供一個異步運算結果的一個建模。它可讓咱們把耗時的操做從咱們自己的調用線程中釋放出來,只須要完成後再進行回調。就好像咱們去飯店裏面吃飯,不須要你去煮飯,而你這個時候能夠作任何事,而後飯煮好後就會回調你去吃。github

1.2 JDK8之前的Future

在JDK8之前的Future使用比較簡單,咱們只須要把咱們須要用來異步計算的過程封裝在Callable或者Runnable中,好比一些很耗時的操做(不能佔用咱們的調用線程時間的),而後再將它提交給咱們的線程池ExecutorService。代碼例子以下:編程

public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return Thread.currentThread().getName();
            }
        });

        doSomethingElse();//在咱們異步操做的同時同樣能夠作其餘操做
        try {
            String res = future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
複製代碼

上面展現了咱們的線程能夠併發方式調用另外一個線程去作咱們耗時的操做。當咱們必須依賴咱們的異步結果的時候咱們就能夠調用get方法去得到。當咱們調用get方法的時候若是咱們的任務完成就能夠立馬返回,可是若是任務沒有完成就會阻塞,直到超時爲止。bash

Future底層是怎麼實現的呢? 咱們首先來到咱們ExecutorService的代碼中submit方法這裏會返回一個Future微信

public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
複製代碼

在sumbmit中會對咱們的Callable進行包裝封裝成咱們的FutureTask,咱們最後的Future其實也是Future的實現類FutureTask,FutureTask實現了Runnable接口因此這裏直接調用execute。在FutureTask代碼中的run方法代碼以下:併發

public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } 
        .......
    }
複製代碼

能夠看見當咱們執行完成以後會set(result)來通知咱們的結果完成了。set(result)代碼以下:app

protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
複製代碼

首先用CAS置換狀態爲完成,以及替換結果,當替換結果完成以後,纔會替換爲咱們的最終狀態,這裏主要是怕咱們設置完COMPLETING狀態以後最終值尚未真正的賦值出去,而咱們的get就去使用了,因此還會有個最終狀態。咱們的get()方法的代碼以下:異步

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
複製代碼

首先得到當前狀態,而後判斷狀態是否完成,若是沒有完成則進入awaitDone循環等待,這也是咱們阻塞的代碼,而後返回咱們的最終結果。async

1.2.1缺陷

咱們的Future使用很簡單,這也致使了若是咱們想完成一些複雜的任務可能就比較難。好比下面一些例子:

  • 將兩個異步計算合成一個異步計算,這兩個異步計算互相獨立,同時第二個又依賴第一個的結果。
  • 當Future集合中某個任務最快結束時,返回結果。
  • 等待Future結合中的全部任務都完成。
  • 經過編程方式完成一個Future任務的執行。
  • 應對Future的完成時間。也就是咱們的回調通知。

1.3CompletableFuture

CompletableFuture是JDK8提出的一個支持非阻塞的多功能的Future,一樣也是實現了Future接口。

1.3.1CompletableFuture基本實現

下面會寫一個比較簡單的例子:

public static void main(String[] args) {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        new Thread(()->{
            completableFuture.complete(Thread.currentThread().getName());
        }).start();
        doSomethingelse();//作你想作的其餘操做
        
        try {
            System.out.println(completableFuture.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
複製代碼

用法上來講和Future有一點不一樣,咱們這裏fork了一個新的線程來完成咱們的異步操做,在異步操做中咱們會設置值,而後在外部作咱們其餘操做。在complete中會用CAS替換result,而後當咱們get若是能夠獲取到值得時候就能夠返回了。

1.3.2錯誤處理

上面介紹了正常狀況下可是當咱們在咱們異步線程中產生了錯誤的話就會很是的不幸,錯誤的異常不會告知給你,會被扼殺在咱們的異步線程中,而咱們的get方法會被阻塞。

對於咱們的CompletableFuture提供了completeException方法可讓咱們返回咱們異步線程中的異常,代碼以下:

public static void main(String[] args) {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        new Thread(()->{
            completableFuture.completeExceptionally(new RuntimeException("error"));
            completableFuture.complete(Thread.currentThread().getName());
        }).start();
//        doSomethingelse();//作你想作的耗時操做

        try {
            System.out.println(completableFuture.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
--------------
輸出:
java.util.concurrent.ExecutionException: java.lang.RuntimeException: error
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1887)
	at futurepackge.jdk8Future.main(jdk8Future.java:19)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.lang.RuntimeException: error
	at futurepackge.jdk8Future.lambda$main$0(jdk8Future.java:13)
	at futurepackge.jdk8Future$$Lambda$1/1768305536.run(Unknown Source)
	at java.lang.Thread.run(Thread.java:745)
複製代碼

在咱們新建的異步線程中直接New一個異常拋出,在咱們客戶端中依然能夠得到異常。

1.3.2工廠方法建立CompletableFuture

咱們的上面的代碼雖然不復雜,可是咱們的java8依然對其提供了大量的工廠方法,用這些方法更容易完成整個流程。以下面的例子:

public static void main(String[] args) {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() ->{
                return Thread.currentThread().getName();
        });
//        doSomethingelse();//作你想作的耗時操做

        try {
            System.out.println(completableFuture.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
---------
輸出:
ForkJoinPool.commonPool-worker-1
複製代碼

上面的例子經過工廠方法supplyAsync提供了一個Completable,在異步線程中的輸出是ForkJoinPool能夠看出當咱們不指定線程池的時候會使用ForkJoinPool,而咱們上面的compelte的操做在咱們的run方法中作了,源代碼以下:

public void run() {
            CompletableFuture<T> d; Supplier<T> f;
            if ((d = dep) != null && (f = fn) != null) {
                dep = null; fn = null;
                if (d.result == null) {
                    try {
                        d.completeValue(f.get());
                    } catch (Throwable ex) {
                        d.completeThrowable(ex);
                    }
                }
                d.postComplete();
            }
        }
複製代碼

上面代碼中經過d.completeValue(f.get());設置了咱們的值。一樣的構造方法還有runasync等等。

1.3.3計算結果完成時的處理

當CompletableFuture計算結果完成時,咱們須要對結果進行處理,或者當CompletableFuture產生異常的時候須要對異常進行處理。有以下幾種方法:

public CompletableFuture<T> 	whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> 	whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> 	whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)
複製代碼

上面的四種方法都返回了CompletableFuture,當咱們Action執行完畢的時候,future返回的值和咱們原始的CompletableFuture的值是同樣的。上面以Async結尾的會在新的線程池中執行,上面沒有一Async結尾的會在以前的CompletableFuture執行的線程中執行。例子代碼以下:

public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(jdk8Future::getMoreData);
        Future<Integer> f = future.whenComplete((v, e) -> {
            System.out.println(Thread.currentThread().getName());
            System.out.println(v);
        });
        System.out.println("Main" + Thread.currentThread().getName());
        System.out.println(f.get());
    }
複製代碼

exceptionally方法返回一個新的CompletableFuture,當原始的CompletableFuture拋出異常的時候,就會觸發這個CompletableFuture的計算,調用function計算值,不然若是原始的CompletableFuture正常計算完後,這個新的CompletableFuture也計算完成,它的值和原始的CompletableFuture的計算的值相同。也就是這個exceptionally方法用來處理異常的狀況。

1.3.4計算結果完成時的轉換

上面咱們討論瞭如何計算結果完成時進行的處理,接下來咱們討論如何對計算結果完成時,對結果進行轉換。

public <U> CompletableFuture<U> 	thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> 	thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> 	thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
複製代碼

這裏一樣也是返回CompletableFuture,可是這個結果會由咱們自定義返回去轉換他,一樣的不以Async結尾的方法由原來的線程計算,以Async結尾的方法由默認的線程池ForkJoinPool.commonPool()或者指定的線程池executor運行。Java的CompletableFuture類老是遵循這樣的原則,下面就不一一贅述了。 例子代碼以下:

public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            return 10;
        });
        CompletableFuture<String> f = future.thenApply(i ->i+1 ).thenApply(i-> String.valueOf(i));
        System.out.println(f.get());
    }
複製代碼

上面的最終結果會輸出11,咱們成功將其用兩個thenApply轉換爲String。

1.3.5計算結果完成時的消費

上面已經講告終果完成時的處理和轉換,他們最後的CompletableFuture都會返回對應的值,這裏還會有一個只會對計算結果消費不會返回任何結果的方法。

public CompletableFuture<Void> 	thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> 	thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> 	thenAcceptAsync(Consumer<? super T> action, Executor executor)
複製代碼

函數接口爲Consumer,就知道了只會對函數進行消費,例子代碼以下:

public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            return 10;
        });
        future.thenAccept(System.out::println);
    }
複製代碼

這個方法用法很簡單我就很少說了.Accept家族還有個方法是用來合併結果當兩個CompletionStage都正常執行的時候就會執行提供的action,它用來組合另一個異步的結果。

public <U> CompletableFuture<Void> 	thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> 	thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> 	thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
public     CompletableFuture<Void> 	runAfterBoth(CompletionStage<?> other,  Runnable action)
複製代碼

runAfterBoth是當兩個CompletionStage都正常完成計算的時候,執行一個Runnable,這個Runnable並不使用計算的結果。 示例代碼以下:

public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            return 10;
        });
        System.out.println(future.thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
            return 20;
        }),(x,y) -> System.out.println(x+y)).get());
    }
複製代碼

CompletableFuture也提供了執行Runnable的辦法,這裏咱們就不能使用咱們future中的值了。

public CompletableFuture<Void> 	thenRun(Runnable action)
public CompletableFuture<Void> 	thenRunAsync(Runnable action)
public CompletableFuture<Void> 	thenRunAsync(Runnable action, Executor executor)
複製代碼

1.3.6對計算結果的組合

首先是介紹一下鏈接兩個future的方法:

public <U> CompletableFuture<U> 	thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> 	thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> 	thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)
複製代碼

對於Compose能夠鏈接兩個CompletableFuture,其內部處理邏輯是當第一個CompletableFuture處理沒有完成時會合併成一個CompletableFuture,若是處理完成,第二個future會緊接上一個CompletableFuture進行處理。

public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            return 10;
        });
        System.out.println(future.thenCompose(i -> CompletableFuture.supplyAsync(() -> { return i+1;})).get());
    }
複製代碼

咱們上面的thenAcceptBoth講了合併兩個future,可是沒有返回值這裏將介紹一個有返回值的方法,以下:

public <U,V> CompletableFuture<V> 	thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> 	thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> 	thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
複製代碼

例子比較簡單以下:

public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            return 10;
        });
        CompletableFuture<String> f = future.thenCombine(CompletableFuture.supplyAsync(() -> {
            return 20;
        }),(x,y) -> {return "計算結果:"+x+y;});
        System.out.println(f.get());
    }
複製代碼

上面介紹了兩個future完成的時候應該完成的工做,接下來介紹任意一個future完成時須要執行的工做,方法以下:

public CompletableFuture<Void> 	acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> 	acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> 	acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
public <U> CompletableFuture<U> 	applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U> 	applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U> 	applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)
複製代碼

上面兩個是一個是純消費不返回結果,一個是計算後返回結果。

1.3.6其餘方法

public static CompletableFuture<Void> 	    allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> 	anyOf(CompletableFuture<?>... cfs)
複製代碼

allOf方法是當全部的CompletableFuture都執行完後執行計算。

anyOf方法是當任意一個CompletableFuture執行完後就會執行計算,計算的結果相同。

1.3.7建議

CompletableFuture和Java8的Stream搭配使用對於一些並行訪問的耗時操做有很大的性能提升,能夠自行了解。

最後這篇文章被我收錄於JGrowing,一個全面,優秀,由社區一塊兒共建的Java學習路線,若是您想參與開源項目的維護,能夠一塊兒共建,github地址爲:github.com/javagrowing… 麻煩給個小星星喲。

想要獲取更多信息請關注技術公衆號

相關文章
相關標籤/搜索