自JDK1.5開始,JDK提供了ScheduledThreadPoolExecutor類來支持週期性任務的調度。在這以前的實現須要依靠Timer和TimerTask或者其它第三方工具來完成。但Timer有很多的缺陷:java
ScheduledThreadPoolExecutor繼承ThreadPoolExecutor來重用線程池的功能,它的實現方式以下:數組
java.lang.Comparable
接口和java.util.concurrent.Delayed
接口,因此有兩個重要的方法:compareTo和getDelay。compareTo方法用於比較任務之間的優先級關係,若是距離下次執行的時間間隔較短,則優先級高;getDelay方法用於返回距離下次任務執行時間的時間間隔;經過如上的介紹,能夠對比一下Timer和ScheduledThreadPoolExecutor:緩存
Timer | ScheduledThreadPoolExecutor |
---|---|
單線程 | 多線程 |
單個任務執行時間影響其餘任務調度 | 多線程,不會影響 |
基於絕對時間 | 基於相對時間 |
一旦執行任務出現異常不會捕獲,其餘任務得不到執行 | 多線程,單個任務的執行不會影響其餘線程 |
因此,在JDK1.5以後,應該沒什麼理由繼續使用Timer進行任務調度了。網絡
下面用一個具體的例子來講明ScheduledThreadPoolExecutor的使用:數據結構
public class ScheduledThreadPoolTest { public static void main(String[] args) throws InterruptedException { // 建立大小爲5的線程池 ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); for (int i = 0; i < 3; i++) { Task worker = new Task("task-" + i); // 只執行一次 // scheduledThreadPool.schedule(worker, 5, TimeUnit.SECONDS); // 週期性執行,每5秒執行一次 scheduledThreadPool.scheduleAtFixedRate(worker, 0,5, TimeUnit.SECONDS); } Thread.sleep(10000); System.out.println("Shutting down executor..."); // 關閉線程池 scheduledThreadPool.shutdown(); boolean isDone; // 等待線程池終止 do { isDone = scheduledThreadPool.awaitTermination(1, TimeUnit.DAYS); System.out.println("awaitTermination..."); } while(!isDone); System.out.println("Finished all threads"); } } class Task implements Runnable { private String name; public Task(String name) { this.name = name; } @Override public void run() { System.out.println("name = " + name + ", startTime = " + new Date()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("name = " + name + ", endTime = " + new Date()); } }
下面就來具體分析一下ScheduledThreadPoolExecutor的實現過程。多線程
看下ScheduledThreadPoolExecutor內部的類圖:異步
不要被這麼多類嚇到,這裏只不過是爲了更清楚的瞭解ScheduledThreadPoolExecutor有關調度和隊列的接口。ide
ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,實現了ScheduledExecutorService接口,該接口定義了schedule等任務調度的方法。工具
同時ScheduledThreadPoolExecutor有兩個重要的內部類:DelayedWorkQueue和ScheduledFutureTask。能夠看到,DelayeddWorkQueue是一個阻塞隊列,而ScheduledFutureTask繼承自FutureTask,而且實現了Delayed接口。有關FutureTask的介紹請參考另外一篇文章:FutureTask源碼解析。this
ScheduledThreadPoolExecutor有3中構造方法:
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); } public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); }
由於ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,因此這裏都是調用的ThreadPoolExecutor類的構造方法。有關ThreadPoolExecutor能夠參考深刻理解Java線程池:ThreadPoolExecutor。
這裏注意傳入的阻塞隊列是DelayedWorkQueue類型的對象。後面會詳細介紹。
在上文的例子中,使用了schedule方法來進行任務調度,schedule方法的代碼以下:
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); delayedExecute(t); return t; } public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit))); delayedExecute(t); return t; }
首先,這裏的兩個重載的schedule方法只是傳入的第一個參數不一樣,能夠是Runnable對象或者Callable對象。會把傳入的任務封裝成一個RunnableScheduledFuture對象,其實也就是ScheduledFutureTask對象,decorateTask默認什麼功能都沒有作,子類能夠重寫該方法:
/** * 修改或替換用於執行 runnable 的任務。此方法可重寫用於管理內部任務的具體類。默認實現只返回給定任務。 */ protected <V> RunnableScheduledFuture<V> decorateTask( Runnable runnable, RunnableScheduledFuture<V> task) { return task; } /** * 修改或替換用於執行 callable 的任務。此方法可重寫用於管理內部任務的具體類。默認實現只返回給定任務。 */ protected <V> RunnableScheduledFuture<V> decorateTask( Callable<V> callable, RunnableScheduledFuture<V> task) { return task; }
而後,經過調用delayedExecute方法來延時執行任務。
最後,返回一個ScheduledFuture對象。
該方法設置了執行週期,下一次執行時間至關因而上一次的執行時間加上period,它是採用已固定的頻率來執行任務:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
該方法設置了執行週期,與scheduleAtFixedRate方法不一樣的是,下一次執行時間是上一次任務執行完的系統時間加上period,於是具體執行時間不是固定的,但週期是固定的,是採用相對固定的延遲來執行任務:
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
注意這裏的unit.toNanos(-delay));
,這裏把週期設置爲負數來表示是相對固定的延遲執行。
scheduleAtFixedRate和scheduleWithFixedDelay的區別在setNextRunTime方法中就能夠看出來:
private void setNextRunTime() { long p = period; // 固定頻率,上次執行時間加上週期時間 if (p > 0) time += p; // 相對固定延遲執行,使用當前系統時間加上週期時間 else time = triggerTime(-p); }
setNextRunTime方法會在run方法中執行完任務後調用。
triggerTime方法用於獲取下一次執行的具體時間:
private long triggerTime(long delay, TimeUnit unit) { return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); } long triggerTime(long delay) { return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); }
這裏的delay < (Long.MAX_VALUE >> 1
是爲了判斷是否要防止Long類型溢出,若是delay的值小於Long類型最大值的一半,則直接返回delay,不然須要進行防止溢出處理。
該方法的做用是限制隊列中全部節點的延遲時間在Long.MAX_VALUE以內,防止在compareTo方法中溢出。
private long overflowFree(long delay) { // 獲取隊列中的第一個節點 Delayed head = (Delayed) super.getQueue().peek(); if (head != null) { // 獲取延遲時間 long headDelay = head.getDelay(NANOSECONDS); // 若是延遲時間小於0,而且 delay - headDelay 超過了Long.MAX_VALUE // 將delay設置爲 Long.MAX_VALUE + headDelay 保證delay小於Long.MAX_VALUE if (headDelay < 0 && (delay - headDelay < 0)) delay = Long.MAX_VALUE + headDelay; } return delay; }
當一個任務已經能夠執行出隊操做,但尚未執行,可能因爲線程池中的工做線程不是空閒的。具體分析一下這種狀況:
now() + delay
,也就是至關於100 + 1023
,這確定是溢出了,那麼返回的時間是-925;long diff = time - x.time;
時,那麼計算後的結果就是-925 - 95 = -1020
,那麼將返回-1,而正常狀況應該是返回1,由於新加入的任務的執行時間要比頭結點的執行時間要晚,這就不是咱們想要的結果了,這會致使隊列中的順序不正確。long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
時也會有這種狀況;若是執行了overflowFree方法呢,這時headDelay = 95 - 100 = -5
,而後執行delay = 1023 + (-5) = 1018
,那麼triggerTime會返回100 + 1018 = -930
,再執行compareTo方法中的long diff = time - x.time;
時,diff = -930 - 95 = -930 - 100 + 5 = 1018 + 5 = 1023
,沒有溢出,符合正常的預期。
因此,overflowFree方法中把已經超時的部分時間給減去,就是爲了不在compareTo方法中出現溢出狀況。
(說實話,這段代碼看的很痛苦,通常狀況下也不會發生這種狀況,誰會傳一個Long.MAX_VALUE呢。要知道Long.MAX_VALUE的納秒數換算成年的話是292年,誰會這麼無聊。。。)
public long getDelay(TimeUnit unit) { // 執行時間減去當前系統時間 return unit.convert(time - now(), NANOSECONDS); }
ScheduledFutureTask繼承自FutureTask並實現了RunnableScheduledFuture接口,具體能夠參考上文的類圖,構造方法以下:
ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } /** * Creates a periodic action with given nano time and period. */ ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement(); } /** * Creates a one-shot action with given nanoTime-based trigger time. */ ScheduledFutureTask(Callable<V> callable, long ns) { super(callable); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); }
這裏面有幾個重要的屬性,下面來解釋一下:
在schedule方法中,建立完ScheduledFutureTask對象以後,會執行delayedExecute方法來執行任務。
private void delayedExecute(RunnableScheduledFuture<?> task) { // 若是線程池已經關閉,使用拒絕策略拒絕任務 if (isShutdown()) reject(task); else { // 添加到阻塞隊列中 super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else // 確保線程池中至少有一個線程啓動,即便corePoolSize爲0 // 該方法在ThreadPoolExecutor中實現 ensurePrestart(); } }
說一下這裏的第二個if判斷:
對於步驟2,能夠經過setContinueExistingPeriodicTasksAfterShutdownPolicy方法設置在線程池關閉時,週期任務繼續執行,默認爲false,也就是線程池關閉時,再也不執行週期任務。
ensurePrestart方法在ThreadPoolExecutor中定義:
void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); }
調用了addWorker方法,能夠在深刻理解Java線程池:ThreadPoolExecutor中查看addWorker方法的介紹,線程池中的工做線程是經過該方法來啓動並執行任務的。
回顧一下線程池的執行過程:當線程池中的工做線程啓動時,不斷地從阻塞隊列中取出任務並執行,固然,取出的任務實現了Runnable接口,因此是經過調用任務的run方法來執行任務的。
這裏的任務類型是ScheduledFutureTask,因此下面看一下ScheduledFutureTask的run方法:
public void run() { // 是不是週期性任務 boolean periodic = isPeriodic(); // 當前線程池運行狀態下若是不能夠執行任務,取消該任務 if (!canRunInCurrentRunState(periodic)) cancel(false); // 若是不是週期性任務,調用FutureTask中的run方法執行 else if (!periodic) ScheduledFutureTask.super.run(); // 若是是週期性任務,調用FutureTask中的runAndReset方法執行 // runAndReset方法不會設置執行結果,因此能夠重複執行任務 else if (ScheduledFutureTask.super.runAndReset()) { // 計算下次執行該任務的時間 setNextRunTime(); // 重複執行任務 reExecutePeriodic(outerTask); } }
有關FutureTask的run方法和runAndReset方法,能夠參考FutureTask源碼解析。
分析一下執行過程:
void reExecutePeriodic(RunnableScheduledFuture<?> task) { if (canRunInCurrentRunState(true)) { super.getQueue().add(task); if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart(); } }
該方法和delayedExecute方法相似,不一樣的是:
onShutdown方法是ThreadPoolExecutor中的鉤子方法,在ThreadPoolExecutor中什麼都沒有作,參考深刻理解Java線程池:ThreadPoolExecutor,該方法是在執行shutdown方法時被調用:
@Override void onShutdown() { BlockingQueue<Runnable> q = super.getQueue(); // 獲取在線程池已 shutdown 的狀況下是否繼續執行現有延遲任務 boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); // 獲取在線程池已 shutdown 的狀況下是否繼續執行現有按期任務 boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); // 若是在線程池已 shutdown 的狀況下不繼續執行延遲任務和按期任務 // 則依次取消任務,不然則根據取消狀態來判斷 if (!keepDelayed && !keepPeriodic) { for (Object e : q.toArray()) if (e instanceof RunnableScheduledFuture<?>) ((RunnableScheduledFuture<?>) e).cancel(false); q.clear(); } else { // Traverse snapshot to avoid iterator exceptions for (Object e : q.toArray()) { if (e instanceof RunnableScheduledFuture) { RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e; // 若是有在 shutdown 後不繼續的延遲任務或週期任務,則從隊列中刪除並取消任務 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || t.isCancelled()) { // also remove if already cancelled if (q.remove(t)) t.cancel(false); } } } } tryTerminate(); }
ScheduledThreadPoolExecutor之因此要本身實現阻塞的工做隊列,是由於ScheduledThreadPoolExecutor要求的工做隊列有些特殊。
DelayedWorkQueue是一個基於堆的數據結構,相似於DelayQueue和PriorityQueue。在執行定時任務的時候,每一個任務的執行時間都不一樣,因此DelayedWorkQueue的工做就是按照執行時間的升序來排列,執行時間距離當前時間越近的任務在隊列的前面(注意:這裏的順序並非絕對的,堆中的排序只保證了子節點的下次執行時間要比父節點的下次執行時間要大,而葉子節點之間並不必定是順序的,下文中會說明)。
堆結構以下圖所示:
可見,DelayedWorkQueue是一個基於最小堆結構的隊列。堆結構可使用數組表示,能夠轉換成以下的數組:
在這種結構中,能夠發現有以下特性:
假設,索引值從0開始,子節點的索引值爲k,父節點的索引值爲p,則:
爲何要使用DelayedWorkQueue呢?
定時任務執行時須要取出最近要執行的任務,因此任務在隊列中每次出隊時必定要是當前隊列中執行時間最靠前的,因此天然要使用優先級隊列。
DelayedWorkQueue是一個優先級隊列,它能夠保證每次出隊的任務都是當前隊列中執行時間最靠前的,因爲它是基於堆結構的隊列,堆結構在執行插入和刪除操做時的最壞時間複雜度是 O(logN)。
// 隊列初始容量 private static final int INITIAL_CAPACITY = 16; // 根據初始容量建立RunnableScheduledFuture類型的數組 private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; private final ReentrantLock lock = new ReentrantLock(); private int size = 0; // leader線程 private Thread leader = null; // 當較新的任務在隊列的頭部可用時,或者新線程可能須要成爲leader,則經過該條件發出信號 private final Condition available = lock.newCondition();
注意這裏的leader,它是Leader-Follower模式的變體,用於減小沒必要要的定時等待。什麼意思呢?對於多線程的網絡模型來講:
全部線程會有三種身份中的一種:leader和follower,以及一個幹活中的狀態:proccesser。它的基本原則就是,永遠最多隻有一個leader。而全部follower都在等待成爲leader。線程池啓動時會自動產生一個Leader負責等待網絡IO事件,當有一個事件產生時,Leader線程首先通知一個Follower線程將其提拔爲新的Leader,而後本身就去幹活了,去處理這個網絡事件,處理完畢後加入Follower線程等待隊列,等待下次成爲Leader。這種方法能夠加強CPU高速緩存類似性,及消除動態內存分配和線程間的數據交換。
參考自:http://blog.csdn.net/goldlevi/article/details/7705180
具體leader的做用在分析take方法時再詳細介紹。
既然是阻塞隊列,入隊的操做如add和put方法都調用了offer方法,下面查看一下offer方法:
public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; // queue是一個RunnableScheduledFuture類型的數組,若是容量不夠須要擴容 if (i >= queue.length) grow(); size = i + 1; // i == 0 說明堆中尚未數據 if (i == 0) { queue[0] = e; setIndex(e, 0); } else { // i != 0 時,須要對堆進行從新排序 siftUp(i, e); } // 若是傳入的任務已是隊列的第一個節點了,這時available須要發出信號 if (queue[0] == e) { // leader設置爲null爲了使在take方法中的線程在經過available.signal();後會執行available.awaitNanos(delay); leader = null; available.signal(); } } finally { lock.unlock(); } return true; }
有關Condition的介紹請參考深刻理解AbstractQueuedSynchronizer(三)
這裏的重點是siftUp方法。
private void siftUp(int k, RunnableScheduledFuture<?> key) { while (k > 0) { // 找到父節點的索引 int parent = (k - 1) >>> 1; // 獲取父節點 RunnableScheduledFuture<?> e = queue[parent]; // 若是key節點的執行時間大於父節點的執行時間,不須要再排序了 if (key.compareTo(e) >= 0) break; // 若是key.compareTo(e) < 0,說明key節點的執行時間小於父節點的執行時間,須要把父節點移到後面 queue[k] = e; // 設置索引爲k setIndex(e, k); k = parent; } // key設置爲排序後的位置中 queue[k] = key; setIndex(key, k); }
代碼很好理解,就是循環的根據key節點與它的父節點來判斷,若是key節點的執行時間小於父節點,則將兩個節點交換,使執行時間靠前的節點排列在隊列的前面。
假設新入隊的節點的延遲時間(調用getDelay()方法得到)是5,執行過程以下:
3.這時將k設置爲3,繼續循環,再次計算parent爲1,queue[1]的時間間隔爲3,由於 5 > 3 ,這時退出循環,最終k爲3:
可見,每次新增節點時,只是根據父節點來判斷,而不會影響兄弟節點。
另外,setIndex方法只是設置了ScheduledFutureTask中的heapIndex屬性:
private void setIndex(RunnableScheduledFuture<?> f, int idx) { if (f instanceof ScheduledFutureTask) ((ScheduledFutureTask)f).heapIndex = idx; }
public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) available.await(); else { // 計算當前時間到執行時間的時間間隔 long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); first = null; // don't retain ref while waiting // leader不爲空,阻塞線程 if (leader != null) available.await(); else { // leader爲空,則把leader設置爲當前線程, Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 阻塞到執行時間 available.awaitNanos(delay); } finally { // 設置leader = null,讓其餘線程執行available.awaitNanos(delay); if (leader == thisThread) leader = null; } } } } } finally { // 若是leader不爲空,則說明leader的線程正在執行available.awaitNanos(delay); // 若是queue[0] == null,說明隊列爲空 if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
ake方法是何時調用的呢?在深刻理解Java線程池:ThreadPoolExecutor中,介紹了getTask方法,工做線程會循環地從workQueue中取任務。但定時任務卻不一樣,由於若是一旦getTask方法取出了任務就開始執行了,而這時可能尚未到執行的時間,因此在take方法中,要保證只有在到指定的執行時間的時候任務才能夠被取走。
再來講一下leader的做用,這裏的leader是爲了減小沒必要要的定時等待,當一個線程成爲leader時,它只等待下一個節點的時間間隔,但其它線程無限期等待。 leader線程必須在從take()或poll()返回以前signal其它線程,除非其餘線程成爲了leader。
舉例來講,若是沒有leader,那麼在執行take時,都要執行available.awaitNanos(delay)
,假設當前線程執行了該段代碼,這時尚未signal,第二個線程也執行了該段代碼,則第二個線程也要被阻塞。多個這時執行該段代碼是沒有做用的,由於只能有一個線程會從take中返回queue[0](由於有lock),其餘線程這時再返回for循環執行時取的queue[0],已經不是以前的queue[0]了,而後又要繼續阻塞。
因此,爲了避免讓多個線程頻繁的作無用的定時等待,這裏增長了leader,若是leader不爲空,則說明隊列中第一個節點已經在等待出隊,這時其它的線程會一直阻塞,減小了無用的阻塞(注意,在finally中調用了signal()來喚醒一個線程,而不是signalAll())。
下面看下poll方法,與take相似,但這裏要提供超時功能:
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) { if (nanos <= 0) return null; else nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(NANOSECONDS); // 若是delay <= 0,說明已經到了任務執行的時間,返回。 if (delay <= 0) return finishPoll(first); // 若是nanos <= 0,說明已經超時,返回null if (nanos <= 0) return null; first = null; // don't retain ref while waiting // nanos < delay 說明須要等待的時間小於任務要執行的延遲時間 // leader != null 說明有其它線程正在對任務進行阻塞 // 這時阻塞當前線程nanos納秒 if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 這裏的timeLeft表示delay減去實際的等待時間 long timeLeft = available.awaitNanos(delay); // 計算剩餘的等待時間 nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
當調用了take或者poll方法可以獲取到任務時,會調用該方法進行返回:
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) { // 數組長度-1 int s = --size; // 取出最後一個節點 RunnableScheduledFuture<?> x = queue[s]; queue[s] = null; // 長度不爲0,則從第一個元素開始排序,目的是要把最後一個節點放到合適的位置上 if (s != 0) siftDown(0, x); setIndex(f, -1); return f; }
siftDown方法使堆從k開始向下調整:
private void siftDown(int k, RunnableScheduledFuture<?> key) { // 根據二叉樹的特性,數組長度除以2,表示取有子節點的索引 int half = size >>> 1; // 判斷索引爲k的節點是否有子節點 while (k < half) { // 左子節點的索引 int child = (k << 1) + 1; RunnableScheduledFuture<?> c = queue[child]; // 右子節點的索引 int right = child + 1; // 若是有右子節點而且左子節點的時間間隔大於右子節點,取時間間隔最小的節點 if (right < size && c.compareTo(queue[right]) > 0) c = queue[child = right]; // 若是key的時間間隔小於等於c的時間間隔,跳出循環 if (key.compareTo(c) <= 0) break; // 設置要移除索引的節點爲其子節點 queue[k] = c; setIndex(c, k); k = child; } // 將key放入索引爲k的位置 queue[k] = key; setIndex(key, k); }
siftDown方法執行時包含兩種狀況,一種是沒有子節點,一種是有子節點(根據half判斷)。例如:
沒有子節點的狀況:
假設初始的堆以下:
假設 k = 3 ,那麼 k = half ,沒有子節點,在執行siftDown方法時直接把索引爲3的節點設置爲數組的最後一個節點:
有子節點的狀況:
假設 k = 0 ,那麼執行如下步驟:
right < size
,這時比較左子節點和右子節點時間間隔的大小,這裏 3 < 7 ,因此 c = queue[child] ;4.由於 half = 3 ,k = 1 ,繼續執行循環,這時的索引變爲:
5.這時再通過如上判斷後,將k的值爲3,最終的結果以下:
6.最後,若是在finishPoll方法中調用的話,會把索引爲0的節點的索引設置爲-1,表示已經刪除了該節點,而且size也減了1,最後的結果以下:
可見,siftdown方法在執行完並非有序的,但能夠發現,子節點的下次執行時間必定比父節點的下次執行時間要大,因爲每次都會取左子節點和右子節點中下次執行時間最小的節點,因此仍是能夠保證在take和poll時出隊是有序的。
public boolean remove(Object x) { final ReentrantLock lock = this.lock; lock.lock(); try { int i = indexOf(x); if (i < 0) return false; setIndex(queue[i], -1); int s = --size; RunnableScheduledFuture<?> replacement = queue[s]; queue[s] = null; if (s != i) { // 從i開始向下調整 siftDown(i, replacement); // 若是queue[i] == replacement,說明i是葉子節點 // 若是是這種狀況,不能保證子節點的下次執行時間比父節點的大 // 這時須要進行一次向上調整 if (queue[i] == replacement) siftUp(i, replacement); } return true; } finally { lock.unlock(); } }
假設初始的堆結構以下:
這時要刪除8的節點,那麼這時 k = 1,key爲最後一個節點:
這時經過上文對siftDown方法的分析,siftDown方法執行後的結果以下:
這時會發現,最後一個節點的值比父節點還要小,因此這裏要執行一次siftUp方法來保證子節點的下次執行時間要比父節點的大,因此最終結果以下:
本文詳細分析了ScheduedThreadPoolExecutor的實現,主要介紹瞭如下方面:
整體來講,ScheduedThreadPoolExecutor的重點是要理解下次執行時間的計算,以及優先隊列的出隊、入隊和刪除的過程,這兩個是理解ScheduedThreadPoolExecutor的關鍵。