Java多線程進階(四一)—— J.U.C之executors框架:ScheduledThreadPoolExecutor

圖片描述

本文首發於一世流雲專欄: https://segmentfault.com/blog...

1、ScheduledThreadPoolExecutor簡介

executors框架概述一節中,咱們曾經提到過一種可對任務進行延遲/週期性調度的執行器(Executor),這類Executor通常實現了ScheduledExecutorService這個接口。ScheduledExecutorService在普通執行器接口(ExecutorService)的基礎上引入了Future模式,使得能夠限時或週期性地調度任務。java

ScheduledThreadPoolExecutor的類繼承關係以下圖,該圖中除了本節要講解的ScheduledThreadPoolExecutor外,其它部分已經在前2節詳細介紹過了:segmentfault

clipboard.png

從上圖中能夠看到,ScheduledThreadPoolExecutor實際上是繼承了ThreadPoolExecutor這個普通線程池,咱們知道ThreadPoolExecutor中提交的任務都是實現了Runnable接口,可是ScheduledThreadPoolExecutor比較特殊,因爲要知足任務的延遲/週期調度功能,它會對全部的Runnable任務都進行包裝,包裝成一個RunnableScheduledFuture任務。設計模式

clipboard.png

RunnableScheduledFuture是Future模式中的一個接口,關於Future模式,咱們後續會專門章節講解,這裏只要知道RunnableScheduledFuture的做用就是能夠異步地執行【延時/週期任務】。

另外,咱們知道在ThreadPoolExecutor中,須要指定一個阻塞隊列做爲任務隊列。ScheduledThreadPoolExecutor中也同樣,不過特殊的是,ScheduledThreadPoolExecutor中的任務隊列是一種特殊的延時隊列(DelayQueue)。多線程

咱們曾經在juc-collections框架中,分析過該種阻塞隊列,DelayQueue底層基於優先隊列(PriorityQueue)實現,是一種「堆」結構,經過該種阻塞隊列能夠實現任務的延遲到期執行(即每次從隊列獲取的任務都是最早到期的任務)。框架

ScheduledThreadPoolExecutor在內部定義了DelayQueue的變種——DelayedWorkQueue,它和DelayQueue相似,只不過要求全部入隊元素必須實現RunnableScheduledFuture接口。異步

2、ScheduledThreadPoolExecutor基本原理

構造線程池

咱們先來看下ScheduledThreadPoolExecutor的構造,其實在executors框架概述中講Executors時已經接觸過了,Executors使用newScheduledThreadPool工廠方法建立ScheduledThreadPoolExecutor:性能

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

咱們來看下ScheduledThreadPoolExecutor的構造器,內部其實都是調用了父類ThreadPoolExecutor的構造器,這裏最須要注意的就是任務隊列的選擇——DelayedWorkQueue,咱們後面會詳細介紹它的實現原理。this

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}
 
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory);
}
 
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler);
}
 
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler);
}

線程池的調度

ScheduledThreadPoolExecutor的核心調度方法是schedulescheduleAtFixedRatescheduleWithFixedDelay,咱們經過schedule方法來看下整個調度流程:spa

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null,
            triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}

上述的decorateTask方法把Runnable任務包裝成ScheduledFutureTask,用戶能夠根據本身的須要覆寫該方法:線程

protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
    return task;
}
注意: ScheduledFutureTask是RunnableScheduledFuture接口的實現類,任務經過 period字段來表示任務類型
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
 
    /**
     * 任務序號, 自增惟一
     */
    private final long sequenceNumber;
 
    /**
     * 首次執行的時間點
     */
    private long time;
 
    /**
     * 0: 非週期任務
     * >0: fixed-rate任務
     * <0: fixed-delay任務
     */
    private final long period;
 
    /**
     * 在堆中的索引
     */
    int heapIndex;
 
    ScheduledFutureTask(Runnable r, V result, long ns) {
        super(r, result);
        this.time = ns;
        this.period = 0;
        this.sequenceNumber = sequencer.getAndIncrement();
    }
    
    // ...
}
ScheduledThreadPoolExecutor中的任務隊列—— DelayedWorkQueue,保存的元素就是ScheduledFutureTask。DelayedWorkQueue是一種 堆結構,time最小的任務會排在堆頂(表示最先過時),每次出隊都是取堆頂元素,這樣最快到期的任務就會被先執行。若是兩個ScheduledFutureTask的time相同,就比較它們的序號——sequenceNumber,序號小的表明先被提交,因此就會先執行。

schedule的核心是其中的delayedExecute方法:

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())   // 線程池已關閉
        reject(task);   // 任務拒絕策略
    else {
        super.getQueue().add(task);                 // 將任務入隊
 
        // 若是線程池已關閉且該任務是非週期任務, 則將其從隊列移除
        if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
            task.cancel(false);  // 取消任務
        else
            ensurePrestart();   // 添加一個工做線程
    }
}

經過delayedExecute能夠看出,ScheduledThreadPoolExecutor的整個任務調度流程大體以下圖:

clipboard.png

咱們來分析這個過程:

  1. 首先,任務被提交到線程池後,會判斷線程池的狀態,若是不是RUNNING狀態會執行拒絕策略。
  2. 而後,將任務添加到阻塞隊列中。(注意,因爲DelayedWorkQueue是無界隊列,因此必定會add成功)
  3. 而後,會建立一個工做線程,加入到核心線程池或者非核心線程池:

    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }

    經過ensurePrestart能夠看到,若是核心線程池未滿,則新建的工做線程會被放到核心線程池中。若是核心線程池已經滿了,ScheduledThreadPoolExecutor不會像ThreadPoolExecutor那樣再去建立歸屬於非核心線程池的工做線程,而是直接返回。也就是說,在ScheduledThreadPoolExecutor中,一旦核心線程池滿了,就不會再去建立工做線程。

