ExecutorCompletionService 源碼分析

概要

在ExecutorService的submit方法中能夠獲取返回值,經過Future的get方法,可是這個Future類存在缺陷,Future接口調用get()方法取得處理後的返回結果時具備阻塞性,也就是說調用Future的get方法時,任務沒有執行完成,則get方法要一直阻塞等到任務完成爲止。 這樣大大的影響了系統的性能,這就是Future的最大缺點。爲此,java1.5之後提供了CompletionServlice來解決這個問題。java

CompletionService 接口CompletionService的功能是異步的方式,一邊生產任務,一邊處理完成的任務結果,這樣能夠將執行的任務與處理任務隔離開來進行處理,使用submit執行任務,使用塔克獲取已完成的任務,並按照這些任務的完成的時間順序來處理他們的結果。dom

示例

向ExecutorService 提交一組任務,哪一個任務先完成,就把完成任務的返回結果打印出來。異步

public class CompletionServiceExecutorDemo {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        // 同時運行多個任務,那個任務先返回數據,就先獲取該數據
        CompletionService<String> completionService = new ExecutorCompletionService<String>(threadPool);
        for (int i = 1; i <= 10; i++) {
            final int seq = i;
            completionService.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    int waitTime = new Random().nextInt(10);
                    TimeUnit.SECONDS.sleep(waitTime);
                    return "callable:"+seq+" 執行時間:"+waitTime+"s";
                }
            });
        }

        for (int i = 1; i <= 10; i++) {
            try {
                Future<String> future = completionService.take();
                System.out.println(future.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        threadPool.shutdown();
    }
}

執行結果以下:ide

callable:6 執行時間:1s
callable:2 執行時間:3s
callable:10 執行時間:3s
callable:1 執行時間:4s
callable:4 執行時間:5s
callable:8 執行時間:5s
callable:7 執行時間:7s
callable:5 執行時間:8s
callable:9 執行時間:9s
callable:3 執行時間:9s

從打印結果能夠看出,這些任務是按照任務執行完成的順序打印的,先執行完就先返回結果。源碼分析

ExecutorCompletionService 源碼分析

ExecutorCompletionService 類結構以下性能

public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;  //線程池
    private final AbstractExecutorService aes;  
    private final BlockingQueue<Future<V>> completionQueue;  //任務完成隊列


    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

ExecutorCompletionService 類中定義了一個QueueingFuture 的內部類,繼承於FutureTask類,內部重寫了FutureTask的done方法,該方法是在FutureTask任務執行完成後會調用的方法,在FutureTask中該方法未實現任何邏輯。this

重寫done方法,在任務處理完成後把該FutureTask任務放入到阻塞隊列(BlockingQueue)中,而後咱們就能夠從阻塞隊列中take執行完成的任務,進行想用的處理。spa

這裏是實現ExecutorCompletionService的核心邏輯。線程

newTaskFor 方法

private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            return new FutureTask<V>(task);
        else
            return aes.newTaskFor(task);
    }

    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
        if (aes == null)
            return new FutureTask<V>(task, result);
        else
            return aes.newTaskFor(task, result);
    }

ExecutorCompletionService 支持Callable和Runnable任務code

  1. 把用戶提交的Callable任務轉成FutureTask。
  2. 把用戶提交的Runnable任務轉成FutureTask。

ExecutorCompletionService 構造方法1

public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }
  1. 鏈接池(executor)不能爲空。
  2. 判斷該線程池是否AbstractExecutorService類型,若是是則賦值給aes,不然賦值null
    (aes做用:把用戶提交的Callable和Runnable任務轉換成FutureTask)
  3. 建立一個阻塞隊列。(存放執行完成的FutureTask任務)

ExecutorCompletionService 構造方法2

public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }

該構造能夠指定一個阻塞隊列,其它功能同上構造方法。

submit方法

public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }
 
    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture(f));
        return f;
    }

該方法能夠向ExecutorCompletionService 中提交要執行的任務。
支持Callable和Runnable兩種類型的任務。
若是提交的Runnable任務,則執行完後返回的結果爲null。

public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

從阻塞隊列中獲取執行完成的任務的,若是隊列爲空且任務沒有所有完成,則阻塞當前線程,直到有任務執行完成。

public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

}

ExecutorCompletionService支持非阻塞方式從阻塞隊列中獲取已完成的任務

  1. 能夠經過poll方法來從阻塞隊列中獲取任務,若是隊列爲空,則直接返回null,不會阻塞當前線程。
  2. 支持等待多長時間來從阻塞隊列中獲取已經完成的任務。

總結

ExecutorCompletionService的實現原理是內部使用了FutureTask來實現異步的任務執行。經過一個內部類繼承FutureTask,並實現了FutureTask的一個done方法。該done方法會在任務執行完成以後調用該方法,在任務執行完以後把當前的FutureTask放入到阻塞隊列中。這樣就實現了先執行完成的任務先存放到阻塞隊列中,應用程序能夠從阻塞隊列中提早獲取先執行完的任務。

相關文章
相關標籤/搜索