在以前的博文--ThreadPoolExecutor源碼解讀已經對ThreadPoolExecutor的實現原理與源碼進行了分析。ScheduledExecutorService也是咱們在開發中常常會用到的一種ExecutorService,JDK中它的默認實現類爲ScheduledThreadPoolExecutor。本文針對ScheduledThreadPoolExecutor的設計原理與實現源碼進行分析解讀。html
首先來看一下ScheduledThreadPoolExecutor的繼承關係。
java
這裏有必要來介紹一下ScheduledExecutorService接口。
ScheduledExecutorService自己繼承了ExecutorService接口,併爲調度任務額外提供了兩種模式數組
咱們知道RunnableFuture接口是ThreadPoolExecutor對內和對外的橋樑。對內它的形態是Runnable來執行任務,對外它的形態是Future。
那麼對於ScheduledThreadPoolExecutor來講,RunnableScheduledFuture是它的內外橋樑,對內形態爲Runnable,對外形態爲ScheduledFuture。安全
ScheduledFutureTask是ScheduledThreadPoolExecutor對於RunnableScheduledFuture的默認實現,而且繼承了FutureTask。
它覆蓋了FutureTask的run方法來實現對延時執行、週期執行的支持,簡單來講它的套路就是對於延時任務則調用FutureTask#run而對於週期性任務則調用FutureTask#runAndReset而且在成功以後根據fixed-delay/fixed-rate模式來設置下次執行時間並從新將任務塞到工做隊列中。
對於ScheduledFutureTask#run方法來講它並不須要關心run的時候是否到了能夠執行的時間,由於這個職責會由ScheduledThreadPoolExecutor中的工做隊列來完成,以保證只有在任務能夠被執行的時候纔會被Worker線程從隊列中取出。併發
DelayedWorkQueue是ScheduledThreadPoolExecutor中阻塞隊列的實現,它內部使用了小根堆來使得自身具備優先隊列的功能,而且經過Leader/Follower模式避免線程沒必要要的等待。
從DelayedWorkQueue中取出任務時,任務必定已經至少到了能夠被執行的時間。ide
分析ScheduledThreadPoolExecutor的源碼,主要會分紅三個部分:ScheduledFutureTask, DelayedWorkQueue以及ScheduledThreadPoolExecutor自己。源碼分析
ScheduledFutureTask是ScheduledThreadPoolExecutor中的一個內部類。
咱們能夠看到,它的接口繼承線大致是兩條:RunnableFuture和ScheduledFuture,而RunnableScheduledFuture是二者的合體。this
outerTask的主要目的就是讓週期任務在第二次及以後的運行時跑的都是decorateTask返回的包裝結果。線程
ScheduledFutureTask一般狀況下就是線程池中Worker線程拿到的Runnable對象。注意這裏說的是一般狀況,由於ScheduledThreadPoolExecutor容許咱們經過decorateTask方法包裝原先的ScheduledFutureTask,相比之下這並不常見。設計
public void run() { // 是否週期性,就是判斷period是否爲0。 boolean periodic = isPeriodic(); // 檢查任務是否能夠被執行。 if (!canRunInCurrentRunState(periodic)) cancel(false); // 若是非週期性任務直接調用run運行便可。 else if (!periodic) ScheduledFutureTask.super.run(); // 若是成功runAndRest,則設置下次運行時間並調用reExecutePeriodic。 else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); // 須要從新將任務(outerTask)放到工做隊列中。此方法源碼會在後文介紹ScheduledThreadPoolExecutor自己API時說起。 reExecutePeriodic(outerTask); } } private void setNextRunTime() { long p = period; /* * fixed-rate模式,時間設置爲上一次時間+p。 * 提一句,這裏的時間其實只是能夠被執行的最小時間,不表明到點就要執行。 * 若是此次任務還沒執行完是確定不會執行下一次的。 */ if (p > 0) time += p; /** * fixed-delay模式,計算下一次任務能夠被執行的時間。 * 簡單來講差很少就是當前時間+delay值。由於代碼走到這裏任務就已經結束了,now()能夠認爲就是任務結束時間。 */ else time = triggerTime(-p); } long triggerTime(long delay) { /* * 若是delay < Long.Max_VALUE/2,則下次執行時間爲當前時間+delay。 * * 不然爲了不隊列中出現因爲溢出致使的排序紊亂,須要調用overflowFree來修正一下delay(若是有必要的話)。 */ return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); } /** * 主要就是有這麼一種狀況: * 某個任務的delay爲負數,說明當前能夠執行(其實早該執行了)。 * 工做隊列中維護任務順序是基於compareTo的,在compareTo中比較兩個任務的順序會用time相減,負數則說明優先級高。 * * 那麼就有可能出現一個delay爲正數,減去另外一個爲負數的delay,結果上溢爲負數,則會致使compareTo產生錯誤的結果。 * * 爲了特殊處理這種狀況,首先判斷一下隊首的delay是否是負數,若是是正數不用管了,怎麼減都不會溢出。 * 不然能夠拿當前delay減去隊首的delay來比較看,若是不出現上溢,則整個隊列都ok,排序不會亂。 * 否則就把當前delay值給調整爲Long.MAX_VALUE + 隊首delay。 */ private long overflowFree(long delay) { Delayed head = (Delayed) super.getQueue().peek(); if (head != null) { long headDelay = head.getDelay(NANOSECONDS); if (headDelay < 0 && (delay - headDelay < 0)) delay = Long.MAX_VALUE + headDelay; } return delay; }
public boolean cancel(boolean mayInterruptIfRunning) { // 先調用父類FutureTask#cancel來取消任務。 boolean cancelled = super.cancel(mayInterruptIfRunning); /* * removeOnCancel開關用於控制任務取消後是否應該從隊列中移除。 * * 若是已經成功取消,而且removeOnCancel開關打開,而且heapIndex >= 0(說明仍然在隊列中), * 則從隊列中刪除該任務。 */ if (cancelled && removeOnCancel && heapIndex >= 0) remove(this); return cancelled; }
DelayedWorkQueue是ScheduledThreadPoolExecutor使用的工做隊列。它內部維護了一個小根堆,根據任務的執行開始時間來維護任務順序。但不一樣的地方在於,它對於ScheduledFutureTask類型的元素額外維護了元素在隊列中堆數組的索引,用來實現快速取消。DelayedWorkQueue用了ReentrantLock+Condition來實現管程保證數據的線程安全性。
public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if (i >= queue.length) // 容量擴增50%。 grow(); size = i + 1; // 第一個元素,其實這裏也能夠統一進行sift-up操做,不必特判。 if (i == 0) { queue[0] = e; setIndex(e, 0); } else { // 插入堆尾。 siftUp(i, e); } // 若是新加入的元素成爲了堆頂,則原先的leader就無效了。 if (queue[0] == e) { leader = null; // 因爲原先leader已經無效被設置爲null了,這裏隨便喚醒一個線程(未必是原先的leader)來取走堆頂任務。 available.signal(); } } finally { lock.unlock(); } return true; }
public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { /* * 循環讀取當前堆中最小也就執行開始時間最近的任務。 * 若是當前隊列爲空無任務,則在available條件上等待。 * 不然若是最近任務的delay<=0則返回這個任務以執行,不然的話根據是否能夠做爲leader分類: * 若是能夠做爲leader,則根據delay進行有時限等待。 * 不然無限等待直至被leader喚醒。 */ for (;;) { RunnableScheduledFuture<?> first = queue[0]; // 若是當前隊列無元素,則在available條件上無限等待直至有任務經過offer入隊並喚醒。 if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); // 若是delay小於0說明任務該馬上執行了。 if (delay <= 0) // 從堆中移除元素並返回結果。 return finishPoll(first); /* * 在接下來等待的過程當中,first應該清爲null。 * 由於下一輪從新拿到的最近須要執行的任務極可能已經不是這裏的first了。 * 因此對於接下來的邏輯來講first已經沒有任何用處了,不應持有引用。 */ first = null; // 若是目前有leader的話,當前線程做爲follower在available條件上無限等待直至喚醒。 if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { /* * 若是從available條件中被喚醒當前線程仍然是leader,則清空leader。 * * 分析一下這裏不等的狀況: * 1. 原先thisThread == leader, 而後堆頂更新了,leader爲null。 * 2. 堆頂更新,offer方法釋放鎖後,有其它線程經過take/poll拿到鎖,讀到leader == null,而後將自身更新爲leader。 * * 對於這兩種狀況統一的處理邏輯就是隻要leader爲thisThread,則清leader爲null用以接下來判斷是否須要喚醒後繼線程。 */ if (leader == thisThread) leader = null; } } } } } finally { /* * 若是當前堆中無元素(根據堆頂判斷)則直接釋放鎖。 * * * 不然若是leader有值,說明當前線程必定不是leader,當前線程不用去喚醒後續等待線程。 * 不然由當前線程來喚醒後繼等待線程。不過這並不表明當前線程原來是leader。 */ if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
因爲和take方法套路差很少,這裏不展開細講了。
ScheduledThreadPoolExecutor支持任務取消的時候快速從隊列中移除,由於大部分狀況下隊列中的元素是ScheduledFutureTask類型,內部維護了heapIndex也即在堆數組中的索引。
堆移除一個元素的時間複雜度是O(log n),前提是咱們須要知道待刪除元素在堆數組中的位置。若是咱們不維護heapIndex則須要遍歷整個堆數組來定位元素在堆數組的位置,這樣光是掃描一次堆數組複雜度就O(n)了。而維護了heapIndex,就能夠以O(1)的時間來確認位置,從而能夠更快的移除元素。
public boolean remove(Object x) { final ReentrantLock lock = this.lock; lock.lock(); try { int i = indexOf(x); if (i < 0) return false; setIndex(queue[i], -1); /* * 堆的刪除某個元素操做就是將最後一個元素移到那個元素。 * 這時候有可能須要向上調整堆,也可能須要向下維護。 * * 對於小根堆而言,若是移過去後比父元素小,則須要向上維護堆結構, * 不然將左右兩個子節點中較小值與當前元素比較,若是當前元素較大,則須要向下維護堆結構。 */ int s = --size; RunnableScheduledFuture<?> replacement = queue[s]; queue[s] = null; // 若是參數x就是堆數組中最後一個元素則刪除操做已經完畢了。 if (s != i) { // 嘗試向下維護堆。 siftDown(i, replacement); // 相等說明replacement比子節點都要小,嘗試向上維護堆。 if (queue[i] == replacement) siftUp(i, replacement); } return true; } finally { lock.unlock(); } } private int indexOf(Object x) { if (x != null) { if (x instanceof ScheduledFutureTask) { int i = ((ScheduledFutureTask) x).heapIndex; // 再次判斷i確實是本線程池的,由於remove方法的參數x徹底能夠是個其它池子裏拿到的ScheduledFutureTask。 if (i >= 0 && i < size && queue[i] == x) return i; } else { for (int i = 0; i < size; i++) if (x.equals(queue[i])) return i; } } return -1; }
在瞭解了ScheduledFutureTask與DelayedWorkQueue以後最後再來看ScheduledThreadPoolExecutor自己的方法,就顯得容易不少。
這裏咱們來介紹一些ScheduledThreadPoolExecutor以及父類ThreadPoolExecutor中的方法。
這個方法在任務提交時,任務運行時都會被調用以校驗當前狀態是否能夠運行任務。
boolean canRunInCurrentRunState(boolean periodic) { /* * isRunningOrShutdown的參數爲布爾值,true則表示shutdown狀態也返回true,不然只有running狀態返回ture。 * 若是爲週期性任務則根據continueExistingPeriodicTasksAfterShutdown來判斷是否shutdown了仍然能夠執行。 * 不然根據executeExistingDelayedTasksAfterShutdown來判斷是否shutdown了仍然能夠執行。 */ return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown); }
ScheduledThreadPoolExecutor任務提交的入口方法主要是execute, schedule, scheduleAtFixedRate以及scheduleWithFixedDelay這幾類。
/** * 覆蓋了父類execute的實現,以零延時任務的形式實現。 */ public void execute(Runnable command) { schedule(command, 0, NANOSECONDS); } public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); // 包裝ScheduledFutureTask。 RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); delayedExecute(t); return t; } public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); // fixed-rate模式period爲正數。 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); // 包裝ScheduledFutureTask,默認返回自己。 RunnableScheduledFuture<Void> t = decorateTask(command, sft); // 將構造出的ScheduledFutureTask的outerTask設置爲通過包裝的結果。 sft.outerTask = t; delayedExecute(t); return t; } public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); // fixed-delay模式delay爲正數。 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); // 包裝ScheduledFutureTask,默認返回自己。 RunnableScheduledFuture<Void> t = decorateTask(command, sft); // 將構造出的ScheduledFutureTask的outerTask設置爲通過包裝的結果。 sft.outerTask = t; delayedExecute(t); return t; }
private void delayedExecute(RunnableScheduledFuture<?> task) { // 非RUNNING態,根據飽和策略處理任務。 if (isShutdown()) reject(task); else { // 往work queue中插入任務。 super.getQueue().add(task); /* * 檢查任務是否能夠被執行。 * 若是任務不該該被執行,而且從隊列中成功移除的話(說明沒被worker拿取執行),則調用cancel取消任務。 */ if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) // 參數中false表示不試圖中斷執行任務的線程。 task.cancel(false); else ensurePrestart(); } } /** * 這是父類ThreadPoolExecutor的方法用於確保有worker線程來執行任務。 void ensurePrestart() { int wc = workerCountOf(ctl.get()); // worker數目小於corePoolSize,則添加一個worker。 if (wc < corePoolSize) addWorker(null, true); // wc==orePoolSize==0的狀況也添加一個worker。 else if (wc == 0) addWorker(null, false); }
void reExecutePeriodic(RunnableScheduledFuture<?> task) { if (canRunInCurrentRunState(true)) { // 塞到工做隊列中。 super.getQueue().add(task); // 再次檢查是否能夠執行,若是不能執行且任務還在隊列中未被取走則取消任務。 if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart(); } }
onShutdown方法是ThreadPoolExecutor的一個鉤子方法,會在shutdown方法中被調用,默認實現爲空。而ScheduledThreadPoolExecutor覆蓋了此方法用於刪除並取消工做隊列中的不須要再執行的任務。
@Override void onShutdown() { BlockingQueue<Runnable> q = super.getQueue(); // shutdown是否仍然執行延時任務。 boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); // shutdown是否仍然執行週期任務。 boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); // 若是二者皆不可則對隊列中全部RunnableScheduledFuture調用cancel取消並清空隊列。 if (!keepDelayed && !keepPeriodic) { for (Object e : q.toArray()) if (e instanceof RunnableScheduledFuture<?>) ((RunnableScheduledFuture<?>) e).cancel(false); q.clear(); } else { for (Object e : q.toArray()) { if (e instanceof RunnableScheduledFuture) { RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e; /* * 不須要執行的任務刪除並取消。 * 已經取消的任務也須要從隊列中刪除。 * 因此這裏就判斷下是否須要執行或者任務是否已經取消。 */ if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || t.isCancelled()) { if (q.remove(t)) t.cancel(false); } } } } // 由於任務被從隊列中清理掉,因此這裏須要調用tryTerminate嘗試躍遷executor的狀態。 tryTerminate(); }
本文介紹了ScheduledThreadPoolExecutor的原理與源碼實現。
ScheduledThreadPoolExecutor內部使用了ScheduledFutureTask來表示任務,即便對於execute方法也將其委託至schedule方法,以零延時的形式實現。同時ScheduledThreadPoolExecutor也容許咱們經過decorateTask方法來包裝任務以實現定製化的封裝。
而ScheduledThreadPoolExecutor內部使用的阻塞隊列DelayedWorkQueue經過小根堆來實現優先隊列的功能。因爲DelayedWorkQueue是無界的,因此本質上對於ScheduledThreadPoolExecutor而言,maximumPoolSize並無意義。總體而言,ScheduledThreadPoolExecutor處理兩類任務--延時任務與週期任務。經過ScheduledFutureTask.period的是否爲零屬於哪一類,經過ScheduledFutureTask.period的正負性來判斷屬於週期任務中的fixed-rate模式仍是fixed-delay模式。而且提供了經過參數來控制延時任務與週期任務在線程池被關閉時是否須要被取消並移除出隊列(若是還在隊列)以及是否容許執行(若是已經被worker線程取出)。
DelayedWorkQueue使用了Leader/Follower來避免沒必要要的等待,只讓leader來等待須要等待的時間,其他線程無限等待直至被喚醒便可。
DelayedWorkQueue全部的堆調整方法都維護了類型爲ScheduledFutureTask的元素的heapIndex,以下降cancel的時間複雜度。
下面整理一下ScheduledThreadPoolExecutor中幾個重要參數。