Executor,Executors和ExecutorService

Executor

Executor是java.util.concurrent包中的一個接口,是一個執行提交的Runnable任務的對象。這個接口提供了一種方式把任務提交從每一個任務會如何執行的方法中解耦,包括線城市用,調度等的細節。使用Executor代替了顯式建立線程。例如,比起對一組task中的每個調用new Thread(new(RunnableTask())).start(),你能夠用:java

Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
...

可是,Executor接口不是嚴格須要執行是異步的。在最簡單的狀況中,一個executor可以在調用者的線程上當即運行提交的任務:網絡

class DirectExecutor implements Executor {
  public void execute(Runnable r) {
    r.run();
  }
}

更典型的是,任務執行在非調用者線程。下面的executor爲每一個task產出一個新的線程:app

class ThreadPerTaskExecutor implements Executor {
  public void execute(Runnable r) {
    new Thread(r).start();
  }
}

不少Executor的實現強制加入了一些關於如何以及什麼時候任務被調度的限制。下面的executor串行提交的任務到第二個executor,代表它是一個混合的executor:異步

class SerialExecutor implements Executor {
  final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
  final Executor executor;
  Runnable active;

  SerialExecutor(Executor executor) {
    this.executor = executor;
  }

  public synchronized void execute(final Runnable r) {
    tasks.offer(new Runnable() {
      public void run() {
        try {
          r.run();
        } finally {
          scheduleNext();
        }
      }
    });
    if (active == null) {
      scheduleNext();
    }
  }

  protected synchronized void scheduleNext() {
    if ((active = tasks.poll()) != null) {
      executor.execute(active);
    }
  }
}

在java.util.concurrent包中提供的Executor接口的實現(如ThreadPoolExecutorScheduledThreadPoolExecutorForkJoinPoolAbstractExecutorService)也同時實現了ExecutorService,這是一個更普遍的接口。ThreadPoolExecutor類提供了一個可擴展的線程池實現。Executors類爲這些Executors提供了方便的工廠方法。socket

內存一致性效應:線程中在提交一個Runnable對象給一個Executor以前發生的操做happen-before執行這個Runnable(可能在另外一個線程中執行)。this

實現Executor接口須要實現execute方法,定義以下:線程

void execute(Runnbale command)

這個方法在將來某個時間點執行給定的command。這個command可能執行在一個新的線程中,在一個池化的線程中,或在調用者線程中,這取決於Executor的實現。code

ExecutorService

ExecutorService接口繼承Executor接口,是提供管理終止的方法以及produce出Future去跟蹤一個或多個異步任務進度的方法的Executor。server

一個ExecutorService能夠被shutdown,會致使它拒絕新的tasks。提供了兩個不一樣的方法去關閉一個ExecutorService。shutdown方法容許以前提交的任務在終止以前執行,shutdownNow方法禁止等待已經開始任務並試圖結束正在執行的任務。若是一個ExecutorService終止了,一個executor沒有正在執行的活躍任務,沒有等待執行的任務,也沒有新任務能被提交。一個沒有使用的ExecutorService應該被關閉以回收資源。對象

submit方法根據Executor的execute(Runnable)方法擴展,建立並返回一個Future,可以用來cancel執行以及等待執行完成。invokeAny方法以及invokeAll方法執行最普通的批量執行,執行一組任務而後等待至少一個,或者全部任務完成。

Executors類爲ExecutorService提供工廠方法。

使用舉例

這裏有一個網絡服務,其中一個線程池中的線程爲請求提供服務。它使用預配置的Executors的newFixedThreadPool工廠方法:

class NetworkService implements Runnable {
  private final ServerSocket serverSocket;
  private final ExecutorService pool;

  public NetworkService(int port, int poolSize) 
      throws IOException {
    serverSocket = new ServerSocket(port);
    pool = Executors.newFixedTreadPool(poolSize);
  }

