深刻理解Java線程池:ScheduledThreadPoolExecutor

介紹

自JDK1.5開始,JDK提供了ScheduledThreadPoolExecutor類來支持週期性任務的調度。在這以前的實現須要依靠Timer和TimerTask或者其它第三方工具來完成。但Timer有很多的缺陷:java

  • Timer是單線程模式;
  • 若是在執行任務期間某個TimerTask耗時較久,那麼就會影響其它任務的調度;
  • Timer的任務調度是基於絕對時間的,對系統時間敏感;
  • Timer不會捕獲執行TimerTask時所拋出的異常,因爲Timer是單線程,因此一旦出現異常,則線程就會終止,其餘任務也得不到執行。

ScheduledThreadPoolExecutor繼承ThreadPoolExecutor來重用線程池的功能,它的實現方式以下:數組

  • 將任務封裝成ScheduledFutureTask對象,ScheduledFutureTask基於相對時間,不受系統時間的改變所影響;
  • ScheduledFutureTask實現了java.lang.Comparable接口和java.util.concurrent.Delayed接口,因此有兩個重要的方法:compareTo和getDelay。compareTo方法用於比較任務之間的優先級關係,若是距離下次執行的時間間隔較短,則優先級高;getDelay方法用於返回距離下次任務執行時間的時間間隔;
  • ScheduledThreadPoolExecutor定義了一個DelayedWorkQueue,它是一個有序隊列,會經過每一個任務按照距離下次執行時間間隔的大小來排序;
  • ScheduledFutureTask繼承自FutureTask,能夠經過返回Future對象來獲取執行的結果。

經過如上的介紹,能夠對比一下Timer和ScheduledThreadPoolExecutor:緩存

Timer ScheduledThreadPoolExecutor
單線程 多線程
單個任務執行時間影響其餘任務調度 多線程,不會影響
基於絕對時間 基於相對時間
一旦執行任務出現異常不會捕獲,其餘任務得不到執行 多線程,單個任務的執行不會影響其餘線程

因此,在JDK1.5以後,應該沒什麼理由繼續使用Timer進行任務調度了。網絡

ScheduledThreadPoolExecutor的使用

下面用一個具體的例子來講明ScheduledThreadPoolExecutor的使用:數據結構

public class ScheduledThreadPoolTest {
    public static void main(String[] args) throws InterruptedException {
        // 建立大小爲5的線程池
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
        for (int i = 0; i < 3; i++) {
            Task worker = new Task("task-" + i);
            // 只執行一次
//          scheduledThreadPool.schedule(worker, 5, TimeUnit.SECONDS);
            // 週期性執行,每5秒執行一次
            scheduledThreadPool.scheduleAtFixedRate(worker, 0,5, TimeUnit.SECONDS);
        }
        Thread.sleep(10000);
        System.out.println("Shutting down executor...");
        // 關閉線程池
        scheduledThreadPool.shutdown();
        boolean isDone;
        // 等待線程池終止
        do {
            isDone = scheduledThreadPool.awaitTermination(1, TimeUnit.DAYS);
            System.out.println("awaitTermination...");
        } while(!isDone);
        System.out.println("Finished all threads");
    }
}
class Task implements Runnable {
    private String name;
    public Task(String name) {
        this.name = name;
    }
    @Override
    public void run() {
        System.out.println("name = " + name + ", startTime = " + new Date());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("name = " + name + ", endTime = " + new Date());
    }
}

 

下面就來具體分析一下ScheduledThreadPoolExecutor的實現過程。多線程

ScheduledThreadPoolExecutor的實現

ScheduledThreadPoolExecutor的類結構

看下ScheduledThreadPoolExecutor內部的類圖:異步

 

不要被這麼多類嚇到,這裏只不過是爲了更清楚的瞭解ScheduledThreadPoolExecutor有關調度和隊列的接口。ide

ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,實現了ScheduledExecutorService接口,該接口定義了schedule等任務調度的方法。工具

