Java8系列 (七) CompletableFuture異步編程

概述

Java8以前用 Future 處理異步請求, 當你須要獲取任務結果時, 一般的作法是調用  get(long timeout, TimeUnit unit) 此方法會阻塞當前的線程, 若是任務處理超時, 就會拋出一個  TimeoutException html

    @Test
    public void test1() throws InterruptedException, ExecutionException, TimeoutException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Future<String> f = executorService.submit(() -> "ceshishanghu");
        String s = f.get(3, TimeUnit.SECONDS);
        System.out.println(s);
    }

在Java8中引入了 CompletableFuture, 使用它提供的API能夠不用像以前那樣阻塞式或輪詢的獲取某個異步任務的結果, CompletableFuture 會在異步任務處理完成後自動進行回調, 讓你能夠鏈式的組合多個異步任務。java

CompletableFuture 類中提供了許多以 Async 後綴結尾的方法。一般而言,名稱中不帶 Async 的方法和它的前一個任務同樣,在同一個線程中運行。而名稱以 Async 結尾的方法會將後續的任務提交到一個線程池,因此每一個任務是由不一樣的線程處理的。git

靜態工廠方法

  • supplyAsync(): 異步處理任務, 有返回值
  • runAsync(): 異步處理任務, 沒有返回值
  • allOf(): 須要等待全部的異步任務都執行完畢,纔會返回一個新的CompletableFuture
  • anyOf(): 任意一個異步任務執行完畢,就會返回一個新的CompletableFuture
  • completedFuture(): 這種方式獲取的 CompletableFuture 不是異步的,它會等待獲取明確的返回結果以後再返回一個已經完成的 CompletableFuture
    @Test
    public void test2() {
        //建立一個已經有任務結果的CompletableFuture
        CompletableFuture<String> f1 = CompletableFuture.completedFuture("return value");
        //異步處理任務,有返回值
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(this::get);
        //異步處理任務,沒有返回值
        CompletableFuture<Void> f3 = CompletableFuture.runAsync(System.out::println);
        //須要等待全部的異步任務都執行完畢,纔會返回一個新的CompletableFuture
//        CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
        //任意一個異步任務執行完畢,就會返回一個新的CompletableFuture
        CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2, f3);
        Object result = any.join();
        System.out.println("result = " + result);//result = return value
    }

    public String get() {
        delay();
        return "異步任務結果";
    }

    public void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

上面的示例中, allOf() 由於要等待全部的異步任務執行完成,因此要延時1秒鐘纔會返回一個新的 CompletableFuture, 而 anyOf() 則不須要等待全部的異步任務, 由於第一個異步最早完成, 因此控制檯輸出  result = return value 。github

鏈式調用

A任務執行完畢, 繼續執行B任務, B任務執行完畢, 繼續執行C任務...web

    @Test
    public void test2() {
        CompletableFuture<Void> f = CompletableFuture.supplyAsync(() -> {
            //測試拋異常後,handle()方法接受並處理
       //int x = 1 / 0;
            return "這是一個栗子";
        }).handle((res, ex) -> {
            System.out.println("handle res = " + res);
            if (Objects.nonNull(ex)) {
                System.out.println("handle ex" + ex.getCause().getMessage());
            }
            return Objects.nonNull(ex) ? 0 : 1;
        }).thenApply(res -> {
            System.out.println("thenApply res = " + res);
            return res == 1 ? "success" : "error";
        }).thenAccept(res -> System.out.println("thenAccept res = " + res)
        ).thenRun(() -> System.out.println("沒有參數, 異步執行一個沒有返回值的任務"));
        f.join();
    }

輸出結果:spring

handle res = 這是一個栗子
thenApply res = 1
thenAccept res = success
沒有參數, 異步執行一個沒有返回值的任務

將上面   int x = 1 / 0; 這行代碼取消註釋, 從新運行結果以下:mvc

handle res = null
handle ex/ by zero
thenApply res = 0
thenAccept res = error
沒有參數, 異步執行一個沒有返回值的任務

能夠看到, handle() 方法接受前一個 CompletableFuture  的返回結果或拋出的異常做爲方法入參, 通過處理後再返回一個新的結果。app

