併發編程 —— ScheduledThreadPoolExecutor

1. 前言

在前面的文章中,咱們介紹了定時任務類 Timer ,他是 JDK 1.3 中出現的,位於 java.util 包下。而今天說的 ScheduledThreadPoolExecutor的是在 JUC 包下,是 JDK1.5 新增的。java

今天就來講說這個類。程序員

2. API 介紹

該類內部結構和 Timer仍是有點相似的,也是 3 個類:數據結構

  • ScheduledThreadPoolExecutor:程序員使用的接口。
  • DelayedWorkQueue : 存儲任務的隊列。
  • ScheduledFutureTask : 執行任務的線程。

構造方法介紹this

// 使用給定核心池大小建立一個新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize)  
// 使用給定初始參數建立一個新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler)  
// 使用給定的初始參數建立一個新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory)  
// 使用給定初始參數建立一個新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler)  
複製代碼

ScheduledThreadPoolExecutor最多支持 3 個參數:核心線程數量,線程工廠,拒絕策略。spa

爲何沒有最大線程數量?因爲 ScheduledThreadPoolExecutor 內部是個無界隊列,maximumPoolSize 也就沒有意思了。線程

再介紹一下他的 API 方法,請原諒我將 JDK 文檔照抄過來了,就當是備忘吧,以下:rest

protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) // 修改或替換用於執行 callable 的任務。 protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) // 修改或替換用於執行 runnable 的任務。  void execute(Runnable command) // 使用所要求的零延遲執行命令。  boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() // 獲取有關在此執行程序已 shutdown 的狀況下、是否繼續執行現有按期任務的策略。  boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() // 獲取有關在此執行程序已 shutdown 的狀況下是否繼續執行現有延遲任務的策略。  BlockingQueue<Runnable> getQueue() // 返回此執行程序使用的任務隊列。  boolean remove(Runnable task) // 從執行程序的內部隊列中移除此任務(若是存在),從而若是還沒有開始,則其再也不運行。  <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) // 建立並執行在給定延遲後啓用的 ScheduledFuture。  ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) // 建立並執行在給定延遲後啓用的一次性操做。  ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) // 建立並執行一個在給定初始延遲後首次啓用的按期操做,後續操做具備給定的週期;也就是將在 initialDelay 後開始執行,而後在 initialDelay+period 後執行,接着在 initialDelay + 2 * period 後執行,依此類推。  ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) // 建立並執行一個在給定初始延遲後首次啓用的按期操做,隨後,在每一次執行終止和下一次執行開始之間都存在給定的延遲。  void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) // 設置有關在此執行程序已 shutdown 的狀況下是否繼續執行現有按期任務的策略。  void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) // 設置有關在此執行程序已 shutdown 的狀況下是否繼續執行現有延遲任務的策略。  void shutdown() // 在之前已提交任務的執行中發起一個有序的關閉,可是不接受新任務。 List<Runnable> shutdownNow() // 嘗試中止全部正在執行的任務、暫停等待任務的處理,並返回等待執行的任務列表。  <T> Future<T> submit(Callable<T> task) // 提交一個返回值的任務用於執行,返回一個表示任務的未決結果的 Future。  Future<?> submit(Runnable task) // 提交一個 Runnable 任務用於執行,並返回一個表示該任務的 Future。  <T> Future<T> submit(Runnable task, T result) // 提交一個 Runnable 任務用於執行,並返回一個表示該任務的 Future。 複製代碼

最常用的幾個方法以下:code

// 使用給定核心池大小建立一個新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize)  

// 建立並執行在給定延遲後啓用的一次性操做。 
ScheduledFuture<?>	schedule(Runnable command, long delay, TimeUnit unit) 
  
// 建立並執行一個在給定初始延遲後首次啓用的按期操做,後續操做具備給定的週期;也就是將在 initialDelay 後開始執行,而後在 initialDelay+period 後執行,接着在 initialDelay + 2 * period 後執行,依此類推。 
ScheduledFuture<?>	scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) 

// 建立並執行一個在給定初始延遲後首次啓用的按期操做,隨後,在每一次執行終止和下一次執行開始之間都存在給定的延遲。 
ScheduledFuture<?>	scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) 
複製代碼

除了默認的構造方法,還有 3 個 schedule 方法。咱們將分析他們內部的實現。cdn

3. 構造方法內部實現

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
複製代碼