  public void run() { // run the service
    try {
      for (;;) {
        pool.execute(new Handler(serverSocket.accept()));
      }
    } catch (IOException ex) {
      pool.shutdown();
    }
  }
}

class Handler implements Runnable {
  private final Socket socket;
  Handler(Socket socket) { this.socket = socket; }
  public void run() {
    // read and service request on socket
  }
}

下面的方法哦經過兩步關閉一個ExecutorService,首先調用shutdown以拒絕新來的tasks,而後調用shutdownNow(若是有必要的話),去cancel任何執行的任務:

void shutdownAndAwaitTermination(ExecutorService pool) {
  pool.shutdown(); // Disable new tasks from being submitted
  try {
    // Wait a while for existing tasks to terminate
    if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
      pool.shutdownNow(); // Cancel currently executing tasks
      // Wait a while for tasks to respond to being cancelled
      if (!pool.awaitTermination(60, TimeUnit.SECONDS))
        System.err.println("Pool did not terminate");
    }
  } catch (InterruptedException ie) {
    // (Re-)Cancel if current thread also interrupted
    pool.shutdownNow();
    // Preserve interrupt status
    Thread.currentThread.interrupt();
  }
}

ExecutorService中定義的方法

shutdown

void shutdown()

啓動有序關閉,執行先前提交的任務,可是不接受新的任務。若是已經關閉再次執行沒有影響。

這個方法不等待先前提交的任務完成執行,使用awaitTermination去等待任務執行完畢。

shutdownNow

List<Runnable> shutdownNow()

試圖中止全部正在執行的任務,中止等待任務的處理,返回一個等待執行的任務列表。

這個方法不等待正在執行的任務終止,使用awaitTermination去等待任務終止。

盡力(best-effort)去中止正在執行的任務。例如,標準實現會經過Thread的interrupt去cancel任務,所以任何迴應終止失敗的任務永遠不會終止。

isShutdown

boolean isShutdown()

若是executor已經被shutdown則返回true。

isTerminated

boolean isTerminated()

若是全部任務在shutdown以後都執行完成則返回true。注意isTerminated永遠不會爲true除非shutdown或shutdownNow先執行。

awaitTermination

boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException

在一個shutdown請求後阻塞直到全部任務完成執行,或者timeout發生,或者當前線程被interrupt,看哪一個先發生。

submit

<T> Future<T> submit(Callable<T> task)

提交一個返回值的任務去執行並返回一個表明任務掛起結果的Future對象。Future的get方法會返回成功完成的任務結果。

若是你想要當即阻塞等待一個任務完成,你可使用result = exec.submit(aCallable).get()

submit

<T> Future<T> submit(Runnable task, T result)

提交一個Runnable任務去執行並返回一個表明這個任務的Future。Future的get方法會返回成功完成的任務結果。

submit

Future<?> submit(Runnable task)

提交一個Runnable任務去執行並返回一個表明這個任務的Future。Future的get方法會返回成功完成的任務結果null。

invokeAll

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 
    throws InterruptedException

執行給定的一組任務,返回一個包含這些任務狀態和結果的Future列表。Future的isDone方法對返回列表中的每一個元素調用都返回true。

這個方法會阻塞,等待全部task完成。所以返回的list中的每一個Future都是完成狀態。

invokeAll

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                              long timeout, TimeUnit unit)
       throws InterruptedException

執行給定的一組任務,返回一個包含這些任務狀態和結果的列表。全部任務完成或timeout超時時方法返回。Future的isDone方法對返回列表中的每一個元素調用都返回true。

返回後,沒有完成的任務被cancel。

invokeAny

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
      throws InterruptedException, ExcutionException

執行給定的一組任務,若是其中一個任務成功完成(沒有拋出異常)則返回。返回後,全部沒完成的任務都被cancel。

invokeAny

<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException

執行給定的一組任務,若是其中一個任務成功完成(沒有拋出異常)則返回。

相關文章
相關標籤/搜索