(手機橫屏看源碼更方便)java
注:java源碼分析部分如無特殊說明均基於 java8 版本。面試
注:本文基於ScheduledThreadPoolExecutor定時線程池類。spring
前面咱們一塊兒學習了普通任務、將來任務的執行流程,今天咱們再來學習一種新的任務——定時任務。數據結構
定時任務是咱們常常會用到的一種任務,它表示在將來某個時刻執行,或者將來按照某種規則重複執行的任務。源碼分析
(1)如何保證任務是在將來某個時刻才被執行?學習
(2)如何保證任務按照某種規則重複執行?this
建立一個定時線程池,用它來跑四種不一樣的定時任務。spa
public class ThreadPoolTest03 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 建立一個定時線程池
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(5);
System.out.println("start: " + System.currentTimeMillis());
// 執行一個無返回值任務,5秒後執行,只執行一次
scheduledThreadPoolExecutor.schedule(() -> {
System.out.println("spring: " + System.currentTimeMillis());
}, 5, TimeUnit.SECONDS);
// 執行一個有返回值任務,5秒後執行,只執行一次
ScheduledFuture<String> future = scheduledThreadPoolExecutor.schedule(() -> {
System.out.println("inner summer: " + System.currentTimeMillis());
return "outer summer: ";
}, 5, TimeUnit.SECONDS);
// 獲取返回值
System.out.println(future.get() + System.currentTimeMillis());
// 按固定頻率執行一個任務,每2秒執行一次,1秒後執行
// 任務開始時的2秒後
scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
System.out.println("autumn: " + System.currentTimeMillis());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
}, 1, 2, TimeUnit.SECONDS);
// 按固定延時執行一個任務,每延時2秒執行一次,1秒執行
// 任務結束時的2秒後,本文由公從號「彤哥讀源碼」原創
scheduledThreadPoolExecutor.scheduleWithFixedDelay(() -> {
System.out.println("winter: " + System.currentTimeMillis());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
}, 1, 2, TimeUnit.SECONDS);
}
}複製代碼
定時任務整體分爲四種:線程
(1)將來執行一次的任務,無返回值;rest
(2)將來執行一次的任務,有返回值;
(3)將來按固定頻率重複執行的任務;
(4)將來按固定延時重複執行的任務;
本文主要以第三種爲例進行源碼解析。
提交一個按固定頻率執行的任務。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
// 參數判斷
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
// 將普通任務裝飾成ScheduledFutureTask
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
// 鉤子方法,給子類用來替換裝飾task,這裏認爲t==sft
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
// 延時執行
delayedExecute(t);
return t;
}複製代碼
能夠看到,這裏的處理跟將來任務相似,都是裝飾成另外一個任務,再拿去執行,不一樣的是這裏交給了delayedExecute()方法去執行,這個方法是幹嗎的呢?
延時執行。
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 若是線程池關閉了,執行拒絕策略
if (isShutdown())
reject(task);
else {
// 先把任務扔到隊列中去
super.getQueue().add(task);
// 再次檢查線程池狀態
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 保證有足夠有線程執行任務
ensurePrestart();
}
}
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
// 建立工做線程
// 注意,這裏沒有傳入firstTask參數,由於上面先把任務扔到隊列中去了
// 另外,沒用上maxPoolSize參數,因此最大線程數量在定時線程池中實際是沒有用的
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}複製代碼
到這裏就結束了?!
實際上,這裏只是控制任務能不能被執行,真正執行任務的地方在任務的run()方法中。
還記得上面的任務被裝飾成了ScheduledFutureTask類的實例嗎?因此,咱們只要看ScheduledFutureTask的run()方法就能夠了。
定時任務執行的地方。
public void run() {
// 是否重複執行
boolean periodic = isPeriodic();
// 線程池狀態判斷
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 一次性任務,直接調用父類的run()方法,這個父類其實是FutureTask
// 這裏咱們再也不講解,有興趣的同窗看看上一章的內容
else if (!periodic)
ScheduledFutureTask.super.run();
// 重複性任務,先調用父類的runAndReset()方法,這個父類也是FutureTask
// 本文主要分析下面的部分
else if (ScheduledFutureTask.super.runAndReset()) {
// 設置下次執行的時間
setNextRunTime();
// 重複執行,本文由公從號「彤哥讀源碼」原創
reExecutePeriodic(outerTask);
}
}複製代碼
能夠看到,對於重複性任務,先調用FutureTask的runAndReset()方法,再設置下次執行的時間,最後再調用reExecutePeriodic()方法。
FutureTask的runAndReset()方法與run()方法相似,只是其任務運行完畢後不會把狀態修改成NORMAL,有興趣的同窗點進源碼看看。
再來看看reExecutePeriodic()方法。
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
// 線程池狀態檢查
if (canRunInCurrentRunState(true)) {
// 再次把任務扔到任務隊列中
super.getQueue().add(task);
// 再次檢查線程池狀態
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
// 保證工做線程足夠
ensurePrestart();
}
}複製代碼
到這裏是否是豁然開朗了,原來定時線程池執行重複任務是在任務執行完畢後,又把任務扔回了任務隊列中。
重複性的問題解決了,那麼,它是怎麼控制任務在某個時刻執行的呢?
OK,這就輪到咱們的延時隊列登場了。
咱們知道,線程池執行任務時須要從任務隊列中拿任務,而普通的任務隊列,若是裏面有任務就直接拿出來了,可是延時隊列不同,它裏面的任務,若是沒有到時間也是拿不出來的,這也是前面分析中一上來就把任務扔進隊列且建立Worker沒有傳入firstTask的緣由。
說了這麼多,它究竟是怎麼實現的呢?
其實,延時隊列咱們在前面都詳細分析過,想看完整源碼分析的能夠看看以前的《死磕 java集合之DelayQueue源碼分析》。
延時隊列內部是使用「堆」這種數據結構來實現的,有興趣的同窗能夠看看以前的《拜託,面試別再問我堆(排序)了!》。
咱們這裏只拿一個take()方法出來分析。
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 加鎖
lock.lockInterruptibly();
try {
for (;;) {
// 堆頂任務
RunnableScheduledFuture<?> first = queue[0];
// 若是隊列爲空,則等待
if (first == null)
available.await();
else {
// 還有多久到時間
long delay = first.getDelay(NANOSECONDS);
// 若是小於等於0,說明這個任務到時間了,能夠從隊列中出隊了
if (delay <= 0)
// 出隊,而後堆化
return finishPoll(first);
// 還沒到時間
first = null;
// 若是前面有線程在等待,直接進入等待
if (leader != null)
available.await();
else {
// 當前線程做爲leader
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待上面計算的延時時間,再自動喚醒
available.awaitNanos(delay);
} finally {
// 喚醒後再次得到鎖後把leader再置空
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
// 至關於喚醒下一個等待的任務
available.signal();
// 解鎖,本文由公從號「彤哥讀源碼」原創
lock.unlock();
}
}複製代碼
大體的原理是,利用堆的特性獲取最快到時間的任務,即堆頂的任務:
(1)若是堆頂的任務到時間了,就讓它從隊列中了隊;
(2)若是堆頂的任務還沒到時間,就看它還有多久到時間,利用條件鎖等待這段時間,待時間到了後從新走(1)的判斷;
這樣就解決了能夠在指定時間後執行任務。
其實,ScheduledThreadPoolExecutor也是可使用execute()或者submit()提交任務的,只不過它們會被當成0延時的任務來執行一次。
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
public <T> Future<T> submit(Callable<T> task) {
return schedule(task, 0, NANOSECONDS);
}複製代碼
實現定時任務有兩個問題要解決,分別是指定將來某個時刻執行任務、重複執行。
(1)指定某個時刻執行任務,是經過延時隊列的特性來解決的;
(2)重複執行,是經過在任務執行後再次把任務加入到隊列中來解決的。
到這裏基本上普通的線程池的源碼解析就結束了,這種線程池是比較經典的實現方式,總體上來講,效率相對不是特別高,由於全部的工做線程共用同一個隊列,每次從隊列中取任務都要加鎖解鎖操做。
那麼,能不能給每一個工做線程配備一個任務隊列呢,在提交任務的時候就把任務分配給指定的工做線程,這樣在取任務的時候就不須要頻繁的加鎖解鎖了。
答案是確定的,下一章咱們一塊兒來看看這種基於「工做竊取」理論的線程池——ForkJoinPool。
歡迎關注個人公衆號「彤哥讀源碼」,查看更多源碼系列文章, 與彤哥一塊兒暢遊源碼的海洋。