《Java8實戰》-第十一章筆記(CompletableFuture:組合式異步編程)

CompletableFuture:組合式異步編程

最近這些年,兩種趨勢不斷地推進咱們反思咱們設計軟件的方式。第一種趨勢和應用運行的硬件平臺相關,第二種趨勢與應用程序的架構相關,尤爲是它們之間如何交互。咱們在第7章中已經討論過硬件平臺的影響。咱們注意到隨着多核處理器的出現,提高應用程序處理速度最有效的方式是編寫能充分發揮多核能力的軟件。你已經看到經過切分大型的任務,讓每一個子任務並行運行,這一目標是可以實現的;你也已經瞭解相對直接使用線程的方式,使用分支/合併框架(在Java 7中引入)和並行流(在Java 8中新引入)能以更簡單、更有效的方式實現這一目標。java

第二種趨勢反映在公共API日益增加的互聯網服務應用。著名的互聯網大鱷們紛紛提供了本身的公共API服務,好比谷歌提供了地理信息服務,Facebook提供了社交信息服務,Twitter提供了新聞服務。如今,不多有網站或者網絡應用會以徹底隔離的方式工做。更多的時候,咱們看到的下一代網絡應用都採用「混聚」(mash-up)的方式:它會使用來自多個來源的內容,將這些內容聚合在一塊兒,方便用戶的生活。git

好比,你可能但願爲你的法國客戶提供指定主題的熱點報道。爲實現這一功能,你須要向谷歌或者Twitter的API請求全部語言中針對該主題最熱門的評論,可能還須要依據你的內部算法對它們的相關性進行排序。以後,你可能還須要使用谷歌的翻譯服務把它們翻譯成法語,甚至利用谷歌地圖服務定位出評論做者的位置信息,最終將全部這些信息彙集起來,呈如今你的網站上。github

固然,若是某些外部網絡服務發生響應慢的狀況,你但願依舊能爲用戶提供部分信息,好比提供帶問號標記的通用地圖,以文本的方式顯示信息,而不是呆呆地顯示一片空白屏幕,直到地圖服務器返回結果或者超時退出。算法

要實現相似的服務,你須要與互聯網上的多個Web服務通訊。但是,你並不但願由於等待某些服務的響應,阻塞應用程序的運行,浪費數十億寶貴的CPU時鐘週期。好比,不要由於等待Facebook的數據,暫停對來自Twitter的數據處理。數據庫

這些場景體現了多任務程序設計的另外一面。第7章中介紹的分支/合併框架以及並行流是實現並行處理的寶貴工具;它們將一個操做切分爲多個子操做,在多個不一樣的核、CPU甚至是機器上並行地執行這些子操做。編程

與此相反,若是你的意圖是實現併發,而非並行,或者你的主要目標是在同一個CPU上執行幾個鬆耦合的任務,充分利用CPU的核,讓其足夠忙碌,從而最大化程序的吞吐量,那麼你其實真正想作的是避免由於等待遠程服務的返回,或者對數據庫的查詢,而阻塞線程的執行,浪費寶貴的計算資源,由於這種等待的時間極可能至關長。經過本章中你會了解,Future接口,尤爲是它的新版實現CompletableFuture,是處理這種狀況的利器。服務器

Future 接口

Future接口在Java 5中被引入,設計初衷是對未來某個時刻會發生的結果進行建模。它建模了一種異步計算,返回一個執行運算結果的引用,當運算結束後,這個引用被返回給調用方。在Future中觸發那些潛在耗時的操做把調用線程解放出來,讓它能繼續執行其餘有價值的工做,再也不須要呆呆等待耗時的操做完成。打個比方,你能夠把它想象成這樣的場景:你拿了一袋子衣服到你中意的乾洗店去洗。乾洗店的員工會給你張發票,告訴你何時你的衣服會洗好(這就是一個Future事件)。衣服乾洗的同時,你能夠去作其餘的事情。Future的另外一個優勢是它比更底層的Thread更易用。要使用Future,一般你只須要將耗時的操做封裝在一個Callable對象中,再將它提交給ExecutorService,就萬事大吉了。下面這段代碼展現了Java 8以前使用Future的一個例子。網絡

