在 多線程知識梳理(5) - 線程池四部曲之 Executor 框架 中,咱們對Executor
框架以及它的調度模型進行了簡要的介紹,其中用於對線程進行調度和管理的線程池是整個框架的核心,經過線程池咱們能夠:數組
在JDK
包中,ThreadPoolExecutor
就是線程池的具體實現,在閱讀源碼以前,咱們先對它的處理流程進行簡要介紹,當咱們經過execute/submit
方法提交一個任務到線程池後,會通過如下的處理流程: bash
corePoolSize
,則建立新線程來執行任務。corePoolSize
,則將任務加入到等待隊列中。maximumPoolSize
,任務將被拒絕。以上就是ThreadPoolExecutor
對於任務的處理流程,其中有幾點須要說明:多線程
corePoolSize
,那麼銷燬該線程,不然只有當設置了allowCoreThreadTimeOut
,纔會銷燬該線程。從上面的處理流程能夠看出,ThreadPoolExecutor
對於任務的處理流程,會受到corePoolSize
、等待隊列、maximumPoolSize
等參數的影響,而這些參數都是能夠由ThreadPoolExecutor
的建立者去指定的,正是鑑於這種靈活性,使得咱們僅僅經過簡單的配置就能夠實現適用於不一樣的場景的ThreadPoolExecutor
,下面,咱們就來介紹一一介紹這些參數的含義:框架
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
複製代碼
(1) int corePoolSize函數
指定核心線程池的大小,當線程的數量沒有大於corePoolSize
以前,始終會建立新線程來執行分配的任務。若是調用了preStartAllCoreThreads()
方法,線程池會提早建立並啓動全部核心線程。oop
(2) int maximumPoolSizeui
指定線程池的最大數量,當加入新任務時,若是發現等待隊列已經滿了,那麼咱們會嘗試經過建立新線程的方式來執行該任務,而若是此時線程池內線程的數量已經等於maximumPoolSize
,那麼會採用指定的拒絕策略來處理該任務。this
(3) long keepAliveTime 和 TimeUnit unitspa
當一個線程在執行完分配給它的任務以後,會嘗試從等待隊列中取出任務去執行,若是通過keepAliveTime
以後仍然不能從隊列中獲取到任務,說明此時系統中可能並無那麼多的任務須要去處理,那麼就會根據線程池此時的狀態來決定是否銷燬該線程,以保證在可以迅速響應任務的同時,又不至於有太多空閒的存活線程。線程
(4) BlockingQueue workQueue
指定等待隊列的實現方式,咱們能夠根據須要選擇如下幾種等待隊列:
ArrayBlockingQueue
:基於數組結構的有界等待隊列,按先進先出原則排序任務LinkedBlockingQueue
:基於鏈表結構的阻塞隊列,一樣按照先進先出原則排序任務,吞吐量要高於ArrayBlockingQueue
SynchronousQueue
:對於這種阻塞隊列而言,每一個插入操做必需要等到另外一個線程調用移除操做,不然插入操做一直處於阻塞狀態。PriorityBlockingQueue
:一個具備優先級的無限阻塞隊列。(5) ThreadFactory threadFactory
用於建立線程的工廠。
(6) RejectedExecutionHandler handler
系統內置瞭如下幾種策略,用於隊列和線程池都滿了的狀況:
AbortPolicy
:拋出異常,這也是默認的策略CallerRunsPolicy
:使用調用者所在線程來執行任務DiscardOldestPolicy
:先丟棄隊列中最末尾的任務,再從新經過execute
方法執行該任務。DiscardPolicy
:不作任何處理,直接丟棄在 多線程知識梳理(5) - 線程池四部曲之 Executor 框架 中,咱們介紹了幾種ThreadPoolExecutor
的實現:
ThreadPoolExecutor
,只是在構造時傳入了不一樣的參數,以下表所示:
結合以前對於處理流程和核心參數的分析,對它們進行進一步的介紹:
(1) FixedThreadPool
nThread
參數將被做爲核心線程數和最大線程數,當線程池的數量達到nThread
後,以後的任務將會被加入到無界的等待隊列當中nThread
以後將會一直保持不變(2) SingleThreadPoolExecutor
1
,所以以後的任務都被加入到無界隊列當中,而且由線程池中這個惟一的線程從等待隊列中,按照添加的順序依次執行任務(3) CachedThreadPool
SynchonousQueue
,它的每一個插入操做都必須等待另外一個線程的移除操做,對於線程池而言,也就是說:在添加任務到等待隊列時,必需要有一個空閒線程正在嘗試從等待隊列獲取任務,纔有可能添加成功。60s
內都沒法獲取到新的任務,那麼它將會被銷燬。在ThreadPoolExector
中,有一個關鍵變量 - ctl
,理解它是咱們進行源碼走讀的基礎。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0))
複製代碼
這個原子操做類包含了兩部分信息:線程池的狀態、線程池中的存活線程數目,它們用32
位的整型數來表示,其中高3
位表示線程池的狀態,低位表示當前線程池中存活的線程數。
在某一時刻,線程池會處於如下五種狀態之一:
RUNNING
:Accept new tasks and process queued tasks
SHUTDOWN
:Don't accept new tasks, but process queued tasks
STOP
:Don't accept new tasks, don't process queued tasks, and interrupt in-progress tasks
TIDYING
:All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method
TERMINATED
:terminated() has completed
這五種狀態之間轉換轉換圖爲:
對於ctl
變量,如下三個函數能夠用來拆解和組裝:
//獲取線程池的狀態信息
private static int runStateOf(int c) { return c & ~CAPACITY; }
//獲取線程池的存活線程數
private static int workerCountOf(int c) { return c & CAPACITY; }
//將狀態信息和存活線程數進行組合
private static int ctlOf(int rs, int wc) { return rs | wc; }
複製代碼
下面,咱們經過模擬一個任務的執行來對ThreadPoolExecutor
的源碼進行簡單的走讀,整個流程以下圖所示,紅色字部分爲咱們所要關注的關鍵方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
} else if (!addWorker(command, false)) {
reject(command);
}
}
複製代碼
前面咱們說過,向一個線程池中提交任務有兩種方法,execute/submit
,它們最終都會調用到上面的這個execute
當中,這一函數的邏輯分爲三步:
addWorker(firstTask, core)
建立一個新的線程來執行任務,這裏將傳入的runnable
做爲該線程的第一個任務,而且core
參數爲true
,若是建立成功,那麼直接返回,不然從新獲取一次ctl
變量,跳轉到步驟2
ctl
變量判斷若是當前線程池處於running
狀態,那麼將runnable
添加到等待隊列workQueue
當中,若是添加失敗跳轉到步驟3
,添加成功則進行二次檢查,當發現了下面這兩種狀況之一,那麼還須要進行額外的處理:running
狀態,那麼會將該任務從等待隊列中移除;addWorker
方法啓動一個新線程,與第一步不一樣的是,該線程的第一個任務爲空。addWorker
方法建立新線程來執行該任務,和第一步的惟一區別就是core
參數爲false
,若是建立失敗,那麼執行拒絕策略。(2) private boolean addWorker(Runnable firstTask, boolean core) 下面,咱們再來看一下這個核心的函數addWorker
,它的最終目的就是建立一個新的線程來執行任務:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//第一部分:當前線程池的狀態是否知足加入的條件
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//第二部分:當前線程池的容量是否知足加入的條件
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//第三部分:建立工做類Worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
//第四部分:將Worker加入到線程池中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//第五部分:啓動線程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
複製代碼
整個addWorker
進行了如下幾步操做:
Worker
對象,這個Worker
類中包含了一個線程Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
複製代碼
Worker
對象加入到線程池中Worker
中的線程(3) final void runWorker(Worker w)
在第(2)
步中,咱們啓動了Worker
對象中的線程t
,它會調用Worker
對象的run()
函數,接着會執行runWorker
方法:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
複製代碼
這裏面的task
就是咱們經過execute
方法傳入的Runnable
,若是Worker
的第一個任務不爲空,那麼會首先執行該任務,若是第一個任務執行完畢,那麼會調用getTask()
方法來嘗試去獲取下一個任務,當getTask()
方法不返回(等待隊列爲空)時,會一直阻塞在這裏,而當這個while
循環退出的時候,那麼Worker
所對應的線程就會被銷燬。
(4) private Runnable getTask()
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
複製代碼
getTask()
方法會去在第(1)
步中的等待隊列workerQueue
取任務,在獲取任務的時候會考慮超時時間keepAliveTime
,若是超時時間到了仍然沒有獲取到任務,那麼getTask()
方法就會返回null
,從而runWorker()
中的while
循環就會結束,以後在finally
代碼塊中經過processWorkerExit(w, completedAbruptly)
銷燬該線程。
關閉線程池有兩種方法:shutdown
和shutdownNow
。
shutdown
:將線程池的狀態設置成SHUTDOWN
狀態,而後中斷全部沒有正在執行任務的線程。shutdownNow
:將線程池的狀態設置爲STOP
,嘗試中止全部正在執行或暫停任務的線程,並返回等待執行任務的列表。