Java8新增的併發工具類-CompletableFuture



歡迎關注:本篇主要是介紹CompletableFuture的特性php

最近看到有地方使用CompletableFuture這個類本身沒有使用過,而後就花點時間學習了一下對於jdk新的併發工具必定要握,理論也要有,實戰也能少。
java

介紹web

        首先簡單的介紹一下CompletableFuture,咱們知道1.5之後jdk就開始支持Future異步編程,今天咱們看看java8中,併發包新增的一個併發工具CompletableFuture。它依然放在併發包中,喜歡研究高併發和多線程的朋友,想必早已經熟悉了一遍,做者依然是Doug Lea併發大神編寫。Java8新增了不少基於事件驅動的異步調用框架,好比CompletableFuture 經過回調函數,實現非阻塞的IO的異步調用。它擁有更強大的Future功能,而且實現了CompletionStage接口,這個接口中的方法很是多,每一個方法都有不同的功能和實現,你們能夠去嘗試的測試一下。sql

方法列表編程

         CompletableFutures中的方法列表介紹,這裏就不逐一測試,其中每一種類型基本有三個方法:一種是沒有使用線程池模式,一種是使用默認線程池,還有一種是使用自定義線程池。後端

//建立CompletableFuture對象
public static <U> CompletableFuture<U> completedFuture(U value)
//supplyAsync支持返回值異步任務,runAsync 不支持返回值的異步任務
public static CompletableFuture<Void>     runAsync(Runnable runnable)
public static CompletableFuture<Void>     runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U>     supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U>     supplyAsync(Supplier<U> supplier, Executor executor)


//thenApply當一個線程依賴另外一個線程時,可使用 thenApply 方法來把這兩個線程串行化,進行轉換
public <U> CompletableFuture<U>     thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

//thenAccept:接收任務的處理結果,並處理,無返回結果,能夠註冊一個action,而後消費結果
public CompletableFuture<Void>     thenAccept(Consumer<? super T> action)
public CompletableFuture<Void>     thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void>     thenAcceptAsync(Consumer<? super T> action, Executor executor)

//thenAcceptBoth:當兩個CompletionStage都執行完成後,把結果一塊交給thenAcceptBoth來進行消耗
public <U> CompletableFuture<Void>     thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void>     thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void>     thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
public     CompletableFuture<Void>     runAfterBoth(CompletionStage<?> other,  Runnable action)

//thenRun:跟 thenAccept 方法不同的是,不關心任務的處理結果。只要上面的任務執行完成,就開始執行 thenAccept
public CompletableFuture<Void>     thenRun(Runnable action)
public CompletableFuture<Void>     thenRunAsync(Runnable action)
public CompletableFuture<Void>     thenRunAsync(Runnable action, Executor executor)

//thenCompose 方法容許你對兩個 CompletionStage 進行流水線操做,第一個操做完成時,將其結果做爲參數傳遞給第二個操做。
public <U> CompletableFuture<U>     thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U>     thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U>     thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)

//whenComplete:是執行當前任務的線程執行繼續執行 whenComplete 的任務,計算結果完成時的處理
public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)

//handle 也是結果處理
public <U> CompletableFuture<U>     handle(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U>     handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U>     handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)

//thenCombine:把兩個 CompletionStage 的任務都執行完成後,把兩個任務的結果一塊交給 thenCombine 來處理。
public <U,V> CompletableFuture<V>     thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V>     thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V>     thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

//acceptEither,當任意一個CompletableFuture計算完成的時候就會執行
public CompletableFuture<Void>     acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void>     acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void>     acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
public <U> CompletableFuture<U>     applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U>     applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U>     applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)

//allOf是等待全部任務完成,構造後CompletableFuture完成
public static CompletableFuture<Void>         allOf(CompletableFuture<?>... cfs)
//anyOf是隻要有一個任務完成,構造後CompletableFuture就完成
public static CompletableFuture<Object>     anyOf(CompletableFuture<?>... cfs)

簡單示例安全

        簡單的例子來理解一下,CompletableFuture的使用。supplyAsync提交一個Supplier異步任務;而後使用thenAccept異步處理結果,無需阻塞等待,至關於只要異步任務完成,使用回調的方式實現了非阻塞異步調用;其次使用exceptionally進行異常處理,比起傳統的Future是否是處理起來更加方便快捷。微信

