附加:Java 4種線程池介紹請查看 html
當咱們經過Executor提交一組併發執行的任務,而且但願在每個任務完成後能當即獲得結果,有兩種方式能夠採起:併發
方式一:ide
經過一個list來保存一組future,而後在循環中輪訓這組future,直到每一個future都已完成。若是咱們不但願出現由於排在前面的任務阻塞致使後面先完成的任務的結果沒有及時獲取的狀況,那麼在調用get方式時,須要將超時時間設置爲0函數
public class CompletionServiceTest { static class Task implements Callable<String>{ private int i; public Task(int i){ this.i = i; } @Override public String call() throws Exception { Thread.sleep(10000); return Thread.currentThread().getName() + "執行完任務:" + i; } } public static void main(String[] args){ testUseFuture(); } private static void testUseFuture(){ int numThread = 5; ExecutorService executor = Executors.newFixedThreadPool(numThread); List<Future<String>> futureList = new ArrayList<Future<String>>(); for(int i = 0;i<numThread;i++ ){ Future<String> future = executor.submit(new CompletionServiceTest.Task(i)); futureList.add(future); } while(numThread > 0){ for(Future<String> future : futureList){ String result = null; try { result = future.get(0, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (TimeoutException e) { //超時異常直接忽略
//future.cancel(true);//超時設置任務取消 } if(null != result){ futureList.remove(future); numThread--; System.out.println(result); //此處必須break,不然會拋出併發修改異常。(也能夠經過將futureList聲明爲CopyOnWriteArrayList類型解決) break; } } } } }
方式二:post
第一種方式顯得比較繁瑣,經過使用ExecutorCompletionService,則能夠達到代碼最簡化的效果。this
public class CompletionServiceTest { static class Task implements Callable<String>{ private int i; public Task(int i){ this.i = i; } @Override public String call() throws Exception { Thread.sleep(10000); return Thread.currentThread().getName() + "執行完任務:" + i; } } public static void main(String[] args) throws InterruptedException, ExecutionException{ testExecutorCompletionService(); } private static void testExecutorCompletionService() throws InterruptedException, ExecutionException{ int numThread = 3; ExecutorService executor = Executors.newFixedThreadPool(numThread); CompletionService<String> completionService = new ExecutorCompletionService<String>(executor); for(int i = 0;i<numThread;i++ ){ completionService.submit(new CompletionServiceTest.Task(i)); } } for(int i = 0;i<numThread;i++ ){ System.out.println(completionService.take().get()); //獲取執行結果 } }
ExecutorCompletionService分析:url
CompletionService是Executor和BlockingQueue的結合體。線程
public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); }
任務的提交和執行都是委託給Executor來完成。在構造函數中建立一個BlockingQueue來保存計算完成的結果,當提交某個任務時,該任務首先將被包裝爲一個QueueingFuture,htm
public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; }
QueueingFuture,這是FutureTask的一個子類,經過改寫該子類的done方法,能夠實現當任務完成時,將結果放入到BlockingQueue中。對象
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
而經過使用BlockingQueue的take(阻塞獲取)或poll(非阻塞獲取)方法,則能夠獲得結果。在BlockingQueue不存在元素時,這兩個操做會阻塞,一旦有結果加入,則當即返回。
附加知識點:
take():取走BlockingQueue裏排在首位的對象,若BlockingQueue爲空,阻斷進入等待狀態直到Blocking有新的對象被加入爲止;
poll(time):取走BlockingQueue裏排在首位的對象,若不能當即取出,則能夠等time參數規定的時間,取不到時返回nul
public Future<V> take() throws InterruptedException { return completionQueue.take(); } public Future<V> poll() { return completionQueue.poll(); }