隨着多核處理器的出現,如何輕鬆高效的進行異步編程變得愈發重要,咱們看看在java8以前,使用java語言完成異步編程有哪些方案。javascript
@async
得到異步編程支持實現方式可謂是多種多樣,這裏咱們使用線程池和future來實現異步編程,藉着這個例子來說述java8的組合式異步編程有着怎樣的優點css
//構造線程池 ExecutorService pool = Executors.newCachedThreadPool(); try { //構造future結果,doSomethingA十分耗時,所以採用異步 Future<Integer> future = pool.submit(() -> doSomethingA()); //作一些別的事情 doSomethingB(); //從future中得到結果,設置超時時間,超過了就拋異常 Integer result = future.get(10, TimeUnit.SECONDS); //打印結果 System.out.printf("the async result is : %d", result); //異常捕獲 } catch (InterruptedException e) { System.out.println("任務計算拋出了一個異常!"); } catch (ExecutionException e) { System.out.println("線程在等待的過程當中被中斷了!"); } catch (TimeoutException e) { System.out.println("future對象等待時間超時了!"); } }
然而這樣的異步編程方式僅僅能知足基本的須要,稍微複雜的一些異步處理Future接口彷佛就有點一籌莫展了,例如html
這種感受其實就很像沒有stream以前的collections的操做感受同樣,一樣的,對於future,java8提供了它的函數式升級版本CompletableFuture,從名字就能夠看出來這絕對是future的升級版。java
事物的發展每每都是由簡單->複雜->簡單,這裏咱們一樣遵循這樣的規律,按部就班。
下面的例子摘取《java8實戰》
的異步編程章節,並作了簡化。
咱們假設如今咱們有一項查詢商品價格的服務十分耗時,因此毫無例外的咱們想讓查詢最佳價格的服務以異步的形式執行。
最直接的方式是直接構建一個異步查詢商品價格的api,而且返回,爲了演示須要,編寫一個線程等待一秒的方法來模擬長時間的請求。spring
public double getPrice(String product) { return calculatePrice(product); } private double calculatePrice(String product) { delay(); return random.nextDouble() * product.charAt(0) + product.charAt(1); } public static void delay() { try { Thread.sleep(1000L); } catch (InterruptedException e) { throw new RuntimeException(e); } }
如今getPrice這方法是一個同步的方法,該方法在通過1秒的延遲以後會返回給咱們一個商品的價格(這裏只是簡單的根據名字構造了一個隨機數)編程
咱們使用completFuture將getPrice
轉化爲異步方法,以下bootstrap
public Future<Double> getAsyncPrice(String product) { CompletableFuture<Double> futurePrice = new CompletableFuture<>(); new Thread(() -> { double price = calculatePrice(product); futurePrice.complete(price); }).start();; return futurePrice; }
這裏構造一個completableFuture對象,並另起一個異步線程,將異步計算的結果使用futurePrice.complete來接受,無需等待直接返回future結果
調用類使用Integer result = future.get(10, TimeUnit.SECONDS)
來接受返回的結果,若是等待超時則拋出異常。
另外,若是異步線程發生異常,而且在排查問題的時候想要知道具體是什麼緣由致使的,
能夠在getAsyncPrice
方法中使用completeExcepitonally來獲得異常信息而且結束此次異步任務,代碼以下api
public Future<Double> getPriceAsync(String product) { public Future<Double> getPriceAsync(String product) { CompletableFuture<Double> futurePrice = new CompletableFuture<>(); new Thread(() -> { try { double price = calculatePrice(product); futurePrice.complete(price); } catch (Exception ex) { futurePrice.completeExceptionally(ex); } }).start(); return futurePrice; }
這樣,基本的功能就實現了。數組
也許你看到上面的代碼,會說:"我暈,你這寫法比原來還複雜哦,並且我也沒看出啥區別啊。",是的,上文的寫法能夠算是原生態的寫法了,目的爲爲下面的知識作一個簡單的鋪墊。
事實上,CompleteFuture自己提供了大量的工廠方法來供咱們十分方便的實現一個異步編程,他封裝了前篇一概的異常與結果接收,你只須要編寫真正的異步邏輯部分就能夠了,同時借住於lambda表達式,能夠更進一步。服務器
supplyAsync 方法接受一個生產者(Supplier)做爲參數,返回一個 CompletableFuture
對象, 該對象完成異步執行後會讀取調用生產者方法的返回值。 生產者方法會交由 ForkJoinPool
池中的某個執行線程(Executor)運行,可是你也可使用 supplyAsync 方法的重載版本,傳
遞第二個參數指定不一樣的執行線程執行生產者方法。
因而上文的例子能夠改寫以下
public Future<Double> getPriceAsync(String product) { return CompletableFuture.supplyAsync(() -> calculatePrice(product)); }
是否是簡潔了許多呢?
可如今還有問題,這裏咱們成功的編寫了一個十分簡潔的異步方法,可實際的狀況中,咱們所能調用的API大部分都是同步的,所以下面將介紹如何使用異步的方法去操做這些同步API。
咱們如今有這麼一個需求,它接受產品名做爲參數,返回一個字符串列表,
這個字符串列表中包括商店的名稱、該商店中指定商品的價格,商店集合以及接口設計以下。
List<Shop> shops = Arrays.asList(new Shop("BestPrice"), new Shop("LetsSaveBig"), new Shop("MyFavoriteShop"), new Shop("BuyItAll")); public List<String> findPrices(String product);
這樣的集合變換使用stream流來操做十分容易,代碼以下
public List<String> findPrices(String product) { return shops.stream() .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))) .collect(toList()); }
stream流將 shop映射爲了shop的名稱以及該shop中商品的價格的字符串,並使用收集器進行收集。
事實上,咱們徹底可使用流將shop映射成CompletableFuture對象,就好像在操做集合同樣,代碼以下
List<CompletableFuture<String>> priceFutures = shops.stream() .map(shop -> CompletableFuture .supplyAsync(() -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))) .collect(toList());
使用這種方式,你會獲得一個,List<CompletableFuture
爲了實現這個效果,你能夠向最初的 List<CompletableFuture
public List<String> findPrices(String product) { List<CompletableFuture<String>> priceFutures = shops.stream() .map( shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPrice(product))) .collect(Collectors.toList()); return priceFutures.stream() .map(CompletableFuture::join) .collect(toList()); }
以上的代碼你可能會疑惑,爲何不直接按照shop->completableFuture->join->collect
的方式進行流處理呢?那是由於join這一步自己是阻塞的,對於流操做來講,前一個shop沒有處理完以前,是不會處理下一個shop的,因此對於每個shop,處理到join這一步的時候就會阻塞住等待1秒,這樣的話,這個流水線自己就會變回阻塞的了。
而上文的編寫方法能夠看出 shop->completableFuture->collect
這個操做自己是非阻塞的,順利的將全部的請求都發出去了,隨後再使用join來完成結果的收集。
前面在介紹工廠方法時提到,能夠選擇第二參數放入一個線程池來進行管控。
private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() { public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); return t; } });
接着在supplyAsync中使用該線程池便可,代碼以下
CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPrice(product), executor);
既然咱們已經將異步操做與流相結合了,所以很容易的就會想到對於異步流來講,應該有會有相似於集合流的一些很是好用的API吧?事實上,JAVA8的確爲咱們提供了這些API。
如同集合流操做同樣,異步流也能夠提早安排一系列的任務,而後讓異步任務有條不紊的按照這個順序去執行。
future.thenApply(Function)
來實現,該方法接受一個Function對象stream() .map(xxx->supplayAsync(()->任務A)) //這一步已經異步的映射成了任務A .map(future->future.thenApply(任務B)//執行同步的任務B .collect
future.thenCompose(Function)
stream() .map(xxx->supplayAsync(()->任務A)) //這一步已經異步的映射成了任務A .map(future->future.thenApply(任務B)//執行同步的任務B .map(future->future.thenCompose(任務C))//再異步執行任務C .collect
使用thenCombine來完成,相似任務A與任務B,A是查詢價格,B是查詢匯率,這兩個任務之間自己沒有關聯關係,因此能夠同時發起,但你最後須要計算價格乘以匯率,所以在這兩個任務完成以後須要對他們的結果進行合併,代碼以下
Future<Double> futurePriceInUSD = CompletableFuture.supplyAsync(() -> shop.getPrice(product))//任務A .thenCombine( CompletableFuture.supplyAsync(() -> exchangeService.getRate(Money.EUR, Money.USD)), //任務B (price, rate) -> price * rate); //任務A與任務B的合併操做
注意這裏的任務A與任務B是異步的,但他們的合併操做是同步的,若是想要合併操做也是異步的,使用future.thenCombineAsync的異步方法版本。
使用thenAccept(Consumer)
以上都是對結果進行一些映射,你如今要對結果只進行處理,說白了就是前面的都是Function,如今要換成consumer,而且參數再也不是異步任務,而是任務的結果值,舉個例子,上文的任務A(異步)->任務B(同步)->任務C(異步) 的操做如今想到對他們的操做結果進行打印
就可使用thenAccept(Consumer)
stream() .map(xxx->supplayAsync(()->任務A)) //這一步已經異步的映射成了任務A .map(future->future.thenApply(任務B)//執行同步的任務B .map(future->future.thenCompose(任務C))//再異步執行任務C .map(future->future.thenAccept(System.out::println))//將結果打印 .collect
須要注意的是在執行了Accpet方法以後,你獲得的是一個 CompletableFuture
你能夠對這些流對象進行相似及早求值的操做,例如這條查詢4個商家的價格服務只要有一個給出了返回結果就結束此次異步流。
CompletableFuture[] futures = findPricesStream("myPhone") .map(f -> f.thenAccept(System.out::println)) .toArray(size -> new CompletableFuture[size]); CompletableFuture.anyOf(futures).join();
allOf 工廠方法接收一個由 CompletableFuture 構成的數組,數組中的全部 Completable-Future 對象執行完成以後,它返回一個 CompletableFuture