@Test
public void completableFutureTest1() throws ExecutionException, InterruptedException {
    CompletableFuture completableFuture = CompletableFuture.supplyAsync(new Supplier() {
        @Override
        public String get() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            int a = 1/0;
            return "supplyAsync";
        }
    }).thenAccept(result -> System.out.println(result))
            .exceptionally((Function<Throwable, Void>) t -> {
                System.out.println("error message:"+t.getMessage());
                return null;
            });
    //阻塞式的獲取結果
    //completableFuture.get();
    System.out.println("xsxsx");
}

        傳統的Future,若是捕獲異步任務異常,整個線程就會中斷,而且須要阻塞的等待結果,這裏CompletableFuture無需等待,程序繼續執行,當supplyAsync執行成功結果直接傳遞到thenAccept回調處理結果,CPU能夠繼續執行其餘的邏輯,而且能夠對異常狀況進行單獨處理。網絡

結合業務場景多線程

        下面針對實際的項目來談一談completableFuture的使用,以前作過一個小需求,就是一個用戶管理詳情頁頁信息:
1.加載用戶的基礎信息,銀行卡信息,積分信息,餘額查詢,還有風控安全等級信息、徵信、還有修改變動記錄等;
2.因爲不少東西沿用vm模版渲染,沒有作js單獨加載,其實這裏異步加載是最好的;
3.因爲有的接口還相互依賴,上一個接口的查詢結果,要傳遞到下面去查新的數據;

處理方式

        既然不想採用js異步加載,我就後端把全部的接口都查詢好,而且返回,發現同步的結果,這張頁面可能要十幾秒。後來採用Future+Callable異步處理,把結果放到一個採用阻塞的方式放到modelAndView中,時間確實縮短到了正常範圍.可是想想若是使用completableFuture是否方便快捷一些呢?後邊發現completableFuture只是幫咱們實現了非阻塞式的回調,和異常處理等,在開啓自定義線程池時,Future異步執行的效果和completableFuture差很少,只不過completableFuture功能更強大,方法庫更豐富。

測試

        下面我模擬一下簡單的場景,userSourceService中定義了查詢基礎信息、帳戶信息、和徵信信息的服務接口。其中每一個接口中MOCK一點數據,而後再sleep1秒.模擬IO操做。假如咱們須要依賴獲取queryUserInfo中的結果,而後再去查詢後面兩個用戶信息,這樣就可使用異步的Future去實現。

interface UserSourceService {

    /**
     * 查詢用戶基礎信息
     * @param userId
     * @return
     */

    UserBase queryUserInfo(String userId);

    /**
     * 查詢用戶帳戶信息
     * @param userId
     * @return
     */

    UserAccount queryUserAccountInfo(String userId);

    /**
     * 查詢徵信信息
     * @param userId
     * @return
     */

    UserCredit queryUserCreditInfo(String userId);

一、Future+stream

