線程池之ScheduledThreadPoolExecutor線程池源碼分析筆記

1.ScheduledThreadPoolExecutor 總體結構剖析。

1.1類圖介紹

 

根據上面類圖圖能夠看到Executor實際上是一個工具類,裏面提供了好多靜態方法,根據用戶選擇返回不一樣的線程池實例。能夠看到ScheduledThreadPoolExecutor 繼承了 ThreadPoolExecutor 並實現 ScheduledExecutorService接口。線程池隊列是 DelayedWorkQueue,和 DelayedQueue 相似是一個延遲隊列。算法

ScheduledFutureTask 是具備返回值的任務,繼承自 FutureTask,FutureTask 內部有個變量 state 用來表示任務的狀態,一開始狀態爲 NEW,全部狀態爲:併發

    private static final int NEW          = 0;//初始狀態
    private static final int COMPLETING   = 1;//執行中狀態
    private static final int NORMAL       = 2;//正常運行結束狀態
    private static final int EXCEPTIONAL  = 3;//運行中異常
    private static final int CANCELLED    = 4;//任務被取消
    private static final int INTERRUPTING = 5;//任務正在被中斷
    private static final int INTERRUPTED  = 6;//任務已經被中斷

FutureTask可能的任務狀態轉換路徑以下所示:函數

    NEW -> COMPLETING -> NORMAL //初始狀態->執行中->正常結束
    NEW -> COMPLETING -> EXCEPTIONAL//初始狀態->執行中->執行異常
    NEW -> CANCELLED//初始狀態->任務取消
    NEW -> INTERRUPTING -> INTERRUPTED//初始狀態->被中斷中->被中斷

其實ScheduledFutureTask 內部還有個變量 period 用來表示任務的類型,其任務類型以下:工具

  • period=0,說明當前任務是一次性的,執行完畢後就退出了。this

  • period 爲負數,說明當前任務爲 fixed-delay 任務,是定時可重複執行任務。spa

  • period 爲整數,說明當前任務爲 fixed-rate 任務,是定時可重複執行任務。線程

 

接下來咱們能夠看到ScheduledThreadPoolExecutor 的造函數以下rest

    //使用改造後的Delayqueue.
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        //調用父類ThreadPoolExecutor的構造函數
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue());
    }
     public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

根據上面代碼能夠看到線程池隊列是 DelayedWorkQueuecode

 

二、原理分析

咱們主要看三個重要的函數,以下所示:blog

schedule(Runnable command, long delay,TimeUnit unit)

scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)

scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)

 

2.一、schedule(Runnable command, long delay,TimeUnit unit)方法

該方法做用是提交一個延遲執行的任務,任務從提交時間算起延遲 unit 單位的 delay 時間後開始執行,提交的任務不是週期性任務,任務只會執行一次,代碼以下:

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    //(1)參數校驗
    if (command == null || unit == null)
        throw new NullPointerException();

    //(2)任務轉換
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    //(3)添加任務到延遲隊列
    delayedExecute(t);
    return t;
}

能夠看到上面代碼所示,代碼(1)參數校驗,若是 command 或者 unit 爲 null,拋出 NPE 異常。

代碼(2)裝飾任務,把提交的 command 任務轉換爲 ScheduledFutureTaskScheduledFutureTask 是具體放入到延遲隊列裏面的東西,因爲是延遲任務,因此 ScheduledFutureTask 實現了 long getDelay(TimeUnit unit) 和 int compareTo(Delayed other) 方法,triggerTime 方法轉換延遲時間爲絕對時間,也就是把當前時間的納秒數加上延遲的納秒數後的 long 型值。

接下來咱們須要看 ScheduledFutureTask的構造函數,以下所示:

ScheduledFutureTask(Runnable r, V result, long ns) {
  //調用父類FutureTask的構造函數
  super(r, result);
  this.time = ns;
  this.period = 0;//period爲0,說明爲一次性任務
  this.sequenceNumber = sequencer.getAndIncrement();
}

根據構造函數能夠看到內部首先調用了父類 FutureTask 的構造函數,父類 FutureTask 的構造函數代碼以下:

