本文首發於一世流雲專欄: https://segmentfault.com/blog...
juc-executors
框架是整個J.U.C包中類/接口關係最複雜的框架,真正理解executors框架的前提是理清楚各個模塊之間的關係,高屋建瓴,從總體到局部才能透徹理解其中各個模塊的功能和背後的設計思路。java
網上有太多文章講executors框架,要麼泛泛而談,要麼一葉障目不見泰山,缺少總體視角,不少根本沒有理解整個框架的設計思想和模塊關係。本文將對整個executors框架作綜述,介紹各個模塊的功能和聯繫,後續再深刻探討每一個模塊,包括模塊中的各個工具類。segmentfault
Executor
是JDK1.5時,隨着J.U.C引入的一個接口,引入該接口的主要目的是解耦任務自己和任務的執行。咱們以前經過線程執行一個任務時,每每須要先建立一個線程,而後調用線程的start
方法來執行任務:設計模式
new Thread(new(RunnableTask())).start();
上述RunnableTask是實現了Runnable接口的任務類
而Executor接口解耦了任務和任務的執行,該接口只有一個方法,入參爲待執行的任務:緩存
public interface Executor { /** * 執行給定的Runnable任務. * 根據Executor的實現不一樣, 具體執行方式也不相同. * * @param command the runnable task * @throws RejectedExecutionException if this task cannot be accepted for execution * @throws NullPointerException if command is null */ void execute(Runnable command); }
咱們能夠像下面這樣執行任務,而沒必要關心線程的建立:多線程
Executor executor = someExecutor; // 建立具體的Executor對象 executor.execute(new RunnableTask1()); executor.execute(new RunnableTask2()); ...
因爲Executor僅僅是一個接口,因此根據其實現的不一樣,執行任務的具體方式也不盡相同,好比:框架
①同步執行任務異步
class DirectExecutor implements Executor { public void execute(Runnable r) { r.run(); } }
DirectExecutor是一個同步任務執行器,對於傳入的任務,只有執行完成後execute纔會返回。ide
②異步執行任務工具
class ThreadPerTaskExecutor implements Executor { public void execute(Runnable r) { new Thread(r).start(); } }
ThreadPerTaskExecutor是一個異步任務執行器,對於每一個任務,執行器都會建立一個新的線程去執行任務。學習
注意:Java線程與本地操做系統的線程是一一映射的。Java線程啓動時會建立一個本地操做系統線程;當該Java線程終止時,對應操做系統線程會被回收。因爲CPU資源是有限的,因此線程數量有上限,因此通常由線程池來管理線程的建立/回收,而上面這種方式實際上是線程池的雛形。
③對任務進行排隊執行
class SerialExecutor implements Executor { final Queue<Runnable> tasks = new ArrayDeque<Runnable>(); final Executor executor; Runnable active; SerialExecutor(Executor executor) { this.executor = executor; } public synchronized void execute(final Runnable r) { tasks.offer(new Runnable() { public void run() { try { r.run(); } finally { scheduleNext(); } } }); if (active == null) { scheduleNext(); } } protected synchronized void scheduleNext() { if ((active = tasks.poll()) != null) { executor.execute(active); } } }
SerialExecutor 會對傳入的任務進行排隊(FIFO順序),而後從隊首取出一個任務執行。
以上這些示例僅僅是給出了一些可能的Executor實現,J.U.C包中提供了不少Executor的具體實現類,咱們之後會具體講到,這裏關鍵是理解Executor的設計思想——對任務和任務的執行解耦。
Executor接口提供的功能很簡單,爲了對它進行加強,J.U.C又提供了一個名爲ExecutorService
接口,ExecutorService也是在JDK1.5時,隨着J.U.C引入的:
能夠看到,ExecutorService繼承了Executor,它在Executor的基礎上加強了對任務的控制,同時包括對自身生命週期的管理,主要有四類:
public interface ExecutorService extends Executor { /** * 關閉執行器, 主要有如下特色: * 1. 已經提交給該執行器的任務將會繼續執行, 可是再也不接受新任務的提交; * 2. 若是執行器已經關閉了, 則再次調用沒有反作用. */ void shutdown(); /** * 當即關閉執行器, 主要有如下特色: * 1. 嘗試中止全部正在執行的任務, 沒法保證可以中止成功, 但會盡力嘗試(例如, 經過 Thread.interrupt中斷任務, 可是不響應中斷的任務可能沒法終止); * 2. 暫停處理已經提交但未執行的任務; * * @return 返回已經提交但未執行的任務列表 */ List<Runnable> shutdownNow(); /** * 若是該執行器已經關閉, 則返回true. */ boolean isShutdown(); /** * 判斷執行器是否已經【終止】. * <p> * 僅當執行器已關閉且全部任務都已經執行完成, 才返回true. * 注意: 除非首先調用 shutdown 或 shutdownNow, 不然該方法永遠返回false. */ boolean isTerminated(); /** * 阻塞調用線程, 等待執行器到達【終止】狀態. * * @return {@code true} 若是執行器最終到達終止狀態, 則返回true; 不然返回false * @throws InterruptedException if interrupted while waiting */ boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; /** * 提交一個具備返回值的任務用於執行. * 注意: Future的get方法在成功完成時將會返回task的返回值. * * @param task 待提交的任務 * @param <T> 任務的返回值類型 * @return 返回該任務的Future對象 * @throws RejectedExecutionException 若是任務沒法安排執行 * @throws NullPointerException if the task is null */ <T> Future<T> submit(Callable<T> task); /** * 提交一個 Runnable 任務用於執行. * 注意: Future的get方法在成功完成時將會返回給定的結果(入參時指定). * * @param task 待提交的任務 * @param result 返回的結果 * @param <T> 返回的結果類型 * @return 返回該任務的Future對象 * @throws RejectedExecutionException 若是任務沒法安排執行 * @throws NullPointerException if the task is null */ <T> Future<T> submit(Runnable task, T result); /** * 提交一個 Runnable 任務用於執行. * 注意: Future的get方法在成功完成時將會返回null. * * @param task 待提交的任務 * @return 返回該任務的Future對象 * @throws RejectedExecutionException 若是任務沒法安排執行 * @throws NullPointerException if the task is null */ Future<?> submit(Runnable task); /** * 執行給定集合中的全部任務, 當全部任務都執行完成後, 返回保持任務狀態和結果的 Future 列表. * <p> * 注意: 該方法爲同步方法. 返回列表中的全部元素的Future.isDone() 爲 true. * * @param tasks 任務集合 * @param <T> 任務的返回結果類型 * @return 任務的Future對象列表,列表順序與集合中的迭代器所生成的順序相同, * @throws InterruptedException 若是等待時發生中斷, 會將全部未完成的任務取消. * @throws NullPointerException 任一任務爲 null * @throws RejectedExecutionException 若是任一任務沒法安排執行 */ <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; /** * 執行給定集合中的全部任務, 當全部任務都執行完成後或超時期滿時(不管哪一個首先發生), 返回保持任務狀態和結果的 Future 列表. */ <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; /** * 執行給定集合中的任務, 只有其中某個任務率先成功完成(未拋出異常), 則返回其結果. * 一旦正常或異常返回後, 則取消還沒有完成的任務. */ <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; /** * 執行給定集合中的任務, 若是在給定的超時期滿前, 某個任務已成功完成(未拋出異常), 則返回其結果. * 一旦正常或異常返回後, 則取消還沒有完成的任務. */ <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
關於Future,其實就是Java多線程設計模式中 Future模式,讀者能夠先參考下個人這篇博文( https://segmentfault.com/a/11...),後面咱們會專門講解J.U.C中的Future框架。
Future
對象提供了對任務異步執行的支持,也就是說調用線程無需等待任務執行完成,提交待執行的任務後,就會當即返回往下執行。而後,能夠在須要時檢查Future是否有結果了,若是任務已執行完畢,經過Future.get()
方法能夠獲取到執行結果——Future.get()是阻塞方法。
在工業環境中,咱們可能但願提交給執行器的某些任務可以定時執行或週期性地執行,這時咱們能夠本身實現Executor接口來建立符合咱們須要的類,Doug Lea已經考慮到了這類需求,因此在ExecutorService的基礎上,又提供了一個接口——ScheduledExecutorService
,該接口也是在JDK1.5時,隨着J.U.C引入的:
ScheduledExecutorService提供了一系列schedule方法,能夠在給定的延遲後執行提交的任務,或者每一個指定的週期執行一次提交的任務,咱們來看下面這個示例:
public class ScheduleExecutorTest { public static void main(String[] args) { ScheduledExecutorService scheduler = someScheduler; // 建立一個ScheduledExecutorService實例 final ScheduledFuture<?> scheduledFuture = scheduler.scheduleAtFixedRate(new BeepTask(), 10, 10, TimeUnit.SECONDS); // 每隔10s蜂鳴一次 scheduler.schedule(new Runnable() { @Override public void run() { scheduledFuture.cancel(true); } }, 1, TimeUnit.HOURS) // 1小時後, 取消蜂鳴任務 } private static class BeepTask implements Runnable { @Override public void run() { System.out.println("beep!"); } } }
上述示例先建立一個ScheduledExecutorService類型的執行器,而後利用scheduleAtFixedRate方法提交了一個「蜂鳴」任務,每隔10s該任務會執行一次。
注意:scheduleAtFixedRate
方法返回一個ScheduledFuture對象,ScheduledFuture其實就是在Future的基礎上增長了延遲的功能。經過ScheduledFuture,能夠取消一個任務的執行,本例中咱們利用schedule方法,設定在1小時後,執行任務的取消。
ScheduledExecutorService完整的接口聲明以下:
public interface ScheduledExecutorService extends ExecutorService { /** * 提交一個待執行的任務, 並在給定的延遲後執行該任務. * * @param command 待執行的任務 * @param delay 延遲時間 * @param unit 延遲時間的單位 */ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); /** * 提交一個待執行的任務(具備返回值), 並在給定的延遲後執行該任務. * * @param command 待執行的任務 * @param delay 延遲時間 * @param unit 延遲時間的單位 * @param <V> 返回值類型 */ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); /** * 提交一個待執行的任務. * 該任務在 initialDelay 後開始執行, 而後在 initialDelay+period 後執行, 接着在 initialDelay + 2 * period 後執行, 依此類推. * * @param command 待執行的任務 * @param initialDelay 首次執行的延遲時間 * @param period 連續執行之間的週期 * @param unit 延遲時間的單位 */ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); /** * 提交一個待執行的任務. * 該任務在 initialDelay 後開始執行, 隨後在每一次執行終止和下一次執行開始之間都存在給定的延遲. * 若是任務的任一執行遇到異常, 就會取消後續執行. 不然, 只能經過執行程序的取消或終止方法來終止該任務. * * @param command 待執行的任務 * @param initialDelay 首次執行的延遲時間 * @param delay 一次執行終止和下一次執行開始之間的延遲 * @param unit 延遲時間的單位 */ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); }
至此,Executors框架中的三個最核心的接口介紹完畢,這三個接口的關係以下圖:
經過第一部分的學習,讀者應該對Executors框架有了一個初步的認識,Executors框架就是用來解耦任務自己與任務的執行,並提供了三個核心接口來知足使用者的需求:
既然上面三種執行器只是接口,那麼就必定存在具體的實現類,J.U.C提供了許多默認的接口實現,若是要用戶本身去建立這些類的實例,就須要瞭解這些類的細節,有沒有一種直接的方式,僅僅根據一些須要的特性(參數)就建立這些實例呢?由於對於用戶來講,其實使用的只是這三個接口。
JDK1.5時,J.U.C中還提供了一個Executors
類,專門用於建立上述接口的實現類對象。Executors其實就是一個簡單工廠,它的全部方法都是static的,用戶能夠根據須要,選擇須要建立的執行器實例,Executors一共提供了五類可供建立的Executor執行器實例。
Executors提供了兩種建立具備固定線程數的Executor的方法,固定線程池在初始化時肯定其中的線程總數,運行過程當中會始終維持線程數量不變。
能夠看到下面的兩種建立方法其實都返回了一個ThreadPoolExecutor
實例。ThreadPoolExecutor是一個ExecutorService接口的實現類,咱們會在後面用專門章節講解,如今只須要了解這是一種Executor,用來調度其中的線程的執行便可。
/** * 建立一個具備固定線程數的Executor. */ public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } /** * 建立一個具備固定線程數的Executor. * 在須要時使用提供的 ThreadFactory 建立新線程. */ public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
上面須要注意的是ThreadFactory
這個接口:
public interface ThreadFactory { Thread newThread(Runnable r); }
既然返回的是一個線程池,那麼就涉及線程的建立,通常咱們須要經過 new Thread ()
這種方法建立一個新線程,可是咱們可能但願設置一些線程屬性,好比
名稱、守護程序狀態、ThreadGroup 等等,線程池中的線程很是多,若是每一個線程都這樣手動配置勢必很是繁瑣,而ThreadFactory 做爲一個線程工廠可讓咱們從這些繁瑣的線程狀態設置的工做中解放出來,還能夠由外部指定ThreadFactory實例,以決定線程的具體建立方式。
Executors提供了靜態內部類,實現了ThreadFactory接口,最簡單且經常使用的就是下面這個DefaultThreadFactory :
/** * 默認的線程工廠. */ static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
能夠看到,DefaultThreadFactory 初始化的時候定義了線程組、線程名稱等信息,每建立一個線程,都給線程統一分配這些信息,避免了一個個手工經過new的方式建立線程,又可進行工廠的複用。
除了固定線程數的線程池,Executors還提供了兩種建立只有單個線程Executor的方法:
/** * 建立一個使用單個 worker 線程的 Executor. */ public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } /** * 建立一個使用單個 worker 線程的 Executor. * 在須要時使用提供的 ThreadFactory 建立新線程. */ public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }
能夠看到,只有單個線程的線程池其實就是指定線程數爲1的固定線程池,主要區別就是,返回的Executor實例用了一個FinalizableDelegatedExecutorService
對象進行包裝。
咱們來看下FinalizableDelegatedExecutorService,該類 只定義了一個finalize方法:
static class FinalizableDelegatedExecutorService extends DelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService executor) { super(executor); } protected void finalize() { super.shutdown(); } }
核心是其繼承的DelegatedExecutorService ,這是一個包裝類,實現了ExecutorService的全部方法,可是內部實現其實都委託給了傳入的ExecutorService 實例:
/** * ExecutorService實現類的包裝類. */ static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute(Runnable command) { e.execute(command); } public void shutdown() { e.shutdown(); } public List<Runnable> shutdownNow() { return e.shutdownNow(); } public boolean isShutdown() { return e.isShutdown(); } public boolean isTerminated() { return e.isTerminated(); } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return e.awaitTermination(timeout, unit); } public Future<?> submit(Runnable task) { return e.submit(task); } public <T> Future<T> submit(Callable<T> task) { return e.submit(task); } public <T> Future<T> submit(Runnable task, T result) { return e.submit(task, result); } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { return e.invokeAll(tasks); } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { return e.invokeAll(tasks, timeout, unit); } public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { return e.invokeAny(tasks); } public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return e.invokeAny(tasks, timeout, unit); } }
爲何要畫蛇添足,加上這樣一個委託層?由於返回的ThreadPoolExecutor包含一些設置線程池大小的方法——好比setCorePoolSize,對於只有單個線程的線程池來講,咱們是不但願用戶經過強轉的方式使用這些方法的,因此須要一個包裝類,只暴露ExecutorService自己的方法。
有些狀況下,咱們雖然建立了具備必定線程數的線程池,但出於資源利用率的考慮,可能但願在特定的時候對線程進行回收(好比線程超過指定時間沒有被使用),Executors就提供了這種類型的線程池:
/** * 建立一個可緩存線程的Execotor. * 若是線程池中沒有線程可用, 則建立一個新線程並添加到池中; * 若是有線程長時間未被使用(默認60s, 可經過threadFactory配置), 則從緩存中移除. */ public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } /** * 建立一個可緩存線程的Execotor. * 若是線程池中沒有線程可用, 則建立一個新線程並添加到池中; * 若是有線程長時間未被使用(默認60s, 可經過threadFactory配置), 則從緩存中移除. * 在須要時使用提供的 ThreadFactory 建立新線程. */ public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }
能夠看到,返回的仍是ThreadPoolExecutor對象,只是指定了超時時間,另外線程池中線程的數量在[0, Integer.MAX_VALUE]
之間。
若是有任務須要延遲/週期調用,就須要返回ScheduledExecutorService接口的實例,ScheduledThreadPoolExecutor
就是實現了ScheduledExecutorService接口的一種Executor,和ThreadPoolExecutor同樣,這個咱們後面會專門講解。
/** * 建立一個具備固定線程數的 可調度Executor. * 它可安排任務在指定延遲後或週期性地執行. */ public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } /** * 建立一個具備固定線程數的 可調度Executor. * 它可安排任務在指定延遲後或週期性地執行. * 在須要時使用提供的 ThreadFactory 建立新線程. */ public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); }
Fork/Join線程池是比較特殊的一類線程池,在JDK1.7時才引入,其核心實現就是ForkJoinPool
類。關於Fork/Join框架,咱們後面會專題講解,如今只須要知道,Executors框架提供了一種建立該類線程池的便捷方法。
/** * 建立具備指定並行級別的ForkJoin線程池. */ public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool(parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } /** * 建立並行級別等於CPU核心數的ForkJoin線程池. */ public static ExecutorService newWorkStealingPool() { return new ForkJoinPool(Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }
至此,Executors框架的總體結構基本就講解完了,此時咱們的腦海中應有大體以下的一幅類繼承圖:
下面來回顧一下,上面的各個接口/類的關係和做用:
關於ThreadPoolExecutor和ScheduledThreadPoolExecutor,咱們會在下一章詳細講解,幫助讀者理解線程池的實現原理。至於ForkJoinPool,涉及Fork/Join這個並行框架的講解,咱們後面會專題介紹。