文本將主要講述 ThreadPoolExecutor
一個特殊的子類 ScheduledThreadPoolExecutor
,主要用於執行週期性任務;因此在看本文以前最好先了解一下 ThreadPoolExecutor
,能夠參考 ThreadPoolExecutor 詳解;另外 ScheduledThreadPoolExecutor
中使用了延遲隊列,主要是基於徹底二叉堆實現的,能夠參考 徹底二叉堆;html
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {}
在源碼中能夠看到,ScheduledThreadPoolExecutor
的狀態管理、入隊操做、拒絕操做等都是繼承於 ThreadPoolExecutor
;ScheduledThreadPoolExecutor
主要是提供了週期任務和延遲任務相關的操做;java
就 ScheduledThreadPoolExecutor
的運行邏輯而言,大體能夠表述爲:dom
其內部結構如圖所示:ide
這裏須要注意的:函數
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { private final long sequenceNumber; // 任務序號,從 AtomicLong sequencer 獲取,當延遲時間相同時,序號小的先出 private long time; // 下次任務執行時間 private final long period; // 0 表示非週期任務,正值表示固定頻率週期任務,負值表示固定延遲週期任務 RunnableScheduledFuture<V> outerTask = this; // 重複執行的任務,傳入的任務可使用 decorateTask() 從新包裝 int heapIndex; // 隊列索引 }
其中最重要的方法必然是 run 方法了:源碼分析
public void run() { boolean periodic = isPeriodic(); // 是否爲週期任務,period != 0 if (!canRunInCurrentRunState(periodic)) // 當前狀態可否繼續運行,詳細測試後面還會講到 cancel(false); // 取消任務 else if (!periodic) // 不是週期任務時,直接運行 ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { // 時週期任務 setNextRunTime(); // 設置下次執行時間 reExecutePeriodic(outerTask); // 從新入隊 } }
public boolean cancel(boolean mayInterruptIfRunning) { boolean cancelled = super.cancel(mayInterruptIfRunning); // 設置中斷狀態 if (cancelled && removeOnCancel && heapIndex >= 0) // 當設置 removeOnCancel 狀態時,移除任務 remove(this); // 默認爲 false return cancelled; }
void reExecutePeriodic(RunnableScheduledFuture<?> task) { if (canRunInCurrentRunState(true)) { // 若是當前狀態能夠執行 super.getQueue().add(task); // 則從新入隊 if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart(); // 確保有線程執行任務 } }
此外還有 DelayedWorkQueue,可是這裏不許備講了,能夠查看 徹底二叉堆 瞭解實現的原理;測試
scheduleAtFixedRate
和 scheduleWithFixedDelay
是咱們最經常使用的兩個方法,可是他們的區別可能不是很清楚,這裏重點講一下,this
// 測試 ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1); pool.scheduleAtFixedRate(() -> { sleep(1000); // 睡眠 1s, log.info("run task"); }, 1, 2, TimeUnit.SECONDS); // 延遲 1s,週期 2s
// 打印
[19:41:28,489 INFO ] [pool-1-thread-1] - run task
[19:41:30,482 INFO ] [pool-1-thread-1] - run task
[19:41:32,483 INFO ] [pool-1-thread-1] - run task
[19:41:34,480 INFO ] [pool-1-thread-1] - run task線程
能夠看到的確時固定週期 2s 執行的,可是若是任務執行時間超過週期呢?3d
// 測試 ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1); pool.scheduleAtFixedRate(() -> { int i = 2000 + random.nextInt(3) * 1000; sleep(i); log.info("run task, sleep :{}", i); }, 1, 2, TimeUnit.SECONDS); // 延遲 1s,週期 2s
// 打印
[19:42:53,428 INFO ] [pool-1-thread-1] - run task, sleep :2000
[19:42:55,430 INFO ] [pool-1-thread-1] - run task, sleep :2000
[19:42:59,430 INFO ] [pool-1-thread-1] - run task, sleep :4000
[19:43:02,434 INFO ] [pool-1-thread-1] - run task, sleep :3000
[19:43:06,436 INFO ] [pool-1-thread-1] - run task, sleep :4000
能夠看到若是任務執行時間超出週期時,下一次任務會馬上運行;就好像週期是一個有彈性的袋子,能裝下運行時間的時候,是固定大小,裝不下的時候就會被撐大,圖像化表示以下:
// 測試 ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1); pool.scheduleAtFixedRate(() -> { int i = 1000 + random.nextInt(5) * 1000; sleep(i); log.info("run task, sleep :{}", i); }, 1, 2, TimeUnit.SECONDS); // 延遲 1s,週期 2s
// 打印
[20:05:40,682 INFO ] [pool-1-thread-1] - run task, sleep :1000
[20:05:45,686 INFO ] [pool-1-thread-1] - run task, sleep :3000
[20:05:49,689 INFO ] [pool-1-thread-1] - run task, sleep :2000
[20:05:55,690 INFO ] [pool-1-thread-1] - run task, sleep :4000
[20:06:01,692 INFO ] [pool-1-thread-1] - run task, sleep :4000
能夠看到不管執行時間是多少,其結果都是在執行完畢後,停頓固定的時間,而後執行下一次任務,其圖形化表示爲:
public void execute(Runnable command) { schedule(command, 0, NANOSECONDS); } public <T> Future<T> submit(Callable<T> task) { return schedule(task, 0, NANOSECONDS); } public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<?> t = decorateTask( command,new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); delayedExecute(t); return t; } public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<V> t = decorateTask( callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit))); delayedExecute(t); return t; }
能夠看到全部的週期任務,最終執行的都是 delayedExecute
方法,其中 decorateTask
是一個鉤子函數,其之類能夠利用他對任務進行重構過濾等操做;
private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); // 若是線程池已經關閉,則拒絕任務 else { super.getQueue().add(task); // 任務入隊 if (isShutdown() && // 再次檢查,線程池是否關閉 !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); // 確保有線程執行任務 } }
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); // 注意這裏添加的是正值 RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; } public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); // 注意這裏添加的是負值 RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
從上面代碼能夠看到 scheduleAtFixedRate
和 scheduleWithFixedDelay
只有週期任務的時間不一樣,其餘的都同樣,那麼下面咱們看一下他們的任務時間計算;
public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); } private void setNextRunTime() { long p = period; if (p > 0) // 正值表示 scheduleAtFixedRate time += p; // 無論任務執行時間,直接加上週期時間,也就是一次任務超時,會影響後續任務的執行, // 超時的時候,getDelay 是負值,因此在延遲隊列中必然排在最前面,馬上被取出執行 else time = triggerTime(-p); // 計算觸發時間 } long triggerTime(long delay) { // 這裏能夠看到,每次的確是在當前時間的基礎上,加上延遲時間; return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); }
這裏特別要注意 scheduleAtFixedRate 一次任務超時,會持續影響後面的任務週期安排,因此在設定週期的時候要特別注意; 例如:
// 測試 ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1); pool.scheduleAtFixedRate(() -> { int i = random.nextInt(5) * 1000; sleep(i); log.info("run task, sleep :{}", i); }, 1, 2, TimeUnit.SECONDS);
// 打印
[20:29:11,310 INFO ] [pool-1-thread-1] - run task, sleep :1000
[20:29:16,304 INFO ] [pool-1-thread-1] - run task, sleep :4000
[20:29:19,304 INFO ] [pool-1-thread-1] - run task, sleep :3000
[20:29:21,305 INFO ] [pool-1-thread-1] - run task, sleep :2000
[20:29:22,305 INFO ] [pool-1-thread-1] - run task, sleep :1000
[20:29:23,306 INFO ] [pool-1-thread-1] - run task, sleep :1000
[20:29:27,306 INFO ] [pool-1-thread-1] - run task, sleep :4000
[20:29:30,307 INFO ] [pool-1-thread-1] - run task, sleep :3000
如圖所示:
private volatile boolean continueExistingPeriodicTasksAfterShutdown; //關閉後繼續執行週期任務,默認false private volatile boolean executeExistingDelayedTasksAfterShutdown = true; //關閉後繼續執行延遲任務,默認true private volatile boolean removeOnCancel = false; // 取消任務是,從隊列中刪除任務,默認 false @Override void onShutdown() { BlockingQueue<Runnable> q = super.getQueue(); boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); // 繼續延遲任務 boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); // 繼續週期任務 if (!keepDelayed && !keepPeriodic) { // 都是 false,直接清除 for (Object e : q.toArray()) if (e instanceof RunnableScheduledFuture<?>) ((RunnableScheduledFuture<?>) e).cancel(false); q.clear(); } else { // Traverse snapshot to avoid iterator exceptions for (Object e : q.toArray()) { if (e instanceof RunnableScheduledFuture) { RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e; if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || t.isCancelled()) { // also remove if already cancelled if (q.remove(t)) t.cancel(false); } } } } tryTerminate(); }