線程池之ScheduledThreadPoolExecutor

1. ScheduledThreadPoolExecutor簡介

ScheduledThreadPoolExecutor能夠用來在給定延時後執行異步任務或者週期性執行任務,相對於任務調度的Timer來講,其功能更增強大,Timer只能使用一個後臺線程執行任務,而ScheduledThreadPoolExecutor則能夠經過構造函數來指定後臺線程的個數。ScheduledThreadPoolExecutor類的UML圖以下:數組

ScheduledThreadPoolExecutor類的UML圖.png

  1. 從UML圖能夠看出,ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,也就是說ScheduledThreadPoolExecutor擁有execute()和submit()提交異步任務的基礎功能,關於ThreadPoolExecutor能夠看這篇文章。可是,ScheduledThreadPoolExecutor類實現了ScheduledExecutorService,該接口定義了ScheduledThreadPoolExecutor可以延時執行任務和週期執行任務的功能;
  2. ScheduledThreadPoolExecutor也兩個重要的內部類:DelayedWorkQueueScheduledFutureTask。能夠看出DelayedWorkQueue實現了BlockingQueue接口,也就是一個阻塞隊列,ScheduledFutureTask則是繼承了FutureTask類,也表示該類用於返回異步任務的結果。這兩個關鍵類,下面會具體詳細來看。

1.1 構造方法

ScheduledThreadPoolExecutor有以下幾個構造方法:數據結構

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
};

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
};
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), handler);
};

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}
複製代碼

能夠看出因爲ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,它的構造方法其實是調用了ThreadPoolExecutor,對ThreadPoolExecutor的介紹能夠能夠看這篇文章,理解ThreadPoolExecutor構造方法的幾個參數的意義後,理解這就很容易了。能夠看出,ScheduledThreadPoolExecutor的核心線程池的線程個數爲指定的corePoolSize,當核心線程池的線程個數達到corePoolSize後,就會將任務提交給有界阻塞隊列DelayedWorkQueue,對DelayedWorkQueue在下面進行詳細介紹,線程池容許最大的線程個數爲Integer.MAX_VALUE,也就是說理論上這是一個大小無界的線程池。異步

1.2 特有方法

ScheduledThreadPoolExecutor實現了ScheduledExecutorService接口,該接口定義了可延時執行異步任務和可週期執行異步任務的特有功能,相應的方法分別爲:ide

//達到給定的延時時間後,執行任務。這裏傳入的是實現Runnable接口的任務,
//所以經過ScheduledFuture.get()獲取結果爲null
public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);
//達到給定的延時時間後,執行任務。這裏傳入的是實現Callable接口的任務,
//所以,返回的是任務的最終計算結果
 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

//是以上一個任務開始的時間計時,period時間過去後,
//檢測上一個任務是否執行完畢,若是上一個任務執行完畢,
//則當前任務當即執行,若是上一個任務沒有執行完畢,則須要等上一個任務執行完畢後當即執行
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
//當達到延時時間initialDelay後,任務開始執行。上一個任務執行結束後到下一次
//任務執行,中間延時時間間隔爲delay。以這種方式,週期性執行任務。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
複製代碼

2. 可週期性執行的任務---ScheduledFutureTask

ScheduledThreadPoolExecutor最大的特點是可以週期性執行異步任務,當調用schedule,scheduleAtFixedRate和scheduleWithFixedDelay方法時,其實是將提交的任務轉換成的ScheduledFutureTask類,從源碼就能夠看出。以schedule方法爲例:函數

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}
複製代碼

能夠看出,經過decorateTask會將傳入的Runnable轉換成ScheduledFutureTask類。線程池最大做用是將任務和線程進行解耦,線程主要是任務的執行者,而任務也就是如今所說的ScheduledFutureTask。緊接着,會想到任何線程執行任務,總會調用run()方法。爲了保證ScheduledThreadPoolExecutor可以延時執行任務以及可以週期性執行任務,ScheduledFutureTask重寫了run方法:源碼分析

public void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
		//若是不是週期性執行任務,則直接調用run方法
        ScheduledFutureTask.super.run();
		//若是是週期性執行任務的話,須要重設下一次執行任務的時間
    else if (ScheduledFutureTask.super.runAndReset()) {
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}
複製代碼

從源碼能夠很明顯的看出,在重寫的run方法中會先if (!periodic)判斷當前任務是不是週期性任務,若是不是的話就直接調用run()方法;不然的話執行setNextRunTime()方法重設下一次任務執行的時間,並經過reExecutePeriodic(outerTask)方法將下一次待執行的任務放置到DelayedWorkQueue中。post

所以,能夠得出結論:ScheduledFutureTask最主要的功能是根據當前任務是否具備週期性,對異步任務進行進一步封裝。若是不是週期性任務(調用schedule方法)則直接經過run()執行,如果週期性任務,則須要在每一次執行完後,重設下一次執行的時間,而後將下一次任務繼續放入到阻塞隊列中。idea

3. DelayedWorkQueue

在ScheduledThreadPoolExecutor中還有另外的一個重要的類就是DelayedWorkQueue。爲了實現其ScheduledThreadPoolExecutor可以延時執行異步任務以及可以週期執行任務,DelayedWorkQueue進行相應的封裝。DelayedWorkQueue是一個基於堆的數據結構,相似於DelayQueue和PriorityQueue。在執行定時任務的時候,每一個任務的執行時間都不一樣,因此DelayedWorkQueue的工做就是按照執行時間的升序來排列,執行時間距離當前時間越近的任務在隊列的前面。spa