同時ScheduledThreadPoolExecutor有兩個重要的內部類:DelayedWorkQueue和ScheduledFutureTask。能夠看到,DelayeddWorkQueue是一個阻塞隊列,而ScheduledFutureTask繼承自FutureTask,而且實現了Delayed接口。有關FutureTask的介紹請參考另外一篇文章:FutureTask源碼解析this

ScheduledThreadPoolExecutor的構造方法

ScheduledThreadPoolExecutor有3中構造方法:

 

public ScheduledThreadPoolExecutor(int corePoolSize,
                                    ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

 

由於ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,因此這裏都是調用的ThreadPoolExecutor類的構造方法。有關ThreadPoolExecutor能夠參考深刻理解Java線程池:ThreadPoolExecutor

這裏注意傳入的阻塞隊列是DelayedWorkQueue類型的對象。後面會詳細介紹。

schedule方法

在上文的例子中,使用了schedule方法來進行任務調度,schedule方法的代碼以下:

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay,
                                       TimeUnit unit) {
    if (callable == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<V> t = decorateTask(callable,
        new ScheduledFutureTask<V>(callable,
                                   triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}

  

首先,這裏的兩個重載的schedule方法只是傳入的第一個參數不一樣,能夠是Runnable對象或者Callable對象。會把傳入的任務封裝成一個RunnableScheduledFuture對象,其實也就是ScheduledFutureTask對象,decorateTask默認什麼功能都沒有作,子類能夠重寫該方法:

/**
 * 修改或替換用於執行 runnable 的任務。此方法可重寫用於管理內部任務的具體類。默認實現只返回給定任務。
 */
protected <V> RunnableScheduledFuture<V> decorateTask(
    Runnable runnable, RunnableScheduledFuture<V> task) {
    return task;
}
/**
 * 修改或替換用於執行 callable 的任務。此方法可重寫用於管理內部任務的具體類。默認實現只返回給定任務。
 */
protected <V> RunnableScheduledFuture<V> decorateTask(
    Callable<V> callable, RunnableScheduledFuture<V> task) {
    return task;
}

  

而後,經過調用delayedExecute方法來延時執行任務。
最後,返回一個ScheduledFuture對象。

scheduleAtFixedRate方法

該方法設置了執行週期,下一次執行時間至關因而上一次的執行時間加上period,它是採用已固定的頻率來執行任務:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

  

scheduleWithFixedDelay方法

該方法設置了執行週期,與scheduleAtFixedRate方法不一樣的是,下一次執行時間是上一次任務執行完的系統時間加上period,於是具體執行時間不是固定的,但週期是固定的,是採用相對固定的延遲來執行任務:

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

  

注意這裏的unit.toNanos(-delay));,這裏把週期設置爲負數來表示是相對固定的延遲執行。

scheduleAtFixedRate和scheduleWithFixedDelay的區別在setNextRunTime方法中就能夠看出來:

private void setNextRunTime() {
    long p = period;
    // 固定頻率,上次執行時間加上週期時間
    if (p > 0)
        time += p;
    // 相對固定延遲執行,使用當前系統時間加上週期時間
    else
        time = triggerTime(-p);
}

  

setNextRunTime方法會在run方法中執行完任務後調用。

triggerTime方法

triggerTime方法用於獲取下一次執行的具體時間:

private long triggerTime(long delay, TimeUnit unit) {
    return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
long triggerTime(long delay) {
    return now() +
        ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

  

這裏的delay < (Long.MAX_VALUE >> 1是爲了判斷是否要防止Long類型溢出,若是delay的值小於Long類型最大值的一半,則直接返回delay,不然須要進行防止溢出處理。

overflowFree方法

該方法的做用是限制隊列中全部節點的延遲時間在Long.MAX_VALUE以內,防止在compareTo方法中溢出。

private long overflowFree(long delay) {
    // 獲取隊列中的第一個節點
    Delayed head = (Delayed) super.getQueue().peek();
    if (head != null) {
        // 獲取延遲時間
        long headDelay = head.getDelay(NANOSECONDS);
        // 若是延遲時間小於0,而且 delay - headDelay 超過了Long.MAX_VALUE
        // 將delay設置爲 Long.MAX_VALUE + headDelay 保證delay小於Long.MAX_VALUE
        if (headDelay < 0 && (delay - headDelay < 0))
            delay = Long.MAX_VALUE + headDelay;
    }
    return delay;
}

  

當一個任務已經能夠執行出隊操做,但尚未執行,可能因爲線程池中的工做線程不是空閒的。具體分析一下這種狀況:

  • 爲了方便說明,假設Long.MAX_VALUE=1023,也就是11位,而且當前的時間是100,調用triggerTime時並無對delay進行判斷,而是直接返回了now() + delay,也就是至關於100 + 1023,這確定是溢出了,那麼返回的時間是-925;
  • 若是頭節點已經能夠出隊可是尚未執行出隊,那麼頭節點的執行時間應該是小於當前時間的,假設是95;
  • 這時調用offer方法向隊列中添加任務,在offer方法中會調用siftUp方法來排序,在siftUp方法執行時又會調用ScheduledFutureTask中的compareTo方法來比較執行時間;
  • 這時若是執行到了compareTo方法中的long diff = time - x.time;時,那麼計算後的結果就是-925 - 95 = -1020,那麼將返回-1,而正常狀況應該是返回1,由於新加入的任務的執行時間要比頭結點的執行時間要晚,這就不是咱們想要的結果了,這會致使隊列中的順序不正確。
  • 同理也能夠算一下在執行compareTo方法中的long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);時也會有這種狀況;
  • 因此在triggerTime方法中對delay的大小作了判斷,就是爲了防止這種狀況發生。

若是執行了overflowFree方法呢,這時headDelay = 95 - 100 = -5,而後執行delay = 1023 + (-5) = 1018,那麼triggerTime會返回100 + 1018 = -930,再執行compareTo方法中的long diff = time - x.time;時,diff = -930 - 95 = -930 - 100 + 5 = 1018 + 5 = 1023,沒有溢出,符合正常的預期。

因此,overflowFree方法中把已經超時的部分時間給減去,就是爲了不在compareTo方法中出現溢出狀況。

(說實話,這段代碼看的很痛苦,通常狀況下也不會發生這種狀況,誰會傳一個Long.MAX_VALUE呢。要知道Long.MAX_VALUE的納秒數換算成年的話是292年,誰會這麼無聊。。。)

 

ScheduledFutureTask的getDelay方法

public long getDelay(TimeUnit unit) {
    // 執行時間減去當前系統時間
    return unit.convert(time - now(), NANOSECONDS);
}

  

ScheduledFutureTask的構造方法

ScheduledFutureTask繼承自FutureTask並實現了RunnableScheduledFuture接口,具體能夠參考上文的類圖,構造方法以下:

ScheduledFutureTask(Runnable r, V result, long ns) {
    super(r, result);
    this.time = ns;
    this.period = 0;
    this.sequenceNumber = sequencer.getAndIncrement();
}
/**
 * Creates a periodic action with given nano time and period.
 */
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);
    this.time = ns;
    this.period = period;
    this.sequenceNumber = sequencer.getAndIncrement();
}
/**
 * Creates a one-shot action with given nanoTime-based trigger time.
 */
ScheduledFutureTask(Callable<V> callable, long ns) {
    super(callable);
    this.time = ns;
    this.period = 0;
    this.sequenceNumber = sequencer.getAndIncrement();
}

  

這裏面有幾個重要的屬性,下面來解釋一下:

  • time:下次任務執行時的時間;
  • period:執行週期;
  • sequenceNumber:保存任務被添加到ScheduledThreadPoolExecutor中的序號。

在schedule方法中,建立完ScheduledFutureTask對象以後,會執行delayedExecute方法來執行任務。

delayedExecute方法

private void delayedExecute(RunnableScheduledFuture<?> task) {
    // 若是線程池已經關閉,使用拒絕策略拒絕任務
    if (isShutdown())
        reject(task);
    else {
        // 添加到阻塞隊列中
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            // 確保線程池中至少有一個線程啓動,即便corePoolSize爲0
            // 該方法在ThreadPoolExecutor中實現
            ensurePrestart();
    }
}

  

說一下這裏的第二個if判斷:

  1. 若是不是SHUTDOWN狀態,執行else,不然執行步驟2;
  2. 若是在當前線程池運行狀態下能夠執行任務,執行else,不然執行步驟3;
  3. 從阻塞隊列中刪除任務,若是失敗,執行else,不然執行步驟4;
  4. 取消任務,但不中斷執行中的任務。

對於步驟2,能夠經過setContinueExistingPeriodicTasksAfterShutdownPolicy方法設置在線程池關閉時,週期任務繼續執行,默認爲false,也就是線程池關閉時,再也不執行週期任務。

ensurePrestart方法在ThreadPoolExecutor中定義:

void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

 

調用了addWorker方法,能夠在深刻理解Java線程池:ThreadPoolExecutor中查看addWorker方法的介紹,線程池中的工做線程是經過該方法來啓動並執行任務的。

ScheduledFutureTask的run方法

回顧一下線程池的執行過程:當線程池中的工做線程啓動時,不斷地從阻塞隊列中取出任務並執行,固然,取出的任務實現了Runnable接口,因此是經過調用任務的run方法來執行任務的。

這裏的任務類型是ScheduledFutureTask,因此下面看一下ScheduledFutureTask的run方法:

public void run() {
    // 是不是週期性任務
    boolean periodic = isPeriodic();
    // 當前線程池運行狀態下若是不能夠執行任務,取消該任務
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    // 若是不是週期性任務,調用FutureTask中的run方法執行
    else if (!periodic)
        ScheduledFutureTask.super.run();
    // 若是是週期性任務,調用FutureTask中的runAndReset方法執行
    // runAndReset方法不會設置執行結果,因此能夠重複執行任務
    else if (ScheduledFutureTask.super.runAndReset()) {
        // 計算下次執行該任務的時間
        setNextRunTime();
        // 重複執行任務
        reExecutePeriodic(outerTask);
    }
}

  

有關FutureTask的run方法和runAndReset方法,能夠參考FutureTask源碼解析

分析一下執行過程:

  1. 若是當前線程池運行狀態不能夠執行任務,取消該任務,而後直接返回,不然執行步驟2;
  2. 若是不是週期性任務,調用FutureTask中的run方法執行,會設置執行結果,而後直接返回,不然執行步驟3;
  3. 若是是週期性任務,調用FutureTask中的runAndReset方法執行,不會設置執行結果,而後直接返回,不然執行步驟4和步驟5;
  4. 計算下次執行該任務的具體時間;
  5. 重複執行任務。

ScheduledFutureTask的reExecutePeriodic方法

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(true)) {
        super.getQueue().add(task);
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

  

該方法和delayedExecute方法相似,不一樣的是:

  1. 因爲調用reExecutePeriodic方法時已經執行過一次週期性任務了,因此不會reject當前任務;
  2. 傳入的任務必定是週期性任務。

onShutdown方法

onShutdown方法是ThreadPoolExecutor中的鉤子方法,在ThreadPoolExecutor中什麼都沒有作,參考深刻理解Java線程池:ThreadPoolExecutor,該方法是在執行shutdown方法時被調用:

@Override void onShutdown() {
    BlockingQueue<Runnable> q = super.getQueue();
    // 獲取在線程池已 shutdown 的狀況下是否繼續執行現有延遲任務
    boolean keepDelayed =
        getExecuteExistingDelayedTasksAfterShutdownPolicy();
    // 獲取在線程池已 shutdown 的狀況下是否繼續執行現有按期任務
    boolean keepPeriodic =
        getContinueExistingPeriodicTasksAfterShutdownPolicy();
    // 若是在線程池已 shutdown 的狀況下不繼續執行延遲任務和按期任務
    // 則依次取消任務,不然則根據取消狀態來判斷
    if (!keepDelayed && !keepPeriodic) {
        for (Object e : q.toArray())
            if (e instanceof RunnableScheduledFuture<?>)
                ((RunnableScheduledFuture<?>) e).cancel(false);
        q.clear();
    }
    else {
        // Traverse snapshot to avoid iterator exceptions
        for (Object e : q.toArray()) {
            if (e instanceof RunnableScheduledFuture) {
                RunnableScheduledFuture<?> t =
                    (RunnableScheduledFuture<?>)e;
                // 若是有在 shutdown 後不繼續的延遲任務或週期任務,則從隊列中刪除並取消任務
                if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
                    t.isCancelled()) { // also remove if already cancelled
                    if (q.remove(t))
                        t.cancel(false);
                }
            }
        }
    }
    tryTerminate();
}

  

DelayedWorkQueue

ScheduledThreadPoolExecutor之因此要本身實現阻塞的工做隊列,是由於ScheduledThreadPoolExecutor要求的工做隊列有些特殊。

DelayedWorkQueue是一個基於堆的數據結構,相似於DelayQueue和PriorityQueue。在執行定時任務的時候,每一個任務的執行時間都不一樣,因此DelayedWorkQueue的工做就是按照執行時間的升序來排列,執行時間距離當前時間越近的任務在隊列的前面(注意:這裏的順序並非絕對的,堆中的排序只保證了子節點的下次執行時間要比父節點的下次執行時間要大,而葉子節點之間並不必定是順序的,下文中會說明)。

堆結構以下圖所示:

 

可見,DelayedWorkQueue是一個基於最小堆結構的隊列。堆結構可使用數組表示,能夠轉換成以下的數組:

在這種結構中,能夠發現有以下特性:

假設,索引值從0開始,子節點的索引值爲k,父節點的索引值爲p,則:

  1. 一個節點的左子節點的索引爲:k = p * 2 + 1;
  2. 一個節點的右子節點的索引爲:k = (p + 1) * 2;
  3. 一個節點的父節點的索引爲:p = (k - 1) / 2。

爲何要使用DelayedWorkQueue呢?

定時任務執行時須要取出最近要執行的任務,因此任務在隊列中每次出隊時必定要是當前隊列中執行時間最靠前的,因此天然要使用優先級隊列。

DelayedWorkQueue是一個優先級隊列,它能夠保證每次出隊的任務都是當前隊列中執行時間最靠前的,因爲它是基於堆結構的隊列,堆結構在執行插入和刪除操做時的最壞時間複雜度是 O(logN)

 

DelayedWorkQueue的屬性

// 隊列初始容量
private static final int INITIAL_CAPACITY = 16;
// 根據初始容量建立RunnableScheduledFuture類型的數組
private RunnableScheduledFuture<?>[] queue =
    new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
// leader線程
private Thread leader = null;
// 當較新的任務在隊列的頭部可用時,或者新線程可能須要成爲leader,則經過該條件發出信號
private final Condition available = lock.newCondition();

  

注意這裏的leader,它是Leader-Follower模式的變體,用於減小沒必要要的定時等待。什麼意思呢?對於多線程的網絡模型來講:

全部線程會有三種身份中的一種:leader和follower,以及一個幹活中的狀態:proccesser。它的基本原則就是,永遠最多隻有一個leader。而全部follower都在等待成爲leader。線程池啓動時會自動產生一個Leader負責等待網絡IO事件,當有一個事件產生時,Leader線程首先通知一個Follower線程將其提拔爲新的Leader,而後本身就去幹活了,去處理這個網絡事件,處理完畢後加入Follower線程等待隊列,等待下次成爲Leader。這種方法能夠加強CPU高速緩存類似性,及消除動態內存分配和線程間的數據交換。

參考自:http://blog.csdn.net/goldlevi/article/details/7705180

具體leader的做用在分析take方法時再詳細介紹。

offer方法

既然是阻塞隊列,入隊的操做如add和put方法都調用了offer方法,下面查看一下offer方法:

public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        // queue是一個RunnableScheduledFuture類型的數組,若是容量不夠須要擴容
        if (i >= queue.length)
            grow();
        size = i + 1;
        // i == 0 說明堆中尚未數據
        if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
        } else {
        // i != 0 時,須要對堆進行從新排序
            siftUp(i, e);
        }
        // 若是傳入的任務已是隊列的第一個節點了,這時available須要發出信號
        if (queue[0] == e) {
            // leader設置爲null爲了使在take方法中的線程在經過available.signal();後會執行available.awaitNanos(delay);
            leader = null;
            available.signal();
        }
    } finally {
        lock.unlock();
    }
    return true;
}

  

