在阿里巴巴的Java開發手冊中看到了線程池比較推薦使用ThreadPoolExecutor,因而每次也都是照葫蘆畫瓢地使用,對於其中的參數(corePoolSize, maximumPoolSize,keepAliveTime , workQueue)等徹底靠着yy去使用。每次用的是時候都感受心慌慌的,總算是找了個時間來真正地去閱讀其源碼。java
在使用ThreadPoolExecutor的時候,咱們一般會使用它的以下構造函數,(此處未考慮拒絕策略)安全
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
複製代碼
在這裏主要有四個參數:核心線程池大小、最大線程池大小、存活時間、工做隊列。其實看到這四個參數我是很懵的,好比,核心線程池與最大線程池之間的區別、工做隊列又是用來作什麼的,存活時間指的是誰的存活時間。在講解源碼以前不妨猜猜。併發
這個流程粗看沒太大問題,可是有一塊一方卻異常突兀、反常識,就是workQueue和maximum的順序,在個人想象中應該是先maximum再workQueue。可是事實上的確是先workQueue,再maximum。能夠嘗試運行下面這段demo,函數
public class ThreadPoolExecutorMain {
private static final ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
public static void main(String[] args) {
for (int i = 1; i <= 20; i++) {
final int tmp = i;
pool.execute(() -> {
try {
Thread.sleep(5000);
System.out.println(tmp);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
複製代碼
在這段demo中,不發生意外的時候,執行順序爲(1,12,13),(2,3,4),(5,6,7),(8,9,10),11,每組內部順序能夠混亂。(注意:在真正使用的時候,咱們須要將ThreadPoolExecutor看成無序的使用)this
首先直接看execute()方法的源碼spa
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1. 判斷core是否塞得下
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2. 判斷workQueue是否塞得下
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. addWorker中判斷max是否塞得下
else if (!addWorker(command, false))
reject(command);
}
複製代碼
在這裏ctl是一個設計很是精巧的狀態管理器,它實際上是一個AtomicInteger,它利用int的前三位來存儲當前線程池的狀態(RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED),後29位用來存儲線程數量。線程
在這段代碼中,咱們能夠看到對線程的執行策略分爲了三個部分:1. core部分 2. workQueue部分 3. max部分。其中workQueue部分比較直觀,就是直接調用workQueue.offer(command)將線程加入了待執行隊列。那麼接下來須要關注的是addWorker()方法。設計
private boolean addWorker(Runnable firstTask, boolean core) {
// 判斷firstTask可否被執行
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN && // SHUTDOWN狀態不會執行新線程,可是能夠執行workQueue中的線程
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY || // 最多支持2^29-1個線程
wc >= (core ? corePoolSize : maximumPoolSize)) // 此處判斷max是否塞得下
return false;
if (compareAndIncrementWorkerCount(c)) // 利用CAS防止併發問題
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
// 執行新的線程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try { // 嘗試添加線程
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) { // SHUTDOWN狀態不會執行新線程,可是能夠執行workQueue中的線程
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 執行線程
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
複製代碼
addWorker()這段代碼看起來比較複雜,可是若是去除掉一些細節和併發安全相關的代碼,總體的代碼邏輯就是判斷線程是否能夠執行,若是能夠執行則新建線程執行。在這段代碼中,咱們能夠看到咱們的線程被封裝到了一個叫作Worker的類中,接下來,咱們繼續探究Worker的源碼。code
在上面的代碼中咱們能夠看到Worker的執行是經過worker.thread.start()來執行的,先看一下構造函數。cdn
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
複製代碼
這裏面Worker又做爲了Runnable參數傳給了Worker.thread。那接下來看run()方法
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { // 獲取Task
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 空方法,擴展使用
Throwable thrown = null;
try {
task.run(); // 執行Task
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 空方法,擴展使用
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
複製代碼
這段run()方法能夠看到ThreadPoolExecutor是經過不停地getTask()來複用線程的,可是到這裏,其實我還有一個疑問,就是ThreadPoolExecutor如何保持線程一直處於存活狀態的。那這個問題一樣經過源碼來繼續解讀。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 判斷是否超時消亡
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 根據超時設置選擇不一樣的策略獲取Task
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
複製代碼
在這段代碼中咱們能夠看到此處利用workQueue是阻塞隊列的特性來保持core線程一直處於存活狀態(workQueue.take),max線程超時消亡(workQueue.poll)。固然在這段代碼中,咱們發現也能夠經過設置ThreadPoolExecutor的allowCoreThreadTimeOut來使得core線程超時消亡。至於workQueue的內部實現(take和poll)此處就不繼續深究下去了。
至此,咱們已經知道了ThreadPoolExecutor的總體執行流程以及經常使用參數的意義,一樣也清楚了流程總覽中的demo代碼的執行結果爲什麼具備順序性。至於workQueue內部的實現就留到下一次,初步看了一下,感受其內部也有不少很是有意思的東西。