使用ExecutorService提交多個任務時,須要保存Callable對應的Future。java
須要反覆循環判斷future是否完成。dom
@Test public void test() throws InterruptedException { LinkedBlockingQueue<Future<String>> futures = new LinkedBlockingQueue<>(); Random random = new Random(); ExecutorService pool = Executors.newFixedThreadPool(5); Thread producerThread = new Thread(){ @Override public void run() { for (int i = 0; i < 10; i++) { int finalI = i; Future<String> future = pool.submit(new Callable<String>() { @Override public String call() throws Exception { int time = random.nextInt(10000); Thread.sleep(time); return "task_" + finalI; } }); System.out.println("submit_" + i); futures.add(future); } } }; producerThread.start(); Thread consumerThread = new Thread(){ @Override public void run() { while (true) { for (Future<String> future : futures) { if (future.isDone()) { try { System.out.println(future.get()); futures.remove(future); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } } } }; consumerThread.start(); producerThread.join(); consumerThread.join(); }
使用CompletionService能夠簡化操做。ide
@Test public void test2() throws InterruptedException { //LinkedBlockingQueue<Future<String>> futures = new LinkedBlockingQueue<>(); Random random = new Random(); ExecutorService pool = Executors.newFixedThreadPool(5); //使用ExecutorCompletionService包裝ExecutorService ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(pool); Thread producerThread = new Thread(){ @Override public void run() { for (int i = 0; i < 10; i++) { int finalI = i; //Future<String> future = pool.submit(new Callable<String>() { completionService.submit(new Callable<String>() { @Override public String call() throws Exception { int time = random.nextInt(10000); Thread.sleep(time); return "task_" + finalI; } }); System.out.println("submit_" + i); //futures.add(future); } } }; producerThread.start(); Thread consumerThread = new Thread(){ @Override public void run() { while (true) { try { Future<String> take = completionService.take(); System.out.println(take.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } /*while (true) { for (Future<String> future : futures) { if (future.isDone()) { try { System.out.println(future.get()); futures.remove(future); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } }*/ } }; consumerThread.start(); producerThread.join(); consumerThread.join(); }
//構造方法 public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); }
初始化自身屬性:this
public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); //包裝成QueueingFuture,傳遞給executor executor.execute(new QueueingFuture(f)); return f; }
ExecutorCompletionService中的內部類線程
private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future<V> task; }
重寫done方法,將task添加到completionQueue。completionQueue是ExecutorCompletionService中的屬性。因此,執行完一個任務,就將執行完的RunnableFuture添加到ExecutorCompletionService的阻塞隊列completionQueue中。code
public Future<V> take() throws InterruptedException { return completionQueue.take(); }
而take操做就是從阻塞隊列中取出,已經完成的任務結果(RunnableFuture)。隊列