本文只介紹 ThreadPoolExecutor
源碼的關鍵部分,開篇會先介紹 ThreadPoolExecutor
中的一些核心常量定義,而後選取線程池工做週期中的幾個關鍵方法分析其源碼實現。其實,看 JDK
源碼的最好途徑就是看類文件註釋,做者把想說的全都寫在裏面了。java
ThreadPoolExecutor
內部做者採用了一個 32bit
的 int
值來表示線程池的運行狀態(runState)
和當前線程池中的線程數目(workerCount)
,這個變量取名叫 ctl
(control
的縮寫),其中高 3bit
表示容許狀態,低 29bit
表示線程數目(最多容許 2^29 - 1
個線程)。程序員
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 29 位
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 線程池最大容量
// runState is stored in the high-order bits
// 定義的線程池狀態常量
// 111+29個0,值爲 -4 + 2 + 1 = -1(不懂的面壁)
private static final int RUNNING = -1 << COUNT_BITS;
// 000+29個0
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001+29個0
private static final int STOP = 1 << COUNT_BITS;
// 010+29個0
private static final int TIDYING = 2 << COUNT_BITS;
// 011+29個0
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; } // 獲得線程池狀態
private static int workerCountOf(int c) { return c & CAPACITY; } // 獲得線程池線程數
private static int ctlOf(int rs, int wc) { return rs | wc; } // 反向構造 ctl 的值
複製代碼
由於表明線程池狀態的常量能夠經過值的大小來表示前後關係(order)
,所以後續源碼中會有:併發
rs >= SHUTDOWN // 那就表示SHUTDOWN、 STOP or TIDYING or TERMINATED,反正不是 RUNNING
複製代碼
理解上述的常量意義有助於後面理解源碼。異步
從第一節咱們已經知道了線程池分爲五個狀態,下面咱們聊聊這五個狀態分別限制了線程池能執行怎樣的行爲:函數
RUNNING:
能夠接受新任務,且執行 Queue
中的任務SHUTDOWN:
再也不接受新的任務,但能繼續執行 Queue
中已有的任務STOP:
再也不接受新的任務,且也再也不執行 Queue
中已有的任務TIDYING:
全部任務完成,workCount=0
,線程池狀態轉爲 TIDYING
且會執行 hook method
,即 terminated()
TERMINATED:``hook method
terminated()
執行完畢以後進入的狀態上圖總結了 ThreadPoolExecutor
源碼中的關鍵性步驟,正好對應咱們這次解析的核心源碼(上圖出處見水印)。oop
execute
方法用來向線程池提交 task
,這是用戶使用線程池的第一步。若是線程池內未達到 corePoolSize
則新建一個線程,將該 task
設置爲這個線程的 firstTask
,而後加入 workerSet
等待調度,這步須要獲取全局鎖 mainLock
corePoolSize
後,將 task
放入阻塞隊列mainLock
workerCount
超出 maxPoolSize
後用 rejectHandler
來處理咱們能夠看到,線程池的設計使得在 2
步驟時避免了使用全局鎖,只須要塞進隊列返回等待異步調度就能夠,僅剩下 1
和 3
建立線程時須要獲取全局鎖,這有利於線程池的效率提高,由於一個線程池老是大部分時間在步驟 2
上,不然這線程池也沒什麼存在的意義。源碼分析
本文只分析 execute
,addWorker
,runWorker
,三個核心方法和一個 Worker
類,看懂了這幾個,其實其餘的代碼都能看懂。ui
// 繼承自 AQS 實現簡單的鎖控制
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
// worker 運行所在的線程
final Thread thread;
// 賦予該線程的第一個 task,多是 null,若是不是 null 就運行這個,
// 若是是 null 就經過 getTask 方法去 Queue 裏取任務
Runnable firstTask;
// 線程完成的任務數量
volatile long completedTasks;
Worker(Runnable firstTask) {
// 限制線程直到 runWorker 方法前都不容許被打斷
setState(-1);
this.firstTask = firstTask;
// 線程工廠建立線程
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
// 線程內部的 run 方法調用了 runWorker 方法
runWorker(this);
}
}
複製代碼
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 若是當前線程數小於 corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 調用 addWorker 方法新建線程,若是新建成功返回 true,那麼 execute 方法結束
if (addWorker(command, true))
return;
// 這裏意味着 addWorker 失敗,向下執行,由於 addWorker 可能改變 ctl 的值,
// 因此這裏從新獲取下 ctl
c = ctl.get();
}
// 到這步要麼是 corePoolSize 滿了,要麼是 addWorker 失敗了
// 前者很好理解,後者爲何會失敗呢?addWorker 中會講
// 若是線程池狀態爲 RUNNING 且 task 插入 Queue 成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 若是已不處於 RUNNING 狀態,那麼刪除已經入隊的 task,而後執行拒絕策略
// 這裏主要是擔憂併發場景下有別的線程改變了線程池狀態,因此 double-check 下
if (! isRunning(recheck) && remove(command))
reject(command);
// 這個分支有點難以理解,意爲若是當前 workerCount=0 的話就建立一個線程
// 那爲何方法開頭的那個 addWorker(command, true) 會返回 false 呢,其實
// 這裏有個場景就是 newCachedThreadPool,corePoolSize=0,maxPoolSize=MAX 的場景,
// 就會進到這個分支,以 maxPoolSize 爲界建立臨時線程,firstTask=null
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 這個分支很好理解,workQueue 滿了那麼要根據 maxPoolSize 建立線程了
// 若是無法建立說明 maxPoolSize 滿了,執行拒絕策略
else if (!addWorker(command, false))
reject(command);
}
複製代碼
// core 表示以 corePoolSize 仍是 maxPoolSize 爲界
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 看看 addWorker 何時返回 false
// 這裏的 if 邏輯有點難懂,用下數學上的分配率,將第一個邏輯表達式放進括號裏就好懂了
// 一、rs >= SHUTDOWN && rs != SHUTDOWN 其實就表示當線程池狀態是 STOP、TIDYING, 或 TERMINATED 的時候,固然不能添加 worker 了,任務都不執行了還想加 worker?
// 二、rs >= SHUTDOWN && firstTask != null 表示當提交一個非空任務,但線程池狀態已經不是 RUNNING 的時候,固然也不能 addWorker,由於你最多隻能執行完 Queue 中已有的任務
// 三、rs >= SHUTDOWN && workQueue.isEmpty() 若是 Queue 已經空了,那麼不容許新增
// 須要注意的是,若是 rs=SHUTDOWN && firstTask=null 或者 rs=SHUTDOWN && workQueue 非空的狀況下,仍是能夠新增 worker 的,須要建立臨時線程處理 Queue 裏的任務
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 這裏也是一個返回 false 的狀況,但很簡單,就是數目溢出了
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS 成功了,就跳出 loop
if (compareAndIncrementWorkerCount(c))
break retry;
// CAS 失敗的話,check 下目前線程池狀態,若是發生改變就回到外層 loop 再來一遍,這個也好理解,不然單純 CAS 失敗可是線程池狀態不變的話,就只要繼續內層 loop 就好了
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 這是全局鎖,必須持有才能進行 addWorker 操做
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 啓動線程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
複製代碼
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 循環直至 task = null,多是因爲線程池關閉、等待超時等
while (task != null || (task = getTask()) != null) {
w.lock();
// 下面這個 if 邏輯沒怎麼讀懂。。。翻譯了下注釋
// 若是線程池中止,確保線程中斷;
// 若是沒有,確保線程不中斷。這須要在第二種狀況下進行從新獲取ctl,以便在清除中斷時處理shutdownNow競爭
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 前置鉤子函數,能夠自定義
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 運行 run 方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
// 線程的 run 不容許拋出 Throwable,因此轉換爲 Error
thrown = x; throw new Error(x);
} finally {
// 後置鉤子函數,也能夠自定義
afterExecute(task, thrown);
}
} finally {
// 獲取下一個任務
task = null;
// 增長完成的任務數目
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
複製代碼
看完 ThreadPoolExecutor
的源碼,不得不驚歎於代碼寫得真優雅,可是正由於寫的太簡潔優雅甚至找不到一句囉嗦的代碼,因此讓人有點難懂。看源碼的建議是先仔細閱讀一遍類註釋,而後再配合 debug
,理清關鍵性的步驟在作什麼,有些 corner case
夾雜在主邏輯裏面,若是一開始看不懂能夠直接略過,過後再來反思。this
這是一個不定時更新的、披着程序員外衣的文青小號。既分享極客技術,也記錄人間煙火。spa