咱們感興趣的就是這個 DelayedWorkQueue 隊列了。他也是一個阻塞隊列。這個隊列的數據結構是堆。同時,這個 queue 也是可比較的,比較什麼呢?任務必須實現 compareTo 方法,這個方法的比較邏輯是:比較任務的執行時間,若是任務的執行時間相同,則比較任務的加入時間。對象

所以,ScheduledFutureTask 有 2 個變量:

  • time : 任務的執行時間。
  • sequenceNumber:任務的加入時間。

這兩個變量就是用來比較任務的執行順序的。整個調度的順序就是這個邏輯。

4. 幾個 schedule 方法的的區別

剛剛說了,有 3 個 schedule 方法:

  1. ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) 建立並執行在給定延遲後啓用的一次性操做。

  2. ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) 建立並執行一個在給定初始延遲後首次啓用的按期操做,後續操做具備給定的週期;也就是將在initialDelay 後開始執行,而後在 initialDelay+period後執行,接着在initialDelay + 2 * period後執行,依此類推。

  3. ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) 建立並執行一個在給定初始延遲後首次啓用的按期操做,隨後,在每一次執行終止和下一次執行開始之間都存在給定的延遲。

第一個方法執行在給定的時間後,執行一次就結束。

有點意思的地方是 第二個方法和 第三個方法,他們直接的區別。

這兩個方法均可以重複的調用。可是,重複調用的邏輯有所區別,這裏就是比 Timer 好用的地方。

他們的共同點在於:必須等待上個任務執行完畢才能執行下個任務。

不一樣點在於:他們調度的時間粗略是不一樣的。

scheduleAtFixedRate 方法的執行週期都是固定的,也就是,他是以上一個任務的開始執行時間做爲起點,加上以後的 period 時間,調度下次任務。

scheduleWithFixedDelay 方法則是以上一個任務的結束時間做爲起點,加上以後的 period 時間,調度下次任務。

有什麼區別呢?

如何任務執行時間很短,那就沒上面區別。可是,若是任務執行時間很長,超過了 period 時間,那麼區別就出來了。

咱們假設一下。

咱們設置 period 時間爲 2 秒,而任務耗時 5 秒。

這個兩個方法的區別就體現出來了。

scheduleAtFixedRate 方法將會在上一個任務結束完畢馬上執行,他和上一個任務的開始執行時間的間隔是 5 秒(由於必須等待上一個任務執行完畢)。

scheduleWithFixedDelay 方法將會在上一個任務結束後,注意:**再等待 2 秒,**纔開始執行,那麼他和上一個任務的開始執行時間的間隔是 7 秒。

因此,咱們在使用 ScheduledThreadPoolExecutor 的過程當中須要注意任務的執行時間不能超過間隔時間,若是超過了,最好使用scheduleAtFixedRate 方法,防止任務堆積。

固然,也和具體的業務有關。不能一律而論。但必定要注意這兩個方法的區別。

5. scheduled 方法的實現

咱們看看 scheduleAtFixedRate 方法的內部實現。

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}
複製代碼

建立一個 ScheduledFutureTask 對象,而後裝飾一個這個Future ,該類實現是直接返回,子類能夠有本身的實現,在這個任務外裝飾一層。

而後執行 delayedExecute 方法,最後返回 Future

這個 ScheduledFutureTask 實現了不少接口,好比 FutureRunnableComparableDelayed 等。

ScheduledFutureTask 的構造方法以下:

ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);
    this.time = ns;
    this.period = period;
    this.sequenceNumber = sequencer.getAndIncrement();
}

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}

static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}
複製代碼

層層遞進,該類首先經過一個原子靜態 int對象這隻任務的入隊編號,而後建立一個 Callable,這個 Callable 是一個適配器,適配了RunnableCallable,也就是將Runnable包裝成 callabe, 他的 call方法就是調用給定任務的 run 方法。固然,這裏的 result是沒有什麼做用的。

若是你傳遞的是一個 callable ,那麼,就調用 FutureTaskrun方法,設置真正的返回值。

這裏使用了適配器模式,仍是挺有趣的。

總的來講,這個 ScheduledFutureTask 基於 FutureTask, 關於 FutureTask 咱們以前從源碼介紹過了。

而他本身重寫了幾個方法:compareTogetDelayrunisPeriodic4 個方法。