ExecutorService executor = Executors.newCachedThreadPool();
Future<Double> future = executor.submit(new Callable<Double>() {
    public Double call() {
        return doSomeLongComputation();
    }
});
// 異步操做進行的同時,你能夠作其餘的事情
doSomethingElse();

try {
    Double result = future.get(1, TimeUnit.SECONDS);
} catch (ExecutionException ee) {
    // 計算拋出一個異常
} catch (InterruptedException ie) {
    // 當前線程在等待過程當中被中斷
} catch (TimeoutException te) {
    // 在Future對象完成以前超過已過時
}

這種編程方式讓你的線程能夠在ExecutorService以併發方式調用另外一個線程執行耗時操做的同時,去執行一些其餘的任務。接着,若是你已經運行到沒有異步操做的結果就沒法繼續任何有意義的工做時,能夠調用它的get方法去獲取操做的結果。若是操做已經完成,該方法會馬上返回操做的結果,不然它會阻塞你的線程,直到操做完成,返回相應的結果。架構

你能想象這種場景存在怎樣的問題嗎?若是該長時間運行的操做永遠不返回了會怎樣?爲了處理這種可能性,雖然Future提供了一個無需任何參數的get方法,咱們仍是推薦你們使用重載版本的get方法,它接受一個超時的參數,經過它,你能夠定義你的線程等待Future結果的最長時間,而不是樣永無止境地等待下去。併發

Future 接口的侷限性

經過第一個例子,咱們知道Future接口提供了方法來檢測異步計算是否已經結束(使用isDone方法),等待異步操做結束,以及獲取計算的結果。可是這些特性還不足以讓你編寫簡潔的併發代碼。好比,咱們很難表述Future結果之間的依賴性;從文字描述上這很簡單,「當長時間計算任務完成時,請將該計算的結果通知到另外一個長時間運行的計算任務,這兩個計算任務都完成後,將計算的結果與另外一個查詢操做結果合併」。可是,使用Future中提供的方法完成這樣的操做又是另一回事。這也是咱們須要更具描述能力的特性的緣由,好比下面這些。

  • 將兩個異步計算合併爲一個——這兩個異步計算之間相互獨立,同時第二個又依賴於第一個的結果。
  • 等待Future集合中的全部任務都完成。
  • 僅等待Future集合中最快結束的任務完成(有可能由於它們試圖經過不一樣的方式計算同一個值),並返回它的結果。
  • 經過編程方式完成一個Future任務的執行(即以手工設定異步操做結果的方式)。
  • 應對Future的完成事件(即當Future的完成事件發生時會收到通知,並能使用Future計算的結果進行下一步的操做,不僅是簡單地阻塞等待操做的結果)。

這一章中,你會了解新的CompletableFuture類(它實現了Future接口)如何利用Java 8的新特性以更直觀的方式將上述需求都變爲可能。Stream和CompletableFuture的設計都遵循了相似的模式:它們都使用了Lambda表達式以及流水線的思想。從這個角度,你能夠說CompletableFuture和Future的關係就跟Stream和Collection的關係同樣。

使用CompletableFuture 構建異步應用

