ScheduledThreadPoolExecutor實現原理

 

自jdk1.5開始,Java開始提供ScheduledThreadPoolExecutor類來支持週期性任務的調度,在這以前,這些工做須要依靠Timer/TimerTask或者其它第三方工具來完成。但Timer有着很多缺陷,如Timer是單線程模式,調度多個週期性任務時,若是某個任務耗時較久就會影響其它任務的調度;若是某個任務出現異常而沒有被catch則可能致使惟一的線程死掉而全部任務都不會再被調度。ScheduledThreadPoolExecutor解決了不少Timer存在的缺陷。java

先來看看ScheduledThreadPoolExecutor的實現模型,它經過繼承ThreadPoolExecutor來重用線程池的功能,裏面作了幾件事情:工具

  • 爲線程池設置了一個DelayedWorkQueue,該queue同時具備PriorityQueue(優先級大的元素會放到隊首)和DelayQueue(若是隊列裏第一個元素的getDelay返回值大於0,則take調用會阻塞)的功能
  • 將傳入的任務封裝成ScheduledFutureTask,這個類有兩個特色,實現了java.lang.Comparable和java.util.concurrent.Delayed接口,也就是說裏面有兩個重要的方法:compareTo和getDelay。ScheduledFutureTask裏面存儲了該任務距離下次調度還須要的時間(使用的是基於System#nanoTime實現的相對時間,不會由於系統時間改變而改變,如距離下次執行還有10秒,不會由於將系統時間調前6秒而變成4秒後執行)。getDelay方法就是返回當前時間(運行getDelay的這個時刻)距離下次調用之間的時間差;compareTo用於比較兩個任務的優先關係,距離下次調度間隔較短的優先級高。那麼,當有任務丟進上面說到的DelayedWorkQueue時,由於它有DelayQueue(DelayQueue的內部使用PriorityQueue來實現的)的功能,因此新的任務會與隊列中已經存在的任務進行排序,距離下次調度間隔短的任務排在前面,也就是說這個隊列並非先進先出的;另外,在調用DelayedWorkQueue的take方法的時候,若是沒有元素,會阻塞,若是有元素而第一個元素的getDelay返回值大於0(前面說過已經排好序了,第一個元素的getDelay不會大於後面元素的getDelay返回值),也會一直阻塞。
  • ScheduledFutureTask提供了一個run的實現,線程池執行的就是這個run方法。看看run的源碼(本文的代碼取自hotspot1.5.0_22,jdk後續版本的代碼可能已經不同了,如jdk1.7中使用了本身實現的DelayedWorkQueue,而再也不使用PriorityQueue做爲存儲,不過從外面看它們的行爲仍是同樣的,因此並不影響對ScheduledThreadPoolExecutor調度機制的理解):

     

    public void run() {
         if (isPeriodic())
             runPeriodic();
         else
             ScheduledFutureTask. super .run();
    }

    若是不是週期性任務就直接執行任務(也就是else部分),這個主要是用於實現ScheduledThreadPoolExecutor#schedule(Callable callable, long delay, TimeUnit unit)和ScheduledThreadPoolExecutor#schedule(Runnable command, long delay, TimeUnit unit),後面會講到它們的實現,這裏先關注週期任務的執行方式。週期性任務執行的是runPeriodic(),看下它的實現:this

    private void runPeriodic() {
         boolean ok = ScheduledFutureTask. super .runAndReset();
         boolean down = isShutdown();
         // Reschedule if not cancelled and not shutdown or policy allows
         if (ok && (!down ||
                    (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
                     !isTerminating()))) {
             long p = period;
             if (p > 0 )
                 time += p;
             else
                 time = triggerTime(-p);
             ScheduledThreadPoolExecutor. super .getQueue().add( this );
         }
         // This might have been the final executed delayed
         // task.  Wake up threads to check.
         else if (down)
             interruptIdleWorkers();
    }

    這裏能夠看到,先執行了任務自己(ScheduledFutureTask.super.runAndReset),這個調用有一個返回值,來看下它的實現:spa

    protected boolean runAndReset() {
         return sync.innerRunAndReset();
    }

    跟進去看下innerRunAndReset():線程

    boolean innerRunAndReset() {
         if (!compareAndSetState( 0 , RUNNING))
             return false ;
         try {
             runner = Thread.currentThread();
             callable.call(); // don't set result
             runner = null ;
             return compareAndSetState(RUNNING, 0 );
         } catch (Throwable ex) {
             innerSetException(ex);
             return false ;
         }
    }

    能夠發現,這裏須要關注的是第三個return,也就是若是任務執行出現了異常,會被catch且返回false.rest

    繼續看runPeriodic()方法,if裏面,若是剛纔任務執行的返回值是true且線程池還在運行就在if塊中的操做,若是線程池被關閉了就作else if裏的操做。也就是說,若是以前的任務執行出現的異常返回了false,那麼if裏以及else if裏的代碼都不會執行了,那有什麼影響?接下來看看if裏作了什麼。code

    if裏的代碼很簡單,分爲兩部分,一是計算這個任務下次調度的間隔,二是將任務從新放回隊列中。回到出現異常的狀況,若是剛纔的任務執行出現了異常,就不會將任務再放回隊列中,換而言之,也就是這個任務再也得不到調度了!可是,這並不影響其它週期任務的調度。blog

綜上,能夠看到,ScheduledThreadPoolExecutor執行週期性任務的模型就是:調度一次任務,計算並設置該任務下次間隔,將任務放回隊列中供線程池執行。這裏的隊列起了很大的做用,且有一些特色:距離下次調度間隔短的任務老是在隊首,隊首的任務若距離下次調度的間隔時間大於0就沒法從該隊列的take()方法中拿到任務。排序

接下來看看ScheduledThreadPoolExecutor#schedule(Callable callable, long delay, TimeUnit unit)和ScheduledThreadPoolExecutor#schedule(Runnable command, long delay, TimeUnit unit)這兩個非週期性任務的實現方式,先看看它們的源碼:繼承

public ScheduledFuture<?> schedule(Runnable command,
                                    long delay,
                                    TimeUnit unit) {
     if (command == null || unit == null )
         throw new NullPointerException();
     ScheduledFutureTask<?> t =
         new ScheduledFutureTask<Boolean>(command, null , triggerTime(delay, unit));
     delayedExecute(t);
     return t;
}
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                        long delay,
                                        TimeUnit unit) {
     if (callable == null || unit == null )
         throw new NullPointerException();
     ScheduledFutureTask<V> t =
         new ScheduledFutureTask<V>(callable, triggerTime(delay, unit));
     delayedExecute(t);
     return t;
}
private void delayedExecute(Runnable command) {
     if (isShutdown()) {
         reject(command);
         return ;
     }
     // Prestart a thread if necessary. We cannot prestart it
     // running the task because the task (probably) shouldn't be
     // run yet, so thread will just idle until delay elapses.
     if (getPoolSize() < getCorePoolSize())
         prestartCoreThread();
 
     super .getQueue().add(command);
}

實現方式也很簡單,在建立ScheduledThreadPoolExecutor內部任務(即ScheduledFutureTask)的時候就將調度間隔計算並設置好,若是當前線程數小於設置的核心線程數,就啓動一個線程(多是線程池剛啓動裏面尚未線程,也多是裏面的線程執行任務時掛掉了。若是線程池中的線程掛掉了而又沒有調用這些schedule方法誰去補充掛掉的線程?不用擔憂,線程池本身會處理的)去監聽隊列裏的任務,而後將任務放到隊列裏,在任務執行間隔不大於0的時候,線程就能夠拿到這個任務並執行。

週期性任務的入口(ScheduledThreadPoolExecutor#scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)和ScheduledThreadPoolExecutor#scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit))與非週期性任務是相似的,它們處理方式不一樣的地方在於前文說到的ScheduledFutureTask#run()中。

相關文章
相關標籤/搜索