ExecutorCompletionService源碼分析

1、簡介

          ExecutorCompletionService是線程池和隊列的配合使用,它的內部封裝了線程池(線程池須要在構造ExecutorCompletionService對象時傳入),也能夠自定義隊列傳入構造函數, 若是沒傳入,將構造無限的隊列LinkedBlockingQueue,將提交的任務代理給線程池執行(任務是FutureTask的子類 QueueingFuture,QueueingFuture重寫了done()方法,done方法在FutureTask類中是空實現,任務執行完會調用此方法,不清楚的能夠看下個人另外一篇FutureTask源碼分析,模板方法, 子類能夠進行重寫),由於提交的任務被轉換爲QueueingFuture對象,QueueingFuture任務對象處理完成以後,會調用重寫的done方法主動將該執行完的QueueingFuture任務放到ExecutorCompletionService維護的阻塞隊列中,所以執行完成的任務都會被放到阻塞隊列中,若是想要得到任務的執行結果時,只需調用take()或者poll()方法獲取便可。線程池下一篇會進行介紹。
java

2、屬性

//線程池,執行任務
private final Executor executor;
//若是在構造函數傳入進來的線程池參數,是AbstractExecutorService的子類,也會將線程池賦值給該屬性,該類主要用來將Callable和Runnable任務包裝成FutureTask任務,下面會進行介紹
private final AbstractExecutorService aes;
//存放執行完成的任務,若是任務執行出現異常,不會存放在隊列中,隊列若是沒有自定義,在構造函數傳入進來,ExecutorCompletionService會建立一個無限的鏈表隊列,若是沒有及時從隊列獲取執行完成的任務,有可能會致使內存溢出
private final BlockingQueue<Future<V>> completionQueue;複製代碼

3、構造函數

//傳入線程池構造ExecutorCompletionService實例
//@param executor 線程池對象,用來執行任務
public ExecutorCompletionService(Executor executor) {
        //若是傳入的線程池對象爲空,會拋出空指針異常
        if (executor == null)
            //拋出空指針異常 
            throw new NullPointerException();
        //將傳入進來的線程池對象賦值給ExecutorCompletionService實例屬性executor
        this.executor = executor;
        //若是傳入進來的線程池對象是AbstractExecutorService的子類,也會將線程池賦值給ExecutorCompletionService實例屬性aes,不然的話aes賦值爲空 
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        //因爲沒有自定義隊列,會建立一個無限的鏈表隊列LinkedBlockingQueue,用來存放執行完成的任務,若是任務執行出現異常,不會存放在此隊列中
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

//傳入線程池和自定義隊列構造ExecutorCompletionService實例
public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) {
        //若是傳入進來的線程池對象爲空,或者傳入進來的隊列爲空,拋出空指針異常
        if (executor == null || completionQueue == null)
            //拋出空指針異常
            throw new NullPointerException();
        //將傳入進來的線程池對象賦值給ExecutorCompletionService實例屬性executor
        this.executor = executor;
        //若是傳入進來的線程池對象是AbstractExecutorService的子類,也會將線程池賦值給ExecutorCompletionService實例屬性aes,不然的話aes賦值爲空 
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        //將傳入進來的隊列對象賦值給ExecutorCompletionService實例屬性completionQueue
        this.completionQueue = completionQueue;
}
複製代碼

4、內部類

  1. QueueingFuture內部類

    //QueueingFuture是FutureTask的子類,重寫了FutureTask中done方法,done方法在FutureTask中是個空方法,模板方法,子類能夠進行重寫
    private class QueueingFuture extends FutureTask<Void> {
           //QueueingFuture構造函數,將RunnableFuture對象從新適配成QueueingFuture實例對象,對FutureTask不清楚的能夠看下個人另外一篇FutureTask源碼分析https://juejin.im/post/5d08be8ce51d455d6c0ad925 
           QueueingFuture(RunnableFuture<V> task) {
                //調用父類FutureTask的構造函數,FutureTask不清楚的能夠看下個人另外一篇對FutureTask的源碼分析
                super(task, null);
                //將傳入進來的任務賦值給task屬性
                this.task = task;
            }
            //done方法在FutureTask中是個空方法,模板方法,在任務被執行完時,設置任務執行結果中,finishCompletion會調用done()方法,對任務執行完成,作一些處理,QueueingFuture對done方法進行重寫,將執行完成的任務加入到隊列中,這樣就能夠從隊列中獲取任務執行完的結果 
            protected void done() { completionQueue.add(task); }
            //傳入進來的任務 
            private final Future<V> task;
    }複製代碼

