ScheduledThreadPoolExecutor源碼學習

1、首先先看一下線程池的建立:線程

corePoolSize:線程池核心線程數量翻譯

maximumPoolSize:線程池最大線程數量rest

keepAliverTime:當活躍線程數大於核心線程數時,空閒的多餘線程最大存活時間繼承

unit:存活時間的單位接口

workQueue:存聽任務的隊列隊列

handler:超出線程範圍和隊列容量的任務的處理程序資源

2、知道這幾個參數以後,看一下線程池的原理:get

提交一個任務到線程池中,線程池的處理流程以下:源碼

一、判斷線程池裏的核心線程是否都在執行任務,若是不是(核心線程空閒或者還有核心線程沒有被建立)則建立一個新的工做線程來執行任務。若是核心線程都在執行任務,則進入下個流程。it

二、線程池判斷工做隊列是否已滿,若是工做隊列沒有滿,則將新提交的任務存儲在這個工做隊列裏。若是工做隊列滿了,則進入下個流程。

三、判斷線程池裏的線程是否都處於工做狀態,若是沒有,則建立一個新的工做線程來執行任務。若是已經滿了,則交給飽和策略來處理這個任務。

拒絕策略有3種:以下

3、在分析ScheduledThreadPoolExecutor以前,咱們先看看ThreadPoolExecutor的源碼,而後在看ScheduledThreadPoolExecutor,對比分析ScheduledThreadPoolExecutor產生的緣由和使用場景。

1.ThreadPoolExecutor 的源碼

由此能夠看到ThreadPoolExecutor本質上就是個Executor,因此他的的核心是execute方法。咱們來看一下execute方法:

在具體看這個方法以前,能夠看看這個方法的註釋(本身翻譯吧,英語太菜,只會直譯):

1. 若是運行線程數小於核心線程數,則嘗試用給定的命令做爲第一個任務啓動一個新線程。調用addWorker會原子性地檢查runState和workerCount,以此來防止錯誤警報,由於錯誤警報會在不該該添加線程的時候添加線程。

2. 若是一個任務能夠成功添加到隊列,那麼咱們仍然須要再次檢查是否應該添加一個線程(當自上次檢查以來已有的線程已經死亡),或者在進入這個方法後關閉線程池。所以,咱們從新檢查狀態,若是必要的話,當線程池關閉時回滾隊列;當沒有線程的時候,則啓動一個新線程。

3.若是任務沒法添加到隊列,則嘗試添加新線程。若是它失敗了,咱們知道咱們被關閉或飽和,因此拒絕任務。

這個方法的註釋,就是前面寫的,線程池的處理流程。

而後還能夠看看這個類的這2段註釋,更好的理解這個流程:

如今來分析下execute方法:

首先ctl的參數,一眼看上去不知道是什麼,而後咱們先看看這個類關於這個參數的定義:

再跟一下這個ctl:

因此ctl.get()就是獲取主池控制狀態ctl。

當正在運行的線程數小於核心線程數,看一下addWorker(command,true)方法:

 

關於這個方法,先看一下這段註釋:

而後能夠在前面看看runState的狀態,基本上這個方法就能本身看明白了。其中須要注意的是:

這個線程是禁止打斷的執行運行。

還有就是最後finally作了個判斷,若是workStarted爲false,會從線程池中移除該線程。

而後再看一下Worker:

繼承了AQS類,能夠方便的實現工做線程的停止操做;

實現了Runnable接口,能夠將自身做爲一個任務在工做線程中執行;

當前提交的任務firstTask做爲參數傳入Worker的構造方法;

因此咱們應該看一下他的run方法:

這個runWorker方法(上面的圖方法沒截全),主要是如下幾步:

unlock釋放鎖,表示可中斷;

firstTask不爲null或者從線程池中獲取的線程不爲null,線程加鎖,檢查線程池狀態,看是否可中斷,能夠的話就進行中斷;

執行beforeExecute方法,run方法,afterExecute方法;

釋放鎖。

而後在看一下getTask方法:

這個方法看一下注釋,基本上就沒啥了。

看完這些以後,再返回到execute方法:

關於ThreadPoolExecutor的execute方法就分析完了。
execute是任務的執行,再看一下任務的提交,submit方法:

ThreadPoolExecutor並無submit方法,而他的父類AbstractExecutorService實現了ExecutorService的submit方法。

這個execute方法已經分析過了,如今就看看newTaskFor方法:

既然有這麼多狀態,確定有對狀態的獲取方法:

看一下get()方法:

內部經過awaitDone方法對主線程進行阻塞:

若是主線程被中斷,則拋出中斷異常;
判斷FutureTask當前的state,若是大於COMPLETING,說明任務已經執行完成,則直接返回;
若是當前state等於COMPLETING,說明任務已經執行完,這時主線程只需經過yield方法讓出cpu資源,等待state變成NORMAL;
經過WaitNode類封裝當前線程,並經過UNSAFE添加到waiters鏈表;
最終經過LockSupport的park或parkNanos掛起線程;
 

看完這個以後,在看一下FutureTask的類圖關係:

而後看一下FutureTask的run方法:

 

這樣的話,ThreadPoolExecutor 的源碼部分就分析差很少了。

而後若是想給這個類添加額外的功能,能夠看看這個示例:

2.而後看一下ScheduledThreadPoolExecutor:

這個類有這樣一段註釋:

咱們仍是跟ThreadPoolExecutor同樣的方式,研究一下這個類:

先看execute方法:

調用delayedExecute(t)延遲任務的執行:

剛加進去的線程,確定是走else的,看一下ensurePrestart方法:

走到這裏就能夠看出來,跟ThreadPoolExecutor的addWorker是同樣的了。

因此咱們須要好好看看這個部分:

先看一下triggerTime方法:獲取下一次具體執行時間

而後看一下ScheduledFutureTask:

這個FutureTask是否是很熟悉,這個在前面ThreadPoolExecutor的時候分析過,而後ScheduledFutureTask從新了寫了run方法:

看完這個以後,再看一下ScheduledFutureTask的構造方法:

time:下次任務執行時的時間;
period:執行週期;
sequenceNumber:保存任務被添加到ScheduledThreadPoolExecutor中的序號。

再看一下這個類:

這個就總體串起來了。

定時任務,通常會調用ScheduledThreadPoolExecutor這2個方法:

scheduleAtFixedRate方法
該方法設置了執行週期,下一次執行時間至關因而上一次的執行時間加上period,它是採用已固定的頻率來執行任務:

scheduleWithFixedDelay方法
該方法設置了執行週期,與scheduleAtFixedRate方法不一樣的是,下一次執行時間是上一次任務執行完的系統時間加上period,於是具體執行時間不是固定的,但週期是固定的,是採用相對固定的延遲來執行任務:

這裏的unit.toNanos(-delay)),再看一下setNextRunTime()方法,就能理解了,這裏把週期設置爲負數來表示是相對固定的延遲執行。

再看ScheduledThreadPoolExecutor的submit方法:

也是走到schedule方法,剩下的就本身看看吧。

(shutdown的部分沒有寫,能夠本身看看)

相關文章
相關標籤/搜索