java併發編程學習14--CompletableFuture(一)

【回顧Future接口

Future接口時java5引入的,設計初衷是對未來某個時刻會發生的結果建模。它建模了一種異步計算,返回了一個執行預算結果的引用。好比,你去幹洗店洗衣服,店員會告訴你何時能夠來取衣服,而不是讓你一直在乾洗店等待。要使用Future只須要將耗時操做封裝在一個Callable對象中,再將其提交給ExecutorService就能夠了。java

ExecutorService executor = Executors.newFixedThreadPool(10);
        Future<Double> future = executor.submit(new Callable<Double>() {
            @Override
            public Double call() throws Exception {
                return doSomeLongComputation();
            }
        });
        
        doSomethingElse();

        try {
            //最多等待1秒
            Double result = future.get(1,TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            //當前線程等待過程當中被打斷
            e.printStackTrace();
        } catch (ExecutionException e) {
            //計算時出現異常
            e.printStackTrace();
        } catch (TimeoutException e) {
            //完成計算前就超時
            e.printStackTrace();
        }

可是Future依然有一些侷限性:數據庫

  1. 沒法將兩個異步計算的結果合併爲一個。
  2. 等待Future集合中全部任務完成。
  3. 等待Future集合中最快任務完成(選擇最優的執行方案)。
  4. 經過編程的方式完成一個Future任務的執行(手工設定異步結果處理)。
  5. 應對Future的完成事件,當Future的完成事件發生時會收到通知,並可使用Future的結果進行下一步操做,不僅是簡單的阻塞等待。

而CompletableFuture類實現了Future接口,能夠將上述的問題所有解決。CompletableFuture與Stream的設計都遵循了相似的設計模式:使用Lambda表達式以及流水線的思想,從這個角度能夠說CompletableFuture與Future的關係相似於Stream與Collection的關係。編程

【構建一個異步應用

最佳價格查詢器:查詢多個線上商店對同一商品的價格。設計模式

首先構建商店對象:api

package BestPriceFinder;

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class Shop {

    private String name;
    public Shop(String name){
        this.name = name;
    }
    public String getName(){
        return name;
    }

    /**
     * 異步api:使用建立CompletableFuture類提供的工廠方法與getPriceAsync()效果徹底一致
     * 能夠更輕易的完成這個流程,而且不用擔憂實現細節
     * @param product
     * @return
     */
    public Future<Double> getPriceAsyncByFactory(String product){
        return CompletableFuture.supplyAsync(() -> calculatePrice(product));
    }
    /**
     * 異步api:
     * @param product
     * @return
     */
    public Future<Double> getPriceAsync(String product){
        //建立CompletableFuture對象,它將包含計算結果
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        //在新線程中異步計算結果
        new Thread(() -> {
            try {
                double price = calculatePrice(product);
                //須要長時間計算的任務結束時,設置future的返回值
                futurePrice.complete(price);
            }catch (Exception e){
                //如這裏沒有使用completeExceptionally,線程不會結束,調用方會永遠的執行下去
                futurePrice.completeExceptionally(e);
            }
        }).start();
        //無需等待計算結果,直接返回future對象
        return futurePrice;
    }

    /**
     * 同步api:
     * 每一個商店都須要提供的查詢api:根據名稱返回價格;
     * 模擬查詢數據庫等一些耗時操做:使用delay()模擬這些耗時操做。
     * @param product
     * @return
     */
    public double getPrice(String product){
        return calculatePrice(product);
    }

    private double calculatePrice(String product){
        delay();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }

    private Random random = new Random();
    /**
     * 模擬耗時操做:延遲一秒
     */
    private static void delay(){
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

下面咱們針對Shop.java提供的同步方法與異步方法來進行測試:網絡

package BestPriceFinder;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;

/**
 * 最佳價格查詢器
 */
public class BestFinder {

    List<Shop> shops = Arrays.asList(
            new Shop("A"),
            new Shop("B"),
            new Shop("C"),
            new Shop("D"),
            new Shop("E"),
            new Shop("F"),
            new Shop("G"),
            new Shop("H"),
            new Shop("I"),
            new Shop("J")
    );

    /**
     * 順序查詢
     */
    public List<String> findPrices(String product){
        return shops.stream()
                     .map(shop -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product)))
                     .collect(Collectors.toList());
    }

    /**
     * 並行流查詢
     */
    public List<String> findPricesParallel(String product){
        return shops.parallelStream()
                .map(shop -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product)))
                .collect(Collectors.toList());
    }

    /**
     * 異步查詢
     * 相比並行流的話CompletableFuture更有優點:能夠對執行器配置,設置線程池大小
     */
    @SuppressWarnings("all")
    private final Executor myExecutor = Executors.newFixedThreadPool(Math.min(shops.size(), 800), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            //使用守護線程保證不會阻止程序的關停
            t.setDaemon(true);
            return t;
        }
    });
    @SuppressWarnings("all")
    public List<String> findPricesAsync(String product){
        List<CompletableFuture<String>> priceFuctures = shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product)),myExecutor))
                .collect(Collectors.toList());
        /** 這裏須要使用新的stream來等待全部的子線程執行完,
         *   由於:若是在一個stream中使用兩個map:
         *   List<CompletableFuture<String>> priceFuctures = shops.parallelStream()
         *           .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product))))
         *           .map(c -> c.join()).collect(Collectors.toList())
         *           .collect(Collectors.toList());
         *   考慮到流操做之間的延遲特性。若是你在單一的流水線中處理流,發向不一樣商家的請求只能以同步順序的方式執行纔會成功。所以每一個建立CompletableFuture
         *   對象只能在前一個操做結束以後執行查詢商家動做。
         */
        return priceFuctures.stream().map(c -> c.join()).collect(Collectors.toList());
    }
}
@Test
    public void findPrices(){
        BestFinder bestFinder = new BestFinder();
        long st = System.currentTimeMillis();
        System.out.println(bestFinder.findPrices("iPhonX"));
        System.out.println("done : " + (System.currentTimeMillis() - st) + "msecs");
    }
    @Test
    public void findPricesParallel(){
        BestFinder bestFinder = new BestFinder();
        long st = System.currentTimeMillis();
        System.out.println(bestFinder.findPrices("iPhonX"));
        System.out.println("done : " + (System.currentTimeMillis() - st) + "msecs");
    }
    @Test
    public void findPricesAsync(){
        BestFinder bestFinder = new BestFinder();
        long st = System.currentTimeMillis();
        System.out.println(bestFinder.findPricesAsync("iPhonX"));
        System.out.println("done : " + (System.currentTimeMillis() - st) + "msecs");
    }