5、封裝任務

//將傳入進來的Callable類型的任務封裝成RunnableFuture任務
private RunnableFuture<V> newTaskFor(Callable<V> task) {
        //若是傳入進來的線程池對象executor是AbstractExecutorService的子類,aes就賦值爲線程池對象
        //若是aes爲空
        if (aes == null)
            //直接使用FutureTask的構造函數將task封裝成FutureTask
            return new FutureTask<V>(task);
        else
            //調用aes.newTaskFor方法將傳入進來的task封裝成FutureTask,newTaskFor方法內部也是使用FutureTask的構造函數將task封裝成FutureTask
            return aes.newTaskFor(task);
}

//將傳入進來的Runnable類型的任務封裝成RunnableFuture任務
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
    /若是傳入進來的線程池對象executor是AbstractExecutorService的子類,aes就賦值爲線程池對象
    //若是aes爲空
    if (aes == null)
        //直接使用FutureTask的構造函數將task封裝成FutureTask
        return new FutureTask<V>(task, result);
    else
        //調用aes.newTaskFor方法將傳入進來的task封裝成FutureTask,newTaskFor方法內部也是使用FutureTask的構造函數將task封裝成FutureTask
        return aes.newTaskFor(task, result);
}
複製代碼

6、執行任務

//提交Callable類型的任務到線程池執行
public Future<V> submit(Callable<V> task) { 
        //若是傳入進來的任務爲空直接拋出空指針異常
        if (task == null) throw new NullPointerException();
        //使用上面介紹的newTaskFor方法將傳入進來的Callable類型的任務封裝成RunnableFuture任務
        RunnableFuture<V> f = newTaskFor(task);
        //QueueingFuture將RunnableFuture適配成QueueingFuture實例,QueueingFuture重寫了FutureTask的done方法,任務執行完成調用done方法,將完成任務加入隊列中,不清楚的能夠看上面QueueingFuture內部類的介紹
        executor.execute(new QueueingFuture(f));
        //返回RunnableFuture類型的任務
        return f;
}

//提交Runnable類型的任務到線程池執行,FutureTask也會調用Executors的callable方法將Runnable適配成Callable類型任務
public Future<V> submit(Runnable task, V result) {
    //若是傳入進來的任務爲空直接拋出空指針異常
    if (task == null) throw new NullPointerException();
    //使用上面介紹的newTaskFor方法將傳入進來的Runnable類型的任務封裝成RunnableFuture任務 
    RunnableFuture<V> f = newTaskFor(task, result);
    //QueueingFuture將RunnableFuture適配成QueueingFuture實例,QueueingFuture重寫了FutureTask的done方法,任務執行完成調用done方法,將完成任務加入隊列中,不清楚的能夠看上面QueueingFuture內部類的介紹
    executor.execute(new QueueingFuture(f));
    //返回RunnableFuture類型的任務
    return f;
}複製代碼

7、獲取任務結果

//獲取完成的任務,獲取不到會阻塞,直到獲取到或者等待線程被中斷拋出中斷異常,隊列的源碼等有空會進行分析
public Future<V> take() throws InterruptedException {
        //從隊列中獲取完成的任務
        return completionQueue.take();
}

//獲取完成的任務,獲取不到不會阻塞,不支持中斷,從隊列中獲取不到完成的任務直接返回null
public Future<V> poll() {
    //從隊列中獲取完成的任務 
    return completionQueue.poll();
}

//獲取完成的任務,循環的獲取已完成的任務,直到獲取到,或者超時,或者獲取線程被中斷,支持中斷異常,從隊列中獲取不
public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
    //從隊列中獲取完成的任務 
    return completionQueue.poll(timeout, unit);
}複製代碼
相關文章
相關標籤/搜索