併發系列(7)之 ScheduledThreadPoolExecutor 詳解

文本將主要講述 ThreadPoolExecutor 一個特殊的子類 ScheduledThreadPoolExecutor,主要用於執行週期性任務;因此在看本文以前最好先了解一下 ThreadPoolExecutor ,能夠參考 ThreadPoolExecutor 詳解;另外 ScheduledThreadPoolExecutor 中使用了延遲隊列,主要是基於徹底二叉堆實現的,能夠參考 徹底二叉堆html

1、ScheduledThreadPoolExecutor 結構概述

1. 繼承關係

public class ScheduledThreadPoolExecutor 
  extends ThreadPoolExecutor implements ScheduledExecutorService {}

scheduledthreadexecutor

在源碼中能夠看到,ScheduledThreadPoolExecutor 的狀態管理、入隊操做、拒絕操做等都是繼承於 ThreadPoolExecutorScheduledThreadPoolExecutor 主要是提供了週期任務和延遲任務相關的操做;java

  • schedule(Runnable command, long delay, TimeUnit unit) // 無返回值的延遲任務
  • schedule(Callable callable, long delay, TimeUnit unit) // 有返回值的延遲任務
  • scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) // 固定頻率週期任務
  • scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) // 固定延遲週期任務



ScheduledThreadPoolExecutor 的運行邏輯而言,大體能夠表述爲:dom

  • 首先將 Runnable/Callable 封裝爲 ScheduledFutureTask,延遲時間做爲比較屬性;
  • 而後加入 DelayedWorkQueue 隊列中,每次取出隊首延遲最小的任務,超時等待,而後執行;
  • 最後判斷是否爲週期任務,而後將其從新加入 DelayedWorkQueue 隊列中;

其內部結構如圖所示:ide

scheduledthreadexecutor

這裏須要注意的:函數

  • ScheduledThreadPoolExecutor 中的隊列不能指定,只能是 DelayedWorkQueue;由於他是 無界隊列,因此再添加任務的時候線程最多能夠增長到 coreSize,這裏不清楚的能夠查看 ThreadPoolExecutor 詳解 ,就再也不重複了;
  • ScheduledThreadPoolExecutor 重寫了 ThreadPoolExecutor 的 execute() 方法,其執行的核心方法變成 delayedExecute()


2. ScheduledFutureTask

private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
  private final long sequenceNumber;  // 任務序號,從 AtomicLong sequencer 獲取,當延遲時間相同時,序號小的先出
  private long time;                  // 下次任務執行時間
  private final long period;          // 0 表示非週期任務,正值表示固定頻率週期任務,負值表示固定延遲週期任務
  RunnableScheduledFuture<V> outerTask = this;  // 重複執行的任務,傳入的任務可使用 decorateTask() 從新包裝
  int heapIndex;                      // 隊列索引
}

scheduledfuturetask

其中最重要的方法必然是 run 方法了:源碼分析

public void run() {
  boolean periodic = isPeriodic();    // 是否爲週期任務,period != 0
  if (!canRunInCurrentRunState(periodic))  // 當前狀態可否繼續運行,詳細測試後面還會講到
    cancel(false);     // 取消任務
  else if (!periodic)  // 不是週期任務時,直接運行
    ScheduledFutureTask.super.run();
  else if (ScheduledFutureTask.super.runAndReset()) {  // 時週期任務
    setNextRunTime();              // 設置下次執行時間
    reExecutePeriodic(outerTask);  // 從新入隊
  }
}
public boolean cancel(boolean mayInterruptIfRunning) {
  boolean cancelled = super.cancel(mayInterruptIfRunning);  // 設置中斷狀態
  if (cancelled && removeOnCancel && heapIndex >= 0)        // 當設置 removeOnCancel 狀態時,移除任務
    remove(this);                                           // 默認爲 false
  return cancelled;
}
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
  if (canRunInCurrentRunState(true)) {  // 若是當前狀態能夠執行
    super.getQueue().add(task);         // 則從新入隊
    if (!canRunInCurrentRunState(true) && remove(task))
      task.cancel(false);
    else ensurePrestart();              // 確保有線程執行任務 
  }
}

此外還有 DelayedWorkQueue,可是這裏不許備講了,能夠查看 徹底二叉堆 瞭解實現的原理;測試


2、scheduleAtFixedRate 與 scheduleWithFixedDelay

