歡迎關注:本篇主要是介紹CompletableFuture的特性php
最近看到有地方使用CompletableFuture這個類,本身沒有使用過,而後就花點時間學習了一下。對於jdk新的併發工具必定要掌握,理論也要有,實戰也不能少。
java
介紹web
首先簡單的介紹一下CompletableFuture,咱們知道1.5之後jdk就開始支持Future異步編程,今天咱們看看java8中,併發包新增的一個併發工具CompletableFuture。它依然放在併發包中,喜歡研究高併發和多線程的朋友,想必早已經熟悉了一遍,做者依然是Doug Lea併發大神編寫。Java8新增了不少基於事件驅動的異步調用框架,好比CompletableFuture 經過回調函數,實現非阻塞的IO的異步調用。它擁有更強大的Future功能,而且實現了CompletionStage接口,這個接口中的方法很是多,每一個方法都有不同的功能和實現,你們能夠去嘗試的測試一下。sql
方法列表編程
CompletableFutures中的方法列表介紹,這裏就不逐一測試,其中每一種類型基本有三個方法:一種是沒有使用線程池模式,一種是使用默認線程池,還有一種是使用自定義線程池。後端
//建立CompletableFuture對象
public static <U> CompletableFuture<U> completedFuture(U value)
//supplyAsync支持返回值異步任務,runAsync 不支持返回值的異步任務
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
//thenApply當一個線程依賴另外一個線程時,可使用 thenApply 方法來把這兩個線程串行化,進行轉換
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
//thenAccept:接收任務的處理結果,並處理,無返回結果,能夠註冊一個action,而後消費結果
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
//thenAcceptBoth:當兩個CompletionStage都執行完成後,把結果一塊交給thenAcceptBoth來進行消耗
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
//thenRun:跟 thenAccept 方法不同的是,不關心任務的處理結果。只要上面的任務執行完成,就開始執行 thenAccept
public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
//thenCompose 方法容許你對兩個 CompletionStage 進行流水線操做,第一個操做完成時,將其結果做爲參數傳遞給第二個操做。
public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)
//whenComplete:是執行當前任務的線程執行繼續執行 whenComplete 的任務,計算結果完成時的處理
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
//handle 也是結果處理
public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)
//thenCombine:把兩個 CompletionStage 的任務都執行完成後,把兩個任務的結果一塊交給 thenCombine 來處理。
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
//acceptEither,當任意一個CompletableFuture計算完成的時候就會執行
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)
//allOf是等待全部任務完成,構造後CompletableFuture完成
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
//anyOf是隻要有一個任務完成,構造後CompletableFuture就完成
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
簡單示例安全
簡單的例子來理解一下,CompletableFuture的使用。supplyAsync提交一個Supplier異步任務;而後使用thenAccept異步處理結果,無需阻塞等待,至關於只要異步任務完成,使用回調的方式實現了非阻塞異步調用;其次使用exceptionally進行異常處理,比起傳統的Future是否是處理起來更加方便快捷。微信
@Test
public void completableFutureTest1() throws ExecutionException, InterruptedException {
CompletableFuture completableFuture = CompletableFuture.supplyAsync(new Supplier() {
@Override
public String get() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
int a = 1/0;
return "supplyAsync";
}
}).thenAccept(result -> System.out.println(result))
.exceptionally((Function<Throwable, Void>) t -> {
System.out.println("error message:"+t.getMessage());
return null;
});
//阻塞式的獲取結果
//completableFuture.get();
System.out.println("xsxsx");
}
傳統的Future,若是捕獲異步任務異常,整個線程就會中斷,而且須要阻塞的等待結果,這裏CompletableFuture無需等待,程序繼續執行,當supplyAsync執行成功結果直接傳遞到thenAccept回調處理結果,CPU能夠繼續執行其餘的邏輯,而且能夠對異常狀況進行單獨處理。網絡
結合業務場景多線程
下面針對實際的項目來談一談completableFuture的使用,以前作過一個小需求,就是一個用戶管理詳情頁頁信息:
1.加載用戶的基礎信息,銀行卡信息,積分信息,餘額查詢,還有風控安全等級信息、徵信、還有修改變動記錄等;
2.因爲不少東西沿用vm模版渲染,沒有作js單獨加載,其實這裏異步加載是最好的;
3.因爲有的接口還相互依賴,上一個接口的查詢結果,要傳遞到下面去查新的數據;
處理方式
既然不想採用js異步加載,我就後端把全部的接口都查詢好,而且返回,發現同步的結果,這張頁面可能要十幾秒。後來採用Future+Callable異步處理,把結果放到一個採用阻塞的方式放到modelAndView中,時間確實縮短到了正常範圍.可是想想若是使用completableFuture是否方便快捷一些呢?後邊發現completableFuture只是幫咱們實現了非阻塞式的回調,和異常處理等,在開啓自定義線程池時,Future異步執行的效果和completableFuture差很少,只不過completableFuture功能更強大,方法庫更豐富。
測試
下面我模擬一下簡單的場景,userSourceService中定義了查詢基礎信息、帳戶信息、和徵信信息的服務接口。其中每一個接口中MOCK一點數據,而後再sleep1秒.模擬IO操做。假如咱們須要依賴獲取queryUserInfo中的結果,而後再去查詢後面兩個用戶信息,這樣就可使用異步的Future去實現。
interface UserSourceService {
/**
* 查詢用戶基礎信息
* @param userId
* @return
*/
UserBase queryUserInfo(String userId);
/**
* 查詢用戶帳戶信息
* @param userId
* @return
*/
UserAccount queryUserAccountInfo(String userId);
/**
* 查詢徵信信息
* @param userId
* @return
*/
UserCredit queryUserCreditInfo(String userId);
一、Future+stream
這裏使用串行的Stream 而後使用executorService.submit提交任務
static List<String> userIdList = Arrays.asList("000001", "000002", "000003", "000004");
static ExecutorService executorService = Executors.newCachedThreadPool();
private static UserSourceServiceImpl userSourceService = new UserSourceServiceImpl();
@Test
public void FutureTest() {
Instant start = Instant.now();
System.out.println(JSON.toJSONString(futureTest()));
Instant end = Instant.now();
System.out.println("耗時:"+ Duration.between( start,end ).toMillis());
}
public List<UserCredit> futureTest() {
List<Future<UserCredit>> userCreditFuture = userIdList.stream()
.map(userId -> executorService.submit(() -> userSourceService.queryUserInfo(userId)))
.map(future -> executorService.submit(() -> userSourceService.queryUserAccountInfo(future.get().getUserId())))
.map(future -> executorService.submit(() -> userSourceService.queryUserCreditInfo(future.get().getUserId())))
.collect(toList());
List<UserCredit> creditInfos = new ArrayList<>();
userCreditFuture.parallelStream().forEach(future -> {
try {
creditInfos.add(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
return creditInfos;
}
二、parallelStream
這裏使用並行的Stream ,而後去掉自定義的線程池。
@Test
public void findUserByParallelStreamTest() {
Instant start = Instant.now();
System.out.println(JSON.toJSONString(findUserByParallelStream()));
Instant end = Instant.now();
System.out.println("耗時:"+ Duration.between( start,end ).toMillis());
}
public static List<UserCredit> findUserByParallelStream() {
return userIdList.parallelStream()
.map(userId -> userSourceService.queryUserInfo(userId))
.map(user -> userSourceService.queryUserAccountInfo(user.getUserId()))
.map(userAccount -> userSourceService.queryUserCreditInfo(userAccount.getUserId()))
.collect(toList());
}
3.CompletableFuture
@Test
public void findUserByCompletableFutureTest() {
Instant start = Instant.now();
System.out.println(JSON.toJSONString(findUserByCompletableFuture4()));
Instant end = Instant.now();
System.out.println("耗時:"+ Duration.between( start,end ).toMillis());
}
public static List<UserCredit> findUserByCompletableFutureTest() {
List<CompletableFuture<UserCredit>> queryUserFuture = userIdList.stream()
.map(userId -> CompletableFuture.supplyAsync(() -> userSourceService.queryUserInfo(userId)))
.map(future -> future.thenApply(user -> userSourceService.queryUserAccountInfo(user.getUserId())))
.map(future -> future.thenCompose(userAccountInfo -> CompletableFuture.supplyAsync(() -> userSourceService.queryUserCreditInfo(userAccountInfo.getUserId()))))
.collect(toList());
//join 操做和get操做有相同的含義,等待全部異步操做的結果。
return queryUserFuture.stream()
.map(CompletableFuture::join)
.collect(toList());
}
第一組:userIdList中4個元素
1.futureTest耗時:3181毫秒
2.findUserByParallelStreamTest
耗時:3176毫秒3.findUserByCompletableFuture4Test,使用自定義executorService
耗時:3177毫秒
第二組:userIdList中8個元素
1.futureTest
耗時:3184毫秒
2.findUserByParallelStreamTest
耗時:3187毫秒
3.findUserByCompletableFuture4Test ,使用自定義executorService
耗時:3158毫秒
第三組:userIdList中16個元素
1.futureTest
耗時:3210毫秒
2.findUserByParallelStreamTest
耗時:6190毫秒
3.findUserByCompletableFuture4Test ,使用自定義executorService
耗時:3177毫秒
由上面的測試結果能夠看出,使用內置的線程池時,在併發數量比較少的狀況下ParallelStream和CompletableFuture 執行時間差很少,當數量遞增後,原生的Future+線程池和CompletableFuture 自定義線程池,性能更佳.發現ParallelStream內置的池線程數是根據CPU核數有關,因此通常只適合cpu密集型的計算模式,而自定義的線程池,能夠知足不一樣的線程數量需求,當IO密集型的操做比較多時,CUP的資源其實並無徹底利用,因此當使executorService的時候,IO型操做效果更佳。
傳統的Future痛點
1.經過使用Stream+傳統的Future的,雖然Future能夠知足咱們的異步處理,可是不支持上一個future依賴,而且轉換輸出,以及結果合併。
2.get是阻塞的,若是某一個異步任務中有沒有catch的運行時異常,這回致使get拋出異常退出運行,若是咱們再一個遍歷操做中,沒有顯示的處理,致使整個線程中斷.可能產生意想不到的風險,因此是否可以自動的幫咱們檢測異常,而且處理異常呢?
3.沒有一個比較好的回調機制,例如響應式的根據上一個異步任務結果觸發某個Action。
4.上面說的阻塞式結果傳遞,須要硬編碼保證,是否不須要這樣的顯示get,直接拿到結果,而且進行流式的計算,使得編碼更加流暢、優雅。
其實經過測試發現,在IO密集型的任務中使用線程池的方式效率是最好的, 性能測試比較Future和CompletableFuture,並無質的差別,只是CompletableFuture支持更好的函數式和響應式編程,以及非阻塞的處理結果,回調機制更加健全等。
上面說了一堆Future的缺點,其實這都不算啥致命的缺點,怎麼方便怎麼來就行,至少操做簡單,不少不肯接受新的工具,認爲有風險,有時候在沒有經驗時,保守一點,對項目也是一種負責。CompletableFuture也存在缺點, CompletableFuture並不能優雅地處理cancel(),由於他建立的時候能夠不和線程綁定在一塊兒,取消的時候,並不會去中斷線程,因此使用cancel的時候須要注意.可是傳統的Future並無這個問題。
總結
1.今天對於java8的Stream計算和併發工具CompletableFutures有了新的認識,項目中有相關的場景,咱們就能夠嘗試的運用;
2.計算密集型操做,而且沒有I/O,推薦使用Stream接口。由於實現簡單,同時效率也多是最高的(若是全部的線程都是計算密集型的,那就沒有必要建立比處理器核數更多的線程;
3.若是並行的工做單元還涉及等待I/O的操做(包括網絡鏈接等待),那麼使用CompletableFuture靈活性更好。這種狀況下處理流的流水線中若是發生I/O等待,流的延遲特性會讓咱們很難判斷到底何時觸發了等待;
4.針對不太瞭解的功能,咱們須要多作測試,不要盲目的追求新工具,若是是使用不當可能達不到效果,甚是有更可怕的事情發生。
參考
1.《Java CompletableFuture 詳解》
2.《CompletableFuture 不能被中斷》
歷史文章
1.JVM-GC調優必會的知識
2.Mysql事務隔離以及MVCC實現原理
3.一次性瞭解Mysql的幾種日誌
4.再談synchronized
本文分享自微信公衆號 - MyClass社區(MyClass_ZZ)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。