//經過適配器把runnable轉換爲callable
public FutureTask(Runnable runnable, V result) {
   this.callable = Executors.callable(runnable, result);
   this.state = NEW;       //設置當前任務狀態爲NEW
}

根據上面代碼能夠看到FutureTask 中任務又被轉換爲了 Callable 類型後,保存到了變量 this.callable 裏面,並設置 FutureTask 的任務狀態爲 NEW。

而後 ScheduledFutureTask 構造函數內部設置 time 爲上面說的絕對時間,須要注意這裏 period 的值爲 0,這說明當前任務爲一次性任務,不是定時反覆執行任務。

其中 long getDelay(TimeUnit unit) 方法代碼以下,用來獲取當前任務還有多少時間就過時了,代碼以下所示:

//元素過時算法,裝飾後時間-當前時間,就是即將過時剩餘時間
public long getDelay(TimeUnit unit) {
  return unit.convert(time - now(), NANOSECONDS);
}

 

接下來接着看compareTo(Delayed other) 方法,代碼以下:

public int compareTo(Delayed other) {
    if (other == this) // compare zero ONLY if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long d = (getDelay(TimeUnit.NANOSECONDS) -
              other.getDelay(TimeUnit.NANOSECONDS));
    return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}

根據上面代碼的執行邏輯,能夠看到compareTo 做用是加入元素到延遲隊列後,內部創建或者調整堆時候會使用該元素的 compareTo 方法與隊列裏面其餘元素進行比較,讓最快要過時的元素放到隊首。因此不管何時向隊列裏面添加元素,隊首的的元素都是最即將過時的元素。

 

接下來接着看代碼(3)添加任務到延遲隊列,delayedExecute 的代碼以下:

private void delayedExecute(RunnableScheduledFuture<?> task) {

    //(4)若是線程池關閉了,則執行線程池拒絕策略
    if (isShutdown())
        reject(task);
    else {
        //(5)添加任務到延遲隊列
        super.getQueue().add(task);

        //(6)再次檢查線程池狀態
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            //(7)確保至少一個線程在處理任務
            ensurePrestart();
    }
}

能夠看到代碼(4)首先判斷當前線程池是否已經關閉了,若是已經關閉則執行線程池的拒絕策略(若是不知道線程池的拒絕策略能夠看前一篇線程池的介紹。)

否者執行代碼(5)添加任務到延遲隊列。添加完畢後還要從新檢查線程池是否被關閉了,若是已經關閉則從延遲隊列裏面刪除剛纔添加的任務,可是有可能線程池線程已經從任務隊列裏面移除了該任務,也就是該任務已經在執行了,因此還須要調用任務的 cancle 方法取消任務。

若是代碼(6)判斷結果爲 false,則會執行代碼(7)確保至少有一個線程在處理任務,即便核心線程數 corePoolSize 被設置爲 0.

ensurePrestart 代碼以下:

void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    //增長核心線程數
    if (wc < corePoolSize)
        addWorker(null, true);
    //若是初始化corePoolSize==0,則也添加一個線程。
    else if (wc == 0)
        addWorker(null, false);
    }
}

如上代碼首先首先獲取線程池中線程個數,若是線程個數小於核心線程數則新增一個線程,否者若是當前線程數爲 0 則新增一個線程。

經過上面代碼咱們分析瞭如何添加任務到延遲隊列,下面咱們看線程池裏面的線程如何獲取並執行任務的,從前面講解的 ThreadPoolExecutor 咱們知道具體執行任務的線程是 Worker 線程,Worker 線程裏面調用具體任務的 run 方法進行執行,因爲這裏任務是 ScheduledFutureTask,因此咱們下面看看 ScheduledFutureTask 的 run 方法。代碼以下:

public void run() {

    //(8)是否只執行一次
    boolean periodic = isPeriodic();

    //(9)取消任務
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    //(10)只執行一次,調用schdule時候
    else if (!periodic)
        ScheduledFutureTask.super.run();

    //(11)定時執行
    else if (ScheduledFutureTask.super.runAndReset()) {
        //(11.1)設置time=time+period
        setNextRunTime();

        //(11.2)從新加入該任務到delay隊列
        reExecutePeriodic(outerTask);
    }
}   