爲何要使用DelayedWorkQueue呢?線程

定時任務執行時須要取出最近要執行的任務,因此任務在隊列中每次出隊時必定要是當前隊列中執行時間最靠前的,因此天然要使用優先級隊列。

DelayedWorkQueue是一個優先級隊列,它能夠保證每次出隊的任務都是當前隊列中執行時間最靠前的,因爲它是基於堆結構的隊列,堆結構在執行插入和刪除操做時的最壞時間複雜度是 O(logN)。

DelayedWorkQueue的數據結構

//初始大小
private static final int INITIAL_CAPACITY = 16;
//DelayedWorkQueue是由一個大小爲16的數組組成,數組元素爲實現RunnableScheduleFuture接口的類
//實際上爲ScheduledFutureTask
private RunnableScheduledFuture<?>[] queue =
    new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
複製代碼

能夠看出DelayedWorkQueue底層是採用數組構成的,關於DelayedWorkQueue能夠看這篇博主的文章,很詳細。

關於DelayedWorkQueue咱們能夠得出這樣的結論:DelayedWorkQueue是基於堆的數據結構,按照時間順序將每一個任務進行排序,將待執行時間越近的任務放在在隊列的隊頭位置,以便於最早進行執行

4.ScheduledThreadPoolExecutor執行過程

如今咱們對ScheduledThreadPoolExecutor的兩個內部類ScheduledFutueTask和DelayedWorkQueue進行了瞭解,實際上這也是線程池工做流程中最重要的兩個關鍵因素:任務以及阻塞隊列。如今咱們來看下ScheduledThreadPoolExecutor提交一個任務後,總體的執行過程。以ScheduledThreadPoolExecutor的schedule方法爲例,具體源碼爲:

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
	//將提交的任務轉換成ScheduledFutureTask
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    //延時執行任務ScheduledFutureTask
	delayedExecute(t);
    return t;
}
複製代碼

方法很容易理解,爲了知足ScheduledThreadPoolExecutor可以延時執行任務和能週期執行任務的特性,會先將實現Runnable接口的類轉換成ScheduledFutureTask。而後會調用delayedExecute方法進行執行任務,這個方法也是關鍵方法,來看下源碼:

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
		//若是當前線程池已經關閉,則拒絕任務
        reject(task);
    else {
		//將任務放入阻塞隊列中
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
			//保證至少有一個線程啓動,即便corePoolSize=0
            ensurePrestart();
    }
}
複製代碼

delayedExecute方法的主要邏輯請看註釋,能夠看出該方法的重要邏輯會是在ensurePrestart()方法中,它的源碼爲:

void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}
複製代碼

能夠看出該方法邏輯很簡單,關鍵在於它所調用的addWorker方法,該方法主要功能:新建Worker類,當執行任務時,就會調用被Worker所重寫的run方法,進而會繼續執行runWorker方法。在runWorker方法中會調用getTask方法從阻塞隊列中不斷的去獲取任務進行執行,直到從阻塞隊列中獲取的任務爲null的話,線程結束終止。addWorker方法是ThreadPoolExecutor類中的方法,對ThreadPoolExecutor的源碼分析能夠看這篇文章,很詳細。

5.總結

  1. ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor類,所以,總體上功能一致,線程池主要負責建立線程(Worker類),線程從阻塞隊列中不斷獲取新的異步任務,直到阻塞隊列中已經沒有了異步任務爲止。可是相較於ThreadPoolExecutor來講,ScheduledThreadPoolExecutor具備延時執行任務和可週期性執行任務的特性,ScheduledThreadPoolExecutor從新設計了任務類ScheduleFutureTask,ScheduleFutureTask重寫了run方法使其具備可延時執行和可週期性執行任務的特性。另外,阻塞隊列DelayedWorkQueue是可根據優先級排序的隊列,採用了堆的底層數據結構,使得與當前時間相比,待執行時間越靠近的任務放置隊頭,以便線程可以獲取到任務進行執行;

  2. 線程池不管是ThreadPoolExecutor仍是ScheduledThreadPoolExecutor,在設計時的三個關鍵要素是:任務,執行者以及任務結果。它們的設計思想也是徹底將這三個關鍵要素進行了解耦。

    執行者

    任務的執行機制,徹底交由Worker類,也就是進一步了封裝了Thread。向線程池提交任務,不管爲ThreadPoolExecutor的execute方法和submit方法,仍是ScheduledThreadPoolExecutor的schedule方法,都是先將任務移入到阻塞隊列中,而後經過addWork方法新建了Work類,並經過runWorker方法啓動線程,並不斷的從阻塞對列中獲取異步任務執行交給Worker執行,直至阻塞隊列中沒法取到任務爲止。

    任務

    在ThreadPoolExecutor和ScheduledThreadPoolExecutor中任務是指實現了Runnable接口和Callable接口的實現類。ThreadPoolExecutor中會將任務轉換成FutureTask類,而在ScheduledThreadPoolExecutor中爲了實現可延時執行任務和週期性執行任務的特性,任務會被轉換成ScheduledFutureTask類,該類繼承了FutureTask,並重寫了run方法。

    任務結果

    在ThreadPoolExecutor中提交任務後,獲取任務結果能夠經過Future接口的類,在ThreadPoolExecutor中實際上爲FutureTask類,而在ScheduledThreadPoolExecutor中則是ScheduledFutureTask

相關文章
相關標籤/搜索