ExecutorCompletionService

ExecutorService

When you submit multiple tasks through an ExecutorService, you have to keep the Future that corresponds to each Callable.

You then have to loop over those futures repeatedly, checking whether each one has completed.

// Needs java.util.Random, java.util.concurrent.*, and JUnit's @Test
@Test
public void test() throws InterruptedException {
  LinkedBlockingQueue<Future<String>> futures = new LinkedBlockingQueue<>();
  Random random = new Random();
  ExecutorService pool = Executors.newFixedThreadPool(5);
  Thread producerThread = new Thread(){
    @Override
    public void run() {
      for (int i = 0; i < 10; i++) {
        int finalI = i;
        Future<String> future = pool.submit(new Callable<String>() {

          @Override
          public String call() throws Exception {
            int time = random.nextInt(10000);
            Thread.sleep(time);
            return "task_" + finalI;
          }
        });
        System.out.println("submit_" + i);

        futures.add(future);
      }
    }
  };
  producerThread.start();


  Thread consumerThread = new Thread(){
    @Override
    public void run() {
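      // 10 tasks are submitted in total; poll until every result has been consumed
      int finished = 0;
      while (finished < 10) {
        for (Future<String> future : futures) {
          if (future.isDone()) {
            try {
              System.out.println(future.get());
            } catch (InterruptedException e) {
              e.printStackTrace();
            } catch (ExecutionException e) {
              e.printStackTrace();
            }
            // Remove the future even if get() threw, so it is not reported again
            futures.remove(future);
            finished++;
          }
        }
      }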
    }
  };
  consumerThread.start();
  producerThread.join();
  consumerThread.join();

}

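Getting results with CompletionService

Using a CompletionService simplifies this: results are taken from it in completion order, so neither the collection of futures nor the polling loop is needed.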

@Test
public void test2() throws InterruptedException {
   //LinkedBlockingQueue<Future<String>> futures = new LinkedBlockingQueue<>();
   Random random = new Random();
   ExecutorService pool = Executors.newFixedThreadPool(5);
   // Wrap the ExecutorService in an ExecutorCompletionService
   ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(pool);

   Thread producerThread = new Thread(){
      @Override
      public void run() {

         for (int i = 0; i < 10; i++) {
            int finalI = i;
            //Future<String> future = pool.submit(new Callable<String>() {
            completionService.submit(new Callable<String>() {

               @Override
               public String call() throws Exception {
                  int time = random.nextInt(10000);
                  Thread.sleep(time);
                  return "task_" + finalI;
               }
            });
            System.out.println("submit_" + i);

            //futures.add(future);
         }
      }
   };
   producerThread.start();


   Thread consumerThread = new Thread(){
      @Override
      public void run() {
         // Exactly 10 tasks are submitted, so take 10 results and then stop
         for (int n = 0; n < 10; n++) {
            try {
               Future<String> take = completionService.take();
               System.out.println(take.get());
            } catch (InterruptedException e) {
               e.printStackTrace();
            } catch (ExecutionException e) {
               e.printStackTrace();
            }
         }
         /*while (true) {
            for (Future<String> future : futures) {
               if (future.isDone()) {
                  try {
                     System.out.println(future.get());
                     futures.remove(future);
                  } catch (InterruptedException e) {
                     e.printStackTrace();
                  } catch (ExecutionException e) {
                     e.printStackTrace();
                  }
               }
            }
         }*/
      }
   };
   consumerThread.start();
   producerThread.join();
   consumerThread.join();
}
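
The same pattern can be written more compactly when the number of tasks is known up front. The following is a minimal sketch, not part of the original tests: the class name CompletionOrderDemo, the method runAll, and the sleep times are assumptions chosen for illustration. It submits a fixed number of tasks, calls take() once per task, and shuts the pool down afterwards so the program exits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionOrderDemo {

    // Hypothetical helper: runs taskCount tasks and returns results in completion order.
    static List<String> runAll(int taskCount) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(taskCount);
        CompletionService<String> cs = new ExecutorCompletionService<>(pool);
        try {
            for (int i = 0; i < taskCount; i++) {
                final int id = i;
                // Assumed workload: tasks with a higher id sleep for a shorter time
                final long sleepMillis = (taskCount - id) * 200L;
                cs.submit(() -> {
                    Thread.sleep(sleepMillis);
                    return "task_" + id;
                });
            }
            List<String> results = new ArrayList<>();
            // One take() per submitted task; each call blocks until another task finishes
            for (int n = 0; n < taskCount; n++) {
                results.add(cs.take().get());
            }
            return results;
        } finally {
            // Without shutdown the pool's worker threads keep the JVM alive
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAll(3));
    }
}
```

With these sleep times the shorter tasks would typically come back first, although the exact order depends on thread scheduling.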

How CompletionService works

Constructor

// Constructor
public ExecutorCompletionService(Executor executor) {
    if (executor == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

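It initializes the object's own fields:

  • this.executor: the actual thread pool that runs the tasks
  • this.completionQueue: the blocking queue that holds the futures of completed tasks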
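
Besides the single-argument constructor shown above, the JDK class also has a two-argument constructor that accepts the completion queue from the caller. The sketch below uses it to make the role of completionQueue visible; the class name CustomQueueDemo and the method firstResult are assumptions for illustration only. The completed future shows up in the queue that was passed in.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class CustomQueueDemo {

    // Hypothetical helper: submits one task and reads its future from our own queue.
    static String firstResult() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        BlockingQueue<Future<String>> queue = new LinkedBlockingQueue<>();
        // The service stores completed futures in the queue supplied here
        CompletionService<String> cs = new ExecutorCompletionService<>(pool, queue);
        try {
            cs.submit(() -> "done");
            // Reading the queue directly, instead of calling cs.take(), to show where results land
            Future<String> completed = queue.take();
            return completed.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(firstResult());
    }
}
```

In normal use the queue would be read through take() or poll() on the service; reading it directly here is only meant to expose the mechanism.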

submit

public Future<V> submit(Callable<V> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task);
    // Wrap the task in a QueueingFuture and hand it to the executor
    executor.execute(new QueueingFuture(f));
    return f;
}

QueueingFuture

QueueingFuture is an inner class of ExecutorCompletionService.

private class QueueingFuture extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task) {
        super(task, null);
        this.task = task;
    }
    protected void done() { completionQueue.add(task); }
    private final Future<V> task;
}

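QueueingFuture overrides the done method, which FutureTask calls once the task has finished, and adds task to completionQueue. completionQueue is a field of ExecutorCompletionService, so each time a task finishes, its completed RunnableFuture is added to the ExecutorCompletionService's blocking queue.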
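
The done hook can be tried in isolation. The following is a simplified sketch of the same idea, not the JDK implementation: EnqueueOnDone is a hypothetical class that extends FutureTask directly and enqueues itself, whereas QueueingFuture wraps another RunnableFuture. The task is run on the calling thread to keep the example deterministic.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

public class DoneHookDemo {

    // Hypothetical class for illustration; it is not part of the JDK.
    static class EnqueueOnDone<V> extends FutureTask<V> {
        private final BlockingQueue<Future<V>> queue;

        EnqueueOnDone(Callable<V> callable, BlockingQueue<Future<V>> queue) {
            super(callable);
            this.queue = queue;
        }

        // FutureTask invokes done() when the task completes or is cancelled
        @Override
        protected void done() {
            queue.add(this);
        }
    }

    static String runOnce() throws Exception {
        BlockingQueue<Future<String>> queue = new LinkedBlockingQueue<>();
        EnqueueOnDone<String> task = new EnqueueOnDone<String>(() -> "hello", queue);
        boolean emptyBefore = queue.isEmpty(); // nothing is queued until the task has run
        task.run();                            // run synchronously on the calling thread
        Future<String> completed = queue.take();
        return emptyBefore + ":" + completed.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runOnce()); // prints "true:hello"
    }
}
```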

take

public Future<V> take() throws InterruptedException {
    return completionQueue.take();
}

The take operation simply removes a completed task's result (a RunnableFuture) from the blocking queue, blocking until one is available.
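
Two consequences of the code above can be checked with a short sketch. The class and method names here are assumptions for illustration. First, since submit returns f and done() enqueues that same object, the Future returned by take() is the very instance that submit returned. Second, because take() blocks indefinitely, the CompletionService interface also offers poll(long, TimeUnit), which returns null if nothing completes within the timeout.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class TakeAndPollDemo {

    // Checks that take() hands back the same Future object that submit() returned.
    static boolean sameFuture() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CompletionService<String> cs = new ExecutorCompletionService<>(pool);
        try {
            Future<String> submitted = cs.submit(() -> "task_0");
            Future<String> completed = cs.take(); // blocks until the task has finished
            return submitted == completed && completed.isDone();
        } finally {
            pool.shutdown();
        }
    }

    // Checks that a timed poll gives up with null when no task is pending.
    static boolean pollTimesOut() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CompletionService<String> cs = new ExecutorCompletionService<>(pool);
        try {
            // Nothing was submitted, so the completion queue stays empty
            return cs.poll(100, TimeUnit.MILLISECONDS) == null;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sameFuture());
        System.out.println(pollTimesOut());
    }
}
```

A timed poll is one way for a consumer loop to stop waiting when it cannot know how many tasks were submitted.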