級聯組合

  • thenCompose(): 對兩個異步操做進行組合,第一個操做完成時,將其結果做爲參數傳遞給第二個操做, 第二個操做會返回一個新的CompletableFuture。
  • thenCombine(): 將兩個徹底無關聯的異步請求的結果整合起來, 計算出一個新的值並返回
    @Test
    public void test3() {
        CompletableFuture<String> f = CompletableFuture.completedFuture("CompletableFuture 1");
        CompletableFuture<String> f1 = f.thenCompose(res -> {
            System.out.println("thenCompose res = " + res);
            return CompletableFuture.supplyAsync(() -> "CompletableFuture 2");
        });
        System.out.println(f1.join());
        CompletableFuture<Integer> f3 = CompletableFuture.completedFuture(998);
        CompletableFuture<String> f4 = f.thenCombine(f3, (str, num) -> {
            System.out.println("str = " + str + ", num= " + num);
            return str + num;
        });
        System.out.println(f4.join());
    }

輸出結果:異步

thenCompose res = CompletableFuture 1
CompletableFuture 2
str = CompletableFuture 1, num= 998
CompletableFuture 1998

whenComplete

當前一個 CompletableFuture  計算完成或拋出異常時, 能夠使用 whenComplete() 執行指定的任務。async

    @Test
    public void test4() {
        CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> {
            //測試拋異常後,whenComplete()方法接受並處理
            int x = 1 / 0;
            return "這是一個栗子";
        }).whenComplete((res, ex) -> {
            System.out.println("whenComplete res = " + res);
            if (Objects.nonNull(ex)) {
                System.out.println("whenComplete ex" + ex.getCause().getMessage());
            }
        });
        System.out.println("f.join() = " + f.join());
    }

輸出結果以下,其中 res 對應前一個 CompletableFuture 的返回結果,ex 對應前一個 CompletableFuture 拋出的異常(若是發生異常)。

從控制檯輸出順序看出,當前一個 CompletableFuture  計算完成或拋出異常時,  whenComplete() 會接受它的返回結果或拋出的異常,來作一些其餘的事情,最後再返回原來的返回結果或拋出異常。類比下 try/catch 語句塊中的 final 語句塊。

whenComplete res = null
whenComplete ex/ by zero

java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero

    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run$$$capture(CompletableFuture.java:1592)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java)
    at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.ArithmeticException: / by zero
    at com.java8.action.ChapterTest.lambda$test4$0(ChapterTest.java:22)

異常處理

只有當前一個 CompletableFuture 發生異常時,纔會進入到 exceptionally() 方法,並將產生的異常做爲入參。

    @Test
    public void test5() {
        CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> {
            //測試拋異常後,exceptionally()方法接受並處理
            //int x = 1 / 0;
            return "這是一個栗子";
        }).exceptionally(ex -> ex.getCause().getMessage());
        System.out.println("f.join() = " + f.join());
    }

註釋  int x = 1 / 0; ,輸出以下:

f.join() = 這是一個栗子

取消註釋   int x = 1 / 0; , 輸出以下:

f.join() = / by zero

Both系列方法

  • thenAcceptBoth(): 等待當前的 CompletableFuture 和另外一個 CompletableFuture 執行完成,將它們的返回結果做爲入參去執行一個操做,沒有返回值
  • runAfterBoth(): 等待當前的 CompletableFuture 和另外一個 CompletableFuture 執行完成,而後去執行一個操做,沒有返回值

代碼清單一

    @Test
    public void test6() {
        CompletableFuture<Integer> f1 = CompletableFuture.completedFuture(9523);
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(this::get);
        CompletableFuture<Void> both = f1.thenAcceptBoth(f2, (num, str) -> System.out.println("num = " + num + ", str = " + str));
        both.join();
    }

    public String get() {
        delay();
        return "CompletableFuture 2";
    }

    public void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

代碼清單一輸出結果以下:

num = 9523, str = CompletableFuture 2 

代碼清單二

    @Test
    public void test7() {
        CompletableFuture<Integer> f1 = CompletableFuture.completedFuture(9523);
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "");
        CompletableFuture<Void> both = f1.runAfterBoth(f2, () -> System.out.println("執行一個任務,沒有入參"));
        both.join();
    }

代碼清單二輸出結果以下:

執行一個任務,沒有入參

