咱們在上一篇學習了ThreadPoolExecutor的實現原理:Java併發包源碼學習系列:線程池ThreadPoolExecutor源碼解析java
本篇咱們來學習一下在它基礎之上的擴展:ScheduledThreadPoolExecutor。它繼承了ThreadPoolExecutor並實現了ScheduledExecutorService接口,是一個能夠在指定必定延遲時間後或者定時進行任務調度執行的線程池。編程
public class TestScheduledThreadPool { private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); public static void main (String[] args) throws InterruptedException { scheduler.scheduleAtFixedRate(new Runnable() { @Override public void run () { System.out.println("command .. " + new Date()); } }, 0, 1, TimeUnit.SECONDS); } }
簡單看一個demo吧,這裏使用Executors工具類建立ScheduledExecutorService,起始就是實例化了一個ScheduledThreadPoolExecutor,固然咱們自定義也是能夠的。併發
接着調用scheduleAtFixedRate
方法,指定延遲爲0,表示當即執行, 指定period爲1,以1s爲週期定時執行該任務。ide
從總體感知ScheduledThreadPoolExecutor的執行函數
//Executors.java public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } //ScheduledThreadPoolExecutor.java public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
ScheduledExecutorService表明可在指定延遲後或週期性地執行線程任務線程池,提供了以下4個方法:工具
public interface ScheduledExecutorService extends ExecutorService { // 指定command任務將在delay延遲後執行 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); // 指定callable任務將在delay延遲後執行 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); // 指定command任務將在delay延遲後執行,並且以設定頻率重複執行 // initialDelay + period 開始, initialDelay + n * period 處執行 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); // 建立並執行一個在給定初始延遲後首次啓用的按期操做,隨後在每一次執行終止和下一次執行開始之間 // 都存在給定的延遲。若是任務在任一一次執行時遇到異常,就會取消後續執行; // 不然,只能經過程序來顯式取消或終止該任務 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); }
能夠按照DelayQueue中的Delayed的元素理解,是具體放入延遲隊列中的東西,能夠看到實現了getDelay和compareTo方法。學習
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { /** FIFO隊列中的序列號,time相同,序列號小的排在前面 */ private final long sequenceNumber; /** 任務將要被執行的時間,也就是過時時間 */ private long time; /** * period == 0 當前任務是一次性的, 執行完畢後就退出 * period > 0 當前任務是fixed-delay任務,是固定延遲的定時可重複執行任務 * period < 0 當前任務是fixed-rate任務,是固定頻率的定時可重複執行任務 */ private final long period; /** The actual task to be re-enqueued by reExecutePeriodic */ RunnableScheduledFuture<V> outerTask = this; /** * Index into delay queue, to support faster cancellation. */ int heapIndex; //... 省略構造函數 // 當前任務還剩多久過時 public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); } // 隊列中的比較策略 public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; // time相同,序列號小的排在前面 else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; } //... 省略其餘方法 }
FutureTask內部使用一個state變量表示任務狀態。this
public class FutureTask<V> implements RunnableFuture<V> { /** * * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; // 初始狀態 private static final int COMPLETING = 1; // 執行中 private static final int NORMAL = 2; // 正常運行結束 private static final int EXCEPTIONAL = 3; // 運行中異常 private static final int CANCELLED = 4; // 任務被取消 private static final int INTERRUPTING = 5; // 任務正在被中斷 private static final int INTERRUPTED = 6; // 任務已經被中斷 }
提交一個延遲執行的任務,任務從提交時間算起延遲單位爲unit的delay時間後開始執行。.net
若是提交的任務不是週期性的任務,任務只會執行一次。線程
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { // 參數校驗 if (command == null || unit == null) throw new NullPointerException(); // 任務轉換: 把command任務轉換爲ScheduledFutureTask RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); // 添加任務到延遲隊列 delayedExecute(t); return t; } // 將延遲時間轉換爲絕對時間, private long triggerTime(long delay, TimeUnit unit) { return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); } // 將當前的那描述加上延遲的nanos後的long型值 long triggerTime(long delay) { return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); } private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); // 調用FutureTask的構造方法 this.time = ns; this.period = 0; // 這裏表示任務是一次性的 this.sequenceNumber = sequencer.getAndIncrement(); } } // FutureTask.java public class FutureTask<V> implements RunnableFuture<V> { public FutureTask(Runnable runnable, V result) { // 將runnable轉化爲callable this.callable = Executors.callable(runnable, result); // 設置當前的任務狀態爲NEW this.state = NEW; // ensure visibility of callable } }
private void delayedExecute(RunnableScheduledFuture<?> task) { // 若是線程池關閉, 則執行拒絕策略 if (isShutdown()) reject(task); else { // 將任務添加到延遲隊列 super.getQueue().add(task); // 檢查線程池狀態,若是已經關閉,則從延遲隊列裏面刪除剛纔添加的任務 // 但此時可能線程池中的線程已經從任務隊列裏面移除了該任務 // 此時須要調用cancel 取消任務 if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else // 確保至少一個線程在處理任務 ensurePrestart(); } }
判斷當前任務是否應該被取消。
boolean canRunInCurrentRunState(boolean periodic) { return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown); }
periodic參數經過isPeriodic()
獲得,若是period爲0,則爲false。
相應的isRunningOrShutdown方法傳入的參數就應該是executeExistingDelayedTasksAfterShutdown,默認爲true,表示:其餘線程調用了shutdown命令關閉線程池後,當前任務仍是要執行
確保至少一個線程在處理任務:若是線程個數小於核心線程池數則新增一個線程,不然若是當前線程數爲0,則新增一個線程。
void ensurePrestart() { int wc = workerCountOf(ctl.get()); // 增長核心線程數 if (wc < corePoolSize) addWorker(null, true); // 若是corePoolSize==0 也添加一個線程 else if (wc == 0) addWorker(null, false); }
具體執行任務的線程是Worker線程,任務執行是Worker線程調用任務的潤方法執行,這裏的任務是ScheduledFutureTask,也就是調用它的run方法。
public void run() { // 是否只執行一次 period != 0 boolean periodic = isPeriodic(); // 取消任務 if (!canRunInCurrentRunState(periodic)) cancel(false); // 任務只執行一次, 調用FutureTask的run else if (!periodic) ScheduledFutureTask.super.run(); // 定時執行 else if (ScheduledFutureTask.super.runAndReset()) { // 設置下一次運行時間 setNextRunTime(); // 從新加入延遲隊列 reExecutePeriodic(outerTask); } }
public void run() { // 若是任務不是NEW狀態 直接返回 // 若是是NEW, 可是cas設置當前任務的持有者爲當前線程失敗 也直接返回 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; // 再次判斷任務的狀態,避免兩次判斷狀態之間有其餘線程對任務狀態進行修改 if (c != null && state == NEW) { V result; boolean ran; try { // 執行任務 result = c.call(); // 執行成功 ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } // 若是執行任務成功 if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
protected void set(V v) { // CAS 將當前任務的狀態 從 NEW 轉化 爲 COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; // 走到這裏只有一個線程會到這裏,設置任務狀態 爲NORMAL 正常結束 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
protected void setException(Throwable t) { // CAS 將當前任務的狀態 從 NEW 轉化 爲 COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; // 走到這裏只有一個線程會到這裏,設置任務狀態 爲EXCEPTIONAL,非正常結束 UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
針對任務類型爲fixed-delay,當任務執行完畢後,讓其延遲固定時間後再次運行,原理是:
// initialDelay : 提交任務後延遲多少時間開始執行任務 // delay : 當任務執行完畢後延長多少時間後再次運行任務 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { // 參數校驗 if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); // 任務轉換 period < 0 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; // 添加任務到隊列 delayedExecute(t); return t; }
注意這裏構造的ScheduledFutureTask的period<0,會致使boolean periodic = isPeriodic();
的結果是true,所以在ScheduledFutureTask的run邏輯中,會調用FutureTask的runAndReset()方法。
具體執行任務的線程是Worker線程,任務執行是Worker線程調用任務的潤方法執行,這裏的任務是ScheduledFutureTask,也就是調用它的run方法。
public void run() { // 是否只執行一次 period != 0 boolean periodic = isPeriodic(); // 取消任務 if (!canRunInCurrentRunState(periodic)) cancel(false); // 任務只執行一次, 調用FutureTask的run else if (!periodic) ScheduledFutureTask.super.run(); // 定時執行 else if (ScheduledFutureTask.super.runAndReset()) { // 設置下一次運行時間 setNextRunTime(); // 從新加入延遲隊列 reExecutePeriodic(outerTask); } }
相比於FutureTask的run方法,該方法邏輯差很少,但缺乏了:在任務正常執行完後設置狀態的步驟。緣由在於:讓任務成爲可重複執行的任務。
protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } // 若是當前任務正常執行完畢而且任務狀態爲NEW 則返回true, 不然返回false return ran && s == NEW; }
若是該方法返回true,將會調用setNextRunTime()設置下一次的運行時間,接着調用reExecutePeriodic(outerTask)從新加入任務隊列。
// 設置下一次運行時間 private void setNextRunTime() { long p = period; if (p > 0) time += p; else // 延遲-p的時間 time = triggerTime(-p); }
針對任務類型爲fixed-rate,相對起始時間點以固定頻率調用指定的任務。
// initialDelay : 提交任務後延遲多少時間開始執行任務 // period 固定週期 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); // period > 0 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
它和scheduleWithFixedDelay相似,區別在於:
最終的執行規則爲:initialDelay + n * period 的 刻執行任務,若是當前任務執行的時間到了,不會併發執行,下一次執行的任務將會延遲執行。