總結!Java8 加強的Future:CompletableFuture

CompletableFuture是Java8新增的一個超大型工具類,爲何說她大呢?由於一方面它實現了Future接口,更重要的是,它實現了CompletionStage接口.這個接口也是Java8新增長的,而CompletionStage擁有多達約40種方法,javascript

經過CompletableFuture提供進一步封裝,咱們很容易實現Future模式那樣的異步調用,例如:java

public static Integer cale(Integer para) {
    try {
        Thread.sleep(1000);
 
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return para * para;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
 
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> cale(50));
    System.out.println(future.get());
}

上述代碼中CompletableFuture.supplyAsync()方法構造了一個CompletableFuture實例,在supplyAsync()函數中,他會在一個新的線程中,執行傳入的參數.在這裏,他會執行calc()方法,而calc()方法的執行多是比較慢的,可是不影響CompletableFuture實例的構造速度,所以supplyAsync()會當即返回,他返回的CompletableFuture對象實例就能夠做爲此次調用的契約,在未來任何場合,用於得到最終的計算結果.在CompletableFuture中,相似的工廠方法有如下幾個:數據庫

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)

其中supplyAsync()方法用於那些須要返回值的場景,好比計算某個數據,而runAsync()方法用於沒有返回值的場景,好比,僅僅是簡單地執行某一個異步動做.編程

首先說明一下已Async結尾的方法都是能夠異步執行的,若是指定了線程池,會在指定的線程池中執行,若是沒有指定,默認會在ForkJoinPool.commonPool()中執行網絡

     在這兩對方法中,都有一個方法能夠接手一個Executor參數,這使咱們可讓Supplier<U>或者Runnable在指定的線程池工做,若是不指定,則在默認的系統公共的ForkJoinPool.common線程池中執行.dom

* 流式調用

在前文中我已經簡單的提到,CompletionStage的約40個接口爲函數式編程作準備的,在這裏,就讓咱們看一下,若是使用這些接口進行函數式的流式API調用異步

public static Integer cale(Integer para) {
    try {
        Thread.sleep(1000);
 
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return para * para;
}
CompletableFuture<Void> future = CompletableFuture
        .supplyAsync(() -> cale(50))
        .thenApply(i -> Integer.toString(i))
        .thenApply(str -> "\"" + str + "\"")
        .thenAccept(System.out::println);
future.get();

上述代碼中,使用supplyAsync()函數執行了一個異步任務,接着連續使用流式調用對任務處理結果進行在加工,直到最後的結果輸出:async

* CompletableFuture中的異常處理

public static Integer cale(Integer para) {
    try {
        Thread.sleep(1000);
 
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return para * para;
}
CompletableFuture<Void> future = CompletableFuture
        .supplyAsync(() -> cale(50))
        .exceptionally(ex -> {
            System.out.println("ex.toString() = " + ex.toString());
            return 0;
        })
        .thenApply(i -> Integer.toString(i))
        .thenApply(str -> "\"" + str + "\"")
        .thenAccept(System.out::println);
future.get();

* 組合多個CompletableFuture

CompletableFuture還容許你將多個CompletableFuture進行組合,一種方法是使用thenCompose(),它的簽名以下:
public <U>CompletableFuture<U>thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
函數式編程

CompletableFuture<Void> future = CompletableFuture
        .supplyAsync(() -> cale(50))
        .thenCompose(i -> CompletableFuture
                .supplyAsync(() -> cale(i)))
        .thenApply(i -> Integer.toString(i))
        .thenApply(str -> "\"" + str + "\"")
        .thenAccept(System.out::println);
future.get();

另一種組和多個CompletableFuture的方法是thenCombine(),它的簽名以下:
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)函數

/*方法thenCombine()首先完成當前CompletableFuture和other的執行,
接着,將這二者的執行結果傳遞給BiFunction(該接口接受兩個參數,並有一個返回值),
並返回表明BiFuntion實例的CompletableFuture對象:*/
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> cale(50));
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> cale(25));
 
CompletableFuture<Void> fu = future1.thenCombine(future2, (i, j) -> (i + j))
        .thenApply(str -> "\"" + str + "\"")
        .thenAccept(System.out::println);
