【源起Netty 外傳】ScheduledThreadPoolExecutor源碼解讀

引言

本文是源起netty專欄的第4篇文章,很明顯前3篇文章已經在偏離主題的道路上愈來愈遠。因而乎,我決定:繼續保持……前端

使用

首先看看源碼類註釋中的示例(未改變官方示例邏輯,只是增長了print輸出和註釋)java

import java.time.LocalTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ScheduleExecutorServiceDemo {
    private final static ScheduledExecutorService scheduler =
            Executors.newScheduledThreadPool(5);

    public static void main(String args[]){
        final Runnable beeper = new Runnable() {
            public void run() {
                System.out.println(Thread.currentThread().getName()+" >>> "+LocalTime.now().toString()+" >>> beep");
                
                //TODO 沉睡吧,少年
                //try {
                //    TimeUnit.SECONDS.sleep(3L);
                //} catch (InterruptedException e) {
                //    e.printStackTrace();
                //}
            }
        };

        //從0s開始輸出beep,間隔1s
        final ScheduledFuture<?> beeperHandle =
                scheduler.scheduleAtFixedRate(beeper, 0, 1, TimeUnit.SECONDS);

        //10s以後中止beeperHandle的瘋狂輸出行爲
        scheduler.schedule(new Runnable() {
            public void run() {
                System.out.println("覺悟吧,beeperHandle!I will kill you!");
                beeperHandle.cancel(true);
            }
        }, 10, TimeUnit.SECONDS);
    }
}

scheduleAtFixedRate也是該類經常使用的打開方式之一,網上不少文章會拿該方法與scheduleWithFixedDelay進行對比,對比結果其實和方法名一致:後端

scheduleAtFixedRate    //以固定頻率執行
scheduleWithFixedDelay    //延遲方式執行,間隔時間=間隔時間入參+任務執行時間

ScheduleExecutorService實則是Timer的進化版,主要改進了Timer單線程方面的弊端,改進方式天然是線程池,ScheduleExecutorService的好基友ScheduledThreadPoolExecutor華麗麗登場。其實ScheduledThreadPoolExecutor纔是主角,ScheduleExecutorService扮演的是拋磚引玉中的磚……源碼分析

先看下ScheduledThreadPoolExecutor類的江湖地位:
clipboard.pngthis

既然繼承自ThreadPoolExecutor,確乃線程池無疑。spa

疑問

本文以以下方法做爲切入點:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)線程

方法入參period(譯:週期)就是scheduleAtFixedRate所指的固定頻率嗎?
這個問題很好驗證,把示例中這部分代碼的註釋去掉就能獲得答案。調試

final Runnable beeper = new Runnable() {
    public void run() {
        System.out.println(Thread.currentThread().getName()+" >>> "+LocalTime.now().toString()+" >>> beep");
                
        //TODO 沉睡吧,少年
        //try {
        //    TimeUnit.SECONDS.sleep(3L);
        //} catch (InterruptedException e) {
        //    e.printStackTrace();
        //}
    }
};

答案就是,若是方法執行時間大於間隔週期period,則任務的下次執行時間將超過period的設定!rest

執行結果以下,能夠看出任務間隔爲3s,而不是period設置的1s
clipboard.pngnetty

不由好奇,ScheduleExecutorService是怎麼實現的多長時間以後執行下一個任務?有句話叫源碼之下無祕密,so..let's do this !

源碼分析

1.初始化

從ScheduleExecutorService的初始化開始:

private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);

追隨調用鏈Executors.newScheduledThreadPool -> new ScheduledThreadPoolExecutor(corePoolSize),進入以下方法:

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());  //注意最後一個參數
}

線程池中的任務隊列用的new DelayedWorkQueue(),而DelayedWorkQueue是ScheduledThreadPoolExecutor的內部類
初始化部分關注到這一點便可,以後會是一些成員變量的賦值,不做解釋。

2.任務封裝

接下來從scheduleAtFixedRate方法開始,進入它的實現方法:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

Runnable command被封裝成了ScheduledFutureTask類,無獨有偶,ScheduledFutureTask是ScheduledThreadPoolExecutor的另一個內部類。看下它的類關係圖:
clipboard.png

有沒有發現ScheduledFutureTask實現了Comparable接口?衆所周知這個接口是以某種規則用來比較大小的,這裏的規則就是任務的開始執行時間——ScheduledFutureTask的一個屬性:

/** The time the task is enabled to execute in nanoTime units */
private long time;

compareTo方法就是明證:

public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time;    //focus這裏啊喂!!!
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

通常來講,這些比較(compare)放在集合中才有意義,那ScheduledFutureTask以後會放在哪一個集合中嗎?有些朋友可能已經猜到了,沒錯,ScheduledFutureTask後續會置於前文提到的DelayedWorkQueue中。

3.延時執行

繼續ScheduledThreadPoolExecutor.scheduleAtFixedRate方法:

ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);    //醒醒,該你出場了

進入delayedExecute方法:

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        super.getQueue().add(task);    //代碼一 - 任務加入DelayedWorkQueue
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            ensurePrestart();    //代碼二 - 任務開始
    }
}

追蹤 代碼一 位置的調用鏈:
-> DelayedWorkQueue.add -> offer -> siftUp(int k, RunnableScheduledFuture<?> key)

private void siftUp(int k, RunnableScheduledFuture<?> key) {
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        RunnableScheduledFuture<?> e = queue[parent];
        if (key.compareTo(e) >= 0)
            break;
        queue[k] = e;
        setIndex(e, k);
        k = parent;
    }
    queue[k] = key;
    setIndex(key, k);
}

能夠看到,siftUp方法實現了向DelayedWorkQueue添加任務時(add),開始時間靠後的任務(ScheduledFutureTask)會放在後面

