相關閱讀:java
Java併發編程(一)知識地圖
Java併發編程(二)原子性
Java併發編程(三)可見性
Java併發編程(四)有序性
Java併發編程(五)建立線程方式概覽
Java併發編程入門(六)synchronized用法
Java併發編程入門(七)輕鬆理解wait和notify以及使用場景
Java併發編程入門(八)線程生命週期
Java併發編程入門(九)死鎖和死鎖定位
Java併發編程入門(十)鎖優化
Java併發編程入門(十一)限流場景和Spring限流器實現
Java併發編程入門(十二)生產者和消費者模式-代碼模板
Java併發編程入門(十三)讀寫鎖和緩存模板
Java併發編程入門(十四)CountDownLatch應用場景
Java併發編程入門(十五)CyclicBarrier應用場景
Java併發編程入門(十六)秒懂線程池差異
Java併發編程入門(十七)一圖掌握線程經常使用類和接口
Java併發編程入門(十八)再論線程安全
Java併發編程入門(二十)常見加鎖場景和加鎖工具編程
CompleteFeature是對Feature的加強,Feature只能處理簡單的異步任務,而CompleteFeature能夠將多個異步任務進行復雜的組合,支持串行執行,並行執行,And匯聚,Or匯聚,從而能對複雜的關聯任務進行調度。緩存
串行任務指任務B要等待任務A執行完成以後纔會執行,串行任務有以下屬性:安全
屬性 | 描述 |
---|---|
可獲取A的結果 | 任務B可獲取任務A的執行結果做爲參數使用 |
B有返回值 | 若是任務B有返回值,能夠將執行結果經過返回值返回 |
可獲取A異常 | 任務B能夠獲取任務A拋出的異常 |
A異常則終止 | 當任務A拋出異常後,程序是否會終止,若會終止,程序將退出,任務B不會執行,不然程序不會退出,繼續執行。 |
CompleteFeature支持的串行任務方法以下:markdown
方法 | 可獲取A的結果 | B有返回值 | 可獲取A異常 | A異常則終止 |
---|---|---|---|---|
thenRun | 否 | 否 | 否 | 是 |
thenApply | 是 | 是 | 否 | 是 |
thenAccept | 是 | 否 | 否 | 是 |
thenCompose | 是 | 是 | 否 | 是 |
whenComplete | 是 | 否 | 是 | 否 |
exceptionally | 否 | 是 | 是 | 否 |
handle | 是 | 是 | 是 | 否 |
總結:併發
- 任務不會拋出異常就使用前四個方法,不然使用後三個方法。
- exceptionally至關於try {} catch {}的catch部分,whenComplete和handle至關於try {} catch {} finally {} 的catch和finall部分,區別是一個有返回值,一個沒有返回值。
- thenApply和thenCompose的區別是,thenCompose在任務B中返回的是CompletableFuture,可參考後面的例子對比差別。
And匯聚關係是指:任務C要等待任務A或任務B都執行完後才執行。CompleteFeature支持此關係的方法以下:app
方法 | C接收A或B返回值做爲參數 | C有返回值 |
---|---|---|
thenCombine | 是 | 是 |
thenAcceptBoth | 是 | 否 |
runAfterBoth | 否 | 否 |
Or匯聚關係是指:任務C等待任務A或任務B其中一個執行完後就執行,即C只需等待最早執行完成的任務後就可執行。CompleteFeature支持此關係的方法以下:異步
方法 | C接收A或B返回值做爲參數 | C有返回值 |
---|---|---|
applyToEither | 是 | 是 |
acceptEither | 是 | 否 |
runAfterEither | 否 | 否 |
CompletableFuture提供了兩個多任務的方法:工具
方法 | 描述 |
---|---|
anyOf | 多個任務中的任意一個任務執行完則結束,能夠獲取到最早執行完的任務的返回值。 |
allOf | 多個任務都執行完後才結束,不能獲取到任何一個任務的返回值 |
以上全部方法的返回值都是CompletableFuture,這樣就能夠繼續調用前面描述的方法來進行任務組合,組合出更加複雜的任務處理流程。oop
以上方法中的最後一個任務都是和前面的任務在一個線程內執行,CompletableFuture中還有一套方法讓最後一個任務在新線程中執行,只要在原方法上加上Async後綴則可,例如:
同步 | 異步 |
---|---|
thenApply | thenApplyAsync |
thenAccept | thenAcceptAsync |
thenRun | thenRunAsync |
thenCompose | thenComposeAsync |
具體還有哪些,可參考源碼。
import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public class CompleteFeatureDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { simpleTask(); serialTask(); andTask(); orTask(); complexTask(); sleep(2000); // 等待子線程結束 System.out.println("end."); } private static void simpleTask() throws ExecutionException, InterruptedException { // 1. runAsync 執行一個異步任務,沒有返回值 CompletableFuture.runAsync(()-> System.out.println("1. runAsync")); sleep(100); // 2. supplyAsync 執行一個異步任務,有返回值 CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{ System.out.println("2.1 supplyAsync task be called"); sleep(100); return "2.2 supplyAsync return value"; }); System.out.println("2.3 after supplyAsync"); System.out.println(future.get()); sleep(200); } private static void serialTask() throws ExecutionException, InterruptedException { // 3. thenRun CompletableFuture.supplyAsync(()->{ System.out.println("3.1 supplyAsync begin"); sleep(100); // 用於證實B等待A結束纔會執行 return "3.2 supplyAsync end"; }).thenRun(()->{ System.out.println("3.3 thenRun be called."); }); sleep(200); // 4. thenApply CompletableFuture<String> future4 = CompletableFuture.supplyAsync(()->{ sleep(100); return "4.1 apple"; }).thenApply(returnVal->{ return "4.2 " + returnVal + "-蘋果"; }); System.out.println("4.3 get: " + future4.get()); sleep(100); // 5. thenAccept CompletableFuture.supplyAsync(()->{ sleep(100); return "5.1 orange"; }).thenAccept(returnVal->{ System.out.println("5.2 " + returnVal + "-桔子"); }); sleep(100); // 6. thenCompose CompletableFuture<String> future6 = CompletableFuture.supplyAsync(()->{ sleep(100); return "6.1 apple"; }).thenCompose((returnVal)->{ return CompletableFuture.supplyAsync(()->{ return "6.2 " + returnVal; }); }); System.out.println("6.3 get: " + future6.get()); sleep(100); // 7. whenComplete CompletableFuture.supplyAsync(()->{ sleep(100); if (true) { //修改boolean值觀察不一樣結果 return "7.1 return value for whenComplete"; } else { throw new RuntimeException("7.2 throw exception for whenComplete"); } }).whenComplete((returnVal, throwable)->{ System.out.println("7.2 returnVal: " + returnVal); // 能夠直接拿到返回值,不須要經過future.get()獲得 System.out.println("7.3 throwable: " + throwable); // 異步任務拋出異常,並不會由於異常終止,而是會走到這裏,後面的代碼還會繼續執行 }); sleep(100); // 8. exceptionally CompletableFuture<String> future8 = CompletableFuture.supplyAsync(()->{ sleep(100); if (false) { //修改boolean值觀察不一樣結果 return "8.1 return value for exceptionally"; } else { throw new RuntimeException("8.2 throw exception for exceptionally"); } }).exceptionally(throwable -> { throwable.printStackTrace(); return "8.3 return value after dealing exception."; }); System.out.println("8.4 get: " + future8.get()); sleep(100); // 9. handle CompletableFuture<String> future9 = CompletableFuture.supplyAsync(()->{ sleep(100); if (false) { //修改boolean值觀察不一樣結果 return "9.1 return value for handle"; } else { throw new RuntimeException("9.2 throw exception for handle"); } }).handle((retuanVal, throwable)->{ System.out.println("9.3 retuanVal: " + retuanVal); System.out.println("9.4 throwable: " + throwable); return "9.5 new return value."; }); System.out.println("9.6 get: " + future9.get()); sleep(100); } private static void andTask() throws ExecutionException, InterruptedException { // 10. thenCombine 合併結果 CompletableFuture<String> future10 = CompletableFuture.supplyAsync(()->{ sleep(100); return "10.1 TaskA return value"; }).thenCombine(CompletableFuture.supplyAsync(()->{ sleep(100); return "10.2 TaskB return value"; }), (taskAReturnVal, taskBReturnVal) -> taskAReturnVal + taskBReturnVal); System.out.println("10.3 get: " + future10.get()); sleep(200); // 11. thenAcceptBoth CompletableFuture.supplyAsync(()->{ sleep(100); return "11.1 TaskA return value"; }).thenAcceptBoth(CompletableFuture.supplyAsync(()->{ sleep(100); return "11.2 TaskB return value"; }), (taskAReturnVal, taskBReturnVal) -> System.out.println(taskAReturnVal + taskBReturnVal)); sleep(200); // 12. runAfterBoth A,B都執行完後才執行C,C不關心前面任務的返回值 CompletableFuture.supplyAsync(()->{ sleep(200); // 雖然這個任務先執行,可是執行時間比下面的任務長,因此最後會使用下面的返回結果 System.out.println("12.1 TaskA be called."); return "12.2 TaskA return value"; }).runAfterBoth(CompletableFuture.supplyAsync(()->{ sleep(100); System.out.println("12.3 TaskB be called."); return "12.4 TaskB return value"; }), () -> System.out.println("12.5 TaskC be called.")); sleep(300); } private static void orTask() throws ExecutionException, InterruptedException { // 13. applyToEither 使用A,B兩個異步任務優先返回的結果 CompletableFuture<String> future13 = CompletableFuture.supplyAsync(()->{ sleep(200); // 雖然這個任務先執行,可是執行時間比下面的任務長,因此最後會使用下面的返回結果 System.out.println("13.1 TaskA be called"); // 用於證實拿到B的結果後,A還會繼續執行,並不會終止 return "13.2 TaskA return value"; }).applyToEither(CompletableFuture.supplyAsync(()->{ sleep(100); return "13.3 TaskB return value"; }), (returnVal) -> returnVal); System.out.println("13.4 get: " + future13.get()); sleep(300); // 14. acceptEither 使用A,B兩個異步任務優先返回的結果 CompletableFuture.supplyAsync(()->{ sleep(200); // 雖然這個任務先執行,可是執行時間比下面的任務長,因此最後會使用下面的返回結果 return "14.1 TaskA return value"; }).acceptEither(CompletableFuture.supplyAsync(()->{ sleep(100); return "14.2 TaskB return value"; }), (returnVal) -> System.out.println(returnVal)); sleep(300); // 15. runAfterEither A,B任意一個執行完後就執行C,C不關心前面任務的返回值 CompletableFuture.supplyAsync(()->{ sleep(200); // 雖然這個任務先執行,可是執行時間比下面的任務長,因此最後會使用下面的返回結果 System.out.println("15.1 TaskA be called."); return "15.2 TaskA return value"; }).runAfterEither(CompletableFuture.supplyAsync(()->{ sleep(100); System.out.println("15.3 TaskB be called."); return "15.4 TaskB return value"; }), () -> System.out.println("15.5 TaskC be called.")); sleep(300); } private static void complexTask() throws ExecutionException, InterruptedException { // 16. anyOf CompletableFuture future16 = CompletableFuture.anyOf(CompletableFuture.supplyAsync(()-> { sleep(300); System.out.println("16.1 TaskA be called."); return "16.2 TaskA return value."; }), CompletableFuture.supplyAsync(()->{ sleep(100); System.out.println("16.3 TaskB be called."); return "16.4 TaskB return value."; })); System.out.println("16.5 get: " + future16.get()); sleep(400); // 17. allOf CompletableFuture<Void> future17 = CompletableFuture.allOf(CompletableFuture.supplyAsync(()-> { sleep(300); System.out.println("17.1 TaskA be called."); return "17.2 TaskA return value."; }), CompletableFuture.supplyAsync(()->{ sleep(100); System.out.println("17.3 TaskB be called."); return "17.4 TaskB return value."; })); System.out.println("17.5 get: " + future17.get()); // allOf沒有返回值 } private static void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } } } 複製代碼
輸出日誌:
1. runAsync 2.3 after supplyAsync 2.1 supplyAsync task be called 2.2 supplyAsync return value 3.1 supplyAsync begin 3.4 3.5 xxx 3.6 AAA 3.3 thenRun be called. 4.3 get: 4.2 4.1 apple-蘋果 5.2 5.1 orange-桔子 6.3 get: 6.2 6.1 apple 7.2 returnVal: 7.1 return value for whenComplete 7.3 throwable: null java.util.concurrent.CompletionException: java.lang.RuntimeException: 8.2 throw exception for exceptionally at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1584) at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1574) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1689) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) Caused by: java.lang.RuntimeException: 8.2 throw exception for exceptionally at com.javageektour.hikaricp.demo.CompleteFeatureDemo.lambda$serialTask$14(CompleteFeatureDemo.java:101) at com.javageektour.hikaricp.demo.CompleteFeatureDemo?Lambda$14/769287236.get(Unknown Source) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1582) ... 5 more 8.4 get: 8.3 return value after dealing exception. 9.3 retuanVal: null 9.4 throwable: java.util.concurrent.CompletionException: java.lang.RuntimeException: 9.2 throw exception for handle 9.6 get: 9.5 new return value. 10.3 get: 10.1 TaskA return value10.2 TaskB return value 11.1 TaskA return value11.2 TaskB return value 12.3 TaskB be called. 12.1 TaskA be called. 12.5 TaskC be called. 13.4 get: 13.3 TaskB return value 13.1 TaskA be called 14.2 TaskB return value 15.3 TaskB be called. 15.5 TaskC be called. 15.1 TaskA be called. 16.3 TaskB be called. 16.5 get: 16.4 TaskB return value. 16.1 TaskA be called. 17.3 TaskB be called. 17.1 TaskA be called. 17.5 get: null end. 複製代碼
CompleteFeature支持複雜的異步任務調度,支持多個任務串行,並行,匯聚,當多個異步任務有依賴關係時,經過CompleteFeature來調度任務能夠大大簡化代碼和提升執行性能。
end.