這是我參與更文挑戰的第9天,活動詳情查看:更文挑戰markdown
執行步驟以下:oop
若是workerCount小於corePoolSize,將任務做爲first Task新建線程來執行任務。 將任務添加到隊列中,若是添加成功依然須要檢查,在進入任務以前,post
若是線程池被關閉,那麼將任務從隊列中移除;ui
若是當前線程池中沒有工做線程,而剛剛在隊列中加入了任務,要保證線程池中至少有一個工做線程能夠處理任務。this
若是不能將任務加入隊列中,嘗試新加一個線程來執行任務,可是並不定會成功,多是線程池被shut down或者線程池達到了飽和(maximumPoolSize),若是失敗了執行拒絕策略。spa
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 高3位表示狀態,低29位任務數量。
//工做線程小於核心線程數,建立新的線程。
if (workerCountOf(c) < corePoolSize) {
//建立新的worker當即執行command,輪詢workQueue處理task。
if (addWorker(command, true))
return;
c = ctl.get();
}
//線程池在運行狀態且能夠將task插入隊列
//第一次校驗線程池在運行狀態
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//第二次校驗,防止在第一次校驗經過後線程池關閉。若是線程池關閉,在隊列中刪除task並拒絕task
if (! isRunning(recheck) && remove(command))
reject(command);
//若是線程數=0(線程都死掉了,好比:corePoolSize=0),新建線程且未指定firstTask,僅僅去輪訓workQueue
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//線程隊列已滿,嘗試建立新線程執行task,建立失敗後拒絕task,建立失敗緣由:1.線程池關閉;2.線程數已經達到maxPoolSize
else if (!addWorker(command, false))
reject(command);
}
複製代碼
addWorker方法主要有兩部分線程
一:判斷是否能夠建立worker。根據自旋、CAS、ctl等,判斷繼續建立仍是返回false,自旋週期通常很短。code
二:同步建立workder,啓動線程。 addWorker會根據當前線程池的工做狀態和給定的界限限制(corePoolSize 和maximumPoolSize)判斷是否能夠添加新的工做線程。 其中每個Worker對象都是一個AQS隊列。orm
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//外層循環判斷線程池的狀態
for (;;) {
//在進行獲取一次上下文操做機制
int c = ctl.get();
int rs = runStateOf(c);
//線程池狀態
// Check if queue empty only if necessary.
// 線程池狀態:RUNNING = -一、SHUTDOWN = 0、 STOP = 一、TIDYING = 二、TERMINATED = 3
//線程池至少是shutdown狀態
if (rs >= SHUTDOWN &&
// 除了線程池正在關閉(shutdown),
// 隊列裏還有未處理的task的狀況,其餘都不能添加
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//內層循環判斷是否到達容量上限,worker+1
for (;;) {
int wc = workerCountOf(c);//worker數量
//worker大於Integer最大上限
//或到達邊界上限
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS worker+1
if (compareAndIncrementWorkerCount(c))
break retry;//成功了跳出循環
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
//若是線程池狀態發生變化,重試外層循環
continue retry;
// else CAS failed due to workerCount change;
// retry inner loop
// CAS失敗workerCount被其餘線程改變,
// 從新嘗試內層循環CAS對workerCount+1
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);
//1.state置爲-1,Worker繼承了AbstractQueuedSynchronizer.
//2.設置firstTask屬性.
//3.Worker實現了Runable接口,將this做爲入參建立線程.
final Thread t = w.thread;
if (t != null) {
//addWorker須要加鎖
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);//workers是HashSet<Worker>
//設置最大線程池大小
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
複製代碼
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
複製代碼
Worker方法實現了Runnable,它將構造的Thread賦值給thread。第一步中的setState(-1),即將AQS中的同步狀態設置爲-1,線程池使用AQS中的同步狀態來判斷該工做線程是否能夠被中斷。-1:初始化值,此時工做線程尚未啓動,也沒有中斷的必要;0:表示接受中斷,此時工做線程爲空閒狀態;1:表示此時工做線程正在執行任務。它實現了非重入互斥鎖,非重入是爲了不線程池的一些控制方法得到重入鎖。注意Worker實現鎖的目的與傳統鎖的意義不太同樣。其主要是爲了控制線程是否可interrupt,以及其餘的監控,如線程是否active(正在執行任務)。對象
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
//每一個worker有本身的內部線程,ThreadFactory建立失敗時是null
//封裝任務線程機制
final Thread thread;
//初始化任務,多是null
Runnable firstTask;
//每一個worker的完成任務數
volatile long completedTasks;
Worker(Runnable firstTask) {
// inhibit interrupts until runWorker //狀態置爲-1,若是中斷線程須要CAS將state 從0-
>1,以此來保證能只中斷從workerQueue getTask的線程
setState(-1); // 禁止線程在啓動前被打斷
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
//核心方法
public void run() {
//首先執行w.unlock,就是把state置爲0,對該線程的中斷就能夠進行了
runWorker(this);
}
// state = 0 表明未鎖;state = 1 表明已鎖
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 在setCorePoolSize/shutdown等方法中斷worker線程時須要調用該方法,
// 確保中斷的是從workerQueue getTask的線程
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
//調用tryRelease修改state=0,LockSupport.unpark(thread) 下一個等待鎖的線程
public boolean isLocked() { return isHeldExclusively(); }
// interrupt已啓動線程
void interruptIfStarted() {
Thread t;
// 初始化是 state = -1,不會被interrupt
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
複製代碼
Worker的run()方法實際調用的是runWorker(this),啓動工做線程(注意這裏不是直接啓動的任務),在工做線程中執行任務。工做線程在這循環,反覆的從隊列中獲取任務並執行它們。
在運行全部任務以前,須要獲取鎖,來保證當任務在運行的時候不會被中斷,除非線程池正在中止(Stop) 總結以下: 1.Worker類主要負責運行線程狀態的控制。 2.Worker繼承了AQS實現了簡單的獲取鎖和釋放所的操做。來避免中斷等待執行任務的線 程時,中斷正在運行中的線程(線程剛啓動,還沒開始執行任務)。 3.本身實現不可重入鎖,是爲了不在實現線程池控狀態控制的方法,例如:setCorePoolSize的時候中斷正在開始運行的線程。setCorePoolSize可能會調用interruptIdleWorkers(),該方法中會調用worker的tryLock()方法中斷線程,本身實現鎖能夠確保工做線程啓動以前不會被中斷
runWorker的主要任務就是一直loop循環處理任務,沒有任務就去getTask(), 代碼以下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 容許被 interrupt
boolean completedAbruptly = true;
try {
//循環獲取任務直至 task = null (線程池關閉、超時等)
// 注意這裏的getTask()方法,咱們配置的阻塞隊列會在這裏起做用
while (task != null || (task = getTask()) != null) {
w.lock(); // 執行任務前上鎖
// 若是線程池中止(處於STOP狀態或者TIDYING、TERMINATED狀態時),設置當前線程處於中斷狀態若是不是(線程就處於RUNNING或者SHUTDOWN狀態),確保線程不中斷,從新檢查當前線程池的狀態是否大於等於STOP狀態
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//在線程運行前調用
beforeExecute(wt, task);
Throwable thrown = null;
try {
//這裏執行run方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//在線程運行後調用
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly); // 線程退出工做
}
}
複製代碼
從隊列中獲取Task,大概分爲如下幾個步驟。
判斷線程池以及隊列的狀態,若是線程池狀態在STOP以上,此時線程池不處理隊列中的任務;或者線程池處於SHUTDOWN可是隊列爲空(SHUTDOWN再也不接受新的任務),workerCount減1,返回null,注意此時只是將變量減1,其實工做線程並無終止真正的終止在 processWorkerExit(w, completedAbruptly);中。 若是經過了狀態檢查,判斷是否要進行線程回收,若是須要workerCount數量減1,成功後返回null。 根據timed(timed表示須要進行超時閒置線程回收),選擇是限時等待仍是阻塞的方式從隊列中獲取任務。
方法依據配置的workQueue來工做,其阻塞原理與超時原理基於阻塞隊列實現,再也不詳述。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
//循環
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//線程線程池狀態和隊列是否爲空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//線程數量
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//(當前線程數是否大於最大線程數或者)
//且(線程數大於1或者任務隊列爲空)
//這裏有個問題(timed && timedOut)timedOut = false,好像(timed && timedOut)一直都是false吧
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//獲取任務
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
複製代碼
ThreadPoolExecutor的核心是Worker,Worker實現了AbstractQueuedSynchronizer並繼承了Runnable。aqs鎖運用的極爲巧妙。