背景:
如果一個任務由多個子任務組成,需要在子任務全部執行完成之後,再由主線程對所有子任務的結果進行封裝,可以採用以下幾種方式:
一、基於 Guava ListenableFuture 進行;
二、基於 FutureTask 和 CountDownLatch 進行;
三、基於 FutureTask 進行;
四、基於 CompletionService 進行;
五、基於 BlockingQueue 進行。
說明:
二、三 的區別就是使用線程池時每次都新建、shutdown;
四、五 是同一個東西。
/**
 * Approach 1: Guava {@code ListenableFuture}.
 *
 * <p>Submits 20 {@code CountTask} instances to a fixed-size listening pool
 * (2 x available processors), registers a callback on every future that prints
 * the result (or the failure's stack trace), then blocks on
 * {@code Futures.successfulAsList} and prints the combined list.
 *
 * <p>Any exception is printed rather than propagated, as in the original
 * contract of this method. The pool is always shut down, even on failure.
 */
public static void listenableFuture() {
    try {
        ListeningExecutorService pool = MoreExecutors.listeningDecorator(
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2));
        try {
            List<ListenableFuture<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < 20; i++) {
                final ListenableFuture<Integer> future = pool.submit(new CountTask());
                futures.add(future);
                // NOTE(review): the two-argument addCallback overload only exists in older
                // Guava releases; newer ones require an explicit Executor argument.
                Futures.addCallback(future, new FutureCallback<Integer>() {
                    @Override
                    public void onSuccess(Integer result) {
                        System.out.println(result);
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        t.printStackTrace();
                    }
                });
            }
            System.out.println("submit success");

            // Aggregate future: completes once every input future has completed.
            ListenableFuture<List<Integer>> ret = Futures.successfulAsList(futures);
            List<Integer> res = ret.get();
            System.out.println(res);
        } finally {
            // Always release the worker threads; a fixed pool's non-daemon threads
            // would otherwise keep the JVM alive after a failure.
            pool.shutdown();
        }
        System.out.println("shutdown success");
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can still observe the interruption.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

/**
 * Approach 2: {@code FutureTask} plus {@code CountDownLatch}.
 *
 * <p>Creates 20 tasks that share one latch, waits on the latch, then reads and
 * prints every task's result. The executor is shut down only after all results
 * have been collected (or an exception has occurred).
 *
 * @throws Exception if waiting is interrupted or a task fails
 */
public static void countDownCount() throws Exception {
    int threadNum = 20;
    ExecutorService executor = Executors.newCachedThreadPool();
    try {
        CountDownLatch count = new CountDownLatch(threadNum);
        List<FutureTask<Integer>> futureTasks = new ArrayList<>();
        for (int i = 0; i < threadNum; i++) {
            // Presumably CountTask counts the latch down when it finishes - confirm in CountTask.
            CountTask task = new CountTask(count);
            FutureTask<Integer> futureTask = new FutureTask<>(task);
            executor.submit(futureTask);
            futureTasks.add(futureTask);
        }

        // Blocks the main thread until the latch reaches zero.
        // NOTE(review): if a task can fail before counting down, this waits forever;
        // consider await(timeout, unit) once CountTask's behavior is confirmed.
        count.await();
        System.out.println("執行完成");

        for (FutureTask<Integer> futureTask : futureTasks) {
            Integer ret = futureTask.get();
            System.out.println(ret);
        }
    } finally {
        executor.shutdown();
    }
    System.out.println("測試完成");
}

/**
 * Approach 3: {@code FutureTask} only.
 *
 * <p>Submits 20 tasks, requests an orderly shutdown right away, then collects
 * the results in submission order.
 *
 * @throws Exception if waiting is interrupted or a task fails
 */
public static void futureTaskCount() throws Exception {
    int threadNum = 20;
    ExecutorService executor = Executors.newCachedThreadPool();
    List<FutureTask<Integer>> futureTasks = new ArrayList<>();
    try {
        for (int i = 0; i < threadNum; i++) {
            CountTask task = new CountTask();
            FutureTask<Integer> futureTask = new FutureTask<>(task);
            executor.submit(futureTask);
            futureTasks.add(futureTask);
        }
    } finally {
        // shutdown() does NOT block: it only stops new submissions and lets the
        // already-submitted tasks run to completion. The waiting happens in
        // futureTask.get() below.
        executor.shutdown();
    }
    System.out.println("shutdown");

    for (FutureTask<Integer> futureTask : futureTasks) {
        Integer ret = futureTask.get();
        System.out.println(ret);
    }
    System.out.println("測試完成");
}

/**
 * Approach 4: {@code CompletionService}.
 *
 * <p>Submits 20 tasks through an {@code ExecutorCompletionService} and takes the
 * results as the tasks complete, printing each one.
 *
 * @throws Exception if waiting is interrupted or a task fails
 */
public static void completionCount() throws Exception {
    int threadNum = 20;
    ExecutorService executor =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
    try {
        CompletionService<Integer> pool = new ExecutorCompletionService<Integer>(executor);
        for (int i = 0; i < threadNum; i++) {
            pool.submit(new CountTask());
        }
        for (int i = 0; i < threadNum; i++) {
            // take() blocks until the next completed task is available.
            Integer ret = pool.take().get();
            System.out.println("輸出結果" + ret);
        }
        System.out.println("測試完成");
    } finally {
        // Without this, a failing task would leave the fixed pool's threads running.
        executor.shutdown();
    }
}

/**
 * Approach 5: {@code BlockingQueue}.
 *
 * <p>Stores the {@code Future} returned for each of 10 submitted tasks in a
 * blocking queue, then drains the queue and prints the sum of all results.
 *
 * @throws Exception if waiting is interrupted or a task fails
 */
public static void blockingQueueCount() throws Exception {
    ExecutorService exec = Executors.newCachedThreadPool();
    try {
        BlockingQueue<Future<Integer>> queue = new LinkedBlockingQueue<Future<Integer>>();
        for (int i = 0; i < 10; i++) {
            Future<Integer> future = exec.submit(new CountTask());
            queue.add(future);
        }

        int sum = 0;
        // Snapshot the size first: the queue shrinks as elements are taken.
        int queueSize = queue.size();
        for (int i = 0; i < queueSize; i++) {
            sum += queue.take().get();
        }
        System.out.println("總數爲:" + sum);
    } finally {
        exec.shutdown();
    }
}