如下全部內容以及源碼分析都是基於JDK1.8的,請知悉。java
我寫博客就真的比較沒有順序了,這可能跟個人學習方式有關,我本身也以爲這樣挺很差的,可是沒辦法說服本身去改變,因此也只能這樣想到什麼學什麼了。編程
池化技術真的是一門在我看來很是牛逼的技術,由於它作到了在有限資源內實現了資源利用的最大化,這讓我想到了一門課程,那就是運籌學,當時在上運籌學的時候就常常作這種相似的問題。數組
言歸正傳吧,我接下來會進行一次線程池方面知識點的學習,也會記錄下來分享給你們。併發
線程池的內容當中有涉及到AQS同步器的知識點,若是對AQS同步器知識點感受有點薄弱,能夠去看個人上一篇文章。ide
既然說到線程池了,並且大多數的大牛也都會建議咱們使用池化技術來管理一些資源,那線程池確定也是有它的好處的,要否則怎麼會那麼出名而且讓你們使用呢?函數
咱們就來看看它究竟有什麼優點?oop
資源可控性:使用線程池能夠避免建立大量線程而致使內存的消耗源碼分析
提升響應速度:線程池地建立其實是很消耗時間和性能的,由線程池建立好有任務就運行,提高響應速度。性能
便於管理:池化技術最突出的一個特色就是能夠幫助咱們對池子裏的資源進行管理。由線程池統一分配和管理。學習
咱們要用線程池來統一分配和管理咱們的線程,那首先咱們要建立一個線程池出來,仍是有不少大牛已經幫咱們寫好了不少方面的代碼的,Executors的工廠方法就給咱們提供了建立多種不一樣線程池的方法。由於這個類只是一個建立對象的工廠,並無涉及到不少的具體實現,因此我不會過於詳細地去說明。
老規矩,仍是直接上代碼吧。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
複製代碼
這裏也就舉出一個方法的例子來進行以後的講解吧,咱們能夠看出,Executors只是個工廠而已,方法也只是來實例化不一樣的對象,實際上實例化出來的關鍵類就是ThreadPoolExecutor
。如今咱們就先來簡單地對ThreadPoolExecutor
構造函數內的每一個參數進行解釋一下吧。
corePoolSize(核心線程池大小):當提交一個任務到線程池時,線程池會建立一個線程來執行任務,即便其餘空閒的基本線程可以執行新任務也會建立線程,當任務數大於核心線程數的時候就不會再建立。在這裏要注意一點,線程池剛建立的時候,其中並無建立任何線程,而是等任務來纔去建立線程,除非調用了prestartAllCoreThreads()
或者prestartCoreThread()
方法 ,這樣纔會預先建立好corePoolSize
個線程或者一個線程。
maximumPoolSize(線程池最大線程數):線程池容許建立的最大線程數,若是隊列滿了,而且已建立的線程數小於最大線程數,則線程池會再建立新的線程執行任務。值得注意的是,若是使用了無界隊列,此參數就沒有意義了。
keepAliveTime(線程活動保持時間):此參數默認在線程數大於corePoolSize
的狀況下才會起做用, 當線程的空閒時間達到keepAliveTime
的時候就會終止,直至線程數目小於corePoolSize
。不過若是調用了allowCoreThreadTimeOut
方法,則當線程數目小於corePoolSize
的時候也會起做用.
unit(keelAliveTime的時間單位):keelAliveTime的時間單位,一共有7種,在這裏就不列舉了。
workQueue(阻塞隊列):阻塞隊列,用來存儲等待執行的任務,這個參數也是很是重要的,在這裏簡單介紹一下幾個阻塞隊列。
ArrayBlockingQueue:這是一個基於數組結構的有界阻塞隊列,此隊列按照FIFO的原則對元素進行排序。
LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按照FIFO排序元素,吞吐量一般要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()
就是使用了這個隊列。
SynchronousQueue:一個不存儲元素的阻塞隊列。每一個插入操做必須等到另外一個線程調用移除操做,不然插入操做一直處於阻塞狀態。吞吐量一般要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool()
就使用了這個隊列。
PriorityBlockingQueue:一個具備優先級的無阻塞隊列。
handler(飽和策略);當線程池和隊列都滿了,說明線程池已經處於飽和狀態了,那麼必須採起一種策略來處理還在提交過來的新任務。這個飽和策略默認狀況下是AbortPolicy
,表示沒法處理新任務時拋出異常。共有四種飽和策略提供,固然咱們也能夠選擇本身實現飽和策略。
AbortPolicy:直接丟棄而且拋出RejectedExecutionException
異常
CallerRunsPolicy:只用調用者所在線程來運行任務。
DiscardOldestPolicy:丟棄隊列裏最近的一個任務,並執行當前任務。
DiscardPolicy:丟棄任務而且不拋出異常。
線程池的執行流程就用參考資料裏的圖介紹一下了,具體咱們仍是經過代碼去講解。
在上面咱們簡單的講解了一下Executors
這個工廠類裏的工廠方法,而且講述了一下建立線程池的一些參數以及它們的做用,固然上面的講解並非很深刻,由於想要弄懂的話是須要持續地花時間去看去理解的,而博主本身也仍是沒有徹底弄懂,不過博主的學習方法是先學了個大概,再回頭來看看以前的知識點,可能會更加好理解,因此咱們接着往下面講吧。
在上面咱們就發現了,Executors
的工廠方法主要就返回了ThreadPoolExecutor
對象,至於另外一個在這裏暫時不講,也就是說,要學習線程池,其實關鍵的仍是得學會分析ThreadPoolExecutor
這個對象裏面的源碼,咱們接下來就會對ThreadPoolExecutor
裏的關鍵代碼進行分析。
ctl
是主要的控制狀態,是一個複合類型的變量,其中包括了兩個概念。
workerCount:表示有效的線程數目
runState:線程池裏線程的運行狀態
咱們來分析一下跟ctl
有關的一些源代碼吧,直接上代碼
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//用來表示線程池數量的位數,很明顯是29,Integer.SIZE=32
private static final int COUNT_BITS = Integer.SIZE - 3;
//線程池最大數量,2^29 - 1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
//咱們能夠看出有5種runState狀態,證實至少須要3位來表示runState狀態
//因此高三位就是表示runState了
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
//用於存放線程任務的阻塞隊列
private final BlockingQueue<Runnable> workQueue;
//重入鎖
private final ReentrantLock mainLock = new ReentrantLock();
//線程池當中的線程集合,只有當擁有mainLock鎖的時候,才能夠進行訪問
private final HashSet<Worker> workers = new HashSet<Worker>();
//等待條件支持終止
private final Condition termination = mainLock.newCondition();
//建立新線程的線程工廠
private volatile ThreadFactory threadFactory;
//飽和策略
private volatile RejectedExecutionHandler handler;
複製代碼
CAPACITY
在這裏咱們講一下這個線程池最大數量的計算吧,由於這裏涉及到源碼以及位移之類的操做,我感受大多數人都仍是不太會這個,由於我一開始看的時候也是不太會的。
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
複製代碼
從代碼咱們能夠看出,是須要1往左移29位
,而後再減去1,那個1往左移29位
是怎麼計算的呢?
1 << COUNT_BITS
1的32位2進制是
00000000 00000000 00000000 00000001
左移29位的話就是
00100000 00000000 00000000 00000000
再進行減一的操做
000 11111 11111111 11111111 11111111
也就是說線程池最大數目就是
000 11111 11111111 11111111 11111111
複製代碼
2.runState
正數的原碼、反碼、補碼都是同樣的 在計算機底層,是用補碼來表示的
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
複製代碼
能夠接受新任務而且處理已經在阻塞隊列的任務 高3位所有是1的話,就是RUNNING狀態
-1 << COUNT_BITS
這裏是-1往左移29位,稍微有點不同,-1的話須要咱們本身算出補碼來
-1的原碼
10000000 00000000 00000000 00000001
-1的反碼,負數的反碼是將原碼除符號位之外所有取反
11111111 11111111 11111111 11111110
-1的補碼,負數的補碼就是將反碼+1
11111111 11111111 11111111 11111111
關鍵了,往左移29位,因此高3位全是1就是RUNNING狀態
111 00000 00000000 00000000 00000000
複製代碼
不接受新任務,可是處理已經在阻塞隊列的任務 高3位全是0,就是SHUTDOWN狀態
0 << COUNT_BITS
0的表示
00000000 00000000 00000000 00000000
往左移29位
00000000 00000000 00000000 00000000
複製代碼
不接受新任務,也不處理阻塞隊列裏的任務,而且會中斷正在處理的任務 因此高3位是001,就是STOP狀態
1 << COUNT_BITS
1的表示
00000000 00000000 00000000 00000001
往左移29位
00100000 00000000 00000000 00000000
複製代碼
全部任務都被停止,workerCount是0,線程狀態轉化爲TIDYING而且調用terminated()鉤子方法 因此高3位是010,就是TIDYING狀態
2 << COUNT_BITS
2的32位2進制
00000000 00000000 00000000 00000010
往左移29位
01000000 00000000 00000000 00000000
複製代碼
terminated()鉤子方法已經完成 因此高3位是110,就是TERMINATED狀態
3 << COUNT_BITS
3的32位2進制
00000000 00000000 00000000 00000011
往左移29位
11000000 00000000 00000000 00000000
複製代碼
3.部分方法介紹
實時獲取runState的方法
private static int runStateOf(int c) { return c & ~CAPACITY; }
複製代碼
~CAPACITY
~是按位取反的意思
&是按位與的意思
而CAPACITY是,高位3個0,低29位都是1,因此是
000 11111 11111111 11111111 11111111
取反的話就是
111 00000 00000000 00000000 00000000
傳進來的c參數與取反的CAPACITY進行按位與操做
1、低位29個0進行按位與,仍是29個0
2、高位3個1,既保持c參數的高3位
既高位保持原樣,低29位都是0,這也就得到了線程池的運行狀態runState
複製代碼
獲取線程池的當前有效線程數目
private static int workerCountOf(int c) { return c & CAPACITY; }
複製代碼
CAPACITY的32位2進制是
000 11111 11111111 11111111 11111111
用入參c跟CAPACITY進行按位與操做
1、低29位都是1,因此保留c的低29位,也就是有效線程數
2、高3位都是0,因此c的高3位也是0
這樣獲取出來的即是workerCount的值
複製代碼
原子整型變量ctl的初始化方法
//結合這幾句代碼來看
private static final int RUNNING = -1 << COUNT_BITS;
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) { return rs | wc; }
複製代碼
RUNNING是
111 00000 00000000 00000000 00000000
ctlOf是將rs和wc進行按位或的操做
初始化的時候是將RUNNING和0進行按位或
0的32位2進制是
00000000 00000000 00000000 00000000
因此初始化的ctl是
111 00000 00000000 00000000 00000000
複製代碼
public void execute(Runnable command) {
//須要執行的任務command爲空,拋出空指針異常
if (command == null) // 1
throw new NullPointerException();
/* *執行的流程實際上分爲三步 *一、若是運行的線程小於corePoolSize,以用戶給定的Runable對象新開一個線程去執行 * 而且執行addWorker方法會以原子性操做去檢查runState和workerCount,以防止當返回false的 * 時候添加了不該該添加的線程 *二、 若是任務可以成功添加到隊列當中,咱們仍須要對添加的線程進行雙重檢查,有可能添加的線程在前 * 一次檢查時已經死亡,又或者在進入該方法的時候線程池關閉了。因此咱們須要複查狀態,並有有必 * 要的話須要在中止時回滾入列操做,或者在沒有線程的時候新開一個線程 *三、若是任務沒法入列,那咱們須要嘗試新增一個線程,若是新建線程失敗了,咱們就知道線程可能關閉了 * 或者飽和了,就須要拒絕這個任務 * */
//獲取線程池的控制狀態
int c = ctl.get(); // 2
//經過workCountOf方法算workerCount值,小於corePoolSize
if (workerCountOf(c) < corePoolSize) {
//添加任務到worker集合當中
if (addWorker(command, true))
return; //成功返回
//失敗的話再次獲取線程池的控制狀態
c = ctl.get();
}
/* *判斷線程池是否正處於RUNNING狀態 *是的話添加Runnable對象到workQueue隊列當中 */
if (isRunning(c) && workQueue.offer(command)) { // 3
//再次獲取線程池的狀態
int recheck = ctl.get();
//再次檢查狀態
//線程池不處於RUNNING狀態,將任務從workQueue隊列中移除
if (! isRunning(recheck) && remove(command))
//拒絕任務
reject(command);
//workerCount等於0
else if (workerCountOf(recheck) == 0) // 4
//添加worker
addWorker(null, false);
}
//加入阻塞隊列失敗,則嘗試以線程池最大線程數新開線程去執行該任務
else if (!addWorker(command, false)) // 5
//執行失敗則拒絕任務
reject(command);
}
複製代碼
咱們來講一下上面這個代碼的流程:
一、首先判斷任務是否爲空,空則拋出空指針異常 二、不爲空則獲取線程池控制狀態,判斷小於corePoolSize,添加到worker集合當中執行,
- 如成功,則返回
- 失敗的話再接着獲取線程池控制狀態,由於只有狀態變了纔會失敗,因此從新獲取 三、判斷線程池是否處於運行狀態,是的話則添加command到阻塞隊列,加入時也會再次獲取狀態而且檢測 狀態是否不處於運行狀態,不處於的話則將command從阻塞隊列移除,而且拒絕任務 四、若是線程池裏沒有了線程,則建立新的線程去執行獲取阻塞隊列的任務執行 五、若是以上都沒執行成功,則須要開啓最大線程池裏的線程來執行任務,失敗的話就丟棄
有時候再多的文字也不如一個流程圖來的明白,因此仍是畫了個execute的流程圖給你們方便理解。
2.addWorker(Runnable firstTask, boolean core)
private boolean addWorker(Runnable firstTask, boolean core) {
//外部循環標記
retry:
//外層死循環
for (;;) {
//獲取線程池控制狀態
int c = ctl.get();
//獲取runState
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/** *1.若是線程池runState至少已是SHUTDOWN *2\. 有一個是false則addWorker失敗,看false的狀況 * - runState==SHUTDOWN,即狀態已經大於SHUTDOWN了 * - firstTask爲null,即傳進來的任務爲空,結合上面就是runState是SHUTDOWN,可是 * firstTask不爲空,表明線程池已經關閉了還在傳任務進來 * - 隊列爲空,既然任務已經爲空,隊列爲空,就不須要往線程池添加任務了 */
if (rs >= SHUTDOWN && //runState大於等於SHUTDOWN,初始位RUNNING
! (rs == SHUTDOWN && //runState等於SHUTDOWN
firstTask == null && //firstTask爲null
! workQueue.isEmpty())) //workQueue隊列不爲空
return false;
//內層死循環
for (;;) {
//獲取線程池的workerCount數量
int wc = workerCountOf(c);
//若是workerCount超出最大值或者大於corePoolSize/maximumPoolSize
//返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//經過CAS操做,使workerCount數量+1,成功則跳出循環,回到retry標記
if (compareAndIncrementWorkerCount(c))
break retry;
//CAS操做失敗,再次獲取線程池的控制狀態
c = ctl.get(); // Re-read ctl
//若是當前runState不等於剛開始獲取的runState,則跳出內層循環,繼續外層循環
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
//CAS因爲更改workerCount而失敗,繼續內層循環
}
}
//經過以上循環,能執行到這是workerCount成功+1了
//worker開始標記
boolean workerStarted = false;
//worker添加標記
boolean workerAdded = false;
//初始化worker爲null
Worker w = null;
try {
//初始化一個當前Runnable對象的worker對象
w = new Worker(firstTask);
//獲取該worker對應的線程
final Thread t = w.thread;
//若是線程不爲null
if (t != null) {
//初始線程池的鎖
final ReentrantLock mainLock = this.mainLock;
//獲取鎖
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
//獲取鎖後再次檢查,獲取線程池runState
int rs = runStateOf(ctl.get());
//當runState小於SHUTDOWN或者runState等於SHUTDOWN而且firstTask爲null
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//線程已存活
if (t.isAlive()) // precheck that t is startable
//線程未啓動就存活,拋出IllegalThreadStateException異常
throw new IllegalThreadStateException();
//將worker對象添加到workers集合當中
workers.add(w);
//獲取workers集合的大小
int s = workers.size();
//若是大小超過largestPoolSize
if (s > largestPoolSize)
//從新設置largestPoolSize
largestPoolSize = s;
//標記worker已經被添加
workerAdded = true;
}
} finally {
//釋放鎖
mainLock.unlock();
}
//若是worker添加成功
if (workerAdded) {
//啓動線程
t.start();
//標記worker已經啓動
workerStarted = true;
}
}
} finally {
//若是worker沒有啓動成功
if (! workerStarted)
//workerCount-1的操做
addWorkerFailed(w);
}
//返回worker是否啓動的標記
return workerStarted;
}
複製代碼
咱們也簡單說一下這個代碼的流程吧,還真的是挺難的,博主寫的時候都停了好屢次,想砸鍵盤的說:
一、獲取線程池的控制狀態,進行判斷,不符合則返回false,符合則下一步 二、死循環,判斷workerCount是否大於上限,或者大於corePoolSize/maximumPoolSize,沒有的話則對workerCount+1操做, 三、若是不符合上述判斷或+1操做失敗,再次獲取線程池的控制狀態,獲取runState與剛開始獲取的runState相比,不一致則跳出內層循環繼續外層循環,不然繼續內層循環 四、+1操做成功後,使用重入鎖ReentrantLock來保證往workers當中添加worker實例,添加成功就啓動該實例。
接下來看看流程圖來理解一下上面代碼的一個執行流程
3.addWorkerFailed(Worker w)
addWorker方法添加worker失敗,而且沒有成功啓動任務的時候,就會調用此方法,將任務從workers中移除,而且workerCount作-1操做。
private void addWorkerFailed(Worker w) {
//重入鎖
final ReentrantLock mainLock = this.mainLock;
//獲取鎖
mainLock.lock();
try {
//若是worker不爲null
if (w != null)
//workers移除worker
workers.remove(w);
//經過CAS操做,workerCount-1
decrementWorkerCount();
tryTerminate();
} finally {
//釋放鎖
mainLock.unlock();
}
}
複製代碼
4.tryTerminate()
當對線程池執行了非正常成功邏輯的操做時,都會須要執行tryTerminate嘗試終止線程池
final void tryTerminate() {
//死循環
for (;;) {
//獲取線程池控制狀態
int c = ctl.get();
/* *線程池處於RUNNING狀態 *線程池狀態最小大於TIDYING *線程池==SHUTDOWN而且workQUeue不爲空 *直接return,不能終止 */
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//若是workerCount不爲0
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
//獲取線程池的鎖
final ReentrantLock mainLock = this.mainLock;
//獲取鎖
mainLock.lock();
try {
//經過CAS操做,設置線程池狀態爲TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
//設置線程池的狀態爲TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
//發送釋放信號給在termination條件上等待的線程
termination.signalAll();
}
return;
}
} finally {
//釋放鎖
mainLock.unlock();
}
// else retry on failed CAS
}
}
複製代碼
5.runWorker(Worker w)
該方法的做用就是去執行任務
final void runWorker(Worker w) {
//獲取當前線程
Thread wt = Thread.currentThread();
//獲取worker裏的任務
Runnable task = w.firstTask;
//將worker實例的任務賦值爲null
w.firstTask = null;
/* *unlock方法會調用AQS的release方法 *release方法會調用具體實現類也就是Worker的tryRelease方法 *也就是將AQS狀態置爲0,容許中斷 */
w.unlock(); // allow interrupts
//是否忽然完成
boolean completedAbruptly = true;
try {
//worker實例的task不爲空,或者經過getTask獲取的不爲空
while (task != null || (task = getTask()) != null) {
//獲取鎖
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
/* *獲取線程池的控制狀態,至少要大於STOP狀態 *若是狀態不對,檢查當前線程是否中斷並清除中斷狀態,而且再次檢查線程池狀態是否大於STOP *若是上述知足,檢查該對象是否處於中斷狀態,不清除中斷標記 */
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
//中斷改對象
wt.interrupt();
try {
//執行前的方法,由子類具體實現
beforeExecute(wt, task);
Throwable thrown = null;
try {
//執行任務
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//執行完後調用的方法,也是由子類具體實現
afterExecute(task, thrown);
}
} finally {//執行完後
//task設置爲null
task = null;
//已完成任務數+1
w.completedTasks++;
//釋放鎖
w.unlock();
}
}
completedAbruptly = false;
} finally {
//處理並退出當前worker
processWorkerExit(w, completedAbruptly);
}
}
複製代碼
接下來咱們用文字來講明一下執行任務這個方法的具體邏輯和流程。
- 首先在方法一進來,就執行了w.unlock(),這是爲了將AQS的狀態改成0,由於只有getState() >= 0的時候,線程才能夠被中斷;
- 判斷firstTask是否爲空,爲空則經過getTask()獲取任務,不爲空接着往下執行
- 判斷是否符合中斷狀態,符合的話設置中斷標記
- 執行beforeExecute(),task.run(),afterExecute()方法
- 任何一個出異常都會致使任務執行的終止;進入processWorkerExit來退出任務
- 正常執行的話會接着回到步驟2
附上一副簡單的流程圖:
6.getTask()
在上面的runWorker方法當中咱們能夠看出,當firstTask爲空的時候,會經過該方法來接着獲取任務去執行,那咱們就看看獲取任務這個方法究竟是怎麼樣的?
private Runnable getTask() {
//標誌是否獲取任務超時
boolean timedOut = false; // Did the last poll() time out?
//死循環
for (;;) {
//獲取線程池的控制狀態
int c = ctl.get();
//獲取線程池的runState
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/* *判斷線程池的狀態,出現如下兩種狀況 *一、runState大於等於SHUTDOWN狀態 *二、runState大於等於STOP或者阻塞隊列爲空 *將會經過CAS操做,進行workerCount-1並返回null */
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//獲取線程池的workerCount
int wc = workerCountOf(c);
// Are workers subject to culling?
/* *allowCoreThreadTimeOut:是否容許core Thread超時,默認false *workerCount是否大於核心核心線程池 */
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/* *一、wc大於maximumPoolSize或者已超時 *二、隊列不爲空時保證至少有一個任務 */
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
/* *經過CAS操做,workerCount-1 *能進行-1操做,證實wc大於maximumPoolSize或者已經超時 */
if (compareAndDecrementWorkerCount(c))
//-1操做成功,返回null
return null;
//-1操做失敗,繼續循環
continue;
}
try {
/* *wc大於核心線程池 *執行poll方法 *小於核心線程池 *執行take方法 */
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//判斷任務不爲空返回任務
if (r != null)
return r;
//獲取一段時間沒有獲取到,獲取超時
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
複製代碼
仍是文字解說一下上面的代碼邏輯和流程:
- 獲取線程池控制狀態和runState,判斷線程池是否已經關閉或者正在關閉,是的話則workerCount-1操做返回null
- 獲取workerCount判斷是否大於核心線程池
- 判斷workerCount是否大於最大線程池數目或者已經超時,是的話workerCount-1,-1成功則返回null,不成功則回到步驟1從新繼續
- 判斷workerCount是否大於核心線程池,大於則用poll方法從隊列獲取任務,不然用take方法從隊列獲取任務
- 判斷任務是否爲空,不爲空則返回獲取的任務,不然回到步驟1從新繼續
接下來依然有一副流程圖:
7.processWorkerExit
明顯的,在執行任務當中,會去獲取任務進行執行,那既然是執行任務,確定就會有執行完或者出現異常中斷執行的時候,那這時候確定也會有相對應的操做,至於具體操做是怎麼樣的,咱們仍是直接去看源碼最實際。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
/* *completedAbruptly:在runWorker出現,表明是否忽然完成的意思 *也就是在執行任務過程中出現異常,就會忽然完成,傳true * *若是是忽然完成,須要經過CAS操做,workerCount-1 *不是忽然完成,則不須要-1,由於getTask方法當中已經-1 * *下面的代碼註釋貌似與代碼意思相反了 */
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
//生成重入鎖
final ReentrantLock mainLock = this.mainLock;
//獲取鎖
mainLock.lock();
try {
//線程池統計的完成任務數completedTaskCount加上worker當中完成的任務數
completedTaskCount += w.completedTasks;
//從HashSet<Worker>中移除
workers.remove(w);
} finally {
//釋放鎖
mainLock.unlock();
}
//由於上述操做是釋聽任務或線程,因此會判斷線程池狀態,嘗試終止線程池
tryTerminate();
//獲取線程池的控制狀態
int c = ctl.get();
//判斷runState是否小魚STOP,便是RUNNING或者SHUTDOWN
//若是是RUNNING或者SHUTDOWN,表明沒有成功終止線程池
if (runStateLessThan(c, STOP)) {
/* *是否忽然完成 *如若不是,表明已經沒有任務可獲取完成,由於getTask當中是while循環 */
if (!completedAbruptly) {
/* *allowCoreThreadTimeOut:是否容許core thread超時,默認false *min-默認是corePoolSize */
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//容許core thread超時而且隊列不爲空
//min爲0,即容許core thread超時,這樣就不須要維護核心核心線程池了
//若是workQueue不爲空,則至少保持一個線程存活
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//若是workerCount大於min,則表示知足所需,能夠直接返回
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//若是是忽然完成,添加一個空任務的worker線程--這裏我也不太理解
addWorker(null, false);
}
}
複製代碼
- 首先判斷線程是否忽然終止,若是是忽然終止,經過CAS,workerCount-1
- 統計線程池完成任務數,並將worker從workers當中移除
- 判斷線程池狀態,嘗試終止線程池
- 線程池沒有成功終止
- 判斷是否忽然完成任務,不是則進行下一步,是則進行第三步
- 如容許核心線程超時,隊列不爲空,則至少保證一個線程存活
- 添加一個空任務的worker線程
咱們在上面已經算是挺詳細地講了線程池執行任務execute
的執行流程和一些細節,在上面頻繁地出現了一個字眼,那就是worker實例,那麼這個worker到底是什麼呢?裏面都包含了一些什麼信息,以及worker這個任務到底是怎麼執行的呢?
咱們就在這個部分來介紹一下吧,仍是直接上源碼:
咱們能夠看到Worker內部類繼承AQS同步器而且實現了Runnable接口,因此Worker很明顯就是一個可執行任務而且又能夠控制中斷、起到鎖效果的類。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
/** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */
private static final long serialVersionUID = 6138294804551838833L;
/** 工做線程,若是工廠失敗則爲空. */
final Thread thread;
/** 初始化任務,有可能爲空 */
Runnable firstTask;
/** 已完成的任務計數 */
volatile long completedTasks;
/** * 建立並初始化第一個任務,使用線程工廠來建立線程 * 初始化有3步 *一、設置AQS的同步狀態爲-1,表示該對象須要被喚醒 *二、初始化第一個任務 *三、調用ThreadFactory來使自身建立一個線程,並賦值給worker的成員變量thread */
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
//重寫Runnable的run方法
/** Delegates main run loop to outer runWorker */
public void run() {
//調用ThreadPoolExecutor的runWorker方法
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
//表明是否獨佔鎖,0-非獨佔 1-獨佔
protected boolean isHeldExclusively() {
return getState() != 0;
}
//重寫AQS的tryAcquire方法嘗試獲取鎖
protected boolean tryAcquire(int unused) {
//嘗試將AQS的同步狀態從0改成1
if (compareAndSetState(0, 1)) {
//若是改變成,則將當前獨佔模式的線程設置爲當前線程並返回true
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
//不然返回false
return false;
}
//重寫AQS的tryRelease嘗試釋放鎖
protected boolean tryRelease(int unused) {
//設置當前獨佔模式的線程爲null
setExclusiveOwnerThread(null);
//設置AQS同步狀態爲0
setState(0);
//返回true
return true;
}
//獲取鎖
public void lock() { acquire(1); }
//嘗試獲取鎖
public boolean tryLock() { return tryAcquire(1); }
//釋放鎖
public void unlock() { release(1); }
//是否被獨佔
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
複製代碼
寫這個線程池就真的是不容易了,歷時兩個星期,中途有不少的地方不懂,並且《Java併發編程的藝術》的這本書當中對線程池的介紹其實並不算多,因此本身看起來也挺痛苦的,還常常會看了這個方法就不知道爲何要調用這個以及調用這個方法是出何用意。並且在這學習的過程中,有在懷疑本身的學習方法對不對,由於也有人跟我說不須要一句句去看去分析源碼,只須要知道流程就能夠了,可是後來仍是想一想按照本身的學習路線走,多讀源碼老是有好處的,在這裏我也給程序猿一些建議,有本身的學習方法的時候,按照本身的方式堅決走下去。
方騰飛:《Java併發編程的藝術》
如需轉載,請務必註明出處,畢竟一塊塊搬磚也不是容易的事情。