「既生 ExecutorService, 何生 CompletionService?」

前言

我會手動建立線程,爲何要使用線程池? 中詳細的介紹了 ExecutorService,能夠將整塊任務拆分作簡單的並行處理;html

不會用Java Future,我懷疑你泡茶沒我快 中又詳細的介紹了 Future 的使用,填補了 Runnable 不能獲取線程執行結果的空缺java

將兩者結合起來使用看似要一招吃天下了(Java有併發,併發之大,一口吃不下), but ~~ 是我太天真編程

ExecutorService VS CompletionService

假設咱們有 4 個任務(A, B, C, D)用來執行復雜的計算,每一個任務的執行時間隨着輸入參數的不一樣而不一樣,若是將任務提交到 ExecutorService, 相信你已經能夠「信手拈來」併發

ExecutorService executorService = Executors.newFixedThreadPool(4);
List<Future> futures = new ArrayList<Future<Integer>>();
futures.add(executorService.submit(A));
futures.add(executorService.submit(B));
futures.add(executorService.submit(C));
futures.add(executorService.submit(D));

// 遍歷 Future list,經過 get() 方法獲取每一個 future 結果
for (Future future:futures) {
    Integer result = future.get();
    // 其餘業務邏輯
}

先直入主題,用 CompletionService 實現一樣的場景異步

ExecutorService executorService = Executors.newFixedThreadPool(4);

// ExecutorCompletionService 是 CompletionService 惟一實現類
CompletionService executorCompletionService= new ExecutorCompletionService<>(executorService );

List<Future> futures = new ArrayList<Future<Integer>>();
futures.add(executorCompletionService.submit(A));
futures.add(executorCompletionService.submit(B));
futures.add(executorCompletionService.submit(C));
futures.add(executorCompletionService.submit(D));

// 遍歷 Future list,經過 get() 方法獲取每一個 future 結果
for (int i=0; i<futures.size(); i++) {
    Integer result = executorCompletionService.take().get();
    // 其餘業務邏輯
}

兩種方式在代碼實現上幾乎一毛同樣,咱們曾經說過 JDK 中不會重複造輪子,若是要造一個新輪子,一定是原有的輪子在某些場景的使用上有致命缺陷異步編程

既然新輪子出來了,兩者到底有啥不一樣呢? 在 搞定 CompletableFuture,併發異步編程和編寫串行程序還有什麼區別? 文中,咱們提到了 Future get() 方法的致命缺陷:函數

若是 Future 結果沒有完成,調用 get() 方法,程序會 阻塞在那裏,直至獲取返回結果

先來看第一種實現方式,假設任務 A 因爲參數緣由,執行時間相對任務 B,C,D 都要長不少,可是按照程序的執行順序,程序在 get() 任務 A 的執行結果會阻塞在那裏,致使任務 B,C,D 的後續任務沒辦法執行。又由於每一個任務執行時間是不固定的,因此不管怎樣調整將任務放到 List 的順序,都不合適,這就是致命弊端高併發

新輪子天然要解決這個問題,它的設計理念就是哪一個任務先執行完成,get() 方法就會獲取到相應的任務結果,這麼作的好處是什麼呢?來看個圖你就瞬間理解了源碼分析

兩張圖一對比,執行時長高下立判了,在當今高併發的時代,這點時間差,在吞吐量上起到的效果可能不是一點半點了性能

那 CompletionService 是怎麼作到獲取最早執行完的任務結果的呢?

遠看CompletionService 輪廓

若是你使用過消息隊列,你應該秒懂我要說什麼了,CompletionService 實現原理很簡單

就是一個將異步任務的生產和任務完成結果的消費解耦的服務

用人話解釋一下上面的抽象概念我只能再畫一張圖了

說白了,哪一個任務執行的完,就直接將執行結果放到隊列中,這樣消費者拿到的結果天然就是最先拿到的那個了

從上圖中看到,有任務,有結果隊列,那 CompletionService 天然也要圍繞着幾個關鍵字作文章了

  • 既然是異步任務,那天然可能用到 Runnable 或 Callable
  • 既然能獲取到結果,天然也會用到 Future 了

帶着這些線索,咱們走進 CompletionService 源碼看一看

近看 CompletionService 源碼

CompletionService 是一個接口,它簡單的只有 5 個方法:

Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;

關於 2 個 submit 方法, 我在 不會用Java Future,我懷疑你泡茶沒我快 文章中作了很是詳細的分析以及案例使用說明,這裏再也不過多贅述

另外 3 個方法都是從阻塞隊列中獲取並移除阻塞隊列第一個元素,只不過他們的功能略有不一樣

  • Take: 若是隊列爲空,那麼調用 take() 方法的線程會被阻塞
  • Poll: 若是隊列爲空,那麼調用 poll() 方法的線程會返回 null
  • Poll-timeout: 以超時的方式獲取並移除阻塞隊列中的第一個元素,若是超時時間到,隊列仍是空,那麼該方法會返回 null

因此說,按大類劃分上面5個方法,其實就是兩個功能

  • 提交異步任務 (submit)
  • 從隊列中拿取並移除第一個元素 (take/poll)

CompletionService 只是接口,ExecutorCompletionService 是該接口的惟一實現類

ExecutorCompletionService 源碼分析

先來看一下類結構, 實現類裏面並無多少內容

