Java 中的線程池是運用場景最多的併發框架,幾乎全部須要異步或併發執行任務的程序均可以使用線程池。在開發過程當中,合理地使用線程池可以帶來 3 個好處。java
ThreadPoolExecutor 執行 execute 方法分下面 4 種狀況。編程
1)若是當前運行的線程少於 corePoolSize,則建立新線程來執行任務(注意,執行這一步驟須要獲取全局鎖)。
2)若是運行的線程等於或多於 corePoolSize,則將任務加入 BlockingQueue。
3)若是沒法將任務加入 BlockingQueue(隊列已滿),則建立新的線程來處理任務(注意,執行這一步驟須要獲取全局鎖)。
4)若是建立新線程將使當前運行的線程超出 maximumPoolSize,任務將被拒絕,並調用 RejectedExecutionHandler.rejectedExecution() 方法。併發
ThreadPoolExecutor 採起上述步驟的整體設計思路,是爲了在執行 execute() 方法時,儘量地避免獲取全局鎖(那將會是一個嚴重的可伸縮瓶頸)。在 ThreadPoolExecutor 完成預熱以後(當前運行的線程數大於等於 corePoolSize),幾乎全部的 execute() 方法調用都是執行步驟2,而步驟2不須要獲取全局鎖。框架
上面的流程分析讓咱們很直觀地瞭解了線程池的工做原理,讓咱們再經過源代碼來看看是如何實現的,線程池執行任務的方法以下。咱們從 execute 入手分析源碼。異步
private static final int RUNNING = -1 << COUNT_BITS; // 運行狀態 private static final int SHUTDOWN = 0 << COUNT_BITS; // 拒絕提交新任務,會執行完已提交的任務後關閉線程池 private static final int STOP = 1 << COUNT_BITS; // 拒絕提交新任務,也拒絕執行已提交的任務 private static final int TIDYING = 2 << COUNT_BITS; // 關閉線程池後,線程所有關閉後的狀態,以後回調 terminated private static final int TERMINATED = 3 << COUNT_BITS; // 回調 terminated 方法後狀態變爲 TERMINATED
線程池用 ctl 的低 29 位表示線程池中的線程數,高 3 位表示當前線程狀態。oop
// ctl 高3位表示線程池狀態,低29位表示當前工做線程數 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; // 低29位表示工做線程數 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 最大線程數 0x1fffffff // 獲取線程池狀態、線程總數、構造 ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
// 全局鎖,建立工做線程等操做時須要獲取全局鎖 private final ReentrantLock mainLock = new ReentrantLock(); private final Condition termination = mainLock.newCondition(); // 工做線程 private final HashSet<Worker> workers = new HashSet<Worker>(); private int largestPoolSize; private volatile int corePoolSize;
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // ctl 高3位表示線程池狀態,低29位表示當前工做線程數 int c = ctl.get(); // 1. 小於核心線程數,建立新的線程執行任務。須要獲取全局鎖 if (workerCountOf(c) < corePoolSize) { // addWorker 建立新的工做線程,true 表示核心線程數,false 表示最大線程數 if (addWorker(command, true)) return; c = ctl.get(); } // 2. 核心線程已滿,將任務提交到隊列中。不須要獲取全局鎖 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 2.1 恰好此時線程池關閉了,則須要將任務從隊列中踢除 if (!isRunning(recheck) && remove(command)) reject(command); // 任務被踢除後回滾,執行拒絕任務 // 2.2 線程池工做線程爲0,建立一個新的工做線程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 3. 隊列滿後且線程數小於最大線程數,則建立新的線程執行任務。須要獲取全局鎖 // 4. 超出最大線程拒絕任務 else if (!addWorker(command, false)) reject(command); }
工做線程:線程池建立線程時,會將線程封裝成工做線程 Worker,Worker 在執行完任務後,還會循環獲取工做隊列裏的任務來執行。咱們能夠從 Worker 類的 run() 方法裏看到這點。源碼分析
// Worker 是對線程 Thread 的包裝,實現了 AbstractQueuedSynchronizer private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; // 包裝的線程 Runnable firstTask; // 線程初始化時的任務,能夠爲 null Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } }
思考:Worker 爲何要繼承 AbstractQueuedSynchronizer 實現本身的鎖,而不使用 ReentrantLock 呢?ui
實際上 ReentrantLock 是可重入鎖,而 Worker 實現的是獨佔鎖,只有三種狀 -1(初始化)、0(釋放鎖)、1(佔有鎖)。Worker 之因此實現獨佔鎖是爲了不在線程執行的時候被 interrupted 中斷(下面會講到)。this
// addWorker 建立一個新的工做線程 // firstTask 線程初始化任務,能夠爲 null;core 表示是核心線程仍是最大線程 private boolean addWorker(Runnable firstTask, boolean core) { // 1. 經過自旋線程數+1 compareAndIncrementWorkerCount retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 1.1 1、STOP 不能建立新線程 // 2、SHUTDOWN 時 workQueue 爲空,也不能建立新線程 // firstTask 表示線程初始化任務,是新提交的任務,SHUTDOWN 時拒絕新提交的任務 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false; // 1.2 自旋使線程數+1 for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) // 不斷檢查線程池狀態變化 continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 2. 建立線程 Worker boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 2.1 初始化工做線程 Worker,使用全局鎖添加到 workers 隊列中 w = new Worker(firstTask); final Thread t = w.thread; // threadFactory 可能建立線程失敗,返回 null if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); // 2.2 1、RUNNING能夠建立新線程 // 2、SHUTDOWN不接收新任務,但會執行完 workQueue 的任務 ,所以能夠建立空任務的線程 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) // largestPoolSize 表示線程池運行過程當中達到的最大線程數 largestPoolSize = s; workerAdded = true; // 工做線程添加到 workers 成功 } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; // 啓動線程成功 } } } finally { // 2.3 建立工做線程失敗,回滾 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
總結:spa
addWorker 的 4 種調用方式:
addWorker(command, true)
線程數 < coreSize 時,則建立新線程addWorker(command, false)
當①阻塞隊列已滿,②線程數 < maximumPoolSize 時,則建立新線程addWorker(null, true)
同 1。只是線程初始化任務爲 null,至關於建立一個新的線程。實際的使用是在 prestartCoreThread() 等方法,有興趣的讀者能夠自行閱讀,在此不作詳細贅述。addWorker(null, false)
同 2。只是線程初始化任務爲 null,至關於建立一個新的線程,沒立馬分配任務;在 addWorker 建立線程後調用 t.start() 啓動線程,run 方法主要乾了一件事,調用 runWorker(this),接下來咱們來看看 runWorker 的具體實現。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; // 線程初始化任務 task w.firstTask = null; // 1. Worker 是獨佔鎖,此時狀態由 -1 -> 0,也就是其它線程才能獲取w的鎖,進而interrupt w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 2. 循環經過 getTask 獲取任務,若是不能獲取任務了,退出循環,關閉線程池 // 也就是說 getTask 返回 null 時線程就關閉了 while (task != null || (task = getTask()) != null) { w.lock(); // 獲取鎖,這樣在線程執行過程當中不能中斷線程(interrupt) // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt // // 3.1 線程池已經STOP,若是線程尚未被中斷(wt.isInterrupted=false),則調用wt.interrupt中斷線程 // 3.2 若是runStateAtLeast(ctl.get(), STOP)=false,則說明線程池處於RUNNING或SHUTDOWN狀態 // 調用 Thread.interrupted() 後會清空線程的 interrupted 狀態 // Thread.interrupted()&& false 結果始終爲 false,這裏僅僅是爲了調用Thread.interrupted() // 實際上就是:一若是線程已經STOP,則必定要將線程 interrupt // 二若是線程處於運行狀態(包括SHUTDOWN),則必定不能 interrupt(也就是要清除 interrupt 標記) if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); // 執行前 Throwable thrown = null; try { task.run(); // 執行任務 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); // 執行後 } } finally { task = null; w.completedTasks++; // 統計執行的任務數 w.unlock(); // 釋放鎖,能夠被中斷了 } } completedAbruptly = false; // true時表示正常退出,false表示異常退出 } finally { processWorkerExit(w, completedAbruptly); } }
總結,runWoker 具體實現:
// 注意 getTask 前 worker 釋放了鎖,也就是可能被 interrupt 喚醒 private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { // 自旋獲取任務 int c = ctl.get(); int rs = runStateOf(c); // 1. ①STOP直接銷燬線程,②SHUTDOWN時任務隊列爲空時也直接銷燬線程 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); // 原子性更新,工做線程數-1 return null; } int wc = workerCountOf(c); // 當前工做線程數 // 2.1 timed表示是否能夠銷燬線程。timed=true表示超時獲取任務,則可能返回null // 當線程數大於核心線程數或容許銷燬核心線程時 timed=true boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 2.2 一是超過了最大線程數,當線程池啓動後手動修改最大線程數可能會出現這種狀況 // 二是當容許銷燬線程時,獲取任務超時 // 2.3 三是線程池中至少有一個工做線程或任務隊列爲空,則能夠銷燬線程 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) // 失敗重試,此時線程數已經-1 return null; continue; } try { // 3. 獲取任務,無限等待則不會返回 null,也就不會銷燬線程。而限時等待則可能返回 null Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; // 其它線程喚醒等待的線程 } } }
總結,整個 getTask 循環實現:
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 權限檢查 advanceRunState(SHUTDOWN); // 更新線程池狀態爲 SHUTDOWN interruptIdleWorkers(); // 關閉全部的空閒線程 onShutdown(); // 子類實現,如 ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); // 嘗試中止線程池 } public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 權限檢查 advanceRunState(STOP); // 更新線程池狀態爲 SHUTDOWN interruptWorkers(); // 關閉全部的線程 tasks = drainQueue(); // 返回還未執行的任務 } finally { mainLock.unlock(); } tryTerminate(); // 嘗試中止線程池 return tasks; }
總結,shutdown 和 shutdownNow 區別:
// 關閉全部的空閒線程 private void interruptIdleWorkers() { interruptIdleWorkers(false); } // 中斷線程其實是調用 t.interrupt(),須要獲取線程鎖 w.tryLock private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; // 只有空閒線程才能獲取鎖,正在執行的線程沒法獲取鎖,也就沒法中斷 // 這也就是爲何 Worker 要實現獨佔鎖的緣由。 if (!t.isInterrupted() && w.tryLock()) { // 須要獲取w的獨佔鎖 try { t.interrupt(); // 其實是調用 t.interrupt() 中斷線程 // 其實是給能線程設置一個標記位 } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
interruptIdleWorkers 只會嘗試獲取鎖,所以只會中斷空閒線程。而 interruptWorkers 不須要獲取鎖,強行中斷線程。實際上業務線程必須對 interrupt 作出響應才能中斷線程,不然會一直等線程執行結束纔會銷燬。
private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } } // 調用Worker#interruptIfStarted 不須要獲取鎖 void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }
而 interruptIdleWorkers 和 interruptWorkers 都是 interrupt 全部線程, 所以大部分線程將馬上被中斷。之因此是大部分,而不是所有,是由於 interrupt() 方法能力有限。 若是線程中沒有 sleep 、wait、Condition、定時鎖等應用, interrupt() 方法是沒法中斷當前的線程的。因此,ShutdownNow() 並不表明線程池就必定當即就能退出,它可能必需要等待全部正在執行的任務都執行完成了才能退出。 以下面這個線程永遠不會中斷,由於該線程沒有響應 Thread.interrupted() 或者是直接將 InterruptedException 異常 catch 了。
// 沒法響應 interrupted,線程永遠沒法停止。 executorService.submit(() -> { while (true) System.out.println("go go go"); }); executorService.shutdownNow();
final void tryTerminate() { for (;;) { int c = ctl.get(); // 1. RUNNING或SHUTDOWN還有任務執行時不能關閉,TIDYING則已經關閉 if (isRunning(c) || // 1.1 正在運行,不能中斷 runStateAtLeast(c, TIDYING) || // 1.2 已經中斷,不須要執行 (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) // 1.3 SHUTDOWN時還有任務執行 return; // 2. 還有線程則關閉空閒線程 if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 3. 工做線程數爲0時,能夠關閉線程池了,設置線程狀態爲TIDYING, // 並回調terminated後,線程的狀態最終變爲TERMINATED // 4. 線程狀態設置失敗,則 CAS 自旋 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
除了 shutdown 和 shutdownNow 外,addWorkerFailed、processWorkerExit、remove 等方法也會調用 tryTerminate 方法。
參考:
天天用心記錄一點點。內容也許不重要,但習慣很重要!