在web開發中,服務器須要接受並處理請求,因此會爲一個請求來分配一個線程來進行處理。若是每次請求都新建立一個線程的話實現起來很是簡便,可是存在一個問題: 若是併發的請求數量很是多,但每一個線程執行的時間很短,這樣就會頻繁的建立和銷燬線程,如此一來會大大下降系統的效率。可能出現服務器在爲每一個請求建立新線程和銷燬線程上花費的時間和消耗的系統資源要比處理實際的用戶請求的時間和資源更多。那麼有沒有一種辦法使執行完一個任務,並不被銷燬,而是能夠繼續執行其餘的任務呢?這就是線程池的目的了java
在實際使用中,線程是很佔用系統資源的,若是對線程管理不善很容易致使系統問題。所以,在大多數併發框架中都會使用線程池來管理線程,使用線程池管理線程主要有以下好處:web
下降資源消耗。經過複用已存在的線程和下降線程關閉的次數來儘量下降系統性能損耗;編程
提高系統響應速度。經過複用線程,省去建立線程的過程,所以總體上提高了系統的響應速度:bash
提升線程的可管理性。線程是稀缺資源,若是無限制的建立,不只會消耗系統資源,還會下降系統的穩定性,所以,須要使用線程池來管理線程。服務器
何時使用線程池?多線程
單個任務處理時間比較短
須要處理的任務數量很大
複製代碼
2.線程池的實現原理併發
當向線程池提交一個任務以後,線程池是如何處理這個任務的呢?能夠看看以下步驟:框架
步驟1.線程池判斷核心線程裏的線程是否都在執行任務。分爲兩種狀況,若是有空閒的核心線程,那麼建立一個新的工做線程。則直接執行該任務。若是設置10條核心線程都在執行任務,那麼進入下一個流程。ide
步驟2.線程池會判斷,當前工做隊列是否已經滿了,若是沒有滿,則將該提交的線程任務存儲在這個工做隊列裏面,等待覈心線程有空閒,就會從該隊列取線程出來執行。若是工做隊列滿了,則繼續執行下一個流程。oop
步驟3.線程池會判斷線程池裏的線程(這裏是線程池容許的線程總數)是否都處於執行狀態,若是不是,則建立一個新的線程來執行任務。若是滿了,則交給飽和策略來處理這個任務。
步驟4.按照飽和策略處理沒法執行的任務。
如圖所示:
3.線程池內部
先來關注如下主要的幾個屬性:
//記錄線程池狀態和線程數量(總共32位,前三位表示線程池狀態,後29位表示線程數量)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//線程數量統計位數29 Integer.SIZE=32
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//線程池的運行狀態
// runState存儲在高位中
//接受新任務而且處理阻塞隊列裏的任務
private static final int RUNNING = -1 << COUNT_BITS;
//關閉狀態,再也不接受新提交的任務,但卻能夠繼續處理阻塞隊列中已保存的任務。在線程池處於 RUNNING 狀態時,
//調用 shutdown()方法會使線程池進入到該狀態。(finalize() 方法在執行過程當中也會調用shutdown()方法進入該狀態);
private static final int SHUTDOWN = 0 << COUNT_BITS;
//不能接受新任務,也不處理隊列中的任務,會中斷正在處理任務的線程。在線程池處於 RUNNING 或
// SHUTDOWN 狀態時,調用 shutdownNow() 方法會使線程池進入到該狀態;
private static final int STOP = 1 << COUNT_BITS;
//若是全部的任務都已終止了,workerCount (有效線程數) 爲0,線程池進入該狀態後會調用
// terminated() 方法進入TERMINATED 狀態。
private static final int TIDYING = 2 << COUNT_BITS;
//在terminated() 方法執行完後進入該狀態,默認terminated()方法中什麼也沒有作。
private static final int TERMINATED = 3 << COUNT_BITS;
複製代碼
進入TERMINATED的條件以下: 線程池不是RUNNING狀態; 線程池狀態不是TIDYING狀態或TERMINATED狀態; 若是線程池狀態是SHUTDOWN而且workerQueue爲空; workerCount爲0; 設置TIDYING狀態成功。
線程池狀態轉換
RUNNING -> SHUTDOWN
顯式調用shutdown()方法, 或者隱式調用了finalize()方法
(RUNNING or SHUTDOWN) -> STOP
顯式調用shutdownNow()方法
SHUTDOWN -> TIDYING
當線程池和任務隊列都爲空的時候
STOP -> TIDYING
當線程池爲空的時候
TIDYING -> TERMINATED 當 terminated() hook 方法執行完成時候。
複製代碼
線程狀態的轉換主要是用於判斷線程池中核心線程與工做隊列中的線程和普通線程的狀態在不一樣狀態下,不不一樣處理。還有屬性,咱們後面分析源碼會有用到。
ctl相關方法 這裏還有幾個對ctl進行計算的方法:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
複製代碼
runStateOf:獲取運行狀態; workerCountOf:獲取活動線程數; ctlOf:獲取運行狀態和活動線程數的值。
建立線程池主要是ThreadPoolExecutor類來完成,ThreadPoolExecutor的有許多重載的構造方法,經過參數最多的構造方法來理解建立線程池有哪些須要配置的參數。看源碼ThreadPoolExecutor的構造方法爲:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue, //執行任務以前用於保存任務的隊列
//如下兩個若是不傳入,會設置默認的線程工廠生成以及飽和處理策略
ThreadFactory threadFactory, //當執行程序建立新線程時,使用的自定義的工廠
//去建立個性化線程
RejectedExecutionHandler handler) { //飽和策略的獲取,通常咱們
//能夠傳入一個實現了RejectedExecutionHandler 的子類,實例化成
RejectedExecutionHandler 傳入。
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
複製代碼
下面對參數進行說明: 構造方法中的字段含義以下:
corePoolSize:核心線程數量,當有新任務在execute()方法提交時,會執行如下判斷:
若是運行的線程少於 corePoolSize,則建立新線程來處理任務,即便線程池中的其餘線程是空閒的;
若是線程池中的線程數量大於等於 corePoolSize 且小於 maximumPoolSize,則只有當workQueue滿時才建立新的線程
去處理任務;
若是設置的corePoolSize 和 maximumPoolSize相同,則建立的線程池的大小是固定的,這時若是有新任務提交,
若workQueue未滿,則將請求放入workQueue中,等待有空閒的線程去從workQueue中取任務並處理;
若是運行的線程數量大於等於maximumPoolSize,這時若是workQueue已經滿了,則經過handler所指定的策略來處理任務;
因此,任務提交時,判斷的順序爲 corePoolSize –> workQueue –> maximumPoolSize。
複製代碼
maximumPoolSize:最大線程數量;
workQueue:等待隊列,當任務提交時,若是線程池中的線程數量大於等於corePoolSize而且等待隊列是有界的,非滿的時候,把該任務封裝成一個Worker對象放入等待隊列;
workQueue:保存等待執行的任務的阻塞隊列,當提交一個新的任務到線程池之後, 線程池會根據當前線程池中正在運行着的線程的數量來決定對該任務的處理方式,主要有如下幾種處理方式:
直接切換:這種方式經常使用的隊列是SynchronousQueue,但如今尚未研究過該隊列,這裏暫時還無法介紹;
使用無界隊列:通常使用基於鏈表的阻塞隊列LinkedBlockingQueue。使用這種方式,那麼線程池中可以建立的最大線程數就是corePoolSize,而maximumPoolSize就不會起做用了(後面也會說到)。當線程池中全部的核心線程都是RUNNING狀態時,這時一個新的任務提交就會放入等待隊列中。
3.使用有界隊列:通常使用ArrayBlockingQueue。使用該方式能夠將線程池的最大線程數量限制爲maximumPoolSize,這樣可以下降資源的消耗,但同時這種方式也使得線程池對線程的調度變得更困難,由於線程池和隊列的容量都是有限的值,因此要想使線程池處理任務的吞吐率達到一個相對合理的範圍,又想使線程調度相對簡單,而且還要儘量的下降線程池對資源的消耗,就須要合理的設置這兩個數量。
keepAliveTime:線程池維護線程所容許的空閒時間。當線程池中的線程數量大於corePoolSize的時候,若是這時沒有新的任務提交,核心線程外的線程不會當即銷燬,而是會等待,直到等待的時間超過了keepAliveTime;
threadFactory:它是ThreadFactory類型的變量,用來建立新線程。默認使用Executors.defaultThreadFactory() 來建立線程。使用默認的ThreadFactory來建立線程時,會使新建立的線程具備相同的NORM_PRIORITY優先級而且是非守護線程,同時也設置了線程的名稱。
handler:它是RejectedExecutionHandler類型的變量,表示線程池的飽和策略。若是阻塞隊列滿了而且沒有空閒的線程,這時若是繼續提交任務,就須要採起一種策略處理該任務。線程池提供了4種策略:
下面就來看一下核心的幾個執行的方法
1.submit方法用於提交須要返回值的任務。線程池會返回一個future類型的對象,經過這個future對象能夠判斷任務是否執行成功,而且能夠經過future的get()方法來獲取返回值,get()方法會阻塞當前線程直到任務完成,而使用get(long timeout,TimeUnit unit)方法則會阻塞當前線程一段時間後當即返回,這時候有可能任務沒有執行完。
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
複製代碼
流程步驟以下 1.調用submit方法,傳入Runnable或者Callable對象 2.判斷傳入的對象是否爲null,爲null則拋出異常,不爲null繼續流程 3.將傳入的對象轉換爲RunnableFuture對象 4.執行execute方法,傳入RunnableFuture對象 5.返回RunnableFuture對象
二、execute()
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* clt記錄着runState和workerCount
*/
int c = ctl.get();
/*
* workerCountOf方法取出低29位的值,表示當前活動的線程數;
* 若是當前活動線程數小於corePoolSize,則新建一個線程放入線程池中;
* 並把任務添加到該線程中。
*/
if (workerCountOf(c) < corePoolSize) {
/*
* addWorker中的第二個參數表示限制添加線程的數量是根據corePoolSize來判斷仍是maximumPoolSize來判斷;
* 若是爲true,根據corePoolSize來判斷;
* 若是爲false,則根據maximumPoolSize來判斷
*/
if (addWorker(command, true))
return;
/*
* 若是添加失敗,則從新獲取ctl值
*/
c = ctl.get();
}
/*
* 若是當前線程池是運行狀態而且任務添加到隊列成功
*/
if (isRunning(c) && workQueue.offer(command)) {
// 從新獲取ctl值
int recheck = ctl.get();
// 再次判斷線程池的運行狀態,若是不是運行狀態,因爲以前已經把command添加到workQueue中了,
// 這時須要移除該command
// 執行事後經過handler使用拒絕策略對該任務進行處理,整個方法返回
if (! isRunning(recheck) && remove(command))
reject(command);
/*
* 獲取線程池中的有效線程數,若是數量是0,則執行addWorker方法
* 這裏傳入的參數表示:
* 1. 第一個參數爲null,表示在線程池中建立一個線程,但不去啓動;
* 2. 第二個參數爲false,將線程池的有限線程數量的上限設置爲maximumPoolSize,添加線程時根據maximumPoolSize來判斷;
* 若是判斷workerCount大於0,則直接返回,在workQueue中新增的command會在未來的某個時刻被執行。
*/
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/*
* 若是執行到這裏,有兩種狀況:
* 1. 線程池已經不是RUNNING狀態;
* 2. 線程池是RUNNING狀態,但workerCount >= corePoolSize而且workQueue已滿。
* 這時,再次調用addWorker方法,但第二個參數傳入爲false,將線程池的有限線程數量的上限設置爲maximumPoolSize;
* 若是失敗則拒絕該任務
*/
else if (!addWorker(command, false))
reject(command);
}
複製代碼
簡單來講,在執行execute()方法時若是狀態一直是RUNNING時,的執行過程以下:
若是workerCount < corePoolSize,則建立並啓動一個線程來執行新提交的任務;
若是workerCount >= corePoolSize,且線程池內的阻塞隊列未滿,則將任務添加到該阻塞隊列中;
若是workerCount >= corePoolSize && workerCount < maximumPoolSize,且線程池內的阻塞隊列已滿,則建立並啓動一個線程來執行新提交的任務;
若是workerCount >= maximumPoolSize,而且線程池內的阻塞隊列已滿, 則根據拒絕策略來處理該任務, 默認的處理方式是直接拋異常。
這裏要注意一下addWorker(null, false);,也就是建立一個線程,但並無傳入任務,由於任務已經被添加到workQueue中了,因此worker在執行的時候,會直接從workQueue中獲取任務。因此,在workerCountOf(recheck) == 0時執行addWorker(null, false);也是爲了保證線程池在RUNNING狀態下必需要有一個線程來執行任務。
execute方法執行流程以下:
下面來看下execute方法中調用的幾個重要方法:
addWorker方法
addWorker方法的主要工做是在線程池中建立一個新的線程並執行,firstTask參數 用於指定新增的線程執行的第一個任務,core參數爲true表示在新增線程時會判斷當前活動線程數是否少於corePoolSize,false表示新增線程前須要判斷當前活動線程數是否少於maximumPoolSize,實質就是判斷 若是爲true,則使用corePoolSize綁定,不然爲* maximumPoolSize代碼以下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 獲取運行狀態
int rs = runStateOf(c);
/*
* 這個if判斷
* 若是rs >= SHUTDOWN,則表示此時再也不接收新任務;
* 接着判斷如下3個條件,只要有1個不知足,則返回false:
* 1. rs == SHUTDOWN,這時表示關閉狀態,再也不接受新提交的任務,但卻能夠繼續處理阻塞隊列中已保存的任務
* 2. firsTask爲空
* 3. 阻塞隊列不爲空
*
* 首先考慮rs == SHUTDOWN的狀況
* 這種狀況下不會接受新提交的任務,因此在firstTask不爲空的時候會返回false;
* 而後,若是firstTask爲空,而且workQueue也爲空,則返回false,
* 由於隊列中已經沒有任務了,不須要再添加線程了
*/
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 獲取線程數
int wc = workerCountOf(c);
// 若是wc超過CAPACITY,也就是ctl的低29位的最大值(二進制是29個1),返回false;
// 這裏的core是addWorker方法的第二個參數,若是爲true表示根據corePoolSize來比較,
// 若是爲false則根據maximumPoolSize來比較。
//
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 嘗試增長workerCount,若是成功,則跳出第一個for循環
if (compareAndIncrementWorkerCount(c))
break retry;
// 若是增長workerCount失敗,則從新獲取ctl的值
c = ctl.get(); // Re-read ctl
// 若是當前的運行狀態不等於rs,說明狀態已被改變,返回第一個for循環繼續執行
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 根據firstTask來建立Worker對象
w = new Worker(firstTask);
// 每個Worker對象都會建立一個線程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN表示是RUNNING狀態;
// 若是rs是RUNNING狀態或者rs是SHUTDOWN狀態而且firstTask爲null,向線程池中添加線程。
// 由於在SHUTDOWN時不會在添加新的任務,但仍是會執行workQueue中的任務
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// workers是一個HashSet
workers.add(w);
int s = workers.size();
// largestPoolSize記錄着線程池中出現過的最大線程數量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 啓動線程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
複製代碼
注意一下這裏的t.start()這個語句,啓動時會調用Worker類中的run方法,Worker自己實現了Runnable接口,因此一個Worker類型的對象也是一個線程。
Worker類 線程池中的每個線程被封裝成一個Worker對象,ThreadPool維護的其實就是一組Worker對象,看一下Worker的定義:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** 此線程用來存儲進入線程池的線程 */
final Thread thread;
/** 要運行的初始化任務,可能爲null */
Runnable firstTask;
/** 線程的任務計數器數量 */
volatile long completedTasks;
/**
* 使用給定的第一個任務和ThreadFactory中的線程建立。
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // 在runWorker以前禁止中斷
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** 委託主線程運行到外部的 runWorker */
public void run() {
runWorker(this);
}
// 鎖定方法
//
// 值爲0表示爲未鎖定狀態
// 值爲1表示爲鎖定狀態
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
//中斷運行中的線程,是線程池調用shutdown或者tryTerminate方法時,最終會調用這個方法去中斷線程。
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
複製代碼
Worker類繼承了AQS,並實現了Runnable接口,注意其中的firstTask和thread屬性:firstTask用它來保存傳入的任務;thread是在調用構造方法時經過ThreadFactory來建立的線程,是用來處理任務的線程。
在調用構造方法時,須要把任務傳入,這裏經過getThreadFactory().newThread(this);來新建一個線程,newThread方法傳入的參數是this,由於Worker自己繼承了Runnable接口,也就是一個線程,因此一個Worker對象在啓動的時候會調用Worker類中的run方法。
Worker繼承了AQS,使用AQS來實現獨佔鎖的功能。爲何不使用ReentrantLock來實現呢?能夠看到tryAcquire方法,它是不容許重入的,而ReentrantLock是容許重入的:
一、lock方法一旦獲取了獨佔鎖,表示當前線程正在執行任務中; 二、若是正在執行任務,則不該該中斷線程; 三、若是該線程如今不是獨佔鎖的狀態,也就是空閒的狀態,說明它沒有在處理任務,這時能夠對該線程進行中斷; 四、線程池在執行shutdown方法或tryTerminate方法時會調用interruptIdleWorkers方法來中斷空閒的線程,interruptIdleWorkers方法會使用tryLock方法來判斷線程池中的線程是不是空閒狀態;
之因此設置爲不可重入,是由於咱們不但願任務在調用像setCorePoolSize這樣的線程池控制方法時從新獲取鎖。若是使用ReentrantLock,它是可重入的,這樣若是在任務中調用瞭如setCorePoolSize這類線程池控制的方法,會中斷正在運行的線程。 因此,Worker繼承自AQS,用於判斷線程是否空閒以及是否能夠被中斷。
此外,在構造方法中執行了setState(-1);,把state變量設置爲-1,爲何這麼作呢?是由於AQS中默認的state是0,若是剛建立了一個Worker對象,尚未執行任務時,這時就不該該被中斷,看一下tryAquire方法:
tryAcquire方法是根據state是不是0來判斷的,因此,setState(-1);將state設置爲-1是爲了禁止在執行任務前對線程進行中斷。
正由於如此,在runWorker方法中會先調用Worker對象的unlock方法將state設置爲0.
runWorker方法
在Worker類中的run方法調用了runWorker方法來執行任務,runWorker方法的代碼以下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 獲取第一個任務
Runnable task = w.firstTask;
w.firstTask = null;
// 容許中斷
w.unlock(); // 容許中斷
// 是否由於異常退出循環
boolean completedAbruptly = true;
try {
// 若是task爲空,則經過getTask來獲取任務
while (task != null || (task = getTask()) != null) {
w.lock();
// 若是線程池狀態爲中止,能夠中斷;
// 若是不是,確保線程不能夠被中斷
// 在第二種狀況下須要從新檢查以在清除中斷時處理
//runStateAtLeast判斷當前狀態是否爲TIDYING或TERMINATED,來控制中斷線程.
STOP 值爲 1,>stop狀態的,只能爲中斷和結束狀態
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//原來執行已廢棄,如今執行的實際上是空方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); //容許線程
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++; //任務數量加一
w.unlock();
}
}
completedAbruptly = false; //執行到這一步,說明沒有異常
} finally {
processWorkerExit(w, completedAbruptly);
}
}
複製代碼
這裏說明一下第一個if判斷,目的是:
若是線程池正在中止,那麼要保證當前線程是中斷狀態; 若是不是的話,則要保證當前線程不是中斷狀態; 這裏要考慮在執行該if語句期間可能也執行了shutdownNow方法,shutdownNow方法會把狀態設置爲STOP,回顧一下STOP狀態值爲1:不能接受新任務,也不處理隊列中的任務,會中斷正在處理任務的線程。在線程池處於 RUNNING 或 SHUTDOWN 狀態時,調用 shutdownNow() 方法會使線程池進入到該狀態。
STOP狀態要中斷線程池中的全部線程,而這裏使用Thread.interrupted()來判斷是否中斷是爲了確保在RUNNING或者SHUTDOWN狀態時線程是非中斷狀態的,由於Thread.interrupted()方法會復位中斷的狀態。
總結一下runWorker方法的執行過程:
while循環不斷地經過getTask()方法獲取任務; getTask()方法從阻塞隊列中取任務; 若是線程池正在中止,那麼要保證當前線程是中斷狀態,不然要保證當前線程不是中斷狀態; 調用task.run()執行任務; 若是task爲null則跳出循環,執行processWorkerExit()方法; runWorker方法執行完畢,也表明着Worker中的run方法執行完畢,銷燬線程。 這裏的beforeExecute方法和afterExecute方法在ThreadPoolExecutor類中是空的,留給子類來實現。
completedAbruptly變量來表示在執行任務過程當中是否出現了異常,在processWorkerExit方法中會對該變量的值進行判斷。
下面來看看如何getTask()
private Runnable getTask() {
// timeOut變量的值表示上次從阻塞隊列中取任務時是否超時
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/*
* 若是線程池狀態rs >= SHUTDOWN,也就是非RUNNING狀態,再進行如下判斷:
* 1. rs >= STOP,線程池是否正在stop;
* 2. 阻塞隊列是否爲空。
* 若是以上條件知足,則將workerCount減1並返回null。
* 由於若是當前線程池狀態的值是SHUTDOWN或以上時,不容許再向阻塞隊列中添加任務。
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// timed變量用於判斷是否須要進行超時控制。
// allowCoreThreadTimeOut默認是false,也就是核心線程不容許進行超時;
// wc > corePoolSize,表示當前線程池中的線程數量大於核心線程數量;
// 對於超過核心線程數量的這些線程,須要進行超時控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/*
* wc > maximumPoolSize的狀況是由於可能在此方法執行階段同時執行了setMaximumPoolSize方法;
* timed && timedOut 若是爲true,表示當前操做須要進行超時控制,而且上次從阻塞隊列中獲取任務發生了超時
* 接下來判斷,若是有效線程數量大於1,或者阻塞隊列是空的,那麼嘗試將workerCount減1;
* 若是減1失敗,則返回重試。
* 若是wc == 1時,也就說明當前線程是線程池中惟一的一個線程了。
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/*
* 根據timed來判斷,若是爲true,則經過阻塞隊列的poll方法進行超時控制,若是在keepAliveTime時間內沒有獲取到任務,則返回null;
* 不然經過take方法,若是這時隊列爲空,則take方法會阻塞直到隊列不爲空。
*
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 若是 r == null,說明已經超時,timedOut設置爲true
timedOut = true;
} catch (InterruptedException retry) {
// 若是獲取任務時當前線程發生了中斷,則設置timedOut爲false並返回循環重試
timedOut = false;
}
}
}
複製代碼
這裏重要的地方是第二個if判斷,目的是控制線程池的有效線程數量。由上文中的分析能夠知道,在執行execute方法時,若是當前線程池的線程數量超過了corePoolSize且小於maximumPoolSize,而且workQueue已滿時,則能夠增長工做線程,但這時若是超時沒有獲取到任務,也就是timedOut爲true的狀況,說明workQueue已經爲空了,也就說明了當前線程池中不須要那麼多線程來執行任務了,能夠把多於corePoolSize數量的線程銷燬掉,保持線程數量在corePoolSize便可。
何時會銷燬?固然是runWorker方法執行完以後,也就是Worker中的run方法執行完,由JVM自動回收。
getTask方法返回null時,在runWorker方法中會跳出while循環,而後會執行processWorkerExit方法。
最後看下processWorkerExit方法,這是execute源碼解析中的最後一個方法了。
/*
* 此方法從工做集中刪除線程,而且*若是因爲用戶任務異常退出,或者若是正在*運行少於
* corePoolSize工做者或者隊列非空但*沒有工做者,則可能終止池或替換工做者
* 第二個參數表示,若是工做線程是否因爲用戶線程異常而終止的
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 若是completedAbruptly值爲true,則說明線程執行時出現了異常,須要
// 將workerCount減1;
// 若是線程執行時沒有出現異常,說明在getTask()方法中已經已經對workerCount進行了減1操做,這裏就沒必要再減了。
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //統計完成的任務數 completedTaskCount += w.completedTasks; // 從workers中移除,也就表示着從線程池中移除了一個工做線程 workers.remove(w); } finally { mainLock.unlock(); } // 根據線程池狀態進行判斷是否結束線程池 tryTerminate(); int c = ctl.get(); /* * 當線程池是RUNNING或SHUTDOWN狀態時,若是worker是異常結束,那麼會直接addWorker; * 若是allowCoreThreadTimeOut=true,而且等待隊列有任務,至少保留一個worker; * 若是allowCoreThreadTimeOut=false,workerCount很多於corePoolSize。 */ if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } } 複製代碼
至此,processWorkerExit執行完以後,工做線程被銷燬,以上就是整個工做線程的生命週期,從execute方法開始,Worker使用ThreadFactory建立新的工做線程,runWorker經過getTask獲取任務,而後執行任務,若是getTask返回null,進入processWorkerExit方法,整個線程結束,如圖所示:
下面還有第二篇,主要講如何關閉線程池和終止的幾個方法。將緊接着第一篇繼續講。juejin.im/post/5d6886…
喜歡歡迎關注我哦
摘自: 《Java併發編程的藝術》 JDK8源碼 深刻理解Java線程池 www.ideabuffer.cn/2017/04/04/…