本文首發於一世流雲專欄: https://segmentfault.com/blog...
在executors框架概述一節中,咱們曾經提到過一種可對任務進行延遲/週期性調度的執行器(Executor),這類Executor通常實現了ScheduledExecutorService這個接口。ScheduledExecutorService在普通執行器接口(ExecutorService)的基礎上引入了Future模式,使得能夠限時或週期性地調度任務。java
ScheduledThreadPoolExecutor
的類繼承關係以下圖,該圖中除了本節要講解的ScheduledThreadPoolExecutor外,其它部分已經在前2節詳細介紹過了:segmentfault
從上圖中能夠看到,ScheduledThreadPoolExecutor實際上是繼承了ThreadPoolExecutor這個普通線程池,咱們知道ThreadPoolExecutor中提交的任務都是實現了Runnable接口,可是ScheduledThreadPoolExecutor比較特殊,因爲要知足任務的延遲/週期調度功能,它會對全部的Runnable任務都進行包裝,包裝成一個RunnableScheduledFuture
任務。設計模式
RunnableScheduledFuture是Future模式中的一個接口,關於Future模式,咱們後續會專門章節講解,這裏只要知道RunnableScheduledFuture的做用就是能夠異步地執行【延時/週期任務】。
另外,咱們知道在ThreadPoolExecutor中,須要指定一個阻塞隊列做爲任務隊列。ScheduledThreadPoolExecutor中也同樣,不過特殊的是,ScheduledThreadPoolExecutor中的任務隊列是一種特殊的延時隊列(DelayQueue)。多線程
咱們曾經在juc-collections框架中,分析過該種阻塞隊列,DelayQueue底層基於優先隊列(PriorityQueue)實現,是一種「堆」結構,經過該種阻塞隊列能夠實現任務的延遲到期執行(即每次從隊列獲取的任務都是最早到期的任務)。框架
ScheduledThreadPoolExecutor在內部定義了DelayQueue的變種——DelayedWorkQueue
,它和DelayQueue相似,只不過要求全部入隊元素必須實現RunnableScheduledFuture接口。異步
咱們先來看下ScheduledThreadPoolExecutor的構造,其實在executors框架概述中講Executors時已經接觸過了,Executors使用newScheduledThreadPool
工廠方法建立ScheduledThreadPoolExecutor:性能
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); }
咱們來看下ScheduledThreadPoolExecutor的構造器,內部其實都是調用了父類ThreadPoolExecutor的構造器,這裏最須要注意的就是任務隊列的選擇——DelayedWorkQueue,咱們後面會詳細介紹它的實現原理。this
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); } public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); }
ScheduledThreadPoolExecutor的核心調度方法是schedule
、scheduleAtFixedRate
、scheduleWithFixedDelay
,咱們經過schedule方法來看下整個調度流程:spa
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); delayedExecute(t); return t; }
上述的decorateTask方法把Runnable任務包裝成ScheduledFutureTask,用戶能夠根據本身的須要覆寫該方法:線程
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) { return task; }
注意:
ScheduledFutureTask是RunnableScheduledFuture接口的實現類,任務經過
period
字段來表示任務類型
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { /** * 任務序號, 自增惟一 */ private final long sequenceNumber; /** * 首次執行的時間點 */ private long time; /** * 0: 非週期任務 * >0: fixed-rate任務 * <0: fixed-delay任務 */ private final long period; /** * 在堆中的索引 */ int heapIndex; ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } // ... }
ScheduledThreadPoolExecutor中的任務隊列—— DelayedWorkQueue,保存的元素就是ScheduledFutureTask。DelayedWorkQueue是一種 堆結構,time最小的任務會排在堆頂(表示最先過時),每次出隊都是取堆頂元素,這樣最快到期的任務就會被先執行。若是兩個ScheduledFutureTask的time相同,就比較它們的序號——sequenceNumber,序號小的表明先被提交,因此就會先執行。
schedule的核心是其中的delayedExecute方法:
private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) // 線程池已關閉 reject(task); // 任務拒絕策略 else { super.getQueue().add(task); // 將任務入隊 // 若是線程池已關閉且該任務是非週期任務, 則將其從隊列移除 if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); // 取消任務 else ensurePrestart(); // 添加一個工做線程 } }
經過delayedExecute能夠看出,ScheduledThreadPoolExecutor的整個任務調度流程大體以下圖:
咱們來分析這個過程:
而後,會建立一個工做線程,加入到核心線程池或者非核心線程池:
void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); }
經過ensurePrestart能夠看到,若是核心線程池未滿,則新建的工做線程會被放到核心線程池中。若是核心線程池已經滿了,ScheduledThreadPoolExecutor不會像ThreadPoolExecutor那樣再去建立歸屬於非核心線程池的工做線程,而是直接返回。也就是說,在ScheduledThreadPoolExecutor中,一旦核心線程池滿了,就不會再去建立工做線程。
這裏思考一點,何時會執行else if (wc == 0)建立一個歸屬於非核心線程池的工做線程?
答案是,當經過setCorePoolSize方法設置核心線程池大小爲0時,這裏必需要保證任務可以被執行,因此會建立一個工做線程,放到非核心線程池中。
最後,線程池中的工做線程會去任務隊列獲取任務並執行,當任務被執行完成後,若是該任務是週期任務,則會重置time字段,並從新插入隊列中,等待下次執行。這裏注意從隊列中獲取元素的方法:
allowCoreThreadTimeOut == false
),則會使用阻塞方法take獲取任務(由於沒有超時限制,因此會一直等待直到隊列中有任務);若是設置了超時,則會使用poll方法(方法入參須要超時時間),超時還沒拿到任務的話,該工做線程就會被回收。
上述就是ScheduledThreadPoolExecutor的核心調度流程,經過咱們的分析能夠看出,相比ThreadPoolExecutor,ScheduledThreadPoolExecutor主要有如下幾點不一樣:
最後,咱們來看下ScheduledThreadPoolExecutor中的延時隊列——DelayedWorkQueue。
DelayedWorkQueue,該隊列和已經介紹過的DelayQueue區別不大,只不過隊列元素是RunnableScheduledFuture:
static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> { private static final int INITIAL_CAPACITY = 16; private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; private int size = 0; private final ReentrantLock lock = new ReentrantLock(); private final Condition available = lock.newCondition(); private Thread leader = null; // ... }
DelayedWorkQueue是一個無界隊列,在隊列元素滿了之後會自動擴容,它並無像DelayQueue那樣,將隊列操做委託給PriorityQueue,而是本身從新實現了一遍堆的核心操做——上浮、下沉。我這裏再也不贅述這些堆操做,讀者能夠參考PriorityBlockingQueue自行閱讀源碼。
咱們關鍵來看下add
、take
、poll
這三個隊列方法,由於ScheduledThreadPoolExecutor的核心調度流程中使用到了這三個方法:
public boolean add(Runnable e) { return offer(e); } public boolean offer(Runnable e, long timeout, TimeUnit unit) { return offer(e); }
add、offer內部都調用了下面這個方法:
public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>) x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; // 隊列已滿, 擴容 if (i >= queue.length) grow(); size = i + 1; if (i == 0) { queue[0] = e; setIndex(e, 0); } else { siftUp(i, e); // 堆上浮操做 } if (queue[0] == e) { // 當前元素是首個元素 leader = null; available.signal(); // 喚醒一個等待線程 } } finally { lock.unlock(); } return true; }
take方法:
public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (; ; ) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) // 隊列爲空 available.await(); // 等待元素入隊 else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) // 元素已到期 return finishPoll(first); // 執行到此處, 說明隊首元素還未到期 first = null; if (leader != null) available.await(); else { // 當前線程成功leader線程 Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
注意:上述leader表示一個等待獲取隊首元素的出隊線程,這是一種稱爲「Leader-Follower pattern」的多線程設計模式(讀者能夠參考DelayQueue中的講解)。
每次出隊元素時,若是隊列爲空或者隊首元素還未到期,線程就會在condition條件隊列等待。通常的思路是無限等待,直到出現一個入隊線程,入隊元素後將一個出隊線程喚醒。
爲了提高性能,當隊列非空時,用leader
保存第一個到來並嘗試出隊的線程,並設置它的等待時間爲隊首元素的剩餘期限,這樣當元素過時後,線程也就本身喚醒了,不須要入隊線程喚醒。這樣作的好處就是提高一些性能。
本節介紹了ScheduledThreadPoolExecutor,它是對普通線程池ThreadPoolExecutor的擴展,增長了延時調度、週期調度任務的功能。歸納下ScheduledThreadPoolExecutor的主要特色:
ScheduledFutureTask
,該類任務支持任務的週期執行、延遲執行;DelayedWorkQueue
做爲任務隊列。該隊列是無界隊列,因此任務必定能添加成功,可是當工做線程嘗試從隊列取任務執行時,只有最早到期的任務會出隊,若是沒有任務或者隊首任務未到期,則工做線程會阻塞;ScheduledThreadPoolExecutor
的任務調度流程與ThreadPoolExecutor略有區別,最大的區別就是,先往隊列添加任務,而後建立工做線程執行任務。另外,maximumPoolSize
這個參數對ScheduledThreadPoolExecutor其實並無做用,由於除非把corePoolSize設置爲0,這種狀況下ScheduledThreadPoolExecutor只會建立一個屬於非核心線程池的工做線程;不然,ScheduledThreadPoolExecutor只會新建歸屬於核心線程池的工做線程,一旦核心線程池滿了,就再也不新建工做線程。