fu.get();

 * 實現異步API

public double getPrice(String product) {
    return calculatePrice(product);
}

/**
 * 同步計算商品價格的方法
 *
 * @param product 商品名稱
 * @return 價格
 */
private double calculatePrice(String product) {
    delay();
    return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
/**
 * 模擬計算,查詢數據庫等耗時
 */
public static void delay() {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

* 將同步方法裝換爲異步方法

/**
 * 異步計算商品的價格.
 *
 * @param product 商品名稱
 * @return 價格
 */
public Future<Double> getPriceAsync(String product) {
    CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    new Thread(() -> {
        double price = calculatePrice(product);
        futurePrice.complete(price);
    }).start();
    return futurePrice;
}

使用異步API 模擬客戶端
Shop shop = new Shop("BestShop");
long start = System.nanoTime();
Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
long incocationTime = (System.nanoTime() - start) / 1_000_000;
System.out.println("執行時間:" + incocationTime + " msecs");
try {
    Double price = futurePrice.get();
    System.out.printf("Price is %.2f%n", price);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
long retrievalTime = (System.nanoTime() - start) / 1_000_000;
System.out.println("retrievalTime:" + retrievalTime + " msecs");

//>執行時間:37 msecs
//>Price is 125.79
//>retrievalTime:1055 msecs

* 錯誤處理

上述代碼,若是沒有意外,能夠正常工做,可是若是價格計算過程當中生產了錯誤會怎樣呢?很是不幸,這種狀況下你會獲得一個至關糟糕的結果:
用於提示錯誤的異常會限制在視圖計算商品的價格的當前線程的範圍內,最終會殺死該線程,
而這會致使等待get方法放回結果的客戶端永久的被阻塞,
     客戶端可使用重載的get方法,它給定了一個超時時間,這是一種推薦作法!
     爲了讓客戶端能瞭解商店沒法提供請求商品價格的緣由.咱們對代碼優化,!

/**
 * 異步計算商品的價格.
 *
 * @param product 商品名稱
 * @return 價格
 */
public Future<Double> getPriceAsync(String product) {
    CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    new Thread(() -> {
        try {
            double price = calculatePrice(product);
            futurePrice.complete(price);
        } catch (Exception e) {
            //不然就拋出異常,完成此次future操做
            futurePrice.completeExceptionally(e);
        }
    }).start();
    return futurePrice;
}

* 使用工廠方法supplyAsync建立CompletableFuture

/**
 * 異步計算商品的價格.
 *
 * @param product 商品名稱
 * @return 價格
 */
public Future<Double> getPriceAsync(String product) {
   /* CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    new Thread(() -> {
        try {
            double price = calculatePrice(product);
            futurePrice.complete(price);
        } catch (Exception e) {
            //不然就拋出異常,完成此次future操做
            futurePrice.completeExceptionally(e);
        }
    }).start();
    return futurePrice;*/

    return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}

* 讓代碼免受阻塞之苦

案例:最佳價格查詢器
private static List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
        new Shop(":LetsSaveBig"),
        new Shop("MyFavoriteShop"),
        new Shop("BuyItAll"));

/**
 * 最佳價格查詢器
 *
 * @param product 商品
 * @return
 */
public static List<String> findprices(String product) {
    return shops
            .stream()
            .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
            .collect(Collectors.toList());
}

驗證findprices的正確性和執行性能
long start = System.nanoTime();
System.out.println(findprices("myPhones27s"));
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Done in " + duration+" msecs");
/*
[BestPrice price is 197.76, :
LetsSaveBig price is 155.39, 
MyFavoriteShop price is 124.21,
BuyItAll price is 139.23]
Done in 4071 msecs
*/

* 使用平行流對請求進行並行操做

/**
 * 最佳價格查詢器(並行流)
 *
 * @param product 商品
 * @return
 */
public static List<String> parallelFindprices(String product) {
    return shops
            .parallelStream()
            .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
            .collect(Collectors.toList());
}
/*
[BestPrice price is 201.41, :
LetsSaveBig price is 153.64,
MyFavoriteShop price is 224.65, 
BuyItAll price is 211.83]
Done in 1064 msecs
*/
至關不錯,看起來這是個簡單有效的主意,對4個不一樣商店的查詢實現了並行.全部完成操做的總耗時只有1秒多一點,
讓咱們嘗試使用CompletableFuture,將findprices方法中對不一樣商店的同步調用替換爲異步調用.

* 使用CompletableFuture發起異步請求

/**
 * 最佳價格查詢器(異步調用實現)
 * @param product 商品
 * @return
 */
public static List<String> asyncFindprices(String product) {
    //使用這種方式,你會獲得一個List<CompletableFuture<String>>,
    //列表中的每個CompletableFuture對象在計算完成後都包含商店的String類型的名稱.
    //可是,因爲你用CompletableFuture實現了asyncFindprices方法要求返回一個List<String>.
    //你須要等待全部的future執行完畢,將其包含的值抽取出來,填充到列表中才能返回
    List<CompletableFuture<String>> priceFuture = shops
            .stream()
            .map(shop ->CompletableFuture.supplyAsync(() ->
     String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))))
                 .collect(Collectors.toList());
    //爲了實現這個效果,我門能夠向最初的List<CompletableFuture<String>>
    //施加第二個map操做,對list中的每個future對象執行join操做,一個接一個地等待他們容許結束,join和get方法
    //有相同的含義,不一樣的在於join不會拋出任何檢測到的異常
    return priceFuture
            .stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
}

