在 java 中,線程池 ThreadPoolExecutor 是一個繞不過去的類,它是享元模式思想的體現,經過在容器中建立必定數量的線程加以重複利用,從而避免頻繁建立線程帶來的額外開銷。一個設置合理的線程池能夠提升任務響應的速度,而且避免線程數超過硬件能力帶來的意外狀況。java
在本文,將深刻線程池源碼,瞭解線程池的底層實現與運行機制。函數
ThreadPoolExecutor 類一共提供了四個構造方法,咱們基於參數最完整構造方法瞭解一下線程池建立所須要的變量:this
public ThreadPoolExecutor(int corePoolSize, // 核心線程數 int maximumPoolSize, // 最大線程數 long keepAliveTime, // 非核心線程閒置存活時間 TimeUnit unit, // 時間單位 BlockingQueue<Runnable> workQueue, // 工做隊列 ThreadFactory threadFactory, // 建立線程使用的線程工廠 RejectedExecutionHandler handler // 拒絕策略) { }
線程池擁有一個 AtomicInteger 類型的成員變量 ctl ,經過位運算分別使用 ctl 的高位低位以便在一個值中存儲線程數量以及線程池狀態。線程
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 29(32-3) private static final int COUNT_BITS = Integer.SIZE - 3; // 容許的最大工做線程(2^29-1 約5億) private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 運行狀態。線程池接受並處理新任務 private static final int RUNNING = -1 << COUNT_BITS; // 關閉狀態。線程池不能接受新任務,處理完剩餘任務後關閉。調用shutdown()方法會進入該狀態。 private static final int SHUTDOWN = 0 << COUNT_BITS; // 中止狀態。線程池不能接受新任務,而且嘗試中斷舊任務。調用shutdownNow()方法會進入該狀態。 private static final int STOP = 1 << COUNT_BITS; // 整理狀態。由關閉狀態轉變,線程池任務隊列爲空時進入該狀態,會調用terminated()方法。 private static final int TIDYING = 2 << COUNT_BITS; // 終止狀態。terminated()方法執行完畢後進入該狀態,線程池完全中止。 private static final int TERMINATED = 3 << COUNT_BITS;
這裏比較很差理解的是上述-1的位運算,下面咱們來分析一下:日誌
在計算機中,二進制負數通常用補碼錶示,即源碼取反再加一。但又有這種說法,即將最高位做爲符號位,0爲正數,1爲負數。實際上二者是能夠結合在一塊兒看的。假如數字是單字節數,1 字節對應8 bit,即八位,如今,咱們要計算 - 1。code
按照第二種說法,最高位爲符號位,則有 1/000 0001,而後按第一種說法取反後+1,而且符號位不變,則有 1/111 1110 + 1,即 1/111 1111。對象
如今回到 -1 << COUNT_BITS
這行代碼:繼承
一個 int 是 4 個字節,對應 32 bit,按上述過程 -1 轉爲二進制即爲 1/111......1111(32個1), COUNT_BITS
是 29,-1 左移 29 位,最終獲得 111.0...0000。隊列
同理,計算其餘的幾種狀態,可知分別是:rem
狀態 | 二進制 |
---|---|
RUNNING | 111...0....00 |
SHUTDOWN | 000...0....00 |
STOP | 001...0....00 |
TIDYING | 010...0....00 |
TERMINATED | 011...0....00 |
其中,咱們能夠知道 SHUTDOWN 狀態轉爲十進制也是 0 ,而 RUNNING 做爲有符號數,它的最高位是 1,說明轉爲十進制之後是個負數,其餘的狀態最高位都是 0,轉爲十進制以後都是正數,也就是說,咱們能夠這麼認爲:
小於 SHUTDOWN 的就是 RUNNING,大於 SHUTDOWN 就是中止或者中止中。
這也是後面狀態計算的一些寫法的基礎。好比 isRunning()
方法:
private static boolean isRunning(int c) { return c < SHUTDOWN; }
// 根據當前運行狀態和工做線程數獲取當前的 ctl private static int ctlOf(int rs, int wc) { return rs | wc; } // 獲取運行狀態 private static int runStateOf(int c) { return c & ~CAPACITY; } // 獲取工做線程數 private static int workerCountOf(int c) { return c & CAPACITY; }
前面獲取狀態的時候調用了 ctlOf()
方法,根據前面,咱們能夠知道,CAPACITY
其實是 29 位,而線程狀態用的是 32 - 30 共 3 位,也就是說,ctl 共 32 位,高3 位用於表示線程池狀態,而低 29 位表示工做線程的數量。
這樣上述三個方法就很好理解了:
ctlOf()
:獲取 ctl。
將工做線程數量與運行狀態進行於運算,假如咱們處於 RUNNING,而且有 1 個工做線程,那麼 ctl = 111....000 | 000.... 001,最終獲得 111 ..... 001;
runStateOf()
:獲取運行狀態。
繼續根據上文的數據,~CAPACITY
取反即爲 111....000,與運行狀態 111...0000 與運算,最終獲得 111....000,至關於低位掩碼,消去低 29 位;
workerCountOf()
:獲取工做線程數。
同理,c & CAPACITY
裏的 CAPACITY 至關於高位掩碼,用於消去高 3 位,最終獲得 00...001,即工做線程數。
同理,若是要增長工做線程數,就直接經過 CAS 去遞增 ctl,好比新建線程中使用的公共方法:
private boolean compareAndIncrementWorkerCount(int expect) { // 經過 CAS 遞增 ctl return ctl.compareAndSet(expect, expect + 1); }
要改變線程池狀態,就根據當前工做線程和要改變的狀態去合成新的 ctl,而後 CAS 改變 ctl,好比 shutdown()
中涉及的相關代碼:
private void advanceRunState(int targetState) { for (;;) { int c = ctl.get(); if (runStateAtLeast(c, targetState) || // 經過 CAS 改變 ctl ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } }
線程池任務提交方法是 execute()
,根據代碼可知,當一個任務進來時,分四種狀況:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 1.當前工做線程數小於核心線程數,啓動新線程 if (workerCountOf(c) < corePoolSize) { // 添加任務 if (addWorker(command, true)) return; c = ctl.get(); } // 2. 當前工做線程數大於核心線程數,可是未大於最大線程數,嘗試添加到工做隊列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 若是當前線程處於非運行態,而且移除當前任務成功,則拒絕任務(防止添加到一半就shutdown) if (! isRunning(recheck) && remove(command)) reject(command); // 若是當前沒有工做線程了,就啓動新線程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 3.當前線程池核心線程和隊列都滿了,嘗試建立新非核心線程 else if (!addWorker(command, false)) // 4.線程池完全滿了,執行拒絕策略 reject(command); }
添加任務依靠 addWorker()
方法,這個方法很長,可是主要就幹了兩件事:
private boolean addWorker(Runnable firstTask, boolean core) { retry: // 1.改變 ctl 使工做線程+1 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 若是當前不處於運行狀態,傳入任務爲空,而且任務隊列爲空的時候拒絕添加新任務 // 即線程池 shutdown 時不讓添加新任務,可是運行繼續跑完任務隊列裏的任務。 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); // 線程不容許超過最大線程數,核心線程不容許超過最大核心線程數 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // CAS 遞增工做線程數 if (compareAndIncrementWorkerCount(c)) // 失敗了就從新回到上面的retry處繼續往下執行 break retry; // 更新 ctl c = ctl.get(); // 若是運行狀態改變了就所有歷來 if (runStateOf(c) != rs) continue retry; } } // 2.啓動新線程 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 建立新線程 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 加鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); // 若是線程池處於運行狀態,或者沒有新任務的SHUTDOWN狀態(即SHUTDOW之後還在消費工做隊列裏的任務) if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 線程是否在未啓動前就已經啓動了 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); // 若是集合中的工做線程數大於最大線程數,則將池中最大線程數改成當前工做線程數 if (s > largestPoolSize) largestPoolSize = s; // 線程建立完成 workerAdded = true; } } finally { mainLock.unlock(); } // 若是線程成功建立,就啓動線程,而且更改啓動狀態爲成功 if (workerAdded) { t.start(); workerStarted = true; } } } finally { // 若是線程啓動不成功,就執行失敗策略 if (! workerStarted) // 啓動失敗策略,從當前工做線程隊列移除當前啓動失敗的線程,遞減工做線程數,而後嘗試關閉線程池(若是當前任務就是線程池最後一個任務) addWorkerFailed(w); } return workerStarted; }
根據上文,不難發現,在線程池中線程每每以 Worker 對象的方式存在,那麼這個 Worker 又是何方神聖?
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // 工做線程 final Thread thread; // 要執行的任務 Runnable firstTask; // 線程執行過的任務數 volatile long completedTasks; // 經過線程工廠建立工做線程 Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } // 執行任務 public void run() { runWorker(this); } ... ... }
這個 Worker 類繼承了 AQS,也就是說,他自己就至關於一個同步隊列,結合他的成員變量 thread 和 firstTask,能夠知道他實際上就是咱們線程池中所說的「線程」。除了父類 AQS 自己提供的獨佔鎖之外,Worker 還提供了一些檢查任務線程運行狀態以及中斷線程相關的方法。
此外,線程池中還有一個工做隊列 workers,用於保存當前所有的 Worker:
private final HashSet<Worker> workers = new HashSet<Worker>();
當調用 Worker.run()
的時候,其實調用的是 runWorker()
方法。
runWorker()
方法實際上就是調用線程執行任務的方法,他的邏輯大題是這樣的:
getTask()
中循環等待任務);若是整個流程執行完畢,就刪除當前的 Worker。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // 新建立的Worker默認state爲-1,AQS的unlock方法會將其改成0,此後容許使用interruptIfStarted()方法進行中斷 // 完成任務之後是否須要移除當前Worker,即當前任務是否意外退出 boolean completedAbruptly = true; try { // 循環獲取任務 while (task != null || (task = getTask()) != null) { // 加鎖,防止 shundown 時中斷正在運行的任務 w.lock(); // 若是線程池狀態爲 STOP 或更後面的狀態,中斷線程任務 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 鉤子方法,默認空實現,須要本身提供 beforeExecute(wt, task); Throwable thrown = null; try { // 執行任務 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 鉤子方法 afterExecute(task, thrown); } } finally { task = null; // 任務執行完畢 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 根據completedAbruptly決定是否要移除意外退出的Worker,並補充新的Worker // 也就是說,若是上述過程順利完成,工做線程沒有掛掉,就不刪除,下次繼續用,不然就幹掉它再補充一個。 processWorkerExit(w, completedAbruptly); } }
在 runWorker()
方法中,經過 getTask()
方法去獲取任務。值得注意的是,超時處理也在此處,簡單的來講,整套流程是這樣的:
runWorker()
中的processWorkerExit()
方法去刪除;換句話說,runWorker()
方法一旦執行完畢,必然會刪除當前的 Worker,而經過 getTask()
拿任務的 Worker,在線程池正常運行的狀態下,核心線程只會一直在 for 循環中等待直到拿到任務,而非核心線程超時之後拿不到任務就會返回一個 null,而後回到 runWorker()
中走完processWorkerExit()
方法被刪除。
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // 若是線程池關閉了,而且工做隊列裏的任務都完成了,或者線程池直接進入了 STOP 或更進一步的狀態,就不返回新任務 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } // 獲取當前工做線程 int wc = workerCountOf(c); // 核心線程是否超時(默認false)或當前是否存在非核心線程,即判斷當前當前是否須要進行超時控制 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 判斷線程是否超過最大線程數或存在非核心線程 if ((wc > maximumPoolSize || (timed && timedOut)) // 而且除非任務隊列爲空,不然池中最少有一個線程 && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 獲取任務 Runnable r = timed ? // 阻塞 keepaliveTime 以獲取任務,若是在 keepaliveTime 時間內沒有獲取到任務,則返回 null. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; // 若是獲取不到任務,說明非核心線程超時了,下一輪判斷確認是否退出循環。 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
線程池的中斷方法分爲三種:
shutdown()
:中斷線程池,再也不添加新任務,同時等待當前進行和隊列中的任務完成;shutdownNow()
:當即中斷線程池,再也不添加新任務,同時中斷全部工做中的任務,再也不處理任務隊列中任務。shutdown 是有序關閉。主要乾了三件事:
public void shutdown() { final ReentrantLock mainLock = this.mainLock; // 加鎖 mainLock.lock(); try { checkShutdownAccess(); // 改變當前線程池狀態 advanceRunState(SHUTDOWN); // 中斷當前線程 interruptIdleWorkers(); // 鉤子函數,默認空實現 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
其中,interruptIdleWorkers()
方法以下:
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 遍歷工做隊列中的所有 Worker for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { // 標記爲中斷 t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
shutdownNow()
與 shutdown()
流程相似,可是會直接將狀態轉爲 STOP,在 addWorker()
或者getTask()
等處理任務的相關方法裏,會針對 STOP 或更進一步的狀態作區分,將不會再處理任務隊列中的任務,配合drainQueue()
方法以刪除任務隊列中的任務。
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 改變當前線程池狀態 advanceRunState(STOP); // 中斷當前線程 interruptWorkers(); // 刪除任務隊列中的任務 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
當任務隊列已滿,而且線程池中線程也到達最大線程數的時候,就會調用拒絕策略。也就是reject()
方法
final void reject(Runnable command) { handler.rejectedExecution(command, this); }
拒絕策略共分四種:
咱們能夠簡單的瞭解一下他們的實現:
AbortPolicy
throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
CallerRunsPolicy
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }
DiscardOldestPolicy
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { // 彈出隊頭元素 e.getQueue().poll(); e.execute(r); } }
DiscardPolicy
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // Does nothing }
和 HashMap 與 LinkedHashMap 中的行爲有點相似,在線程池的代碼中,有些方法調用了一些具備空實現的方法,這些方法是提供給用戶去繼承並重寫的鉤子函數,主要包括三個:
beforeExecute()
:在執行任務以前回調afterExecute()
:在任務執行完後回調terminated()
:在線程池中的全部任務執行完畢後回調經過繼承 ThreadPoolExecutor 類,並重寫以上三個方法,咱們能夠進行監控或者輸出日誌,更方便的瞭解線程池的狀態。
值得一提的是,afterExecute()
方法的入參類型是(Runnable r, Throwable t)
,也就是說,若是線程運行中拋出異常,咱們也能夠經過該方法去捕獲異常並做出相應的處理。
線程池提供了四個構造方法,參數最全的構造方法參數按順序有:核心線程數,最大線程數,非核心線程閒置存活時間,存活時間單位,任務隊列,線程工廠,拒絕策略。
線程池共有五種狀態,分別是:RUNNING,SHUTDOWN,STOP,TYDYING,TERMINATED,它們與工做線程數量一同記錄在成員變量 ctl 中,其中高 3 位用於記錄狀態,低 29 位用於記錄工做線程數,實際使用中經過位運算去獲取。
線程池中任務線程以繼承了 AQS 的 Worker 類的實例形式存在。當添加任務時,會有四種狀況:核心線程不滿,優先建立核心線程;核心線程滿,優先添加任務隊列;核心線程與隊列都滿,建立非核心線程;線程和隊列都滿,則執行拒絕策略。
其中,拒絕策略分爲四類,默認的拒絕策略 AbortPolicy;調用者運行策略 CallerRunsPolicy;棄老策略 DiscardOldestPolicy;丟棄策略 DiscardPolicy。
線程池的中斷有兩個方法:shutdown()
與 shutdownNow()
,二者都會讓線程池再也不接受新任務,可是 shutdown()
會等待當前與任務隊列中的任務執行完畢,而 shutdownNow()
會直接中斷當前任務,忽略並刪除任務隊列中的任務。
線程池提供了beforeExecute()
,afterExecute()
,terminated()
三個鉤子函數,其中,afterExecute()
的入參含有拋出的異常,所以能夠藉由該方法處理線程池中線程拋出的異常。