強大的CompletableFuture

圖片描述

引子

爲了讓程序更加高效,讓CPU最大效率的工做,咱們會採用異步編程。首先想到的是開啓一個新的線程去作某項工做。再進一步,爲了讓新線程能夠返回一個值,告訴主線程事情作完了,因而乎Future粉墨登場。然而Future提供的方式是主線程主動問詢新線程,要是有個回調函數就爽了。因此,爲了知足Future的某些遺憾,強大的CompletableFuture隨着Java8一塊兒來了。算法

Future

傳統多線程的卻讓程序更加高效,畢竟是異步,可讓CPU充分工做,但這僅限於新開的線程無需你的主線程再費心了。好比你開啓的新線程僅僅是爲了計算1+...+n再打印結果。有時候你須要子線程返回計算結果,在主線程中進行進一步計算,就須要Future了。編程

看下面這個例子,主線程計算2+4+6+8+10;子線程計算1+3+5+7+9;最後須要在主線程中將兩部分結果再相加。網絡

public class OddNumber implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        Thread.sleep(3000);
        int result = 1 + 3 + 5 + 7 + 9;
        return result;
    }
}
public class FutureTest {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        OddNumber oddNumber = new OddNumber();
        Future<Integer> future = executor.submit(oddNumber);
        long startTime = System.currentTimeMillis();
        int evenNumber = 2 + 4 + 6 + 8 + 10;
        try {
            Thread.sleep(1000);
            System.out.println("0.開始了:"+ (System.currentTimeMillis()-startTime) +"秒");
            int oddNumberResult = future.get();//這時間會被阻塞
            System.out.println("1+2+...+9+10="+(evenNumber+oddNumberResult));
            System.out.println("1.開始了:"+ (System.currentTimeMillis()-startTime) +"秒");
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}
輸出結果:
0.開始了:1001秒
1+2+...+9+10=55
1.開始了:3002秒

看一下Future接口,只有五個方法比較簡單多線程

//取消任務,若是已經完成或者已經取消,就返回失敗
boolean cancel(boolean mayInterruptIfRunning);
//查看任務是否取消
boolean isCancelled();
//查看任務是否完成
boolean isDone();
//剛纔用到了,查看結果,任務未完成就一直阻塞
V get() throws InterruptedException, ExecutionException;
//同上,可是加了一個過時時間,防止長時間阻塞,主線程也作不了事情
V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;

CompletableFuture

上面的看到Future的五個方法,不是很豐富,既然咱們的主線程叫作main,就應該以我爲主,我更但願子線程作完了事情主動通知我。爲此,Java8帶來了CompletableFuture,一個Future的實現類。其實CompletableFuture最迷人的地方並非極大豐富了Future的功能,而是完美結合了Java8流的新特性。框架

實現回調,自動後續操做

提早說一下CompletableFuture實現回調的方法(之一):thenAccept()異步

public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
        return uniAcceptStage(null, action);
    }

參數有個Consumer,用到了Java8新特性,行爲參數化,就是參數不必定是基本類型或者類,也可以使是函數(行爲),或者說一個方法(接口)。async

public class OddNumberPlus implements Supplier<Integer> {
    @Override
    public Integer get() {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 1+3+5+7+9;
    }
}
public class CompletableFutureTest {
    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        final int evenNumber = 2 + 4 + 6 + 8 + 10;
        CompletableFuture<Integer> oddNumber = CompletableFuture.supplyAsync(new OddNumberPlus());
        try {
            Thread.sleep(1000);
            System.out.println("0.開始了:"+ (System.currentTimeMillis()-startTime) +"秒");
            //看這裏,實現回調
            oddNumber.thenAccept(oddNumberResult->
                        {
                            System.out.println("1.開始了:"+ (System.currentTimeMillis()-startTime) +"秒");
                            System.out.println("此時計算結果爲:"+(evenNumber+oddNumberResult));
                        });
            oddNumber.get();
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}
輸出結果:
0.開始了:1006秒
1.開始了:3006秒
此時計算結果爲:55

值得一提的是,本例中並無顯示的建立任務鏈接池,程序會默認選擇一個任務鏈接池ForkJoinPool.commonPool()ide

private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

ForkJoinPool始自JDK7,叫作分支/合併框架。能夠經過將一個任務遞歸分紅不少分子任務,造成不一樣的流,進行並行執行,同時還伴隨着強大的工做竊取算法。極大的提升效率。固然,你也能夠本身指定鏈接池。異步編程

CompletableFuture合併

Java8的確豐富了Future實現,CompletableFuture有不少方法可供你們使用,可是但從上面的例子來看,其實CompletableFuture能作的功能,貌似Future。畢竟你CompletableFuture用get()這個方法的時候還不是阻塞了,我Future蠻能夠本身拿到返回值,再手動執行一些操做嘛(雖然說這樣main方法必定很不爽)。那麼接下來的事情,Future作起來就十分麻煩了。假設咱們main方法只作奇數合集加上偶數合集這一個操做,提早算這兩個合集的操做異步交給兩個子線程,咱們須要怎麼作呢?沒錯,開啓兩個線程,等到兩個線程都計算結束的時候,咱們進行最後的相加,問題在於,你怎麼知道那個子線程最後結束的呢?(貌似能夠作個輪詢,不定的調用isDone()這個方法...)豐富的CompletableFuture功能爲咱們提供了一個方法,用於等待兩個子線程都結束了,再進行相加操做:函數

//asyncPool就是上面提到的默認線程池ForkJoinPool
    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(asyncPool, other, fn);
    }