[BestPrice price is 187.24, :
LetsSaveBig price is 158.26, 
MyFavoriteShop price is 169.78, 
BuyItAll price is 170.59]
Done in 1061 msecs
結果讓咱們失望了.咱們採用異步調用新版方法,和並行差很少

* 尋找更好的方案

通過我增長商店數量,而後使用三種方式反覆的測試,發現了一個問題,並行流和異步調用的性能不分伯仲,究其緣由都同樣,
它們內部採用的是一樣的通用線程池,默認都使用固定數目的線程,
具體線程數取決於Runtime.getRuntime.availableProcessors()反回值,然而,
.CompletableFuture具備必定的優點,由於它容許你對執行器進行配置,
尤爲是線程池的大小,讓它以適合應用需求的方式進行配置,知足程序的要求,而這是並行流API沒法提供的.
private static final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100));
/**
 * 最佳價格查詢器(異步調用實現,自定義執行器)
 *
 * @param product 商品
 * @return
 */
public static List<String> asyncFindpricesThread(String product) {
    List<CompletableFuture<String>> priceFuture = shops
            .stream()
            .map(shop -> CompletableFuture.supplyAsync(
() -> shop.getName() + " price is " + shop.getPrice(product), executor))
            .collect(Collectors.toList());
    return priceFuture
            .stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
}

通過測試處理5個商店 是1秒多,處理9個商店也是1秒多

並行--使用流仍是CompletableFutures?

目前爲止,咱們已經知道對集合進行並行計算有兩種方式,要麼將其轉化爲並行流,利用map這樣的操做開展工做,要麼枚舉出集合中的每個元素,建立新的線程,在CompletableFuture內對其進行操做,後者提供了更多的靈活性,你能夠調整線程池大小,兩者能幫助你確保總體計算機不會由於線程都在等待I/O而發生阻塞 咱們使用這些API的建議以下:

  1.  若是你進行的是計算密集型的操做,而且沒有I/O,那麼推薦使用Stream接口,由於實現簡單,同時效率也多是最高的
  2.  反之,若是你並行的工做單元還涉及等待I/O的操做(包括網絡鏈接等待).那麼使用CompletableFuture是靈活性更好,你能夠像前面討論的那樣,依據等待/計算,或者W/C的比率設定須要使用的線程數,

上一篇講的<總結,總結!!!!java回調,future>能夠先看看

創造靠智慧,處世靠常識;有常識而無智慧,謂之平庸,有智慧而無常識,謂之笨拙。智慧是一切力量中最強大的力量,是世界上惟一自覺活着力量。 —— 高爾基

相關文章
相關標籤/搜索