簡書江溢Jonny,轉載請註明原創出處,謝謝!java
本文基於JDK1.7的源碼進行分析並解讀。咱們將以一個簡單的案例開始本話題的討論,而且在文章的結尾,筆者將會給出一些經驗之談,避免後來者踩坑。編程
若是你喜歡我,能夠關注個人公衆號~更多幹貨~ 緩存
ThreadPoolExecutor是JUC提供的一類線程池工具,也是Java語言中應用場景最多的併發框架,能夠說,幾乎全部須要異步或者併發執行的,均可以使用Java線程池。那麼首先,咱們一塊兒來比較一下「單純使用線程的方案」和「使用ThreadPoolExecutor線程池的方案」,在解決問題上有什麼區別吧。安全
在中世紀,有一種叫作抄寫員(Scribe)的工做,他們的職責就好像是複印機,抄寫一本又一本書。假如這個時候有一個抄寫員工做室,只有2個抄寫員,他們要抄寫10本書。bash
咱們在本例中分別「本身寫線程管理」和「由ThreadPoolExecutor作線程管理」服務器
public static class Book {
private static AtomicInteger id = new AtomicInteger(0); // 書名生成器
private String bookName; // 書名
public void copy() { // 抄寫書籍
System.out.println("start copy " + bookName);
try {
Thread.sleep(100L); // sleep 100ms
} catch (Exception e) {
// ignore
}
System.out.println("end copy " + bookName);
}
public Book() {
bookName = "book-" + String.valueOf(id.incrementAndGet()); // 書名自動生成
}
}
複製代碼
// 提早準備好十本書
final BlockingQueue<Book> books = new LinkedBlockingDeque<Book>(10);
for (int i = 0; i < 10; i++) {
try {
books.put(new Book());
} catch (Exception e) {
// ignore
}
}
System.out.println("start work...");
// 建立兩個書籍抄寫員線程
Thread[] scribes = new Thread[2];
for (int scribeIndex = 0; scribeIndex < 2; scribeIndex++) {
scribes[scribeIndex] = new Thread(new Runnable() {
public void run() {
for (; ; ) {
if (Thread.currentThread().isInterrupted()) {
System.out.println("time arrives, stop writing...");
}
try {
Book currentBook = books.poll(5, TimeUnit.SECONDS);
currentBook.copy();
} catch (Exception e) {
System.out.println("time arrives, stop writing...");
return;
}
}
}
});
scribes[scribeIndex].setDaemon(false); // 設置爲非守護線程
scribes[scribeIndex].start();
}
// 工做已經安排下去了,安心等待就行了
try {
Thread.sleep(10000l);
} catch (Exception e) {
// ignore
}
// 時間到了,提醒兩個抄寫員中止抄寫
for (int scribeIndex = 0; scribeIndex < 2; scribeIndex++) {
scribes[scribeIndex].interrupt();
}
System.out.println("end work...");
複製代碼
寫了一大堆代碼來完成上述的功能,讓咱們一塊兒來看看用了ThreadPoolExecutor是怎麼完成的。併發
System.out.println("start work...");
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < 10; i ++) {
executorService.submit(new Runnable() {
public void run() {
new Book().copy();
}
});
}
// 工做已經安排下去了,安心等待就行了
try {
Thread.sleep(10000l);
} catch (Exception e) {
// ignore
}
executorService.shutdownNow();
System.out.println("end work...");
複製代碼
整個流程很是清晰,分別是:任務編寫、線程建立、線程啓動、終止線程。框架
可是不少時候,問題並不只限於上述的內容。異步
最先的併發編程的開發者不少事情都須要親力親爲,而經過使用Java線程池,能夠完成如下工做:函數
1)線程管理,線程的建立、啓動、銷燬等工做;
2)線程複用,線程的建立是會給服務器帶來必定開銷的,如何減小頻繁重複建立線程的開銷;
3)彈性伸縮,服務器一般有高峯期也有低峯期,線程池是否能夠彈性伸縮,好比線程建立成功後長時間不使用是否能夠回收,以減小系統資源的浪費,或者線程池的容量是否能夠隨時增加;
4)拒絕策略,線程數量有限而須要處理的任務不少,超出系統承載範圍的任務是拒絕仍是阻塞等待;
5)異常處理,線程在執行過程當中可能遇到異常或者錯誤,開發者如何正確應對這些異常或者錯誤;
6)任務分配,任務的分配是基於先入先出仍是基於某種優先級的策略。
等等如是,不一而足,這個時候咱們就要介紹Doug Lea大神開發的ThreadPoolExecutor線程池框架,看看大神是怎麼解決上面這些問題的。
首先,在解讀源碼以前,要引入ThreadPoolExecutor的一些重要概念
在ThreadPoolExecutor線程池的設計中,把整個任務執行框架線程池劃分爲5個生命週期:
RUNNING:容許接收新任務而且處理隊列中的任務
SHUTDOWN:再也不接收新的任務,僅消化完隊列中的任務
STOP:不只再也不接收新的任務,連隊列中的任務都再也不消化處理了,而且嘗試中斷正在執行任務的線程
TIDYING:全部任務被終止了,工做線程數workCount
也被設爲0,線程的狀態也被設爲TIDYING,並開始調用鉤子函數terminated()
TERMINATED:鉤子函數terminated()
執行完畢
各個生命週期的轉化圖以下:
從圖中能夠看到,整個生命週期的變化是不可逆的。
ThreadPoolExecutor把線程池狀態和線程池容量打包成一個int型變量,以下圖所示
線程池狀態位
狀態 | 高位值枚舉 | 正負性 |
---|---|---|
RUNNING | 111 | 負數(-536870912) |
SHUTDOWN | 000 | 0 |
STOP | 001 | 正數(536870912) |
TIDYING | 010 | 正數(1073741824) |
TERMINATED | 011 | 正數(1610612736) |
所以在狀態值的排布上能夠知道 TERMINATED > TIDYING > STOP >SHUTDOWN > RUNNING
ThreadPoolExecutor中的代碼以下所示:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 狀態字的高比特位存放線程池狀態信息
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 打包/提取狀態字信息
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
// 判斷當前線程池是否正在執行
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
複製代碼
首先,咱們建立一個線程池。
ExecutorService executorService = Executors.newFixedThreadPool(2);
複製代碼
這裏使用了Executors提供的工廠方法,能夠建立如下四種類型線程池:
newFixedThreadPool。該方法將用於建立一個固定大小的線程池(此時corePoolSize = maxPoolSize),每提交一個任務就建立一個線程池,直到線程池達到最大數量,線程池的規模在此後不會發生任何變化;
newCachedThreadPool。該方法建立了一個可緩存的線程池,(此時corePoolSize = 0,maxPoolSize = Integer.MAX_VALUE),空閒線程超過60秒就會被自動回收,該線程池存在的風險是,若是服務器應用達到請求高峯期時,會不斷建立新的線程,直到內存耗盡;
newSingleThreadExecutor。該方法建立了一個單線程的線程池,該線程池按照任務在隊列中的順序串行執行(如:FIFO、LIFO、優先級);
newScheduledThreadPool。該方法建立了一個固定長度的線程池,能夠以延遲或者定時的方式執行任務;
任務提交的大概邏輯以下:
1)當線程池小於corePoolSize時,新提交任務將建立一個新線程執行任務,即便此時線程池中存在空閒線程;
2)當線程池達到corePoolSize時,新提交任務將被放入workQueue中,等待線程池中任務調度執行;
3)當workQueue已滿,且maximumPoolSize > corePoolSize時,新提交任務會建立新線程執行任務;
4)當提交任務數超過maximumPoolSize時,新提交任務由RejectedExecutionHandler處理;
5)當線程池中超過corePoolSize線程,空閒時間達到keepAliveTime時,關閉空閒線程;
那麼接下來咱們看看源代碼是怎麼實現上面的描述的
線程池建立成功之後,咱們提交任務到線程池中:
executorService.submit(new Runnable() {
public void run() {
new Book().copy();
}
});
複製代碼
submit到線程池之後:
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);// 包裝出一個新的任務
execute(ftask); // 線程池的入口
return ftask;
}
複製代碼
能夠看到ThreadPoolExecutor
的入口方法就是execute(Runnable commad)
。該方法的執行邏輯以下:
int c = ctl.get();
// 1. 若是當前線程池中線程總數少於核心線程數,則添加新線程到線程池中,
// 而且由新線程執行剛提交進來的任務
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 2. 可能剛纔在建立新線程成功的同時,線程池被關閉了,所以須要double-check,
// 若是此時線程池已經被關閉了,那麼回滾剛纔被添加進來的任務
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 若是此時核心線程數(corePoolSize)已經滿了,而且任務隊列也滿了,
// 嘗試增長線程到maximumPoolSize大小,若是仍然失敗,執行拒絕策略
else if (!addWorker(command, false))
reject(command);
複製代碼
在上面的代碼裏面,ctl.get()
方法、workerCountOf()
、以及isRunning()
方法都是對以前提到的狀態字進行讀寫的操做,這部分咱們就再也不展開給讀者看了,有興趣的讀者能夠本身瞭解一下。
接下來,咱們看看addWorker都作了什麼工做:
private boolean addWorker(Runnable firstTask, boolean core) {
// 這部分省略的代碼都是對狀態字進行修改,添加並建立線程以前,
// 須要遞增work記數(此時須要線程安全地操做)
...
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
...
w = new Worker(firstTask); // 此處封裝出了一個新的Work,這個類咱們稍後會介紹
final Thread t = w.thread;
if (t != null) {
...
// 得到線程池狀態,若是線程池已經被關閉了,則再也不建立新的線程
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
...
workerAdded = true;
...
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 若是任務啓動或者提交到線程池失敗,
// 則執行回滾操做(從工做線程池中移除失敗添加的worker、減小狀態字中的任務計數)
addWorkerFailed(w);
}
return workerStarted;
}
複製代碼
任務執行在Worker
類中,而Worker
類是一個繼承了Runnable
接口的類。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
...
public void run() {
runWorker(this);
}
...
}
複製代碼
能夠看到Worker類中調用了外部的runWorker()
方法。所以能夠了解到,任務執行的主要邏輯,就是在外部的runWorker()
方法中執行的
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
...
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { // 循環讀取任務
...
try {
beforeExecute(wt, task); // 用戶實現的回調方法,任務啓動前
Throwable thrown = null;
try {
task.run();// 任務執行
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 用戶實現的回調方法,任務執行後
}
} finally {
task = null;
w.completedTasks++;
...
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
複製代碼
beforeExecute和afterExecute是兩個鉤子方法,在裏面指定了當線程開始執行和完成執行之後執行的動做,須要開發者實現。
另外須要注意的還有runWorker方法內調用的getTask()方法,在該方法內部,若是發生如下狀況將會返回null,終止工做線程的執行循環: 1)當前線程數即將超過maxPoolSize 2)線程池被關閉 3)當前線程數大於corePoolSize且小於maxPoolSize,並關切從BlockingQueue取數據超過了超時時間(默認60秒)
代碼實現以下:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 校驗當前線程池狀態
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
boolean timed; // Are workers subject to culling?
for (;;) {
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
try {
// 若是線程超過指定時間內(默認60秒)沒有獲取到任務,說明有線程即將過時
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
複製代碼
若是線程被提交到線程池時,當前線程池出現如下狀況的任一一種狀況: 1)線程池任務隊列已經滿了 2)線程池被關閉了(調用了shutdown
函數或者shutdownNow
函數) 都將會調用提早設置好的回調策略,ThreadPoolExecutor
中總共提供了四種策略:
1)AbortPolicy(停止):該策略將會直接拋出RejectedExecutionException異常,調用者將會得到異常;
2)DiscardPolicy(拋棄):使用該策略,線程池將會悄悄地丟棄這個任務而不被調用者知道;
3)CallerRunsPolicy(調用者運行):該策略既不會拋棄任務也不會拋出異常,而是將這個任務退回給調用者,從而下降新任務的流量;
4)DiscardOldestPolicy(拋棄最舊的):該策略將會拋棄下一個即將輪到執行的任務,那麼「拋棄最舊」的將致使拋棄優先級最高的任務,所以最好不要把「拋棄最舊的」飽和策略和優先級隊列放在一塊兒使用; 這裏,代碼實現咱們將只展現**CallerRunsPolicy(調用者運行)**策略:
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/** * Creates a {@code CallerRunsPolicy}. */
public CallerRunsPolicy() { }
// 策略實現
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
複製代碼
固然開發者也能夠選擇,根據業務需求,定義本身的飽和策略。
ThreadPoolExecutor
提供了兩種方法銷燬線程池,分別是shutdown()
和shutdownNow()
shutdown()
方法僅僅是把線程池的狀態置爲SHUTDOWN,而且拒絕以後嘗試提交進來的全部請求,可是已經在任務隊列裏的任務會仍然會正常消費。
而shutdownNow()
方法的表現顯得更加簡單粗暴,它會強行關閉ExecutorService
,也會嘗試取消正在執行的任務,而且返回全部已經提交但還沒有開始的任務,開發者能夠將這些任務寫入日誌保存起來以便以後進行處理,另外嘗試取消正在執行的任務僅僅是嘗試對執行線程進行中斷,具體的線程響應中斷策略須要用戶本身編寫。代碼實現以下:
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
複製代碼
不要在ThreadPoolExecutor
線程池中使用ThreadLocal
,由於在ThreadPoolExecutor
中,線程是複用的,所以在這裏使用ThreadLocal
會被多個task共享,所以可能會帶來髒數據污染。須要當心使用
以一段代碼爲例:
// 10個線程,由於任務多,這裏用LinkedBlockingQueue
private static final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
private static final ExecutorService service = new ThreadPoolExecutor(0, 10,
60L, TimeUnit.SECONDS, queue
);
複製代碼
代碼中的corePoolSize=0,也就是核心線程數是1,若是任務數多於10個,那麼會先建立maximumPoolSize個線程執行,其他的任務加入 queue 中等待執行。
而在ThreadPoolExecutor
的實現中,當workQueue已滿,且maximumPoolSize>corePoolSize時,新提交任務會建立新線程執行任務。
所以,queue 是不會滿的,那麼永遠不會有maximumPoolSize個線程被建立,也就是說咱們的任務一直仍是一個線程在跑,沒法達到能夠同時使用多個線程的預期。
雖然ThreadPoolExecutor
提供了shutdownNow()
方法,在調用該方法後會嘗試中斷全部線程,可是該中斷並不能保證線程必定會就此終止,所以,須要開發者實現線程中斷的策略。關於這部分的內容,在Doug Lea的《Java Concurrency In Practice》的7.1.2節已經進行了完整的討論,筆者在這裏就再也不贅述了。
尤爲須要注意的是,ThreadPoolExecutor
有一個finalize
函數,具體實現以下:
protected void finalize() {
shutdown();
}
複製代碼
在該方法中調用了shutdown()
函數,所以,若是你並非真正但願中止線程池的執行,那麼就不要讓線程池離開你代碼的做用範圍。
我是江溢Jonny。