CompletableFuture 類聲明瞭 CompletionStage 接口,CompletionStage 接口實際上提供了同步或異步運行計算的舞臺。java
所謂異步調用其實就是實現一個可無需等待被調用函數的返回值而讓操做繼續運行的方法。在 Java 語言中,簡單的講就是另啓一個線程來完成調用中的部分計算,使調用繼續運行或返回,而不須要等待計算結果。但調用者仍須要獲取線程的計算結果。dom
CompletableFuture 提供了以下的異步方法,異步
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); } public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier); } public static CompletableFuture<Void> runAsync(Runnable runnable) { return asyncRunStage(asyncPool, runnable); } public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) { return asyncRunStage(screenExecutor(executor), runnable); }
supplyAsync 返回帶有任務結果的CompletableFuture,而runAsync返回CompletableFuture<Void>。async
Executor
參數能夠手動指定線程池,不然默認ForkJoinPool.commonPool()
系統級公共線程池。ide
注意:ForkJoinPool.commonPool()
是
Daemon Thread(守護線程) 函數只要當前JVM實例中尚存在任何一個非守護線程(用戶線程)沒有結束,守護線程就所有工做;this
只有當用戶線程結束時,JVM推出,守護線程隨着JVM一同結束工做。spa
@Test public void test() throws InterruptedException { CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> { System.out.println("runAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println("done=" + cf.isDone()); TimeUnit.SECONDS.sleep(4); System.out.println("done=" + cf.isDone()); }
輸出,線程
done=false runAsync=ForkJoinPool.commonPool-worker-1|true done=true
在這段代碼中,runAsync 是異步執行的 ,經過 Thread.currentThread().isDaemon() 打印的結果就能夠知道是Daemon線程異步執行的。code
CompletableFuture中不帶Async的同步方法以下,
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn); public CompletableFuture<Void> thenAccept(Consumer<? super T> action); public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action); public CompletableFuture<Void> thenRun(Runnable action);
這些方法都是同步執行的
@Test public void testSync11() { CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApply(s -> { randomSleep(); System.out.println("thenApply=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return s.toUpperCase(); }); // gotNow 若是成功就返回結果 System.out.println(cf.getNow(null)); // 一直等待成功,而後返回結果 System.out.println(cf.join()); }
輸出以下,
thenApply=main|false MESSAGE MESSAGE
首先經過 completedFuture 方法獲取一個結果已經完成的Future,而後執行同步方法thenApply,由main線程執行,會阻塞當前的main線程 ,最後getNow方法獲取到結果。
CompletableFuture中異步執行的方法都是帶Async 結尾的,能夠制定執行異步任務的線程池,也能夠不指定,若是不指定,默認使用ForkJoinPool.commonPool() 線程池。
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn); public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor); public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action); public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor); public CompletableFuture<Void> thenRunAsync(Runnable action); public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor);
如下使用的兩個方法都是異步執行任務的方法
@Test public void testAsync1() { CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> { randomSleep(); System.out.println("supplyAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return "message"; }).thenApplyAsync(s -> { randomSleep(); System.out.println("thenApplyAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return s.toUpperCase(); }); // gotNow 若是成功就返回結果 System.out.println(cf.getNow(null)); // 一直等待成功,而後返回結果 System.out.println(cf.join()); }
輸出以下,
null supplyAsync=ForkJoinPool.commonPool-worker-1|true thenApplyAsync=ForkJoinPool.commonPool-worker-1|true MESSAGE
當執行 cf.gotNow 方法的時候,異步任務尚未執行完成,因此返回 null 。執行 cf.join 方法,阻塞一直等到異步任務結果返回。
thenApply 不帶async結尾,是一個同步方法,但可能仍是由執行任務的線程池來執行,或者是當前main線程來執行。
@Test public void testAsync125() { CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> { //沒有sleep System.out.println("supplyAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return "message"; }).thenApply(s -> { // thenApplyAsync=main|false 使用調用者線程來進行處理 System.out.println("thenApply=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return s.toUpperCase(); }); // gotNow 若是成功就返回結果 System.out.println(cf.getNow(null)); // 一直等待成功,而後返回結果 System.out.println(cf.join()); } @Test public void testAsync126() throws InterruptedException { CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> { randomSleep(); System.out.println("supplyAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return "message"; }); TimeUnit.SECONDS.sleep(2); // 使用調用者線程 當前線程main 來進行處理thenApply 轉換操做 cf = cf.thenApply(s -> { System.out.println("thenApply=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return s.toUpperCase(); }); // gotNow 若是成功就返回結果 System.out.println(cf.getNow(null)); // 一直等待成功,而後返回結果 System.out.println(cf.join()); } @Test public void testAsync124() { CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> { randomSleep(); System.out.println("supplyAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return "message"; }).thenApply(s -> { System.out.println("thenApply=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return s.toUpperCase(); }); // gotNow 若是成功就返回結果 System.out.println(cf.getNow(null)); // 一直等待成功,而後返回結果 System.out.println(cf.join()); }
輸出以下,
supplyAsync=ForkJoinPool.commonPool-worker-1|true thenApplyAsync=main|false MESSAGE MESSAGE ////// supplyAsync=ForkJoinPool.commonPool-worker-1|true thenApply=main|false MESSAGE MESSAGE ////// null supplyAsync=ForkJoinPool.commonPool-worker-1|true thenApply=ForkJoinPool.commonPool-worker-1|true MESSAGE
在testAsync125方法中,thenApply 回調方法是由當前main線程執行的;
在testAsync126方法中,thenApply 回調方法是由當前main線程執行的;
在testAsync124方法中,thenApply 方法是由執行任務的線程池的線程來執行的,thenApply 雖然是一個同步方法,但其調用是經過 ForkJoinPool.commonPool 線程池異步執行的。
因此要注意的是 若是在thenApply 方法中執行比較耗時的操做,會阻塞調用者線程或者主線程。
When we need to execute multiple Futures in parallel, we usually want to wait for all of them to execute and then process their combined results.
The CompletableFuture.allOf static method allows to wait for completion of all of the Futures provided as a var-arg:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return "Hello"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return "Beautiful"; }); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } return "World"; }); System.out.println("f1=" + future1.isDone()); System.out.println("f2=" + future2.isDone()); System.out.println("f3=" + future3.isDone()); CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2, future3); System.out.println("========"); System.out.println("f1=" + future1.isDone()); System.out.println("f2=" + future2.isDone()); System.out.println("f3=" + future3.isDone()); // 等待全部的future 執行完成 combinedFuture.join(); System.out.println("========"); System.out.println("f1=" + future1.isDone()); System.out.println("f2=" + future2.isDone()); System.out.println("f3=" + future3.isDone());
f1=false f2=false f3=false ======== f1=false f2=false f3=false ======== f1=true f2=true f3=true
經過 combinedFuture.join() 方法等待全部的異步任務執行完成。當其全部的CompletableFuture均完成結果時,combinedFuture就會處於完成狀態
Notice that the return type of the CompletableFuture.allOf() is a CompletableFuture<Void>. The limitation of this method is that it does not return the combined results of all Futures. Instead you have to manually get results from Futures. Fortunately, CompletableFuture.join() method and Java 8 Streams API makes it simple:
String combined = Stream.of(future1, future2, future3) .map(CompletableFuture::join) .collect(Collectors.joining(" ")); System.out.println(combined);
更簡化後完整連貫的代碼,
@Test public void testAllOf2() { CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Beautiful"); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "World"); CompletableFuture.allOf(future1, future2, future3) .thenApply((v) -> Stream.of(future1, future2, future3) .map(CompletableFuture::join) .collect(Collectors.joining(" "))) .thenAccept(System.out::println); }
========END========