好久沒更新了,緣由並非沒有學習,而是學完了不知道怎麼寫出來,同時還有一股聲音在耳邊告訴我,如今公衆號滿天飛,寫公衆號的人比看公衆號多,同 topic 的文章太多了......。但後面我本身想通了,雖然相似的文章不少,但它不是我寫的,本身寫完有助於對相關知識的梳理,若是恰好能給你們帶來一些幫助,那就更好了,因此白牙仍是鼓勵你們多輸出,經過輸出倒逼輸入,其中的收穫只有作了才知道
經過類圖可知,ThreadPoolExecutor 是一個 ExecutorService,能夠經過池中的線程來執行任務java
// 線程池中重要的變量 ctl,類型爲 AtomicInteger,一個變量同時記錄線程池狀態和線程個數 // Integer 的位數爲 32 位,其中高 3 位表示線程池的狀態,低 29 位表示線程的個數。默認爲 RUNNING 狀態,線程個數爲 0 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 線程個數掩碼位數,去掉高 3 位表明線程個數的 bit 位 private static final int COUNT_BITS = Integer.SIZE - 3; // 線程最大個數,低 29 位 00011111111111111111111111111111 private static final int CAPACITY = (1 << COUNT_BITS) - 1;
不得不佩服寫 JDK 源碼這些大佬,把一個變量用的這麼好,ctl 變量經過位運算同時表達了線程池狀態和線程個數,下面看下與之相關的方法面試
// 計算線程池的狀態 ~CAPACITY 爲:11100000000000000000000000000000,經過讓 ctl 與 ~CAPACITY 相與,至關於取高 3 位的值(前面說了 ctl 高 3 位表示線程池狀態) private static int runStateOf(int c) { return c & ~CAPACITY; } // 計算線程個數 CAPACITY 爲:00011111111111111111111111111111,經過讓 ctl 與 CAPACITY 相與,至關於取低 29 位的值(前面說了 ctl 低 29 位表示線程個數) private static int workerCountOf(int c) { return c & CAPACITY; } // 計算 ct l的值,用線程池狀態和線程個數進行或運算 private static int ctlOf(int rs, int wc) { return rs | wc; }
在源碼中常常看到這些方法,是否是用的很巧妙?redis
// 默認狀態,接收新任務並處理阻塞隊列裏的任務 private static final int RUNNING = -1 << COUNT_BITS; // 拒絕新任務可是處理阻塞隊列裏的任務 private static final int SHUTDOWN = 0 << COUNT_BITS; // 拒絕新任務而且拋棄阻塞隊列裏的任務,同時中斷正在處理的任務 private static final int STOP = 1 << COUNT_BITS; // 全部任務都已經執行完成,線程數是 0,將調用 terminated() 方法 private static final int TIDYING = 2 << COUNT_BITS; // 終止狀態,調用完 terminated() 方法後的狀態 private static final int TERMINATED = 3 << COUNT_BITS;
備註編程
若是想查看上面變量的二進制表現形式,能夠經過方法 Integer.toBinaryString(int i) 查看
RUNNING -> SHUTDOWN:當調用 shutdown() 方法時,也可能隱式的調用 finalize() 方法時(由於 finalize() 方法也是調用的 shutdown() 方法) On invocation of shutdown(), perhaps implicitly in finalize() (RUNNING or SHUTDOWN) -> STOP:當調用 shutdownNow() 方法時 On invocation of shutdownNow() SHUTDOWN -> TIDYING:當隊列和線程池都空時 When both queue and pool are empty STOP -> TIDYING:當線程池爲空時 When pool is empty TIDYING -> TERMINATED:當 terminated() 方法完成時 When the terminated() hook method has completed
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
建立線程池有如上經常使用參數,下面簡單介紹下segmentfault
corePoolSize:線程池中的核心線程數,即便他們處於空閒狀態,除非 allowCoreThreadTimeOut 被設置了 maximumPoolSize:線程池中的最大線程數 workQueue:存放還未被執行任務的阻塞隊列 threadFactory:建立線程的工廠類 rejectHandle:拒絕策略,當線程個數達到最大線程數,同時任務隊列滿了。就會執行拒絕策略。拒絕策略有:AbortPolicy(直接拋出異常)、CallerRunsPolicy(調用者所在線程來執行任務)、DiscardOldestPolicy(從任務隊列中移除一個待執行的任務(最先提交的),而後再次執行任務)、DiscardPolicy(直接拋棄任務) keepAliveTime:存活時間,當線程個數大於了核心線程數,且處於空閒狀態,這些空閒線程可存活的最大時間
使用線程池時,咱們通常是調用 ThreadPoolExecutor.submit(task) 方法,直接把任務交給線程池去處理,而後返回給咱們一個 Future,後面能夠經過 Future.get() 方法獲取任務結果
public Future<?> submit(Runnable task) { // 任務爲空,直接拋異常 if (task == null) throw new NullPointerException(); // 把任務封裝成 RunnableFuture RunnableFuture<Void> ftask = newTaskFor(task, null); // 執行封裝後的任務 execute(ftask); return ftask; }
newTaskFor 方法是把任務封裝成一個 RunnableFuture 類,能夠經過 get 方法獲取結果
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } public FutureTask(Runnable runnable, V result) { // 這裏把 Runnable 適配成 Callable 類型的任務,result 是當任務成功完成時返回的結果,若是須要特殊結 果,就用 null 就好了 this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); // 這裏經過 RunnableAdapter 適配任務和任務的結果 return new RunnableAdapter<T>(task, result); } // 適配類 RunnableAdapter,這種寫法,咱們能夠借鑑下 static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
嘗試把任務交給線程池執行,執行任務的線程多是新建的,也多是複用了線程池中的。若是線程池不能執行該任務(可能緣由有兩個,1.線程池已經關閉了,2.線程達到了最大容量)就會執行拒絕策略
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps:留着原汁原味的總結 * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ // 獲取表示線程池狀態和線程個數的組合變量 ctl int c = ctl.get(); // 判斷線程個數是否小於核心線程數,若是小於就新建一個核心線程來執行任務 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 若是線程池處於 RUNNAING 狀態,就把任務添加到阻塞隊列中(代碼運行到這裏說明要麼線程個數>=核心線程數,要麼執行 addWorder 方法失敗) if (isRunning(c) && workQueue.offer(command)) { // 再次獲取組合變量 ctl,作二次檢查(由於可能在此以前,線程池的狀態已經發生了改變) int recheck = ctl.get(); // 若是線程池狀態不是 RNUUAING 狀態,就把該任務從阻塞任務隊列中移除,並執行拒絕策略 if (! isRunning(recheck) && remove(command)) reject(command); // 若是線程池中線程個數爲 0,就新建一個線程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 若是阻塞任務隊列滿了,新建線程,若是建立線程失敗(即線程個數達到了最大線程個數),執行拒絕策略 else if (!addWorker(command, false)) reject(command); }
當任務被提交到線程池時,判斷運行的線程的個數是否小於 corePoolSize,若是小於就新建一個線程來處理這個任務即便有其餘線程處於空閒狀態。而後再來任務,若是核心線程有空閒的,就直接執行任務。若是核心線程都在忙,那麼就把待執行的任務添加到任務隊列中。微信
若是任務隊列滿了,且運行的線程個數大於 corePoolSize 且小於 maximumPoolSize,那就新建線程來執行任務。網絡
首先會根據當前線程池的狀態和線程數的邊界(核心線程數仍是最大線程數)檢查是否能夠新建一個 worker 線程。若是能夠就新建一個 worker 線程並啓動,而後執行傳過來的任務
/** * 建立新的worker * * @param firstTask 提交給線程的任務,要最早執行,能夠爲 null * @param core 若是爲 true,表示以核心線程數爲界建立線程 爲 false 表示以最大線程數爲界建立線程 * @return */ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { // 獲取 ctl int c = ctl.get(); // 獲取線程池的狀態 int rs = runStateOf(c); // 這裏的判斷條件有點多,拆成 rs>=SHUTDOWN 和 !(rs == SHUTDOWN && firstTask == null &&!workQueue.isEmpty()) // !(rs == SHUTDOWN && firstTask == null &&!workQueue.isEmpty()) 逆着考慮 ,以下: // rs!=SHUTDOWN 也就是爲大於 shutdown,爲 stop,tidying,terminated // firstTask != null // workQueue.isEmpty() // 若是線程池處於關閉狀態,且知足下面條件之一的,不建立 worker // 線程池處於 stop,tidying,terminated 狀態 // firstTask != null // workQueue.isEmpty() // 注意:若是線程池處於 shutdown,且 firstTask 爲 null,同時隊列不爲空,容許建立 worker // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 獲取工做線程數 int wc = workerCountOf(c); // 工做線程數大於最大容量或者工做線程數超過線程數的邊界(根據 core 的值取不一樣的值) 時 不建立worker if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 工做線程數 +1 經過 CAS // 這裏若是失敗,表示有併發操做 if (compareAndIncrementWorkerCount(c)) // 調出循環,執行真正的建立 worker 邏輯 break retry; // 由於存在併發,須要再讀取 ctl 值進行狀態判斷 // Re-read ctl c = ctl.get(); // 若是線程狀態發生了變化,回到外部循環 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 校驗已經都經過,開始建立 worker // 是否已經啓動了 worker boolean workerStarted = false; // 是否已經添加了 worker boolean workerAdded = false; Worker w = null; try { // 把 task 封裝成 worker,經過線程工廠建立線程,最後會把任務設置到 Thread 的 target 屬性上,後續在執行線程的 start 方法時,就會執行對應的任務的 run 方法 w = new Worker(firstTask); // 獲取 worker 中的線程 final Thread t = w.thread; if (t != null) { // 獲取對象鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); // 若是線程池處於 Running 狀態 或者 線程池處於 shutdown 狀態且任務爲 null(執行任務隊列中的任務) if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // precheck that t is startable // 檢查線程是否爲啓動狀態,若是爲啓動狀態拋異常 if (t.isAlive()) throw new IllegalThreadStateException(); // 把新建的 worker 添加到 worker 集中 workers.add(w); int s = workers.size(); // largestPoolSize 記錄 workers 中個數存在過的最大值 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 新建的 worker 添加成功就啓動線程,後續有分析 if (workerAdded) { t.start(); workerStarted = true; } } } finally { // 線程沒有啓動成功,對上面建立線程的過程作回滾操做 if (! workerStarted) // 回滾操做,好比把 worker 從 workers 中移除,把線程數減一 addWorkerFailed(w); } return workerStarted; }
回滾以前的 worker 線程建立操做
private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) // 從 workers 中移除 worker workers.remove(w); // 把線程數減一 decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } }
在介紹 addWorker 方法時,有一個邏輯是若是新建立的 woker 線程成功添加到 woker 線程集後,會調用線程的 start 方法,其實最後就會執行到 Worker 的 run 方法。由於 Woker 的構造器是經過線程工廠建立的線程,分析以下架構
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 這裏默認的線程工廠是 DefaultThreadFactory this.thread = getThreadFactory().newThread(this); }
經過線程工廠建立線程,最後調用 Thread 的構造器,Thread(... Runnable target ...),要執行的任務做爲 target 參數建立線程,而後調用 Thread.start 方法後會執行 run 方法,而後會執行 target.run,相似代理併發
@Override public void run() { if (target != null) { // 這個 target 就是建立線程時傳遞過來的那個任務 target.run(); } }
而後就能夠執行咱們的 worker 的 run 方法了,run 方法又會調用 runWorker 方法,下面咱們看下 這個方法異步
不斷從任務隊列中取任務並執行
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // 把 status 設置爲 0,容許中斷 w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 從任務隊列中獲取任務並執行 while (task != null || (task = getTask()) != null) { // 這裏加鎖是爲了不任務運行期間,其餘線程調用 shutdown 方法關閉線程池中正在執行任務的線程 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt // 若是線程池狀態大於或等於 stop,即 runStateAtLeast(ctl.get(), STOP) 爲 true,這個時候就要確保線程是中斷的 // 不用看||後面的條件,直接判斷 !wt.isInterrupted(),由於線程池狀態爲暫停,要確保線程中斷,若是沒有中斷,就要手動中斷線程,即執行 wt.interrupt() // 若是線程池狀態不是 stop,即 runStateAtLeast(ctl.get(), STOP) 爲 false,就要確保線程沒有中斷,這樣才能在後面執行任務 // 這時候須要看 || 後面的 (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)) ,由於要確保線程沒有中斷,調用Thread.interrupted()清除中斷狀態, // 這裏須要再次進行驗證線程池的狀態,由於可能會有 shutdownNow 的狀況 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 空方法體,子類能夠實現,作一些特殊化處理工做 beforeExecute(wt, task); Throwable thrown = null; try { // 執行任務 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 空方法體,子類能夠實現,作一些特殊化處理工做 afterExecute(task, thrown); } } finally { task = null; // 統計當前 worker 完成了多少任務 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 執行清理工做 processWorkerExit(w, completedAbruptly); } }
在 runWorker 方法中,有個 getTask 方法,下面簡單介紹下
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 知足下面兩種狀況任意一個就返回 null 同時把線程個數減 1 // 1.線程池已經處於關閉狀態 // 2.線程池處於 shutdown,且隊列爲空 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // 判斷線程是否有時效性,前面說過,若是把 allowCoreThreadTimeOut 設爲 false,那麼核心線程數之內的線程是不會關閉的。若是設爲 true 就只會存活 keepAliveTime 這麼長時間 // 若是線程數大於核心線程數的線程都有時效性 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 這個判斷邏輯就是下面方法總結的第①點和第④點(這段代碼我看了足足有半個小時,結合方法的註釋終於弄明白了,感受本身好笨) // 超時且超時的 worker 線程須要終止。 // 若是任務隊列非空,要保證當前 worker 線程不是線程池中最後一個線程(若是任務爲空,當前線程是線程池中的最後一個線程也無妨,畢竟任務隊列爲空,當前 worker 線程關閉就關閉了,沒影響) // 這裏的判斷條件能夠當作 if (wc > maximumPoolSize || ((timed && timedOut) && (wc > 1 || workQueue.isEmpty())) if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 從隊列中取任務,分爲有超時限制和非超時限制兩種狀況 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { // 這裏會拋出中斷異常是由於可能會調用 setMaximumPoolSize 方法,把線程的最大數設置小了,那可能存在當前線程數大於新的最大線程數 // 這樣就得關閉多餘的線程,因此從新進入 for 循環,並返回 null timedOut = false; } } }
1.返回 task
2.返回 null,這種狀況是該 worker 線程須要退出,由於線程的數量減小了,發生這種狀況的可能緣由有以下 4 個
①線程池中的線程個數大於最大線程數了(由於能夠經過 setMaximumPoolSize 方法進行設置)
②線程池關閉了【既拒絕新任務,又不執行任務隊列中的任務】
③線程池處於 shutdown,同時任務隊列爲空【拒絕新任務】
④超時且超時的 worker 線程須要終止。若是任務隊列非空,要保證當前 worker 線程不是線程池中最後一個線程(若是任務爲空,當前線程是線程池中的最後一個線程也無妨,畢竟任務隊列爲空,當前 worker 線程關閉就關閉了,沒影響)
runWorker 方法中還有個執行清理工做的 processWorkerExit 方法,下面簡單介紹下
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 若是是忽然完成,須要調整線程數 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 計算線程池完成的任務個數,並從 worke r線程集中刪除當前 worker 線程 completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } // 嘗試把線程池的狀態設置爲 TERMINATED,該方法在後面分析 tryTerminate(); int c = ctl.get(); // 線程池狀態至少爲 STOP if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } // 新建 worker 線程的條件爲:當前線程數小於核心線程數或者任務隊列不爲空但沒有運行的線程了(容許核心線程超時的狀況下) addWorker(null, false); } }
final void tryTerminate() { for (;;) { int c = ctl.get(); // 若是處於下面三種任意一種狀況,就不能把線程池的狀態設爲 TERMINATED if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 代碼執行到這裏,說明有資格終止了。可是若是這個時候線程個數非 0,就中斷一個空閒的線程來確保 shutdown 信號傳播 if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 設置線程池狀態爲 TIDYING if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { // 設置線程池狀態爲 TERMINATED ctl.set(ctlOf(TERMINATED, 0)); // 激活調用線程池中因調用 awaitTermination 系列方法而阻塞的線程 termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
若是認真結合源碼看這篇文章到這裏的話,應該對線程池的執行原理有點小感受了吧?下面剩下最後的內容了,就是線程池關閉方法
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 權限校驗 checkShutdownAccess(); // 設置線程池狀態爲 SHUTDOWN advanceRunState(SHUTDOWN); // 中斷空閒線程 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 嘗試設置線程池狀態爲 TERMINATED tryTerminate(); }
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 權限校驗 checkShutdownAccess(); // 設置線程池狀態爲 STOP advanceRunState(STOP); // 中斷全部線程 interruptWorkers(); // 將任務隊列中的任務移動到 tasks 中 tasks = drainQueue(); } finally { mainLock.unlock(); } // 嘗試設置線程池狀態爲 TERMINATED tryTerminate(); return tasks; }
這篇文章是假期寫的,算是對線程池這塊知識的一個小梳理,固然裏面也許有問題,白牙但願你能批判的繼承,發現問題能夠去公衆號【天天曬白牙】留言指出,也能夠加我微信【dingaiminIT】,咱們一塊兒交流討論。
三面阿里被掛,幸獲內推名額,歷經 5 面終獲口碑 offer
老年代又佔用100%了,順便發現了vertx-redis-client 的bug
- https://juejin.im/entry/59b23...
- 《Java異步編程實戰》
歡迎關注公衆號 【天天曬白牙】,獲取最新文章,咱們一塊兒交流,共同進步!