/** * A service that decouples the production of new asynchronous tasks * from the consumption of the results of completed tasks. Producers * {@code submit} tasks for execution. Consumers {@code take} * completed tasks and process their results in the order they * complete. A {@code CompletionService} can for example be used to * manage asynchronous I/O, in which tasks that perform reads are * submitted in one part of a program or system, and then acted upon * in a different part of the program when the reads complete, * possibly in a different order than they were requested.**/
參與java doc能夠看到如上描述。簡單來講就是CompletionService使(批)任務異步執行與任務結果處理分離:即生產者執行任務,消費者處理任務結果;而且在後面描述一個在異步I/O上的一個使用場景。java
Future<V> submit(Callable<V> task);
@param result the result to return upon successful completion Future<V> submit(Runnable task, V result);
/** * 此方法阻塞獲取已完成的任務Future,並從任務列表中移除 * @return the Future representing the next completed task * @throws InterruptedException if interrupted while waiting */ Future<V> take() throws InterruptedException;
/** * 比較take,此方法是非阻塞的,若是沒有完成的任務,返回null */ Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
ExecutorCompletionService是CompletionService的惟一實現併發
private final Executor executor; private final AbstractExecutorService aes; private final BlockingQueue<Future<V>> completionQueue;
public ExecutorCompletionService(Executor executor) {
public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)
每一個任務的提交都會構造一個QueueingFutrue異步
public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; }
而QueueingFutrue有個回調方法,在任務執行完成後,放到completionQueue阻塞隊列中async
protected void done() { completionQueue.add(task); }
由此實現方式很明顯了。this
查詢文檔能夠看到官方提供的案例code
/** Suppose you have a set of solvers for a certain problem, each * returning a value of some type {@code Result}, and would like to * run them concurrently, processing the results of each of them that * return a non-null value, in some method {@code use(Result r)}. You * could write this as: * 大意就是:假設你有一批須要回執的任務要併發處理,就能夠使用以下方式(ps: 也能夠利用Futrue方式的實現 * ,這裏再也不說明) */ void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException, ExecutionException { CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e); for (Callable<Result> s : solvers) ecs.submit(s); int n = solvers.size(); for (int i = 0; i < n; ++i) { Result r = ecs.take().get(); if (r != null) use(r); } } /** Suppose instead that you would like to use the first non-null result * of the set of tasks, ignoring any that encounter exceptions, * and cancelling all other tasks when the first one is ready: * 大意是:若是隻想獲得率先執行完任務的返回值,忽略其餘的任務執行狀況,而且在第一個任務執行結束後取消其餘任務 */ void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException { CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e); int n = solvers.size(); List<Future<Result>> futures = new ArrayList<Future<Result>>(n); Result result = null; try { for (Callable<Result> s : solvers) futures.add(ecs.submit(s)); for (int i = 0; i < n; ++i) { try { Result r = ecs.take().get(); if (r != null) { result = r; break; } } catch (ExecutionException ignore) {} } } finally { for (Future<Result> f : futures) f.cancel(true); } if (result != null) use(result); }