咱們仍是要看看 delayedExecute 的實現。

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        // 添加進隊列。
        super.getQueue().add(task);
        // 若是線程池關閉了,且不能夠在當前狀態下運行任務,且從隊列刪除任務成功,就給任務打上取消標記。
        // 第二個判斷是由兩個變量控制的(下面是默認值):
        // continueExistingPeriodicTasksAfterShutdown = false 表示關閉的時候應取消週期性任務。默認關閉
        // executeExistingDelayedTasksAfterShutdown = true。表示關閉的時候應取消非週期性的任務。默認不關閉。
        // running 狀態下,canRunInCurrentRunState 一定返回 ture。
        // 非 running 狀態下,canRunInCurrentRunState 根據上面的兩個值返回。
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            // 開始執行任務
            ensurePrestart();
    }
}
複製代碼

說說上面的方法。

  1. 判斷是否關閉,關閉則拒絕任務。
  2. 若是不是 關閉狀態,則添加進隊列,而添加隊列的順序咱們以前講過了,根據 ScheduledFutureTaskcompareTo 方法來的,先比較執行時間,再比較添加順序。
  3. 若是這個過程當中線程池關閉了,則判斷此時是否應該取消任務,根據兩個變量來的,註釋裏面寫了。默認的策略是,若是是週期性的任務,就取消,反之不取消。
  4. 若是沒有關閉線程池。就調用線程池裏的線程執行任務。

總體的過程如圖:

image.png

注意上面的圖,若是是週期性的任務,則會在執行完畢後,歸還隊列。

從哪裏能夠看出來呢?

ScheduledFutureTaskrun 方法:

public void run() {
    // 是不是週期性任務
    boolean periodic = isPeriodic();
    // 若是不能夠在當前狀態下運行,就取消任務(將這個任務的狀態設置爲CANCELLED)。
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    // 若是不是週期性的任務,調用 FutureTask # run 方法
    else if (!periodic)
        ScheduledFutureTask.super.run();
    // 若是是週期性的。
    // 執行任務,但不設置返回值,成功後返回 true。(callable 不能夠重複執行)
    else if (ScheduledFutureTask.super.runAndReset()) {
        // 設置下次執行時間
        setNextRunTime();
        // 再次將任務添加到隊列中
        reExecutePeriodic(outerTask);
    }
}
複製代碼

邏輯以下:

  1. 若是不能再當前狀態下運行了,就取消這個任務。
  2. 若是不是週期性的任務,就執行 FutureTaskrun 方法。
  3. 若是是週期性的任務,就須要執行 runAndReset方法。
  4. 執行完畢後,重寫設置當前任務的下次執行時間,而後添加進隊列中。

而管理整個執行過程的就是ScheduledThreadPoolExecutor的父類 ThreadPoolExecutorrunWorker方法。其中,該方法會從隊列中取出數據,也就是調用隊列的 take 方法。

關於DelayedWorkQueuetake方法,其中有個 leader 變量,若是 leader 不是空,說明已經有線程在等待了,那就阻塞當前線程,若是是空,說明,隊列的第一個元素已經被更新了,就設置當前線程爲 leader.

這是一個 Leader-Follower 模式,Doug Lea 說的。

固然,take方法總體的邏輯仍是不變的。從隊列的頭部拿數據。使用 Condition 作線程之間的協調。

5. 總結

關於 ScheduledThreadPoolExecutor 調度類,咱們分析的差很少了,總結一下。

ScheduledThreadPoolExecutor 是個定時任務線程池,相似 Timer,可是比 Timer強大,健壯。

好比不會像 Timer 那樣,任務異常了,整個調度系統就完全無用了。

也比Timer 多了Rate 模式(Rate 和 Delay)。

這兩種模式的區別就是任務執行的起點時間不一樣,Rate是從上一個任務的開始執行時間開始計算;Delay 是從上一個任務的結束時間開始計算。

所以,若是任務自己的時間超過了間隔時間,那麼這兩種模式的間隔時間將會不一致。

而任務的排序是經過 ScheduledFutureTaskcompareTo方法排序的,規則是先比較執行時間,若是時間相同,再比較加入時間。

還要注意一點就是:若是任務執行過程當中異常了,那麼將不會再次重複執行。由於 ScheduledFutureTaskrun方法沒有作catch處理。因此程序員須要手動處理,相對於 Timer 異常就直接費了調度系統來講,要好不少。

ScheduledThreadPoolExecutor 的是實現基於 ThreadPoolExecutor,大部分功能都是重用的父類,只是本身在執行完畢以後,從新設置時間,並再次將任務還到了隊列中,造成了定時任務。

相關文章
相關標籤/搜索