Java8 CompletableFuture組合式的編程(筆記)

    * 實現異步API


public double getPrice(String product) {
    return calculatePrice(product);
}

/**
 * 同步計算商品價格的方法
 *
 * @param product 商品名稱
 * @return 價格
 */
private double calculatePrice(String product) {
    delay();
    return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
/**
 * 模擬計算,查詢數據庫等耗時
 */
public static void delay() {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

    * 將同步方法裝換爲異步方法

/**
 * 異步計算商品的價格.
 *
 * @param product 商品名稱
 * @return 價格
 */
public Future<Double> getPriceAsync(String product) {
    CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    new Thread(() -> {
        double price = calculatePrice(product);
        futurePrice.complete(price);
    }).start();
    return futurePrice;
}

使用異步API 模擬客戶端
Shop shop = new Shop("BestShop");
long start = System.nanoTime();
Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
long incocationTime = (System.nanoTime() - start) / 1_000_000;
System.out.println("執行時間:" + incocationTime + " msecs");
try {
    Double price = futurePrice.get();
    System.out.printf("Price is %.2f%n", price);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
long retrievalTime = (System.nanoTime() - start) / 1_000_000;
System.out.println("retrievalTime:" + retrievalTime + " msecs");

* 錯誤處理

     上述代碼,若是沒有意外,能夠正常工做,可是若是價格計算過程當中生產了錯誤會怎樣呢?很是不幸,這種狀況下你會獲得一個至關糟糕的結果:用於提示錯誤的異常會限制在視圖計算商品的價格的當前線程的範圍內,最終會殺死該線程,而這會致使等待get方法放回結果的客戶端永久的被阻塞,
     客戶端可使用重載的get方法,它給定了一個超時時間,這是一種推薦作法!
     爲了讓客戶端能瞭解商店沒法提供請求商品價格的緣由.咱們對代碼優化,!
/**
 * 異步計算商品的價格.
 *
 * @param product 商品名稱
 * @return 價格
 */
public Future<Double> getPriceAsync(String product) {
    CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    new Thread(() -> {
        try {
            double price = calculatePrice(product);
            futurePrice.complete(price);
        } catch (Exception e) {
            //不然就拋出異常,完成此次future操做
            futurePrice.completeExceptionally(e);
        }
    }).start();
    return futurePrice;
}


    * 使用工廠方法supplyAsync建立CompletableFuture

/**
 * 異步計算商品的價格.
 *
 * @param product 商品名稱
 * @return 價格
 */
public Future<Double> getPriceAsync(String product) {
   /* CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    new Thread(() -> {
        try {
            double price = calculatePrice(product);
            futurePrice.complete(price);
        } catch (Exception e) {
            //不然就拋出異常,完成此次future操做
            futurePrice.completeExceptionally(e);
        }
    }).start();
    return futurePrice;*/

    return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}


    * 讓代碼免受阻塞之苦

案例:最佳價格查詢器
private static List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
        new Shop(":LetsSaveBig"),
        new Shop("MyFavoriteShop"),
        new Shop("BuyItAll"));

/**
 * 最佳價格查詢器
 *
 * @param product 商品
 * @return
 */
public static List<String> findprices(String product) {
    return shops
            .stream()
            .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
            .collect(Collectors.toList());
}

驗證findprices的正確性和執行性能
long start = System.nanoTime();
System.out.println(findprices("myPhones27s"));
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Done in " + duration+" msecs");

執行結果:

* 使用平行流對請求進行並行操做

/**
 * 最佳價格查詢器(並行流)
 *
 * @param product 商品
 * @return
 */
public static List<String> parallelFindprices(String product) {
    return shops
            .parallelStream()
            .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
            .collect(Collectors.toList());
}

一樣的測試代碼:
至關不錯,看起來這是個簡單有效的主意,對4個不一樣商店的查詢實現了並行.全部完成操做的總耗時只有1秒多一點,讓咱們嘗試使用CompletableFuture,將findprices方法中對不一樣商店的同步調用替換爲異步調用.

    * 使用CompletableFuture發起異步請求

/**
 * 最佳價格查詢器(異步調用實現)
 * @param product 商品
 * @return
 */
public static List<String> asyncFindprices(String product) {
    //使用這種方式,你會獲得一個List<CompletableFuture<String>>,列表中的每個CompletableFuture對象在計算完成後都包含商店的String類型的名稱.
    //可是,因爲你用CompletableFuture實現了asyncFindprices方法要求返回一個List<String>.你須要等待全部的future執行完畢,將其包含的值抽取出來,填充到列表中才能返回
    List<CompletableFuture<String>> priceFuture = shops
            .stream()
            .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))))
            .collect(Collectors.toList());
    //爲了實現這個效果,我門能夠向最初的List<CompletableFuture<String>>施加第二個map操做,對list中的每個future對象執行join操做,一個接一個地等待他們容許結束,join和get方法
    //有相同的含義,不一樣的在於join不會拋出任何檢測到的異常
    return priceFuture
            .stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
}

相同的測試代碼:
結果讓咱們失望了.咱們採用異步調用新版方法,要比並行流慢了一倍.

    * 尋找更好的方案

通過我增長商店數量,而後使用三種方式反覆的測試,發現了一個問題,並行流和異步調用的性能不分伯仲,究其緣由都同樣,它們內部採用的是一樣的通用線程池,默認都使用固定數目的線程,具體線程數取決於Runtime.getRuntime.availableProcessors()放回值,然而,.CompletableFuture具備必定的優點,由於它容許你對執行器進行配置,尤爲是線程池的大小,讓它以適合應用需求的方式進行配置,知足程序的要求,而這是並行流API沒法提供的.

    * 使用定製的執行器

private static final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100));
/**
 * 最佳價格查詢器(異步調用實現,自定義執行器)
 *
 * @param product 商品
 * @return
 */
public static List<String> asyncFindpricesThread(String product) {
    List<CompletableFuture<String>> priceFuture = shops
            .stream()
            .map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPrice(product), executor))
            .collect(Collectors.toList());
    return priceFuture
            .stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
}

通過測試處理5個商店 是1秒多,處理9個商店也是1秒多

並行--使用流仍是CompletableFutures?
     目前爲止,咱們已經知道對集合進行並行計算有兩種方式,要麼將其轉化爲並行流,利用map這樣的操做開展工做,要麼枚舉出集合中的每個元素,建立新的線程,在CompletableFuture內對其進行操做,後者提供了更多的靈活性,你能夠調整線程池大小,兩者能幫助你確保總體計算機不會由於線程都在等待I/O而發生阻塞
     咱們使用這些API的建議以下:

    1. 若是你進行的是計算密集型的操做,而且沒有I/O,那麼推薦使用Stream接口,由於實現簡單,同時效率也多是最高的
    2. 反之,若是你並行的工做單元還涉及等待I/O的操做(包括網絡鏈接等待).那麼使用CompletableFuture是靈活性更好,你能夠像前面討論的那樣,依據等待/計算,或者W/C的比率設定須要使用的線程數,
相關文章
相關標籤/搜索