前兩天和粉絲聊天的時候,粉絲問了我一個挺有意思的問題,說他以前在面試的時候被問到線程池的線程複用原理,當時我跟他簡單的說了一下,沒想到過了幾天又來問我這個問題了,說他最近又被問到了這個問題.......想了想,乾脆寫篇文章把這個東西講清楚吧,滿滿的乾貨都放在下面了java
在線程池中,經過同一個線程去執行不一樣的任務,這就是線程複用。面試
假設如今有 100 個任務,咱們建立一個固定線程的線程池(FixedThreadPool),核心線程數和最大線程數都是 3,那麼當這個 100 個任務執行完,都只會使用三個線程。數組
示例:ide
public class FixedThreadPoolDemo { static ExecutorService executorService = Executors.newFixedThreadPool(3); public static void main(String[] args) { for (int i = 0; i < 100; i++) { executorService.execute(() -> { System.out.println(Thread.currentThread().getName() + "-> 執行"); }); } // 關閉線程池 executorService.shutdown(); } }
執行結果:源碼分析
pool-1-thread-1-> 執行 pool-1-thread-2-> 執行 pool-1-thread-3-> 執行 pool-1-thread-1-> 執行 pool-1-thread-3-> 執行 pool-1-thread-2-> 執行 pool-1-thread-3-> 執行 pool-1-thread-1-> 執行 ...
線程池將線程和任務進行解耦,線程是線程,任務是任務,擺脫了以前經過 Thread 建立線程時的一個線程必須對應一個任務的限制。線程
在線程池中,同一個線程能夠從阻塞隊列中不斷獲取新任務來執行,其核心原理在於線程池對 Thread 進行了封裝,並非每次執行任務都會調用 Thread.start() 來建立新線程,而是讓每一個線程去執行一個「循環任務」,在這個「循環任務」中不停的檢查是否有任務須要被執行,若是有則直接執行,也就是調用任務中的 run 方法,將 run 方法當成一個普通的方法執行,經過這種方式將只使用固定的線程就將全部任務的 run 方法串聯起來。code
這部份內容在 Java 線程池的各個參數的含義 討論過,這裏咱們再複習一次,再從中去了解線程複用。blog
3.1 流程圖隊列
當任務提交以後,線程池首先會檢查當前線程數,若是當前的線程數小於核心線程數(corePoolSize),好比最開始建立的時候線程數爲 0,則新建線程並執行任務。
當提交的任務不斷增長,建立的線程數等於核心線程數(corePoolSize),新增的任務會被添加到 workQueue 任務隊列中,等待覈心線程執行完當前任務後,從新從 workQueue 中獲取任務執行。
假設任務很是多,達到了 workQueue 的最大容量,可是當前線程數小於最大線程數(maximumPoolSize),線程池會在覈心線程數(corePoolSize)的基礎上繼續建立線程來執行任務。
假設任務繼續增長,線程池的線程數達到最大線程數(maximumPoolSize),若是任務繼續增長,這個時候線程池就會採用拒絕策略來拒絕這些任務。
在任務不斷增長的過程當中,線程池會逐一進行如下 4 個方面的判斷rem
核心線程數(corePoolSize)
任務隊列(workQueue)
最大線程數(maximumPoolSize)
拒絕策略
java.util.concurrent.ThreadPoolExecutor#execute
public void execute(Runnable command) { // 若是傳入的Runnable的空,就拋出異常 if (command == null) throw new NullPointerException(); int c = ctl.get(); // 線程池中的線程比核心線程數少 if (workerCountOf(c) < corePoolSize) { // 新建一個核心線程執行任務 if (addWorker(command, true)) return; c = ctl.get(); } // 核心線程已滿,可是任務隊列未滿,添加到隊列中 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 任務成功添加到隊列之後,再次檢查是否須要添加新的線程,由於已存在的線程可能被銷燬了 if (! isRunning(recheck) && remove(command)) // 若是線程池處於非運行狀態,而且把當前的任務從任務隊列中移除成功,則拒絕該任務 reject(command); else if (workerCountOf(recheck) == 0) // 若是以前的線程已經被銷燬完,新建一個非核心線程 addWorker(null, false); } // 核心線程池已滿,隊列已滿,嘗試建立一個非核心新的線程 else if (!addWorker(command, false)) // 若是建立新線程失敗,說明線程池關閉或者線程池滿了,拒絕任務 reject(command); }
//若是傳入的Runnable的空,就拋出異常 if (command == null) throw new NullPointerException();
execute 方法中經過 if 語句判斷 command ,也就是 Runnable 任務是否等於 null,若是爲 null 就拋出異常。
if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); }
判斷當前線程數是否小於核心線程數,若是小於核心線程數就調用 addWorker() 方法增長一個 Worker,這裏的 Worker 就能夠理解爲一個線程。
addWorker 方法的主要做用是在線程池中建立一個線程並執行傳入的任務,若是返回 true 表明添加成功,若是返回 false 表明添加失敗。
第一個參數表示傳入的任務
第二個參數是個布爾值,若是布爾值傳入 true 表明增長線程時判斷當前線程是否少於 corePoolSize,小於則增長新線程(核心線程),大於等於則不增長;同理,若是傳入 false 表明增長線程時判斷當前線程是否少於 maximumPoolSize,小於則增長新線程(非核心線程),大於等於則不增長,因此這裏的布爾值的含義是以核心線程數爲界限仍是以最大線程數爲界限進行是否新增非核心線程的判斷
這一段判斷相關源碼以下
private boolean addWorker(Runnable firstTask, boolean core) { ... int wc = workerCountOf(c);//當前工做線程數 //判斷當前工做線程數>=最大線程數 或者 >=核心線程數(當core = true) if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; ...
最核心的就是 core ? corePoolSize : maximumPoolSize 這個三目運算。
// 核心線程已滿,可是任務隊列未滿,添加到隊列中 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 任務成功添加到隊列之後,再次檢查是否須要添加新的線程,由於已存在的線程可能被銷燬了 if (! isRunning(recheck) && remove(command)) // 若是線程池處於非運行狀態,而且把當前的任務從任務隊列中移除成功,則拒絕該任務 reject(command); else if (workerCountOf(recheck) == 0) // 若是以前的線程已經被銷燬完,新建一個非核心線程 addWorker(null, false); }
若是代碼執行到這裏,說明當前線程數大於或等於核心線程數或者 addWorker 失敗了,那麼就須要經過
if (isRunning(c) && workQueue.offer(command)) 檢查線程池狀態是否爲 Running,若是線程池狀態是 Running 就經過 workQueue.offer(command) 將任務放入任務隊列中,
任務成功添加到隊列之後,再次檢查線程池狀態,若是線程池不處於 Running 狀態,說明線程池被關閉,那麼就移除剛剛添加到任務隊列中的任務,並執行拒絕策略,代碼以下:
if (! isRunning(recheck) && remove(command)) // 若是線程池處於非運行狀態,而且把當前的任務從任務隊列中移除成功,則拒絕該任務 reject(command);
下面咱們再來看後一個 else 分支:
else if (workerCountOf(recheck) == 0) // 若是以前的線程已經被銷燬完,新建一個非核心線程 addWorker(null, false);
進入這個 else 說明前面判斷到線程池狀態爲 Running,那麼當任務被添加進來以後就須要防止沒有可執行線程的狀況發生(好比以前的線程被回收了或意外終止了),因此此時若是檢查當前線程數爲 0,也就是 workerCountOf(recheck) == 0,那就執行 addWorker() 方法新建一個非核心線程。
咱們再來看最後一部分代碼:
// 核心線程池已滿,隊列已滿,嘗試建立一個非核心新的線程 else if (!addWorker(command, false)) // 若是建立新線程失敗,說明線程池關閉或者線程池滿了,拒絕任務 reject(command);
執行到這裏,說明線程池不是 Running 狀態,又或者線程數 >= 核心線程數而且任務隊列已經滿了,根據規則,此時須要添加新線程,直到線程數達到「最大線程數」,因此此時就會再次調用 addWorker 方法並將第二個參數傳入 false,傳入 false 表明增長非核心線程。
addWorker 方法若是返回 true 表明添加成功,若是返回 false 表明任務添加失敗,說明當前線程數已經達到 maximumPoolSize,而後執行拒絕策略 reject 方法。
若是執行到這裏線程池的狀態不是 Running,那麼 addWorker 會失敗並返回 false,因此也會執行拒絕策略 reject 方法。
java.util.concurrent.ThreadPoolExecutor#runWorker
省略掉部分和複用無關的代碼以後,代碼以下:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // 釋放鎖 設置work的state=0 容許中斷 boolean completedAbruptly = true; try { //一直執行 若是task不爲空 或者 從隊列中獲取的task不爲空 while (task != null || (task = getTask()) != null) { task.run();//執行task中的run方法 } } completedAbruptly = false; } finally { //1.將 worker 從數組 workers 裏刪除掉 //2.根據布爾值 allowCoreThreadTimeOut 來決定是否補充新的 Worker 進數組 workers processWorkerExit(w, completedAbruptly); } }
能夠看到,實現線程複用的邏輯主要在一個不停循環的 while 循環體中。
經過獲取 Worker 的 firstTask 或者經過 getTask 方法從 workQueue 中獲取待執行的任務
直接經過 task.run() 來執行具體的任務(而不是新建線程)
在這裏,咱們找到了線程複用最終的實現,經過取 Worker 的 firstTask 或者 getTask 方法從 workQueue 中取出了新任務,並直接調用 Runnable 的 run 方法來執行任務,也就是如以前所說的,每一個線程都始終在一個大循環中,反覆獲取任務,而後執行任務,從而實現了線程的複用。
這篇關於線程池的線程複用原理的文章就到這裏了,你們看完有什麼不懂的歡迎在下方留言評論,也能夠私信問我,我看到了通常都會回覆的!