本文是源起netty
專欄的第4篇文章,很明顯前3篇文章已經在偏離主題的道路上愈來愈遠。因而乎,我決定:繼續保持……前端
首先看看源碼類註釋中的示例(未改變官方示例邏輯,只是增長了print輸出和註釋)java
import java.time.LocalTime; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; public class ScheduleExecutorServiceDemo { private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5); public static void main(String args[]){ final Runnable beeper = new Runnable() { public void run() { System.out.println(Thread.currentThread().getName()+" >>> "+LocalTime.now().toString()+" >>> beep"); //TODO 沉睡吧,少年 //try { // TimeUnit.SECONDS.sleep(3L); //} catch (InterruptedException e) { // e.printStackTrace(); //} } }; //從0s開始輸出beep,間隔1s final ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(beeper, 0, 1, TimeUnit.SECONDS); //10s以後中止beeperHandle的瘋狂輸出行爲 scheduler.schedule(new Runnable() { public void run() { System.out.println("覺悟吧,beeperHandle!I will kill you!"); beeperHandle.cancel(true); } }, 10, TimeUnit.SECONDS); } }
scheduleAtFixedRate
也是該類經常使用的打開方式之一,網上不少文章會拿該方法與scheduleWithFixedDelay
進行對比,對比結果其實和方法名一致:後端
scheduleAtFixedRate //以固定頻率執行 scheduleWithFixedDelay //延遲方式執行,間隔時間=間隔時間入參+任務執行時間
ScheduleExecutorService實則是Timer
的進化版,主要改進了Timer單線程方面的弊端,改進方式天然是線程池,ScheduleExecutorService的好基友ScheduledThreadPoolExecutor
華麗麗登場。其實ScheduledThreadPoolExecutor纔是主角,ScheduleExecutorService扮演的是拋磚引玉中的磚……源碼分析
先看下ScheduledThreadPoolExecutor類的江湖地位:this
既然繼承自ThreadPoolExecutor,確乃線程池無疑。spa
本文以以下方法做爲切入點:public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
線程
方法入參period
(譯:週期)就是scheduleAtFixedRate所指的固定頻率嗎?
這個問題很好驗證,把示例中這部分代碼的註釋去掉就能獲得答案。調試
final Runnable beeper = new Runnable() { public void run() { System.out.println(Thread.currentThread().getName()+" >>> "+LocalTime.now().toString()+" >>> beep"); //TODO 沉睡吧,少年 //try { // TimeUnit.SECONDS.sleep(3L); //} catch (InterruptedException e) { // e.printStackTrace(); //} } };
答案就是,若是方法執行時間大於間隔週期period,則任務的下次執行時間將超過period的設定!rest
執行結果以下,能夠看出任務間隔爲3s,而不是period設置的1snetty
不由好奇,ScheduleExecutorService是怎麼實現的多長時間以後執行下一個任務?有句話叫源碼之下無祕密,so..let's do this !
從ScheduleExecutorService的初始化開始:
private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
追隨調用鏈Executors.newScheduledThreadPool
-> new ScheduledThreadPoolExecutor(corePoolSize)
,進入以下方法:
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue()); //注意最後一個參數 }
線程池中的任務隊列用的new DelayedWorkQueue()
,而DelayedWorkQueue是ScheduledThreadPoolExecutor的內部類。
初始化部分關注到這一點便可,以後會是一些成員變量的賦值,不做解釋。
接下來從scheduleAtFixedRate方法開始,進入它的實現方法:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
Runnable command
被封裝成了ScheduledFutureTask
類,無獨有偶,ScheduledFutureTask是ScheduledThreadPoolExecutor的另一個內部類。看下它的類關係圖:
有沒有發現ScheduledFutureTask實現了Comparable
接口?衆所周知這個接口是以某種規則用來比較大小的,這裏的規則就是任務的開始執行時間——ScheduledFutureTask的一個屬性:
/** The time the task is enabled to execute in nanoTime units */ private long time;
compareTo
方法就是明證:
public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; //focus這裏啊喂!!! if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }
通常來講,這些比較(compare)放在集合中才有意義,那ScheduledFutureTask以後會放在哪一個集合中嗎?有些朋友可能已經猜到了,沒錯,ScheduledFutureTask後續會置於前文提到的DelayedWorkQueue中。
繼續ScheduledThreadPoolExecutor.scheduleAtFixedRate
方法:
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); //醒醒,該你出場了
進入delayedExecute
方法:
private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); //代碼一 - 任務加入DelayedWorkQueue if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); //代碼二 - 任務開始 } }
追蹤 代碼一 位置的調用鏈:
-> DelayedWorkQueue.add
-> offer
-> siftUp(int k, RunnableScheduledFuture<?> key)
private void siftUp(int k, RunnableScheduledFuture<?> key) { while (k > 0) { int parent = (k - 1) >>> 1; RunnableScheduledFuture<?> e = queue[parent]; if (key.compareTo(e) >= 0) break; queue[k] = e; setIndex(e, k); k = parent; } queue[k] = key; setIndex(key, k); }
能夠看到,siftUp方法實現了向DelayedWorkQueue添加任務時(add),開始時間靠後的任務(ScheduledFutureTask)會放在後面。
ok,回到 代碼二 位置的ensurePrestart
方法,接着追:ensurePrestart
-> addWorker(Runnable firstTask, boolean core)
濃縮版addWorker方法以下:
private boolean addWorker(Runnable firstTask, boolean core){ ... //省略不少的驗證邏輯 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try{ w = new Worker(firstTask); //代碼三 - 封裝成worker,new Worker會從線程池中獲取線程 final Thread t = w.thread; if (t != null){ final ReentrantLock mainLock = this.mainLock; mainLock.lock(); ... //省略部分狀態控制邏輯 if (workerAdded){ t.start(); //代碼四 - 執行Worker的run方法 workerStarted = true; } } }finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
這裏發現firstTask(ScheduledFutureTask)再次被封裝成了Worker
(代碼三),那麼t.start()
(代碼四),天然會執行Worker的run方法,跟下Worker.run
方法:Worker.run
-> runWorker(Worker w)
濃縮後的runWorker
:
final void runWorker(Worker w){ ... //省略部分代碼 try{ while (task != null || (task = getTask()) != null){ //代碼五 - getTask()獲取任務 ... //省略部分代碼 task.run(); //代碼六 - 任務執行 ... //省略部分代碼 } completedAbruptly = false; }finally{ processWorkerExit(w, completedAbruptly); } }
老規矩,5、六兩處關鍵代碼分別看一下:
getTask
最終定位到DelayedWorkQueue.take
方法,這裏只分析延時任務的執行狀況public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); first = null; // don't retain ref while waiting if (leader != null) //代碼八 - leader線程就是下一次的工做線程 available.await(); else { Thread thisThread = Thread.currentThread(); //代碼七 - 指定leader線程 leader = thisThread; try { available.awaitNanos(delay); //等待 } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
對於延時任務來講,線程池中第一個調用take的線程進來會做爲leader線程(代碼七),而後等待。結束等待的位置在哪?在ScheduledFutureTask.run
的調用中!(我做斷點調試的時候,這個等待時間老是很大,通常兩個小時以上,彷佛直接用await就成?這一點確有疑問)。
而線程池中的其它線程調用take時,發現leader已經被第一個線程搶了,只能等着(代碼八)
task.run()
也就是ScheduledFutureTask.run
public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { //對於延時任務,會進入這個分支 setNextRunTime(); reExecutePeriodic(outerTask); } }
對於延時任務,會執行ScheduledFutureTask.super.runAndReset()
:
protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { //代碼九 - 阻塞式等待beeper完成 c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW; }
runAndReset方法會等待最初定義的beeper邏輯執行完成(代碼九),這也解釋了爲何scheduleAtFixedRate
的下次任務執行時間會有可能超過參數period
的設定!
而後調用reExecutePeriodic
:
void reExecutePeriodic(RunnableScheduledFuture<?> task) { if (canRunInCurrentRunState(true)) { super.getQueue().add(task); //隊列中再次加入任務 if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart(); //再次回到ensurePrestart方法 } }
reExecutePeriodic
方法看上去是否是似曾相識,與本小節(3.延時執行)開端的delayedExecute
方法對比下:
private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); //任務加入DelayedWorkQueue if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); //任務開始 } }
都是加入隊列,而後任務開始!
而DelayedWorkQueue.add
中到底作了什麼?以前沒有分析,在這裏看一下:DelayedWorkQueue.add
-> offer
public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if (i >= queue.length) grow(); size = i + 1; if (i == 0) { queue[0] = e; setIndex(e, 0); } else { siftUp(i, e); } if (queue[0] == e) { leader = null; //將leader賦值清除 available.signal(); //代碼十 - 通知線程 } } finally { lock.unlock(); } return true; }
能夠看到,就是在offer
方法(代碼十),將DelayedWorkQueue.take
中的available.awaitNanos(delay)
喚醒了!
是否是已經繞暈了?很正常,由於源碼終歸是須要本身去讀個幾遍才能理清整個脈絡。因此老鐵們,加油!
最後的總結仍是不能缺乏的,一個定時任務的執行流程是這樣的:
1.任務開始時,將任務ScheduledFutureTask
放入隊列DelayedWorkQueue
。任務放入過程會計算該任務的開始執行時間,執行時間靠前的放入隊列的前端,執行時間靠後的放入隊列的後端。
2.以後的ensurePrestart
方法,先從線程池中獲取線程,該線程會從隊列DelayedWorkQueue
中獲取ScheduledFutureTask
。
獲取過程DelayedWorkQueue.take
先計算任務的延時時間delay
,有兩種狀況:
delay>0 須要延時,出現以下局面:
long delay = first.getDelay(NANOSECONDS); //計算延時時間delay //已不須要延時,當即獲取任務 if (delay <= 0) return finishPoll(first); first = null; // don't retain ref while waiting //須要延時的任務(與此同時有任務正在執行) if (leader != null) //其它線程進來時,有leader線程存在了,等待 available.await(); else { Thread thisThread = Thread.currentThread(); //第一個進入這裏的線程會成爲leader leader = thisThread; try { available.awaitNanos(delay); //等待 } finally { if (leader == thisThread) leader = null; } }
3.獲取任務後,進入執行環節Worker.run
-> ScheduledFutureTask.run
。執行過程會阻塞式等待任務完成,這也是任務執行時間可能會超過period的緣由!任務執行結束會再次放入任務,這樣又回到步驟1,反覆執行。