最近新接手的項目裏大量使用了ScheduledThreadPoolExecutor類去執行一些定時任務,以前一直沒有機會研究這個類的源碼,此次趁着機會好好研讀一下。面試
原文地址:www.jianshu.com/p/18f4c95ac…bash
該類主要仍是基於ThreadPoolExecutor類進行二次開發,因此對Java線程池執行過程還不瞭解的同窗建議先看看我以前的文章。
當面試官問線程池時,你應該知道些什麼?數據結構
與ThreadPoolExecutor不一樣,向ScheduledThreadPoolExecutor中提交任務的時候,任務被包裝成ScheduledFutureTask對象加入延遲隊列並啓動一個woker線程。函數
用戶提交的任務加入延遲隊列時,會按照執行時間進行排列,也就是說隊列頭的任務是須要最先執行的。而woker線程會從延遲隊列中獲取任務,若是已經到了任務的執行時間,則開始執行。不然阻塞等待剩餘延遲時間後再嘗試獲取任務。優化
任務執行完成之後,若是該任務是一個須要週期性反覆執行的任務,則計算好下次執行的時間後會從新加入到延遲隊列中。ui
首先看下ScheduledThreadPoolExecutor類的幾個構造函數:this
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}複製代碼
注:這裏構造函數都是使用super,其實就是ThreadPoolExecutor的構造函數
這裏有三點須要注意:spa
再次說明:上面三點的理解須要先了解ThreadPoolExecutor的知識點。線程
當咱們建立出一個調度線程池之後,就能夠開始提交任務了。這裏依次分析一下三個經常使用API的源碼:rest
首先是schedule方法,該方法是指任務在指定延遲時間到達後觸發,只會執行一次。
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
//參數校驗
if (command == null || unit == null)
throw new NullPointerException();
//這裏是一個嵌套結構,首先把用戶提交的任務包裝成ScheduledFutureTask
//而後在調用decorateTask進行包裝,該方法是留給用戶去擴展的,默認是個空方法
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
//包裝好任務之後,就進行提交了
delayedExecute(t);
return t;
}複製代碼
重點看一下提交任務的源碼:
private void delayedExecute(RunnableScheduledFuture<?> task) {
//若是線程池已經關閉,則使用拒絕策略把提交任務拒絕掉
if (isShutdown())
reject(task);
else {
//與ThreadPoolExecutor不一樣,這裏直接把任務加入延遲隊列
super.getQueue().add(task);
//若是當前狀態沒法執行任務,則取消
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
//這裏是增長一個worker線程,避免提交的任務沒有worker去執行
//緣由就是該類沒有像ThreadPoolExecutor同樣,woker滿了才放入隊列
ensurePrestart();
}
}複製代碼
這裏的關鍵點其實就是super.getQueue().add(task)行代碼,ScheduledThreadPoolExecutor類在內部本身實現了一個基於堆數據結構的延遲隊列。add方法最終會落到offer方法中,一塊兒看下:
public boolean offer(Runnable x) {
//參數校驗
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
//查看當前元素數量,若是大於隊列長度則進行擴容
int i = size;
if (i >= queue.length)
grow();
//元素數量加1
size = i + 1;
//若是當前隊列尚未元素,則直接加入頭部
if (i == 0) {
queue[0] = e;
//記錄索引
setIndex(e, 0);
} else {
//把任務加入堆中,並調整堆結構,這裏就會根據任務的觸發時間排列
//把須要最先執行的任務放在前面
siftUp(i, e);
}
//若是新加入的元素就是隊列頭,這裏有兩種狀況
//1.這是用戶提交的第一個任務
//2.新任務進行堆調整之後,排在隊列頭
if (queue[0] == e) {
//這個變量起優化做用,後面說
leader = null;
//加入元素之後,喚醒worker線程
available.signal();
}
} finally {
lock.unlock();
}
return true;
}複製代碼
經過上面的邏輯,咱們把提交的任務成功加入到了延遲隊列中,前面說了加入任務之後會開啓一個woker線程,該線程的任務就是從延遲隊列中不斷取出任務執行。這些都是跟ThreadPoolExecutor相同的,咱們看下從該延遲隊列中獲取元素的源碼:
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
//取出隊列中第一個元素,即最先須要執行的任務
RunnableScheduledFuture<?> first = queue[0];
//若是隊列爲空,則阻塞等待加入元素時喚醒
if (first == null)
available.await();
else {
//計算任務執行時間,這個delay是當前時間減去任務觸發時間
long delay = first.getDelay(NANOSECONDS);
//若是到了觸發時間,則執行出隊操做
if (delay <= 0)
return finishPoll(first);
first = null;
//這裏表示該任務已經分配給了其餘線程,當前線程等待喚醒就能夠
if (leader != null)
available.await();
else {
//不然把給任務分配給當前線程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//當前線程等待任務剩餘延遲時間
available.awaitNanos(delay);
} finally {
//這裏線程醒來之後,何時leader會發生變化呢?
//就是上面的添加任務的時候
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//若是隊列不爲空,則喚醒其餘woker線程
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}複製代碼
這裏爲何會加入一個leader變量來分配阻塞隊列中的任務呢?緣由是要減小沒必要要的時間等待。好比說如今隊列中的第一個任務1分鐘後執行,那麼用戶提交新的任務時會不斷的加入woker線程,若是新提交的任務都排在隊列後面,也就是說新的woker如今都會取出這第一個任務進行執行延遲時間的等待,當該任務到觸發時間時,會喚醒不少woker線程,這顯然是沒有必要的。
當任務被woker線程取出之後,會執行run方法,因爲此時任務已經被包裝成了ScheduledFutureTask對象,那咱們來看下該類的run方法:
public void run() {
boolean periodic = isPeriodic();
//若是當前線程池已經不支持執行任務,則取消
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
//若是不須要週期性執行,則直接執行run方法而後結束
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
//若是須要週期執行,則在執行完任務之後,設置下一次執行時間
setNextRunTime();
//把任務從新加入延遲隊列
reExecutePeriodic(outerTask);
}
}複製代碼
上面就是schedule方法完整的執行過程。
ScheduledThreadPoolExecutor類中關於週期性執行的任務提供了兩個方法scheduleAtFixedRate跟scheduleWithFixedDelay,一塊兒看下區別。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
//刪除沒必要要的邏輯,重點看區別
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
//兩者惟一區別
unit.toNanos(period));
//...
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
//...
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
//兩者惟一區別
unit.toNanos(-delay));
//..
}複製代碼
前者把週期延遲時間傳入ScheduledFutureTask中,然後者卻設置成負數傳入,區別在哪裏呢?看下當任務執行完成之後的收尾工做中設置任務下次執行時間的方法setNextRunTime源碼:
private void setNextRunTime() {
long p = period;
//大於0是scheduleAtFixedRate方法,表示執行時間是根據初始化參數計算的
if (p > 0)
time += p;
else
//小於0是scheduleWithFixedDelay方法,表示執行時間是根據當前時間從新計算的
time = triggerTime(-p);
}複製代碼
也就是說當使用scheduleAtFixedRate方法提交任務時,任務後續執行的延遲時間都已經肯定好了,分別是initialDelay,initialDelay + period,initialDelay + 2 * period以此類推。
而調用scheduleWithFixedDelay方法提交任務時,第一次執行的延遲時間爲initialDelay,後面的每次執行時間都是在前一次任務執行完成之後的時間點上面加上period延遲執行。
ScheduledThreadPoolExecutor能夠說是在ThreadPoolExecutor上面進行了一些擴展操做,它只是從新包裝了任務以及阻塞隊列。該類的阻塞隊列DelayedWorkQueue是基於堆去實現的,本文沒有太詳細介紹堆結構插入跟刪除數據的調整工做,感興趣的同窗能夠私信或者評論交流。