<fancybox></fancybox>

ExecutorCompletionService 有兩種構造函數:

private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;

public ExecutorCompletionService(Executor executor) {
    if (executor == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
public ExecutorCompletionService(Executor executor,
                                 BlockingQueue<Future<V>> completionQueue) {
    if (executor == null || completionQueue == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = completionQueue;
}

兩個構造函數都須要傳入一個 Executor 線程池,由於是處理異步任務的,咱們是不被容許手動建立線程的,因此這裏要使用線程池也就很好理解了

另一個參數是 BlockingQueue,若是不傳該參數,就會默認隊列爲 LinkedBlockingQueue,任務執行結果就是加入到這個阻塞隊列中的

因此要完全理解 ExecutorCompletionService ,咱們只須要知道一個問題的答案就能夠了:

它是如何將異步任務結果放到這個阻塞隊列中的?

想知道這個問題的答案,那隻須要看它提交任務以後都作了些什麼?

public Future<V> submit(Callable<V> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task);
    executor.execute(new QueueingFuture(f));
    return f;
}

咱們前面也分析過,execute 是提交 Runnable 類型的任務,自己得不到返回值,但又能夠將執行結果放到阻塞隊列裏面,因此確定是在 QueueingFuture 裏面作了文章

從上圖中看一看出,QueueingFuture 實現的接口很是多,因此說也就具有了相應的接口能力。

重中之重是,它繼承了 FutureTask ,FutureTask 重寫了 Runnable 的 run() 方法 (方法細節分析能夠查看FutureTask源碼分析 ) 文中詳細說明了,不管是set() 正常結果,仍是setException() 結果,都會調用 finishCompletion() 方法:

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

      // 重點 重點 重點
    done();

    callable = null;        // to reduce footprint
}

上述方法會執行 done() 方法,而 QueueingFuture 恰巧重寫了 FutureTask 的 done() 方法:

方法實現很簡單,就是將 task 放到阻塞隊列中

protected void done() { 
  completionQueue.add(task); 
}

執行到此的 task 已是前序步驟 set 過結果的 task,因此就能夠經過消費阻塞隊列獲取相應的結果了

相信到這裏,CompletionService 在你面前應該沒什麼祕密可言了

CompletionService 的主要用途

在 JDK docs 上明確給了兩個例子來講明 CompletionService 的用途:

假設你有一組針對某個問題的solvers,每一個都返回一個類型爲Result的值,而且想要併發地運行它們,處理每一個返回一個非空值的結果,在某些方法使用(Result r)

其實就是文中開頭的使用方式

void solve(Executor e,
            Collection<Callable<Result>> solvers)
     throws InterruptedException, ExecutionException {
     CompletionService<Result> ecs
         = new ExecutorCompletionService<Result>(e);
     for (Callable<Result> s : solvers)
         ecs.submit(s);
     int n = solvers.size();
     for (int i = 0; i < n; ++i) {
         Result r = ecs.take().get();
         if (r != null)
             use(r);
     }
 }
假設你想使用任務集的第一個非空結果,忽略任何遇到異常的任務,並在第一個任務準備好時取消全部其餘任務
void solve(Executor e,
            Collection<Callable<Result>> solvers)
     throws InterruptedException {
     CompletionService<Result> ecs
         = new ExecutorCompletionService<Result>(e);
     int n = solvers.size();
     List<Future<Result>> futures
         = new ArrayList<Future<Result>>(n);
     Result result = null;
     try {
         for (Callable<Result> s : solvers)
             futures.add(ecs.submit(s));
         for (int i = 0; i < n; ++i) {
             try {
                 Result r = ecs.take().get();
                 if (r != null) {
                     result = r;
                     break;
                 }
             } catch (ExecutionException ignore) {}
         }
     }
     finally {
         for (Future<Result> f : futures)
               // 注意這裏的參數給的是 true,詳解一樣在前序 Future 源碼分析文章中
             f.cancel(true);
     }

     if (result != null)
         use(result);
 }

這兩種方式都是很是經典的 CompletionService 使用 範式 ,請你們仔細品味每一行代碼的用意

範式沒有說明 Executor 的使用,使用 ExecutorCompletionService,須要本身建立線程池,看上去雖然有些麻煩,但好處是你可讓多個 ExecutorCompletionService 的線程池隔離,這種隔離性能避免幾個特別耗時的任務拖垮整個應用的風險 (這也是咱們反覆說過屢次的,不要全部業務共用一個線程池

總結

CompletionService 的應用場景仍是很是多的,好比

  • Dubbo 中的 Forking Cluster
  • 多倉庫文件/鏡像下載(從最近的服務中心下載後終止其餘下載過程)
  • 多服務調用(天氣預報服務,最早獲取到的結果)

CompletionService 不但能知足獲取最快結果,還能起到必定 "load balancer" 做用,獲取可用服務的結果,使用也很是簡單, 只須要遵循範式便可

併發系列 講了這麼多,分析源碼的過程也碰到各類隊列,接下來咱們就看看那些讓人眼花繚亂的隊列

靈魂追問

  1. 一般處理結果還會用異步方式進行處理,若是採用這種方式,有哪些注意事項?
  2. 若是是你,你會選擇使用無界隊列嗎?爲何?

日拱一兵 | 原創

相關文章
相關標籤/搜索