同步api測試結果:毫無疑問是10秒之上
圖片描述併發

並行流獲取同步api測試結果:也是10秒之上,可是並行流不是很高效嗎?怎麼會如此悽慘呢?由於這與並行流能夠調用的系統核數相關,個人計算機是8核,最多8個線程同時運行。而商店有10個,也就是說,咱們的兩個線程會一直等待前面的某一個線程釋放出空閒才能繼續運行。
圖片描述dom

異步獲取api測試結果:一秒左右異步

圖片描述
爲什麼差距如此大呢?
明智的選擇是建立了一個配有線程池的執行器,線程池中線程的數目取決於你的應用須要處理的負擔,可是你該如何選擇合適的線程數目呢?ide

【選擇正確的線程池大小

《Java併發編程實戰》中給出以下公式:

Number = NCpu * Ucpu * ( 1 + W/C)
Number : 線程數量
NCpu : 處理器核數
UCpu : 指望cpu利用率
W/C : 等待時間與計算時間比

咱們這裏:99%d的時間是等待商店響應 W/C = 99 ,cpu利用率指望 100% ,NCpu = 9,推斷出 number = 800。可是爲了不過多的線程搞死計算機,咱們選擇商店數與計算值中較小的一個。

【並行流與CompletableFuture

目前,咱們對集合進行計算有兩種方式:1.並行流 2.CompletableFuture;而CompletableFuture更加的靈活,咱們能夠配置線程池的大小確保總體的計算不會由於等待IO而發生阻塞。
書上給出的建議以下:

  1. 若是是計算密集型的操做而且沒有IO推薦stream接口,由於實現簡單效率也高,若是全部的線程都是計算密集型的也就沒有必要建立比核數更多的線程。
  2. 反之,若是任務涉及到IO,網絡等操做:CompletableFuture靈活性更好,由於大部分線程處於等待狀態,須要讓他們更加忙碌,而且再邏輯中加入異常處理能夠更有效的監控是什麼緣由觸發了等待。

如今咱們知道了如何用CompletableFuture提供異步的api,後面的文章會學習如何利用CompletableFuture高效的操做同步api。

相關文章
相關標籤/搜索