「本文已參與好文召集令活動,點擊查看:後端、大前端雙賽道投稿,2萬元獎池等你挑戰!」前端
ThreadPoolExecutor是Java的線程池併發代名詞,多線程開發基本都是基於這個去作具體的業務開發。雖然以爲本身回了,網上帖子已經有不少的文章寫這個,可是是本身一一點寫的,終歸是要比看別人的理解更加深入,因此最近本身在對java知識的系統梳理。 那麼接下來主要分析下這個多線程框架的原理。java
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
複製代碼
面試靠的最可能是這個構造函數中7個參數的做用,面試
成員變零後端
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1; //線程容量
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
複製代碼
面試最喜歡問的是 ctl變量的表明什麼意義? ctl變量的的的用高3位表示線程池的狀態,用低29位表示線程個數,二者經過 | 操做,拼接出ctl變量,也就是線程池的最大線程數capacity是 (2^29)-1。markdown
首先咱們來看平時業務代碼是提交任務到線程池執行的函數是經過execute或者submit方法, 區別就是submit返回具備Future,execute返回void,的、那麼接下來咱們主要分析execute 的執行流程,submit涉及到線程異步返回,以後會另外單獨分析,那麼下面這個execute函數 就能看出線程池的整個執行流程,多線程
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
// 當線程池的核心線程數設置爲0狀況下,那麼這時workerCountOf(recheck)爲0,這時就開啓非線程數處理隊列任務
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
複製代碼
線程池執行任務流程圖以下: 我相信大概的流程通常同窗是清楚的:併發
實際源碼中執行流程還有一些小細節容易被忽略的地點框架
線程池新增工做任務主要addWorker方法。因爲代碼比較長,我就在 代碼裏寫好註釋less
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
//第一個條件: 線程至少不是運行狀態,那麼就是shutdown stop tidying,terminated狀態
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
//第二個條件: 當前線程池是shutdown狀態且任務隊列非空而且工做任務第一個任務是空的取反條件,這個含義是當除了SHUTDOWN狀態且第一個任務爲空且任務隊列不爲空
// 狀況下,直接返回false,增長Work線程失敗
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
// 線程池是running狀態
(rs == SHUTDOWN && firstTask == null)) {
//線程池處於shutdown狀態而且第一個task爲空
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//加入工做線程的集合
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
//記錄最大線程數
largestPoolSize = s;
workerAdded = true;
}
} finally {
- mainLock.unlock();
- }
- if (workerAdded) {
- t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
複製代碼
添加工做線程主要步驟異步
若是WorkerAdded失敗,則從Worder的Set移除剛纔加入Worker線程,並將線程池的線程數減1,
首先來看下Work的類的成員變量的構造函數,從下面的Work的代碼,能夠看到它是實現了 RUnnable接口,上一節Worker啓動是調用了它的start方法,真正由操做系統調度執行 的其run方法,那麼接下來重點看下run的工做流程。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
//初始化狀態爲-1,表示不能被中斷
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
複製代碼
下面代碼中Work的run直接調用runWork,並傳入自身對象, 開始一個循環判斷 第一個任務後者從任務隊列中取任務不爲空,就開始上鎖,而後執行任務,若是任務 隊列爲空了,則處理Work的退出。
/** Delegates main run loop to outer runWorker */
public void run() {
//直接調用runWorker函數
runWorker(this);
}
final void runWorker(Worker w) {
// Wokder當前線程
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//將state值賦值爲0,這樣就運行中斷
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 循環判斷第一個Task獲取從獲取任務
while (task != null || (task = getTask()) != null) {
//獲取當前Work的鎖,處理任務,也就是當前Work線程處理是同步處理任務的
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
//線程池的狀態至少是stop,即便stop,tidying.terminated狀態
if ((runStateAtLeast(ctl.get(), STOP)
//檢查線程是否中斷且清楚中斷
|| (Thread.interrupted()
&&
//再次檢查線程池的狀態至少是STOP
runStateAtLeast(ctl.get(), STOP))) &&
//再次判斷是否中斷
!wt.isInterrupted())
//中斷線程
wt.interrupt();
try {
//執行業務任務前處理(鉤子函數)
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 這裏就是執行提交線程池的Runnable的任務的run方法 task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//執行業務任務後處理(鉤子函數)
afterExecute(task, thrown);
}
} finally {
//執行結束重置爲空,回到while循環拿下一個
task = null;
//處理任務加1
w.completedTasks++;
//釋放鎖,處理下一個任務
w.unlock();
}
}
//代碼執行到這裏,表明業務的任務沒有異常,否則不會走到這裏,
//由於上一層try沒有catch異常的,而業務執行出現異常,最裏層
//雖然catch了異常,可是也都經過throw向外拋出
completedAbruptly = false;
} finally {
//若是循環結束,則處理Work退出工做,表明任務拿不到任務,即任務隊列沒有任務了
processWorkerExit(w, completedAbruptly);
}
}
複製代碼
下面就來看下getTask獲取任務隊列的處理邏輯 、 若是這裏返回null,即runWorker循環退出,則會處理finnaly中processWorkExit, 處理Work線程的退出,下面是getWork返回null的狀況:
private Runnable getTask() {
//超時標誌
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
//獲取線程狀態
int c = ctl.get();
//線程狀態
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 若是線程池狀態值至少是SHUTDOWN狀態,
if (rs >= SHUTDOWN
線程池狀態值至少是STOP狀態,或者是任務隊列是空
&& (rs >= STOP || workQueue.isEmpty())) {
// CAS將worker線程數減1
decrementWorkerCount();
return null;
}
//計算線程池線程數量
int wc = workerCountOf(c);
// Are workers subject to culling?
// allowCoreThreadTimeOut參數設置爲true,或則線程池的線程數大於corePoolSize, 表示須要超時的Worker須要退出,
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//線程數大於最大線程數 || 已經超時
if ((wc > maximumPoolSize || (timed && timedOut))
// 線程數大於1 或者 任務隊列爲空
&& (wc > 1 || workQueue.isEmpty())) {
// CAS將線程數減1
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 須要處理超時的Worker,則獲取任務隊列中任務等待的時間
//就是線程池構造函數中keepAliveTime時間,若是不處理超時的Worker
//則直接調用take一直阻塞等待任務隊列中有任務,拿到就返回Runnale任務
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
複製代碼
Worker的退出處理: 1 從上面分析知道completedAbruptly是任務執行時是否出現異常標誌, 若是任務執行過程出錯,則將線程池的線程數量減1 2.加線程池的mainLock的全局鎖,這裏主要區分Worker執行任務中,拿的是Worker內部的鎖,完成任務加1,將worker從Worker的集合移除, 3. 執行tryTerminate函數,是否線程池線程池是否關閉 4. 根據線程池狀態是否補充非核心的Worker線程去處理
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//任務執行時出現異常,則減去工做
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
//拿到線程池的主鎖
final ReentrantLock mainLock = this.mainLock;
//加鎖
mainLock.lock();
try {
//完成任務加1
completedTaskCount += w.completedTasks;
//將worker從Worker的集合移除
workers.remove(w);
} finally {
mainLock.unlock();
}
//嘗試線程池關閉
tryTerminate();
//獲取線程池的ctl
int c = ctl.get();
//若是線程池的狀態值小於STOP,即便SHUTDOWN RUNNING
if (runStateLessThan(c, STOP)) {
//任務執行沒有異常
if (!completedAbruptly) {
//allowCoreThreadTimeOut參數true,則min=0,表示不須要線程常駐。
//負責是有corePoolSize個線程常駐線程池
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//若是線程池數大於最小,也就是不須要補充線程執行任務隊列的任務
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 走到這裏表示線程池的線程數爲0,而任務隊列又不爲空,得補充一個線程處理任務 addWorker(null, false);
}
}
複製代碼
tryTerminate的邏輯是處理線程池關閉的場景
final void tryTerminate() {
for (;;) {
int c = ctl.get();
//線程池是RUNNING狀態
if (isRunning(c) ||
//線程池狀態至少是TIDYING
runStateAtLeast(c, TIDYING) ||
//線程池狀態是SHUTDOWN可是隊列不爲空
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
//中斷一個空閒線程
interruptIdleWorkers(ONLY_ONE);
return;
}
//只有最後一個線程才能走到這裏,處理線程池從TIDYIING狀態
//到TERMINATED狀態
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//鉤子函數
terminated();
} finally {
//設置線程池TERMINATED狀態
ctl.set(ctlOf(TERMINATED, 0));
//喚醒調用awaitTermination的線程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
複製代碼
當線程池沒法處理任務時的處理策略:
1.默認拒絕策略是AbortPolicy 直接拋出RejectedExecutionException異常
2.DiscardPolicy 直接丟棄任務
3.DiscardOldestPolicy 丟棄任務隊列中最老的任務,這裏以前理解是直接丟棄,其實看了源碼以後,其實它仍是當線程池還咩有關閉時,嘗試去提交該任務到線程池去執行
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
複製代碼
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
複製代碼
總結 本文主要就線程池的狀態轉換、工做線程Worker建立以及執行任務隊列中任務的流程、拒絕策略的詳細分析。