ScheduledThreadPoolExecutor 也是一個線程池類,是線程池類 ThreadPoolExecutor 的子類。除了 ThreadPoolExecutor 相關的方法以外,它還增長了執行定時任務和週期性任務的方法。它的類簽名和繼承結構以下:app
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {}
異步
先看它的一個內部嵌套類 DelayedWorkQueue,該類是一個延遲隊列,它的類簽名和繼承結構以下:源碼分析
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {}
flex
ScheduledThreadPoolExecutor 有以下四個構造器:ui
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
this
它繼承了 FutureTask 類(可參考前文「JDK源碼分析-FutureTask」的分析),且實現了 RunnableScheduledFuture 接口,該接口定義以下:spa
public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {
// 一個任務是否週期性執行的,如果則能夠重複執行;不然只能運行一次
boolean isPeriodic();
}
線程
RunnableScheduledFuture 只定義了一個方法 isPeriodic,該方法用於判斷一個任務是不是週期性執行的。它繼承的 RunnableFuture 接口在前文 FutureTask 類中已進行分析,而 ScheduledFuture 接口如下:3d
先看它的主要成員變量:rest
// 定時任務執行的時間(單位:納秒)
private long time;
/**
* 重複執行的任務的時間間隔(單位:納秒)
* 正數表示固定頻率(fixed-rate)執行
* 負數表示固定延遲(fixed-delay)執行
* 零表示非重複執行的任務
*/
private final long period;
// reExecutePeriodic 方法中從新排隊的任務
RunnableScheduledFuture<V> outerTask = this;
// 延遲隊列中的索引位置,便於快速取消
int heapIndex;
構造器:
/**
* 構造器一:用給定的觸發時間(納秒),建立一個一次性任務
*/
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
/**
* 構造器二:用給定的觸發時間和間隔(納秒),建立一個週期性任務
*/
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
/**
* 構造器三:用給定的觸發時間(納秒),建立一個一次性任務
*/
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
該類是一個任務類,即 Runnable 接口的實現類,所以它最核心的就是 run 方法,以下:
public void run() {
// 是否爲週期性任務
boolean periodic = isPeriodic();
// 若任務不能執行,則取消
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 若爲非週期性任務
else if (!periodic)
// 若爲週期性任務,調用 ScheduledFutureTask 的父類(即 FutureTask)的 run 方法執行
ScheduledFutureTask.super.run();
// 若爲週期性任務,調用 ScheduledFutureTask 的父類(即 FutureTask)的 runAndReset 方法執行
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime(); // 設置下一次執行時間
reExecutePeriodic(outerTask); // 週期性執行
}
}
reExecutePeriodic 方法以下:
/**
* 該方法主要是將週期性任務從新排隊
* 它的實現與 delayedExecute 方法(後面分析)邏輯有些相似
*/
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
schedule 方法 1:其做用是延遲指定的時間後執行任務(即執行定時任務),只會執行一次。
public ScheduledFuture > schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
// 把用戶提交的 Runnable 對象包裝爲 RunnableScheduledFuture 對象
// decorateTask 方法默認返回第二個參數
// decorateTask 方法的修飾符是 protected,可根據需求自行擴展
RunnableScheduledFuture > t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
// 執行給定的任務
delayedExecute(t);
return t;
}
delayExecute 方法:
/*
* 延遲或週期性任務的主要執行方法。
* 若線程池已關閉,則拒絕該任務(執行拒絕策略);
* 不然將任務添加到工做隊列,如有須要啓動一個線程去執行。
* 若在添加任務時關閉了線程池,則將其從隊列移除並取消該任務
*/
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 若線程池已關閉,則執行拒絕策略
if (isShutdown())
reject(task);
else {
// 將該任務添加到任務隊列(即前面的延遲隊列)
super.getQueue().add(task);
// 若當前任務沒法執行,則將其從隊列移除而且取消執行(相似事務的回滾操做)
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
// 任務能夠執行,如有須要新增線程以執行該任務
else
ensurePrestart();
}
}
schedule 方法 2:
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
scheduleAtFixedRate 方法:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
// 將 Runnable 對象包裝爲 ScheduledFutureTask 對象
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
scheduleWithFixedDelay 方法:
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
這兩個方法是 Executor 接口和 ExecutorService 接口所定義的方法,代碼實現以下:
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
public Future<?> submit(Runnable task) {
return schedule(task, 0, NANOSECONDS);
}
它們內部直接調用了 schedule(Runnable) 方法。另外兩個 submit 方法:
public <T> Future<T> submit(Runnable task, T result) {
return schedule(Executors.callable(task, result), 0, NANOSECONDS);
}
public <T> Future<T> submit(Callable<T> task) {
return schedule(task, 0, NANOSECONDS);
}