自jdk1.5開始,Java開始提供ScheduledThreadPoolExecutor類來支持週期性任務的調度,在這以前,這些工做須要依靠Timer/TimerTask或者其它第三方工具來完成。但Timer有着很多缺陷,如Timer是單線程模式,調度多個週期性任務時,若是某個任務耗時較久就會影響其它任務的調度;若是某個任務出現異常而沒有被catch則可能致使惟一的線程死掉而全部任務都不會再被調度。ScheduledThreadPoolExecutor解決了不少Timer存在的缺陷。java
先來看看ScheduledThreadPoolExecutor的實現模型,它經過繼承ThreadPoolExecutor來重用線程池的功能,裏面作了幾件事情:工具
public
void
run() {
if
(isPeriodic())
runPeriodic();
else
ScheduledFutureTask.
super
.run();
}
|
若是不是週期性任務就直接執行任務(也就是else部分),這個主要是用於實現ScheduledThreadPoolExecutor#schedule(Callable callable, long delay, TimeUnit unit)和ScheduledThreadPoolExecutor#schedule(Runnable command, long delay, TimeUnit unit),後面會講到它們的實現,這裏先關注週期任務的執行方式。週期性任務執行的是runPeriodic(),看下它的實現:this
private
void
runPeriodic() {
boolean
ok = ScheduledFutureTask.
super
.runAndReset();
boolean
down = isShutdown();
// Reschedule if not cancelled and not shutdown or policy allows
if
(ok && (!down ||
(getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
!isTerminating()))) {
long
p = period;
if
(p >
0
)
time += p;
else
time = triggerTime(-p);
ScheduledThreadPoolExecutor.
super
.getQueue().add(
this
);
}
// This might have been the final executed delayed
// task. Wake up threads to check.
else
if
(down)
interruptIdleWorkers();
}
|
這裏能夠看到,先執行了任務自己(ScheduledFutureTask.super.runAndReset),這個調用有一個返回值,來看下它的實現:spa
protected
boolean
runAndReset() {
return
sync.innerRunAndReset();
}
|
跟進去看下innerRunAndReset():線程
boolean
innerRunAndReset() {
if
(!compareAndSetState(
0
, RUNNING))
return
false
;
try
{
runner = Thread.currentThread();
callable.call();
// don't set result
runner =
null
;
return
compareAndSetState(RUNNING,
0
);
}
catch
(Throwable ex) {
innerSetException(ex);
return
false
;
}
}
|
能夠發現,這裏須要關注的是第三個return,也就是若是任務執行出現了異常,會被catch且返回false.rest
繼續看runPeriodic()方法,if裏面,若是剛纔任務執行的返回值是true且線程池還在運行就在if塊中的操做,若是線程池被關閉了就作else if裏的操做。也就是說,若是以前的任務執行出現的異常返回了false,那麼if裏以及else if裏的代碼都不會執行了,那有什麼影響?接下來看看if裏作了什麼。code
if裏的代碼很簡單,分爲兩部分,一是計算這個任務下次調度的間隔,二是將任務從新放回隊列中。回到出現異常的狀況,若是剛纔的任務執行出現了異常,就不會將任務再放回隊列中,換而言之,也就是這個任務再也得不到調度了!可是,這並不影響其它週期任務的調度。blog
綜上,能夠看到,ScheduledThreadPoolExecutor執行週期性任務的模型就是:調度一次任務,計算並設置該任務下次間隔,將任務放回隊列中供線程池執行。這裏的隊列起了很大的做用,且有一些特色:距離下次調度間隔短的任務老是在隊首,隊首的任務若距離下次調度的間隔時間大於0就沒法從該隊列的take()方法中拿到任務。排序
接下來看看ScheduledThreadPoolExecutor#schedule(Callable callable, long delay, TimeUnit unit)和ScheduledThreadPoolExecutor#schedule(Runnable command, long delay, TimeUnit unit)這兩個非週期性任務的實現方式,先看看它們的源碼:繼承
public
ScheduledFuture<?> schedule(Runnable command,
long
delay,
TimeUnit unit) {
if
(command ==
null
|| unit ==
null
)
throw
new
NullPointerException();
ScheduledFutureTask<?> t =
new
ScheduledFutureTask<Boolean>(command,
null
, triggerTime(delay, unit));
delayedExecute(t);
return
t;
}
|
public
<V> ScheduledFuture<V> schedule(Callable<V> callable,
long
delay,
TimeUnit unit) {
if
(callable ==
null
|| unit ==
null
)
throw
new
NullPointerException();
ScheduledFutureTask<V> t =
new
ScheduledFutureTask<V>(callable, triggerTime(delay, unit));
delayedExecute(t);
return
t;
}
|
private
void
delayedExecute(Runnable command) {
if
(isShutdown()) {
reject(command);
return
;
}
// Prestart a thread if necessary. We cannot prestart it
// running the task because the task (probably) shouldn't be
// run yet, so thread will just idle until delay elapses.
if
(getPoolSize() < getCorePoolSize())
prestartCoreThread();
super
.getQueue().add(command);
}
|
實現方式也很簡單,在建立ScheduledThreadPoolExecutor內部任務(即ScheduledFutureTask)的時候就將調度間隔計算並設置好,若是當前線程數小於設置的核心線程數,就啓動一個線程(多是線程池剛啓動裏面尚未線程,也多是裏面的線程執行任務時掛掉了。若是線程池中的線程掛掉了而又沒有調用這些schedule方法誰去補充掛掉的線程?不用擔憂,線程池本身會處理的)去監聽隊列裏的任務,而後將任務放到隊列裏,在任務執行間隔不大於0的時候,線程就能夠拿到這個任務並執行。
週期性任務的入口(ScheduledThreadPoolExecutor#scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)和ScheduledThreadPoolExecutor#scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit))與非週期性任務是相似的,它們處理方式不一樣的地方在於前文說到的ScheduledFutureTask#run()中。