從圖中咱們能夠看到ScheduledThreadPoolExecutor繼承ThreadPoolExecutor實現了ScheduledExecutorService接口。它至關於提供了"延遲"和"週期執行"功能的ThreadPoolExecutor,還有兩個重要內部類DelayedWorkQueue和ScheduledFutureTask數據結構
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
複製代碼
由於其繼承了ThreadPoolExecutor,調用了ThreadLocalExecutor的構造方法。當核心線程數達到corePoolSize,會將任務提交給有界阻塞隊列DelayedWorkQueue。ScheduledThreadPoolExecutor線程池最大線程數爲Integer.MAX_VALUE測試
ScheduledThreadPoolExecutor實現了ScheduledExecutorService接口,該接口提供了以下方法:this
// 在給定延遲後,執行Runnable任務
public ScheduledFuture schedule(Runnable command,
long delay, TimeUnit unit);
// 在給定延遲後,執行Callable任務
public ScheduledFuture schedule(Callable callable,
long delay, TimeUnit unit);
// 給定延遲(initialDelay)以後,隨後以給定時間(period)爲週期執行任務
// 即執行將在initialDelay以後開始,而後是initialDelay+period,
// 再是initialDelay + 2*period,依此類推
// 若是上一個任務沒有執行完畢,則須要等上一個任務執行完畢後當即執行
public ScheduledFuture scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
// 建立並執行在給定的初始延遲(initialDelay)以後首先啓用的按期操做
// 隨後每一個任務執行的終止和下一個執行的開始之間給定的延遲(delay)
public ScheduledFuture scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
複製代碼
第1、第二個schedule方法都是一次性操做只不過入參一個是Runnable,一個是callable
scheduleAtFixedRate、scheduleWithFixedDelay方法能夠看以下示例spa
public static void main(String[] args) {
SimpleDateFormat sdf = new SimpleDateFormat("hh:MM:ss");
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
Runnable task1 = () -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "測試" + sdf.format(new Date()));
};
Runnable task2 = () -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "測試" + sdf.format(new Date()));
};
executorService.scheduleAtFixedRate(task1, 0, 2, TimeUnit.SECONDS);
executorService.scheduleWithFixedDelay(task2, 0, 2, TimeUnit.SECONDS);
}
輸出:
pool-1-thread-1測試11:12:37
pool-1-thread-2測試11:12:37
pool-1-thread-1測試11:12:40
pool-1-thread-2測試11:12:42
pool-1-thread-1測試11:12:43
pool-1-thread-1測試11:12:46
pool-1-thread-2測試11:12:47
複製代碼
週期間隔2秒,任務耗時3秒
scheduleAtFixedRate方法:
1.若任務耗時超過週期間隔,則須要等待上個任務完成下個任務才能執行
2.若任務耗時小於週期間隔,則下個任務按週期間隔執行任務
scheduleWithFixedDelay方法:
1.下任務等到上個任務執行完成+週期間隔以後才執行任務線程
public ScheduledFuture schedule(Callable callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture t = decorateTask(callable,
new ScheduledFutureTask(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
複製代碼
先參數校驗,再構造task,最後調用delayedExecute()方法延遲執行任務 ensurePrestart主要調用了addWorker方法,此方法主要作了兩件事:rest
private void delayedExecute(RunnableScheduledFuture task) { // 判斷線程池是否處於RUNNING狀態,不處於則根據相應拒絕策略拒絕任務 if (isShutdown()) reject(task); else { // 往阻塞隊列中添加任務 super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } }複製代碼void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); } 複製代碼複製代碼void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); } 複製代碼
當線程執行任務,都會調用到任務的run()方法code
public void run() {
// 判斷是不是週期任務
boolean periodic = isPeriodic();
// 判斷當前線程狀態是否能執行任務
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 不是週期性任務,直接執行任務
else if (!periodic)
ScheduledFutureTask.super.run();
// 如果週期性任務,設置下次執行任務的時間
else if (ScheduledFutureTask.super.runAndReset()) {
// 設置任務下次執行時間
setNextRunTime();
// 將下次任務往阻塞隊列中添加
reExecutePeriodic(outerTask);
}
}
複製代碼
1.先判斷該任務是否能夠執行,若不能執行則調用cancel方法取消
2.再判斷是不是週期性任務,若不是直接執行
3.最後調用runAndReset方法執行任務並重置,setNextRunTime方法設置任務下次的執行時間,reExecutePeriodic方法從新把任務添加到隊列中.orm
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
void reExecutePeriodic(RunnableScheduledFuture task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
複製代碼
ScheduledThreadPoolExecutor是把任務添加到DelayedWorkQueue中,它是一個基於堆的數據結構,經過ScheduledFutureTask的compareTo方法比較大小,小的排在前面,大的排在後面cdn
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask x = (ScheduledFutureTask)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
複製代碼
首先按照time排序,time小的排在前面,大的排在後面,若time相同,則使用sequenceNumber排序,小的排在前面,大的排在後面blog