ExecutorCompletionService是線程池和隊列的配合使用,它的內部封裝了線程池(線程池須要在構造ExecutorCompletionService對象時傳入),也能夠自定義隊列傳入構造函數, 若是沒傳入,將構造無限的隊列LinkedBlockingQueue,將提交的任務代理給線程池執行(任務是FutureTask的子類 QueueingFuture,QueueingFuture重寫了done()方法,done方法在FutureTask類中是空實現,任務執行完會調用此方法,不清楚的能夠看下個人另外一篇FutureTask源碼分析,模板方法, 子類能夠進行重寫),由於提交的任務被轉換爲QueueingFuture對象,QueueingFuture任務對象處理完成以後,會調用重寫的done方法主動將該執行完的QueueingFuture任務放到ExecutorCompletionService維護的阻塞隊列中,所以執行完成的任務都會被放到阻塞隊列中,若是想要得到任務的執行結果時,只需調用take()或者poll()方法獲取便可。線程池下一篇會進行介紹。
java
//線程池,執行任務
private final Executor executor;
//若是在構造函數傳入進來的線程池參數,是AbstractExecutorService的子類,也會將線程池賦值給該屬性,該類主要用來將Callable和Runnable任務包裝成FutureTask任務,下面會進行介紹
private final AbstractExecutorService aes;
//存放執行完成的任務,若是任務執行出現異常,不會存放在隊列中,隊列若是沒有自定義,在構造函數傳入進來,ExecutorCompletionService會建立一個無限的鏈表隊列,若是沒有及時從隊列獲取執行完成的任務,有可能會致使內存溢出
private final BlockingQueue<Future<V>> completionQueue;複製代碼
//傳入線程池構造ExecutorCompletionService實例
//@param executor 線程池對象,用來執行任務
public ExecutorCompletionService(Executor executor) {
//若是傳入的線程池對象爲空,會拋出空指針異常
if (executor == null)
//拋出空指針異常
throw new NullPointerException();
//將傳入進來的線程池對象賦值給ExecutorCompletionService實例屬性executor
this.executor = executor;
//若是傳入進來的線程池對象是AbstractExecutorService的子類,也會將線程池賦值給ExecutorCompletionService實例屬性aes,不然的話aes賦值爲空
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
//因爲沒有自定義隊列,會建立一個無限的鏈表隊列LinkedBlockingQueue,用來存放執行完成的任務,若是任務執行出現異常,不會存放在此隊列中
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
//傳入線程池和自定義隊列構造ExecutorCompletionService實例
public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) {
//若是傳入進來的線程池對象爲空,或者傳入進來的隊列爲空,拋出空指針異常
if (executor == null || completionQueue == null)
//拋出空指針異常
throw new NullPointerException();
//將傳入進來的線程池對象賦值給ExecutorCompletionService實例屬性executor
this.executor = executor;
//若是傳入進來的線程池對象是AbstractExecutorService的子類,也會將線程池賦值給ExecutorCompletionService實例屬性aes,不然的話aes賦值爲空
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
//將傳入進來的隊列對象賦值給ExecutorCompletionService實例屬性completionQueue
this.completionQueue = completionQueue;
}
複製代碼
//QueueingFuture是FutureTask的子類,重寫了FutureTask中done方法,done方法在FutureTask中是個空方法,模板方法,子類能夠進行重寫
private class QueueingFuture extends FutureTask<Void> {
//QueueingFuture構造函數,將RunnableFuture對象從新適配成QueueingFuture實例對象,對FutureTask不清楚的能夠看下個人另外一篇FutureTask源碼分析https://juejin.im/post/5d08be8ce51d455d6c0ad925
QueueingFuture(RunnableFuture<V> task) {
//調用父類FutureTask的構造函數,FutureTask不清楚的能夠看下個人另外一篇對FutureTask的源碼分析
super(task, null);
//將傳入進來的任務賦值給task屬性
this.task = task;
}
//done方法在FutureTask中是個空方法,模板方法,在任務被執行完時,設置任務執行結果中,finishCompletion會調用done()方法,對任務執行完成,作一些處理,QueueingFuture對done方法進行重寫,將執行完成的任務加入到隊列中,這樣就能夠從隊列中獲取任務執行完的結果
protected void done() { completionQueue.add(task); }
//傳入進來的任務
private final Future<V> task;
}複製代碼
//將傳入進來的Callable類型的任務封裝成RunnableFuture任務
private RunnableFuture<V> newTaskFor(Callable<V> task) {
//若是傳入進來的線程池對象executor是AbstractExecutorService的子類,aes就賦值爲線程池對象
//若是aes爲空
if (aes == null)
//直接使用FutureTask的構造函數將task封裝成FutureTask
return new FutureTask<V>(task);
else
//調用aes.newTaskFor方法將傳入進來的task封裝成FutureTask,newTaskFor方法內部也是使用FutureTask的構造函數將task封裝成FutureTask
return aes.newTaskFor(task);
}
//將傳入進來的Runnable類型的任務封裝成RunnableFuture任務
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
/若是傳入進來的線程池對象executor是AbstractExecutorService的子類,aes就賦值爲線程池對象
//若是aes爲空
if (aes == null)
//直接使用FutureTask的構造函數將task封裝成FutureTask
return new FutureTask<V>(task, result);
else
//調用aes.newTaskFor方法將傳入進來的task封裝成FutureTask,newTaskFor方法內部也是使用FutureTask的構造函數將task封裝成FutureTask
return aes.newTaskFor(task, result);
}
複製代碼
//提交Callable類型的任務到線程池執行
public Future<V> submit(Callable<V> task) {
//若是傳入進來的任務爲空直接拋出空指針異常
if (task == null) throw new NullPointerException();
//使用上面介紹的newTaskFor方法將傳入進來的Callable類型的任務封裝成RunnableFuture任務
RunnableFuture<V> f = newTaskFor(task);
//QueueingFuture將RunnableFuture適配成QueueingFuture實例,QueueingFuture重寫了FutureTask的done方法,任務執行完成調用done方法,將完成任務加入隊列中,不清楚的能夠看上面QueueingFuture內部類的介紹
executor.execute(new QueueingFuture(f));
//返回RunnableFuture類型的任務
return f;
}
//提交Runnable類型的任務到線程池執行,FutureTask也會調用Executors的callable方法將Runnable適配成Callable類型任務
public Future<V> submit(Runnable task, V result) {
//若是傳入進來的任務爲空直接拋出空指針異常
if (task == null) throw new NullPointerException();
//使用上面介紹的newTaskFor方法將傳入進來的Runnable類型的任務封裝成RunnableFuture任務
RunnableFuture<V> f = newTaskFor(task, result);
//QueueingFuture將RunnableFuture適配成QueueingFuture實例,QueueingFuture重寫了FutureTask的done方法,任務執行完成調用done方法,將完成任務加入隊列中,不清楚的能夠看上面QueueingFuture內部類的介紹
executor.execute(new QueueingFuture(f));
//返回RunnableFuture類型的任務
return f;
}複製代碼
//獲取完成的任務,獲取不到會阻塞,直到獲取到或者等待線程被中斷拋出中斷異常,隊列的源碼等有空會進行分析
public Future<V> take() throws InterruptedException {
//從隊列中獲取完成的任務
return completionQueue.take();
}
//獲取完成的任務,獲取不到不會阻塞,不支持中斷,從隊列中獲取不到完成的任務直接返回null
public Future<V> poll() {
//從隊列中獲取完成的任務
return completionQueue.poll();
}
//獲取完成的任務,循環的獲取已完成的任務,直到獲取到,或者超時,或者獲取線程被中斷,支持中斷異常,從隊列中獲取不
public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
//從隊列中獲取完成的任務
return completionQueue.poll(timeout, unit);
}複製代碼