不變的東西就是一直在變化中。java
想必,你們在閒暇時刻,會常常看視頻,常常用的幾個 APP,好比優酷、愛奇藝、騰訊等。編程
這些視頻 APP 不只僅能夠在手機上播放,還可以支持在電視上播放。segmentfault
在電視終端上播放的 APP 是獨立發佈的版本,跟手機端的 APP 是不同的。設計模式
當咱們看一部電影時,點擊進入某一部電影,就進入到了專輯詳情頁頁面,此時,播放器會自動播放視頻。用戶在手機上看到的專輯詳情頁,與電視上看到的專輯詳情頁,頁面樣式設計上是不一樣的。併發
咱們來直觀的看一下效果。app
手機上的騰訊視頻專輯詳情頁:異步
上半部分截圖,下面還有爲你推薦、明星演員、周邊推薦、評論等功能。async
相應的,在電視端的專輯詳情頁展現方式是不同的。假設產品經理提出一個需求,要求對詳情頁作個改版。
樣式要求以下圖所示:ide
兩個終端的樣式對比,在電視端專輯詳情頁中,包含了不少板塊,每一個板塊橫向展現多個內容。函數式編程
產品的設計上要求是,有的板塊內容來源於推薦、有的板塊來源於搜索、有的板塊來源CMS(內容管理系統)。簡單理解爲,每一個板塊內容來源不一樣,來源於推薦、搜索等接口的內容要求是近實時的請求。
二、技術設計方案思考
考慮到產品提的這個需求,其實實現起來並不難。
主要分爲了靜態數據部分和動態數據部分,對於不常常變化的數據能夠經過靜態接口獲取,對於近乎實時的數據能夠經過動態接口獲取。
靜態接口設計:
專輯自己的屬性以及專輯下的視頻數據,通常是不常常變化的。
在需求場景介紹中,我截圖的是電影頻道。若是是電視劇頻道,會展現劇集列表(專輯下的全部視頻,如第 1 集、第 2 集...),而視頻的更新通常是不太頻繁的,因此在專輯詳情頁劇集列表數據就能夠從靜態接口獲取。
靜態接口數據生成流程:
另一部分,就是須要動態接口來實現,調用第三方接口獲取數據,好比推薦、搜索數據。
同時,要求板塊與板塊之間的內容不容許重複。
動態接口設計:
方案一:
串行調用,即按照每一個板塊的展現前後順序,調用相應的第三方接口獲取數據。
方案二:
並行調用,即多個板塊之間能夠並行調用,提升總體接口響應效率。
其實以上兩個方案,各有利弊。
方案一串行調用,好處是開發模型簡單,按照串行方式依次調用接口,內容數據去重,聚合全部的數據返回給客戶端。
可是,接口響應時間依賴於第三方接口的響應時間,一般第三方接口老是不可靠的,可能就會拉高接口總體響應時間,進而致使佔用線程時間過長,影響接口總體吞吐量。
方案二並行調用,理論上是能夠提升接口的總體響應時間,假設同時調用多個第三方接口,取決於最慢的接口響應時間。
並行調用時,須要考慮到「池化技術」,即不能無限制的在 JVM 進程上建立過多的線程。同時,也要考慮到板塊與板塊之間的內容數據,要按照產品設計上的前後順序作去重。
根據這個需求場景,咱們選擇第二種方案來實現更合適一些。
選擇了方案二,咱們抽象出以下圖所示的簡易模型:
T一、T二、T3 表示多個板塊內容線程。T1 線程先返回結果,T2 線程返回的結果不能與與 T1 線程返回的結果內容重複,T3 線程返回的結果不能與 T一、T2 兩個線程返回的結果內容重複。
咱們從技術實現上考量,當並行調用多個第三方接口時,須要獲取接口的返回結果,首先想到的就是 Future ,可以實現異步獲取任務結果。
另外,JDK8 提供了 CompletableFuture 易於使用的獲取異步結果的工具類,解決了 Future 的一些使用上的痛點,以更優雅的方式實現組合式異步編程,同時也契合函數式編程。
Future 接口設計:
提供了獲取任務結果、取消任務、判斷任務狀態接口。調用獲取任務結果方法,在任務未完成狀況下,會致使調用阻塞。
Future 接口提供的方法:
// 獲取任務結果 V get() throws InterruptedException, ExecutionException; // 支持超時時間的獲取任務結果 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; // 判斷任務是否已完成 boolean isDone(); // 判斷任務是否已取消 boolean isCancelled(); // 取消任務 boolean cancel(boolean mayInterruptIfRunning);
一般,咱們在考慮到使用 Future 獲取任務結果時,會使用 ThreadPoolExecutor 或者 FutureTask 來實現功能需求。
ThreadPoolExecutor、FutureTask 與 Future 接口關係類圖:
TheadPoolExecutor 提供三個 submit 方法:
// 1. 提交無需返回值的任務,Runnable 接口 run() 方法無返回值 public Future<?> submit(Runnable task) { } // 2. 提交須要返回值的任務,Callable 接口 call() 方法有返回值 public <T> Future<T> submit(Callable<T> task) { } // 3. 提交須要返回值的任務,任務結果是第二個參數 result 對象 public <T> Future<T> submit(Runnable task, T result) { }
第 3 個 submit 方法使用示例以下所示:
static String x = "東昇的思考"; public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newFixedThreadPool(1); // 建立 Result 對象 r Result r = new Result(); r.setName(x); // 提交任務 Future<Result> future = executor.submit(new Task(r), r); Result fr = future.get(); // 下面等式成立 System.out.println(fr == r); System.out.println(fr.getName() == x); System.out.println(fr.getNick() == x); } static class Result { private String name; private String nick; // ... ignore getter and setter } static class Task implements Runnable { Result r; // 經過構造函數傳入 result Task(Result r) { this.r = r; } @Override public void run() { // 能夠操做 result String name = r.getName(); r.setNick(name); } }
執行結果都是true。
FutureTask 設計實現:
實現了 Runnable 和 Future 兩個接口。實現了 Runnable 接口,說明能夠做爲任務對象,直接提交給 ThreadPoolExecutor 去執行。實現了 Future 接口,說明可以獲取執行任務的返回結果。
咱們來根據產品的需求,使用 FutureTask 模擬兩個線程,經過示例實現下功能。
結合示例代碼註釋理解:
public static void main(String[] args) throws Exception { // 建立任務 T1 的 FutureTask,調用推薦接口獲取數據 FutureTask<String> ft1 = new FutureTask<>(new T1Task()); // 建立任務 T1 的 FutureTask,調用搜索接口獲取數據,依賴 T1 結果 FutureTask<String> ft2 = new FutureTask<>(new T2Task(ft1)); // 線程 T1 執行任務 ft1 Thread T1 = new Thread(ft1); T1.start(); // 線程 T2 執行任務 ft2 Thread T2 = new Thread(ft2); T2.start(); // 等待線程 T2 執行結果 System.out.println(ft2.get()); } // T1Task 調用推薦接口獲取數據 static class T1Task implements Callable<String> { @Override public String call() throws Exception { System.out.println("T1: 調用推薦接口獲取數據..."); TimeUnit.SECONDS.sleep(1); System.out.println("T1: 獲得推薦接口數據..."); TimeUnit.SECONDS.sleep(10); return " [T1 板塊數據] "; } } // T2Task 調用搜索接口數據,同時須要推薦接口數據 static class T2Task implements Callable<String> { FutureTask<String> ft1; // T2 任務須要 T1 任務的 FutureTask 返回結果去重 T2Task(FutureTask<String> ft1) { this.ft1 = ft1; } @Override public String call() throws Exception { System.out.println("T2: 調用搜索接口獲取數據..."); TimeUnit.SECONDS.sleep(1); System.out.println("T2: 獲得搜索接口的數據..."); TimeUnit.SECONDS.sleep(5); // 獲取 T2 線程的數據 System.out.println("T2: 調用 T1.get() 接口獲取推薦數據"); String tf1 = ft1.get(); System.out.println("T2: 獲取到推薦接口數據:" + tf1); System.out.println("T2: 將 T1 與 T2 板塊數據作去重處理"); return "[T1 和 T2 板塊數據聚合結果]"; } }
執行結果以下:
> Task :FutureTaskTest.main() T1: 調用推薦接口獲取數據... T2: 調用搜索接口獲取數據... T1: 獲得推薦接口數據... T2: 獲得搜索接口的數據... T2: 調用 T1.get() 接口獲取推薦數據 T2: 獲取到推薦接口數據: [T1 板塊數據] T2: 將 T1 與 T2 板塊數據作去重處理 [T1 和 T2 板塊數據聚合結果]
小結:
Future 表示「將來」的意思,主要是將耗時的一些操做任務,交給單獨的線程去執行。從而達到異步的目的,提交任務的當前線程,在提交任務後和獲取任務結果的過程當中,當前線程能夠繼續執行其餘操做,不須要在那傻等着返回執行結果。
對於 Future 設計模式,雖然咱們提交任務時,不會進入任何阻塞,可是當調用方要得到這個任務的執行結果,仍是可能會阻塞直至任務執行完成。
在 JDK1.5 設計之初就一直存在這個問題,發展到 JDK1.8 引入了 CompletableFuture 才獲得完美的加強。
在此期間,Google 開源的 Guava 工具包提供了 ListenableFuture ,用於支持任務完成時支持回調方式,感興趣的朋友們能夠自行查閱研究。
在業務需求場景介紹中,不一樣板塊的數據來源是不一樣的,而且板塊與板塊之間是存在數據依賴關係的。
能夠理解爲任務與任務之間是有時序關係的,而根據 CompletableFuture 提供的一些功能特性,是很是適合這種業務場景的。
CompletableFuture 類圖:
CompletableFuture 實現了 Future 和 CompletionStage 兩個接口。實現 Future 接口是爲了關注異步任務何時結束,和獲取異步任務執行的結果。實現 CompletionStage 接口,其提供了很是豐富的功能,實現了串行關係、並行關係、匯聚關係等。
CompletableFuture 核心優點:
1)無需手工維護線程,給任務分配線程的工做無需開發人員關注;
2)在使用上,語義更加清晰明確;
例如:t3 = t1.thenCombine(t2, () -> { // doSomething ... } 可以明確的表述任務 3 要等任務 2 和 任務 1完成後纔會開始執行。
3)代碼更加簡練,支持鏈式調用,讓你更專一業務邏輯。
4)方便的處理異常狀況
接下來,經過 CompletableFuture 來模擬實現專輯下多板塊數據聚合處理。
代碼以下所示:
public static void main(String[] args) throws Exception { // 暫存數據 List<String> stashList = Lists.newArrayList(); // 任務 1:調用推薦接口獲取數據 CompletableFuture<String> t1 = CompletableFuture.supplyAsync(() -> { System.out.println("T1: 獲取推薦接口數據..."); sleepSeconds(5); stashList.add("[T1 板塊數據]"); return "[T1 板塊數據]"; }); // 任務 2:調用搜索接口獲取數據 CompletableFuture<String> t2 = CompletableFuture.supplyAsync(() -> { System.out.println("T2: 調用搜索接口獲取數據..."); sleepSeconds(3); return " [T2 板塊數據] "; }); // 任務 3:任務 1 和任務 2 完成後執行,聚合結果 CompletableFuture<String> t3 = t1.thenCombine(t2, (t1Result, t2Result) -> { System.out.println(t1Result + " 與 " + t2Result + "實現去重邏輯處理"); return "[T1 和 T2 板塊數據聚合結果]"; }); // 等待任務 3 執行結果 System.out.println(t3.get(6, TimeUnit.SECONDS)); } static void sleepSeconds(int timeout) { try { TimeUnit.SECONDS.sleep(timeout); } catch (InterruptedException e) { e.printStackTrace(); } }
執行結果以下:
> Task :CompletableFutureTest.main() T1: 獲取推薦接口數據... T2: 調用搜索接口獲取數據... [T1 板塊數據] 與 [T2 板塊數據] 實現去重邏輯處理 [T1 和 T2 板塊數據聚合結果]
上述的示例代碼在 IDEA 中新建個Class,直接複製進去,便可正常運行。
建立合理的線程池:
在生產環境下,不建議直接使用上述示例代碼形式。由於示例代碼中使用的
CompletableFuture.supplyAsync(() -> {});
建立 CompletableFuture 對象的 supplyAsync() 方法(這裏使用的工廠方法模式),底層使用的默認線程池,不必定能知足業務需求。
結合底層源代碼來看一下:
// 默認使用 ForkJoinPool 線程池 private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); }
建立 ForkJoinPool 線程池:
默認線程池大小是 Runtime.getRuntime().availableProcessors() - 1(CPU 核數 - 1),能夠經過 JVM 參數 -Djava.util.concurrent.ForkJoinPool.common.parallelism 設置線程池大小。
JVM 參數上配置 -Djava.util.concurrent.ForkJoinPool.common.threadFactory 設置線程工廠類;配置 -Djava.util.concurrent.ForkJoinPool.common.exceptionHandler 設置異常處理類,這兩個參數設置後,內部會經過系統類加載器加載 Class。
若是全部 CompletableFuture 都使用默認線程池,一旦有任務執行很慢的 I/O 操做,就會致使全部線程都阻塞在 I/O 操做上,進而影響系統總體性能。
因此,建議你們在生產環境使用時,根據不一樣的業務類型建立不一樣的線程池,以免互相影響
。
CompletableFuture 還提供了另外支持線程池的方法。
// 第二個參數支持傳遞 Executor 自定義線程池 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier); }
自定義線程池,建議參考 「阿里巴巴 Java 開發手冊」,推薦使用 ThreadPoolExecutor 自定義線程池,使用有界隊列,根據實際業務狀況設置隊列大小。
線程池大小的設置,在 「Java 併發編程實戰」一書中,Brian Goetz 提供了很多優化建議。若是線程池數量過多,競爭 CPU 和內存資源,致使大量時間在上下文切換上。反之,若是線程池數量過少,沒法充分利用 CPU 多核優點。
線程池大小與 CPU 處理器的利用率之比能夠用下面公式估算:
異常處理:
CompletableFuture 提供了很是簡單的異常處理 ,以下這些方法,支持鏈式編程方式。
// 相似於 try{}catch{} 中的 catch{} public CompletionStage<T> exceptionally (Function<Throwable, ? extends T> fn); // 相似於 try{}finally{} 中的 finally{},不支持返回結果 public CompletionStage<T> whenComplete (BiConsumer<? super T, ? super Throwable> action); public CompletionStage<T> whenCompleteAsync (BiConsumer<? super T, ? super Throwable> action); // 相似於 try{}finally{} 中的 finally{},支持返回結果 public <U> CompletionStage<U> handle (BiFunction<? super T, Throwable, ? extends U> fn); public <U> CompletionStage<U> handleAsync (BiFunction<? super T, Throwable, ? extends U> fn);
循環壓測任務數以下所示,每次執行壓測,從 1 到 jobNum 數據疊加匯聚結果,計算耗時。
統計維度:CompletableFuture 默認線程池 與 自定義線程池。
性能測試代碼:
// 性能測試代碼 Arrays.asList(-3, -1, 0, 1, 2, 4, 5, 10, 16, 17, 30, 50, 100, 150, 200, 300).forEach(offset -> { int jobNum = PROCESSORS + offset; System.out.println( String.format("When %s tasks => stream: %s, parallelStream: %s, future default: %s, future custom: %s", testCompletableFutureDefaultExecutor(jobNum), testCompletableFutureCustomExecutor(jobNum))); }); // CompletableFuture 使用默認 ForkJoinPool 線程池 private static long testCompletableFutureDefaultExecutor(int jobCount) { List<CompletableFuture<Integer>> tasks = new ArrayList<>(); IntStream.rangeClosed(1, jobCount).forEach(value -> tasks.add(CompletableFuture.supplyAsync(CompleteableFuturePerfTest::getJob))); long start = System.currentTimeMillis(); int sum = tasks.stream().map(CompletableFuture::join).mapToInt(Integer::intValue).sum(); checkSum(sum, jobCount); return System.currentTimeMillis() - start; } // CompletableFuture 使用自定義的線程池 private static long testCompletableFutureCustomExecutor(int jobCount) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(200, 200, 5, TimeUnit.MINUTES, new ArrayBlockingQueue<>(100000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("CUSTOM_DAEMON_COMPLETABLEFUTURE"); thread.setDaemon(true); return thread; } }, new ThreadPoolExecutor.CallerRunsPolicy()); List<CompletableFuture<Integer>> tasks = new ArrayList<>(); IntStream.rangeClosed(1, jobCount).forEach(value -> tasks.add(CompletableFuture.supplyAsync(CompleteableFuturePerfTest::getJob, threadPoolExecutor))); long start = System.currentTimeMillis(); int sum = tasks.stream().map(CompletableFuture::join).mapToInt(Integer::intValue).sum(); checkSum(sum, jobCount); return System.currentTimeMillis() - start; }
測試機器配置:8 核CPU,16G內存
性能測試結果:
根據壓測結果看到,隨着壓測任務數量越大,使用默認的線程池性能越差。
對象建立:
除前面提到的 supplyAsync 方法外,CompletableFuture 還提供了以下方法:
// 執行任務,CompletableFuture<Void> 無返回值,默認線程池 public static CompletableFuture<Void> runAsync(Runnable runnable) { return asyncRunStage(asyncPool, runnable); } // 執行任務,CompletableFuture<Void> 無返回值,支持自定義線程池 public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) { return asyncRunStage(screenExecutor(executor), runnable); }
咱們在 CompletableFuture 模式實戰中,提到了 CompletableFuture 實現了 CompletionStage 接口,該接口提供了很是豐富的功能。
CompletionStage 接口支持串行關係、匯聚 AND 關係、匯聚 OR 關係。
下面對這些關係的接口作個簡單描述,你們在使用時能夠去自行查閱 JDK API。
同時,這些關係接口中每一個方法都提供了對應的 xxxAsync() 方法,表示異步化執行任務。
串行關係:
CompletionStage 描述串行關係,主要有 thenApply、thenRun、thenAccept 和 thenCompose 系列接口。
源碼以下所示:
// 對應 U apply(T t) ,接收參數 T並支持返回值 U public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn); public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn); // 不接收參數也不支持返回值 public CompletionStage<Void> thenRun(Runnable action); public CompletionStage<Void> thenRunAsync(Runnable action); // 接收參數但不支持返回值 public CompletionStage<Void> thenAccept(Consumer<? super T> action); public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action); // 組合兩個依賴的 CompletableFuture 對象 public <U> CompletionStage<U> thenCompose (Function<? super T, ? extends CompletionStage<U>> fn); public <U> CompletionStage<U> thenComposeAsync (Function<? super T, ? extends CompletionStage<U>> fn);
匯聚 AND 關係:
CompletionStage 描述 匯聚 AND 關係,主要有 thenCombine、thenAcceptBoth 和 runAfterBoth 系列接口。
源碼以下所示(省略了Async 方法):
// 當前和另外的 CompletableFuture 都完成時,兩個參數傳遞給 fn,fn 有返回值 public <U,V> CompletionStage<V> thenCombine (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn); // 當前和另外的 CompletableFuture 都完成時,兩個參數傳遞給 action,action 沒有返回值 public <U> CompletionStage<Void> thenAcceptBoth (CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action); // 當前和另外的 CompletableFuture 都完成時,執行 action public CompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action);
匯聚 OR 關係:
CompletionStage 描述 匯聚 OR 關係,主要有 applyToEither、acceptEither 和 runAfterEither 系列接口。
源碼以下所示(省略了Async 方法):
// 當前與另外的 CompletableFuture 任何一個執行完成,將其傳遞給 fn,支持返回值 public <U> CompletionStage<U> applyToEither (CompletionStage<? extends T> other, Function<? super T, U> fn); // 當前與另外的 CompletableFuture 任何一個執行完成,將其傳遞給 action,不支持返回值 public CompletionStage<Void> acceptEither (CompletionStage<? extends T> other, Consumer<? super T> action); // 當前與另外的 CompletableFuture 任何一個執行完成,直接執行 action public CompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action);
到此,CompletableFuture 的相關特性都介紹完了。
異步編程慢慢變得愈來愈成熟,Java 語言官網也開始支持異步編程模式,因此學好異步編程仍是有必要的。
本文結合業務需求場景驅動,引出了 Future 設計模式實戰,而後對 JDK1.8 中的 CompletableFuture 是如何使用的,核心優點、性能測試對比、使用擴展方面作了進一步剖析。
但願對你們有所幫助!
歡迎關注個人公衆號,掃二維碼關注解鎖更多精彩文章,與你一同成長~