隨着多核處理器的出現,提高應用程序的處理速度最有效的方式就是能夠編寫出發揮多核能力的軟件,咱們已經能夠經過切分大型的任務,讓每一個子任務並行運行,使用線程的方式,分支/合併框架(java 7) 和並行流(java 8)來實現。java
如今不少大型的互聯網公司都對外提供了API服務,好比百度的地圖,微博的新聞,天氣預報等等。不多有網站或網絡應用匯以徹底隔離的方式工做,而是採用混聚的方式:它會使用來自多個源的內容,將這些內容聚合在一塊兒,方便用戶使用。數據庫
好比實現一個功能,你須要在微博中搜索某個新聞,而後根據當前座標獲取天氣預報。這些調用第三方信息的時候,不想由於等待搜索新聞時,放棄對獲取天氣預報的處理,因而咱們可使用 分支/合併框架 及並行流 來並行處理,將他們切分爲多個子操做,在多個不一樣的核、CPU甚至是機器上並行的執行這些子操做。數組
相反,若是你想實現併發,而不是並行,或者你的主要目標是在同一個CPU上執行幾個鬆耦合的任務,充分利用CPU的核,讓其足夠忙碌,從而最大化程序的吞吐量,那麼你其實真正想作的是避免由於等待遠程服務的返回,或者對數據庫的查詢,而阻塞線程的執行,浪費寶貴的計算資源,由於這種等待時間可能會很長。Future接口,尤爲是它的新版實現CompletableFuture是處理這種狀況的利器。網絡
Future接口多線程
Future接口在java 5中被引入,設計初衷是對未來某個時刻會發生的結果進行建模。它建模了一種異步計算,返回一個執行運算結果的引用,當運算結束後,這個引用被返回給調用方。在Future中觸發那些可能會耗時的操做把調用線程解放出來,讓它能繼續執行其餘工做,不用一直等待耗時的操做完成,好比:你拿了一袋子衣服到洗衣店去洗衣服,洗衣店會給你張發票,告訴你何時會洗好,而後你就能夠去作其餘的事了。Future的另外一個優勢是它比更底層的Thread更容易使用。使用Future只須要講耗時的操做封裝在一個Callable對象中,再將它提交給ExecutorService就能夠了。 Java 8以前使用Future的例子:併發
public static void main(String[] args) { //建立Executor-Service,經過他能夠向線程池提交任務 ExecutorService executor = Executors.newCachedThreadPool(); //向executor-Service提交 Callable對象 Future<Double> future = executor.submit(new Callable<Double>() { @Override public Double call() throws Exception { //異步的方式執行耗時的操做 return doSomeLongComputation(); } }); //異步時,作其餘的事情 doSomethingElse(); try{ //獲取異步操做的結果,若是被阻塞,沒法獲得結果,那麼最多等待1秒鐘以後退出 Double result = future.get(1, TimeUnit.SECONDS); System.out.print(result); } catch (InterruptedException e) { System.out.print("計算拋出一個異常"); } catch (ExecutionException e) { System.out.print("當前線程在等待過程當中被中斷"); } catch (TimeoutException e) { System.out.print("future對象完成以前已過時"); } } public static Double doSomeLongComputation() throws InterruptedException { Thread.sleep(1000); return 3 + 4.5; } public static void doSomethingElse(){ System.out.print("else"); }
這種方式能夠再ExecutorService以併發的方式調用另一個線程執行耗時的操做的同時,去執行一些其餘任務。接着到已經沒有任務運行時,調用它的get方法來獲取操做的結果,若是操做完成,就會返回結果,不然會阻塞你的線程,一直到操做完成,返回響應的結果。app
CompletableFuture框架
在java 8 中引入了CompletableFuture類,它實現了Future接口,使用了Lambda表達式以及流水線的思想,經過下面這個例子進行學習,好比:咱們要作一個商品查詢,根據折扣來獲取價格。dom
public class Shop { public double getPrice(String product) throws InterruptedException { //查詢商品的數據庫,或連接其餘外部服務獲取折扣 Thread.sleep(1000); return new Random().nextDouble() * product.charAt(0) + product.charAt(1); } }
當調用這個方法時,它會阻塞進程,等待事件完成。異步
將同步方法轉換成異步方法
public Future<Double> getPriceAsync(String product){ //建立CompletableFuture對象 CompletableFuture<Double> futurePrice = new CompletableFuture<>(); new Thread (()->{ try { //在另外一個線程中執行計算 double price = getPrice(product); //須要長時間計算的任務結束並得出結果時,設置future的返回值 futurePrice.complete(price); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); return futurePrice; }
而後能夠這樣調用:
System.out.println("begin"); Future<Double> futurePrice = shop.getPriceAsync("ss"); System.out.println("doSomething"); System.out.println(futurePrice.get());
System.out.println("end");
begin
doSomething
171.47509091822835
end
這個例子中,首先會調用接口 當即返回一個Future對象,在這種方式下,在查詢價格的同時,還能夠處理其餘任務。最後全部的工做都已經完成,而後再調用future的get方法。得到Future中封裝的值,要麼發生阻塞,直到該任務異步任務完成,指望的值可以返回。
錯誤處理
若是沒有意外,這個代碼工做的會很是正常。可是若是計算價格的過程當中發生了錯誤,那麼get會永久的被阻塞。這時可使用重載的get方法,讓它超過一個時間後就強制返回。應該儘可能在代碼中使用這種方式來防止程序永久的等待下去。超時會引起TimeoutException。可是這樣會致使你沒法知道具體什麼緣由致使Future沒法返回,這時須要使用CompletableFUture的completeExceptionally方法將致使CompletableFuture內發生的問題拋出。
public Future<Double> getPriceAsync(String product){ //建立CompletableFuture對象 CompletableFuture<Double> futurePrice = new CompletableFuture<>(); new Thread (()->{ try { double price = getPrice(product); futurePrice.complete(price); } catch (Exception ex) { //拋出異常 futurePrice.completeExceptionally(ex); } }).start(); return futurePrice; }
調用時:
System.out.println("begin"); Future<Double> futurePrice = shop.getPriceAsync("ss"); System.out.println("doSomething"); try { System.out.println(futurePrice.get(1, TimeUnit.SECONDS)); } catch (TimeoutException e) { System.out.print(e); } System.out.println("end");
設置超時時間,而後會將錯誤信息打印出來。
工廠方法supplyAsync建立CompletableFuture
使用工廠方法能夠一句話來建立getPriceAsync方法
public Future<Double> getPriceAsync(String product) { return CompletableFuture.supplyAsync(() -> getPrice(product)); }
supplyAsync方法接受一個生產者(Supplier)做爲參數,返回一個CompletableFuture對象,該對象完成異步執行後悔讀取調用生產者方法的返回值。生產者方法會交由ForkJoinPool池中的某個執行線程(Executor)運行,也能夠調用supplyAsync方法的重載版本,傳入第二個參數指定不一樣的線程執行生產者方法。 工廠方法返回的CompletableFuture對象也提供了一樣的錯誤處理機制。
阻塞優化
例如如今有一個商品列表,而後輸出一個字符串 商品名,價格 。
List<Shop> shops = Arrays.asList( new Shop("one"), new Shop("two"), new Shop("three"), new Shop("four")); long start = System.nanoTime(); List<String> str = shops.stream().map(shop -> String.format("%s price: %.2f", shop.getName(), shop.getPrice(shop.getName()))).collect(toList()); System.out.print(str); long end = System.nanoTime(); System.out.print((end - start) / 1000000);
[one price: 161.83, two price: 126.04, three price: 153.20, four price: 166.06]
4110
每次調用getPrice方法都會阻塞1秒鐘,對付這種咱們可使用並行流來進行優化:
List<String> str = shops.parallelStream().map(shop -> String.format("%s price: %.2f", shop.getName(), shop.getPrice(shop.getName()))).collect(toList());
1137
明顯速度提高了,如今對四個商品查詢 實現了並行,因此只耗時1秒多點,下面咱們嘗試CompletableFuture:
List<CompletableFuture<String>> str2 = shops.stream().map(shop->
CompletableFuture.supplyAsync(
()->String.format("%s price: %.2f", shop.getName(), shop.getPrice(shop.getName())))).collect(toList());
咱們使用工廠方法supplyAsync建立CompletableFuture對象,使用這種方式咱們會獲得一個List<CompletableFuture<String>>,列表中的每個ComplatableFuture對象在計算完成後都會包含商品的名稱。可是咱們要求返回的是List<String>,因此須要等待全部的future執行完畢,再將裏面的值提取出來,填充到列表中才能返回。
List<String> str3 =str2.stream().map(CompletableFuture::join).collect(toList());
爲了返回List<String> 須要對str2添加第二個map操做,對List中的全部future對象執行join操做,一個接一個的等待他們的運行結束。CompletableFuture類中的join和Future接口中的get方法有相同的含義,而且聲明在Future接口中,惟一的不一樣是join不會拋出任何檢測到的異常。
1149
如今使用了兩個不一樣的Stream流水線,而不是在同一個處理流的流水線上一個接一個的防治兩個map操做。考慮流操做之間的延遲特性,若是你在單一流水線中處理流,發向不一樣商家的請求只能以同步、順序執行的方式纔會成功。所以每一個建立CompletableFuture對象只能在前一個操做結束以後,再join返回計算結果。
更好的解決方式
並行流的版本工做的很是好,那是由於他能夠並行處理8個任務,獲取操做系統線程數量:
System.out.print(Runtime.getRuntime().availableProcessors());
可是若是列表是9個呢?那麼執行結果就會2秒。由於他最多隻能讓8個線程處於繁忙狀態。 可是使用CompletableFuture容許你對執行器Executor進行配置,尤爲是線程池的大小,這是並行流API沒法實現的。
定製執行器
//建立一個線程池,線程池的數目爲100何商店數目兩者中較小的一個值 final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); //使用守護線程 ---這種方式不會阻止程序的關停 return t; } });
這個線程池是一個由守護線程構成的線程池,Java程序沒法終止或退出正在運行中的線程,因此最後剩下的那個線程會因爲一直等待沒法發生的事件而引起問題。與此相反,若是將線程標記爲守護進程,意味着程序退出時它也會被回收。這兩者之間沒有性能上的差別。如今能夠將執行器做爲第二個參數傳遞給supplyAsync方法了。
CompletableFuture.supplyAsync( ()->String.format("%s price: %.2f", shop.getName(), shop.getPrice(shop.getName())) ,executor)
這時,執行9個商品時,執行速度只有1秒。 執行18個商品時也是1秒。這種狀態會一直持續,直到商店的數目達到咱們以前計算的閥值。 處理須要大量使用異步操做的狀況時,這幾乎是最有效的策略。
對多個異步任務進行流水線操做
咱們在商品中增長一個枚舉Discount.Code 來表明每一個商品對應不一樣的折扣率,建立枚舉以下:
public class Discount { public enum Code{ NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20); private final int value; Code(int value){ this.value = value; } } }
如今咱們修改 getPrice方法的返回格式爲:ShopName:price:DiscountCode 使用 : 進行分割的返回值。
public String getPrice(String product){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Double price = new Random().nextDouble() * product.charAt(0) + product.charAt(1); Discount.Code code = Discount.Code.values()[new Random().nextInt(Discount.Code.values().length)]; return String.format("%s:%.2f:%s",name,price,code); }
返回值: one:120.10:GOLDD
將返回結果封裝到 Quote 類中:
public class Quote { private final String shopName; private final double price; private final Discount.Code discountCode; public Quote(String shopName, double price, Discount.Code code) { this.shopName = shopName; this.price = price; this.discountCode = code; } public static Quote parse(String s) { String[] split = s.split(":"); String shopName = split[0]; double price = Double.parseDouble(split[1]); Discount.Code discountCode = Discount.Code.valueOf(split[2]); return new Quote(shopName, price, discountCode); } public String getShopName() { return shopName; } public double getPrice() { return price; } public Discount.Code getDiscountCode() { return discountCode; } }
parse方法 經過getPrice的方法 返回的字符串 會返回Quote對象,此外 Discount服務還提供了一個applyDiscount方法,它接收一個Quote對象,返回一個字符串,表示該Quote的shop中的折扣價格:
public class Discount { public enum Code{.. } public static String applyDiscount(Quote quote){ return quote.getShopName() + "price :" + Discount.apply(quote.getPrice() ,quote.getDiscountCode()); } public static double apply(double price,Code code){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return price * (100 - code.value) / 100; } }
Discount中 也模擬了遠程操做 睡了1秒鐘,首先咱們嘗試最直接的方式:
List<String> str = shops.stream() .map(shop->shop.getPrice("hhhhh")) //獲取 one:120.10:GOLDD 格式字符串 .map(Quote::parse) //轉換爲 Quote 對象 .map(Discount::applyDiscount) //返回 Quote的shop中的折扣價格 .collect(toList()); System.out.print(str);
8146
首先,咱們調用getPrice遠程方法將shop對象轉換成了一個字符串。每一個1秒
而後,咱們將字符串轉換爲Quote對象。
最後,咱們將Quote對象 調用 遠程 Discount服務獲取折扣,返回折扣價格。每一個1秒
順序執行4個商品是4秒,而後又調用了Discount服務又4秒 因此是8秒。 雖然咱們如今把流轉換爲並行流 性能會很好 可是數量大於8時也很慢。相反,使用自定義CompletableFuture執行器可以更充分的利用CPU資源。
List<CompletableFuture<String>> priceFutures = shops.stream() //異步獲取每一個shop中的價格 .map(shop -> CompletableFuture.supplyAsync( () -> shop.getPrice("hhhhh", executor) )) //Quote對象存在時,對其返回值進行轉換 .map(future -> future.thenApply(Quote::parse)) //使用另外一個異步任務構造指望的future,申請折扣 .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync( () -> Discount.applyDiscount(quote), executor) )) .collect(toList()); //等待流中的全部Future執行完畢,提取各自的返回值 List<String> str = priceFutures.stream().map(CompletableFuture::join).collect(toList()); System.out.print(str);
2126
使用的這三個map跟同步沒有太大的區別,可是使用了CompletableFuture類提供的特性,在須要的地方把他們變成了異步操做。
thenApply方法:當第一個Future運行結束,返回CompletableFuture<String>對象轉換爲CompleTableFuture<Quote>對象。
thenCompose方法:將兩個異步操做進行流水線,當第一個操做完成時,將其結果做爲參數傳遞給第二個操做。換句話說,你能夠建立兩個CompletableFuture對象,對第一個對象調用thenCompose,並向其傳遞一個函數。
這個方法也有Async版本:thenComposeAsync,一般帶後綴的版本是講任務移交到一個新線程,不帶後綴的在當前線程執行。對於這個例子咱們沒有加上後綴,由於對於最終結果,或者大體的時間而言都沒有多少差異,少了不少線程切換的開銷。
合併兩個CompletableFuture,不管是否依賴
與上面不一樣,第二個CompletableFuture無需等待第一個CompletableFuture運行結束。而是,將兩個徹底不相干的CompletableFuture對象整合起來,不但願等到第一個任務徹底結束纔開始第二個任務。
這種狀況應該使用thenCombine方法,它接受名爲BiFunction的第二個參數,這個參數定義了當兩個CompletableFuture對象完成計算後,結果如何合併。同thenCompose方法同樣,thenCombine方法也提供了一個Async的版本。使用thenCombineAsync會致使BiFunction中定義的合併操做被提交到線程池中,由另外一個任務以異步的方式執行。
回到這個例子,好比說咱們如今須要第三個CompletableFuture來獲取匯率,展現美圓。當前兩個CompletableFuture計算出結果,並由BiFunction方法徹底合併後,由它來最終將誒書這一任務:
Future<Double> futurePriceUSD = CompletableFuture.supplyAsync(()->shops.get(0).getPrice("gg")) .thenCombine( CompletableFuture.supplyAsync( ()-> 0.66 //遠程服務獲取 匯率 ),(price,rate) -> price * rate );
這裏 第一個參數price 是 getPrice的返回值 double , 第二個參數 rate 是第二個工廠方法返回的0.66 偷了個懶, 最後是他們的結果進行乘法操做 返回最終結果。
響應CompletableFuture的completion事件
在本章中,全部的延遲例子都是延遲1秒鐘,可是在現實世界中,有時可能更糟。到目前爲止,你所實現的方法必須等待全部的商品返回時才能現實商品的價格。而你但願的效果是,只要有商品返回商品價格就在第一時間顯示出來,不用等待那些尚未返回的商品。
CompletableFuture[] futures = shops.stream() .map(shop -> CompletableFuture.supplyAsync( () -> shop.getPrice("hhhhh", executor) )) .map(future -> future.thenApply(Quote::parse)) .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync( () -> Discount.applyDiscount(quote), executor) )) //在每一個CompletableFuture上註冊一個操做,該操做會在CompletableFuture完成後使用它的返回值。 //使用thenAccept將結果輸出,它的參數就是 CompletableFuture的返回值。 .map(f -> f.thenAccept(System.out::println)) //你能夠把構成的Stream的全部CompletableFuture<void>對象放到一個數組中,等待全部的任務執行完成 .toArray(size -> new CompletableFuture[size]); //allOf方法接受一個CompletableFuture構成的數組,數組中全部的COmpletableFuture對象執行完成後, //它返回一個COmpletableFuture<Void>對象。因此你須要哦等待最初Stream中的全部CompletableFuture對象執行完畢, //對allOf方法返回的CompletableFuture執行join操做 CompletableFuture.allOf(futures).join();
Connected to the target VM, address: '127.0.0.1:62278', transport: 'socket'
8twoprice :113.31
threeprice :108.15
oneprice :137.844
Disconnected from the target VM, address: '127.0.0.1:62278', transport: 'socket'
fourprice :119.2725
3768
還有一個方法anyOf,對於CompletableFuture對象數組中有任何一個執行完畢就不在等待時使用。
小結:
1.執行比較耗時的操做時,尤爲是那些依賴一個或多個遠程服務的操做,使用異步任務能夠改善程序的性能,加快程序的響應速度。
2.你應該儘量的爲客戶提供異步API。使用CompletableFuture類提供的特性,可以輕鬆的實現這一目標。
3.CompletableFuture類還提供了異常管理的機制,然給你有機會拋出/管理異步任務執行中發生的異常。
4.將同步API的調用封裝到一個CompletableFuture中,你可以以異步的方式使用其結果。
5.若是異步任務之間互相獨立,或者他們之間某一些的結果是另外一些的輸入,你能夠講這些異步任務合併成一個。
6.你能夠爲CompletableFuture註冊一個回調函數,在Future執行完畢或者他們計算的結果可用時,針對性的執行一些程序。
7.你能夠決定在何時將誒書程序的運行,是等待由CompletableFuture對象構成的列表中全部的對象都執行完畢,仍是隻要其中任何一個首先完成就終止程序的運行。