scheduleAtFixedRatescheduleWithFixedDelay 是咱們最經常使用的兩個方法,可是他們的區別可能不是很清楚,這裏重點講一下,this

1. scheduleAtFixedRate

// 測試
ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
pool.scheduleAtFixedRate(() -> {
  sleep(1000);               // 睡眠 1s,
  log.info("run task");
}, 1, 2, TimeUnit.SECONDS);  // 延遲 1s,週期 2s

// 打印
[19:41:28,489 INFO ] [pool-1-thread-1] - run task
[19:41:30,482 INFO ] [pool-1-thread-1] - run task
[19:41:32,483 INFO ] [pool-1-thread-1] - run task
[19:41:34,480 INFO ] [pool-1-thread-1] - run task線程

能夠看到的確時固定週期 2s 執行的,可是若是任務執行時間超過週期呢?3d

// 測試
ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
pool.scheduleAtFixedRate(() -> {
  int i = 2000 + random.nextInt(3) * 1000;
  sleep(i);
  log.info("run task, sleep :{}", i);
}, 1, 2, TimeUnit.SECONDS);  // 延遲 1s,週期 2s

// 打印
[19:42:53,428 INFO ] [pool-1-thread-1] - run task, sleep :2000
[19:42:55,430 INFO ] [pool-1-thread-1] - run task, sleep :2000
[19:42:59,430 INFO ] [pool-1-thread-1] - run task, sleep :4000
[19:43:02,434 INFO ] [pool-1-thread-1] - run task, sleep :3000
[19:43:06,436 INFO ] [pool-1-thread-1] - run task, sleep :4000

能夠看到若是任務執行時間超出週期時,下一次任務會馬上運行;就好像週期是一個有彈性的袋子,能裝下運行時間的時候,是固定大小,裝不下的時候就會被撐大,圖像化表示以下:

scheduledfuturetask


2. scheduleWithFixedDelay

// 測試
ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
pool.scheduleAtFixedRate(() -> {
  int i = 1000 + random.nextInt(5) * 1000;
  sleep(i);
  log.info("run task, sleep :{}", i);
}, 1, 2, TimeUnit.SECONDS);  // 延遲 1s,週期 2s

// 打印
[20:05:40,682 INFO ] [pool-1-thread-1] - run task, sleep :1000
[20:05:45,686 INFO ] [pool-1-thread-1] - run task, sleep :3000
[20:05:49,689 INFO ] [pool-1-thread-1] - run task, sleep :2000
[20:05:55,690 INFO ] [pool-1-thread-1] - run task, sleep :4000
[20:06:01,692 INFO ] [pool-1-thread-1] - run task, sleep :4000

能夠看到不管執行時間是多少,其結果都是在執行完畢後,停頓固定的時間,而後執行下一次任務,其圖形化表示爲:

scheduledfuturetask


3、 源碼分析

1. 延遲任務

public void execute(Runnable command) {
  schedule(command, 0, NANOSECONDS);
}

public <T> Future<T> submit(Callable<T> task) {
  return schedule(task, 0, NANOSECONDS);
}

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
  if (command == null || unit == null) throw new NullPointerException();
  RunnableScheduledFuture<?> t = decorateTask(
    command,new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
  delayedExecute(t);
  return t;
}

public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
  if (callable == null || unit == null) throw new NullPointerException();
  RunnableScheduledFuture<V> t = decorateTask(
    callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit)));
  delayedExecute(t);
  return t;
}

能夠看到全部的週期任務,最終執行的都是 delayedExecute 方法,其中 decorateTask 是一個鉤子函數,其之類能夠利用他對任務進行重構過濾等操做;

private void delayedExecute(RunnableScheduledFuture<?> task) {
  if (isShutdown()) reject(task);  // 若是線程池已經關閉,則拒絕任務
  else {
    super.getQueue().add(task);    // 任務入隊
    if (isShutdown() &&            // 再次檢查,線程池是否關閉
      !canRunInCurrentRunState(task.isPeriodic()) &&
      remove(task))
      task.cancel(false);
    else
      ensurePrestart();            // 確保有線程執行任務
  }
}