Either系列

  • acceptEither: 當前的 CompletableFuture 和另外一個 CompletableFuture 任意一個執行完成,將對應的返回結果做爲入參去執行一個操做,沒有返回值
  • applyToEither: 當前的 CompletableFuture 和另外一個 CompletableFuture 任意一個執行完成,將對應的返回結果做爲入參,使用 mapping 函數轉換成一個新的值並返回
  • runAfterEither: 當前的 CompletableFuture 和另外一個 CompletableFuture 任意一個執行完成,而後去執行一個操做,沒有返回值

代碼清單三:

    @Test
    public void test8() {
        CompletableFuture<String> f1 = CompletableFuture.completedFuture("CompletableFuture 1");
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(this::get);
        CompletableFuture<Void> both = f1.acceptEither(f2, System.out::println);
        both.join();
    }

    public String get() {
        delay();
        return "CompletableFuture 2";
    }

    public void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

代碼清單三輸出結果:

CompletableFuture 1

代碼清單四:

    @Test
    public void test9() {
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(this::get);
        CompletableFuture<String> f2 = CompletableFuture.completedFuture("CompletableFuture 2");
        CompletableFuture<Integer> f3 = f1.applyToEither(f2, res -> {
            System.out.println("res = " + res);
            return res.length();
        });
        System.out.println("f3.join() = " + f3.join());
    }

    public String get() {
        delay();//這裏會延時一秒鐘
        return "CompletableFuture 1";
    }

代碼清單四輸出結果:

res = CompletableFuture 2
f3.join() = 19

代碼清單五:

    @Test
    public void test10() {
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(this::get);
        CompletableFuture<Void> f2 = CompletableFuture.allOf();
        CompletableFuture<Void> f3 = f1.runAfterEither(f2, () -> System.out.println("執行一個任務,沒有入參"));
        f3.join();
    }

    public String get() {
        delay();//這裏會延時一秒鐘
        return "CompletableFuture 1";
    }

代碼清單五輸出結果:

執行一個任務,沒有入參

使用自定義的執行器來處理多個異步任務

在實際應用場景中可能會遇到這種狀況,假如你須要同時處理大量的異步任務,且這些異步任務互相不依賴,你只要最後把它們的結果組裝起來就行,這該怎麼實現呢?

下面給出了一個使用默認執行器的示例,經過Stream流同時建立 9 個異步任務,獲取它們的結果並組裝後返回,其中 Runtime.getRuntime().availableProcessors() 表示Java虛擬機可用的處理器個數,在我以前的文章 Java8系列 (二) Stream流 中有介紹過。

代碼清單六:

    @Test
    public void test11() {
        List<String> list = Arrays.asList("王小波書店", "杭州沈記古舊書店", "貓的天空之城概念書店", "純真年代書吧", "南山書屋", "西西弗書店", "新華書店", "鍾書閣", "雲門書屋");
        System.out.println("當前機器有" + Runtime.getRuntime().availableProcessors() + "個可用的處理器");
        long start = System.nanoTime();
        List<CompletableFuture<String>> futures = list.stream()
                .map(str -> CompletableFuture.supplyAsync(() -> this.calculateLength(str)))
                .collect(Collectors.toList());
        System.out.println("get futures "+(System.nanoTime() - start) / 1000_000 + " msecs");
        String result = futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.joining(",", "[", "]"));
        System.out.println("get result "+(System.nanoTime() - start) / 1000_000 + " msecs");
        System.out.println(result);
    }

    public String calculateLength(String str) {
        delay();
        return str;
    }

    public void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

運行代碼清單六,輸出結果:

當前機器有4個可用的處理器
get futures 95 msecs
get result 3098 msecs
[王小波書店,杭州沈記古舊書店,貓的天空之城概念書店,純真年代書吧,南山書屋,西西弗書店,新華書店,鍾書閣,雲門書屋]

能夠看到,雖然使用了異步處理,但仍是花了 3098 毫秒才執行完成全部任務。這是由於 CompletableFuture 內部採用的是通用線程池 ForkJoinPool.commonPool() , 默認都使用固定數目的線程, 具體線程數取決於  Runtime.getRuntime().availableProcessors()  的返回值。

我這裏測試的機器顯示通用線程池中處於可用狀態的線程數爲 4,一次只能同時處理 4 個任務,後面的5個異步任務只能等到前面某一個操做完成釋放出空閒線程才能繼續, 所以總的會消耗約 3 秒鐘的時間。