有關Condition的介紹請參考深刻理解AbstractQueuedSynchronizer(三)

這裏的重點是siftUp方法。

siftUp方法

private void siftUp(int k, RunnableScheduledFuture<?> key) {
    while (k > 0) {
        // 找到父節點的索引
        int parent = (k - 1) >>> 1;
        // 獲取父節點
        RunnableScheduledFuture<?> e = queue[parent];
        // 若是key節點的執行時間大於父節點的執行時間,不須要再排序了
        if (key.compareTo(e) >= 0)
            break;
        // 若是key.compareTo(e) < 0,說明key節點的執行時間小於父節點的執行時間,須要把父節點移到後面
        queue[k] = e;
        // 設置索引爲k
        setIndex(e, k);
        k = parent;
    }
    // key設置爲排序後的位置中
    queue[k] = key;
    setIndex(key, k);
}

  

代碼很好理解,就是循環的根據key節點與它的父節點來判斷,若是key節點的執行時間小於父節點,則將兩個節點交換,使執行時間靠前的節點排列在隊列的前面。

假設新入隊的節點的延遲時間(調用getDelay()方法得到)是5,執行過程以下:

  1. 先將新的節點添加到數組的尾部,這時新節點的索引k爲7:

  2. 計算新父節點的索引:parent = (k - 1) >>> 1,parent = 3,那麼queue[3]的時間間隔值爲8,由於 5 < 8 ,將執行queue[7] = queue[3]:

  3.這時將k設置爲3,繼續循環,再次計算parent爲1,queue[1]的時間間隔爲3,由於 5 > 3 ,這時退出循環,最終k爲3: 

 

 

