在前面的文章中,咱們介紹了定時任務類 Timer ,他是 JDK 1.3 中出現的,位於 java.util 包下。而今天說的 ScheduledThreadPoolExecutor
的是在 JUC 包下,是 JDK1.5 新增的。java
今天就來講說這個類。程序員
該類內部結構和 Timer
仍是有點相似的,也是 3 個類:數據結構
ScheduledThreadPoolExecutor
:程序員使用的接口。DelayedWorkQueue
: 存儲任務的隊列。ScheduledFutureTask
: 執行任務的線程。構造方法介紹:this
// 使用給定核心池大小建立一個新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize)
// 使用給定初始參數建立一個新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler)
// 使用給定的初始參數建立一個新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory)
// 使用給定初始參數建立一個新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler)
複製代碼
ScheduledThreadPoolExecutor
最多支持 3 個參數:核心線程數量,線程工廠,拒絕策略。spa
爲何沒有最大線程數量?因爲 ScheduledThreadPoolExecutor
內部是個無界隊列,maximumPoolSize
也就沒有意思了。線程
再介紹一下他的 API 方法,請原諒我將 JDK 文檔照抄過來了,就當是備忘吧,以下:rest
protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) // 修改或替換用於執行 callable 的任務。 protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) // 修改或替換用於執行 runnable 的任務。 void execute(Runnable command) // 使用所要求的零延遲執行命令。 boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() // 獲取有關在此執行程序已 shutdown 的狀況下、是否繼續執行現有按期任務的策略。 boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() // 獲取有關在此執行程序已 shutdown 的狀況下是否繼續執行現有延遲任務的策略。 BlockingQueue<Runnable> getQueue() // 返回此執行程序使用的任務隊列。 boolean remove(Runnable task) // 從執行程序的內部隊列中移除此任務(若是存在),從而若是還沒有開始,則其再也不運行。 <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) // 建立並執行在給定延遲後啓用的 ScheduledFuture。 ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) // 建立並執行在給定延遲後啓用的一次性操做。 ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) // 建立並執行一個在給定初始延遲後首次啓用的按期操做,後續操做具備給定的週期;也就是將在 initialDelay 後開始執行,而後在 initialDelay+period 後執行,接着在 initialDelay + 2 * period 後執行,依此類推。 ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) // 建立並執行一個在給定初始延遲後首次啓用的按期操做,隨後,在每一次執行終止和下一次執行開始之間都存在給定的延遲。 void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) // 設置有關在此執行程序已 shutdown 的狀況下是否繼續執行現有按期任務的策略。 void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) // 設置有關在此執行程序已 shutdown 的狀況下是否繼續執行現有延遲任務的策略。 void shutdown() // 在之前已提交任務的執行中發起一個有序的關閉,可是不接受新任務。 List<Runnable> shutdownNow() // 嘗試中止全部正在執行的任務、暫停等待任務的處理,並返回等待執行的任務列表。 <T> Future<T> submit(Callable<T> task) // 提交一個返回值的任務用於執行,返回一個表示任務的未決結果的 Future。 Future<?> submit(Runnable task) // 提交一個 Runnable 任務用於執行,並返回一個表示該任務的 Future。 <T> Future<T> submit(Runnable task, T result) // 提交一個 Runnable 任務用於執行,並返回一個表示該任務的 Future。 複製代碼
最常用的幾個方法以下:code
// 使用給定核心池大小建立一個新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize)
// 建立並執行在給定延遲後啓用的一次性操做。
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
// 建立並執行一個在給定初始延遲後首次啓用的按期操做,後續操做具備給定的週期;也就是將在 initialDelay 後開始執行,而後在 initialDelay+period 後執行,接着在 initialDelay + 2 * period 後執行,依此類推。
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
// 建立並執行一個在給定初始延遲後首次啓用的按期操做,隨後,在每一次執行終止和下一次執行開始之間都存在給定的延遲。
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
複製代碼
除了默認的構造方法,還有 3 個 schedule
方法。咱們將分析他們內部的實現。cdn
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
複製代碼
咱們感興趣的就是這個 DelayedWorkQueue
隊列了。他也是一個阻塞隊列。這個隊列的數據結構是堆。同時,這個 queue
也是可比較的,比較什麼呢?任務必須實現 compareTo
方法,這個方法的比較邏輯是:比較任務的執行時間,若是任務的執行時間相同,則比較任務的加入時間。對象
所以,ScheduledFutureTask
有 2 個變量:
time
: 任務的執行時間。sequenceNumber
:任務的加入時間。這兩個變量就是用來比較任務的執行順序的。整個調度的順序就是這個邏輯。
剛剛說了,有 3 個 schedule
方法:
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
建立並執行在給定延遲後啓用的一次性操做。
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
建立並執行一個在給定初始延遲後首次啓用的按期操做,後續操做具備給定的週期;也就是將在initialDelay
後開始執行,而後在 initialDelay+period
後執行,接着在initialDelay + 2 * period
後執行,依此類推。
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
建立並執行一個在給定初始延遲後首次啓用的按期操做,隨後,在每一次執行終止和下一次執行開始之間都存在給定的延遲。
第一個方法執行在給定的時間後,執行一次就結束。
有點意思的地方是 第二個方法和 第三個方法,他們直接的區別。
這兩個方法均可以重複的調用。可是,重複調用的邏輯有所區別,這裏就是比 Timer 好用的地方。
他們的共同點在於:必須等待上個任務執行完畢才能執行下個任務。
不一樣點在於:他們調度的時間粗略是不一樣的。
scheduleAtFixedRate
方法的執行週期都是固定的,也就是,他是以上一個任務的開始執行時間做爲起點,加上以後的 period 時間,調度下次任務。
scheduleWithFixedDelay
方法則是以上一個任務的結束時間做爲起點,加上以後的 period 時間,調度下次任務。
有什麼區別呢?
如何任務執行時間很短,那就沒上面區別。可是,若是任務執行時間很長,超過了 period 時間,那麼區別就出來了。
咱們假設一下。
咱們設置 period
時間爲 2 秒,而任務耗時 5 秒。
這個兩個方法的區別就體現出來了。
scheduleAtFixedRate
方法將會在上一個任務結束完畢馬上執行,他和上一個任務的開始執行時間的間隔是 5 秒(由於必須等待上一個任務執行完畢)。
scheduleWithFixedDelay
方法將會在上一個任務結束後,注意:**再等待 2 秒,**纔開始執行,那麼他和上一個任務的開始執行時間的間隔是 7 秒。
因此,咱們在使用 ScheduledThreadPoolExecutor
的過程當中須要注意任務的執行時間不能超過間隔時間,若是超過了,最好使用scheduleAtFixedRate
方法,防止任務堆積。
固然,也和具體的業務有關。不能一律而論。但必定要注意這兩個方法的區別。
咱們看看 scheduleAtFixedRate 方法的內部實現。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
複製代碼
建立一個 ScheduledFutureTask
對象,而後裝飾一個這個Future
,該類實現是直接返回,子類能夠有本身的實現,在這個任務外裝飾一層。
而後執行 delayedExecute
方法,最後返回 Future
。
這個 ScheduledFutureTask
實現了不少接口,好比 Future
,Runnable
, Comparable
,Delayed
等。
ScheduledFutureTask
的構造方法以下:
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
複製代碼
層層遞進,該類首先經過一個原子靜態 int
對象這隻任務的入隊編號,而後建立一個 Callable
,這個 Callable 是一個適配器,適配了Runnable
和Callable
,也就是將Runnable
包裝成 callabe
, 他的 call
方法就是調用給定任務的 run 方法。固然,這裏的 result
是沒有什麼做用的。
若是你傳遞的是一個 callable
,那麼,就調用 FutureTask
的 run
方法,設置真正的返回值。
這裏使用了適配器模式,仍是挺有趣的。
總的來講,這個 ScheduledFutureTask
基於 FutureTask
, 關於 FutureTask
咱們以前從源碼介紹過了。
而他本身重寫了幾個方法:compareTo
, getDelay
, run
,isPeriodic
4 個方法。
咱們仍是要看看 delayedExecute
的實現。
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
// 添加進隊列。
super.getQueue().add(task);
// 若是線程池關閉了,且不能夠在當前狀態下運行任務,且從隊列刪除任務成功,就給任務打上取消標記。
// 第二個判斷是由兩個變量控制的(下面是默認值):
// continueExistingPeriodicTasksAfterShutdown = false 表示關閉的時候應取消週期性任務。默認關閉
// executeExistingDelayedTasksAfterShutdown = true。表示關閉的時候應取消非週期性的任務。默認不關閉。
// running 狀態下,canRunInCurrentRunState 一定返回 ture。
// 非 running 狀態下,canRunInCurrentRunState 根據上面的兩個值返回。
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 開始執行任務
ensurePrestart();
}
}
複製代碼
說說上面的方法。
ScheduledFutureTask
的 compareTo
方法來的,先比較執行時間,再比較添加順序。總體的過程如圖:
注意上面的圖,若是是週期性的任務,則會在執行完畢後,歸還隊列。
從哪裏能夠看出來呢?
ScheduledFutureTask
的 run
方法:
public void run() {
// 是不是週期性任務
boolean periodic = isPeriodic();
// 若是不能夠在當前狀態下運行,就取消任務(將這個任務的狀態設置爲CANCELLED)。
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 若是不是週期性的任務,調用 FutureTask # run 方法
else if (!periodic)
ScheduledFutureTask.super.run();
// 若是是週期性的。
// 執行任務,但不設置返回值,成功後返回 true。(callable 不能夠重複執行)
else if (ScheduledFutureTask.super.runAndReset()) {
// 設置下次執行時間
setNextRunTime();
// 再次將任務添加到隊列中
reExecutePeriodic(outerTask);
}
}
複製代碼
邏輯以下:
FutureTask
的 run
方法。runAndReset
方法。而管理整個執行過程的就是ScheduledThreadPoolExecutor
的父類 ThreadPoolExecutor
的runWorker
方法。其中,該方法會從隊列中取出數據,也就是調用隊列的 take
方法。
關於DelayedWorkQueue
的take
方法,其中有個 leader
變量,若是 leader 不是空,說明已經有線程在等待了,那就阻塞當前線程,若是是空,說明,隊列的第一個元素已經被更新了,就設置當前線程爲 leader
.
這是一個 Leader-Follower
模式,Doug Lea 說的。
固然,take
方法總體的邏輯仍是不變的。從隊列的頭部拿數據。使用 Condition
作線程之間的協調。
關於 ScheduledThreadPoolExecutor
調度類,咱們分析的差很少了,總結一下。
ScheduledThreadPoolExecutor
是個定時任務線程池,相似 Timer
,可是比 Timer
強大,健壯。
好比不會像 Timer 那樣,任務異常了,整個調度系統就完全無用了。
也比Timer
多了Rate
模式(Rate 和 Delay
)。
這兩種模式的區別就是任務執行的起點時間不一樣,Rate
是從上一個任務的開始執行時間開始計算;Delay
是從上一個任務的結束時間開始計算。
所以,若是任務自己的時間超過了間隔時間,那麼這兩種模式的間隔時間將會不一致。
而任務的排序是經過 ScheduledFutureTask
的 compareTo
方法排序的,規則是先比較執行時間,若是時間相同,再比較加入時間。
還要注意一點就是:若是任務執行過程當中異常了,那麼將不會再次重複執行。由於 ScheduledFutureTask
的 run
方法沒有作catch
處理。因此程序員須要手動處理,相對於 Timer 異常就直接費了調度系統來講,要好不少。
ScheduledThreadPoolExecutor
的是實現基於 ThreadPoolExecutor
,大部分功能都是重用的父類,只是本身在執行完畢以後,從新設置時間,並再次將任務還到了隊列中,造成了定時任務。