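At a get-together I was chatting with a senior engineer from Tencent about pooling techniques, and the conversation turned to how Java implements its thread pool. I said I knew this well and rattled off a long explanation. Then he asked: "So do you know what the thread pool's shortcomings are?" I was at a loss for words and had to concede. So this time I am coming back to think through how the thread pool is actually implemented.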
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        // Validate the sizes: nothing may be negative, maximumPoolSize must be
        // positive and not smaller than corePoolSize; otherwise throw
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
A few of the default fields:
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // COUNT_BITS is 29 (32 - 3): the top 3 bits hold the run state,
    // the low 29 bits hold the worker count
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // Maximum worker count is 2^29 - 1 = 536870911
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    // 111 00000000000000000000000000000
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 000 00000000000000000000000000000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 001 00000000000000000000000000000
    private static final int STOP       =  1 << COUNT_BITS;
    // 010 00000000000000000000000000000
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 011 00000000000000000000000000000
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    // AND c with the bitwise complement of CAPACITY to get the high 3 bits,
    // i.e. the run state of the pool
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // AND c with CAPACITY to get the low 29 bits, i.e. the worker count
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
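A single 32-bit integer stores both the state of the pool and the current number of threads in it: the high 3 bits hold the run state, and the low 29 bits hold the worker count (bounded by CAPACITY).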
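To make the packing concrete, here is a minimal standalone sketch that copies the constants and the three helper methods shown above into a hypothetical class named `CtlDemo` (the class name and `main` method are mine, not part of the JDK) and round-trips a state and a count through one `int`.

```java
public class CtlDemo {
    // Same constants as in ThreadPoolExecutor, copied here for illustration
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // low 29 bits set
    static final int RUNNING    = -1 << COUNT_BITS;       // high bits 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;       // high bits 000
    static final int STOP       =  1 << COUNT_BITS;       // high bits 001

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        // Pack "RUNNING with 5 workers" into one int, then unpack it again
        int c = ctlOf(RUNNING, 5);
        System.out.println("packed bits : " + Integer.toBinaryString(c));
        System.out.println("is RUNNING  : " + (runStateOf(c) == RUNNING));
        System.out.println("worker count: " + workerCountOf(c));
    }
}
```

Because RUNNING is negative and the other states are non-negative and increasing, comparisons such as `rs >= SHUTDOWN` in the code below work as simple integer comparisons.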
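    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // If the current worker count is below corePoolSize
        if (workerCountOf(c) < corePoolSize) {
            // call addWorker to start a new thread for this task
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // Reaching here means either the worker count is >= corePoolSize,
        // or the addWorker call above failed.
        // Check that the pool is RUNNING and enqueue the command
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // If the pool is no longer running, remove the task from the
            // queue and run the rejection policy
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // If the pool is still RUNNING but has zero workers, start one,
            // so that a queued task is not stranded with no thread to run it
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // Reaching here means the pool is not RUNNING or the queue is full.
        // Try to start a new thread; if that fails (for example the worker
        // count has reached maximumPoolSize), run the rejection policy
        else if (!addWorker(command, false))
            reject(command);
    }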
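To summarize the flow above: (1) if fewer than corePoolSize workers exist, a new worker is started for the task; (2) otherwise the task is offered to workQueue; (3) if the queue is full, a new worker is started as long as the worker count is below maximumPoolSize; (4) if that also fails, the task is handed to the rejection policy.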
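The ordering in execute can be observed from the outside. The following is a small sketch, not taken from the original article: the class `ExecuteOrderDemo` and its sizes (corePoolSize 1, maximumPoolSize 2, a queue of capacity 1) are assumptions chosen so that four blocking tasks hit each branch once.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecuteOrderDemo {
    /** Returns {poolSize, queueSize, rejectedCount} after submitting four blocking tasks. */
    static int[] run() {
        CountDownLatch release = new CountDownLatch(1);
        // The default rejection handler is AbortPolicy, which throws
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        Runnable blocker = () -> {
            try {
                release.await();   // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        for (int i = 0; i < 4; i++) {
            try {
                // 1st: core worker, 2nd: queued, 3rd: non-core worker, 4th: rejected
                pool.execute(blocker);
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        int[] result = { pool.getPoolSize(), pool.getQueue().size(), rejected };
        release.countDown();
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("poolSize=" + r[0] + " queued=" + r[1] + " rejected=" + r[2]);
    }
}
```

Note that the second task waits in the queue even though the pool could still grow; a second thread is only created once the queue refuses a task.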
Next, let's look at how addWorker creates a thread:
addWorker
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            // Current run state of the pool
            int rs = runStateOf(c);

            // 1. Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                // 2. Check whether the current worker count has reached
                //    CAPACITY, or corePoolSize / maximumPoolSize
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // The count is within bounds, so increment it with CAS
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                // 3. Another thread changed the run state first;
                //    restart from the outer loop
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // Create a worker (and its thread)
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    // 4. If the pool state still allows it, add the worker
                    //    to the workers set
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        // largestPoolSize records the largest size workers has
                        // ever reached; since workers grows and shrinks, this
                        // tells us the peak size of the pool
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // If the worker was added successfully, start its thread
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // If the worker failed to start, roll back the bookkeeping
            if (! workerStarted)
                addWorkerFailed(w);
        }
        // Report whether the thread was started
        return workerStarted;
    }
addWorkerFailed

    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

addWorkerFailed removes the worker from the workers set, decrements the worker count by one, and then calls tryTerminate in case this failure was the last thing holding up termination.
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        ....
    }
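Worker extends AQS (AbstractQueuedSynchronizer). When a Worker is created it receives a Runnable as its first task, sets the AQS state to -1, and asks the thread factory for a new thread that wraps the Worker itself.
Calling thread.start therefore ends up in the Worker's run method: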
    public void run() {
        runWorker(this);
    }

Worker's run method in turn calls ThreadPoolExecutor's runWorker method:
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // If task is null, fetch one from workQueue
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If the pool state is at least STOP, this thread must be
                // interrupted as well
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // Hook method
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // Run the task
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // Hook method
                        afterExecute(task, thrown);
                    }
                } finally {
                    // Clear task so that getTask fetches the next one
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // Runs on an exception, or when getTask returns no task
            processWorkerExit(w, completedAbruptly);
        }
    }
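runWorker first checks whether the worker's firstTask is null; if it is, it calls getTask to fetch a task from workQueue. It then checks whether the current thread needs to be interrupted (the pool state is at least STOP), runs the beforeExecute hook, calls the task's run method, and finally runs the afterExecute hook.
If getTask returns no task inside the while loop, the loop ends and processWorkerExit is called to retire the worker.
If the while loop exits because of an exception, processWorkerExit is called with completedAbruptly set to true.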
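As an illustration of the hooks in runWorker, the sketch below overrides afterExecute to capture an exception thrown by a task. The class name `HookDemo`, the "boom" message and the quiet thread factory are assumptions made for this example; only `beforeExecute`/`afterExecute` themselves come from ThreadPoolExecutor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class HookDemo {
    /** Runs one failing task and returns the message seen by afterExecute (or null). */
    static String run() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        // Silence the default stack trace printed when the worker thread dies
        ThreadFactory quiet = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, e) -> { });
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), quiet) {
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                seen.set(t);       // t is the exception thrown by task.run(), or null
                done.countDown();
            }
        };
        // execute (not submit): the exception propagates out of task.run()
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        Throwable t = seen.get();
        return t == null ? null : t.getMessage();
    }

    public static void main(String[] args) {
        System.out.println("afterExecute saw: " + run());
    }
}
```

The exception still terminates that worker (completedAbruptly stays true), and processWorkerExit then decides whether a replacement worker is needed. Tasks passed to `submit` behave differently, since the resulting Future captures the exception instead.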
getTask
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // If the state is at least STOP, or it is SHUTDOWN and the queue
            // is empty, decrement the worker count and return null
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // If core threads may time out, or the worker count exceeds
            // corePoolSize, this worker is subject to timeout-based removal
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // Check whether there are too many workers, or the last poll timed out
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // Fetch a task from workQueue
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
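As the code shows, this method returns null in the following cases:
1. The pool state is STOP or higher.
2. The pool state is SHUTDOWN and the queue is empty.
3. The worker count exceeds maximumPoolSize (for example after setMaximumPoolSize lowered it).
4. The worker timed out waiting for a task and is subject to removal (allowCoreThreadTimeOut is set, or the worker count exceeds corePoolSize), provided it is not the last worker while the queue is non-empty.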
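The timeout case can be seen with a short experiment. This is a hedged sketch: the class `KeepAliveDemo`, the 50 ms keep-alive and the 5 second polling deadline are arbitrary choices for illustration, and `allowCoreThreadTimeOut(true)` is used so that even core workers take the timed `poll` path in getTask.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class KeepAliveDemo {
    /** Returns the pool size observed after idle workers had time to expire. */
    static int run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 50, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        // Makes "timed" true in getTask even when wc <= corePoolSize
        pool.allowCoreThreadTimeOut(true);
        pool.execute(() -> { });
        pool.execute(() -> { });
        // Poll until the idle workers have timed out in getTask and exited
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        try {
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int size = pool.getPoolSize();
        pool.shutdown();
        return size;
    }

    public static void main(String[] args) {
        System.out.println("pool size after idle period: " + run());
    }
}
```

Without `allowCoreThreadTimeOut(true)`, the two core workers would block in `workQueue.take()` indefinitely and the pool size would stay at 2.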
processWorkerExit
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // If the worker exited because of an exception, the worker count
        // has not been decremented yet, so do it here
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Accumulate the number of completed tasks
            completedTaskCount += w.completedTasks;
            // Remove the worker from the workers set
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        // Try to terminate the pool if it is shutting down and nothing remains
        tryTerminate();

        int c = ctl.get();
        // Entered only when the state is RUNNING or SHUTDOWN
        if (runStateLessThan(c, STOP)) {
            // If the worker did not exit abruptly, check whether a
            // replacement is needed at all
            if (!completedAbruptly) {
                // Minimum number of workers that must be kept
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // Fewer workers are running than required (or the worker died
            // abruptly), so add a new worker
            addWorker(null, false);
        }
    }
This task-handling flow looks simple, but it hides quite a few pitfalls that you need to watch for when using it.
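As execute shows, once corePoolSize is reached the JDK pool prefers to queue tasks and only creates more threads when the queue is full. However, the web systems we usually build perform a lot of IO, such as querying a database or a cache. While a task waits on IO the CPU sits idle, so adding threads to run tasks, rather than parking tasks in the queue, lets more tasks complete per unit of time and can raise throughput considerably. That is why Tomcat does not use the stock JDK thread pool but a modified one: once the thread count exceeds coreThreadCount it prefers to create threads until the count reaches maxThreadCount, which suits the IO-heavy workload of web systems. You can borrow the same idea in your own projects.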
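In a real project I once hit a strange problem where tasks handed to the thread pool were not executed for a long time. At first I assumed it was a bug in the code, but investigation showed that the pool's coreThreadCount and maxThreadCount were set too low, so tasks piled up in the pool's queue. Raising the two parameters fixed the problem. After climbing out of that pit, I put the queue backlog of every important thread pool on the system monitoring dashboard as a key metric.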
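A minimal sketch of such a metric is shown below. The class `PoolMetrics` and its `snapshot` and `demoBacklog` methods are hypothetical names for this example; the getters they call (`getPoolSize`, `getActiveCount`, `getQueue`, `getCompletedTaskCount`) are standard ThreadPoolExecutor methods. How the values reach a dashboard is left out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolMetrics {
    /** Formats the values one would typically export to a monitoring system. */
    static String snapshot(ThreadPoolExecutor pool) {
        return String.format("poolSize=%d active=%d queued=%d completed=%d",
                pool.getPoolSize(), pool.getActiveCount(),
                pool.getQueue().size(), pool.getCompletedTaskCount());
    }

    /** Builds a backlog on a single-threaded pool and returns the queued task count. */
    static int demoBacklog() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> await(release));  // first runs, the other three queue up
        }
        int queued = pool.getQueue().size();
        System.out.println(snapshot(pool));
        release.countDown();
        pool.shutdown();
        return queued;
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("backlog: " + demoBacklog());
    }
}
```

In practice a snapshot like this would be sampled periodically, and an alert raised when `queued` keeps growing.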
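Here we look directly at how Tomcat optimizes the thread pool. In everyday use, a LinkedBlockingQueue created without an explicit capacity defaults to Integer.MAX_VALUE, which is effectively an unbounded queue. In that case the queue never fills up, so the pool never reaches the branch that starts new threads up to maxThreads, and configuring maxThreads has no effect.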
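The effect is easy to reproduce. In this sketch (the class name `UnboundedQueueDemo` and the sizes 1 and 4 are assumptions for illustration) ten blocking tasks are submitted to a pool that is allowed to grow to four threads, yet it never does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class UnboundedQueueDemo {
    /** Returns the pool size after ten blocking tasks were submitted. */
    static int run() {
        CountDownLatch release = new CountDownLatch(1);
        // corePoolSize 1, maximumPoolSize 4, queue capacity Integer.MAX_VALUE
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        for (int i = 0; i < 10; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // offer() always succeeded, so execute never tried addWorker(command, false)
        int size = pool.getPoolSize();
        System.out.println("poolSize=" + size + " queued=" + pool.getQueue().size());
        release.countDown();
        pool.shutdown();
        return size;
    }

    public static void main(String[] args) {
        run();
    }
}
```

Giving the queue a finite capacity restores the growth to maximumPoolSize, at the cost of rejections once both the queue and the pool are full.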
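Tomcat instead uses TaskQueue, whose capacity is maxQueueSize and also defaults to Integer.MAX_VALUE. The difference is that it overrides offer: when the pool size is still below maximumPoolSize and no idle thread is available, offer returns false. This partly rewrites the "queue is full" logic and fixes the "bug" whereby maxThreads is ineffective when LinkedBlockingQueue is used with its default capacity of Integer.MAX_VALUE. The pool can keep adding threads up to maxThreads, and only after that are tasks placed in the queue.
In short, Tomcat's thread pool uses its own TaskQueue extension and changes only the offer logic, reworking the pool's behavior with a minimal modification.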
Let's review how ThreadPoolExecutor's execute method is written:
ThreadPoolExecutor#execute
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // Here: if workQueue.offer succeeds, no new thread is created;
        // if it fails, control falls to the else-if branch, which tries
        // to create a new thread
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
TaskQueue
    public class TaskQueue extends LinkedBlockingQueue<Runnable> {

        // Note: this is Tomcat's own org.apache.tomcat.util.threads.ThreadPoolExecutor,
        // which provides getSubmittedCount(); the JDK class has no such method
        private ThreadPoolExecutor parent = null;

        @Override
        public boolean offer(Runnable o) {
            //we can't do any checks
            if (parent==null) return super.offer(o);
            //we are maxed out on threads, simply queue the object
            if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
            //we have idle threads, just add it to the queue
            if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
            // the pool size is below maximumPoolSize: return false so that
            // execute creates a new thread instead of queueing
            if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
            //if we reached here, we need to add it to the queue
            return super.offer(o);
        }
    }
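From this we can see that offer returns false whenever the pool has no idle thread and has not yet reached maximumPoolSize, which pushes execute into the branch that creates a new thread.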
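The same idea can be tried against the plain JDK ThreadPoolExecutor. The sketch below is not Tomcat's code: `EagerQueue`, `force` and `EagerQueueDemo` are hypothetical names, and because the JDK pool has no `getSubmittedCount`, the idle-thread check is left out, so this version creates a thread whenever the pool is below its maximum. A rejection handler puts the task back into the queue if thread creation loses a race.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class EagerQueueDemo {
    /** Hypothetical queue that refuses tasks while the pool can still grow. */
    static class EagerQueue extends LinkedBlockingQueue<Runnable> {
        private transient volatile ThreadPoolExecutor parent;

        void setParent(ThreadPoolExecutor parent) { this.parent = parent; }

        @Override
        public boolean offer(Runnable r) {
            ThreadPoolExecutor p = parent;
            // Pretend to be full so that execute() tries addWorker(command, false)
            if (p != null && p.getPoolSize() < p.getMaximumPoolSize()) {
                return false;
            }
            return super.offer(r);
        }

        /** Bypasses the check above; used when thread creation was refused. */
        boolean force(Runnable r) { return super.offer(r); }
    }

    /** Returns {poolSize, queueSize} after submitting four blocking tasks. */
    static int[] run() {
        CountDownLatch release = new CountDownLatch(1);
        EagerQueue queue = new EagerQueue();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 60, TimeUnit.SECONDS, queue,
                (r, ex) -> {
                    // Really reject only if the pool is shut down or the queue is full
                    if (ex.isShutdown() || !queue.force(r)) {
                        throw new RejectedExecutionException("pool shut down or queue full");
                    }
                });
        queue.setParent(pool);
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int[] result = { pool.getPoolSize(), pool.getQueue().size() };
        release.countDown();
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("poolSize=" + r[0] + " queued=" + r[1]);
    }
}
```

With corePoolSize 1 and maximumPoolSize 3, the first three tasks each get their own thread and only the fourth is queued, which is the opposite of the stock behavior with an unbounded queue.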