Java8函數之旅 (八) - 組合式異步編程

前言

隨着多核處理器的出現,如何輕鬆高效的進行異步編程變得愈發重要,咱們看看在java8以前,使用java語言完成異步編程有哪些方案。javascript

JAVA8以前的異步編程

  • 繼承Thead類,重寫run方法
  • 實現runable接口,實現run方法
  • 匿名內部類編寫thread或者實現runable的類,固然在java8中能夠用lambda表達式簡化
  • 使用futureTask進行附帶返回值的異步編程
  • 使用線程池和Future來實現異步編程
  • spring框架下的@async得到異步編程支持

使用線程池與future來實現異步編程

實現方式可謂是多種多樣,這裏咱們使用線程池和future來實現異步編程,藉着這個例子來說述java8的組合式異步編程有着怎樣的優點css

//構造線程池
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            //構造future結果,doSomethingA十分耗時,所以採用異步
            Future<Integer> future = pool.submit(() -> doSomethingA());
            //作一些別的事情
            doSomethingB();
            //從future中得到結果,設置超時時間,超過了就拋異常
            Integer result = future.get(10, TimeUnit.SECONDS);
            //打印結果
            System.out.printf("the async result is : %d", result);
            //異常捕獲
        } catch (InterruptedException e) {
            System.out.println("任務計算拋出了一個異常!");
        } catch (ExecutionException e) {
            System.out.println("線程在等待的過程當中被中斷了!");
        } catch (TimeoutException e) {
            System.out.println("future對象等待時間超時了!");
        }
    }

然而這樣的異步編程方式僅僅能知足基本的須要,稍微複雜的一些異步處理Future接口彷佛就有點一籌莫展了,例如html

  • 將兩個異步計算合併爲一個——這兩個異步計算之間相互獨立,同時第二個又依賴於第
    一個的結果。
  • 等待 Future 集合中的全部任務都完成。
  • 僅等待 Future 集合中最快結束的任務完成(有可能由於它們試圖經過不一樣的方式計算同
    一個值) ,並返回它的結果。
  • 經過編程方式完成一個 Future 任務的執行(即以手工設定異步操做結果的方式) 。
  • 應對 Future 的完成事件(即當 Future 的完成事件發生時會收到通知,並能使用 Future計算的結果進行下一步的操做,不僅是簡單地阻塞等待操做的結果)

這種感受其實就很像沒有stream以前的collections的操做感受同樣,一樣的,對於future,java8提供了它的函數式升級版本CompletableFuture,從名字就能夠看出來這絕對是future的升級版。java

JAVA8中的組合式異步編程

使用CompletableFuture進行異步編程

事物的發展每每都是由簡單->複雜->簡單,這裏咱們一樣遵循這樣的規律,按部就班。
下面的例子摘取《java8實戰》的異步編程章節,並作了簡化。
咱們假設如今咱們有一項查詢商品價格的服務十分耗時,因此毫無例外的咱們想讓查詢最佳價格的服務以異步的形式執行。
最直接的方式是直接構建一個異步查詢商品價格的api,而且返回,爲了演示須要,編寫一個線程等待一秒的方法來模擬長時間的請求。spring

public double getPrice(String product) {
        return calculatePrice(product);
    }

    private double calculatePrice(String product) {
        delay();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }
    
    public static void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

如今getPrice這方法是一個同步的方法,該方法在通過1秒的延遲以後會返回給咱們一個商品的價格(這裏只是簡單的根據名字構造了一個隨機數)編程

咱們使用completFuture將getPrice轉化爲異步方法,以下bootstrap

public Future<Double> getAsyncPrice(String product) {
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        new Thread(() -> {
            double price = calculatePrice(product);
            futurePrice.complete(price);
        }).start();;
        return futurePrice;
    }

這裏構造一個completableFuture對象,並另起一個異步線程,將異步計算的結果使用futurePrice.complete來接受,無需等待直接返回future結果
調用類使用Integer result = future.get(10, TimeUnit.SECONDS)來接受返回的結果,若是等待超時則拋出異常。
另外,若是異步線程發生異常,而且在排查問題的時候想要知道具體是什麼緣由致使的,
能夠在getAsyncPrice方法中使用completeExcepitonally來獲得異常信息而且結束此次異步任務,代碼以下api

