本文章出處 線程池ThreadPoolExecutor 瞭解 轉載請說明git
Java經過Executors
靜態方法建立4種不一樣類型線程池。github
像newSingleThreadExecutor、newFixedThreadPool、newCachedThreadPool都時內部封裝ThreadPoolExecutor
生成線程池的,下面具體分析ThreadPoolExecutor這個類。緩存
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
複製代碼
建立線程池基本核心構造參數咱們已經知道了,可是咱們還有不少問題沒有搞明白的。怎麼知道線程池內每一個線程運行狀態,是在工做中仍是空閒呢?是否是有一個專門線程去標記空閒線程活動時間?線程是如何實現共用線程。 帶着這些問題去閱讀代碼。安全
如下內容都是來自ThreadPoolExecutor
代碼註釋。 線程池內的線程狀態都是有一個AtomicInteger ctl
保持的,是一個原子整數,包裝了兩個領域含義。 多線程
workerCount
有效的線程數 ,線程總數2 ^ 29 -1 ,線程啓動數量不包括線程中止的數量,而該值多是 與活動線程的實際數量暫時不一樣。例如當ThreadFactory建立線程失敗時,線程正在執行退出,統計線程數量依然包括退出的線程。函數
runState
線程狀態oop
RUNNING
正在接受新的任務而且處理隊列中的任務SHUTDOWN
不接受新的任務,可是能處理任務STOP
不能接受新的任務,不能處理隊列中的任務,可是能夠中斷正在執行的任務。TIDYING
全部的任務終止,workerCount爲0 ,線程所有過渡到TIDYING狀態,即將運行terminated() 鉤子方法TERMINATED
terminated() 鉤子方法執行完成這些狀態都有一個轉換順序this
RUNNING -> SHUTDOWN
執行shutdown()(RUNNING or SHUTDOWN) -> STOP
執行shutdownNow()SHUTDOWN -> TIDYING
當任務隊列和線程池都是空STOP -> TIDYING
線程池都是空TIDYING -> TERMINATED
當 terminated()鉤子方法執行完 這些狀態具體代碼實現private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
複製代碼
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/* * 處理3個步驟 * 1. 若是正在運行的線程數量小於核心線程數,直接建立一個新的線程去執行任務 * 調用addWorker 方法自動檢查 線程狀態和數量,避免在不能添加線程時添加線程出現錯誤警報 * * 2. 若是任務能夠成功進入隊列,咱們仍然須要雙重檢查是否添加一個線程 * 由於存在上次檢查時有線程死亡或者當咱們進入方法時線程池正在關閉 * 所以,咱們從新檢查狀態,若是中止,則回滾排隊,若是沒有,則啓動新線程。 * * 3. 添加任務失敗,則嘗試建立一個線程,若是失敗了,使用拒絕策略 * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { //當前線程數量小於核心線程數
if (addWorker(command, true)) // 建立線程
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { //線程池狀態RUNNING 而且 任務添加成功
int recheck = ctl.get(); // 第二重檢查
if (! isRunning(recheck) && remove(command)) //判斷線程池狀態 刪除任務修改狀態
reject(command);
else if (workerCountOf(recheck) == 0) //線程池數量爲0
addWorker(null, false);
}
else if (!addWorker(command, false)) //線程池狀態不爲RUNNING 或者 隊列已滿再或者線程大於最大線程數而且任務隊列滿了
reject(command);
}
複製代碼
下一步咱們進入addWorker
建立線程的核心方法spa
private boolean addWorker(Runnable firstTask, boolean core) {
retry: //retry標記,第一次看到 😓
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN) //至少SHUTDOWN
&& (runStateAtLeast(c, STOP) // 至少STOP 都是不合法
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) { //狀態合法
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) //大於核心線程或者最大線程都不須要建立線程,和掩碼相與防止最大線程數超過2 ^ 29 - 1 細節啊
return false;
if (compareAndIncrementWorkerCount(c)) // ctl 自增成功,跳出整個循環
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN)) //狀態至少SHUTDOWN 從新進入循環
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); //建立線程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//在加鎖期間從新檢查線程池狀態
int c = ctl.get();
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.isAlive()) // 剛建立線程已經開始執行任務,這是有問題
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); //啓動任務
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
複製代碼
addWorker()
主要流程檢查線程池狀態是否合法,建立新的線程,加入workers中,調用start()
執行任務。咱們去了解下Worker 類線程
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
// TODO: switch to AbstractQueuedLongSynchronizer and move
// completedTasks into the lock word.
/** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker. */
public void run() {
runWorker(this);
}
}
複製代碼
Worker其實就是Runnable包裝類,可是增長了任務中斷功能,他的主要任務就是維護中斷狀態,繼承AQS能夠簡化獲取和釋放圍繞每一個任務執行的鎖定,防止旨在喚醒等待任務的工做線程的中斷。 瞭解Worker怎麼執行任務的進入runWorker()
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; //取出任務
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { //若是當前worker沒有任務,從隊列中獲取任務,直到隊列爲空
w.lock();
//處理線程中斷機制
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); //前置處理,相似攔截器機制,須要子類去實現
try {
task.run(); //調用任務方法
afterExecute(task, null); //後置處理
} catch (Throwable ex) {
afterExecute(task, ex); //異常處理
throw ex;
}
} finally {
task = null;
w.completedTasks++; //執行任務數量+ 1
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly); //線程生命週期走完,執行回收工做
}
}
複製代碼
結合Worker構造函數,Worker在初始化就本身給本身上鎖了,避免線程在任務尚未開始的狀況下就被中斷了 。啓動線程執行runWorker方法,取出任務,釋放鎖,若是Worker中的任務爲空,從隊列中拉取任務。處理線程中斷,主要依據第一線程狀態已經至少STOP狀態,而後清除中斷狀態,在判斷線程沒有中斷信號了,再發送中斷信號。按照做者註釋的意思就是當線程池已經在中止過程當中,線程應該中斷,可是必須雙重檢查防止關閉過程當中競爭發送中繼信號。調用run方法執行任務。爲何要上鎖執行任務,主要是執行任務過程,必需要獲取鎖才能中斷線程的,可是Worker自己不支持重入鎖的,只有在任務開始關閉過程才能中斷。 在這裏咱們終於看到線程共用方式了,經過線程不斷從隊列中獲取任務,而後再進行調用run方法執行任務,當線程退出獲取隊列循環,線程生命週期就結束了。
####geTask()
private Runnable getTask() {
boolean timedOut = false; //上一次拉取是否超時
for (;;) {
int c = ctl.get();
//檢查線程池狀態是SHUTDOWN 不接受新的任務
// 任務隊列爲空
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount(); //核心線程數workerCount -1
return null;
}
int wc = workerCountOf(c);
// allowCoreThreadTimeOut 空閒狀況下是否回收核心線程數 默認是false
// 當前線程數大於 核心線程數
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// wc 大於最大線程數 ,先處理線程數量
// 線程在存活的時間內沒有獲取到任務,則須要回收掉,上一個循環的,線程數-1
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) { //wc 不要爲0,任務隊列爲空的狀況
if (compareAndDecrementWorkerCount(c)) //線程-1成功沒有其餘線程競爭,沒有新增任務
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //超時會返回空
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) { //中斷等待獲取任務,放棄執行任務
timedOut = false;
}
}
}
複製代碼
這裏咱們知道空閒時間是怎麼回收線程的,經過同步性隊列poll() + 超時時間知道一個線程在這個時間內沒有任務執行,線程池處於空閒狀態的,返回null給調用方法,跳出while循環,結束整個線程的生命週期。 ####進入processWorkerExit()
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) //若是沒有執行到任務,核心線程-1
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w); //移除當前worker ,線程會被回收掉
} finally {
mainLock.unlock();
}
tryTerminate(); //判斷線程池內狀態,是否對線程池發出關閉信號
int c = ctl.get();
if (runStateLessThan(c, STOP)) { //線程池在RUNNABLE或者SHUTDOWN狀態,線程池任然能夠執行任務或者接受任務
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty()) //線程池內線程已經被回收完了而且任務尚未執行完
min = 1;
if (workerCountOf(c) >= min) //線程池內線程數量大於核心線程池,不須要新建線程去處理
return; // replacement not needed
}
addWorker(null, false); //建立新的線程處理任務
}
}
複製代碼
####進入 tryTerminate() 在線程池SHUTDOWN狀態線程爲0和任務隊列爲空的狀況,或者STOP狀態核心隊列爲空狀況,線程池狀向TIDYING轉移,傳播關閉池信號。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) || //RUNNING 狀態不須要處理
runStateAtLeast(c, TIDYING) || //已經進入TIDYING,也不作處理
(runStateLessThan(c, STOP) && ! workQueue.isEmpty())) //任務隊列不爲空,不知足條件
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE); 嘗試去中斷一個worker
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); //加鎖修改線程池狀態
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // 進入TIDYING狀態
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0)); //執行完terminated() 進入TERMINATED狀態
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
複製代碼
####shutdown() 再去了解下線程池終止方法
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN); //修改線程池狀態爲SHUTDOWN
interruptIdleWorkers(); //中斷線程
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
複製代碼
進入interruptIdleWorkers() 怎麼中斷線程
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); //加鎖主要是workers 是一個不安全集合
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) { //沒有中斷和 可以獲取到鎖,說明此線程池沒有在執行任務,Worker 是不支持重入的
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
複製代碼
處理方法挺簡單的,修改線程池狀態不要接收新的任務,將works中空閒線程取出發出中斷信號。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue(); //刪除隊列中的任務,返回給tasks
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
複製代碼
shutdownNow 會將隊列中尚未來得及處理任務所有刪除掉,直接調用tryTerminate()終止線程池生命週期。
如今咱們知道線程池內部機制是如何建立線程,共用線程,空閒回收,線程池的生命週期。調用execute()提交任務,若是當前線程池數量小於核心線程數,調用addWorker()建立一個新的線程池去執行任務,不然直接加入到隊列中。在addWorker()啓動一個線程去不斷從隊列拉取任務,直到一個隊列存活時間沒有任務執行或者隊列爲空,線程纔會被回收掉。