| 好看請贊,養成習慣web
你有一個思想,我有一個思想,咱們交換後,一我的就有兩個思想編程
If you can NOT explain it simply, you do NOT understand it well enough微信
現陸續將Demo代碼和技術文章整理在一塊兒 Github實踐精選 ,方便你們閱讀查看,本文一樣收錄在此,以爲不錯,還請Star🌟併發
前言
在 我會手動建立線程,爲何要使用線程池? 中詳細的介紹了 ExecutorService,能夠將整塊任務拆分作簡單的並行處理;app
在 不會用Java Future,我懷疑你泡茶沒我快 中又詳細的介紹了 Future 的使用,填補了 Runnable 不能獲取線程執行結果的空缺異步
將兩者結合起來使用看似要一招吃天下了(Java有併發,併發之大,一口吃不下), but ~~ 是我太天真編輯器
ExecutorService VS CompletionService
假設咱們有 4 個任務(A, B, C, D)用來執行復雜的計算,每一個任務的執行時間隨着輸入參數的不一樣而不一樣,若是將任務提交到 ExecutorService, 相信你已經能夠「信手拈來」異步編程
ExecutorService executorService = Executors.newFixedThreadPool(4);
List<Future> futures = new ArrayList<Future<Integer>>();
futures.add(executorService.submit(A));
futures.add(executorService.submit(B));
futures.add(executorService.submit(C));
futures.add(executorService.submit(D));
// 遍歷 Future list,經過 get() 方法獲取每一個 future 結果
for (Future future:futures) {
Integer result = future.get();
// 其餘業務邏輯
}
先直入主題,用 CompletionService 實現一樣的場景函數
ExecutorService executorService = Executors.newFixedThreadPool(4);
// ExecutorCompletionService 是 CompletionService 惟一實現類
CompletionService executorCompletionService= new ExecutorCompletionService<>(executorService );
List<Future> futures = new ArrayList<Future<Integer>>();
futures.add(executorCompletionService.submit(A));
futures.add(executorCompletionService.submit(B));
futures.add(executorCompletionService.submit(C));
futures.add(executorCompletionService.submit(D));
// 遍歷 Future list,經過 get() 方法獲取每一個 future 結果
for (int i=0; i<futures.size(); i++) {
Integer result = executorCompletionService.take().get();
// 其餘業務邏輯
}
兩種方式在代碼實現上幾乎一毛同樣,咱們曾經說過 JDK 中不會重複造輪子,若是要造一個新輪子,一定是原有的輪子在某些場景的使用上有致命缺陷高併發
既然新輪子出來了,兩者到底有啥不一樣呢?在 搞定 CompletableFuture,併發異步編程和編寫串行程序還有什麼區別? 文中,咱們提到了 Future get()
方法的致命缺陷:
若是 Future 結果沒有完成,調用 get() 方法,程序會阻塞在那裏,直至獲取返回結果
先來看第一種實現方式,假設任務 A 因爲參數緣由,執行時間相對任務 B,C,D 都要長不少,可是按照程序的執行順序,程序在 get() 任務 A 的執行結果會阻塞在那裏,致使任務 B,C,D 的後續任務沒辦法執行。又由於每一個任務執行時間是不固定的,因此不管怎樣調整將任務放到 List 的順序,都不合適,這就是致命弊端
新輪子天然要解決這個問題,它的設計理念就是哪一個任務先執行完成,get() 方法就會獲取到相應的任務結果,這麼作的好處是什麼呢?來看個圖你就瞬間理解了
兩張圖一對比,執行時長高下立判了,在當今高併發的時代,這點時間差,在吞吐量上起到的效果可能不是一點半點了
那 CompletionService 是怎麼作到獲取最早執行完的任務結果的呢?
遠看CompletionService 輪廓
若是你使用過消息隊列,你應該秒懂我要說什麼了,CompletionService 實現原理很簡單
就是一個將異步任務的生產和任務完成結果的消費解耦的服務
用人話解釋一下上面的抽象概念我只能再畫一張圖了
說白了,哪一個任務執行的完,就直接將執行結果放到隊列中,這樣消費者拿到的結果天然就是最先拿到的那個了
從上圖中看到,有任務,有結果隊列,那 CompletionService
天然也要圍繞着幾個關鍵字作文章了
-
既然是異步任務,那天然可能用到 Runnable 或 Callable -
既然能獲取到結果,天然也會用到 Future 了
帶着這些線索,咱們走進 CompletionService 源碼看一看
近看 CompletionService 源碼
CompletionService
是一個接口,它簡單的只有 5 個方法:
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
關於 2 個 submit 方法, 我在 不會用Java Future,我懷疑你泡茶沒我快 文章中作了很是詳細的分析以及案例使用說明,這裏再也不過多贅述
另外 3 個方法都是從阻塞隊列中獲取並移除阻塞隊列第一個元素,只不過他們的功能略有不一樣
-
Take: 若是 隊列爲空,那麼調用 take() 方法的線程會 被阻塞 -
Poll: 若是 隊列爲空,那麼調用 poll() 方法的線程會 返回 null -
Poll-timeout: 以 超時的方式獲取並移除阻塞隊列中的第一個元素,若是超時時間到,隊列仍是空,那麼該方法會返回 null
因此說,按大類劃分上面5個方法,其實就是兩個功能
-
提交異步任務 (submit) -
從隊列中拿取並移除第一個元素 (take/poll)
CompletionService
只是接口,ExecutorCompletionService
是該接口的惟一實現類
ExecutorCompletionService 源碼分析
先來看一下類結構, 實現類裏面並無多少內容
ExecutorCompletionService
有兩種構造函數:
private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}
兩個構造函數都須要傳入一個 Executor 線程池,由於是處理異步任務的,咱們是不被容許手動建立線程的,因此這裏要使用線程池也就很好理解了
另一個參數是 BlockingQueue,若是不傳該參數,就會默認隊列爲 LinkedBlockingQueue
,任務執行結果就是加入到這個阻塞隊列中的
因此要完全理解 ExecutorCompletionService
,咱們只須要知道一個問題的答案就能夠了:
它是如何將異步任務結果放到這個阻塞隊列中的?
想知道這個問題的答案,那隻須要看它提交任務以後都作了些什麼?
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
咱們前面也分析過,execute 是提交 Runnable 類型的任務,自己得不到返回值,但又能夠將執行結果放到阻塞隊列裏面,因此確定是在 QueueingFuture 裏面作了文章
從上圖中看一看出,QueueingFuture 實現的接口很是多,因此說也就具有了相應的接口能力。
重中之重是,它繼承了 FutureTask ,FutureTask 重寫了 Runnable 的 run() 方法 (方法細節分析能夠查看FutureTask源碼分析 ) 文中詳細說明了,不管是set() 正常結果,仍是setException() 結果,都會調用 finishCompletion()
方法:
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
// 重點 重點 重點
done();
callable = null; // to reduce footprint
}
上述方法會執行 done() 方法,而 QueueingFuture 恰巧重寫了 FutureTask 的 done() 方法:
方法實現很簡單,就是將 task 放到阻塞隊列中
protected void done() {
completionQueue.add(task);
}
執行到此的 task 已是前序步驟 set 過結果的 task,因此就能夠經過消費阻塞隊列獲取相應的結果了
相信到這裏,CompletionService 在你面前應該沒什麼祕密可言了
CompletionService 的主要用途
在 JDK docs 上明確給了兩個例子來講明 CompletionService 的用途:
假設你有一組針對某個問題的solvers,每一個都返回一個類型爲Result的值,而且想要併發地運行它們,處理每一個返回一個非空值的結果,在某些方法使用(Result r)
其實就是文中開頭的使用方式
void solve(Executor e,
Collection<Callable<Result>> solvers)
throws InterruptedException, ExecutionException {
CompletionService<Result> ecs
= new ExecutorCompletionService<Result>(e);
for (Callable<Result> s : solvers)
ecs.submit(s);
int n = solvers.size();
for (int i = 0; i < n; ++i) {
Result r = ecs.take().get();
if (r != null)
use(r);
}
}
假設你想使用任務集的第一個非空結果,忽略任何遇到異常的任務,並在第一個任務準備好時取消全部其餘任務
void solve(Executor e,
Collection<Callable<Result>> solvers)
throws InterruptedException {
CompletionService<Result> ecs
= new ExecutorCompletionService<Result>(e);
int n = solvers.size();
List<Future<Result>> futures
= new ArrayList<Future<Result>>(n);
Result result = null;
try {
for (Callable<Result> s : solvers)
futures.add(ecs.submit(s));
for (int i = 0; i < n; ++i) {
try {
Result r = ecs.take().get();
if (r != null) {
result = r;
break;
}
} catch (ExecutionException ignore) {}
}
}
finally {
for (Future<Result> f : futures)
// 注意這裏的參數給的是 true,詳解一樣在前序 Future 源碼分析文章中
f.cancel(true);
}
if (result != null)
use(result);
}
這兩種方式都是很是經典的 CompletionService 使用 範式 ,請你們仔細品味每一行代碼的用意
範式沒有說明 Executor 的使用,使用 ExecutorCompletionService,須要本身建立線程池,看上去雖然有些麻煩,但好處是你可讓多個 ExecutorCompletionService 的線程池隔離,這種隔離性能避免幾個特別耗時的任務拖垮整個應用的風險 (這也是咱們反覆說過屢次的,不要全部業務共用一個線程池)
總結
CompletionService 的應用場景仍是很是多的,好比
-
Dubbo 中的 Forking Cluster -
多倉庫文件/鏡像下載(從最近的服務中心下載後終止其餘下載過程) -
多服務調用(天氣預報服務,最早獲取到的結果)
CompletionService 不但能知足獲取最快結果,還能起到必定 "load balancer" 做用,獲取可用服務的結果,使用也很是簡單, 只須要遵循範式便可
併發系列 講了這麼多,分析源碼的過程也碰到各類隊列,接下來咱們就看看那些讓人眼花繚亂的隊列
靈魂追問
-
一般處理結果還會用異步方式進行處理,若是採用這種方式,有哪些注意事項? -
若是是你,你會選擇使用無界隊列嗎?爲何?
<<< 左右滑動見更多 >>>
2020-08-06
2020-08-06
2020-08-03
本文分享自微信公衆號 - 日拱一兵(gh_6235a38420b9)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。