爲了展現CompletableFuture的強大特性,咱們會建立一個名爲「最佳價格查詢器」(best-price-finder)的應用,它會查詢多個在線商店,依據給定的產品或服務找出最低的價格。這個過程當中,你會學到幾個重要的技能。

  • 首先,你會學到如何爲你的客戶提供異步API(若是你擁有一間在線商店的話,這是很是有幫助的)。
  • 其次,你會掌握如何讓你使用了同步API的代碼變爲非阻塞代碼。你會了解如何使用流水線將兩個接續的異步操做合併爲一個異步計算操做。這種狀況確定會出現,好比,在線商店返回了你想要購買商品的原始價格,並附帶着一個折扣代碼——最終,要計算出該商品的實際價格,你不得不訪問第二個遠程折扣服務,查詢該折扣代碼對應的折扣比率。
  • 你還會學到如何以響應式的方式處理異步操做的完成事件,以及隨着各個商店返回它的商品價格,最佳價格查詢器如何持續地更新每種商品的最佳推薦,而不是等待全部的商店都返回他們各自的價格(這種方式存在着必定的風險,一旦某家商店的服務中斷,用戶可能遭遇白屏)。

實現異步API

爲了實現最佳價格查詢器應用,讓咱們從每一個商店都應該提供的API定義入手。首先,商店應該聲明依據指定產品名稱返回價格的方法:

public double getPrice(String product) {
    // 待實現
}

該方法的內部實現會查詢商店的數據庫,但也有可能執行一些其餘耗時的任務,好比聯繫其餘外部服務(好比,商店的供應商,或者跟製造商相關的推廣折扣)。咱們在本章剩下的內容中,採用delay方法模擬這些長期運行的方法的執行,它會人爲地引入1秒鐘的延遲,方法聲明以下。