這裏思考一點,何時會執行else if (wc == 0)建立一個歸屬於非核心線程池的工做線程?
答案是,當經過setCorePoolSize方法設置核心線程池大小爲0時,這裏必需要保證任務可以被執行,因此會建立一個工做線程,放到非核心線程池中。

最後,線程池中的工做線程會去任務隊列獲取任務並執行,當任務被執行完成後,若是該任務是週期任務,則會重置time字段,並從新插入隊列中,等待下次執行。這裏注意從隊列中獲取元素的方法:

  • 對於核心線程池中的工做線程來講,若是沒有超時設置(allowCoreThreadTimeOut == false),則會使用阻塞方法take獲取任務(由於沒有超時限制,因此會一直等待直到隊列中有任務);若是設置了超時,則會使用poll方法(方法入參須要超時時間),超時還沒拿到任務的話,該工做線程就會被回收。
  • 對於非工做線程來講,都是調用poll獲取隊列元素,超時取不到任務就會被回收。



上述就是ScheduledThreadPoolExecutor的核心調度流程,經過咱們的分析能夠看出,相比ThreadPoolExecutor,ScheduledThreadPoolExecutor主要有如下幾點不一樣:

  1. 整體的調度控制流程略有區別;
  2. 任務的執行方式有所區別;
  3. 任務隊列的選擇不一樣。

最後,咱們來看下ScheduledThreadPoolExecutor中的延時隊列——DelayedWorkQueue

延時隊列

DelayedWorkQueue,該隊列和已經介紹過的DelayQueue區別不大,只不過隊列元素是RunnableScheduledFuture:

static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
    private static final int INITIAL_CAPACITY = 16;
    private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
    private int size = 0;
 
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition available = lock.newCondition();
 
    private Thread leader = null;
 
    // ...
}

DelayedWorkQueue是一個無界隊列,在隊列元素滿了之後會自動擴容,它並無像DelayQueue那樣,將隊列操做委託給PriorityQueue,而是本身從新實現了一遍堆的核心操做——上浮、下沉。我這裏再也不贅述這些堆操做,讀者能夠參考PriorityBlockingQueue自行閱讀源碼。

咱們關鍵來看下addtakepoll這三個隊列方法,由於ScheduledThreadPoolExecutor的核心調度流程中使用到了這三個方法:

public boolean add(Runnable e) {
    return offer(e);
}
 
public boolean offer(Runnable e, long timeout, TimeUnit unit) {
    return offer(e);
}

add、offer內部都調用了下面這個方法:

public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>) x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;           // 隊列已滿, 擴容
        if (i >= queue.length)
            grow();
        size = i + 1;
        if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
        } else {
            siftUp(i, e);       // 堆上浮操做
        }
        
        if (queue[0] == e) {    // 當前元素是首個元素
            leader = null;
            available.signal(); // 喚醒一個等待線程
        }   
    } finally {
        lock.unlock();
    }
    return true;
}

take方法:

public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (; ; ) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null)          // 隊列爲空
                available.await();      // 等待元素入隊
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)         // 元素已到期
                    return finishPoll(first);
 
                // 執行到此處, 說明隊首元素還未到期
                first = null;
                if (leader != null)
                    available.await();
                else {
                    // 當前線程成功leader線程
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

注意:上述leader表示一個等待獲取隊首元素的出隊線程,這是一種稱爲「Leader-Follower pattern」的多線程設計模式(讀者能夠參考DelayQueue中的講解)。

每次出隊元素時,若是隊列爲空或者隊首元素還未到期,線程就會在condition條件隊列等待。通常的思路是無限等待,直到出現一個入隊線程,入隊元素後將一個出隊線程喚醒。
爲了提高性能,當隊列非空時,用 leader保存第一個到來並嘗試出隊的線程,並設置它的等待時間爲隊首元素的剩餘期限,這樣當元素過時後,線程也就本身喚醒了,不須要入隊線程喚醒。這樣作的好處就是提高一些性能。

3、總結

本節介紹了ScheduledThreadPoolExecutor,它是對普通線程池ThreadPoolExecutor的擴展,增長了延時調度、週期調度任務的功能。歸納下ScheduledThreadPoolExecutor的主要特色:

  1. 對Runnable任務進行包裝,封裝成ScheduledFutureTask,該類任務支持任務的週期執行、延遲執行;
  2. 採用DelayedWorkQueue做爲任務隊列。該隊列是無界隊列,因此任務必定能添加成功,可是當工做線程嘗試從隊列取任務執行時,只有最早到期的任務會出隊,若是沒有任務或者隊首任務未到期,則工做線程會阻塞;
  3. ScheduledThreadPoolExecutor的任務調度流程與ThreadPoolExecutor略有區別,最大的區別就是,先往隊列添加任務,而後建立工做線程執行任務。

另外,maximumPoolSize這個參數對ScheduledThreadPoolExecutor其實並無做用,由於除非把corePoolSize設置爲0,這種狀況下ScheduledThreadPoolExecutor只會建立一個屬於非核心線程池的工做線程;不然,ScheduledThreadPoolExecutor只會新建歸屬於核心線程池的工做線程,一旦核心線程池滿了,就再也不新建工做線程。

相關文章
相關標籤/搜索