能夠看到代碼(8)isPeriodic 的做用是判斷當前任務是一次性任務仍是可重複執行的任務,isPeriodic 的代碼以下:

public boolean isPeriodic() {
     return period != 0;
}

可知內部是經過 period 的值來判斷,因爲轉換任務建立 ScheduledFutureTask 時候傳遞的 period 爲 0 ,因此這裏 isPeriodic 返回 false。

代碼(9)判斷當前任務是否應該被取消,canRunInCurrentRunState 的代碼以下:

boolean canRunInCurrentRunState(boolean periodic) {
        return isRunningOrShutdown(periodic ?
                                   continueExistingPeriodicTasksAfterShutdown :
                                   executeExistingDelayedTasksAfterShutdown);
}

這裏傳遞的 periodic 爲 false,因此 isRunningOrShutdown 的參數爲 executeExistingDelayedTasksAfterShutdownexecuteExistingDelayedTasksAfterShutdown 默認是 true 標示當其它線程調用了 shutdown 命令關閉了線程池後,當前任務仍是要執行,否者若是爲 false,標示當前任務要被取消。

因爲 periodic 爲 false,因此執行代碼(10)調用父類 FutureTask 的 run 方法具體執行任務,FutureTask 的 run 方法代碼以下:

public void run() {
        //(12)
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;

        //(13)
        try {

            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    //(13.1)
                    setException(ex);
                }
                //(13.2)
                if (ran)
                    set(result);
            }
        } finally {
            ...省略        
        }
    }

 

能夠看到代碼(12)若是任務狀態不是 NEW 則直接返回,或者若是當前任務狀態爲NEW可是使用 CAS 設置固然任務的持有者爲當前線程失敗則直接返回。代碼(13)具體調用 callable 的 call 方法執行任務,這裏在調用前又判斷了任務的狀態是否爲 NEW 是爲了不在執行代碼(12)後其餘線程修改了任務的狀態(好比取消了該任務)。

若是任務執行成功則執行代碼(13.2)修改任務狀態,set 方法代碼以下:

 protected void set(V v) {
        //若是當前任務狀態爲NEW,則設置爲COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            //設置當前任務終狀爲NORMAL,也就是任務正常結束
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
 }

如上代碼首先 CAS 設置當前任務狀態從 NEW 轉換到 COMPLETING,這裏多個線程調用時候只有一個線程會成功,成功的線程在經過 UNSAFE.putOrderedInt 設置任務的狀態爲正常結束狀態,這裏沒有用 CAS 是由於同一個任務只可能有一個線程能夠運行到這裏,這裏使用 putOrderedInt 比使用 CAS 函數或者 putLongVolatile 效率要高,而且這裏的場景不要求其它線程立刻對設置的狀態值可見。

這裏思考個問題,這裏何時多個線程會同時執行 CAS 設置任務狀態從態從 NEW 到 COMPLETING?其實當同一個 comand 被屢次提交到線程池時候就會存在這樣的狀況,因爲同一個任務共享一個狀態值 state。

若是任務執行失敗,則執行代碼(13.1),setException 的代碼以下,可見與 set 函數相似,代碼以下:

protected void setException(Throwable t) {
        //若是當前任務狀態爲NEW,則設置爲COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;

            //設置當前任務終態爲EXCEPTIONAL,也就是任務非正常結束
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); 
finishCompletion(); } }

到這裏代碼(10)邏輯執行完畢,一次性任務也就執行完畢了,

下面會講到若是任務是可重複執行的,則不會執行步驟(10)而是執行代碼(11)。

 

2.2  scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)方法

當任務執行完畢後,延遲固定間隔時間後再次運行(fixed-delay 任務):其中 initialDelay 說明提交任務後延遲多少時間開始執行任務 command,delay 表示當任務執行完畢後延長多少時間後再次運行 command 任務,unit 是 initialDelay 和 delay 的時間單位。任務會一直重複運行直到任務運行時候拋出了異常或者取消了任務,或者關閉了線程池。scheduleWithFixedDelay 的代碼以下:

 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        //(14)參數校驗
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();

        //(15)任務轉換,注意這裏是period=-delay<0
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,                                    triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        //(16)添加任務到隊列
        delayedExecute(t);
        return t;
}

 