可見,每次新增節點時,只是根據父節點來判斷,而不會影響兄弟節點。

另外,setIndex方法只是設置了ScheduledFutureTask中的heapIndex屬性:

private void setIndex(RunnableScheduledFuture<?> f, int idx) {
    if (f instanceof ScheduledFutureTask)
        ((ScheduledFutureTask)f).heapIndex = idx;
}

  

take方法

public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null)
                available.await();
            else {
                // 計算當前時間到執行時間的時間間隔
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return finishPoll(first);
                first = null; // don't retain ref while waiting
                // leader不爲空,阻塞線程
                if (leader != null)
                    available.await();
                else {
                    // leader爲空,則把leader設置爲當前線程,
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 阻塞到執行時間
                        available.awaitNanos(delay);
                    } finally {
                        // 設置leader = null,讓其餘線程執行available.awaitNanos(delay);
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 若是leader不爲空,則說明leader的線程正在執行available.awaitNanos(delay);
        // 若是queue[0] == null,說明隊列爲空
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

  

ake方法是何時調用的呢?在深刻理解Java線程池:ThreadPoolExecutor中,介紹了getTask方法,工做線程會循環地從workQueue中取任務。但定時任務卻不一樣,由於若是一旦getTask方法取出了任務就開始執行了,而這時可能尚未到執行的時間,因此在take方法中,要保證只有在到指定的執行時間的時候任務才能夠被取走。

再來講一下leader的做用,這裏的leader是爲了減小沒必要要的定時等待,當一個線程成爲leader時,它只等待下一個節點的時間間隔,但其它線程無限期等待。 leader線程必須在從take()或poll()返回以前signal其它線程,除非其餘線程成爲了leader。

舉例來講,若是沒有leader,那麼在執行take時,都要執行available.awaitNanos(delay),假設當前線程執行了該段代碼,這時尚未signal,第二個線程也執行了該段代碼,則第二個線程也要被阻塞。多個這時執行該段代碼是沒有做用的,由於只能有一個線程會從take中返回queue[0](由於有lock),其餘線程這時再返回for循環執行時取的queue[0],已經不是以前的queue[0]了,而後又要繼續阻塞。

因此,爲了避免讓多個線程頻繁的作無用的定時等待,這裏增長了leader,若是leader不爲空,則說明隊列中第一個節點已經在等待出隊,這時其它的線程會一直阻塞,減小了無用的阻塞(注意,在finally中調用了signal()來喚醒一個線程,而不是signalAll())。

poll方法

下面看下poll方法,與take相似,但這裏要提供超時功能:

 

public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null) {
                if (nanos <= 0)
                    return null;
                else
                    nanos = available.awaitNanos(nanos);
            } else {
                long delay = first.getDelay(NANOSECONDS);
                // 若是delay <= 0,說明已經到了任務執行的時間,返回。
                if (delay <= 0)
                    return finishPoll(first);
                // 若是nanos <= 0,說明已經超時,返回null
                if (nanos <= 0)
                    return null;
                first = null; // don't retain ref while waiting
                // nanos < delay 說明須要等待的時間小於任務要執行的延遲時間
                // leader != null 說明有其它線程正在對任務進行阻塞
                // 這時阻塞當前線程nanos納秒
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 這裏的timeLeft表示delay減去實際的等待時間
                        long timeLeft = available.awaitNanos(delay);
                        // 計算剩餘的等待時間
                        nanos -= delay - timeLeft;
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

  

finishPoll方法

當調用了take或者poll方法可以獲取到任務時,會調用該方法進行返回:

private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
    // 數組長度-1
    int s = --size;
    // 取出最後一個節點
    RunnableScheduledFuture<?> x = queue[s];
    queue[s] = null;
    // 長度不爲0,則從第一個元素開始排序,目的是要把最後一個節點放到合適的位置上
    if (s != 0)
        siftDown(0, x);
    setIndex(f, -1);
    return f;
}

  

siftDown方法

siftDown方法使堆從k開始向下調整:

private void siftDown(int k, RunnableScheduledFuture<?> key) {
    // 根據二叉樹的特性,數組長度除以2,表示取有子節點的索引
    int half = size >>> 1;
    // 判斷索引爲k的節點是否有子節點
    while (k < half) {
        // 左子節點的索引
        int child = (k << 1) + 1;
        RunnableScheduledFuture<?> c = queue[child];
        // 右子節點的索引
        int right = child + 1;
        // 若是有右子節點而且左子節點的時間間隔大於右子節點,取時間間隔最小的節點
        if (right < size && c.compareTo(queue[right]) > 0)
            c = queue[child = right];
        // 若是key的時間間隔小於等於c的時間間隔,跳出循環
        if (key.compareTo(c) <= 0)
            break;
        // 設置要移除索引的節點爲其子節點
        queue[k] = c;
        setIndex(c, k);
        k = child;
    }
    // 將key放入索引爲k的位置
    queue[k] = key;
    setIndex(key, k);
}

  

siftDown方法執行時包含兩種狀況,一種是沒有子節點,一種是有子節點(根據half判斷)。例如:

沒有子節點的狀況:

假設初始的堆以下:

 

 

假設 k = 3 ,那麼 k = half ,沒有子節點,在執行siftDown方法時直接把索引爲3的節點設置爲數組的最後一個節點:

 

有子節點的狀況:

假設 k = 0 ,那麼執行如下步驟:

  1. 獲取左子節點,child = 1 ,獲取右子節點, right = 2 :
  2. 因爲 right < size ,這時比較左子節點和右子節點時間間隔的大小,這裏 3 < 7 ,因此 c = queue[child] ;
  3. 比較key的時間間隔是否小於c的時間間隔,這裏不知足,繼續執行,把索引爲k的節點設置爲c,而後將k設置爲child,;

   4.由於 half = 3 ,k = 1 ,繼續執行循環,這時的索引變爲:

 

  5.這時再通過如上判斷後,將k的值爲3,最終的結果以下:

  6.最後,若是在finishPoll方法中調用的話,會把索引爲0的節點的索引設置爲-1,表示已經刪除了該節點,而且size也減了1,最後的結果以下:

 

可見,siftdown方法在執行完並非有序的,但能夠發現,子節點的下次執行時間必定比父節點的下次執行時間要大,因爲每次都會取左子節點和右子節點中下次執行時間最小的節點,因此仍是能夠保證在take和poll時出隊是有序的。

remove方法

 

public boolean remove(Object x) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = indexOf(x);
        if (i < 0)
            return false;
        setIndex(queue[i], -1);
        int s = --size;
        RunnableScheduledFuture<?> replacement = queue[s];
        queue[s] = null;
        if (s != i) {
            // 從i開始向下調整
            siftDown(i, replacement);
            // 若是queue[i] == replacement,說明i是葉子節點
            // 若是是這種狀況,不能保證子節點的下次執行時間比父節點的大
            // 這時須要進行一次向上調整
            if (queue[i] == replacement)
                siftUp(i, replacement);
        }
        return true;
    } finally {
        lock.unlock();
    }
}

  

