JDK源碼分析-ScheduledThreadPoolExecutor

概述

ScheduledThreadPoolExecutor 也是一個線程池類,是線程池類 ThreadPoolExecutor 的子類。除了 ThreadPoolExecutor 相關的方法以外,它還增長了執行定時任務和週期性任務的方法。它的類簽名和繼承結構以下:app

public class ScheduledThreadPoolExecutor        extends ThreadPoolExecutor        implements ScheduledExecutorService {}異步

能夠看到,它繼承了 ThreadPoolExecutor 類(參考 「 JDK源碼分析-ThreadPoolExecutor 」),而且實現了 ScheduledExecutorService 接口(參考 「 JDK源碼分析-ScheduledExecutorService 」),所以具備兩者的特性。 下面分析其代碼實現。

代碼分析

內部嵌套類 DelayedWorkQueue

先看它的一個內部嵌套類 DelayedWorkQueue,該類是一個延遲隊列,它的類簽名和繼承結構以下:源碼分析

static class DelayedWorkQueue extends AbstractQueue<Runnable>    implements BlockingQueue<Runnable> {}flex

Delay edWorkQue ue 類與前文分析的 DelayQueue 「JDK源碼分析-DelayQueue」實現原理相似 ,這裏就再也不贅述。

構造器

ScheduledThreadPoolExecutor 有以下四個構造器:ui

public ScheduledThreadPoolExecutor(int corePoolSize) {    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,          new DelayedWorkQueue());}
public ScheduledThreadPoolExecutor(int corePoolSize,                                   ThreadFactory threadFactory) {    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,          new DelayedWorkQueue(), threadFactory);}
public ScheduledThreadPoolExecutor(int corePoolSize,                                   RejectedExecutionHandler handler) {    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,          new DelayedWorkQueue(), handler);}
public ScheduledThreadPoolExecutor(int corePoolSize,                                   ThreadFactory threadFactory,                                   RejectedExecutionHandler handler) {    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,          new DelayedWorkQueue(), threadFactory, handler);}
this

這幾個構造器都是直接調用父類 ThreadPoolExecutor 的構造器,只是傳入了不一樣的參數。而其中的參數 workQueue 都傳入了上面的延遲隊列 DelayedWorkQueue。

內部類 ScheduledFutureTask

ScheduledThreadPoolExecutor 還有一個內部類 ScheduledFutureTask,它的繼承結構以下:

它繼承了 FutureTask 類(可參考前文「JDK源碼分析-FutureTask」的分析),且實現了 RunnableScheduledFuture 接口,該接口定義以下:spa

