這篇文章是我在閱讀源碼時整理的一些筆記,對源碼的關鍵點進行了比較詳細的註釋,而後加上一些本身對線程池機制的理解。最終目的是要弄清楚下面這些問題:安全
首先須要介紹一下線程池的兩個重要成員:bash
AtomicInteger 類型。高3位存儲線程池狀態,低29位存儲當前線程數量。workerCountOf(c) 返回當前線程數量。runStateOf(c) 返回當前線程池狀態。 線程池有以下狀態:測試
這個線程在線程池中的包裝類。一個 Worker 表明一個線程。線程池用一個 HashSet 管理這些線程。ui
須要注意的是,Worker 自己並不區分核心線程和非核心線程,核心線程只是概念模型上的叫法,特性是依靠對線程數量的判斷來實現的 Worker 特性以下:this
submit 返回一個 Future 對象,咱們能夠調用其 get 方法獲取任務執行的結果。代碼很簡單,就是將 Runnable 包裝成 FutureTask 而已。能夠看到,最終仍是調用 Execute 方法:spa
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
複製代碼
FutureTask 的代碼就不貼了,簡述一下原理:線程
這個機制你們應該都很熟了,再簡述一遍:code
具體的代碼分析以下:對象
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { //小於核心線程數
if (addWorker(command, true)) //啓動核心線程並執行任務
return;
c = ctl.get(); //執行失敗時從新獲取值
}
if (isRunning(c) && workQueue.offer(command)) { //檢查運行狀態並將任務添加到隊列
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) //從新檢查,防止狀態有變化。若是有,移出隊列並拒絕任務
reject(command);
else if (workerCountOf(recheck) == 0) //若是線程數爲0,建立非核心線程,第一個參數爲空時會從隊列中取任務執行
addWorker(null, false);
}
else if (!addWorker(command, false)) //添加到隊列失敗,說明隊列已滿,建立非核心線程執行任務
reject(command); //執行失敗說明達到最大線程數,拒絕任務
複製代碼
線程池使用 addWorker 方法新建線程,第一個參數表明要執行的任務,線程會將這個任務執行完畢後再從隊列取任務執行。第二參數是核心線程的標誌,它並非 Worker 自己的屬性,在這裏只用來判斷工做線程數量是否超標。繼承
這個方法能夠分紅兩部分,第一部分進行一些前置判斷,並使用循環 CAS 結構將線程數量加1。代碼以下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry: //這個語法不經常使用,用於給外層 for 循環命名。方便嵌套 for 循環中,break 和 continue 指定是外層仍是內層循環
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// firstTask 不爲空表明這個方法用於添加任務,爲空表明新建線程。SHUTDOWN 狀態下不接受新任務,但處理隊列中的任務。這就是第二個判斷的邏輯。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 使用循環 CAS 自旋,增長線程數量直到成功爲止
for (;;) {
int wc = workerCountOf(c);
//判斷是否超過線程容量
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//使用 CAS 將線程數量加1
if (compareAndIncrementWorkerCount(c))
break retry;
//修改不成功說明線程數量有變化
//從新判斷線程池狀態,有變化時跳到外層循環從新獲取線程池狀態
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
//到這裏說明狀態沒有變化,從新嘗試增長線程數量
}
}
... ...
}
複製代碼
第二部分負責新建並啓動線程,並將 Worker 添加至 Hashset 中。代碼很簡單,沒什麼好註釋的,用了 ReentrantLock 確保線程安全。
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s; //這個參數是測試用的,不用管它
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w); //添加失敗時移除 Worker 並將線程數量減 1
}
return workerStarted;
}
複製代碼
在 addWorker 方法中,線程會被啓動。新建線程時,Worker 將自身傳入,因此線程啓動後會執行 Worker 的 run 方法,這個方法調用了 ThreadPoolExecutor 的 runWorker 方法執行任務,runWorker 中會循環取任務執行,執行邏輯以下:
具體代碼分析以下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
//task 爲咱們傳給 execute 的任務。task 爲空時從隊列中取任務執行
try {
while (task != null || (task = getTask()) != null) {
w.lock();
//這段邏輯很是繞。實際上它實現瞭如下邏輯:
//1.若是線程池已中止且線程未中斷,條件成立,中斷線程
//2.若是線程池未中止,線程爲中斷狀態,將線程狀態重置,並從新進行1的判斷
//3.若是線程池未中止,線程不爲中斷狀態,條件不成立
//Thread.interrupted() 會重置中斷狀態,保證
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
//beforeExecute 和 afterExecute 爲空方法,交給子類實現
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); //執行任務
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//執行到這裏時說明線程執行完畢,此方法將線程從 HashSet 中移出。線程終止且沒有引用,會被自動回收。
processWorkerExit(w, completedAbruptly);
}
}
複製代碼
在 runWorker 方法中 getTask 方法返回 null 以後會致使線程執行完畢,被移除出 HashSet,從而被系統銷燬。 線程的超時機制也是在這個方法實現的,藉助於 BlockingQueue 的 poll 和 take 方法。
超時機制實現原理以下:
具體代碼以下:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 容許核心線程超時或者線程數大於核心線程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// timed && timedOut 這兩個參數結合起來控制超時機制
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 隊列爲空時,poll 方法會阻塞等待,超過 keepAliveTime 時返回空值。take 方法會直接返回異常。
// 當 allowCoreThreadTimeOut 爲 true 時,核心線程和非核心線程沒有區別,一概調用poll方法
// 當 allowCoreThreadTimeOut 爲 false 時,線程數量超過核心線程數纔會進入超時機制,若是不超過,則將當前線程看成核心線程處理,調用 take,拋出異常後進入下一次循環。若是隊列爲空,此處會一直循環。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
複製代碼