本章節的內容:java
一、建立異步計算並獲取計算結果;算法
二、使用非阻塞操做提高吞吐量;數據庫
三、設計和實現異步API;編程
同步API只是對傳統方法調用的另外一種稱呼:你調用了某個方法,調用方在被調用方運行的過程會等待,被調用方法運行結束返回,調用方取得被調用方的返回值並繼續運行。即便調用方法和被調用方在不一樣的線程運行,調用方仍是須要等待被調用方結束後才能繼續運行,這就是阻塞式調用的由來。api
四、如何以異步的方式使用同步的API;數組
異步API會直接返回,或者至少在被調用方計算完成以前,將他剩餘的計算任務交給另外一個線程去作,該線程和調用方是異步的——這就是非阻塞式調用的由來。執行剩餘計算任務的線程會將他的計算結果返回給調用方。返回的方式要麼是經過回調函數,要麼是由調用方再次執行一個「等待,直到計算完成(Future的get方法)」的方法調用。這種方式的計算在I/O系統程序設計中很是常見。網絡
五、如何對兩個或多個異步操做進行流水線和合並操做;app
六、如何處理異步操做的完成狀態。dom
Future接口在Java5中被引入,設計初衷是對未來某個時刻會發生的結果進行建模。他建模了一種異步計算,返回一個執行運算結果的引用,當運算結束後,這個引用被返回給調用方。在Future中觸發那些潛在耗時的操做吧調用線程解放出來,讓他能繼續執行其餘有價值的工做,再也不須要呆呆等待耗時的操做完成。異步
打個比方,你去一些快餐店吃飯,付錢以後,他不會讓你在排隊窗口那裏呆呆的等。而是給你一張號碼牌,告訴你何時你的飯菜會作好。作飯的同時,你能夠作其餘的事情,和同事坐在餐桌邊聊聊今年爲何不漲工資種種。等到飯作好了,你再去取飯。
Future的另外一個有點是他比更底層的Thread更易用。要使用Future,一般只須要將耗時的操做封裝在一個Callable對象中,再將它提交給ExecutorService就好了。例如
package ch11; import java.util.concurrent.*; public class FutureTest { public static void main(String[] args) { // 建立ExecutorService,經過他你能夠像線程池提交任務 ExecutorService executorService = Executors.newCachedThreadPool(); // 提交任務到線程池中異步執行; // 1.提交的是一個Callable(有返回值)或Raunable(無返回值)對象; // 2.你的主要任務應該是寫在Callable對象的call方法之中; // 3.泛型參數即爲計算結果的類型; // 4.該執行過程是異步的,也就是說,此處的代碼不會產生阻塞,你能夠在以後的代碼中運行其餘的操做; Future<Integer> future = executorService.submit(new Callable<Integer>() { @Override public Integer call() { Integer result = 0; System.out.println("Run some compute that can use more time..."); return result; } }); // 異步操做進行的同時,你能夠作其餘事情 System.out.println("Do other things..."); // 獲取異步操做的結果,此處會產生阻塞,但能夠設置等待時間 try { // V get(long timeout, TimeUnit unit) // 獲取異步操做的結果,若是最終被阻塞,沒法獲得結果,那麼將會 // 在對坐等待1秒以後退出。 future.get(1,TimeUnit.SECONDS); } catch (InterruptedException e) { // 當前線程在等待過程當中被中斷 e.printStackTrace(); } catch (ExecutionException e) { // 計算拋出一個異常 e.printStackTrace(); } catch (TimeoutException e) { // 超時異常 e.printStackTrace(); } } }
若是你已經運行到沒有異步操做的結果就沒法在進行下一步操做的時候,就能夠考慮調用Future的get方法去獲取操做的結果。若是操做已經完成該方法會當即獲得操做結果,不然他會阻塞你的線程,直到操做完成,返回相應的結果。
可是無參的get方法是沒有超時參數的,這可能會致使線程的永久阻塞,所以,推薦使用帶有超時參數的get方法。除非你對你的程序有着「不可能計算出現問題」的自信。
很明顯,若是有多個Future在程序中出現,那麼咱們很難表述Future之間的依賴性。從文字上雖然能夠這樣表述:「當長時間計算任務完成時,請將該計算的結果通知到另外一個長時間運行的計算任務,這兩個計算任務都完成後,將計算的結果與另外一個查詢操做結果合併」。可是,使用Future中提供的方法完成這樣的操做又是另外一回事兒。這也是咱們須要更具描述能力的特性的緣由,好比:
一、兩個兩個異步計算你們的結果合併爲一個 —— 這兩個異步計算之間相互獨立,同時第二個有依賴於第一個的結果;
二、等待Future集合中的全部任務都完成;
三、僅等待Future集合中最快結束的任務完成(有可能由於他們試圖經過不一樣的方式計算同一個值);
四、經過編程方式完成一個Future任務的執行(即以手工設定異步操做的方式)
五、應對Future的完成事件。(即當Future的完成事件發生時會受到通知,並能使用Future計算的結果進行下一步的操做,不僅是簡單的橘色等待操做的結果)。
接下來咱們會使用CompletableFuture構建異步應用,該應用大體就是一個獲取顧客服務、商店打折價等。
若是您開發過集成式的應用程序,會深有感悟,不少獲取結果的操做一般不是咱們練習的時候信手捏來的值,而是經過各類外部服務獲取,例如:
等等,當目標查詢量、數據集過大時,這一般是一個耗時的操做。因爲咱們的重點不在這裏,所以解析來會使用delay方法模擬這個耗時的過程,從而體現出使用異步任務相比較傳統的同步方法的優點。delay方法以下所示,其實就是將當前線程休眠一段時間
public static void delay(){ try{ // sleep 1 sec. Thread.sleep(1000L); }catch(InterruptedExcetion e){ throw new RuntimeException(e); } }
接下來咱們編寫一個同步方法的應用方法,該方法用於查詢一件商品的價格
package ch11; import java.util.Random; /** * 商店類 */ public class Shop { private Random random; private String shopName; public Shop(String shopName) { this.shopName = shopName; this.random = new Random(); } // 獲取商店名稱 public String getShopName() { return shopName; } /** * 模擬延遲操做 */ public static void delay(){ try { Thread.sleep(1000); } catch (InterruptedException e) { System.out.println("sleep now thread error:" + e.getMessage()); e.printStackTrace(); } } public double getPrice(String product){ return calculatePrice(product); } /** * 經過商品的前兩個字符和一個隨機數作乘積得到價格 * @param product * @return */ private double calculatePrice(String product){ // delay times. delay(); return random.nextDouble() * product.charAt(0) + product.charAt(1); } }
記住,delay模擬的是一個耗時的過程。很明顯,這個類的使用者每次調用getPrice方法時,都會爲的古代這個同步事件完成而等待1秒鐘。這是沒法接收的,尤爲是考慮到價格查詢器對網絡中的全部的商店都重複這樣的操做。
所以,咱們但願使用異步的API重寫這段代碼。
使用咱們提到的Future轉換方法
public Future<Double> getPriceAsync(String product){ ... }
Future能夠理解爲一個暫時還不可知道結果的處理器,這個結果在計算完成後,能夠經過調用該Future對象的get方法取得。
由於這樣的設計,getPriceAsync能夠當即返回,調用線程能夠有機會在同一時間去執行其餘的有價值的計算任務。
新的CompleteableFuture提供了不少的方法,支持咱們執行各類各樣的操做。例如:
public Future<Double> getPriceAsync(String product){ // 建立一個CompletableFuture對象,他會包含計算的結果 CompletableFuture<Double> future = new CompletableFuture<>(); // 在另外一個線程中以異步的方法執行計算 new Thread(() -> { double price = calculatePrice(product); // 需長時間計算的任務結束並得出結果時,設置Future的返回值。 future.complete(price); }).start(); // 無需等待結果,直接返回Future對象 return future; }
接下來咱們測試該異步任務的表現
package ch11; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class CFutureTest { public static void main(String[] args) { Shop kfc = new Shop("KFC"); // 得到任意時間(納秒) long start = System.nanoTime(); Future<Double> hamburgerPriceFuture = kfc.getPriceAsync("hamburger"); long usedTimes = System.nanoTime() - start; // System.out.println("異步任務返回花費了" + usedTimes/1_000_000 + " 毫秒的時間;"); System.out.println("執行其餘的任務..."); try { Double price = hamburgerPriceFuture.get(); System.out.println("Price is " + price); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } usedTimes = System.nanoTime() - start; System.out.println("得到異步任務結果花費了" + usedTimes/1_000_000 + "毫秒的時間"); /** * 異步任務返回花費了55 毫秒的時間; * 執行其餘的任務... * Price is 117.5786113274089 * 得到異步任務結果花費了1058毫秒的時間 */ } }
若是價格計算的過程當中出現了錯誤,咱們該怎麼辦?
很是不幸的是,這種狀況下,咱們會獲得一個至關嚴重的結果,用於提示錯誤的異常會被抑制(java core 異常章節有講述抑制的概念)在視圖計算商品價格的當前線程的範圍內。最終會殺死該線程,而這會致使等待get方法返回結果的客戶端永久阻塞。
爲此,客戶端須要使用重載版本的get方法。這樣,在指定的時間內沒有計算出結果時,提早獲知超時消息。
但這也有一個問題,那就是咱們沒法獲知到底發生了什麼錯誤,這對於應用程序開發者來講是一個很嚴重的問題。爲了讓客戶端知道發生了什麼錯誤,咱們可使用CompletableFuture的completeExceptionally方法將致使CompletableFuture內發生的問題異常拋出。
基於此,咱們來改寫以前的代碼(新增一個優化方法)
public Future<Double> getPriceAsync2(String product){ CompletableFuture<Double> future = new CompletableFuture<>(); new Thread(() -> { try{ double price = calculatePrice(product); // 若是計算正常結束,則完成complete操做並設置商品價格; future.complete(price); }catch (Exception e){ // 不然,拋出致使失敗的異常,完成此次Future操做。 future.completeExceptionally(e); } }).start(); return future; }
咱們能夠傳入一個空的字符串做爲商品名稱參數來求取結果,顯然會發生數組越界的異常。
Future<Double> hamburgerPriceFuture = kfc.getPriceAsync2("");
若是咱們調用以前的getPriceAsync
方法,此時程序會被阻塞。
可是若是咱們調用getPriceAsync2
則會獲得異常的緣由,以及主線程及時的終止
異步任務返回花費了55 毫秒的時間; 執行其餘的任務... 得到異步任務結果花費了1057毫秒的時間 java.util.concurrent.ExecutionException: java.lang.StringIndexOutOfBoundsException: String index out of range: 0 at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at ch11.CFutureTest.main(CFutureTest.java:23) Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: 0 at java.lang.String.charAt(String.java:658) at ch11.Shop.calculatePrice(Shop.java:76) at ch11.Shop.lambda$getPriceAsync2$1(Shop.java:57) at java.lang.Thread.run(Thread.java:745) Process finished with exit code 0
前面調用CompletableFutre的過程當中,咱們會明顯的感受到調用比較繁瑣。其實還有優雅的建立、調用方式。即便用工廠方式取代以前的建立方式,咱們改寫以前的方法:
public Future<Double> getPriceAsync3(String product){ return CompletableFuture.supplyAsync(() -> calculatePrice(product)); }
supplyAsync接受一個生產者做爲參數,返回一個CompletableFuture對象,該對象是完成異步執行後會讀取調用生產者方法的返回值。
須要注意的是,該使用方式的效果和咱們提供了錯誤管理機制的getPriceAsync2
是同樣的,很明顯,他優雅的許多。
如今咱們解決了查詢一個商店的需求了,但新的需求來了,須要咱們查詢一個列表裏面的商店對於指定的商品,到底哪一家的更便宜。
顯然,這須要咱們分別取訪問全部商店提供的查詢指定商品價格的API。
假設如今有5家商店:
List<Shop> shops = Arrays.asList(new Shop("shop1"), new Shop("shop2"), new Shop("shop3"), new Shop("shop4"), new Shop("shop5"));
針對該問題,咱們來了三位參賽者A、B和C。他們提出了各自的解決方案,咱們分別來看看他們的方案以及效果如何。
A爲咱們提出了一個很是直接的解決方案,即便用java8的順序流,咱們來看看其方案的源碼:
package ch11; import java.util.Arrays; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.stream.Collectors; public class CFutureTest2 { public static void main(String[] args) { String product = "hamburger"; List<Shop> shops = Arrays.asList(new Shop("shop1"), new Shop("shop2"), new Shop("shop3"), new Shop("shop4"), new Shop("shop5")); long start = System.nanoTime(); List<String> list = shops.stream().map(shop -> { return String.format("%s 的對應的價格爲 %.2f", shop.getShopName(), shop.getPrice(product)); }).collect(Collectors.toList()); System.out.println(list); System.out.println("=== 執行效果 ==="); System.out.println("該方式共花費時間(毫秒): " + (System.nanoTime() - start)/1_000_000); /** * [shop1 的對應的價格爲 145.81, shop2 的對應的價格爲 136.21, shop3 的對應的價格爲 191.72, shop4 的對應的價格爲 158.95, shop5 的對應的價格爲 152.92] * === 執行效果 === * 該方式共花費時間(毫秒): 5110 */ } }
由於是順序執行的緣由,所以5s+的時間大概是順序的從這5家商店中依次獲取商品價格所花費的時間。
咱們的第一位方案提出者A,完成需求花費時間大約爲5110毫秒。
接下來,B很是的志得意滿,對A的結果嗤之以鼻。他說,這種狀況徹底可使用java 8的並行流,A吶,既然學過java 8,不會不知道這東西吧,只須要修改你的stream方法爲parallelStream就能夠獲得意想不到的結果了:
List<String> list = shops.parallelStream().map(shop -> { return String.format("%s 的對應的價格爲 %.2f", shop.getShopName(), shop.getPrice(product)); }).collect(Collectors.toList()); /** * [shop1 的對應的價格爲 176.59, shop2 的對應的價格爲 184.50, shop3 的對應的價格爲 141.36, shop4 的對應的價格爲 143.61, shop5 的對應的價格爲 131.53] * === 執行效果 === * 該方式共花費時間(毫秒): 2098 */
取得了不錯的效果,只花費了2098毫秒。看來B並非裝腔做勢,由於對5個商店的查詢採起了並行操做,足足比以前的順序流快了不止一倍的時間。
咱們的第二位方案提出者B,花費時間爲2098毫秒。
因爲價格是隨機生成的,所以顯然和以前的例子價格會有所不一樣。但這不是重點。
問題來了,咱們還能夠作得更好嗎?C會帶來什麼樣的解決方案了,咱們下一節拭目以待。
C很是同意並行的處理方式,不過他以爲,B的這種方式有點太過極端,並且應對一些比較特殊的狀況沒法優雅的處理,甚至沒法知足需求。
咱們還能夠作得更好。可使用CompletableFuture,再進一步的對查詢方法進行優化。
咱們不妨看看C的源碼,看看是否能取得更好的成績。
List<String> list = shops.stream().map(shop -> { return CompletableFuture.supplyAsync(() -> { return String.format("%s 的對應的價格爲 %.2f", shop.getShopName(), shop.getPrice(product)); }); }).map(CompletableFuture::join).collect(Collectors.toList()); System.out.println(list); // 該方式共花費時間(毫秒): 5095
join和get的效果基本一致,惟一的區別在於,使用join方法不會拋出任何檢測到的異常,所以咱們沒必要在使用try/catch語句。
5秒了,但C立馬解釋說,之因此花費這麼多時間是因爲stream的延遲特性致使的,所以,須要使用兩個不一樣的流水線,而不是一個,一個流水線會致使發現不一樣商家的請求只能以同步、順序執行的方式纔會成功。所以,每一個建立CompletableFuture對象只能在前一個操做結束以後執行查詢指定商家的動做、通知join方法返回計算結果。
爲了不這種狀況的發生,咱們應該使用兩個不一樣的map流水線。以下所示:
List<CompletableFuture<String>> futures = shops.stream().map(shop -> { return CompletableFuture.supplyAsync(() -> { return String.format("%s 的對應的價格爲 %.2f", shop.getShopName(), shop.getPrice(product)); }); }).collect(Collectors.toList()); List<String> list = futures.stream().map(CompletableFuture::join).collect(Collectors.toList()); // === 執行效果 === // 該方式共花費時間(毫秒): 2133
2133毫秒,期待着獲得不錯成績的咱們以爲有些失望。這意味着C的解決方案也不見得比B好到哪裏去,甚至顯得有些昂長。
C真的不比B好嗎?C的作法不少餘嗎?咱們接着往下看。
咱們知道,並行處理任務數,通常是和CPU的核數對等的,例如,我當前的這臺電腦的核數是4
System.out.print("CPU核數:" + Runtime.getRuntime().availableProcessors()); // CPU個數:4
這也就是意味着,從1開始計數,每當個人商店數增長4個,那麼針對上述的方案B和C,執行時間都會增長一秒。由於B和C內部採用的都是一樣的通用線程池,默認都是用固定數目的線程,具體數目取決於上述測試代碼,即Runtime.getRuntime().availableProcessors()
的返回值。然而CompletableFuture
具備必定的優點,由於他容許你對執行器(Executor)進行配置,尤爲是線程池的大小,讓他以更適合應用需求的方式進行配置,知足程序的要求,這些,咱們的並行流是沒法提供的。
前面提到,咱們能夠建立一個配有線程池的執行器,線程池中線程的數目取決於你預計你的應用須要處理的符合,可是你該如何選擇合適的線程數目呢?
線程數目的多少是很是講究的?設置的太小,如咱們以前所面臨的問題同樣,沒法充分的利用處理器的性能,若是業務比較頻繁,還會致使過多的任務面臨排隊過長的情況;相反,設置的過大,會致使他們競爭稀缺的處理器和內存資源,浪費大量的時間用在上下文的切換上。
估算線程池大小的公式 $$ N = N_{cpu} * U_{cpu} * (1 + \frac{W}{C}) $$ 其中:
針對該公式,顯然咱們的計算時間W/C約爲100倍,假設須要CPU利用率爲100%,咱們顯然要建立包含400個線程的線程池。
實際操做中,其實咱們只需商店數目的線程數就能夠了。
不過,爲了不商店數目過多,咱們能夠給其設定一個上限值100。以下所示:
final ExecutorService executorService = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); // 設置爲守護進程 t.setDaemon(true); return t; } }); List<CompletableFuture<String>> futures = shops.stream().map(shop -> { // public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) return CompletableFuture.supplyAsync(() -> { return String.format("%s 的對應的價格爲 %.2f", shop.getShopName(), shop.getPrice(product)); },executorService); }).collect(Collectors.toList());
也就是說,咱們將執行器做爲第二個參數傳遞給了supplyAsync方法。
改進以後,看一下時間的花費
=== 執行效果 === 該方式共花費時間(毫秒): 1091
只需1091毫秒。
目前爲止,咱們知道了對集合進行並行計算的兩種方式:
轉化爲並行流,利用map展開工做;
枚舉集合的每個元素,建立新的線程,在CompletableFuture內對其進行操做。
後者提供了更好的靈活性,支持咱們調整線程池的大小,也就是覺得這,確保總體的計算不會由於線程都在等待I/P而發生阻塞。
那麼,在實際狀況中該選擇哪種呢?建議以下:
若是進行的是計算密集型的操做,而且沒有I/O,那麼推薦使用stream接口。由於實現簡單,同時效率也是最高的。
若是並行的工做單元還涉及到等到I/O的操做,例如網絡鏈接等,那麼使用CompletableFuture更好,你能夠像咱們討論的那樣,依據等待/計算,或者W/C的比率設定須要使用的線程數。這種狀況下不適用並行流的緣由還有一個,那就是處理劉的流水線中若是發生I/O等待,流的延遲特性會讓咱們很難判斷到底何時觸發了等待。
接下來咱們定義一個折扣服務,該折扣服務提供了五個不一樣的折扣代碼,每一個折扣代碼對應不一樣的折扣率。
具體代碼以下所示:
package ch12; public class Discount { public enum Code{ NONE(0),SILVER(5),GOLD(10),PLATINUM(15),DIAMOND(20); private final int percentage; Code(int percentage){ this.percentage = percentage; } } // other code... }
接下來,咱們修改以前的getPrice方法,將其返回的價格形式改變
public String getPrice(String product){ double price = calculatePrice(product); // 從五中折扣中隨即選取一種 Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)]; // shopName:price:discountCode return String.format("%s:%.2f:%s",shopName,price,code); }
咱們從商店的getPrice方法中能夠得到shopName:price:discountCode
,接下來咱們須要知道具體的折扣價格,所以編寫一個類來進行解析:
package ch12; public class Quote { private final String shopName; private final double price; private final Discount.Code discountCode; public Quote(String shopName, double price, Discount.Code discountCode) { this.shopName = shopName; this.price = price; this.discountCode = discountCode; } /** * 解析方法 * @param str shopName:price:discountCode * @return */ public static Quote parse(String str){ String[] tokens = str.split(":"); return new Quote(tokens[0],Double.parseDouble(tokens[1]),Discount.Code.valueOf(tokens[2])); } public String getShopName() { return shopName; } public double getPrice() { return price; } public Discount.Code getDiscountCode() { return discountCode; } }
另外,咱們還在Discount類中添加了一個方法,他接收一個Quote對象,返回一個字符串,表示生成該Quote的商店中的折扣價格:
/** * 將折扣應用於商品最初的原始價格 * @param q * @return */ public static String applyDiscount(Quote q){ return q.getShopName() + "的價格是:" + Discount.apply(q.getPrice(),q.getDiscountCode()); } /** * 獲取折扣以後的價格,模擬了延遲 * @param price * @param code * @return */ private static double apply(double price, Code code){ delay(); return price * (100 - code.percentage)/100; } /** * 模擬延遲操做 */ public static void delay(){ try { Thread.sleep(1000); } catch (InterruptedException e) { System.out.println("sleep now thread error:" + e.getMessage()); e.printStackTrace(); } }
因爲Discount也是一種遠程服務,所以咱們在以前他的時候模擬了一秒鐘的延遲時間。接下來咱們使用和以前同樣的幾種方式來使用該服務。
一、同步執行:
package ch12; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; public class Test1 { List<Shop> shops = Arrays.asList(new Shop("shop1"), new Shop("shop2"), new Shop("shop3"), new Shop("shop4")); public static void main(String[] args) { long start = System.nanoTime(); System.out.println(new Test1().getPrices("sss")); System.out.println("花費時間爲(毫秒): " + (System.nanoTime() - start) / 1_000_000); /** * [shop1的價格是:190.06850000000003, shop2的價格是:218.77, shop3的價格是:134.43, shop4的價格是:202.89149999999998] * 花費時間爲(毫秒): 8174 */ } public List<String> getPrices(String product){ return shops.stream().map(shop -> shop.getPrice(product)) .map(Quote::parse) .map(Discount::applyDiscount) .collect(Collectors.toList()); } }
差很少8秒的時間,在預料以內。
二、使用並行流優化
很明顯,當前4個商店,咱們的電腦擁有4個核,所以,並行流操做花費的時間應該是1+1=2S左右,咱們修改代碼以下:
public static void main(String[] args) { long start = System.nanoTime(); System.out.println(new Test2().getPrices("sss")); System.out.println("花費時間爲(毫秒): " + (System.nanoTime() - start) / 1_000_000); /** * [shop1的價格是:206.37, shop2的價格是:224.38, shop3的價格是:160.1315, shop4的價格是:178.02900000000002] * 花費時間爲(毫秒): 2105 */ }
和預估的差很少。但一樣的,並行流的方式沒法控制線程數量,所以會在商店數目上升以後變得力不從心,也就是擴展性並非很好。
接下來使用Completable提供的特性改造代碼。
public List<String> getPrices(String product){ ExecutorService executor = Executors.newFixedThreadPool(shops.size(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); return t; } }); List<CompletableFuture<String>> priceFuture = shops.stream() // 以異步方式取得每一個shop中指定產品的shopName:xxx .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product),executor)) // 而後解析返回的串 .map(f -> f.thenApply(Quote::parse)) // 使用另外一個異步任務構造指望的future,申請折扣 .map(f -> f.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote),executor))) .collect(Collectors.toList()); // 等待流中的全部Future執行完畢,並提取各自的返回值 return priceFuture.stream().map(CompletableFuture::join).collect(Collectors.toList()); /** * [shop1的價格是:111.682, shop2的價格是:164.6875, shop3的價格是:97.20800000000001, shop4的價格是:147.77] * 花費時間爲(毫秒): 2115 */ }
須要注意裏面出現的兩個新方法,thenApply以及thenCompose:
thenApply()
是返回的是CompletableFuture
類型:它的功能至關於將CompletableFuture<T>
轉換成CompletableFuture<U>
。
thenCompose()
用來鏈接兩個CompletableFuture
,返回值是新的CompletableFuture
。
也就是說,thenApply() 轉換的是泛型中的類型,是同一個CompletableFuture; thenCompose()用來鏈接兩個CompletableFuture,是生成一個新的CompletableFuture。也就是說,在thenCompose()
中通常會設計到CompletableFuture的建立代碼。新生成的CompletableFuture使用先前的CompletableFuture做爲輸入。
更常見的狀況是將兩個徹底不相干的CompletableFuture對象的結果整合起來(thenCombine)。
public <U,V> CompletableFuture<V> thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
例如,若是某個商店返回的價格是以歐元爲單位的,咱們的需求是獲得美圓的結果,那麼,咱們同時還須要去一個提供匯率的服務獲取美圓和歐元之間的匯率,最後,將這兩個結果進行組合(thenCombine),從而獲得最終的以美圓爲單位的結果。
package ch11; import ch12.ExchangeService; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class CFutureTest5 { public static void main(String[] args) { Shop shop = new Shop("KFC"); // 得到任意時間(納秒) long start = System.nanoTime(); CompletableFuture<Double> future = CompletableFuture .supplyAsync(() -> shop.getPrice("sss")) // 組合獲取匯率的調用結果,第二個參數是 BiFunction類型,結合二者的結果,最終返回最後的答案。 .thenCombine(CompletableFuture.supplyAsync(() -> ExchangeService.getRate("usa", "europe")), (price, rate) -> price * rate); try { System.out.printf("結果爲:%.2f\r\n" ,future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("得到異步任務結果花費了" + (System.nanoTime() - start)/1_000_000 + "毫秒的時間"); } }
爲了演示,咱們添加了一個計算匯率的簡便方法
package ch12; public class ExchangeService { public static double getRate(String firstCountry, String secondContry){ // 1歐元=1.1233美圓 // 其實這裏應該模擬一下延遲的:delay() return 1.1233; } }
固然,你也可使用theCombineAsync方法來啓動一個新的線程執行結果的整合操做,可是這個計算顯然很快速,所以沒有必要在開啓一個新的線程。
如今咱們已經能夠說,CompletableFuture相比較直接使用Java 7的Future的優點。不少時候,咱們若是採用java 7去編寫相同的案例,狀況會變得複雜不少。
可是如今還有一個小小的問題,那就是咱們調用get或者join時還說會形成線程阻塞,直到CompletableFuture完成後纔會繼續往下執行。
而接下來,咱們針對這個問題,來學習一下CompletableFuture的completion事件。
所有完成
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
一個完成便可
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
thenAccept:在返回結果上應用一個回調操做,也就是說,該方法定義的操做會在CompletableFuture獲得計算結果以後執行。
CompletableFuture<Void> thenAccept(Consumer<? super T> action)
因爲thenAccept已經定義了對返回結果的操做,一旦計算到結果,thenAccept返回的就是一個CompletableFuture<Void>
類型的結果,這對於咱們沒有什麼做用。
CompletableFuture[] completableFutures = shops.stream() .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s 的對應的價格爲 %.2f", shop.getShopName(), shop.getPrice(product)), executorService)) .map(f -> f.thenAccept(s -> System.out.println(s))) .toArray(size -> new CompletableFuture[size]); System.out.println("==="); CompletableFuture.anyOf(completableFutures).join(); /** * === * shop5 的對應的價格爲 186.41 */
注意delay的時間設置成爲了一個隨機的值。這樣咱們使用anyOf以後,則該計算會在第一個完成的任務以後直接再也不等待,往下執行。
這一章節,咱們學習到的內容以下:
一、執行比較耗時的操做時,尤爲是那些依賴一個或多個遠程服務的操做,使用異步任務能夠改善程序的性能,加快程序的響應速度;
二、你應該儘量的爲客戶提供異步API,使用CompletableFuture類提供的特性,可以輕鬆地實現這一目標;
三、CompetableFuture類還提供了異常管理的機制,讓咱們有機會拋出或者管理異步任務執行時產生的異常;
四、將同步api的調用封裝到一個CompletableFuture中,你可以以異步的方式使用其結果;
五、若是異步任務之間相互獨立,或者他們之間某一些的結果是另外一些的輸入,你能夠將這些異步任務構造或者合併成一個;
六、你能夠爲CompletableFuture註冊一個回調函數,在Future執行完畢或者他們計算的結果可用時,針對性的執行一些程序;
七、你能夠決定在何時結束程序的運行,是等待由CompletableFuture對象構成的列表中全部的對象都執行完畢,仍是隻要其中任何一個首先完成就終止程序的運行。