咱們將上面的代碼進行重構,使用自定義的執行器,經過自定義的執行器你能夠指定線程池的大小。其中線程數的設定能夠參考公式  Nthreads = NCPU * UCPU * (1 + W/C) 

    @Test
    public void test12() {
        List<String> list = Arrays.asList("王小波書店", "杭州沈記古舊書店", "貓的天空之城概念書店", "純真年代書吧", "南山書屋", "西西弗書店", "新華書店", "鍾書閣", "雲門書屋");
        final ExecutorService executor = Executors.newFixedThreadPool(Math.min(list.size(), 100), r -> {
            Thread thread = new Thread(r);
            //守護線程不會組織程序的終止
            thread.setDaemon(true);
            return thread;
        });
        System.out.println("當前機器有" + Runtime.getRuntime().availableProcessors() + "個可用的處理器, 當前處理異步請求的線程池大小爲 " + Math.min(list.size(), 100));
        long start = System.nanoTime();
        List<CompletableFuture<String>> futures = list.stream()
                .map(str -> CompletableFuture.supplyAsync(() -> this.calculateLength(str), executor))
                .collect(Collectors.toList());
        System.out.println("get futures " + (System.nanoTime() - start) / 1000_000 + " msecs");
        String result = futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.joining(",", "[", "]"));
        System.out.println("get result " + (System.nanoTime() - start) / 1000_000 + " msecs");
        System.out.println(result);
    }

    public String calculateLength(String str) {
        delay();
        return str;
    }

    public void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

輸出結果以下:

當前機器有4個可用的處理器, 當前處理異步請求的線程池大小爲 9
get futures 38 msecs
get result 1039 msecs
[王小波書店,杭州沈記古舊書店,貓的天空之城概念書店,純真年代書吧,南山書屋,西西弗書店,新華書店,鍾書閣,雲門書屋]

能夠看到,使用自定義的執行器調大線程池大小後,總的運行時間只要 1039 毫秒。

將CompletableFuture做爲Controller的返回值

上面還存在一個問題,雖然如今能夠同時處理多個異步任務,可是若是須要將異步結果返回給另外一個服務,那不是還得經過 join() 阻塞的獲取到返回值後才能再返回麼?

自Spring Boot 1.3 (Spring 4.2) 以後開始支持 CompletableFuture 或 CompletionStage 做爲 Controller 的返回值,她很好的解決了上面的異步阻塞問題,只要將  CompletableFuture 做爲 Controller 的返回值,在異步任務執行完成後,它會自動響應結果給另外一個服務。

@RestController
public class AsyncController {

    @GetMapping("/redirect")
    public CompletableFuture<ModelAndView> redirect() {
        return CompletableFuture.supplyAsync(() -> {
            this.delay();
            RedirectView redirectView = new RedirectView("https://www.cnblogs.com/qingshanli/");
            redirectView.addStaticAttribute("hint", "CompletableFuture組裝ModelAndView視圖,異步返回結果");
            return new ModelAndView(redirectView);
        });
    }

    @GetMapping("/async")
    public CompletableFuture<String> async() {
        System.out.println("async method start");
        return CompletableFuture.supplyAsync(() -> {
            this.delay();
            return "CompletableFuture做爲Controller的返回值,異步返回結果";
        }).whenComplete((res, ex) -> System.out.println("async method completely, res = " + res + ", ex = " + ex));
    }

    public void delay() {
        try {
            Thread.sleep(3000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
} 

啓動項目,Postman 訪問 http://localhost:8080/async,截圖以下:

Postman 訪問 http://localhost:8080/redirect,截圖以下:

參考資料

https://github.com/AndreasKl/spring-boot-mvc-completablefuture

https://nickebbitt.github.io/blog/2017/03/22/async-web-service-using-completable-future

https://www.humansreadcode.com/spring-boot-completablefuture/

Java8 實戰

做者:張小凡
出處:https://www.cnblogs.com/qingshanli/ 本文版權歸做者和博客園共有,歡迎轉載,但未經做者贊成必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接,不然保留追究法律責任的權利。若是以爲還有幫助的話,能夠點一下右下角的【推薦】。

相關文章
相關標籤/搜索