Java併發編程入門(十九)異步任務調度工具CompleteFeature

Java極客  |  做者  /  鏗然一葉 這是 Java極客的第 60 篇原創文章

相關閱讀:java

Java併發編程(一)知識地圖
Java併發編程(二)原子性
Java併發編程(三)可見性
Java併發編程(四)有序性
Java併發編程(五)建立線程方式概覽
Java併發編程入門(六)synchronized用法
Java併發編程入門(七)輕鬆理解wait和notify以及使用場景
Java併發編程入門(八)線程生命週期
Java併發編程入門(九)死鎖和死鎖定位
Java併發編程入門(十)鎖優化
Java併發編程入門(十一)限流場景和Spring限流器實現
Java併發編程入門(十二)生產者和消費者模式-代碼模板
Java併發編程入門(十三)讀寫鎖和緩存模板
Java併發編程入門(十四)CountDownLatch應用場景
Java併發編程入門(十五)CyclicBarrier應用場景
Java併發編程入門(十六)秒懂線程池差異
Java併發編程入門(十七)一圖掌握線程經常使用類和接口
Java併發編程入門(十八)再論線程安全
Java併發編程入門(二十)常見加鎖場景和加鎖工具編程


1. CompleteFeature簡介

CompleteFeature是對Feature的加強,Feature只能處理簡單的異步任務,而CompleteFeature能夠將多個異步任務進行復雜的組合,支持串行執行,並行執行,And匯聚,Or匯聚,從而能對複雜的關聯任務進行調度。緩存

2. CompleteFeature支持的業務場景

2.1. 串行任務

串行任務指任務B要等待任務A執行完成以後纔會執行,串行任務有以下屬性:安全

屬性 描述
可獲取A的結果 任務B可獲取任務A的執行結果做爲參數使用
B有返回值 若是任務B有返回值,能夠將執行結果經過返回值返回
可獲取A異常 任務B能夠獲取任務A拋出的異常
A異常則終止 當任務A拋出異常後,程序是否會終止,若會終止,程序將退出,任務B不會執行,不然程序不會退出,繼續執行。

CompleteFeature支持的串行任務方法以下:markdown

方法 可獲取A的結果 B有返回值 可獲取A異常 A異常則終止
thenRun
thenApply
thenAccept
thenCompose
whenComplete
exceptionally
handle

總結:併發

  1. 任務不會拋出異常就使用前四個方法,不然使用後三個方法。
  2. exceptionally至關於try {} catch {}的catch部分,whenComplete和handle至關於try {} catch {} finally {} 的catch和finall部分,區別是一個有返回值,一個沒有返回值。
  3. thenApply和thenCompose的區別是,thenCompose在任務B中返回的是CompletableFuture,可參考後面的例子對比差別。

1.2. And匯聚關係

And匯聚關係是指:任務C要等待任務A或任務B都執行完後才執行。CompleteFeature支持此關係的方法以下:app

方法 C接收A或B返回值做爲參數 C有返回值
thenCombine
thenAcceptBoth
runAfterBoth

1.3. Or匯聚關係

Or匯聚關係是指:任務C等待任務A或任務B其中一個執行完後就執行,即C只需等待最早執行完成的任務後就可執行。CompleteFeature支持此關係的方法以下:異步

方法 C接收A或B返回值做爲參數 C有返回值
applyToEither
acceptEither
runAfterEither

1.4. 多任務

CompletableFuture提供了兩個多任務的方法:工具

方法 描述
anyOf 多個任務中的任意一個任務執行完則結束,能夠獲取到最早執行完的任務的返回值。
allOf 多個任務都執行完後才結束,不能獲取到任何一個任務的返回值

以上全部方法的返回值都是CompletableFuture,這樣就能夠繼續調用前面描述的方法來進行任務組合,組合出更加複雜的任務處理流程。oop

1.5. 方法族

以上方法中的最後一個任務都是和前面的任務在一個線程內執行,CompletableFuture中還有一套方法讓最後一個任務在新線程中執行,只要在原方法上加上Async後綴則可,例如:

同步 異步
thenApply thenApplyAsync
thenAccept thenAcceptAsync
thenRun thenRunAsync
thenCompose thenComposeAsync

具體還有哪些,可參考源碼。

