在 我會手動建立線程,爲何要使用線程池? 中詳細的介紹了 ExecutorService,能夠將整塊任務拆分作簡單的並行處理;html
在 不會用Java Future,我懷疑你泡茶沒我快 中又詳細的介紹了 Future 的使用,填補了 Runnable 不能獲取線程執行結果的空缺java
將兩者結合起來使用看似要一招吃天下了(Java有併發,併發之大,一口吃不下), but ~~ 是我太天真編程
假設咱們有 4 個任務(A, B, C, D)用來執行復雜的計算,每一個任務的執行時間隨着輸入參數的不一樣而不一樣,若是將任務提交到 ExecutorService, 相信你已經能夠「信手拈來」併發
ExecutorService executorService = Executors.newFixedThreadPool(4); List<Future> futures = new ArrayList<Future<Integer>>(); futures.add(executorService.submit(A)); futures.add(executorService.submit(B)); futures.add(executorService.submit(C)); futures.add(executorService.submit(D)); // 遍歷 Future list,經過 get() 方法獲取每一個 future 結果 for (Future future:futures) { Integer result = future.get(); // 其餘業務邏輯 }
先直入主題,用 CompletionService 實現一樣的場景異步
ExecutorService executorService = Executors.newFixedThreadPool(4); // ExecutorCompletionService 是 CompletionService 惟一實現類 CompletionService executorCompletionService= new ExecutorCompletionService<>(executorService ); List<Future> futures = new ArrayList<Future<Integer>>(); futures.add(executorCompletionService.submit(A)); futures.add(executorCompletionService.submit(B)); futures.add(executorCompletionService.submit(C)); futures.add(executorCompletionService.submit(D)); // 遍歷 Future list,經過 get() 方法獲取每一個 future 結果 for (int i=0; i<futures.size(); i++) { Integer result = executorCompletionService.take().get(); // 其餘業務邏輯 }
兩種方式在代碼實現上幾乎一毛同樣,咱們曾經說過 JDK 中不會重複造輪子,若是要造一個新輪子,一定是原有的輪子在某些場景的使用上有致命缺陷異步編程
既然新輪子出來了,兩者到底有啥不一樣呢? 在 搞定 CompletableFuture,併發異步編程和編寫串行程序還有什麼區別? 文中,咱們提到了 Future get()
方法的致命缺陷:函數
若是 Future 結果沒有完成,調用 get() 方法,程序會 阻塞在那裏,直至獲取返回結果
先來看第一種實現方式,假設任務 A 因爲參數緣由,執行時間相對任務 B,C,D 都要長不少,可是按照程序的執行順序,程序在 get() 任務 A 的執行結果會阻塞在那裏,致使任務 B,C,D 的後續任務沒辦法執行。又由於每一個任務執行時間是不固定的,因此不管怎樣調整將任務放到 List 的順序,都不合適,這就是致命弊端高併發
新輪子天然要解決這個問題,它的設計理念就是哪一個任務先執行完成,get() 方法就會獲取到相應的任務結果,這麼作的好處是什麼呢?來看個圖你就瞬間理解了源碼分析
兩張圖一對比,執行時長高下立判了,在當今高併發的時代,這點時間差,在吞吐量上起到的效果可能不是一點半點了性能
那 CompletionService 是怎麼作到獲取最早執行完的任務結果的呢?
若是你使用過消息隊列,你應該秒懂我要說什麼了,CompletionService 實現原理很簡單
就是一個將異步任務的生產和任務完成結果的消費解耦的服務
用人話解釋一下上面的抽象概念我只能再畫一張圖了
說白了,哪一個任務執行的完,就直接將執行結果放到隊列中,這樣消費者拿到的結果天然就是最先拿到的那個了
從上圖中看到,有任務,有結果隊列,那 CompletionService
天然也要圍繞着幾個關鍵字作文章了
帶着這些線索,咱們走進 CompletionService 源碼看一看
CompletionService
是一個接口,它簡單的只有 5 個方法:
Future<V> submit(Callable<V> task); Future<V> submit(Runnable task, V result); Future<V> take() throws InterruptedException; Future<V> poll(); Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
關於 2 個 submit 方法, 我在 不會用Java Future,我懷疑你泡茶沒我快 文章中作了很是詳細的分析以及案例使用說明,這裏再也不過多贅述
另外 3 個方法都是從阻塞隊列中獲取並移除阻塞隊列第一個元素,只不過他們的功能略有不一樣
因此說,按大類劃分上面5個方法,其實就是兩個功能
CompletionService
只是接口,ExecutorCompletionService
是該接口的惟一實現類
先來看一下類結構, 實現類裏面並無多少內容
<fancybox></fancybox>
ExecutorCompletionService
有兩種構造函數:
private final Executor executor; private final AbstractExecutorService aes; private final BlockingQueue<Future<V>> completionQueue; public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); }
public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) { if (executor == null || completionQueue == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = completionQueue; }
兩個構造函數都須要傳入一個 Executor 線程池,由於是處理異步任務的,咱們是不被容許手動建立線程的,因此這裏要使用線程池也就很好理解了
另一個參數是 BlockingQueue,若是不傳該參數,就會默認隊列爲 LinkedBlockingQueue
,任務執行結果就是加入到這個阻塞隊列中的
因此要完全理解 ExecutorCompletionService
,咱們只須要知道一個問題的答案就能夠了:
它是如何將異步任務結果放到這個阻塞隊列中的?
想知道這個問題的答案,那隻須要看它提交任務以後都作了些什麼?
public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; }
咱們前面也分析過,execute 是提交 Runnable 類型的任務,自己得不到返回值,但又能夠將執行結果放到阻塞隊列裏面,因此確定是在 QueueingFuture 裏面作了文章
從上圖中看一看出,QueueingFuture 實現的接口很是多,因此說也就具有了相應的接口能力。
重中之重是,它繼承了 FutureTask ,FutureTask 重寫了 Runnable 的 run() 方法 (方法細節分析能夠查看FutureTask源碼分析 ) 文中詳細說明了,不管是set() 正常結果,仍是setException() 結果,都會調用 finishCompletion()
方法:
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } // 重點 重點 重點 done(); callable = null; // to reduce footprint }
上述方法會執行 done() 方法,而 QueueingFuture 恰巧重寫了 FutureTask 的 done() 方法:
方法實現很簡單,就是將 task 放到阻塞隊列中
protected void done() { completionQueue.add(task); }
執行到此的 task 已是前序步驟 set 過結果的 task,因此就能夠經過消費阻塞隊列獲取相應的結果了
相信到這裏,CompletionService 在你面前應該沒什麼祕密可言了
在 JDK docs 上明確給了兩個例子來講明 CompletionService 的用途:
假設你有一組針對某個問題的solvers,每一個都返回一個類型爲Result的值,而且想要併發地運行它們,處理每一個返回一個非空值的結果,在某些方法使用(Result r)
其實就是文中開頭的使用方式
void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException, ExecutionException { CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e); for (Callable<Result> s : solvers) ecs.submit(s); int n = solvers.size(); for (int i = 0; i < n; ++i) { Result r = ecs.take().get(); if (r != null) use(r); } }
假設你想使用任務集的第一個非空結果,忽略任何遇到異常的任務,並在第一個任務準備好時取消全部其餘任務
void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException { CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e); int n = solvers.size(); List<Future<Result>> futures = new ArrayList<Future<Result>>(n); Result result = null; try { for (Callable<Result> s : solvers) futures.add(ecs.submit(s)); for (int i = 0; i < n; ++i) { try { Result r = ecs.take().get(); if (r != null) { result = r; break; } } catch (ExecutionException ignore) {} } } finally { for (Future<Result> f : futures) // 注意這裏的參數給的是 true,詳解一樣在前序 Future 源碼分析文章中 f.cancel(true); } if (result != null) use(result); }
這兩種方式都是很是經典的 CompletionService 使用 範式 ,請你們仔細品味每一行代碼的用意
範式沒有說明 Executor 的使用,使用 ExecutorCompletionService,須要本身建立線程池,看上去雖然有些麻煩,但好處是你可讓多個 ExecutorCompletionService 的線程池隔離,這種隔離性能避免幾個特別耗時的任務拖垮整個應用的風險 (這也是咱們反覆說過屢次的,不要全部業務共用一個線程池)
CompletionService 的應用場景仍是很是多的,好比
CompletionService 不但能知足獲取最快結果,還能起到必定 "load balancer" 做用,獲取可用服務的結果,使用也很是簡單, 只須要遵循範式便可
併發系列 講了這麼多,分析源碼的過程也碰到各類隊列,接下來咱們就看看那些讓人眼花繚亂的隊列
日拱一兵 | 原創