1.ScheduledThreadPoolExecutor
總體結構剖析。
根據上面類圖圖能夠看到Executor實際上是一個工具類,裏面提供了好多靜態方法,根據用戶選擇返回不一樣的線程池實例。能夠看到ScheduledThreadPoolExecutor
繼承了 ThreadPoolExecutor
並實現 ScheduledExecutorService
接口。線程池隊列是 DelayedWorkQueue
,和 DelayedQueue
相似是一個延遲隊列。算法
ScheduledFutureTask
是具備返回值的任務,繼承自 FutureTask,FutureTask 內部有個變量 state 用來表示任務的狀態,一開始狀態爲 NEW,全部狀態爲:併發
private static final int NEW = 0;//初始狀態 private static final int COMPLETING = 1;//執行中狀態 private static final int NORMAL = 2;//正常運行結束狀態 private static final int EXCEPTIONAL = 3;//運行中異常 private static final int CANCELLED = 4;//任務被取消 private static final int INTERRUPTING = 5;//任務正在被中斷 private static final int INTERRUPTED = 6;//任務已經被中斷
FutureTask可能的任務狀態轉換路徑以下所示:函數
NEW -> COMPLETING -> NORMAL //初始狀態->執行中->正常結束 NEW -> COMPLETING -> EXCEPTIONAL//初始狀態->執行中->執行異常 NEW -> CANCELLED//初始狀態->任務取消 NEW -> INTERRUPTING -> INTERRUPTED//初始狀態->被中斷中->被中斷
其實ScheduledFutureTask
內部還有個變量 period 用來表示任務的類型,其任務類型以下:工具
period=0,說明當前任務是一次性的,執行完畢後就退出了。this
period 爲負數,說明當前任務爲 fixed-delay 任務,是定時可重複執行任務。spa
period 爲整數,說明當前任務爲 fixed-rate 任務,是定時可重複執行任務。線程
接下來咱們能夠看到ScheduledThreadPoolExecutor
的造函數以下rest
//使用改造後的Delayqueue. public ScheduledThreadPoolExecutor(int corePoolSize) { //調用父類ThreadPoolExecutor的構造函數 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue()); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
根據上面代碼能夠看到線程池隊列是 DelayedWorkQueue
code
咱們主要看三個重要的函數,以下所示:blog
schedule(Runnable command, long delay,TimeUnit unit) scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit) scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
schedule(Runnable command, long delay,TimeUnit unit)
方法該方法做用是提交一個延遲執行的任務,任務從提交時間算起延遲 unit 單位的 delay 時間後開始執行,提交的任務不是週期性任務,任務只會執行一次,代碼以下:
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { //(1)參數校驗 if (command == null || unit == null) throw new NullPointerException(); //(2)任務轉換 RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); //(3)添加任務到延遲隊列 delayedExecute(t); return t; }
能夠看到上面代碼所示,代碼(1)參數校驗,若是 command 或者 unit 爲 null,拋出 NPE 異常。
代碼(2)裝飾任務,把提交的 command 任務轉換爲 ScheduledFutureTask
,ScheduledFutureTask
是具體放入到延遲隊列裏面的東西,因爲是延遲任務,因此 ScheduledFutureTask
實現了 long getDelay(TimeUnit unit)
和 int compareTo(Delayed other)
方法,triggerTime 方法轉換延遲時間爲絕對時間,也就是把當前時間的納秒數加上延遲的納秒數後的 long 型值。
接下來咱們須要看 ScheduledFutureTask
的構造函數,以下所示:
ScheduledFutureTask(Runnable r, V result, long ns) { //調用父類FutureTask的構造函數 super(r, result); this.time = ns; this.period = 0;//period爲0,說明爲一次性任務 this.sequenceNumber = sequencer.getAndIncrement(); }
根據構造函數能夠看到內部首先調用了父類 FutureTask 的構造函數,父類 FutureTask 的構造函數代碼以下:
//經過適配器把runnable轉換爲callable public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; //設置當前任務狀態爲NEW }
根據上面代碼能夠看到FutureTask 中任務又被轉換爲了 Callable 類型後,保存到了變量 this.callable 裏面,並設置 FutureTask 的任務狀態爲 NEW。
而後 ScheduledFutureTask
構造函數內部設置 time 爲上面說的絕對時間,須要注意這裏 period 的值爲 0,這說明當前任務爲一次性任務,不是定時反覆執行任務。
其中 long getDelay(TimeUnit unit)
方法代碼以下,用來獲取當前任務還有多少時間就過時了,代碼以下所示:
//元素過時算法,裝飾後時間-當前時間,就是即將過時剩餘時間 public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); }
接下來接着看compareTo(Delayed other)
方法,代碼以下:
public int compareTo(Delayed other) { if (other == this) // compare zero ONLY if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS)); return (d == 0) ? 0 : ((d < 0) ? -1 : 1); }
根據上面代碼的執行邏輯,能夠看到compareTo 做用是加入元素到延遲隊列後,內部創建或者調整堆時候會使用該元素的 compareTo 方法與隊列裏面其餘元素進行比較,讓最快要過時的元素放到隊首。因此不管何時向隊列裏面添加元素,隊首的的元素都是最即將過時的元素。
接下來接着看代碼(3)添加任務到延遲隊列,delayedExecute 的代碼以下:
private void delayedExecute(RunnableScheduledFuture<?> task) { //(4)若是線程池關閉了,則執行線程池拒絕策略 if (isShutdown()) reject(task); else { //(5)添加任務到延遲隊列 super.getQueue().add(task); //(6)再次檢查線程池狀態 if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else //(7)確保至少一個線程在處理任務 ensurePrestart(); } }
能夠看到代碼(4)首先判斷當前線程池是否已經關閉了,若是已經關閉則執行線程池的拒絕策略(若是不知道線程池的拒絕策略能夠看前一篇線程池的介紹。)
否者執行代碼(5)添加任務到延遲隊列。添加完畢後還要從新檢查線程池是否被關閉了,若是已經關閉則從延遲隊列裏面刪除剛纔添加的任務,可是有可能線程池線程已經從任務隊列裏面移除了該任務,也就是該任務已經在執行了,因此還須要調用任務的 cancle 方法取消任務。
若是代碼(6)判斷結果爲 false,則會執行代碼(7)確保至少有一個線程在處理任務,即便核心線程數 corePoolSize 被設置爲 0.
ensurePrestart 代碼以下:
void ensurePrestart() { int wc = workerCountOf(ctl.get()); //增長核心線程數 if (wc < corePoolSize) addWorker(null, true); //若是初始化corePoolSize==0,則也添加一個線程。 else if (wc == 0) addWorker(null, false); } }
如上代碼首先首先獲取線程池中線程個數,若是線程個數小於核心線程數則新增一個線程,否者若是當前線程數爲 0 則新增一個線程。
經過上面代碼咱們分析瞭如何添加任務到延遲隊列,下面咱們看線程池裏面的線程如何獲取並執行任務的,從前面講解的 ThreadPoolExecutor
咱們知道具體執行任務的線程是 Worker 線程,Worker 線程裏面調用具體任務的 run 方法進行執行,因爲這裏任務是 ScheduledFutureTask
,因此咱們下面看看 ScheduledFutureTask
的 run 方法。代碼以下:
public void run() { //(8)是否只執行一次 boolean periodic = isPeriodic(); //(9)取消任務 if (!canRunInCurrentRunState(periodic)) cancel(false); //(10)只執行一次,調用schdule時候 else if (!periodic) ScheduledFutureTask.super.run(); //(11)定時執行 else if (ScheduledFutureTask.super.runAndReset()) { //(11.1)設置time=time+period setNextRunTime(); //(11.2)從新加入該任務到delay隊列 reExecutePeriodic(outerTask); } }
能夠看到代碼(8)isPeriodic 的做用是判斷當前任務是一次性任務仍是可重複執行的任務,isPeriodic 的代碼以下:
public boolean isPeriodic() { return period != 0; }
可知內部是經過 period 的值來判斷,因爲轉換任務建立 ScheduledFutureTask 時候傳遞的 period 爲 0 ,因此這裏 isPeriodic 返回 false。
代碼(9)判斷當前任務是否應該被取消,canRunInCurrentRunState 的代碼以下:
boolean canRunInCurrentRunState(boolean periodic) { return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown); }
這裏傳遞的 periodic 爲 false,因此 isRunningOrShutdown
的參數爲 executeExistingDelayedTasksAfterShutdown
,executeExistingDelayedTasksAfterShutdown
默認是 true 標示當其它線程調用了 shutdown 命令關閉了線程池後,當前任務仍是要執行,否者若是爲 false,標示當前任務要被取消。
因爲 periodic 爲 false,因此執行代碼(10)調用父類 FutureTask 的 run 方法具體執行任務,FutureTask 的 run 方法代碼以下:
public void run() { //(12) if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; //(13) try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; //(13.1) setException(ex); } //(13.2) if (ran) set(result); } } finally { ...省略 } }
能夠看到代碼(12)若是任務狀態不是 NEW 則直接返回,或者若是當前任務狀態爲NEW可是使用 CAS 設置固然任務的持有者爲當前線程失敗則直接返回。代碼(13)具體調用 callable 的 call 方法執行任務,這裏在調用前又判斷了任務的狀態是否爲 NEW 是爲了不在執行代碼(12)後其餘線程修改了任務的狀態(好比取消了該任務)。
若是任務執行成功則執行代碼(13.2)修改任務狀態,set 方法代碼以下:
protected void set(V v) { //若是當前任務狀態爲NEW,則設置爲COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; //設置當前任務終狀爲NORMAL,也就是任務正常結束 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
如上代碼首先 CAS 設置當前任務狀態從 NEW 轉換到 COMPLETING,這裏多個線程調用時候只有一個線程會成功,成功的線程在經過 UNSAFE.putOrderedInt
設置任務的狀態爲正常結束狀態,這裏沒有用 CAS 是由於同一個任務只可能有一個線程能夠運行到這裏,這裏使用 putOrderedInt
比使用 CAS 函數或者 putLongVolatile
效率要高,而且這裏的場景不要求其它線程立刻對設置的狀態值可見。
這裏思考個問題,這裏何時多個線程會同時執行 CAS 設置任務狀態從態從 NEW 到 COMPLETING?其實當同一個 comand 被屢次提交到線程池時候就會存在這樣的狀況,因爲同一個任務共享一個狀態值 state。
若是任務執行失敗,則執行代碼(13.1),setException 的代碼以下,可見與 set 函數相似,代碼以下:
protected void setException(Throwable t) { //若是當前任務狀態爲NEW,則設置爲COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; //設置當前任務終態爲EXCEPTIONAL,也就是任務非正常結束 UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
finishCompletion(); } }
到這裏代碼(10)邏輯執行完畢,一次性任務也就執行完畢了,
下面會講到若是任務是可重複執行的,則不會執行步驟(10)而是執行代碼(11)。
當任務執行完畢後,延遲固定間隔時間後再次運行(fixed-delay 任務):其中 initialDelay 說明提交任務後延遲多少時間開始執行任務 command,delay 表示當任務執行完畢後延長多少時間後再次運行 command 任務,unit 是 initialDelay 和 delay 的時間單位。任務會一直重複運行直到任務運行時候拋出了異常或者取消了任務,或者關閉了線程池。scheduleWithFixedDelay
的代碼以下:
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { //(14)參數校驗 if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); //(15)任務轉換,注意這裏是period=-delay<0 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; //(16)添加任務到隊列 delayedExecute(t); return t;
}
如上代碼(14)進行參數校驗,校驗失敗則拋出異常,代碼(15)轉換 command 任務爲 ScheduledFutureTask
,這裏須要注意的是這裏傳遞給 ScheduledFutureTask
的 period 變量的值爲 -delay,period < 0 這個說明該任務爲可重複執行的任務。而後代碼(16)添加任務到延遲隊列後返回。
任務添加到延遲隊列後線程池線程會從隊列裏面獲取任務,而後調用 ScheduledFutureTask
的 run 方法執行,因爲這裏 period<0 因此 isPeriodic 返回 true,因此執行代碼(11),runAndReset 的代碼以下:
protected boolean runAndReset() { //(17) if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; //(18) boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { ... } return ran && s == NEW;//(19) }
該代碼和 FutureTask 的 run 相似,只是任務正常執行完畢後不會設置任務的狀態,這樣作是爲了讓任務成爲可重複執行的任務,這裏多了代碼(19)若是當前任務正常執行完畢而且任務狀態爲 NEW 則返回 true 否者返回 false。
若是返回了 true 則執行代碼(11.1)setNextRunTime
方法設置該任務下一次的執行時間,setNextRunTime
的代碼以下:
private void setNextRunTime() { long p = period; if (p > 0)//fixed-rate類型任務 time += p; else//fixed-delay類型任務 time = triggerTime(-p); }
如上代碼這裏 p < 0 說明當前任務爲 fixed-delay
類型任務,而後設置 time 爲當前時間加上 -p
的時間,也就是延遲 -p
時間後在次執行。
總結:本節介紹的 fixed-delay
類型的任務的執行實現原理以下,當添加一個任務到延遲隊列後,等 initialDelay 時間後,任務就會過時,過時的任務就會被從隊列移除,並執行,執行完畢後,會從新設置任務的延遲時間,而後在把任務放入延遲隊列實現的,依次往復。須要注意的是若是一個任務在執行某一個次時候拋出了異常,那麼這個任務就結束了,可是不影響其它任務的執行。
相對起始時間點固定頻率調用指定的任務(fixed-rate 任務):當提交任務到線程池後延遲 initialDelay 個時間單位爲 unit 的時間後開始執行任務 comand ,而後 initialDelay + period
時間點再次執行,而後在 initialDelay + 2 * period
時間點再次執行,依次往復,直到拋出異常或者調用了任務的 cancel 方法取消了任務在結束或者關閉了線程池。
scheduleAtFixedRate
的原理與 scheduleWithFixedDelay
相似,下面咱們講下不一樣點,首先調用 scheduleAtFixedRate
時候代碼以下:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { ... //裝飾任務類,注意period=period>0,不是負的 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); ... return t; }
如上代碼 fixed-rate
類型的任務在轉換 command
任務爲 ScheduledFutureTask
的時候設置的 period=period
不在是 -period
。
因此當前任務執行完畢後,調用 setNextRunTime
設置任務下次執行的時間時候執行的是 time += p
而不在是 time = triggerTime(-p);
。
總結:相對於 fixed-delay
任務來講,fixed-rate
方式執行規則爲時間爲 initdelday + n*period;
時候啓動任務,可是若是當前任務尚未執行完,下一次要執行任務的時間到了,不會併發執行,下次要執行的任務會延遲執行,要等到當前任務執行完畢後在執行一個任務。
ScheduledThreadPoolExecutor
的實現原理,其內部使用的 DelayQueue
來存放具體任務,其中任務分爲三種,其中一次性執行任務執行完畢就結束了,fixed-delay
任務保證同一個任務屢次執行之間間隔固定時間,fixed-rate
任務保證任務執行按照固定的頻率執行,其中任務類型使用 period
的值來區分。