2. 代碼例子

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompleteFeatureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        simpleTask();
        serialTask();
        andTask();
        orTask();
        complexTask();

        sleep(2000); // 等待子線程結束
        System.out.println("end.");

    }

    private static void simpleTask() throws ExecutionException, InterruptedException {
        // 1. runAsync 執行一個異步任務,沒有返回值
        CompletableFuture.runAsync(()-> System.out.println("1. runAsync"));
        sleep(100);

        // 2. supplyAsync 執行一個異步任務,有返回值
        CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
            System.out.println("2.1 supplyAsync task be called");
            sleep(100);
            return "2.2 supplyAsync return value";
        });
        System.out.println("2.3 after supplyAsync");
        System.out.println(future.get());
        sleep(200);
    }

    private static void serialTask() throws ExecutionException, InterruptedException {
        // 3. thenRun
        CompletableFuture.supplyAsync(()->{
            System.out.println("3.1 supplyAsync begin");
            sleep(100);  // 用於證實B等待A結束纔會執行
            return "3.2 supplyAsync end";
        }).thenRun(()->{
            System.out.println("3.3 thenRun be called.");
        });
        sleep(200);

        // 4. thenApply
        CompletableFuture<String> future4 = CompletableFuture.supplyAsync(()->{
            sleep(100);
            return "4.1 apple";
        }).thenApply(returnVal->{
            return "4.2 " + returnVal + "-蘋果";
        });
        System.out.println("4.3 get: " + future4.get());
        sleep(100);

        // 5. thenAccept
        CompletableFuture.supplyAsync(()->{
            sleep(100);
            return "5.1 orange";
        }).thenAccept(returnVal->{
            System.out.println("5.2 " + returnVal + "-桔子");
        });
        sleep(100);

        // 6. thenCompose
        CompletableFuture<String> future6 = CompletableFuture.supplyAsync(()->{
            sleep(100);
            return "6.1 apple";
        }).thenCompose((returnVal)->{
            return CompletableFuture.supplyAsync(()->{
                return "6.2 " + returnVal;
            });
        });
        System.out.println("6.3 get: " + future6.get());
        sleep(100);

        // 7. whenComplete
        CompletableFuture.supplyAsync(()->{
            sleep(100);
            if (true) {  //修改boolean值觀察不一樣結果
                return "7.1 return value for whenComplete";
            } else {
                throw new RuntimeException("7.2 throw exception for whenComplete");
            }
        }).whenComplete((returnVal, throwable)->{
            System.out.println("7.2 returnVal: " + returnVal);  // 能夠直接拿到返回值,不須要經過future.get()獲得
            System.out.println("7.3 throwable: " + throwable);  // 異步任務拋出異常,並不會由於異常終止,而是會走到這裏,後面的代碼還會繼續執行
        });
        sleep(100);

        // 8. exceptionally
        CompletableFuture<String> future8 = CompletableFuture.supplyAsync(()->{
            sleep(100);
            if (false) {  //修改boolean值觀察不一樣結果
                return "8.1 return value for exceptionally";
            } else {
                throw new RuntimeException("8.2 throw exception for exceptionally");
            }
        }).exceptionally(throwable -> {
            throwable.printStackTrace();
            return "8.3 return value after dealing exception.";
        });
        System.out.println("8.4 get: " + future8.get());
        sleep(100);

        // 9. handle
        CompletableFuture<String> future9 = CompletableFuture.supplyAsync(()->{
            sleep(100);
            if (false) {  //修改boolean值觀察不一樣結果
                return "9.1 return value for handle";
            } else {
                throw new RuntimeException("9.2 throw exception for handle");
            }
        }).handle((retuanVal, throwable)->{
            System.out.println("9.3 retuanVal: " + retuanVal);
            System.out.println("9.4 throwable: " + throwable);
            return "9.5 new return value.";
        });
        System.out.println("9.6 get: " + future9.get());
        sleep(100);
    }

    private static void andTask() throws ExecutionException, InterruptedException {
        // 10. thenCombine 合併結果
        CompletableFuture<String> future10 = CompletableFuture.supplyAsync(()->{
            sleep(100);
            return "10.1 TaskA return value";
        }).thenCombine(CompletableFuture.supplyAsync(()->{
            sleep(100);
            return "10.2 TaskB return value";
        }), (taskAReturnVal, taskBReturnVal) -> taskAReturnVal + taskBReturnVal);
        System.out.println("10.3 get: " + future10.get());
        sleep(200);

        // 11. thenAcceptBoth
        CompletableFuture.supplyAsync(()->{
            sleep(100);
            return "11.1 TaskA return value";
        }).thenAcceptBoth(CompletableFuture.supplyAsync(()->{
            sleep(100);
            return "11.2 TaskB return value";
        }), (taskAReturnVal, taskBReturnVal) -> System.out.println(taskAReturnVal + taskBReturnVal));
        sleep(200);

        // 12. runAfterBoth A,B都執行完後才執行C,C不關心前面任務的返回值
        CompletableFuture.supplyAsync(()->{
            sleep(200);  // 雖然這個任務先執行,可是執行時間比下面的任務長,因此最後會使用下面的返回結果
            System.out.println("12.1 TaskA be called.");
            return "12.2 TaskA return value";
        }).runAfterBoth(CompletableFuture.supplyAsync(()->{
            sleep(100);
            System.out.println("12.3 TaskB be called.");
            return "12.4 TaskB return value";
        }), () -> System.out.println("12.5 TaskC be called."));
        sleep(300);
    }

    private static void orTask() throws ExecutionException, InterruptedException {
        // 13. applyToEither 使用A,B兩個異步任務優先返回的結果
        CompletableFuture<String> future13 = CompletableFuture.supplyAsync(()->{
            sleep(200);  // 雖然這個任務先執行,可是執行時間比下面的任務長,因此最後會使用下面的返回結果
            System.out.println("13.1 TaskA be called"); // 用於證實拿到B的結果後,A還會繼續執行,並不會終止
            return "13.2 TaskA return value";
        }).applyToEither(CompletableFuture.supplyAsync(()->{
            sleep(100);
            return "13.3 TaskB return value";
        }), (returnVal) -> returnVal);
        System.out.println("13.4 get: " + future13.get());
        sleep(300);

        // 14. acceptEither 使用A,B兩個異步任務優先返回的結果
        CompletableFuture.supplyAsync(()->{
            sleep(200);  // 雖然這個任務先執行,可是執行時間比下面的任務長,因此最後會使用下面的返回結果
            return "14.1 TaskA return value";
        }).acceptEither(CompletableFuture.supplyAsync(()->{
            sleep(100);
            return "14.2 TaskB return value";
        }), (returnVal) -> System.out.println(returnVal));
        sleep(300);

        // 15. runAfterEither A,B任意一個執行完後就執行C,C不關心前面任務的返回值
        CompletableFuture.supplyAsync(()->{
            sleep(200);  // 雖然這個任務先執行,可是執行時間比下面的任務長,因此最後會使用下面的返回結果
            System.out.println("15.1 TaskA be called.");
            return "15.2 TaskA return value";
        }).runAfterEither(CompletableFuture.supplyAsync(()->{
            sleep(100);
            System.out.println("15.3 TaskB be called.");
            return "15.4 TaskB return value";
        }), () -> System.out.println("15.5 TaskC be called."));
        sleep(300);
    }

    private static void complexTask() throws ExecutionException, InterruptedException {
        // 16. anyOf
        CompletableFuture future16 = CompletableFuture.anyOf(CompletableFuture.supplyAsync(()->
        {
            sleep(300);
            System.out.println("16.1 TaskA be called.");
            return "16.2 TaskA return value.";
        }), CompletableFuture.supplyAsync(()->{
            sleep(100);
            System.out.println("16.3 TaskB be called.");
            return "16.4 TaskB return value.";
        }));
        System.out.println("16.5 get: " + future16.get());
        sleep(400);

        // 17. allOf
        CompletableFuture<Void> future17 = CompletableFuture.allOf(CompletableFuture.supplyAsync(()->
        {
            sleep(300);
            System.out.println("17.1 TaskA be called.");
            return "17.2 TaskA return value.";
        }), CompletableFuture.supplyAsync(()->{
            sleep(100);
            System.out.println("17.3 TaskB be called.");
            return "17.4 TaskB return value.";
        }));
        System.out.println("17.5 get: " + future17.get()); // allOf沒有返回值
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }
}
複製代碼

