在ExecutorService的submit方法中能夠獲取返回值,經過Future的get方法,可是這個Future類存在缺陷,Future接口調用get()方法取得處理後的返回結果時具備阻塞性,也就是說調用Future的get方法時,任務沒有執行完成,則get方法要一直阻塞等到任務完成爲止。 這樣大大的影響了系統的性能,這就是Future的最大缺點。爲此,java1.5之後提供了CompletionServlice來解決這個問題。java
CompletionService 接口CompletionService的功能是異步的方式,一邊生產任務,一邊處理完成的任務結果,這樣能夠將執行的任務與處理任務隔離開來進行處理,使用submit執行任務,使用塔克獲取已完成的任務,並按照這些任務的完成的時間順序來處理他們的結果。dom
向ExecutorService 提交一組任務,哪一個任務先完成,就把完成任務的返回結果打印出來。異步
public class CompletionServiceExecutorDemo { public static void main(String[] args) { ExecutorService threadPool = Executors.newFixedThreadPool(10); // 同時運行多個任務,那個任務先返回數據,就先獲取該數據 CompletionService<String> completionService = new ExecutorCompletionService<String>(threadPool); for (int i = 1; i <= 10; i++) { final int seq = i; completionService.submit(new Callable<String>() { @Override public String call() throws Exception { int waitTime = new Random().nextInt(10); TimeUnit.SECONDS.sleep(waitTime); return "callable:"+seq+" 執行時間:"+waitTime+"s"; } }); } for (int i = 1; i <= 10; i++) { try { Future<String> future = completionService.take(); System.out.println(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } threadPool.shutdown(); } }
執行結果以下:ide
callable:6 執行時間:1s callable:2 執行時間:3s callable:10 執行時間:3s callable:1 執行時間:4s callable:4 執行時間:5s callable:8 執行時間:5s callable:7 執行時間:7s callable:5 執行時間:8s callable:9 執行時間:9s callable:3 執行時間:9s
從打印結果能夠看出,這些任務是按照任務執行完成的順序打印的,先執行完就先返回結果。源碼分析
ExecutorCompletionService 類結構以下性能
public class ExecutorCompletionService<V> implements CompletionService<V> { private final Executor executor; //線程池 private final AbstractExecutorService aes; private final BlockingQueue<Future<V>> completionQueue; //任務完成隊列 private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future<V> task; }
ExecutorCompletionService 類中定義了一個QueueingFuture 的內部類,繼承於FutureTask類,內部重寫了FutureTask的done方法,該方法是在FutureTask任務執行完成後會調用的方法,在FutureTask中該方法未實現任何邏輯。this
重寫done方法,在任務處理完成後把該FutureTask任務放入到阻塞隊列(BlockingQueue)中,而後咱們就能夠從阻塞隊列中take執行完成的任務,進行想用的處理。spa
這裏是實現ExecutorCompletionService的核心邏輯。線程
private RunnableFuture<V> newTaskFor(Callable<V> task) { if (aes == null) return new FutureTask<V>(task); else return aes.newTaskFor(task); } private RunnableFuture<V> newTaskFor(Runnable task, V result) { if (aes == null) return new FutureTask<V>(task, result); else return aes.newTaskFor(task, result); }
ExecutorCompletionService 支持Callable和Runnable任務code
public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); }
public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) { if (executor == null || completionQueue == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = completionQueue; }
該構造能夠指定一個阻塞隊列,其它功能同上構造方法。
public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; } public Future<V> submit(Runnable task, V result) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task, result); executor.execute(new QueueingFuture(f)); return f; }
該方法能夠向ExecutorCompletionService 中提交要執行的任務。
支持Callable和Runnable兩種類型的任務。
若是提交的Runnable任務,則執行完後返回的結果爲null。
public Future<V> take() throws InterruptedException { return completionQueue.take(); }
從阻塞隊列中獲取執行完成的任務的,若是隊列爲空且任務沒有所有完成,則阻塞當前線程,直到有任務執行完成。
public Future<V> poll() { return completionQueue.poll(); } public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException { return completionQueue.poll(timeout, unit); } }
ExecutorCompletionService支持非阻塞方式從阻塞隊列中獲取已完成的任務
ExecutorCompletionService的實現原理是內部使用了FutureTask來實現異步的任務執行。經過一個內部類繼承FutureTask,並實現了FutureTask的一個done方法。該done方法會在任務執行完成以後調用該方法,在任務執行完以後把當前的FutureTask放入到阻塞隊列中。這樣就實現了先執行完成的任務先存放到阻塞隊列中,應用程序能夠從阻塞隊列中提早獲取先執行完的任務。