public class Util {
    public static void delay() {
        int delay = 1000;
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

爲了介紹本章的內容,getPrice方法會調用delay方法,並返回一個隨機計算的值,代碼清單以下所示。返回隨機計算的價格這段代碼看起來有些取巧。它使用charAt,依據產品的名稱,生成一個隨機值做爲價格。

public class Shop {
    private final String name;
    private final Random random;

    public Shop(String name) {
        this.name = name;
        random = new Random(name.charAt(0) * name.charAt(1) * name.charAt(2));
    }

    public double getPrice(String product) {
        return calculatePrice(product);
    }

    private double calculatePrice(String product) {
        delay();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }
}

很明顯,這個API的使用者(這個例子中爲最佳價格查詢器)調用該方法時,它依舊會被阻塞。爲等待同步事件完成而等待1秒鐘,這是沒法接受的,尤爲是考慮到最佳價格查詢器對網絡中的全部商店都要重複這種操做。本章接下來的小節中,你會了解如何以異步方式使用同步API解決這個問題。可是,出於學習如何設計異步API的考慮,咱們會繼續這一節的內容,僞裝咱們還在深受這一困難的煩擾:你是一個睿智的商店店主,你已經意識到了這種同步API會爲你的用戶帶來多麼痛苦的體驗,你但願以異步API的方式重寫這段代碼,讓用戶更流暢地訪問你的網站。

將同步方法轉換爲異步方法

爲了實現這個目標,你首先須要將getPrice轉換爲getPriceAsync方法,並修改它的返回值:

public class Shop {
    ...
    public Future<Double> getPriceAsync(String product) {
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        new Thread(() -> {
            double price = calculatePrice(product);
            futurePrice.complete(price);
        }).start();
        return futurePrice;
    }
    ...
}

在這段代碼中,你建立了一個表明異步計算的CompletableFuture對象實例,它在計算完成時會包含計算的結果。接着,你調用fork建立了另外一個線程去執行實際的價格計算工做,不等該耗時計算任務結束,直接返回一個Future實例。當請求的產品價格最終計算得出時,你可使用它的complete方法,結束completableFuture對象的運行,並設置變量的值。很顯然,這個新版Future的名稱也解釋了它所具備的特性。使用這個API的客戶端,能夠經過下面的這段代碼對其進行調用。

public class ShopMain {

    public static void main(String[] args) {
        Shop shop = new Shop("最好的商店");
        long start = System.nanoTime();
        Future<Double> futurePrice = shop.getPriceAsync("我最喜歡的商品");
        long invocationTime = ((System.nanoTime() - start) / 1_000_000);
        System.out.println("調用時間 " + invocationTime);
        // 這裏能夠作其餘的事情,好比查詢其餘的商店
        doSomethingElse();
        // 計算商品價格
        try {
            double price = futurePrice.get();
            System.out.printf("價格是 %.2f%n", price);
        } catch (ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        }
        long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
        System.out.println("計算價格時間 " + retrievalTime);
    }

    private static void doSomethingElse() {
        System.out.println("正在查詢其餘的商店...");
    }
}

咱們看到這段代碼中,客戶向商店查詢了某種商品的價格。因爲商店提供了異步API,該次調用馬上返回了一個Future對象,經過該對象客戶能夠在未來的某個時刻取得商品的價格。這種方式下,客戶在進行商品價格查詢的同時,還能執行一些其餘的任務,好比查詢其餘家商店中商品的價格,不會呆呆地阻塞在那裏等待第一家商店返回請求的結果。最後,若是全部有意義的工做都已經完成,客戶全部要執行的工做都依賴於商品價格時,再調用Future的get方法。執行了這個操做後,客戶要麼得到Future中封裝的值(若是異步任務已經完成),要麼發生阻塞,直到該異步任務完成,指望的值可以訪問。上面的代碼中,輸出的結果:

調用時間 116
正在查詢其餘的商店...
價格是 49107.07
計算價格時間 1172

你必定已經發現getPriceAsync方法的調用時間遠遠早於最終價格計算完成的時間,在以前的代碼,你還會知道咱們有可能避免發生客戶端被阻塞的風險。實際上這很是簡單,Future執行完畢能夠發送一個通知,僅在計算結果可用時執行一個由Lambda表達式或者方法引用定義的回調函數。不過,咱們當下不會對此進行討論,如今咱們要解決的是另外一個問題:如何正確地管理異步任務執行過程當中可能出現的錯誤。

錯誤處理

若是沒有意外,咱們目前開發的代碼工做得很正常。可是,若是價格計算過程當中產生了錯誤會怎樣呢?很是不幸,這種狀況下你會獲得一個至關糟糕的結果:用於提示錯誤的異常會被限制在試圖計算商品價格的當前線程的範圍內,最終會殺死該線程,而這會致使等待get方法返回結果的客戶端永久地被阻塞。

客戶端可使用重載版本的get方法,它使用一個超時參數來避免發生這樣的狀況。這是一種值得推薦的作法,你應該儘可能在你的代碼中添加超時判斷的邏輯,避免發生相似的問題。使用這種方法至少能防止程序永久地等待下去,超時發生時,程序會獲得通知發生了TimeoutException。不過,也由於如此,你不會有機會發現計算商品價格的線程內到底發生了什麼問題才引起了這樣的失效。爲了讓客戶端能瞭解商店沒法提供請求商品價格的緣由,你須要使用CompletableFuture的completeExceptionally方法將致使CompletableFuture內發生問題的異常拋出。對代碼優化後的結果以下所示。

public Future<Double> getPriceAsync(String product) {
    CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    new Thread(() -> {
        try {
            double price = calculatePrice(product);
            // 若是價格計算正常結束,完成Future操做並設置商品價格
            futurePrice.complete(price);
        } catch (Exception e) {
            // 不然就拋出致使失敗的異常,完成此次Future操做
            futurePrice.completeExceptionally(e);
        }
    }).start();
    return futurePrice;
}

客戶端如今會收到一個ExecutionException異常,該異常接收了一個包含失敗緣由的Exception參數,即價格計算方法最初拋出的異常。因此,舉例來講,若是該方法拋出了一個運行時異常「product not available」,客戶端就會獲得像下面這樣一段ExecutionException:

java.util.concurrent.ExecutionException: java.lang.RuntimeException: product
    not availableat java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2237)
    at xin.codedream.java8.chap11.AsyncShopClient.main(AsyncShopClient.java:14)
    ... 5 more
Caused by: java.lang.RuntimeException: product not available
    at xin.codedream.java8.chap11.AsyncShop.calculatePrice(AsyncShop.java:36)
    atxin.codedream.java8.chap11.AsyncShop.lambda$getPrice$0(AsyncShop.java:23)
    at xin.codedream.java8.chap11.AsyncShop$$Lambda$1/24071475.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:744)