public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {    // 一個任務是否週期性執行的,如果則能夠重複執行;不然只能運行一次    boolean isPeriodic();}線程

RunnableScheduledFuture 只定義了一個方法 isPeriodic,該方法用於判斷一個任務是不是週期性執行的。它繼承的 RunnableFuture 接口在前文 FutureTask 類中已進行分析,而 ScheduledFuture 接口如下:3d


它的內部並未定義方法,只是整合了 Delayed 接口和 Future 接口,Delayed 接口前文也已分析,下面分析該類的主要代碼。

先看它的主要成員變量:rest

// 定時任務執行的時間(單位:納秒)private long time;
/** * 重複執行的任務的時間間隔(單位:納秒) * 正數表示固定頻率(fixed-rate)執行 * 負數表示固定延遲(fixed-delay)執行 * 零表示非重複執行的任務 */private final long period;
// reExecutePeriodic 方法中從新排隊的任務RunnableScheduledFuture<V> outerTask = this;
// 延遲隊列中的索引位置,便於快速取消int heapIndex;

構造器:

/** * 構造器一:用給定的觸發時間(納秒),建立一個一次性任務 */ScheduledFutureTask(Runnable r, V result, long ns) {    super(r, result);    this.time = ns;    this.period = 0;    this.sequenceNumber = sequencer.getAndIncrement();}
/** * 構造器二:用給定的觸發時間和間隔(納秒),建立一個週期性任務 */ScheduledFutureTask(Runnable r, V result, long ns, long period) {    super(r, result);    this.time = ns;    this.period = period;    this.sequenceNumber = sequencer.getAndIncrement();}
/** * 構造器三:用給定的觸發時間(納秒),建立一個一次性任務 */ScheduledFutureTask(Callable<V> callable, long ns) {    super(callable);    this.time = ns;    this.period = 0;    this.sequenceNumber = sequencer.getAndIncrement();}

ScheduledFutureTask 有三個構造器,可分爲兩類:分別是建立一次性任務(一和三)和週期性任務(二)。 其中一和 三仍是 Runnable 和 Callable 的區別。

該類是一個任務類,即 Runnable 接口的實現類,所以它最核心的就是 run 方法,以下:

public void run() {    // 是否爲週期性任務    boolean periodic = isPeriodic();    // 若任務不能執行,則取消    if (!canRunInCurrentRunState(periodic))        cancel(false);    // 若爲非週期性任務    else if (!periodic)        // 若爲週期性任務,調用 ScheduledFutureTask 的父類(即 FutureTask)的 run 方法執行        ScheduledFutureTask.super.run();    // 若爲週期性任務,調用 ScheduledFutureTask 的父類(即 FutureTask)的 runAndReset 方法執行    else if (ScheduledFutureTask.super.runAndReset()) {        setNextRunTime(); // 設置下一次執行時間        reExecutePeriodic(outerTask); // 週期性執行    }}

reExecutePeriodic 方法以下:

/** * 該方法主要是將週期性任務從新排隊 * 它的實現與 delayedExecute 方法(後面分析)邏輯有些相似 */void reExecutePeriodic(RunnableScheduledFuture<?> task) {    if (canRunInCurrentRunState(true)) {        super.getQueue().add(task);        if (!canRunInCurrentRunState(true) && remove(task))            task.cancel(false);        else            ensurePrestart();    }}


schedule & scheduleAtFixedRate & scheduleWithFixedDelay

這幾個就是執行定時任務和週期性任務的方法,它們是對前文 「JDK源碼分析-ScheduledExecutorService」接口所定義的方法實現,可參考前文的分析。

schedule 方法 1:其做用是延遲指定的時間後執行任務(即執行定時任務),只會執行一次。

public ScheduledFuture<?> schedule(Runnable command,                                   long delay,                                   TimeUnit unit) {    if (command == null || unit == null)        throw new NullPointerException();    // 把用戶提交的 Runnable 對象包裝爲 RunnableScheduledFuture 對象    // decorateTask 方法默認返回第二個參數    // decorateTask 方法的修飾符是 protected,可根據需求自行擴展    RunnableScheduledFuture<?> t = decorateTask(command,        new ScheduledFutureTask<Void>(command, null,                                      triggerTime(delay, unit)));    // 執行給定的任務    delayedExecute(t);    return t;}

delayExecute 方法:

/* * 延遲或週期性任務的主要執行方法。 * 若線程池已關閉,則拒絕該任務(執行拒絕策略); * 不然將任務添加到工做隊列,如有須要啓動一個線程去執行。 * 若在添加任務時關閉了線程池,則將其從隊列移除並取消該任務 */private void delayedExecute(RunnableScheduledFuture<?> task) {    // 若線程池已關閉,則執行拒絕策略    if (isShutdown())        reject(task);    else {        // 將該任務添加到任務隊列(即前面的延遲隊列)        super.getQueue().add(task);        // 若當前任務沒法執行,則將其從隊列移除而且取消執行(相似事務的回滾操做)        if (isShutdown() &&            !canRunInCurrentRunState(task.isPeriodic()) &&            remove(task))            task.cancel(false);        // 任務能夠執行,如有須要新增線程以執行該任務        else            ensurePrestart();    }}

schedule 方法 2:

public <V> ScheduledFuture<V> schedule(Callable<V> callable,                                       long delay,                                       TimeUnit unit) {    if (callable == null || unit == null)        throw new NullPointerException();    RunnableScheduledFuture<V> t = decorateTask(callable,        new ScheduledFutureTask<V>(callable,                                   triggerTime(delay, unit)));    delayedExecute(t);    return t;}

該方法與前者相似,差異在於這裏的參數類型是 Callable,前者是 Runnable 類型,其餘操做同樣。

scheduleAtFixedRate 方法:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,                                              long initialDelay,                                              long period,                                              TimeUnit unit) {    if (command == null || unit == null)        throw new NullPointerException();    if (period <= 0)        throw new IllegalArgumentException();    // 將 Runnable 對象包裝爲 ScheduledFutureTask 對象    ScheduledFutureTask<Void> sft =        new ScheduledFutureTask<Void>(command,                                      null,                                      triggerTime(initialDelay, unit),                                      unit.toNanos(period));    RunnableScheduledFuture<Void> t = decorateTask(command, sft);    sft.outerTask = t;    delayedExecute(t);    return t;}

該方法與前面的 schedule 方法相似,區別僅在於使用了不一樣的 ScheduledFutureTask 對象,其餘的執行流程幾乎同樣。

scheduleWithFixedDelay 方法:

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,                                                 long initialDelay,                                                 long delay,                                                 TimeUnit unit) {    if (command == null || unit == null)        throw new NullPointerException();    if (delay <= 0)        throw new IllegalArgumentException();    ScheduledFutureTask<Void> sft =        new ScheduledFutureTask<Void>(command,                                      null,                                      triggerTime(initialDelay, unit),                                      unit.toNanos(-delay));    RunnableScheduledFuture<Void> t = decorateTask(command, sft);    sft.outerTask = t;    delayedExecute(t);    return t;}

該方法與  scheduleAtFixedRate 方法基本同樣,區別僅在於構建 ScheduledFutureTask 對象時參數 period 不一樣(一正一負,用以區分類型)。

execute & submit 方法

這兩個方法是 Executor 接口和 ExecutorService 接口所定義的方法,代碼實現以下:

public void execute(Runnable command) {    schedule(command, 0, NANOSECONDS);}
public Future<?> submit(Runnable task) {    return schedule(task, 0, NANOSECONDS);}
它們內部直接調用了 schedule(Runnable) 方法。另外兩個 submit 方法

public <T> Future<T> submit(Runnable task, T result) {    return schedule(Executors.callable(task, result), 0, NANOSECONDS);}
public <T> Future<T> submit(Callable<T> task) {    return schedule(task, 0, NANOSECONDS);}

它們部直接調用了 schedule(Callable) 方法。

小結

1. ScheduledThreadPoolExecutor 是線程池的實現類之一;
2. 它繼承自 ThreadPoolExecutor,並實現了 ScheduledExecutorService 接口;
3. 提供了異步提交任務的 execute 方法和 submit 方法;
4. 供了執行定時任務的 schedule 方法和週期性任務的 scheduleAtFixedRate/scheduleWithFixedDelay 方法(使用延遲隊列實現)。


相關閱讀:
JDK源碼分析-ThreadPoolExecutor
JDK源碼分析-ScheduledExecutorService
JDK源碼分析-DelayQueue
JDK源碼分析-FutureTask


相關文章
相關標籤/搜索