因爲咱們的兩個遠程服務:1.查詢價格,2.查詢折扣價格都是基於網絡的。因此很容易出現某一個商店的數據遲遲沒法返回的狀況。因爲這些緣由,我但願查詢器在查詢時可以將拿到數據先返回過來,而不是等待全部的異步查詢完成後集中返回一個List。
咱們首要須要避免的就是:等待建立一個包含全部價格的List。咱們應該直接處理CompletableFuture流,而後去響應他的completion事件,每個CompletableFuture對象完成時獲取到相應的返回值。java
先將Discount的折扣服務延遲時間修改成隨機值:數組
//計算折扣價格 private static Double apply(double price ,Code code){ //模擬遠程操做的延遲 delay(); return (price * (100 - code.percantage)) / 100; } private static void delay(){ try { //隨機延遲時間 int delay = 500 + random.nextInt(2000); Thread.sleep(delay); } catch (InterruptedException e) { e.printStackTrace(); } }
開始實現最佳價格查詢器:網絡
package BestPriceFinder; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.stream.Collectors; import java.util.stream.Stream; /** * 最佳價格查詢器 */ public class BestFinder { List<Shop> shops = Arrays.asList( new Shop("A"), new Shop("B"), new Shop("C"), new Shop("D"), new Shop("E"), new Shop("F"), new Shop("G"), new Shop("H"), new Shop("I"), new Shop("J") ); public void findPricesContinue(String product){ long st = System.currentTimeMillis(); Stream<CompletableFuture<String>> futurePrices = shops.stream() //首先異步獲取價格 .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPriceFormat(product),myExecutor)) //將獲取的字符串解析成對象 .map(future -> future.thenApply(Quote::parse)) //使用另外一個異步任務有獲取折扣價格 .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote),myExecutor))); //thenAccept()會在CompletableFuture完成以後使用他的返回值,這裏會持續執行子線程 CompletableFuture[] futures = futurePrices.map(f -> f.thenAccept(s -> { String sout = String.format("%s done in %s mesc",s,(System.currentTimeMillis() - st)); System.out.println(sout); })) .toArray(size -> new CompletableFuture[size]); //allOf()工廠方法接受由CompletableFuture對象構成的數組,這裏使用其等待全部的子線程執行完畢 CompletableFuture.allOf(futures).join(); } /** * 異步查詢 * 相比並行流的話CompletableFuture更有優點:能夠對執行器配置,設置線程池大小 */ @SuppressWarnings("all") private final Executor myExecutor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); //使用守護線程保證不會阻止程序的關停 t.setDaemon(true); return t; } });
thenAccept():提供了在CompletableFuture對象完成後使用他的返回值的功能。這樣咱們的每個CompletableFuture完成後就會打印他的返回值,最終等待全部的子線程完畢。app
allOf():工廠方法接受由CompletableFuture對象構成的數組,數組中全部的CompletableFuture完成後它返回一個CompletableFuture<Void>對象。dom
anyOf():廠方法接受由CompletableFuture對象構成的數組,返回數組中第一個完成的CompletableFuture的返回值CompletableFuture<Object>對象。異步