public Future<Double> getPriceAsync(String product) {
    public Future<Double> getPriceAsync(String product) {
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        new Thread(() -> {
            try {
                double price = calculatePrice(product);
                futurePrice.complete(price);
            } catch (Exception ex) {
                futurePrice.completeExceptionally(ex);
            }
        }).start();
        return futurePrice;
    }

這樣,基本的功能就實現了。數組

使用工廠類簡化異步操做

也許你看到上面的代碼,會說:"我暈,你這寫法比原來還複雜哦,並且我也沒看出啥區別啊。",是的,上文的寫法能夠算是原生態的寫法了,目的爲爲下面的知識作一個簡單的鋪墊。
事實上,CompleteFuture自己提供了大量的工廠方法來供咱們十分方便的實現一個異步編程,他封裝了前篇一概的異常與結果接收,你只須要編寫真正的異步邏輯部分就能夠了,同時借住於lambda表達式,能夠更進一步。服務器

supplyAsync 方法接受一個生產者(Supplier)做爲參數,返回一個 CompletableFuture
對象, 該對象完成異步執行後會讀取調用生產者方法的返回值。 生產者方法會交由 ForkJoinPool
池中的某個執行線程(Executor)運行,可是你也可使用 supplyAsync 方法的重載版本,傳
遞第二個參數指定不一樣的執行線程執行生產者方法。

因而上文的例子能夠改寫以下

public Future<Double> getPriceAsync(String product) {
        return CompletableFuture.supplyAsync(() -> calculatePrice(product));
    }

是否是簡潔了許多呢?

可如今還有問題,這裏咱們成功的編寫了一個十分簡潔的異步方法,可實際的狀況中,咱們所能調用的API大部分都是同步的,所以下面將介紹如何使用異步的方法去操做這些同步API。

使用流異步操做同步API

咱們如今有這麼一個需求,它接受產品名做爲參數,返回一個字符串列表,
這個字符串列表中包括商店的名稱、該商店中指定商品的價格,商店集合以及接口設計以下。

List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
new Shop("LetsSaveBig"),
new Shop("MyFavoriteShop"),
new Shop("BuyItAll"));

public List<String> findPrices(String product);

使用同步的方法實現

這樣的集合變換使用stream流來操做十分容易,代碼以下

public List<String> findPrices(String product) {
        return shops.stream()
                .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
                .collect(toList());
    }

stream流將 shop映射爲了shop的名稱以及該shop中商品的價格的字符串,並使用收集器進行收集。

使用異步的方法實現

事實上,咱們徹底可使用流將shop映射成CompletableFuture對象,就好像在操做集合同樣,代碼以下

List<CompletableFuture<String>> priceFutures = shops.stream()
            .map(shop -> CompletableFuture
                    .supplyAsync(() -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))))
            .collect(toList());

使用這種方式,你會獲得一個,List<CompletableFuture >,列表中的每一個CompletableFuture 對象在計算完成後都包含商店的String類型的名稱。可是,因爲你用CompletableFutures 實現的findPrices方法要求返回一個List ,你須要等待全部的future 執行完畢,將其包含的值抽取出來,填充到列表中才能返回。

爲了實現這個效果,你能夠向最初的 List<CompletableFuture > 施加第二個map 操做,對 List 中的全部future對象執行join操做,一個接一個地等待它們運行結束。注意CompletableFuture類中的join方法和Future接口中的get有相同的含義,而且也聲明在Future 接口中,它們惟一的不一樣是join不會拋出任何檢測到的異常。使用它你再也不須要使用try / catch 語句塊讓你傳遞給第二個map方法的Lambda表達式變得過於臃腫。全部這些整合在一塊兒,你就能夠從新實現 findPrices 了,具體代碼以下

public List<String> findPrices(String product) {
        List<CompletableFuture<String>> priceFutures = shops.stream()
                .map(
                    shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPrice(product)))
                .collect(Collectors.toList());
                
        return priceFutures.stream()
                .map(CompletableFuture::join)
                .collect(toList());
    }

以上的代碼你可能會疑惑,爲何不直接按照shop->completableFuture->join->collect
的方式進行流處理呢?那是由於join這一步自己是阻塞的,對於流操做來講,前一個shop沒有處理完以前,是不會處理下一個shop的,因此對於每個shop,處理到join這一步的時候就會阻塞住等待1秒,這樣的話,這個流水線自己就會變回阻塞的了。

而上文的編寫方法能夠看出 shop->completableFuture->collect 這個操做自己是非阻塞的,順利的將全部的請求都發出去了,隨後再使用join來完成結果的收集。

使用線程池來管控異步方法

前面在介紹工廠方法時提到,能夠選擇第二參數放入一個線程池來進行管控。

private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() {
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        }
    });

接着在supplyAsync中使用該線程池便可,代碼以下

CompletableFuture.supplyAsync(() -> shop.getName() + " price is " +
shop.getPrice(product), executor);

進階的異步流操做

既然咱們已經將異步操做與流相結合了,所以很容易的就會想到對於異步流來講,應該有會有相似於集合流的一些很是好用的API吧?事實上,JAVA8的確爲咱們提供了這些API。