2. 週期任務

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
                                              long period, TimeUnit unit) {
  if (command == null || unit == null) throw new NullPointerException();
  if (period <= 0) throw new IllegalArgumentException();
  
  ScheduledFutureTask<Void> sft =
    new ScheduledFutureTask<Void>(command,
                    null,
                    triggerTime(initialDelay, unit),
                    unit.toNanos(period));  // 注意這裏添加的是正值
  
  RunnableScheduledFuture<Void> t = decorateTask(command, sft);
  sft.outerTask = t;
  delayedExecute(t);
  return t;
}

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
                                                 long delay, TimeUnit unit) {
  if (command == null || unit == null) throw new NullPointerException();
  if (delay <= 0) throw new IllegalArgumentException();
  
  ScheduledFutureTask<Void> sft =
    new ScheduledFutureTask<Void>(command,
                    null,
                    triggerTime(initialDelay, unit),
                    unit.toNanos(-delay));  // 注意這裏添加的是負值
  
  RunnableScheduledFuture<Void> t = decorateTask(command, sft);
  sft.outerTask = t;
  delayedExecute(t);
  return t;
}

從上面代碼能夠看到 scheduleAtFixedRatescheduleWithFixedDelay 只有週期任務的時間不一樣,其餘的都同樣,那麼下面咱們看一下他們的任務時間計算;

public long getDelay(TimeUnit unit) {
  return unit.convert(time - now(), NANOSECONDS);
}

private void setNextRunTime() {
  long p = period;
  if (p > 0)    // 正值表示 scheduleAtFixedRate
    time += p;  // 無論任務執行時間,直接加上週期時間,也就是一次任務超時,會影響後續任務的執行,
                // 超時的時候,getDelay 是負值,因此在延遲隊列中必然排在最前面,馬上被取出執行
  else
    time = triggerTime(-p);  // 計算觸發時間
}

long triggerTime(long delay) {  // 這裏能夠看到,每次的確是在當前時間的基礎上,加上延遲時間;
  return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));  
}

這裏特別要注意 scheduleAtFixedRate 一次任務超時,會持續影響後面的任務週期安排,因此在設定週期的時候要特別注意; 例如:

// 測試
ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
pool.scheduleAtFixedRate(() -> {
    int i = random.nextInt(5) * 1000;
    sleep(i);
    log.info("run task, sleep :{}", i);
}, 1, 2, TimeUnit.SECONDS);

// 打印
[20:29:11,310 INFO ] [pool-1-thread-1] - run task, sleep :1000
[20:29:16,304 INFO ] [pool-1-thread-1] - run task, sleep :4000
[20:29:19,304 INFO ] [pool-1-thread-1] - run task, sleep :3000
[20:29:21,305 INFO ] [pool-1-thread-1] - run task, sleep :2000
[20:29:22,305 INFO ] [pool-1-thread-1] - run task, sleep :1000
[20:29:23,306 INFO ] [pool-1-thread-1] - run task, sleep :1000
[20:29:27,306 INFO ] [pool-1-thread-1] - run task, sleep :4000
[20:29:30,307 INFO ] [pool-1-thread-1] - run task, sleep :3000

如圖所示:

scheduledfuturetask


3. 取消任務

private volatile boolean continueExistingPeriodicTasksAfterShutdown; //關閉後繼續執行週期任務,默認false
private volatile boolean executeExistingDelayedTasksAfterShutdown = true; //關閉後繼續執行延遲任務,默認true
private volatile boolean removeOnCancel = false;  // 取消任務是,從隊列中刪除任務,默認 false

@Override void onShutdown() {
    BlockingQueue<Runnable> q = super.getQueue();
    boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();    // 繼續延遲任務
    boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); // 繼續週期任務
    if (!keepDelayed && !keepPeriodic) {  // 都是 false,直接清除
        for (Object e : q.toArray())
            if (e instanceof RunnableScheduledFuture<?>)
                ((RunnableScheduledFuture<?>) e).cancel(false);
        q.clear();
    }
    else {
        // Traverse snapshot to avoid iterator exceptions
        for (Object e : q.toArray()) {
            if (e instanceof RunnableScheduledFuture) {
                RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
                if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
                    t.isCancelled()) { // also remove if already cancelled
                    if (q.remove(t))
                        t.cancel(false);
                }
            }
        }
    }
    tryTerminate();
}


總結

  • scheduleAtFixedRate,固定頻率週期任務,注意一次任務超時,會持續的影響後續的任務週期;
  • scheduleWithFixedDelay,固定延遲週期任務,即每次任務結束後,超時等待固定時間;
  • 此外 ScheduledThreadPoolExecutor 線程最多爲核心線程,最大線程數不起做用,由於 DelayedWorkQueue 是無界隊列;
相關文章
相關標籤/搜索