使用工廠方法supplyAsync建立CompletableFuture
目前爲止咱們已經瞭解瞭如何經過編程建立CompletableFuture對象以及如何獲取返回值,雖然看起來這些操做已經比較方便,但還有進一步提高的空間,CompletableFuture類自身提供了大量精巧的工廠方法,使用這些方法能更容易地完成整個流程,還不用擔憂實現的細節。好比,採用supplyAsync方法後,你能夠用一行語句重寫getPriceAsync方法,以下所示。

public Future<Double> getPriceAsync(String product) {
    return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}

太棒了!七八行才能實現的功能,咱們如今只須要一行就能夠搞定了!supplyAsync方法接受一個生產者(Supplier)做爲參數,返回一個CompletableFuture對象,該對象完成異步執行後會讀取調用生產者方法的返回值。生產者方法會交由ForkJoinPool池中的某個執行線程(Executor)運行,可是你也可使用supplyAsync方法的重載版本,傳遞第二個參數指定不一樣的執行線程執行生產者方法。通常而言,向CompletableFuture的工廠方法傳遞可選參數,指定生產者方法的執行線程是可行的,在後面,你會使用這一能力,後面咱們將使用適合你應用特性的執行線程改善程序的性能。

接下來剩餘部分中,咱們會假設你很是不幸,沒法控制Shop類提供API的具體實現,最終提供給你的API都是同步阻塞式的方法。這也是當你試圖使用服務提供的HTTP API時最常發生的狀況。你會學到如何以異步的方式查詢多個商店,避免被單一的請求所阻塞,並由此提高你的「最佳價格查詢器」的性能和吞吐量。

讓你的代碼免受阻塞之苦

因此,你已經被要求進行「最佳價格查詢器」應用的開發了,不過你須要查詢的全部商店都如上面開始時介紹的那樣,只提供了同步API。換句話說,你有一個商家的列表,以下所示:

public class BestPriceFinder {
    private final List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
            new Shop("LetsSaveBig"),
            new Shop("MyFavoriteShop"),
            new Shop("BuyItAll"));
    ...
}

你須要使用下面這樣的簽名實現一個方法,它接受產品名做爲參數,返回一個字符串列表,這個字符串列表中包括商店的名稱、該商店中指定商品的價格:

public List<String> findPrices(String product);

你的第一個想法多是使用咱們在前面的章節中學習的Stream特性。你可能試圖寫出相似下面這個代碼(是的,做爲第一個方案,若是你想到這些已經至關棒了!)。

好吧,這段代碼看起來很是直白。如今試着用該方法去查詢你最近這些天瘋狂着迷的惟一產品(是的,你已經猜到了,它就是Old-Mi-Mix3)。此外,也請記錄下方法的執行時間,經過這些數據,咱們能夠比較優化以後的方法會帶來多大的性能提高,具體的代碼以下。

public class BestPriceFinder {
    private final List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
            new Shop("LetsSaveBig"),
            new Shop("MyFavoriteShop"),
            new Shop("BuyItAll"));

    public static void main(String[] args) {
        BestPriceFinder finder = new BestPriceFinder();
        finder.testFindPrices();
    }

    public void testFindPrices() {
        long start = System.nanoTime();
        System.out.println(findPrices("Old-Mi-Mix3"));
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.println("完成時間 " + duration);
    }

    public List<String> findPrices(String product) {
        return shops.stream()
                .map(shop -> String.format("%s 價格 %.2f",
                        shop.getName(), shop.getPrice(product)))
                .collect(toList());
    }
}

