public class TestScheduledThreadPool { private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); public static void main (String[] args) throws InterruptedException { scheduler.scheduleAtFixedRate(new Runnable() { @Override public void run () { System.out.println("command .. " + new Date()); } }, 0, 1, TimeUnit.SECONDS); } }
方法,指定延遲爲0,表示當即執行, 指定period爲1,以1s爲週期定時執行該任務。ide
//Executors.java public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } //ScheduledThreadPoolExecutor.java public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
public interface ScheduledExecutorService extends ExecutorService { // 指定command任務將在delay延遲後執行 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); // 指定callable任務將在delay延遲後執行 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); // 指定command任務將在delay延遲後執行,並且以設定頻率重複執行 // initialDelay + period 開始, initialDelay + n * period 處執行 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); // 建立並執行一個在給定初始延遲後首次啓用的按期操做,隨後在每一次執行終止和下一次執行開始之間 // 都存在給定的延遲。若是任務在任一一次執行時遇到異常,就會取消後續執行; // 不然,只能經過程序來顯式取消或終止該任務 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); }
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { /** FIFO隊列中的序列號,time相同,序列號小的排在前面 */ private final long sequenceNumber; /** 任務將要被執行的時間,也就是過時時間 */ private long time; /** * period == 0 當前任務是一次性的, 執行完畢後就退出 * period > 0 當前任務是fixed-delay任務,是固定延遲的定時可重複執行任務 * period < 0 當前任務是fixed-rate任務,是固定頻率的定時可重複執行任務 */ private final long period; /** The actual task to be re-enqueued by reExecutePeriodic */ RunnableScheduledFuture<V> outerTask = this; /** * Index into delay queue, to support faster cancellation. */ int heapIndex; //... 省略構造函數 // 當前任務還剩多久過時 public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); } // 隊列中的比較策略 public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; // time相同,序列號小的排在前面 else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; } //... 省略其餘方法 }
public class FutureTask<V> implements RunnableFuture<V> { /** * * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; // 初始狀態 private static final int COMPLETING = 1; // 執行中 private static final int NORMAL = 2; // 正常運行結束 private static final int EXCEPTIONAL = 3; // 運行中異常 private static final int CANCELLED = 4; // 任務被取消 private static final int INTERRUPTING = 5; // 任務正在被中斷 private static final int INTERRUPTED = 6; // 任務已經被中斷 }
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { // 參數校驗 if (command == null || unit == null) throw new NullPointerException(); // 任務轉換: 把command任務轉換爲ScheduledFutureTask RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); // 添加任務到延遲隊列 delayedExecute(t); return t; } // 將延遲時間轉換爲絕對時間, private long triggerTime(long delay, TimeUnit unit) { return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); } // 將當前的那描述加上延遲的nanos後的long型值 long triggerTime(long delay) { return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); } private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); // 調用FutureTask的構造方法 this.time = ns; this.period = 0; // 這裏表示任務是一次性的 this.sequenceNumber = sequencer.getAndIncrement(); } } // FutureTask.java public class FutureTask<V> implements RunnableFuture<V> { public FutureTask(Runnable runnable, V result) { // 將runnable轉化爲callable this.callable = Executors.callable(runnable, result); // 設置當前的任務狀態爲NEW this.state = NEW; // ensure visibility of callable } }
private void delayedExecute(RunnableScheduledFuture<?> task) { // 若是線程池關閉, 則執行拒絕策略 if (isShutdown()) reject(task); else { // 將任務添加到延遲隊列 super.getQueue().add(task); // 檢查線程池狀態,若是已經關閉,則從延遲隊列裏面刪除剛纔添加的任務 // 但此時可能線程池中的線程已經從任務隊列裏面移除了該任務 // 此時須要調用cancel 取消任務 if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else // 確保至少一個線程在處理任務 ensurePrestart(); } }
boolean canRunInCurrentRunState(boolean periodic) { return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown); }
void ensurePrestart() { int wc = workerCountOf(ctl.get()); // 增長核心線程數 if (wc < corePoolSize) addWorker(null, true); // 若是corePoolSize==0 也添加一個線程 else if (wc == 0) addWorker(null, false); }
public void run() { // 是否只執行一次 period != 0 boolean periodic = isPeriodic(); // 取消任務 if (!canRunInCurrentRunState(periodic)) cancel(false); // 任務只執行一次, 調用FutureTask的run else if (!periodic) ScheduledFutureTask.super.run(); // 定時執行 else if (ScheduledFutureTask.super.runAndReset()) { // 設置下一次運行時間 setNextRunTime(); // 從新加入延遲隊列 reExecutePeriodic(outerTask); } }
public void run() { // 若是任務不是NEW狀態 直接返回 // 若是是NEW, 可是cas設置當前任務的持有者爲當前線程失敗 也直接返回 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; // 再次判斷任務的狀態,避免兩次判斷狀態之間有其餘線程對任務狀態進行修改 if (c != null && state == NEW) { V result; boolean ran; try { // 執行任務 result = c.call(); // 執行成功 ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } // 若是執行任務成功 if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
protected void set(V v) { // CAS 將當前任務的狀態 從 NEW 轉化 爲 COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; // 走到這裏只有一個線程會到這裏,設置任務狀態 爲NORMAL 正常結束 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
protected void setException(Throwable t) { // CAS 將當前任務的狀態 從 NEW 轉化 爲 COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; // 走到這裏只有一個線程會到這裏,設置任務狀態 爲EXCEPTIONAL,非正常結束 UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
// initialDelay : 提交任務後延遲多少時間開始執行任務 // delay : 當任務執行完畢後延長多少時間後再次運行任務 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { // 參數校驗 if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); // 任務轉換 period < 0 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; // 添加任務到隊列 delayedExecute(t); return t; }
注意這裏構造的ScheduledFutureTask的period<0,會致使boolean periodic = isPeriodic();
public void run() { // 是否只執行一次 period != 0 boolean periodic = isPeriodic(); // 取消任務 if (!canRunInCurrentRunState(periodic)) cancel(false); // 任務只執行一次, 調用FutureTask的run else if (!periodic) ScheduledFutureTask.super.run(); // 定時執行 else if (ScheduledFutureTask.super.runAndReset()) { // 設置下一次運行時間 setNextRunTime(); // 從新加入延遲隊列 reExecutePeriodic(outerTask); } }
protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } // 若是當前任務正常執行完畢而且任務狀態爲NEW 則返回true, 不然返回false return ran && s == NEW; }
// 設置下一次運行時間 private void setNextRunTime() { long p = period; if (p > 0) time += p; else // 延遲-p的時間 time = triggerTime(-p); }
// initialDelay : 提交任務後延遲多少時間開始執行任務 // period 固定週期 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); // period > 0 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
最終的執行規則爲:initialDelay + n * period 的 刻執行任務,若是當前任務執行的時間到了,不會併發執行,下一次執行的任務將會延遲執行。