        這裏使用串行的Stream 而後使用executorService.submit提交任務

static List<String> userIdList = Arrays.asList("000001""000002""000003""000004");
static ExecutorService executorService = Executors.newCachedThreadPool();
private static UserSourceServiceImpl userSourceService = new UserSourceServiceImpl();

@Test
public void FutureTest() {
    Instant start = Instant.now();
    System.out.println(JSON.toJSONString(futureTest()));
    Instant end = Instant.now();
    System.out.println("耗時:"+ Duration.between( start,end ).toMillis());
}

public List<UserCredit> futureTest() {
    List<Future<UserCredit>> userCreditFuture = userIdList.stream()
            .map(userId -> executorService.submit(() -> userSourceService.queryUserInfo(userId)))
            .map(future -> executorService.submit(() -> userSourceService.queryUserAccountInfo(future.get().getUserId())))
            .map(future -> executorService.submit(() -> userSourceService.queryUserCreditInfo(future.get().getUserId())))
            .collect(toList());
    List<UserCredit> creditInfos = new ArrayList<>();
    userCreditFuture.parallelStream().forEach(future -> {
        try {
            creditInfos.add(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    });
    return creditInfos;
}

二、parallelStream

     這裏使用並行的Stream ,而後去掉自定義的線程池。

@Test
public void findUserByParallelStreamTest() 
{
    Instant start = Instant.now();
    System.out.println(JSON.toJSONString(findUserByParallelStream()));
    Instant end = Instant.now();
    System.out.println("耗時:"+ Duration.between( start,end ).toMillis());
}
public static List<UserCredit> findUserByParallelStream() {
    return userIdList.parallelStream()
            .map(userId -> userSourceService.queryUserInfo(userId))
            .map(user -> userSourceService.queryUserAccountInfo(user.getUserId()))
            .map(userAccount -> userSourceService.queryUserCreditInfo(userAccount.getUserId()))
            .collect(toList());
}

3.CompletableFuture

@Test
public void findUserByCompletableFutureTest() {
    Instant start = Instant.now();
    System.out.println(JSON.toJSONString(findUserByCompletableFuture4()));
    Instant end = Instant.now();
    System.out.println("耗時:"+ Duration.between( start,end ).toMillis());
}

public static List<UserCredit> findUserByCompletableFutureTest() {
    List<CompletableFuture<UserCredit>> queryUserFuture = userIdList.stream()
            .map(userId -> CompletableFuture.supplyAsync(() -> userSourceService.queryUserInfo(userId)))
            .map(future -> future.thenApply(user -> userSourceService.queryUserAccountInfo(user.getUserId())))
            .map(future -> future.thenCompose(userAccountInfo -> CompletableFuture.supplyAsync(() -> userSourceService.queryUserCreditInfo(userAccountInfo.getUserId()))))
            .collect(toList());

    //join 操做和get操做有相同的含義,等待全部異步操做的結果。
    return queryUserFuture.stream()
            .map(CompletableFuture::join)
            .collect(toList());
}

第一組:userIdList中4個元素
1.futureTest

耗時:3181毫秒

2.findUserByParallelStreamTest
耗時:3176毫秒

3.findUserByCompletableFuture4Test,使用自定義executorService
耗時:3177毫秒


第二組:userIdList中8個元素

1.futureTest

耗時:3184毫秒

2.findUserByParallelStreamTest
耗時:3187毫秒
3.findUserByCompletableFuture4Test ,使用自定義executorService
耗時:3158毫秒


第三組:userIdList中16個元素

1.futureTest

耗時:3210毫秒

2.findUserByParallelStreamTest
耗時:6190毫秒
3.findUserByCompletableFuture4Test ,使用自定義executorService
耗時:3177毫秒

        由上面的測試結果能夠看出,使用內置的線程池時,在併發數量比較少的狀況下ParallelStream和CompletableFuture 執行時間差很少,當數量遞增後,原生的Future+線程池和CompletableFuture 自定義線程池,性能更佳.發現ParallelStream內置的池線程數是根據CPU核數有關,因此通常只適合cpu密集型的計算模式,而自定義的線程池,能夠知足不一樣的線程數量需求,當IO密集型的操做比較多時,CUP的資源其實並無徹底利用,因此當使executorService的時候,IO型操做效果更佳。

傳統的Future痛點

1.經過使用Stream+傳統的Future的,雖然Future能夠知足咱們的異步處理,可是不支持上一個future依賴,而且轉換輸出,以及結果合併。
2.get是阻塞的,若是某一個異步任務中有沒有catch的運行時異常,這回致使get拋出異常退出運行,若是咱們再一個遍歷操做中,沒有顯示的處理,致使整個線程中斷.可能產生意想不到的風險,因此是否可以自動的幫咱們檢測異常,而且處理異常呢?
3.沒有一個比較好的回調機制,例如響應式的根據上一個異步任務結果觸發某個Action。
4.上面說的阻塞式結果傳遞,須要硬編碼保證,是否不須要這樣的顯示get,直接拿到結果,而且進行流式的計算,使得編碼更加流暢、優雅。

        其實經過測試發現,在IO密集型的任務中使用線程池的方式效率是最好的, 性能測試比較Future和CompletableFuture,並無質的差別,只是CompletableFuture支持更好的函數式響應式編程,以及非阻塞的處理結果,回調機制更加健全等

        上面說了一堆Future的缺點,其實這都不算啥致命的缺點,怎麼方便怎麼來就行,至少操做簡單,不少不肯接受新的工具,認爲有風險,有時候在沒有經驗時,保守一點,對項目也是一種負責。CompletableFuture也存在缺點, CompletableFuture並不能優雅地處理cancel(),由於他建立的時候能夠不和線程綁定在一塊兒,取消的時候,並不會去中斷線程,因此使用cancel的時候須要注意.可是傳統的Future並無這個問題。

總結

1.今天對於java8的Stream計算和併發工具CompletableFutures有了新的認識,項目中有相關的場景,咱們就能夠嘗試的運用;
        2.計算密集型操做,而且沒有I/O,推薦使用Stream接口。由於實現簡單,同時效率也多是最高的(若是全部的線程都是計算密集型的,那就沒有必要建立比處理器核數更多的線程;
        3.若是並行的工做單元還涉及等待I/O的操做(包括網絡鏈接等待),那麼使用CompletableFuture靈活性更好。這種狀況下處理流的流水線中若是發生I/O等待,流的延遲特性會讓咱們很難判斷到底何時觸發了等待;
        4.針對不太瞭解的功能,咱們須要多作測試,不要盲目的追求新工具,若是是使用不當可能達不到效果,甚是有更可怕的事情發生。

參考

1.《Java CompletableFuture 詳解》

2.《CompletableFuture 不能被中斷》


歷史文章

1.JVM-GC調優必會的知識 
2.Mysql事務隔離以及MVCC實現原理                                     
3.一次性瞭解Mysql的幾種日誌 
4.再談synchronized 


微信公衆號:MyClass社區

若有問題或建議,請公衆號留言。

喜歡請關注

本文分享自微信公衆號 - MyClass社區(MyClass_ZZ)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索