前言
如今大部分的CPU都是多核,咱們都知道想要提高咱們應用程序的運行效率,就必須得充分利用多核CPU的計算能力;Java早已經爲咱們提供了多線程的API,可是實現方式略微麻煩,今天咱們就來看看Java8在這方面提供的改善。服務器
假設場景
如今你須要爲在線教育平臺提供一個查詢用戶詳情的API,該接口須要返回用戶的基本信息,標籤信息,這兩個信息存放在不一樣位置,須要遠程調用來獲取這兩個信息;爲了模擬遠程調用,咱們須要在代碼裏面延遲 1s;網絡
public interface RemoteLoader { String load(); default void delay() { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } } } public class CustomerInfoService implements RemoteLoader { public String load() { this.delay(); return "基本信息"; } } public class LearnRecordService implements RemoteLoader { public String load() { this.delay(); return "學習信息"; } }
同步方式實現版本
若是咱們採用同步的方式來完成這個API接口,咱們的實現代碼:多線程
@Test public void testSync() { long start = System.currentTimeMillis(); List<remoteloader> remoteLoaders = Arrays.asList(new CustomerInfoService(), new LearnRecordService()); List<string> customerDetail = remoteLoaders.stream().map(RemoteLoader::load).collect(toList()); System.out.println(customerDetail); long end = System.currentTimeMillis(); System.out.println("總共花費時間:" + (end - start)); }
不出所料,由於調用的兩個接口都是延遲了 1s ,因此結果大於2秒 異步
Future實現的版本
接下來咱們把這個例子用Java7提供的Future
來實現異步的版本,看下效果如何呢?代碼以下:ide
@Test public void testFuture() { long start = System.currentTimeMillis(); ExecutorService executorService = Executors.newFixedThreadPool(2); List<remoteloader> remoteLoaders = Arrays.asList(new CustomerInfoService(), new LearnRecordService()); List<future<string>> futures = remoteLoaders.stream() .map(remoteLoader -> executorService.submit(remoteLoader::load)) .collect(toList()); List<string> customerDetail = futures.stream() .map(future -> { try { return future.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } return null; }) .filter(Objects::nonNull) .collect(toList()); System.out.println(customerDetail); long end = System.currentTimeMillis(); System.out.println("總共花費時間:" + (end - start)); }
此次咱們採用多線程的方式來改造了咱們這個例子,結果仍是比較滿意的,時間大概花費了1s多一點 學習
> 注意:這裏我分紅了兩個Stream,如何合在一塊兒用同一個Stream,那麼在用future.get()
的時候會致使阻塞,至關於提交一個任務執行完後才提交下一個任務,這樣達不到異步的效果優化
這裏咱們能夠看到雖然Future
達到了咱們預期的效果,可是若是須要實現將兩個異步的結果進行合併處理就稍微麻一些,這裏就不細說,後面主要看下CompletableFuture
在這方面的改進this
Java8並行流
以上咱們用的是Java8以前提供的方法來實現,接下來咱們來看下Java8中提供的並行流來實習咱們這個例子效果怎樣呢?線程
@Test public void testParallelStream() { long start = System.currentTimeMillis(); List<remoteloader> remoteLoaders = Arrays.asList(new CustomerInfoService(), new LearnRecordService()); List<string> customerDetail = remoteLoaders.parallelStream().map(RemoteLoader::load).collect(toList()); System.out.println(customerDetail); long end = System.currentTimeMillis(); System.out.println("總共花費時間:" + (end - start)); }
運行的結果仍是至關的滿意,花費時間 1s 多點 code
和Java8以前的實現對比,咱們發現整個代碼會更加的簡潔;
接下來咱們把咱們的例子改變一下,查詢用戶詳情的接口還須要返回視頻觀看記錄,用戶的標籤信息,購買訂單
public class WatchRecordService implements RemoteLoader { @Override public String load() { this.delay(); return "觀看記錄"; } } public class OrderService implements RemoteLoader { @Override public String load() { this.delay(); return "訂單信息"; } } public class LabelService implements RemoteLoader { @Override public String load() { this.delay(); return "標籤信息"; } }
咱們繼續使用Java8提供的並行流來實現,看下運行的結果是否理想
@Test public void testParallelStream2() { long start = System.currentTimeMillis(); List<remoteloader> remoteLoaders = Arrays.asList( new CustomerInfoService(), new LearnRecordService(), new LabelService(), new OrderService(), new WatchRecordService()); List<string> customerDetail = remoteLoaders.parallelStream().map(RemoteLoader::load).collect(toList()); System.out.println(customerDetail); long end = System.currentTimeMillis(); System.out.println("總共花費時間:" + (end - start)); }
可是此次運行的結果不是太理想,花費時間超過了2秒
CompletableFuture
基本的用法
@Test public void testCompletableFuture() { CompletableFuture<string> future = new CompletableFuture<>(); new Thread(() -> { doSomething(); future.complete("Finish"); //任務執行完成後 設置返回的結果 }).start(); System.out.println(future.join()); //獲取任務線程返回的結果 } private void doSomething() { System.out.println("doSomething..."); }
這種用法還有個問題,就是任務出現了異常,主線程會無感知,任務線程不會把異常給拋出來;這會致使主線程會一直等待,一般咱們也須要知道出現了什麼異常,作出對應的響應;改進的方式是在任務中try-catch全部的異常,而後調用future.completeExceptionally(e)
,代碼以下:
@Test public void testCompletableFuture() throws ExecutionException, InterruptedException { CompletableFuture<string> future = new CompletableFuture<>(); new Thread(() -> { try { doSomething(); future.complete("Finish"); } catch (Exception e) { future.completeExceptionally(e); } }).start(); System.out.println(future.get()); } private void doSomething() { System.out.println("doSomething..."); throw new RuntimeException("Test Exception"); }
從如今來看CompletableFuture
的使用過程須要處理的事情不少,不太簡潔,你會以爲看起來很麻煩;可是這只是表象,Java8其實對這個過程進行了封裝,提供了不少簡潔的操做方式;接下來咱們看下如何改造上面的代碼
@Test public void testCompletableFuture2() throws ExecutionException, InterruptedException { CompletableFuture<string> future = CompletableFuture.supplyAsync(() -> { doSomething(); return "Finish"; }); System.out.println(future.get()); }
這裏咱們採用了supplyAsync
,這下看起來簡潔了許多,世界都明亮了; Java8不只提供容許任務返回結果的supplyAsync
,還提供了沒有返回值的runAsync
;讓咱們能夠更加的關注業務的開發,不須要處理異常錯誤的管理
CompletableFuture異常處理
若是說主線程須要關心任務到底發生了什麼異常,須要對其做出相應操做,這個時候就須要用到exceptionally
@Test public void testCompletableFuture2() throws ExecutionException, InterruptedException { CompletableFuture<string> future = CompletableFuture .supplyAsync(() -> { doSomething(); return "Finish"; }) .exceptionally(throwable -> "Throwable exception message:" + throwable.getMessage()); System.out.println(future.get()); }
使用CompletableFuture來完成咱們查詢用戶詳情的API接口
@Test public void testCompletableFuture3() throws ExecutionException, InterruptedException { long start = System.currentTimeMillis(); List<remoteloader> remoteLoaders = Arrays.asList( new CustomerInfoService(), new LearnRecordService(), new LabelService(), new OrderService(), new WatchRecordService()); List<completablefuture<string>> completableFutures = remoteLoaders .stream() .map(loader -> CompletableFuture.supplyAsync(loader::load)) .collect(toList()); List<string> customerDetail = completableFutures .stream() .map(CompletableFuture::join) .collect(toList()); System.out.println(customerDetail); long end = System.currentTimeMillis(); System.out.println("總共花費時間:" + (end - start)); }
這裏依然是採用的兩個Stream來完成的,執行的結果以下:
這個結果不太滿意,和並行流的結果差很少,消耗時間 2秒多點;在這種場景下咱們用CompletableFuture
作了這麼多工做,可是效果不理想,難道就有沒有其餘的方式可讓它在快一點嗎?
爲了解決這個問題,咱們必須深刻了解下並行流和CompletableFuture
的實現原理,它們底層使用的線程池的大小都是CPU的核數Runtime.getRuntime().availableProcessors()
;那麼咱們來嘗試一下修改線程池的大小,看看效果如何?
自定義線程池,優化CompletableFuture
使用並行流沒法自定義線程池,可是CompletableFuture
能夠
@Test public void testCompletableFuture4() throws ExecutionException, InterruptedException { long start = System.currentTimeMillis(); List<remoteloader> remoteLoaders = Arrays.asList( new CustomerInfoService(), new LearnRecordService(), new LabelService(), new OrderService(), new WatchRecordService()); ExecutorService executorService = Executors.newFixedThreadPool(Math.min(remoteLoaders.size(), 50)); List<completablefuture<string>> completableFutures = remoteLoaders .stream() .map(loader -> CompletableFuture.supplyAsync(loader::load, executorService)) .collect(toList()); List<string> customerDetail = completableFutures .stream() .map(CompletableFuture::join) .collect(toList()); System.out.println(customerDetail); long end = System.currentTimeMillis(); System.out.println("總共花費時間:" + (end - start)); }
咱們使用自定義線程池,設置最大的線程池數量50,來看下執行的結果
這下執行的結果比較滿意了,1秒多點;理論上來講這個結果能夠一直持續,直到達到線程池的大小50
並行流和CompletableFuture
二者該如何選擇
這二者如何選擇主要看任務類型,建議
- 若是你的任務是計算密集型的,而且沒有I/O操做的話,那麼推薦你選擇Stream的並行流,實現簡單並行效率也是最高的
- 若是你的任務是有頻繁的I/O或者網絡鏈接等操做,那麼推薦使用
CompletableFuture
,採用自定義線程池的方式,根據服務器的狀況設置線程池的大小,儘量的讓CPU忙碌起來
CompletableFuture
的其餘經常使用方法
- thenApply、thenApplyAsync: 假如任務執行完成後,還須要後續的操做,好比返回結果的解析等等;能夠經過這兩個方法來完成
- thenCompose、thenComposeAsync: 容許你對兩個異步操做進行流水線的操做,當第一個操做完成後,將其結果傳入到第二個操做中
- thenCombine、thenCombineAsync:容許你把兩個異步的操做整合;好比把第一個和第二個操做返回的結果作字符串的鏈接操做
總結
- Java8並行流的使用方式
- CompletableFuture的使用方式、異常處理機制,讓咱們有機會管理任務執行中發送的異常
- Java8並行流和
CompletableFuture
二者該如何選擇 CompletableFuture
的經常使用方法
原創不易 轉載請註明出處:https://silently9527.cn/archives/48