派生體系
java.util.concurrent
ThreadPoolExecutor
AbstractExecutorService
ExecutorService
Executor
這個類是Executor框的核心實現,它的名字向咱們代表,它是使用thread pool實現的。這個thread pool主要解決了兩個問題:
- 執行大量的單個異步任務,通常狀況下,它能提高總體的性能。
- 執行由多個任務組成的任務集合,經過Future列表返回每一個任務的執行結果。
設計原理

重要概念
爲了可以在更多上下文環境中使用,ThreadPool定義了一些概念,這些概念都直接或間接對應着可調節參數,若是不瞭解這些概念含義,很難正確地使用這些參數。下面來看一下這些概念及其含義:
當前,核心,最小,最大線程數(poolSize, corePoolSize, minimumPoolSize, maximumPoolSize)
poolSize: 當前處於運行和空閒狀態的總線程數。
corePoolSize: 核心線程數, 當poolSize<=corePoolSize時,存在的線程稱爲coreThread。
minimumPoolSize: 最小線程數,當minimumPoolsize = allowCoreThreadTimeOut ? 0 : corePoolSize ,
maximunPoolSize: 最大線程數。
ThreadPool在運行過程當中會自動的調節線程數量(poolSize), 通常來講,poolSize處於[corePoolSize maximumPoolSize]區間以內。
當用戶調用execute提交一個任務時,若是poolSize<corePoolSize, 會建立一個新線程處理這個任務。若是若是poolSize處於[corePoolSize maximumPoolSize]區間內,只有隊列盡是纔會建立新線程。不管如何,poolSize不會大於maximumPoolSize。
默認狀況下,ThreadPool沒有收到任何任務時pooSize = 0, 只有當ThreadPool開始收到任務以後纔會建立線程。可是能夠經過覆蓋prestartCoreThread或prestartAllCoreThreads方法改變這種行爲,提早建立線程。
線程工廠--ThreadFactory
ThreadPool使用實現了ThreadFactory接口的實現建立新線程。Executors工廠類提供了defaultThreadFactory方法,該方法返回一個默認的ThreadFactory實例。使用這個實例建立的線程具備相同的優先級,是非後臺線程,命名上使用相同的前綴。若是不滿意這這些行爲,能夠本身實現一個ThreadFactory交給ThreadPool使用。
保持存活時間(keepAliveTime)
若是poolSize > corePoolSize, 當一個線程的空閒時間大於keepAliveTime, 它會被終止掉。默認狀況下當poolSize <= corePoolSize時,keepAliveTime不會有影響,若是調用 allowCoreThreadTimeOut(true), 可讓keepAliveTime在這個時間也起做用。
任務排隊(queuing)
任何BlockingQueue的實例均可以用於保存排隊的任務。不一樣的BlockingQueue實現決定了不一樣的排隊策略:
SynchronousQueue: 同步隊列,當提交一個任務時,要求ThreadPool中當前有至少一個空閒線程,或者能夠建立新的線程(poolSize < maximumPoolSize)當即執行這個任務,不然ThreadPool會拒絕這個任務。
LinkedBlockingQueue: 無限制的隊列(只受限於可以使用的內存), 不會處於full狀態, offer方法不會返回false,這意味這ThreadPool的pooSize<=corePoolSize, 不會建立大於corePoolSize的線程數。
ArrayBlockingQueue: 有限制的隊列, 受限於它的capacity。當poolSize == corePoolSize且隊列沒滿時, 新提交的任務會追加到隊列中排隊執行。 當poolSize在[corePoolSize maximumPooSize)區間同時隊被填列滿時,將會建立新的線程。直到poolSize == maximumPoolSize位置。 若是隊列被填滿同時pooSize == maximumPoolSize,新的任務會被拒絕。
拒絕任務(rejected tasks)
當ThreadPool遇到如下兩種狀況時會觸發拒絕任務策略:
- 正常狀況下BlockingQueue被填滿,同時poolSize == maximumPoolSize。
- 被關閉
ThreadPool使用RejectedExecutionHandler處理丟棄動做,默認定義了4中丟棄策略:
ThreadPoolExecutor.AbortPolicy: 拋出RejectedExecutionException異常。
ThreadPoolExecutor.CallerRunsPolicy: 本身執行這個被拋棄的任務。
ThreadPoolExecutor.DiscardPolicy: 悄無聲息的丟棄掉這人任務。
狀態
ThreadPool定義了5狀態
RUNNING: 接受新提交的任務,執行隊列中發任務。
SHUTDOWN: 不接受新提交的任務,但仍然會執行隊列中的人。
STOP: 不接受新提交的任務,不執行隊列中的任務,並且會打斷正在執行中的任務。
TIDYING: 全部的任務都終止了,而且線程數爲0,當全部的線程都過渡到TIDYING狀態後會調用treminated方法。
TERMINATED: treminated方法調用已經完成。
狀態之間的轉換關係
RUNNING --> SHUTDOWN
調用shutdown()
(RUNNING或SHUTDOWN) -- > STOP
調用shutdownNow()
SHUTDOWN --> TIDYING
隊列爲空,同時線程數爲0
TIDYING --> TREMINATED
treminated()執行完成。
向ThreadPool提交任務: execute
ThreadPoolExecutor實例建立以後,在沒有調用execute提交任務以前,ThreadPool中是沒有線程的,線程的建立是依賴exeute來驅動的。能夠說,exeute是ThreadPoolExecutor運行的觸發器,全部我選擇先從exeute方法開始分析代碼。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { //
若是線程數小於 corePoolSize, 建立一個新線程。
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { //
若是處於RUNNGIN狀態把任務放到隊列中
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) /
/再次檢查線程狀態,若是不是RUNNING狀態,把任務從隊列刪除,而後拒絕這個任務
reject(command);
else if (workerCountOf(recheck) == 0) //
若是線程數爲0,建立一個新線程
addWorker(null, false);
}
/*
若是運行到這裏說明當前不是出於RUNNING狀態,或處於RUNNING狀態但隊列已經被填滿
*嘗試建立新的線程執行這個任務,若是失敗,拒絕這任務
*/
else if (!addWorker(command, false))
reject(command);
}
以上就是exeute代碼,它很簡單,但其中ctl成員變量比較費解。ctl是AtomicInteger類型,它被用來打包保存ThreadPoolExecutor的狀態和線程數。
AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
它初始化時,把狀態設置成RUNNING,下面來看看它的結構
高位 ---- > 低位
運算狀態(run state)
|
線程數(workerCount)
|
31 -- 29
|
28 -- 0
|
狀態位
RUNNING
SHUTDOWN
STOP
TIDYING
TREMINATED
知道了這些數據的保存方式,把他們取出來,只須要一些簡單的位運算就能夠了。
狀態的大小關係 RUNNING < SHUTDOWN < STOP < TIDYING < TREMINATED,
runStateOf(clt.get()) < SHUTDOWN RUNNING狀態
runStateOf(clt.get()) >= SHUTDOWN 非RUNNING狀態
這個大小關係要記住,這樣理解代碼會更快。
建立新線程
ThreadPool把線程封裝成Worker對對象,添加worker就是添加線程,addWorker方法作的事情就是添加線程。
private boolean addWorker(Runnable firstTask, boolean core) {
/*
這段代碼的做用是確保知足一下條件的任意一個時才建立新線程
*1. 處於RUNNING 狀態, 能夠接受新任務,能夠繼續執行隊列中的任務
*2. 處於SHUTDOWN狀態, 隊列不是空,且當前沒有提交新任務
*/
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && //
非RUNNINGG狀態
! (rs == SHUTDOWN &&
firstTask == null && //
當前提交的新任務
! workQueue.isEmpty())) //
隊列不是空
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) //
若是當前調用建立的是core線程, 確保當前線程數 <corePoolSize, 不然確保當前線程數< maximumPoolSize
return false;
if (compareAndIncrementWorkerCount(c)) //
原子操做,增長線程數
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//
執行到這裏表示已經經過檢查能夠建立新線程,而且線程數已經加1
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) { //
再次檢查,確保當前仍然知足容許建立線程的條件
if (t.isAlive()) //
確保Thread尚未調用start()
throw new IllegalThreadStateException();
workers.add(w); //
把worker線程放進HashSet中
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); //
啓動新線程
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
線程的主循環
Worker實現了Runnable接口
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
構造方法
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
建立線程時把Worker實例自己當作線程的Runnable產生,因此當線程啓動後,將會調用Worker的run方法。
public void run() {
runWorker(this);
}
線程的主循環就在runWorker方法中實現
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; //
若是firstTask!=null, 先執行firstTask
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { //
若是沒有firstTask, 從隊列中取出一個task, 若是沒有取到,退出線程
w.lock();
//
若是處於狀態>=STOP(前面已經講過狀態直接的大小關係), 確保線程處於interrupted狀態
//
不然清除線程的interrupted狀態
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); //
執行任務前調用的方法,默認什麼都沒幹,用戶能夠根據須要覆蓋它
Throwable thrown = null;
try {
task.run(); //
執行任務
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); //
任務完成以後調用的方法, 默認什麼都沒幹,用戶能夠根據須要覆蓋它
}
} finally {
task = null; //
把當前task置空,這樣才能調用getTask從隊列裏取出任務
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//
正常退出線程 completedAbruptly是true, 異常致使的線程退出爲false
processWorkerExit(w, completedAbruptly);
}
}
從隊列中獲得排隊的任務
在runWorker主循環中,除了第一次的任務從worker的firsTask(在它不是null的狀況下)取以外, 後面每次都是調用getTask從隊列中取出一個任務。
下面是getTask的代碼分析
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); //
獲得當前狀態
//
若是當前狀態 > SHUTDOWN 退出線程
//
若是當前狀態 == SHUTDOWN 且 隊列爲空,退出線程
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount(); //減小當前線程數
return null;
}
int wc = workerCountOf(c); //
獲得當前的線程數
//
線程是否容許超時的條件: 設置容許coreThread超時,或者當前線程數 > corePoolSize
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//
線程退出須要同時知足如下兩個條件條件:
//
1. 當前線程數>maximumPooSize 或 容許超時同時檢查到已經超時
//
2. 當前線程數>1 或 隊列爲空
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c)) //
減小當前線程數, 這個方法確保多線程環境下不會過多地結束線程。
return null;
continue;
}
try {
//
取出一個任務。若是容許超時,調用poll,不然調用take
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true; //
已經超時,運行到這裏代表poll超時返回
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
getTask的功能除了取出一個任務之外,它還負責在條件知足的狀況下正常地結束一個線程
線程結束
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) //
若是線程是因爲異常緣由結束的,這裏要糾正線程數
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w); //
把線程從HashSet中刪除
} finally {
mainLock.unlock();
}
tryTerminate(); //
嘗試終止整個ThreadPool
int c = ctl.get();
if (runStateLessThan(c, STOP)) { //
若是當前狀態<STOP
if (!completedAbruptly) { //
若是不是異常結束
//
計算最小線程數min
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min) //
若是當前線程數>=min直接返回
return; // replacement not needed
}
//
建立新線程, 條件:
//
當前線程正常結束
//
當前線程異常結束,但當前線程數小於最小線程數
addWorker(null, false);
}
}
上面的代碼實現了線程的生命週期的管理,線程只有在ThreadPoolExecutor的狀態處於RUNNGIN或SHUTDOWN時才能夠存在。下面是這兩種狀態下線程的生存狀態:
RUNNING:
容許coreThread超時: 線程空閒(意味着隊列爲空)時間超過 keepAliveTime, 線程會被結束, 直到線程數爲0。
不容許coreThread超時: 線程空閒時間超過 keepAliveTime, 線程會被結束,直到線程數爲corePoolSize。
SHUDOWN:
當線程把已經在隊列裏的全部任務執行完畢後,全部線程都會進入退出流程,最終退出。
整個ThreadPoolExecutor的狀態變遷
前面已經講過,ThreadPool的狀態和線程數被打包方進一個32整數中:
AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
初始化把狀態設置成RUNNING, 線程爲0
調用shutdown時把狀態從RUNNING置爲SHUTDOWN, 隨後過渡到TIDYING->TREMINATED。
當調用shutdownNow時把狀態從(RUNNING 或 SHUTDOWN) 設置爲STOP, 隨後過渡到TIDYING->TREMINATED。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN); //
只有當前狀態<SHUTDOWN時才執行狀態設置的動做
interruptIdleWorkers(); //
打斷全部空閒的的線程,讓這些線程有機會本身結束
onShutdown(); //
回調方法,默認什麼都沒作,子類能夠覆蓋
} finally {
mainLock.unlock();
}
tryTerminate(); //
嘗試執行ThreadPool的結束操做
}
shutdownNow和shutdown的操做大體同樣,不一樣的是它把狀態設置成STOP,還會返回隊列中沒有來得及執行的任務list。
tryTerminate方法做用是嘗試結束整個ThreadPool, 它不必定會執行真正的結束動做。它在三個地方被調用, worker線程結束時,shudown中,shutdownNow中。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
//知足如下三個條件中的任何一個就當即返回
//1. 處於RUNNGING狀態
//2. 狀態>= TIDYING
//3. 處於SHUTDOWN狀態,且隊列不是空
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//
若是處於STOP狀態,且線程數不爲0,通知一個處於空閒的線程結束本身
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
//
執行到這裏表示目前狀態>=SHUTDOWN,線程數已是0
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { //
總有一個線程會運行到這裏,把狀態置爲 TIDYING
try {
terminated(); //
調用回調方面,默認什麼都沒幹,子類能夠覆蓋
} finally {
ctl.set(ctlOf(TERMINATED, 0)); //
把狀態置爲TREMINATED, 自此整個ThreadPool纔算終結
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
tryTerminate之因此要在三個地方調用,是爲了保證當調用shutdown或shutdownNow以後,總有一個線程會完成最後的終結工做。
參數設置
分析完前面代碼後,再來使用它,它的參數怎麼設置天然就瞭然於心。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
public void allowCoreThreadTimeOut(boolean value)
public void setCorePoolSize(int corePoolSize)
public void setKeepAliveTime(long time, TimeUnit unit)
public void setMaximumPoolSize(int maximumPoolSize)
public void setRejectedExecutionHandler(RejectedExecutionHandler handler)
public void setThreadFactory(ThreadFactory threadFactory)