輸出結果:

[BestPrice 價格 109.64, LetsSaveBig 價格 143.13, MyFavoriteShop 價格 175.50, BuyItAll 價格 154.20]
完成時間 4184

正如你預期的,findPrices方法的執行時間僅比4秒鐘多了那麼幾百毫秒,由於對這4個商店的查詢是順序進行的,而且一個查詢操做會阻塞另外一個,每個操做都要花費大約1秒左右的時間計算請求商品的價格。你怎樣才能改進這個結果呢?

使用並行流對請求進行並行操做

若是你看了第七章的筆記,那麼你應該想到的第一個,可能也是最快的改善方法是使用並行流來避免順序計算,以下所示。

public List<String> findPricesParallel(String product) {
    return shops.parallelStream()
            .map(shop -> String.format("%s 價格 %.2f",
                    shop.getName(), shop.getPrice(product)))
            .collect(toList());
}

運行代碼,與最初的代碼執行結果相比較,你發現了新版findPrices的改進了吧。

[BestPrice 價格 109.64, LetsSaveBig 價格 143.13, MyFavoriteShop 價格 175.50, BuyItAll 價格 154.20]
完成時間 1248

至關不錯啊!看起來這是個簡單但有效的主意:如今對四個不一樣商店的查詢實現了並行,因此完成全部操做的總耗時只有1秒多一點兒。你能作得更好嗎?讓咱們嘗試使用剛學過的CompletableFuture,將findPrices方法中對不一樣商店的同步調用替換爲異步調用。

使用CompletableFuture 發起異步請求

你已經知道咱們可使用工廠方法supplyAsync建立CompletableFuture對象。讓咱們把它利用起來:

public List<CompletableFuture<String>> findPricesFuture(String product) {
    return shops.stream()
            .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s 價格 %.2f",
                    shop.getName(), shop.getPrice(product))))
            .collect(toList());
}

使用這種方式,你會獲得一個List<CompletableFuture<String>>,列表中的每一個CompletableFuture對象在計算完成後都包含商店的String類型的名稱。可是,因爲你用CompletableFutures實現的findPrices方法要求返回一個List<String>,你須要等待全部的future執行完畢,將其包含的值抽取出來,填充到列表中才能返回。

爲了實現這個效果,你能夠向最初的List<CompletableFuture<String>>施加第二個map操做,對List中的全部future對象執行join操做,一個接一個地等待它們運行結束。注意CompletableFuture類中的join方法和Future接口中的get有相同的含義,而且也聲明在Future接口中,它們惟一的不一樣是join不會拋出任何檢測到的異常。使用它你再也不須要使用try/catch語句塊讓你傳遞給第二個map方法的Lambda表達式變得過於臃腫。全部這些整合在一塊兒,你就能夠從新實現findPrices了,具體代碼以下。

public List<String> findPrices(String product) {
    List<CompletableFuture<String>> priceFutures = shops.stream()
            .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s 價格 %.2f",
                    shop.getName(), shop.getPrice(product))))
            .collect(toList());

    return priceFutures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
}

運行下代碼瞭解下第三個版本findPrices方法的性能,你會獲得下面這幾行輸出:

[BestPrice 價格 109.64, LetsSaveBig 價格 143.13, MyFavoriteShop 價格 175.50, BuyItAll 價格 154.20]
完成時間 2207

這個結果讓人至關失望,不是嗎?超過2秒意味着利用CompletableFuture實現的版本,比剛開始的代碼中的原生順序執行且會發生阻塞的版本快。可是它的用時也差很少是使用並行流的前一個版本的兩倍。尤爲是,考慮到從順序執行的版本轉換到並行流的版本只作了很是小的改動,就讓人更加沮喪。