假設初始的堆結構以下:

 這時要刪除8的節點,那麼這時 k = 1,key爲最後一個節點:

這時經過上文對siftDown方法的分析,siftDown方法執行後的結果以下:

這時會發現,最後一個節點的值比父節點還要小,因此這裏要執行一次siftUp方法來保證子節點的下次執行時間要比父節點的大,因此最終結果以下:

 

總結

本文詳細分析了ScheduedThreadPoolExecutor的實現,主要介紹瞭如下方面:

  • 與Timer執行定時任務的比較,相比Timer,ScheduedThreadPoolExecutor有什麼優勢;
  • ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,因此它也是一個線程池,也有coorPoolSize和workQueue,ScheduledThreadPoolExecutor特殊的地方在於,本身實現了優先工做隊列DelayedWorkQueue;
  • ScheduedThreadPoolExecutor實現了ScheduledExecutorService,因此就有了任務調度的方法,如schedule,scheduleAtFixedRate和scheduleWithFixedDelay,同時注意他們之間的區別;
  • 內部類ScheduledFutureTask繼承自FutureTask,實現了任務的異步執行而且能夠獲取返回結果。同時也實現了Delayed接口,能夠經過getDelay方法獲取將要執行的時間間隔;
  • 週期任務的執行實際上是調用了FutureTask類中的runAndReset方法,每次執行完不設置結果和狀態。參考FutureTask源碼解析
  • 詳細分析了DelayedWorkQueue的數據結構,它是一個基於最小堆結構的優先隊列,而且每次出隊時可以保證取出的任務是當前隊列中下次執行時間最小的任務。同時注意一下優先隊列中堆的順序,堆中的順序並非絕對的,但要保證子節點的值要比父節點的值要大,這樣就不會影響出隊的順序。

整體來講,ScheduedThreadPoolExecutor的重點是要理解下次執行時間的計算,以及優先隊列的出隊、入隊和刪除的過程,這兩個是理解ScheduedThreadPoolExecutor的關鍵。

原文地址:http://www.ideabuffer.cn/2017/04/14/%E6%B7%B1%E5%85%A5%E7%90%86%E8%A7%A3Java%E7%BA%BF%E7%A8%8B%E6%B1%A0%EF%BC%9AScheduledThreadPoolExecutor/

相關文章
相關標籤/搜索