線程池(英語:thread pool):一種線程使用模式。線程過多會帶來調度開銷,進而影響緩存局部性和總體性能。而線程池維護着多個線程,等待着監督管理者分配可併發執行的任務。這避免了在處理短期任務時建立與銷燬線程的代價。線程池不只可以保證內核的充分利用,還能防止過度調度。可用線程數量應該取決於可用的併發處理器、處理器內核、內存、網絡 sockets 等的數量。 例如,線程數通常取 cpu 數量 +2 比較合適,線程數過多會致使額外的線程切換開銷。html
Java 中的線程池是用 ThreadPoolExecutor 類來實現的. 本文就對該類的源碼來分析一下這個類內部對於線程的建立, 管理以及後臺任務的調度等方面的執行原理。java
先看一下線程池的類圖:緩存
上圖的目的主要是爲了讓你們知道線程池相關類之間的關係,至少賺個眼熟,之後看到不會有懼怕的感受。安全
Executor 框架是一個根據一組執行策略調用,調度,執行和控制的異步任務的框架,目的是提供一種將」任務提交」與」任務如何運行」分離開來的機制。網絡
下面是 ThreadPoolExeCutor 類圖。Executors 實際上是一個工具類,裏面提供了好多靜態方法,這些方法根據用戶選擇返回不一樣的線程實例。多線程
從上圖也能夠看出來,ThreadPoolExeCutor 是線程池的核心。併發
J.U.C 中有三個 Executor 接口:框架
Executor:一個運行新任務的簡單接口;異步
ExecutorService:擴展了 Executor 接口。添加了一些用來管理執行器生命週期和任務生命週期的方法;socket
ScheduledExecutorService:擴展了 ExecutorService。支持 Future 和按期執行任務。
其實經過這些接口就能夠看到一些設計思想,每一個接口的名字和其任務是徹底匹配的。不會由於 Executor 中只有一個方法,就將其放到其餘接口中。這也是很重要的單一原則。
在去具體分析 ThreadPoolExeCutor 運行邏輯前,先看下面的流程圖:
該圖是 ThreadPoolExeCutor 整個運行過程的一個歸納,整個源碼的核心邏輯總結起來就是:
建立線程:要知道如何去建立線程,控制線程數量,線程的存活與銷燬;
添加任務:任務添加後如何處理,是馬上執行,仍是先保存;
下面將進入源碼分析,來深刻理解 ThreadPoolExeCutor 的設計思想。
先來看構造函數:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException();
// 注意 workQueue, threadFactory, handler 是不能夠爲null 的,爲空會直接拋出錯誤 if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
corePoolSize 核心線程數:表示核心線程池的大小。當提交一個任務時,若是當前核心線程池的線程個數沒有達到 corePoolSize,則會建立新的線程來執行所提交的任務,即便當前核心線程池有空閒的線程。若是當前核心線程池的線程個數已經達到了corePoolSize,則再也不從新建立線程。若是調用了 prestartCoreThread()
或者 prestartAllCoreThreads()
,線程池建立的時候全部的核心線程都會被建立而且啓動。若 corePoolSize == 0,則任務執行完以後,沒有任何請求進入時,銷燬線程池的線程。若 corePoolSize > 0,即便本地任務執行完畢,核心線程也不會被銷燬。corePoolSize 其實能夠理解爲可保留的空閒線程數。
maximumPoolSize: 表示線程池可以容納同時執行的最大線程數。若是當阻塞隊列已滿時,而且當前線程池線程個數沒有超過 maximumPoolSize 的話,就會建立新的線程來執行任務。注意 maximumPoolSize >= 1 必須大於等於 1。maximumPoolSize == corePoolSize ,便是固定大小線程池。實際上最大容量是由 CAPACITY 控制。
keepAliveTime: 線程空閒時間。當空閒時間達到 keepAliveTime值時,線程會被銷燬,直到只剩下 corePoolSize 個線程爲止,避免浪費內存和句柄資源。默認狀況,當線程池的線程數 > corePoolSize 時,keepAliveTime 纔會起做用。但當 ThreadPoolExecutor 的 allowCoreThreadTimeOut 變量設置爲 true 時,核心線程超時後會被回收。
unit:時間單位。爲 keepAliveTime 指定時間單位。
workQueue 緩存隊列。當請求的線程數 > maximumPoolSize時,線程進入 BlockingQueue 阻塞隊列。可使用 ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue, PriorityBlockingQueue。
threadFactory 建立線程的工程類。能夠經過指定線程工廠爲每一個建立出來的線程設置更有意義的名字,若是出現併發問題,也方便查找問題緣由。
AbortPolicy: 直接拒絕所提交的任務,並拋出 RejectedExecutionException 異常;
CallerRunsPolicy:只用調用者所在的線程來執行任務;
DiscardPolicy:不處理直接丟棄掉任務;
DiscardOldestPolicy:丟棄掉阻塞隊列中存放時間最久的任務,執行當前任務
看完構造函數以後,再來看下該類裏面的變量,有助於進一步理解整個代碼運行邏輯,下面是一些比較重要的變量:
// 用來標記線程池狀態(高3位),線程個數(低29位) // 默認是 RUNNING 狀態,線程個數爲0 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 線程個數掩碼位數,整型最大位數-3,能夠適用於不一樣平臺 private static final int COUNT_BITS = Integer.SIZE - 3; //線程最大個數(低29位)00011111111111111111111111111111 private static final int CAPACITY = (1 << COUNT_BITS) - 1; //(高3位):11100000000000000000000000000000 private static final int RUNNING = -1 << COUNT_BITS; //(高3位):00000000000000000000000000000000 private static final int SHUTDOWN = 0 << COUNT_BITS; //(高3位):00100000000000000000000000000000 private static final int STOP = 1 << COUNT_BITS; //(高3位):01000000000000000000000000000000 private static final int TIDYING = 2 << COUNT_BITS; //(高3位):01100000000000000000000000000000 private static final int TERMINATED = 3 << COUNT_BITS; // 獲取高三位 運行狀態 private static int runStateOf(int c) { return c & ~CAPACITY; } //獲取低29位 線程個數 private static int workerCountOf(int c) { return c & CAPACITY; } //計算ctl新值,線程狀態 與 線程個數 private static int ctlOf(int rs, int wc) { return rs | wc; }
這裏須要對一些操做作些解釋。
Integer.SIZE:對於不一樣平臺,其位數不同,目前常見的是 32 位;
(1 << COUNT_BITS) - 1:首先是將 1 左移 COUNT_BITS 位,也就是第 COUNT_BITS + 1 位是1,其他都是 0;-1 操做則是將後面前面的 COUNT_BITS 位都變成 1。
-1 << COUNT_BITS:-1 的原碼是 10000000 00000000 00000000 00000001 ,反碼是 111111111 11111111 11111111 11111110 ,補碼 +1,而後左移 29 位是 11100000 00000000 00000000 00000000;這裏轉爲十進制是負數。
~CAPACITY :取反,最高三位是1;
總結:這裏巧妙利用 bit 操做來將線程數量和運行狀態聯繫在一塊兒,減小了變量的存在和內存的佔用。其中五種狀態的十進制排序:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
線程池狀態含義:
RUNNING:接受新任務而且處理阻塞隊列裏的任務;
SHUTDOWN:拒絕新任務可是處理阻塞隊列裏的任務;
STOP:拒絕新任務而且拋棄阻塞隊列裏的任務同時會中斷正在處理的任務;
TIDYING:全部任務都執行完(包含阻塞隊列裏面任務)當前線程池活動線程爲 0,將要調用 terminated 方法
TERMINATED:終止狀態。terminated 方法調用完成之後的狀態;
線程池狀態轉換:
RUNNING -> SHUTDOWN:顯式調用 shutdown() 方法,或者隱式調用了 finalize(),它裏面調用了shutdown()方法。
RUNNING or SHUTDOWN)-> STOP:顯式 shutdownNow() 方法;
SHUTDOWN -> TIDYING:當線程池和任務隊列都爲空的時候;
STOP -> TIDYING:當線程池爲空的時候;
TIDYING -> TERMINATED:當 terminated() hook 方法執行完成時候;
1. 原碼:原碼就是符號位加上真值的絕對值, 即用第一位表示符號,其他位表示值. 好比若是是 8 位二進制:
[+1]原 = 0000 0001
[-1]原 = 1000 0001
負數原碼第一位是符號位.
2. 反碼:反碼的表示方法是,正數的反碼是其自己,負數的反碼是在其原碼的基礎上, 符號位不變,其他各個位取反.
[+1] = [0000 0001]原 = [0000 0001]反
[-1] = [1000 0001]原 = [1111 1110]反
3. 補碼:補碼的表示方法是,正數的補碼就是其自己,負數的補碼是在其原碼的基礎上, 符號位不變, 其他各位取反, 最後 +1. (即在反碼的基礎上 +1)
[+1] = [0000 0001]原 = [0000 0001]反 = [0000 0001]補
[-1] = [1000 0001]原 = [1111 1110]反 = [1111 1111]補
4. 總結
在知道一個數原碼的狀況下:
正數:反碼,補碼 就是自己本身
負數:反碼是高位符號位不變,其他位取反。補碼:反碼+1
5. 左移:當數值左、右移時,先將數值轉化爲其補碼形式,移完後,再轉換成對應的原碼
左移:高位丟棄,低位補零
[+1] = [00000001]補
[0000 0001]補 << 1 = [0000 0010]補 = [0000 0010]原 = [+2]
[-1] = [1000 0001]原 = [1111 1111]補
[1111 1111]補 << 1 = [1111 1110]補 = [1000 0010]原 = [-2]
其中,再次提醒,負數的補碼是反碼+1;負數的反碼是補碼-1;
6. 右移:高位保持不變,低位丟棄
[+127] = [0111 1111]原 = [0111 1111]補
[0111 1111]補 >> 1 = [0011 1111]補 = [0011 1111]原 = [+63]
[-127] = [1111 1111]原 = [1000 0001]補
[1000 0001]補 >> 1 = [1100 0000]補 = [1100 0000]原 = [-64]
經過 ThreadPoolExecutor 建立線程池後,提交任務後執行過程是怎樣的,下面來經過源碼來看一看。execute 方法源碼以下:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 返回包含線程數及線程池狀態(頭3位) int c = ctl.get(); // 若是工做線程數小於核心線程數,則建立線程任務執行 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; // 若是建立失敗,防止外部已經在線程池中加入新任務,從新獲取 c = ctl.get(); } // 只有線程池處於 RUNNING 狀態,且 入隊列成功 if (isRunning(c) && workQueue.offer(command)) { // 後面的操做屬於double-check int recheck = ctl.get(); // 若是線程池不是 RUNNING 狀態,則將剛加入隊列的任務移除 if (! isRunning(recheck) && remove(command)) reject(command); // 若是以前的線程已被消費完,新建一個線程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 核心池和隊列都滿了,嘗試建立一個新線程 else if (!addWorker(command, false)) // 若是 addWorker 返回是 false,即建立失敗,則喚醒拒絕策略 reject(command); }
若是當前運行的線程少於 corePoolSize,則會建立新的線程來執行新的任務;
若是運行的線程個數等於或者大於 corePoolSize,則會將提交的任務存放到阻塞隊列 workQueue 中;
若是當前 workQueue 隊列已滿的話,則會建立新的線程來執行任務;
若是線程個數已經超過了 maximumPoolSize,則會使用飽和策略 RejectedExecutionHandler 來進行處理。
這裏要注意一下 addWorker(null, false)
也就是建立一個線程,但並無傳入任務,由於任務已經被添加到 workQueue 中了,因此 worker 在執行的時候,會直接從 workQueue 中獲取任務。因此,在 workerCountOf(recheck) == 0
時執行 addWorker(null, false)
也是爲了保證線程池在 RUNNING 狀態下必需要有一個線程來執行任務。
須要注意的是,線程池的設計思想就是使用了核心線程池 corePoolSize,阻塞隊列 workQueue 和線程池 maximumPoolSize,這樣的緩存策略來處理任務,實際上這樣的設計思想在須要框架中都會使用。
須要注意線程和任務之間的區別,任務是保存在 workQueue 中的,線程是從線程池裏面取的,由 CAPACITY 控制容量。
addWorker 方法的主要工做是在線程池中建立一個新的線程並執行,firstTask 參數用於指定新增的線程執行的第一個任務,core 參數爲 true 表示在新增線程時會判斷當前活動線程數是否少於 corePoolSize,false 表示新增線程前須要判斷當前活動線程數是否少於 maximumPoolSize,代碼以下:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); // 獲取運行狀態 int rs = runStateOf(c); /* * 這個if判斷 * 若是rs >= SHUTDOWN,則表示此時再也不接收新任務; * 接着判斷如下3個條件,只要有1個不知足,則返回false: * 1. rs == SHUTDOWN,這時表示關閉狀態,再也不接受新提交的任務,但卻能夠繼續處理阻塞隊列中已保存的任務 * 2. firsTask爲空 * 3. 阻塞隊列不爲空 * * 首先考慮rs == SHUTDOWN的狀況 * 這種狀況下不會接受新提交的任務,因此在firstTask不爲空的時候會返回false; * 而後,若是firstTask爲空,而且workQueue也爲空,則返回false, * 由於隊列中已經沒有任務了,不須要再添加線程了 */ // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 獲取線程數 int wc = workerCountOf(c); // 若是wc超過CAPACITY,也就是ctl的低29位的最大值(二進制是29個1),返回false; // 這裏的core是addWorker方法的第二個參數,若是爲true表示根據corePoolSize來比較, // 若是爲false則根據maximumPoolSize來比較。 // if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 嘗試增長workerCount,若是成功,則跳出第一個for循環 if (compareAndIncrementWorkerCount(c)) break retry; // 若是增長workerCount失敗,則從新獲取ctl的值 c = ctl.get(); // Re-read ctl // 若是當前的運行狀態不等於rs,說明狀態已被改變,返回第一個for循環繼續執行 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 根據firstTask來建立Worker對象 w = new Worker(firstTask); // 每個Worker對象都會建立一個線程 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); // rs < SHUTDOWN表示是RUNNING狀態; // 若是rs是RUNNING狀態或者rs是SHUTDOWN狀態而且firstTask爲null,向線程池中添加線程。 // 由於在SHUTDOWN時不會在添加新的任務,但仍是會執行workQueue中的任務 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // workers是一個HashSet workers.add(w); int s = workers.size(); // largestPoolSize記錄着線程池中出現過的最大線程數量 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 啓動線程 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
這裏須要注意有如下幾點:
在獲取鎖後從新檢查線程池的狀態,這是由於其餘線程可可能在本方法獲取鎖前改變了線程池的狀態,好比調用了shutdown方法。添加成功則啓動任務執行。
t.start()
會調用 Worker 類中的 run 方法,Worker 自己實現了 Runnable 接口。緣由在建立線程得時候,將 Worker 實例傳入了 t 當中,可參見 Worker 類的構造函數。
wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) 每次調用 addWorker 來添加線程會先判斷當前線程數是否超過了CAPACITY,而後再去判斷是否超 corePoolSize 或 maximumPoolSize,說明線程數其實是由 CAPACITY 來控制的。
上面分析過程當中,提到了一個 Worker 類,對於某些對源碼不是很熟悉得同窗可能有點不清楚,下面就來看看 Worker 的源碼:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask;
// 注意此處傳入的是this this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker. */
// 這裏其實會調用外部的 runWorker 方法來執行本身。 public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) {
// 若是已經設置過1了,這時候在設置1就會返回false,也就是不可重入 if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } // 提供安全中斷線程得方法 void interruptIfStarted() { Thread t;
// 一開始 setstate(-1) 避免了還沒開始運行就被中斷可能 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
首先看到的是 Worker 繼承了(AbstractQueuedSynchronizer) AQS,並實現了 Runnable 接口,說明 Worker 自己也是線程。而後看其構造函數能夠發現,內部有兩個屬性變量分別是 Runnable 和 Thread 實例,該類其實就是對傳進來得屬性作了一個封裝,並加入了獲取鎖的邏輯(繼承了 AQS )。具體可參考文章:透過 ReentrantLock 分析 AQS 的實現原理
Worker 繼承了 AQS,使用 AQS 來實現獨佔鎖的功能。爲何不使用 ReentrantLock 來實現呢?能夠看到 tryAcquire 方法,它是不容許重入的,而 ReentrantLock 是容許重入的:
lock 方法一旦獲取了獨佔鎖,表示當前線程正在執行任務中;
若是正在執行任務,則不該該中斷線程;
若是該線程如今不是獨佔鎖的狀態,也就是空閒的狀態,說明它沒有在處理任務,這時能夠對該線程進行中斷;
線程池在執行 shutdown 方法或 tryTerminate 方法時會調用 interruptIdleWorkers 方法來中斷空閒的線程,interruptIdleWorkers 方法會使用 tryLock 方法來判斷線程池中的線程是不是空閒狀態;
之因此設置爲不可重入,是由於咱們不但願任務在調用像 setCorePoolSize 這樣的線程池控制方法時從新獲取鎖。若是使用 ReentrantLock,它是可重入的,這樣若是在任務中調用瞭如 setCorePoolSize 這類線程池控制的方法,會中斷正在運行的線程,由於 size 小了,須要中斷一些線程 。
因此,Worker 繼承自 AQS,用於判斷線程是否空閒以及是否能夠被中斷。
此外,在構造方法中執行了 setState(-1);
,把 state 變量設置爲 -1,爲何這麼作呢?是由於 AQS 中默認的 state 是 0,若是剛建立了一個 Worker 對象,尚未執行任務時,這時就不該該被中斷,看一下 tryAquire 方法:
protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
正由於如此,在 runWorker 方法中會先調用 Worker 對象的 unlock 方法將 state 設置爲 0。tryAcquire 方法是根據 state 是不是 0 來判斷的,因此,setState(-1);
將 state 設置爲 -1 是爲了禁止在執行任務前對線程進行中斷。
前面提到了內部類 Worker 的 run 方法調用了外部類 runWorker,下面來看下 runWork 的具體邏輯。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // status 設置爲0,容許中斷,也能夠避免再次加鎖失敗 boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { // 要派發task的時候,須要上鎖 w.lock(); // 若是線程池當前狀態至少是stop,則設置中斷標誌; // 若是線程池當前狀態是RUNNININ,則重置中斷標誌,重置後須要從新 //檢查下線程池狀態,由於當重置中斷標誌時候,可能調用了線程池的shutdown方法 //改變了線程池狀態。 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //任務執行前幹一些事情 beforeExecute(wt, task); Throwable thrown = null; try { task.run();//執行任務 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //任務執行完畢後幹一些事情 afterExecute(task, thrown); } } finally { task = null; //統計當前worker完成了多少個任務 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //執行清了工做 processWorkerExit(w, completedAbruptly); } }
總結一下 runWorker 方法的執行過程:
while 循環不斷地經過 getTask() 方法從阻塞隊列中取任務;
若是線程池正在中止,那麼要保證當前線程是中斷狀態,不然要保證當前線程不是中斷狀態;
調用 task.run()
執行任務;
若是 task 爲 null 則跳出循環,執行 processWorkerExit 方法;
runWorker 方法執行完畢,也表明着 Worker 中的 run 方法執行完畢,銷燬線程。
這裏的 beforeExecute 方法和 afterExecute 方法在 ThreadPoolExecutor 類中是空的,留給子類來實現。
completedAbruptly 變量來表示在執行任務過程當中是否出現了異常,在 processWorkerExit 方法中會對該變量的值進行判斷。
getTask 方法是從阻塞隊列裏面獲取任務,具體代碼邏輯以下:
private Runnable getTask() { // timeOut變量的值表示上次從阻塞隊列中取任務時是否超時 boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. /* * 若是線程池狀態rs >= SHUTDOWN,也就是非RUNNING狀態,再進行如下判斷: * 1. rs >= STOP,線程池是否正在stop; * 2. 阻塞隊列是否爲空。 * 若是以上條件知足,則將workerCount減1並返回null。 * 由於若是當前線程池狀態的值是SHUTDOWN或以上時,不容許再向阻塞隊列中添加任務。 */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? // timed變量用於判斷是否須要進行超時控制。 // allowCoreThreadTimeOut默認是false,也就是核心線程不容許進行超時; // wc > corePoolSize,表示當前線程池中的線程數量大於核心線程數量; // 對於超過核心線程數量的這些線程,須要進行超時控制 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; /* * wc > maximumPoolSize的狀況是由於可能在此方法執行階段同時執行了setMaximumPoolSize方法; * timed && timedOut 若是爲true,表示當前操做須要進行超時控制,而且上次從阻塞隊列中獲取任務發生了超時 * 接下來判斷,若是有效線程數量大於1,或者阻塞隊列是空的,那麼嘗試將workerCount減1; * 若是減1失敗,則返回重試。 * 若是wc == 1時,也就說明當前線程是線程池中惟一的一個線程了。 */ if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { /* * 根據timed來判斷,若是爲true,則經過阻塞隊列的poll方法進行超時控制,若是在keepAliveTime時間內沒有獲取到任務,則返回null; * 不然經過take方法,若是這時隊列爲空,則take方法會阻塞直到隊列不爲空。 * */ Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; // 若是 r == null,說明已經超時,timedOut設置爲true timedOut = true; } catch (InterruptedException retry) { // 若是獲取任務時當前線程發生了中斷,則設置timedOut爲false並返回循環重試 timedOut = false; } } }
其實到這裏後,你會發如今 ThreadPoolExcute 內部有幾個重要的檢驗:
判斷當前的運行狀態,根據運行狀態來作處理,若是當前都中止運行了,那不少操做也就不必了;
判斷當前線程池的數量,而後將該數據和 corePoolSize 以及 maximumPoolSize 進行比較,而後再去決定下一步該作啥;
首先是第一個 if 判斷,當運行狀態處於非 RUNNING 狀態,此外 rs >= STOP(線程池是否正在 stop)或阻塞隊列是否爲空。則將 workerCount 減 1 並返回 null。爲何要減 1 呢,由於此處實際上是去獲取一個 task,可是發現處於中止狀態了,也就是不必再去獲取運行任務了,那這個線程就沒有存在的意義了。後續也會在 processWorkerExit 將該線程移除。
第二個 if 條件目的是控制線程池的有效線程數量。由上文中的分析能夠知道,在執行 execute 方法時,若是當前線程池的線程數量超過了 corePoolSize 且小於 maximumPoolSize,而且 workQueue 已滿時,則能夠增長工做線程,但這時若是超時沒有獲取到任務,也就是 timedOut 爲 true 的狀況,說明 workQueue 已經爲空了,也就說明了當前線程池中不須要那麼多線程來執行任務了,能夠把多於 corePoolSize 數量的線程銷燬掉,保持線程數量在 corePoolSize 便可。
何時會銷燬?固然是 runWorker 方法執行完以後,也就是 Worker 中的 run 方法執行完,由 JVM 自動回收。
getTask 方法返回 null 時,在 runWorker 方法中會跳出 while 循環,而後會執行 processWorkerExit 方法。
下面在看 processWorkerExit 方法的具體邏輯:
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 若是completedAbruptly值爲true,則說明線程執行時出現了異常,須要將workerCount減1; // 若是線程執行時沒有出現異常,說明在getTask()方法中已經已經對workerCount進行了減1操做,這裏就沒必要再減了。 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //統計完成的任務數 completedTaskCount += w.completedTasks; // 從workers中移除,也就表示着從線程池中移除了一個工做線程 workers.remove(w); } finally { mainLock.unlock(); } // 根據線程池狀態進行判斷是否結束線程池 tryTerminate(); int c = ctl.get(); /* * 當線程池是RUNNING或SHUTDOWN狀態時,若是worker是異常結束,那麼會直接addWorker; * 若是allowCoreThreadTimeOut=true,而且等待隊列有任務,至少保留一個worker; * 若是allowCoreThreadTimeOut=false,workerCount很多於corePoolSize。 */ if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
至此,processWorkerExit 執行完以後,工做線程被銷燬,以上就是整個工做線程的生命週期。可是這有兩點須要注意:
你們想一想何時纔會調用這個方法,任務幹完了纔會調用。那麼沒事作了,就須要看下是否有必要結束線程池,這時候就會調用 tryTerminate。
若是此時線程處於 STOP 狀態如下,那麼就會判斷核心線程數是否達到了規定的數量,沒有的話,就會繼續建立一個線程。
tryTerminate 方法根據線程池狀態進行判斷是否結束線程池,代碼以下:
final void tryTerminate() { for (;;) { int c = ctl.get(); /* * 當前線程池的狀態爲如下幾種狀況時,直接返回: * 1. RUNNING,由於還在運行中,不能中止; * 2. TIDYING或TERMINATED,由於線程池中已經沒有正在運行的線程了; * 3. SHUTDOWN而且等待隊列非空,這時要執行完workQueue中的task; */ if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 若是線程數量不爲0,則中斷一個空閒的工做線程,並返回 if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 這裏嘗試設置狀態爲TIDYING,若是設置成功,則調用terminated方法 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // terminated方法默認什麼都不作,留給子類實現 terminated(); } finally { // 設置狀態爲TERMINATED ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
interruptIdleWorkers(
boolean onlyOne) 若是 ONLY_ONE = true 那麼就的最多
讓一個空閒線程發生中斷,ONLY_ONE = false 時是全部空閒線程都會發生中斷。那線程何時會處於空閒狀態呢?
一是線程數量不少,任務都完成了;二是線程在 getTask 方法中執行 workQueue.take()
時,若是不執行中斷會一直阻塞。
因此每次在工做線程結束時調用 tryTerminate 方法來嘗試中斷一個空閒工做線程,避免在隊列爲空時取任務一直阻塞的狀況。
shutdown 方法要將線程池切換到 SHUTDOWN 狀態,並調用 interruptIdleWorkers 方法請求中斷全部空閒的 worker,最後調用 tryTerminate 嘗試結束線程池。
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 安全策略判斷 checkShutdownAccess(); // 切換狀態爲SHUTDOWN advanceRunState(SHUTDOWN); // 中斷空閒線程 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 嘗試結束線程池 tryTerminate(); }
這裏思考一個問題:在 runWorker 方法中,執行任務時對 Worker 對象 w 進行了 lock 操做,爲何要在執行任務的時候對每一個工做線程都加鎖呢?
下面仔細分析一下:
在 getTask 方法中,若是這時線程池的狀態是 SHUTDOWN 而且 workQueue 爲空,那麼就應該返回 null 來結束這個工做線程,而使線程池進入 SHUTDOWN 狀態須要調用shutdown 方法;
shutdown 方法會調用 interruptIdleWorkers 來中斷空閒的線程,interruptIdleWorkers 持有 mainLock,會遍歷 workers 來逐個判斷工做線程是否空閒。但 getTask 方法中沒有mainLock;
在 getTask 中,若是判斷當前線程池狀態是 RUNNING,而且阻塞隊列爲空,那麼會調用 workQueue.take()
進行阻塞;
若是在判斷當前線程池狀態是 RUNNING 後,這時調用了 shutdown 方法把狀態改成了 SHUTDOWN,這時若是不進行中斷,那麼當前的工做線程在調用了 workQueue.take()
後會一直阻塞而不會被銷燬,由於在 SHUTDOWN 狀態下不容許再有新的任務添加到 workQueue 中,這樣一來線程池永遠都關閉不了了;
由上可知,shutdown 方法與 getTask 方法(從隊列中獲取任務時)存在競態條件;
解決這一問題就須要用到線程的中斷,也就是爲何要用 interruptIdleWorkers 方法。在調用 workQueue.take()
時,若是發現當前線程在執行以前或者執行期間是中斷狀態,則會拋出 InterruptedException,解除阻塞的狀態;
可是要中斷工做線程,還要判斷工做線程是不是空閒的,若是工做線程正在處理任務,就不該該發生中斷;
因此 Worker 繼承自 AQS,在工做線程處理任務時會進行 lock,interruptIdleWorkers 在進行中斷時會使用 tryLock 來判斷該工做線程是否正在處理任務,若是 tryLock 返回 true,說明該工做線程當前未執行任務,這時才能夠被中斷。
下面就來分析一下 interruptIdleWorkers 方法。
private void interruptIdleWorkers() { interruptIdleWorkers(false); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
interruptIdleWorkers 遍歷 workers 中全部的工做線程,若線程沒有被中斷 tryLock 成功,就中斷該線程。
爲何須要持有 mainLock ?由於 workers 是 HashSet 類型的,不能保證線程安全。
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); // 中斷全部工做線程,不管是否空閒 interruptWorkers(); // 取出隊列中沒有被執行的任務 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
shutdownNow 方法與 shutdown 方法相似,不一樣的地方在於:
設置狀態爲 STOP;
中斷全部工做線程,不管是不是空閒的;
取出阻塞隊列中沒有被執行的任務並返回。
shutdownNow 方法執行完以後調用 tryTerminate 方法,該方法在上文已經分析過了,目的就是使線程池的狀態設置爲 TERMINATED。
經過線程池提供的參數進行監控。線程池裏有一些屬性在監控線程池的時候可使用
getTaskCount:線程池已經執行的和未執行的任務總數;
getCompletedTaskCount:線程池已完成的任務數量,該值小於等於 taskCount;
getLargestPoolSize:線程池曾經建立過的最大線程數量。經過這個數據能夠知道線程池是否滿過,也就是達到了maximumPoolSize;
getPoolSize:線程池當前的線程數量;
getActiveCount:當前線程池中正在執行任務的線程數量。
經過這些方法,能夠對線程池進行監控,在 ThreadPoolExecutor 類中提供了幾個空方法,如 beforeExecute 方法,afterExecute 方法和 terminated 方法,能夠擴展這些方法在執行前或執行後增長一些新的操做,例如統計線程池的執行任務的時間等,能夠繼承自 ThreadPoolExecutor 來進行擴展。
到此,關於 ThreadPoolExecutor 的內容就講完了。