構造同步和異步操做

如同集合流操做同樣,異步流也能夠提早安排一系列的任務,而後讓異步任務有條不紊的按照這個順序去執行。

  • 同步任務
    使用future.thenApply(Function)來實現,該方法接受一個Function對象
    你能夠規劃這樣的任務 任務A(異步)->任務B(同步),語法多是這樣的
stream()
    .map(xxx->supplayAsync(()->任務A)) //這一步已經異步的映射成了任務A
    .map(future->future.thenApply(任務B)//執行同步的任務B
    .collect
  • 異步任務
    與同步幾乎同樣,方法變爲future.thenCompose(Function)
    你能夠規劃這樣的任務 任務A(異步)->任務B(同步)->任務C(異步),語法多是這樣的
stream()
    .map(xxx->supplayAsync(()->任務A)) //這一步已經異步的映射成了任務A
    .map(future->future.thenApply(任務B)//執行同步的任務B
    .map(future->future.thenCompose(任務C))//再異步執行任務C
    .collect

將兩個 CompletableFuture 對象整合起來,不管它們是否存在依賴

使用thenCombine來完成,相似任務A與任務B,A是查詢價格,B是查詢匯率,這兩個任務之間自己沒有關聯關係,因此能夠同時發起,但你最後須要計算價格乘以匯率,所以在這兩個任務完成以後須要對他們的結果進行合併,代碼以下

Future<Double> futurePriceInUSD = CompletableFuture.supplyAsync(() -> shop.getPrice(product))//任務A
                .thenCombine(
                     CompletableFuture.supplyAsync(() -> exchangeService.getRate(Money.EUR, Money.USD)), //任務B
                    (price, rate) -> price * rate); //任務A與任務B的合併操做

注意這裏的任務A與任務B是異步的,但他們的合併操做是同步的,若是想要合併操做也是異步的,使用future.thenCombineAsync的異步方法版本。

對結果進行處理

使用thenAccept(Consumer)
以上都是對結果進行一些映射,你如今要對結果只進行處理,說白了就是前面的都是Function,如今要換成consumer,而且參數再也不是異步任務,而是任務的結果值,舉個例子,上文的任務A(異步)->任務B(同步)->任務C(異步) 的操做如今想到對他們的操做結果進行打印
就可使用thenAccept(Consumer)

stream()
    .map(xxx->supplayAsync(()->任務A)) //這一步已經異步的映射成了任務A
    .map(future->future.thenApply(任務B)//執行同步的任務B
    .map(future->future.thenCompose(任務C))//再異步執行任務C
    .map(future->future.thenAccept(System.out::println))//將結果打印
    .collect

使用allOf與anyOf對結果進行處理

須要注意的是在執行了Accpet方法以後,你獲得的是一個 CompletableFuture 流對象
你能夠對這些流對象進行相似及早求值的操做,例如這條查詢4個商家的價格服務只要有一個給出了返回結果就結束此次異步流。

CompletableFuture[] futures = findPricesStream("myPhone")
.map(f -> f.thenAccept(System.out::println))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.anyOf(futures).join();

allOf 工廠方法接收一個由 CompletableFuture 構成的數組,數組中的全部 Completable-Future 對象執行完成以後,它返回一個 CompletableFuture 對象。這意味着,若是你須要等待最初 Stream 中的全部 CompletableFuture 對象執行完畢,對 allOf 方法返回的CompletableFuture 執行 join 操做是個不錯的主意。這個方法對「最佳價格查詢器」應用也是有用的,由於你的用戶可能會困惑是否後面還有一些價格沒有返回,使用這個方法,你能夠在執行完畢以後打印輸出一條消息「All shops returned results or timed out」 。然而在另外一些場景中,你可能但願只要 CompletableFuture 對象數組中有任何一個執行完畢就再也不等待,好比,你正在查詢兩個匯率服務器,任何一個返回告終果都能知足你的需求。在這種狀況下,你可使用一個相似的工廠方法 anyOf 。該方法接收一個 CompletableFuture 對象構成的數組, 返回由第一個執行完畢的 CompletableFuture 對象的返回值構成的 Completable-Future

總結

本文是對Java8實戰中異步編程章節的一些整理和彙總,介紹了利用新增的completableFuture將異步任務與流操做集合起來實現組合式異步編程,利用工廠方法與函數接口能夠大大的簡化代碼,同時提升代碼的可閱讀性,想要查看詳細,能夠自行翻閱該書。

posted @ 2018-01-01 17:30  祈求者- 閱讀( ...) 評論( ...) 編輯 收藏

相關文章
相關標籤/搜索