與此造成鮮明對比的是,咱們爲採用CompletableFutures完成的新版方法作了大量的工做!但,這就是所有的真相嗎?這種場景下使用CompletableFutures真的是浪費時間嗎?或者咱們可能漏掉了某些重要的東西?繼續往下探究以前,讓咱們休息幾分鐘,尤爲是想一想你測試代碼的機器是否足以以並行方式運行四個線程。

尋找更好的方案

並行流的版本工做得很是好,那是由於它能並行地執行四個任務,因此它幾乎能爲每一個商家分配一個線程。可是,若是你想要增長第五個商家到商店列表中,讓你的「最佳價格查詢」應用對其進行處理,這時會發生什麼狀況?

public class BestPriceFinder {
    private final List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
            new Shop("LetsSaveBig"),
            new Shop("MyFavoriteShop"),
            new Shop("BuyItAll"),
            new Shop("ShopEasy"));
    ...

    public List<String> findPricesParallel(String product) {
        return shops.parallelStream()
                .map(shop -> String.format("%s 價格 %.2f",
                        shop.getName(), shop.getPrice(product)))
                .collect(toList());
    }

    public List<String> findPricesSequential(String product) {
        return shops.stream()
                .map(shop -> String.format("%s 價格 %.2f",
                        shop.getName(), shop.getPrice(product)))
                .collect(toList());
    }


    public List<String> findPricesFuture(String product) {
        List<CompletableFuture<String>> priceFutures = shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s 價格 %.2f",
                        shop.getName(), shop.getPrice(product))))
                .collect(toList());

        return priceFutures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }
}

public class BestPriceFinderMain {

    private static BestPriceFinder bestPriceFinder = new BestPriceFinder();

    public static void main(String[] args) {
        execute("sequential", () -> bestPriceFinder.findPricesSequential("Old-Mi-Mix3"));
    }

    private static void execute(String msg, Supplier<List<String>> s) {
        long start = System.nanoTime();
        System.out.println(s.get());
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.println(msg + " 完成時間 " + duration);
    }
}

絕不意外,順序執行版本的執行仍是須要大約5秒多鐘的時間,下面是執行的輸出:

[BestPrice 價格 109.64, LetsSaveBig 價格 143.13, MyFavoriteShop 價格 175.50, BuyItAll 價格 154.20, ShopEasy 價格 147.92]
sequential 完成時間 5139

很是不幸,並行流版本的程序此次比以前也多消耗了差很少1秒鐘的時間,由於能夠並行運行(通用線程池中處於可用狀態的)的四個線程如今都處於繁忙狀態,都在對前4個商店進行查詢。第五個查詢只能等到前面某一個操做完成釋放出空閒線程才能繼續,它的運行結果以下:

[BestPrice 價格 163.19, LetsSaveBig 價格 141.77, MyFavoriteShop 價格 159.81, BuyItAll 價格 165.02, ShopEasy 價格 165.81]
parallel 完成時間 2106

CompletableFuture版本的程序結果如何呢?咱們也試着添加第5個商店對其進行了測試,結果以下:

[BestPrice 價格 144.31, LetsSaveBig 價格 142.49, MyFavoriteShop 價格 146.99, BuyItAll 價格 132.52, ShopEasy 價格 139.15]
composed CompletableFuture 完成時間 2004

CompletableFuture版本的程序彷佛比並行流版本的程序還快那麼一點兒。可是最後這個版本也不太使人滿意。好比,若是你試圖讓你的代碼處理9個商店,並行流版本耗時3143毫秒,而CompletableFuture版本耗時3009毫秒。它們看起來不相伯仲,究其緣由都同樣:它們內部採用的是一樣的通用線程池,默認都使用固定數目的線程,具體線程數取決於Runtime.getRuntime().availableProcessors()的返回值。然而,CompletableFuture具備必定的優點,由於它容許你對執行器(Executor)進行配置,尤爲是線程池的大小,讓它以更適合應用需求的方式進行配置,知足程序的要求,而這是並行流API沒法提供的。讓咱們看看你怎樣利用這種配置上的靈活性帶來實際應用程序性能上的提高。

