Java8 對多個異步任務進行流水線操做(筆記)

     如今咱們要對商店商品進行折扣服務.每一個折扣代碼對應不一樣的折扣率,使用一個枚舉變量Discount.Code來實現這一想法,具體代碼以下所示.
以枚舉類型定義的折扣代碼
/**
 * 折扣服務api
 *
 * @author Darcy
 *         Created by Administrator on 2017/3/17.
 */
public class Discount {
    public enum Code {
        NONE(0), SILVER(0), GOLD(10), PLATINUM(15), DIAMOND(20);
        private final int percentage;

        Code(int percentage) {
            this.percentage = percentage;
        }
    }

    public static String applyDiscount(Quote quote) {
        return quote.getShopName() + " price is " + Discount.apply(quote.getPrice(), quote.getDiscountCode());
    }

    private static double apply(double price, Code code) {
        delay();
        return price * (100 - code.percentage) / 100;
    }

    /**
     * 模擬計算,查詢數據庫等耗時
     */
    public static void delay() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

修改商店返回價格的格式:

public String getPrice(String product) {
    double price = calculatePrice(product);
    Discount.Code code = Discount.Code.values()[
            random.nextInt(Discount.Code.values().length)];
    return String.format("%s:%.2f:%s", name, price, code);
}


    * 實現折扣服務

咱們的商店已經能從不一樣的商店得到商品價格,解析結果字符串,針對每一個字符串,查詢折扣服務器取的折扣代碼.這個流程決定了請求商品的最終折扣價格.咱們將對商店返回的字符串的解析操做封裝到了下面的Quote類中:
/**
 * 商店返回消息實體,不可變對象模式 線程安全
 * @author Darcy
 *         Created by Administrator on 2017/3/17.
 */
public final class Quote {
    private final String shopName;
    private final double price;
    private final Discount.Code discountCode;

    public Quote(String shopName, double price, Discount.Code discountCode) {
        this.shopName = shopName;
        this.price = price;
        this.discountCode = discountCode;
    }

    public static Quote parse(String s) {
        String[] split = s.split(":");
        String shopName = split[0];
        double price = Double.parseDouble(split[1]);
        Discount.Code discountCode = Discount.Code.valueOf(split[2]);
        return new Quote(shopName, price, discountCode);
    }

    public String getShopName() {
        return shopName;
    }

    public double getPrice() {
        return price;
    }

    public Discount.Code getDiscountCode() {
        return discountCode;
    }
}

     Discount服務還提供了一個applyDiscount方法,它接收一個Quote對象,返回一個字符串,表示生成該Quote的shop中的折扣價格,代碼以下:
public static String applyDiscount(Quote quote) {
    return quote.getShopName() + " 商品原價: " + quote.getPrice() + " 折扣後價格: " + Discount.apply(quote.getPrice(), quote.getDiscountCode());
}

private static double apply(double price, Code code) {
    delay();
    return price * (100 - code.percentage) / 100;
}


    * 使用Discount服務

/**
 * 商店折扣價格查詢器
 *
 * @param product 商品
 * @return
 */
public static List<String> findprices(String product) {
    return shops
            .stream()
            .map(shop ->  shop.getPrice(product))
            .map(Quote::parse)
            .map(Discount::applyDiscount)
            .collect(Collectors.toList());
}

執行結果:
換成並行流:
/**
 * 商店折扣價格查詢器
 *
 * @param product 商品
 * @return
 */
public static List<String> findprices(String product) {
    return shops
            .parallelStream()
            .map(shop ->  shop.getPrice(product))
            .map(Quote::parse)
            .map(Discount::applyDiscount)
            .collect(Collectors.toList());
}

執行結果:
看到差距了吧


    * 構建同步和異步操做

咱們再次使用ComoletableFuture提供的特性.以異步方式從新實現findPrices方法,詳細代碼以下:
/**
 * 商店折扣價格查詢器(CompletableFuture方式)
 *
 * @param product 商品
 * @return
 */
public static List<String> findPrices(String product) {
    List<CompletableFuture<String>> collect = shops
            .stream()
            //以異步凡是取得每一個shop中指定產品的原始價格
            .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
            //Quote對象存在時,對其返回值進行轉換
            .map(future -> future.thenApply(Quote::parse))
            //使用另外一個異步任務構建指望的Future,申請折扣 thenCompose 將多個future組合 一個一個執行
            .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)))
            .collect(Collectors.toList());
    return collect
            .stream()
            //等待流中全部的future執行完畢,並提取各自的返回值
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
}


    * 對最佳價格查詢器應用的優化

上面的全部例子中都是經過響應以前添加1秒延遲的等待時間模擬方法的遠程調用,毫無疑問,現實生活中,你的應用訪問各個遠程服務器時極可能遭遇沒法預知的延遲,觸發緣由多種多樣,從服務器的負荷到網絡的延遲,有些甚至是源於遠程服務如何評估你應用的商業價值,
因爲這些緣由,你但願購買的商品在某些緣由的查詢速度要比另外一些商店更快,咱們模擬了操做:

/**
 * 模擬不一樣的商店 延遲不同的狀況
 */
public static void randomDelay() {
    int delay = 500 + random.nextInt(2000);
    try {
        Thread.sleep(delay);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

重構findPrices方法 返回一個由Future構成的流:
/**
 * 重構findPrices方法 返回一個由Future構成的流
 *
 * @param product 商品
 * @return
 */
public static Stream<CompletableFuture<String>> findProcesStream(String product) {
    return shops
            .stream()
            //以異步凡是取得每一個shop中指定產品的原始價格
            .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
            //Quote對象存在時,對其返回值進行轉換
            .map(future -> future.thenApply(Quote::parse))
            //使用另外一個異步任務構建指望的Future,申請折扣 thenCompose 將多個future組合 一個一個執行
            .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)));
}

付諸實現:
long start = System.nanoTime();
CompletableFuture[] futures = findProcesStream("myPhones27s")
        //Java 8的CompletableFuture經過thenAccept方法  他接收CompletableFuture執行完畢的返回值做爲參數.
        .map(f -> f.thenAccept(
                s -> System.out.println(s + " (done in " +
                        ((System.nanoTime() - start) / 1_000_000) + " msecs)")))
        .toArray(CompletableFuture[]::new);
//allOf工廠方法接收一個由CompletableFuture構成的數組,數組中全部的CompletableFuture對象執行完畢後,它返回一個
//CompletableFuture<Void>對象,這意味着你須要等待最初Stream中全部的CompletableFuture對象執行完畢
//angOf該方法接收一個CompletableFuture對象構成的數組,返回由第一個執行完畢的CompletableFuture對象的返回值構成的CompletableFuture<Object>
CompletableFuture.allOf(futures).join();
System.out.println("All shops have now responded in  " + ((System.nanoTime() - start) / 1_000_000) + " msecs");

執行結果:
相關文章
相關標籤/搜索