看個例子:

public class OddCombine implements Supplier<Integer> {
    @Override
    public Integer get() {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 1+3+5+7+9;
    }
}
public class EvenCombine implements Supplier<Integer> {
    @Override
    public Integer get() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 2+4+6+8+10;
    }
}
public class CompletableCombineTest {
    public static void main(String[] args) throws Exception{
        CompletableFuture<Integer> oddNumber = CompletableFuture.supplyAsync(new OddCombine());
        CompletableFuture<Integer> evenNumber = CompletableFuture.supplyAsync(new EvenCombine());
        long startTime = System.currentTimeMillis();
        CompletableFuture<Integer> resultFuturn = oddNumber.thenCombine(evenNumber,(odd,even)->{
            return odd + even;
        });
        System.out.println(resultFuturn.get());
        System.out.println("0.開始了:"+ (System.currentTimeMillis()-startTime) +"秒");
    }
}
輸出結果:
55
0.開始了:3000秒

這邊模擬一個睡1秒,一個睡3秒,可是真正的網絡請求時間是不定的。是否是很爽,最爽的還不是現象,而是以上操做已經利用了Java8流的概念。

兩個子線程還不夠,那麼還有anyOff()函數,能夠承受多個CompletableFuture,會等待全部任務都完成。

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
        return andTree(cfs, 0, cfs.length - 1);
    }

與它長的很像的,有個方法,是當第一個執行結束的時候,就結束,後面任務再也不等了,能夠看做充分條件。

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
        return orTree(cfs, 0, cfs.length - 1);
    }

在上面那個例子的基礎上,把OddNumberPlus類時間調長一點:

public class OddNumberPlus implements Supplier<Integer> {
    @Override
    public Integer get() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 1+3+5+7+9;
    }
}
public class CompletableCombineTest {
    public static void main(String[] args) throws Exception{
        CompletableFuture<Integer> oddNumber = CompletableFuture.supplyAsync(new OddCombine());
        CompletableFuture<Integer> evenNumber = CompletableFuture.supplyAsync(new EvenCombine());
        CompletableFuture<Integer> testNumber = CompletableFuture.supplyAsync(new OddNumberPlus());
        long startTime = System.currentTimeMillis();
        CompletableFuture<Object> resultFuturn = CompletableFuture.anyOf(oddNumber,evenNumber,testNumber);
        System.out.println(resultFuturn.get());
        System.out.println("0.開始了:"+ (System.currentTimeMillis()-startTime) +"秒");
    }
}
輸出結果:
30
0.開始了:1000秒

小結

CompletableFuture的方法其實還有不少,經常使用的好比說runAsync(),相似於supplyAsync(),只是沒有返回值;除了thenApply()能夠加回調函數之外,還有thenApply();還有注入runAfterBoth()、runAfterEither(),這些見名知意。還有不少,能夠點開CompletableFuture這個類的源碼仔細看一看。見微知著,透過CompletableFuture,更加感受到Java8的強大,強大的流概念、行爲參數化、高效的並行理念等等,不只讓Java寫起來更爽,還不斷豐富Java整個生態。Java一直在進步,因此沒有被時代淘汰,咱們Javaer也能夠繼續職業生涯,感謝Java,一塊兒進步。

相關文章
相關標籤/搜索