使用定製的執行器

就這個主題而言,明智的選擇彷佛是建立一個配有線程池的執行器,線程池中線程的數目取決於你預計你的應用須要處理的負荷,可是你該如何選擇合適的線程數目呢?

調整線程池的大小

你的應用99%的時間都在等待商店的響應,因此估算出的W/C比率爲100。這意味着若是你指望的CPU利用率是100%,你須要建立一個擁有400個線程的線程池。實際操做中,若是你建立的線程數比商店的數目更多,反而是一種浪費,由於這樣作以後,你線程池中的有些線程根本沒有機會被使用。出於這種考慮,咱們建議你將執行器使用的線程數,與你須要查詢的商店數目設定爲同一個值,這樣每一個商店都應該對應一個服務線程。不過,爲了不發生因爲商店的數目過多致使服務器超負荷而崩潰,你仍是須要設置一個上限,好比100個線程。代碼清單以下所示。

private final Executor executor = Executors.newFixedThreadPool(100, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        }
    });

注意,你如今正建立的是一個由守護線程構成的線程池。Java程序沒法終止或者退出一個正在運行中的線程,因此最後剩下的那個線程會因爲一直等待沒法發生的事件而引起問題。與此相反,若是將線程標記爲守護進程,意味着程序退出時它也會被回收。這兩者之間沒有性能上的差別。如今,你能夠將執行器做爲第二個參數傳遞給supplyAsync工廠方法了。好比,你如今能夠按照下面的方式建立一個可查詢指定商品價格的CompletableFuture對象:

CompletableFuture.supplyAsync(() -> String.format("%s 價格 %.2f",
                        shop.getName(), shop.getPrice(product)), executor)

改進以後,使用CompletableFuture方案的程序處理5個商店結果:

[BestPrice 價格 144.31, LetsSaveBig 價格 142.49, MyFavoriteShop 價格 146.99, BuyItAll 價格 132.52, ShopEasy 價格 139.15]
composed CompletableFuture 完成時間 1004

這個例子證實了要建立更適合你的應用特性的執行器,利用CompletableFutures向其提交任務執行是個不錯的主意。處理需大量使用異步操做的狀況時,這幾乎是最有效的策略。

並行——使用流仍是CompletableFutures?

目前爲止,你已經知道對集合進行並行計算有兩種方式:要麼將其轉化爲並行流,利用map這樣的操做開展工做,要麼枚舉出集合中的每個元素,建立新的線程,在CompletableFuture內對其進行操做。後者提供了更多的靈活性,你能夠調整線程池的大小,而這能幫助你確保總體的計算不會由於線程都在等待I/O而發生阻塞。書中使用這些API的建議以下。

  • 若是你進行的是計算密集型的操做,而且沒有I/O,那麼推薦使用Stream接口,由於實現簡單,同時效率也多是最高的(若是全部的線程都是計算密集型的,那就沒有必要建立比處理器核數更多的線程)。
  • 反之,若是你並行的工做單元還涉及等待I/O的操做(包括網絡鏈接等待),那麼使用CompletableFuture靈活性更好,你能夠像前文討論的那樣,依據等待/計算,或者W/C的比率設定須要使用的線程數。這種狀況不使用並行流的另外一個緣由是,處理流的流水線中若是發生I/O等待,流的延遲特性會讓咱們很難判斷到底何時觸發了等待。

如今你已經瞭解瞭如何利用CompletableFuture爲你的用戶提供異步API,以及如何將一個同步又緩慢的服務轉換爲異步的服務。不過到目前爲止,咱們每一個Future中進行的都是單次的操做。

代碼

Github: chap11

Gitee: chap11

相關文章
相關標籤/搜索