輸出日誌:

1. runAsync
2.3 after supplyAsync
2.1 supplyAsync task be called
2.2 supplyAsync return value
3.1 supplyAsync begin
3.4
3.5 xxx
3.6 AAA
3.3 thenRun be called.
4.3 get: 4.2 4.1 apple-蘋果
5.2 5.1 orange-桔子
6.3 get: 6.2 6.1 apple
7.2 returnVal: 7.1 return value for whenComplete
7.3 throwable: null
java.util.concurrent.CompletionException: java.lang.RuntimeException: 8.2 throw exception for exceptionally
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1584)
	at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1574)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1689)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.RuntimeException: 8.2 throw exception for exceptionally
	at com.javageektour.hikaricp.demo.CompleteFeatureDemo.lambda$serialTask$14(CompleteFeatureDemo.java:101)
	at com.javageektour.hikaricp.demo.CompleteFeatureDemo?Lambda$14/769287236.get(Unknown Source)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1582)
	... 5 more
8.4 get: 8.3 return value after dealing exception.
9.3 retuanVal: null
9.4 throwable: java.util.concurrent.CompletionException: java.lang.RuntimeException: 9.2 throw exception for handle
9.6 get: 9.5 new return value.
10.3 get: 10.1 TaskA return value10.2 TaskB return value
11.1 TaskA return value11.2 TaskB return value
12.3 TaskB be called.
12.1 TaskA be called.
12.5 TaskC be called.
13.4 get: 13.3 TaskB return value
13.1 TaskA be called
14.2 TaskB return value
15.3 TaskB be called.
15.5 TaskC be called.
15.1 TaskA be called.
16.3 TaskB be called.
16.5 get: 16.4 TaskB return value.
16.1 TaskA be called.
17.3 TaskB be called.
17.1 TaskA be called.
17.5 get: null
end.
複製代碼

3. 總結

CompleteFeature支持複雜的異步任務調度,支持多個任務串行,並行,匯聚,當多個異步任務有依賴關係時,經過CompleteFeature來調度任務能夠大大簡化代碼和提升執行性能。

end.


<--閱過留痕,左邊點贊!

相關文章
相關標籤/搜索