ok,回到 代碼二 位置的ensurePrestart方法,接着追:
ensurePrestart -> addWorker(Runnable firstTask, boolean core)

濃縮版addWorker方法以下:

private boolean addWorker(Runnable firstTask, boolean core){
    ...    //省略不少的驗證邏輯

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try{
        w = new Worker(firstTask);    //代碼三 - 封裝成worker,new Worker會從線程池中獲取線程
        final Thread t = w.thread;
        if (t != null){
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            
            ...    //省略部分狀態控制邏輯
            
            if (workerAdded){
                t.start();    //代碼四 - 執行Worker的run方法
                workerStarted = true;
            }
        }
    }finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

這裏發現firstTask(ScheduledFutureTask)再次被封裝成了Worker(代碼三),那麼t.start()(代碼四),天然會執行Worker的run方法,跟下Worker.run方法:Worker.run -> runWorker(Worker w)

濃縮後的runWorker

final void runWorker(Worker w){
    
    ...    //省略部分代碼
    
    try{
        while (task != null || (task = getTask()) != null){    //代碼五 -  getTask()獲取任務
            
            ...    //省略部分代碼
                          
            task.run();    //代碼六 - 任務執行
            
            ...    //省略部分代碼
        }
        completedAbruptly = false;
    }finally{
        processWorkerExit(w, completedAbruptly);
    }
}

老規矩,5、六兩處關鍵代碼分別看一下:

  • 代碼五 getTask最終定位到DelayedWorkQueue.take方法,這裏只分析延時任務的執行狀況
public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return finishPoll(first);
                first = null; // don't retain ref while waiting
                if (leader != null)    //代碼八 - leader線程就是下一次的工做線程
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();    //代碼七 - 指定leader線程
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);    //等待
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

對於延時任務來講,線程池中第一個調用take的線程進來會做爲leader線程(代碼七),而後等待。結束等待的位置在哪?在ScheduledFutureTask.run的調用中!(我做斷點調試的時候,這個等待時間老是很大,通常兩個小時以上,彷佛直接用await就成?這一點確有疑問)。
而線程池中的其它線程調用take時,發現leader已經被第一個線程搶了,只能等着(代碼八)

  • 回到 代碼六 位置,task.run()也就是ScheduledFutureTask.run
public void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
        ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) {    //對於延時任務,會進入這個分支
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}

對於延時任務,會執行ScheduledFutureTask.super.runAndReset()

protected boolean runAndReset() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                //代碼九 - 阻塞式等待beeper完成
                c.call(); // don't set result
                ran = true;
            } catch (Throwable ex) {
                setException(ex);
            }
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW;
}

runAndReset方法會等待最初定義的beeper邏輯執行完成(代碼九),這也解釋了爲何scheduleAtFixedRate的下次任務執行時間會有可能超過參數period的設定!

而後調用reExecutePeriodic

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(true)) {
        super.getQueue().add(task);    //隊列中再次加入任務
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();    //再次回到ensurePrestart方法
    }
}

reExecutePeriodic方法看上去是否是似曾相識,與本小節(3.延時執行)開端的delayedExecute方法對比下:

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        super.getQueue().add(task);    //任務加入DelayedWorkQueue
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            ensurePrestart();    //任務開始
    }
}

都是加入隊列,而後任務開始!

DelayedWorkQueue.add中到底作了什麼?以前沒有分析,在這裏看一下:DelayedWorkQueue.add -> offer

public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        if (i >= queue.length)
            grow();
        size = i + 1;
        if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
        } else {
            siftUp(i, e);
        }
        if (queue[0] == e) {
            leader = null;    //將leader賦值清除
            available.signal();    //代碼十 - 通知線程
        }
    } finally {
        lock.unlock();
    }
    return true;
}

能夠看到,就是在offer方法(代碼十),將DelayedWorkQueue.take中的available.awaitNanos(delay)喚醒了!

總結

是否是已經繞暈了?很正常,由於源碼終歸是須要本身去讀個幾遍才能理清整個脈絡。因此老鐵們,加油!

最後的總結仍是不能缺乏的,一個定時任務的執行流程是這樣的:

1.任務開始時,將任務ScheduledFutureTask放入隊列DelayedWorkQueue。任務放入過程會計算該任務的開始執行時間,執行時間靠前的放入隊列的前端,執行時間靠後的放入隊列的後端。

2.以後的ensurePrestart方法,先從線程池中獲取線程,該線程會從隊列DelayedWorkQueue中獲取ScheduledFutureTask

獲取過程DelayedWorkQueue.take先計算任務的延時時間delay ,有兩種狀況:

  • delay<=0 已不須要延時,當即獲取任務
  • delay>0 須要延時,出現以下局面:

    • 第一個進入的線程成爲leader
    • 其它線程等待
long delay = first.getDelay(NANOSECONDS);    //計算延時時間delay 

//已不須要延時,當即獲取任務
if (delay <= 0)
    return finishPoll(first);    
first = null; // don't retain ref while waiting

//須要延時的任務(與此同時有任務正在執行)
if (leader != null)    //其它線程進來時,有leader線程存在了,等待
    available.await();
else {
    Thread thisThread = Thread.currentThread();    //第一個進入這裏的線程會成爲leader
    leader = thisThread;
    try {
        available.awaitNanos(delay);    //等待
    } finally {
        if (leader == thisThread)
            leader = null;
    }
}

3.獲取任務後,進入執行環節Worker.run -> ScheduledFutureTask.run。執行過程會阻塞式等待任務完成,這也是任務執行時間可能會超過period的緣由!任務執行結束會再次放入任務,這樣又回到步驟1,反覆執行。

感謝

分析Java延遲與週期任務的實現原理描述

相關文章
相關標籤/搜索