如上代碼(14)進行參數校驗,校驗失敗則拋出異常,代碼(15)轉換 command 任務爲 ScheduledFutureTask,這裏須要注意的是這裏傳遞給 ScheduledFutureTask 的 period 變量的值爲 -delay,period < 0 這個說明該任務爲可重複執行的任務。而後代碼(16)添加任務到延遲隊列後返回。

任務添加到延遲隊列後線程池線程會從隊列裏面獲取任務,而後調用 ScheduledFutureTask的 run 方法執行,因爲這裏 period<0 因此 isPeriodic 返回 true,因此執行代碼(11),runAndReset 的代碼以下:

protected boolean runAndReset() {
        //(17)
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;

        //(18)
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {

          ...        
    }
        return ran && s == NEW;//(19)
}

該代碼和 FutureTask 的 run 相似,只是任務正常執行完畢後不會設置任務的狀態,這樣作是爲了讓任務成爲可重複執行的任務,這裏多了代碼(19)若是當前任務正常執行完畢而且任務狀態爲 NEW 則返回 true 否者返回 false。

若是返回了 true 則執行代碼(11.1)setNextRunTime 方法設置該任務下一次的執行時間,setNextRunTime 的代碼以下:

private void setNextRunTime() {
            long p = period;
            if (p > 0)//fixed-rate類型任務
                time += p;
            else//fixed-delay類型任務
                time = triggerTime(-p);
 }

 

如上代碼這裏 p < 0 說明當前任務爲 fixed-delay 類型任務,而後設置 time 爲當前時間加上 -p 的時間,也就是延遲 -p 時間後在次執行。

總結:本節介紹的 fixed-delay 類型的任務的執行實現原理以下,當添加一個任務到延遲隊列後,等 initialDelay 時間後,任務就會過時,過時的任務就會被從隊列移除,並執行,執行完畢後,會從新設置任務的延遲時間,而後在把任務放入延遲隊列實現的,依次往復。須要注意的是若是一個任務在執行某一個次時候拋出了異常,那麼這個任務就結束了,可是不影響其它任務的執行。

 

2.三、scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)方法

相對起始時間點固定頻率調用指定的任務(fixed-rate 任務):當提交任務到線程池後延遲 initialDelay 個時間單位爲 unit 的時間後開始執行任務 comand ,而後 initialDelay + period 時間點再次執行,而後在 initialDelay + 2 * period 時間點再次執行,依次往復,直到拋出異常或者調用了任務的 cancel 方法取消了任務在結束或者關閉了線程池。

scheduleAtFixedRate 的原理與 scheduleWithFixedDelay 相似,下面咱們講下不一樣點,首先調用 scheduleAtFixedRate 時候代碼以下:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    ...
    //裝飾任務類,注意period=period>0,不是負的
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    ...
     return t;
}

如上代碼 fixed-rate 類型的任務在轉換 command 任務爲 ScheduledFutureTask 的時候設置的 period=period 不在是 -period

因此當前任務執行完畢後,調用 setNextRunTime 設置任務下次執行的時間時候執行的是 time += p 而不在是 time = triggerTime(-p);

總結:相對於 fixed-delay 任務來講,fixed-rate 方式執行規則爲時間爲 initdelday + n*period; 時候啓動任務,可是若是當前任務尚未執行完,下一次要執行任務的時間到了,不會併發執行,下次要執行的任務會延遲執行,要等到當前任務執行完畢後在執行一個任務。

 

三、總結

 ScheduledThreadPoolExecutor 的實現原理,其內部使用的 DelayQueue來存放具體任務,其中任務分爲三種,其中一次性執行任務執行完畢就結束了,fixed-delay任務保證同一個任務屢次執行之間間隔固定時間,fixed-rate 任務保證任務執行按照固定的頻率執行,其中任務類型使用 period 的值來區分。

相關文章
相關標籤/搜索