掃描下方二維碼或者微信搜索公衆號
菜鳥飛呀飛
,便可關注微信公衆號,閱讀更多Spring源碼分析
和Java併發編程
文章。java
不管是在工做中,仍是在書本中,咱們均可以聽到或者看到關於線程在使用時的一些建議:不要在代碼中本身直接建立線程,而是經過線程池的方式來使用線程。使用線程池的理由大體能夠總結爲如下幾點。面試
1.
下降資源消耗。線程是操做系統十分寶貴的資源,當多我的同時開發一個項目時,在互不知情的狀況下,都本身在代碼中建立了線程,這樣就會致使線程數過多,並且線程的建立和銷燬,在操做系統層面,須要由用戶態切換到內核態
,這是一個費時費力
的過程。而使用線程池能夠避免頻繁的建立線程和銷燬線程,線程池中線程能夠重複使用。2.
提升響應速度。當請求到達時,因爲線程池中的線程已經建立好了,使用線程池,能夠省去線程建立的這段時間。3.
提升線程的可管理性。線程是稀缺資源,當建立過多的線程時,會形成系統性能的降低,而使用線程池,能夠對線程進行統一分配、調優和監控。 線程池的使用十分簡單,可是會用不表明用得好。在面試中,基本不會問線程池應該怎麼用,而是問線程池在使用不當時會形成哪些問題,實際上就是考察線程池的實現原理。所以搞明白線程池的實現原理是頗有必要的一件事,不只僅對面試會有幫助,也會讓咱們在平時工做中避過好多坑。 在線程池中存在幾個概念:核心線程數、最大線程數、任務隊列。核心線程數指的是線程池的基本大小;最大線程數指的是,同一時刻線程池中線程的數量最大不能超過該值;任務隊列是當任務較多時,線程池中線程的數量已經達到了核心線程數,這時候就是用任務隊列來存儲咱們提交的任務。 與其餘池化技術不一樣的是,線程池是基於生產者-消費者
模式來實現的,任務的提交方是生產者,線程池是消費者。當咱們須要執行某個任務時,只須要把任務扔到線程池中便可。線程池中執行任務的流程以下圖以下。編程
1.
先判斷線程池中線程的數量是否超過核心線程數,若是沒有超過核心線程數,就建立新的線程去執行任務;若是超過了核心線程數,就進入到下面流程。2.
判斷任務隊列是否已經滿了,若是沒有滿,就將任務添加到任務隊列中;若是已經滿了,就進入到下面的流程。3.
再判斷若是建立一個線程後,線程數是否會超過最大線程數,若是不會超過最大線程數,就建立一個新的線程來執行任務;若是會,則進入到下面的流程。4.
執行拒絕策略。
在沒看線程池的具體實現以前,我一直存在這樣的疑惑:爲何是先判斷任務隊列有沒有滿,再判斷有沒有超過最大線程數?正常邏輯不是應該先儘量的建立線程,讓線程去處理任務嗎?當任務實在是太多了,線程處理不過來了,再將任務添加到任務隊列嗎?知道我看了線程池的具體代碼實現後,我才知道答案。問題答案在文末。
數組
在JUC包下,已經提供了線程池的具體的實現:ThreadPoolExecutor
。ThreadPoolExecutor提供了不少的構造方法,其中最複雜的構造方法有7個參數。安全
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
}
複製代碼
corePoolSize
:該參數表示的是線程池的核心線程數。當任務提交到線程池時,若是線程池的線程數量尚未達到corePoolSize,那麼就會新建立的一個線程來執行任務,若是達到了,就將任務添加到任務隊列中。微信
maximumPoolSize
:該參數表示的是線程池中容許存在的最大線程數量。當任務隊列滿了之後,再有新的任務進入到線程池時,會判斷再新建一個線程是否會超過maximumPoolSize,若是會超過,則不建立線程,而是執行拒絕策略。若是不會超過maximumPoolSize,則會建立新的線程來執行任務。併發
keepAliveTime
:當線程池中的線程數量大於corePoolSize時,那麼大於corePoolSize這部分的線程,若是沒有任務去處理,那麼就表示它們是空閒的,這個時候是不容許它們一直存在的,而是容許它們最多空閒一段時間,這段時間就是keepAliveTime,時間的單位就是TimeUnit。工具
unit
:空閒線程容許存活時間的單位,TimeUnit
是一個枚舉值,它能夠是納秒、微妙、毫秒、秒、分、小時、天。oop
workQueue
:任務隊列,用來存聽任務。該隊列的類型是阻塞隊列,經常使用的阻塞隊列有ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、PriorityBlockingQueue
等。 ArrayBlockingQueue
是一個基於數組實現的阻塞隊列,元素按照先進先出(FIFO)的順序入隊、出隊。由於底層實現是數組,數組在初始化時必須指定大小,所以ArrayBlockingQueue是有界隊列。 LinkedBlockingQueue
是一個基於鏈表實現的阻塞隊列,元素按照先進先出(FIFO)的順序入隊、出隊。由於頂層是鏈表,鏈表是基於節點之間的指針指向來維持先後關係的,若是不指鏈表的大小,它默認的大小是Integer.MAX_VALUE
,即,這個數值太大了,所以一般稱LinkedBlockingQueue是一個無界隊列。固然若是在初始化的時候,就指定鏈表大小,那麼它就是有界隊列了。
SynchronousQueue
是一個不存儲元素的阻塞隊列。每一個插入操做必須得等到另外一個線程調用了移除操做後,該線程纔會返回,不然將一直阻塞。吞吐量一般要高於LinkedBlockingQueue。 PriorityBlockingQueue
是一個將元素按照優先級排序的阻塞的阻塞隊列,元素的優先級越高,將會越先出隊列。這是一個無界隊列。源碼分析
threadFactory
:線程池工廠,用來建立線程。一般在實際項目中,爲了便於後期排查問題,在建立線程時須要爲線程賦予必定的名稱,經過線程池工廠,能夠方便的爲每個建立的線程設置具備業務含義的名稱。
handler
:拒絕策略。當任務隊列已滿,線程數量達到maximumPoolSize後,線程池就不會再接收新的任務了,這個時候就須要使用拒絕策略來決定最終是怎麼處理這個任務。默認狀況下使用AbortPolicy,表示沒法處理新任務,直接拋出異常
。在ThreadPoolExecutor類中定義了四個內部類,分別表示四種拒絕策略。咱們也能夠經過實現RejectExecutionHandler
接口來實現自定義的拒絕策略。
AbortPocily
:再也不接收新任務,直接拋出異常。CallerRunsPolicy
:提交任務的線程本身處理。DiscardPolicy
:不處理,直接丟棄。DiscardOldestPolicy
:丟棄任務隊列中排在最前面的任務,並執行當前任務。(排在隊列最前面的任務並不必定是在隊列中待的時間最長的任務,由於有多是按照優先級排序的隊列)
在使用ThreadPoolExecutor時,可使用execute(Runnable task)
方法向線程池中提交一個任務,沒有返回值。也可使用submit()
方法向線程池中添加任務,有返回值,返回值對象是Future。submit()方法有三個重載的方法。
submit(Runnable task)
,該方法雖然返回值對象是Future,可是使用Future.get()
獲取結果是null。submit(Runnable task,T result)
,方法的返回值對象是Future,經過Future.get()
獲取具體的返回值時,結果與方法的第二個參數result相等。submit(Callable task)
,該方法的參數是一個Callable類型
的對象,方法有返回值。瞭解了ThreadPoolExecutor的基本用法,下面將結合源碼來分析下線程池的代碼實現。從上面的線程池的原理中,咱們能夠發現,線程池的原理相對比較簡單,代碼實現起來應該不難,看源碼主要是爲了學習他人寫的優秀代碼,尤爲是編程大師Doug Lea寫的代碼。 對於一個線程池,除了上面介紹的幾個重要屬性之外,咱們還須要一個變量來表示線程池狀態,線程池也須要有運行中、關閉中、已關閉等狀態。若是要咱們去實現一個線程池,可能第一反應就是用一個單獨的變量來表示線程池的狀態,再用另外一個變量來表示線程池中線程的數量。這樣的確能夠,不過Doug Lea並非這樣實現的,他將二者用一個變量來表示(不得不感嘆下,大佬就是大佬,想法果真和他人不同)。那麼問題來了,如何用一個變量來表示兩個值?閱讀過讀寫鎖源碼的朋友可能就能立想到另外這一中方案(關於讀寫鎖的介紹能夠閱讀這一篇文章: 讀寫鎖ReadWriteLock的實現原理)。在讀寫鎖的實現中將一個int型的數值,按照高低位來拆分,高位表示一個數,低位再表示另外一個數,在線程池的實現中,Doug Lea再次使用了按位拆分的技巧。 在線程池中,使用了一個原子類AtomicInteger的變量來表示線程池狀態和線程數量,該變量在內存中會佔用4個字節,也就是32bit,其中高3位用來表示線程池的狀態,低29位用來表示線程的數量。線程池的狀態一共有5中狀態,用3bit最多能夠表示8種狀態,所以採用高3位來表示線程池的狀態徹底能知足需求。示意圖以下。
在看線程池的核心實現邏輯以前,先簡單看下ThreadPoolExecutor中關於各類變量和方法的定義。由於在覈心邏輯中,常常會用到它們,並且這些方法和變量大量用到了位運算,看起來不是特別直觀,因此提早熟悉它們的功能對於看核心邏輯有很大的幫助。相關源碼和註釋以下。
// 高2位表示線程池的狀態,其餘29位表示線程的數量,即worker的數量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 計數的位數,29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 線程池的最大大小,2^29 -1,
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// 線程池的狀態,高三位表示線程的運行狀態,
// RUNNING: 111
private static final int RUNNING = -1 << COUNT_BITS;
// SHUTDOWN: 000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// STOP: 001
private static final int STOP = 1 << COUNT_BITS;
// TIFYING(整理): 010
private static final int TIDYING = 2 << COUNT_BITS;
// TERMINATED: 011
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
// 計算線程池的狀態,計算結果的低29位全爲0,所以最終結果就是線程池的狀態
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 工做線程的數量,計算結果的高3位全是0,所以最終結果就是工做線程的數量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 根據線程池的狀態和工做線程的數量,計算ctl,實際上就是將二者合併成ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
/* * Bit field accessors that don't require unpacking ctl. * These depend on the bit layout and on workerCount being never negative. */
// 線程池的狀態是否小於s
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// 線程池的狀態大於等於s
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// 判斷線程池是否處於運行狀態
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
複製代碼
有了前面的基礎,接下來分析下核心實現。再使用線程池時咱們能夠經過execute(Runnable task)
來提交一個任務到線程池,所以咱們從核心入口execute(Runnable task)方法開始分析。execute()方法的源碼以下。在源碼中我添加了部分註釋,以供參考。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// workerCountOf(c)方法時計算工做線程的數量,
// 1. 工做線程數是否小於核心線程數,若是小於核心線程數,就建立新的線程去執行任務
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
// 任務執行失敗後,從新獲取ctl的值,供下面的邏輯計算
c = ctl.get();
}
// 2. 當工做線程數大於等於核心線程數時,線程池是運行狀態,且任務添加到隊列成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 若是線程池狀態不是RUNNING狀態,就執行拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 若是線程池中線程的數量爲0,就調用addWorker()方法建立新的worker線程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 線程池非運行狀態,或者任務入隊失敗,就嘗試建立新的worker線程
else if (!addWorker(command, false))
// 4. 若是建立新的worker線程失敗,就執行拒絕策略。
reject(command);
}
複製代碼
execute()
方法的邏輯大體分爲4部分,分別對應線程池原理部分所提到的4個步驟。
workerCountOf(c)
方法計算當前線程池中線程的數量,而後與初始化線程池時指定的核心線程數相比較,若是小於核心線程數,就調用addWorker()
方法,實際上就是建立新的線程來執行任務。addWorker()方法的源碼後面分析。workQueue.off(command)
方法,將任務添加到任務隊列中。若是此時任務隊列尚未滿,那麼就會添加成功,workQueue.off(command)就會返回true,那麼就會進入到if邏輯塊中,進行一些其餘的判斷。若是此時任務隊列已經滿了,workQueue.off(command)方法就會返回false,那麼就會執行後面的3和4。addWorker()
方法,在addworker()方法中會在建立新的線程以前,判斷線程數會不會超過最大線程數,若是會,addworker()就會返回false。若是不會,就會建立新的線程去執行任務。handler
的rejectedExecution()
方法。其源碼以下。final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
複製代碼
從上面的分析中,能夠看出,在好幾處邏輯中均調用了addWorker()
方法,這說明該方法十分重要。事實也是如此,該方法不只重要,代碼實現還十分複雜。該方法須要兩個參數,第一個參數就是咱們傳入的Runnable任務,第二參數是一個boolean值,傳入true表示的是當前線程池中的線程數尚未達到核心線程數,傳false表示當前線程數已經大於等於核心線程數了。addWorker()
方法的源碼很長,這裏我將其分爲兩個部分,下面先看前面一部分的源碼。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 1. 當線程池的狀態大於SHUTDOWN時,返回false。由於線程池處於關閉狀態了,就不能再接受任務了
// 2. 當線程池的狀態等於SHUTDOWN時,firstTask不爲空或者任務隊列爲空,返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 1. 線程數大於等於理論上的最大數(2^29-1),則直接返回false。(由於線程數不能再增長了)
// 2. 根據core來決定線程數應該和誰比較。當線程數大於核心線程數或者最大線程數時,直接返回false。
// (由於當大於核心線程數時,表示此時任務應該直接添加到隊列中(若是隊列滿了,可能入隊失敗);當大於最大線程數時,確定不能再新建立線程了,否則設置最大線程數有毛用)
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 線程數+1,若是設置成功,就跳出外層循環
if (compareAndIncrementWorkerCount(c))
break retry;
// 再次獲取線程池的狀態,並將其與前面獲取到的值比較,
// 若是相同,說明線程池的狀態沒有發生變化,繼續在內循環中進行循環
// 若是不相同,說明在這期間,線程池的狀態發生了變化,須要跳到外層循環,而後再從新進行循環
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 省略後半部分代碼
// ......
}
複製代碼
在這部分代碼中,咱們能夠看到一個很陌生的語法:retry...break retry...continue retry
。這種寫法真的是太少見了,少見到我第一眼看到的時候,覺得是我不當心碰到鍵盤,把源碼給改了。這種語法有點相似於C語言裏面的goto(已經被遺棄),其做用就是在for循環以前定義一個retry,而後在循環中,使用break retry
時,就會讓代碼跳出循環,並再也不進入循環
;當使用continue retry
時,表示跳出當前循環,立馬進入到下一次循環
。因爲這裏使用了兩層for循環,所以爲了方便在內層循環中一會兒跳出到外層循環
,就使用了retry這種語法。須要說明的是,這裏的retry並非Java裏面的關鍵字,而是隨機定義的一個字符串,咱們也能夠寫成a,b,c等,可是後面的break和continue後面的字符串須要和前面定義的這個字符串對應。 咱們能夠看到,前半部分的核心邏輯就是使用了兩個無限for和一個CAS操做來設置線程池的線程數量。若是線程池的線程數修改爲功,就中斷循環,進入後半部分代碼的邏輯,若是修改失敗,就利用for循環再一次進行修改,這樣的好處是,既實現了線程安全,也避免使用鎖,提升了效率。在這一部分代碼中,進行了不少判斷,這些判斷主要是校驗線程池的狀態以及線程數,我的認爲不是特別重要,咱們主要抓住核心邏輯便可。 當成功修改線程數量之後,就會執行addWorker()方法的後半部分代碼,其源碼以下。
private boolean addWorker(Runnable firstTask, boolean core) {
// 省略前半部分代碼
// ......
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 建立一個新的worker線程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 再次獲取線程池的狀態,由於在獲取鎖期間,線程池的狀態可能改變了
int rs = runStateOf(ctl.get());
// 若是線程池狀態時運行狀態或者是關閉狀態可是firstTask是空,就將worker線程添加到線程池
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 判斷worker線程是否已經啓動了,若是已經啓動,就拋出異常
// 我的以爲這一步沒有任何意義,由於worker線程是剛new出來的,沒有在任何地方啓動
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 啓動線程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 若是啓動失敗,就將worker線程從線程池移除,並將線程數減1
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
複製代碼
在這一部分代碼中,經過new Worker(firstTask)
建立了一個Worker對象,Worker對象繼承了AQS
,同時實現了Runnable接口
,它是線程池中真正幹活的人。咱們提交到線程的任務,最終都是封裝成Worker對象,而後由Worker對象來完成任務
。先簡單看下Woker的構造方法,其源碼以下。在構造方法中,首先設置了同步變量state爲-1,而後經過ThreadFactory建立了一個線程,注意在經過ThreadFactory建立線程時,將Worker自身也就是this,傳入了進去,也就是說最後建立出來的線程對象,它裏面的target屬性就是指向這個Worker對象
。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 將this傳入進去,最後就是使線程的target屬性等於當前的worker對象
this.thread = getThreadFactory().newThread(this);
}
}
複製代碼
當經過new Worker(firstTask)
建立完worker對象後,此時線程已經被建立好了,在啓動線程以前,先經過t.isAlive()
判斷線程是已經啓動,若是沒有啓動,纔會調用線程的start()
方法來啓動線程。這裏有一點須要注意的是,建立完worker對象後,調用了mainLock.lock()
來保證線程安全,由於這一步workers.add(w)
存在併發的可能,因此須要經過獲取鎖來保證線程安全。 當調用線程的start()方法以後,若是線程獲取到CPU的執行權,那麼就會執行線程的run()方法,在線程的run()方法中,會執行線程中target屬性的run()方法。這裏線程的target屬性就是咱們建立的worker對象,所以最終會執行到Worker的run()方法。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
public void run() {
runWorker(this);
}
}
複製代碼
在Worker類的run()
中,直接調用了runWorker()
方法。因此Worker執行任務的核心邏輯就是在runWorker()
方法中實現的。其源碼以下。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 若是worker線程的firstTask不爲空或者能從任務隊列中獲取到任務,就執行
// 不然就會一直阻塞到getTask()方法處
while (task != null || (task = getTask()) != null) {
// 保證worker串行的執行線程
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// beforeExecute()是空方法,由子類具體實現
// 該方法的目的是爲了讓任務執行前作一些其餘操做
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 真正執行任務
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// afterExecute()空方法,由子類具體實現
// 該方法的目的是爲了讓任務執行後作一些其餘操做
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
複製代碼
runWorker()
的源碼看起來也比較長,但核心邏輯就一行,即:task.run()
,最終執行了咱們提交的Runnable任務。在runWorker()方法中經過一個while循環來讓Worker對象一直來執行任務
。當傳入的task對象不爲空或者經過getTask()
方法能從任務隊列中獲取到任務時,worker就會一直執行。不然將在finally語句塊中調用processWorkerExit
退出,讓線程中斷,最終銷燬。 getTask()
方法是一個阻塞的方法,當能從任務隊列中獲取到任務時,就會當即返回一個任務。若是獲取不到任務,就會阻塞。它支持超時,當超過線程池初始化時指定的線程最大存活時間後,就會返回null,從而致使worker線程退出while循環,最終線程銷燬
。 到這兒線程池的execute()
方法就分析完了。最後簡單分析下線程池的shutdown()
方法和shutdownNow()
方法。當調用shutdown()方法時,會令線程池的狀態爲SHUTDOWN
,而後中斷空閒的線程,對於已經在執行任務的線程並不會中斷
。當調用shutdownNow()方法時,會令線程池的狀態爲STOP
,而後在中斷全部的線程,包括正在執行任務的線程
。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 權限校驗
checkShutdownAccess();
// 將線程池的狀態設置爲SHUTDOWN狀態
advanceRunState(SHUTDOWN);
// 中斷空閒的worker線程
interruptIdleWorkers();
// 空方法,由子類具體去實現,例如ScheduledThreadPoolExecutor
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 嘗試將線程池的狀態設置爲TERMINATED
tryTerminate();
}
複製代碼
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 線程池狀態設置爲STOP
advanceRunState(STOP);
// 中斷全部線程,包括正在執行任務的線程
interruptWorkers();
// 清除任務隊列中的全部任務
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
複製代碼
要求任務必須執行完成,那麼就是用shutdown()方法
。一般也建議使用shutdown()方法,更加優雅。corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler
以及它們的各自的意義,而後結合源碼實現,詳細分析了任務的執行過程。addWorker()
方法,在addWorker()的後半部分的邏輯中,會調用mainLock.lock()
方法來獲取全局鎖,而獲取鎖就會形成必定的資源爭搶。若是先判斷最大線程數,再判斷任務隊列是否已滿,這樣就會形成線程池原理的4個步驟中,第1步判斷核心線程數時要獲取全局鎖,第2步判斷最大線程數時,又要獲取全局鎖,這樣相比於先判斷任務隊列是否已滿,再判斷最大線程數,就可能會多出一次獲取全局鎖的過程
。所以在設計線程池,爲了儘量的避免由於獲取全局鎖而形成資源的爭搶
,因此會先判斷任務隊列是否已滿,再判斷最大線程數。LinkedBlockingQueue
的吞吐量比ArrayBlockingQueue
的吞吐量要高。前者是基於鏈表實現的,後者是基於數組實現的,正常狀況下,不該該是數組的性能要高於鏈表嗎?讀和寫操做使用了兩個鎖
,takeLock和putLock,讀寫操做不會形成資源的爭搶
。而ArrayBlockingQueue的讀和寫使用的是同一把鎖
,讀寫操做存在鎖的競爭
。所以LinkedBlockingQueue的吞吐量高於ArrayBlockingQueue。