先前,筆者講解到ThreadPoolExecutor.addWorker(Runnable firstTask, boolean core),在這個方法中工做線程可能建立成功,也可能建立失敗,具體視線程池的邊界條件,以及當前內存狀況而定。java
那麼,若是線程池當前的狀態,是容許建立Worker對象的,那麼建立Worker的內部流程又是怎樣呢?線程池爲什麼要使用Worker包裝Thread來建立一個線程,爲什麼不直接使用原生的Thread來建立線程?若是建立Worker的firstTask不爲空,那麼Worker理所固然應該優先執行firstTask任務,若是firstTask爲空,那Worker又要如何獲取任務來執行呢?咱們還有一堆亟待解決的問題。程序員
首先咱們來解決前兩個問題,Worker的建立流程,以及爲何不使用原生Thread代替Worker?首先,Doug Lea用Worker包裝Thread,意味着Worker比Thread擁有更多的功能。例如:Worker會統計它所對應的線程執行了多少任務、經過Worker能夠知道線程是否已啓動、線程是否正在執行任務?而這些信息都是原生Thread所沒有的,因此須要一個Worker類來擴展Thread。安全
在建立Worker時,會先設置其state的值爲-1,表明Worker所對應的線程還沒有啓動,即尚未調用Worker.thread.start(),以後會進行firstTask的賦值,向線程工廠申請建立線程,建立完畢後,等待外部調用Worker.thread.start()啓動一個線程執行Worker.run()方法。多線程
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /* * 當初始化一個Worker時,會向線程工廠申請建立一個Thread對象 * 用來執行任務,爲null表明線程工廠建立失敗。 */ final Thread thread; //首要執行任務,該字段可能爲null。 Runnable firstTask; //thread已完成任務數。 volatile long completedTasks; Worker(Runnable firstTask) { /* * state初始爲-1,表明還未調用Worker.thread.start(), * Worker對應的線程還沒有被建立,還不能中斷。線程啓動後, * 若是線程正在執行任務,state爲1,若是線程啓動後沒在 * 執行任務的狀態則state爲0。 */ setState(-1);//<1> this.firstTask = firstTask; /* * 建立Thread對象的時候,會把Worker對象自己傳入,而Worker * 自己實現了Runnable接口,當調用thead.start()啓動一個線程 * 執行thread.run()時,會進而調用Worker.run()方法。 */ this.thread = getThreadFactory().newThread(this); } //Worker對象將任務的執行委託給ThreadPoolExecutor.runWorker(Worker w). public void run() { runWorker(this); } //判斷Worker是否處於被某個線程持有狀態。 protected boolean isHeldExclusively() { return getState() != 0;//<2> } /* * 線程嘗試持有worker對象,若是worker沒有被某個線程 * 持有,則state爲0,則用CAS的方式將worker的state * 改成1,並設置exclusiveOwnerThread爲當前線程。 */ protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) {//<3> setExclusiveOwnerThread(Thread.currentThread());//<4> return true; } return false; } /* * 線程釋放worker對象,將state改成0,exclusiveOwnerThread * 改成null。 */ protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } /* * 調用父類acquire(int arg)時,會進而調用到Worker自己實現的 * tryAcquire(int unused)。 */ public void lock() { acquire(1);//<5> } public boolean tryLock() { return tryAcquire(1); } /* * 調用父類的release(int arg)時,會進而調用到Worker自己實現的 * tryRelease(int unused)。 */ public void unlock() { release(1);//<6> } public boolean isLocked() { return isHeldExclusively(); } /* * 嘗試中斷worker的對應線程,若是線程已經啓動。建立 * worker時,state爲-1,直到調用worker.thread.start() * 後,worker的state爲0,若是worker的state>=0,則嘗試 * 中斷線程。 */ void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
從上面的代碼咱們能夠注意到,<1>、<2>、<3>、<4>、<5>、<6>處的方法並非Worker自己有的方法,而是Worker繼承自父類AbstractQueuedSynchronizer的方法。那麼Worker爲何須要繼承AbstractQueuedSynchronizer(AQS)?AQS又是何方神聖呢?併發
這裏先簡單介紹下AQS,它定義了若干接口交由程序員實現,諸如:lock()、unlock()、tryAcquire(int arg)、tryRelease(int arg)……等,以保證多個線程不會同時訪問同一資源。ThreadPoolExecutor中的字段mainLock爲可重入鎖ReentrantLock,某種程度上來講也是實現了AQS,ThreadPoolExecutor經過mainLock的lock()、unlock()以保證線程池內一些非線程安全的對象不會出現併發讀寫,如:workers、completedTaskCount……等。函數
public class ReentrantLock implements Lock, java.io.Serializable { private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer {//...} static final class NonfairSync extends Sync {//...} public ReentrantLock() { sync = new NonfairSync(); } //... }
那麼Worker繼承了AQS,Worker也實現了lock()、unlock()方法,說明Worker自己也存在多線程訪問的可能,那是何時會出現多線程訪問Worker呢?這裏咱們先按下這個問題,在介紹完Worker.run()以後你就會明白爲什麼Worker要繼承AQS以保證線程訪問的順序性。ui
咱們知道當調用Worker.thread.start()方法時,會進而調用Worker.run()方法,而Worker.run()方法會進而調用ThreadPoolExecutor.runWorker(Worker w)。下面,咱們來看看runWorker(Worker w)的執行流程:this
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; /* * 將worker.firstTask設置爲null,由於worker可能會存在較長的一段時間, * 而task可能很快執行完畢,避免worker長時間引用已完成task,以便GC回收 * 已完成task。 */ w.firstTask = null; /* * 新建立的worker對象state爲-1,調用worker.unlock()會進而調用 * worker.tryRelease(int unused),將state設置爲0,表明worker * 對應工做線程已啓動,線程處於可中斷狀態。 */ w.unlock(); // allow interrupts /* * 標誌worker是否異常完成,若是在<1>處task爲空,且沒法經過 * getTask()從任務隊列中獲取新的任務,則會跳出循環,並在<3>處 * 賦值爲false,表明worker並無異常完成。 * 無論worker是正常完成仍是異常完成,最後都會將completedAbruptly的結果 * 傳給processWorkerExit(...),若是是worker是正常退出,則將workerCount-1, * 並將worker從workers集合中移除。若是是異常退出,則不減小workerCount,僅僅 * 是將異常worker從workers集合中移除,並嘗試新增一個worker。 */ boolean completedAbruptly = true; try { /* * 若是task不爲空,或者調用getTask()能任務隊列中獲取到新的任務, * 則進入while塊的代碼。若是任務隊列中沒有待執行的任務,調用getTask() * 會讓當前線程陷入阻塞,直到超時或者有新任務進入任務隊列。 */ while (task != null || (task = getTask()) != null) {//<1> /* * 若是能進入循環,表明worker準備開始執行任務,但在執行任務 * 前會先上鎖,等到任務執行結束又會在<2>處釋放鎖,然而線程池 * 又不會讓多個線程同時執行同一個任務,那麼爲何在執行任務前 * 要讓worker先上鎖,執行完畢再釋放鎖呢? * 咱們假設有一個線程池有5個線程,其中A、B線程正在執行任務, * C、D、E處於空閒狀態。因此咱們能肯定A、B兩個worker已經 * 得到了鎖,而C、D、E還阻塞在getTask()方法中。如今線程池 * 執行shutdown()方法,該方法會進而調用interruptIdleWorkers() * 中斷處於空閒狀態的工做線程。而在interruptIdleWorkers()方法中 * 判斷一個worker是否處於空閒,會調用worker.tryLock(),若是能 * 成功獲取到鎖,則表明該worker處於空閒狀態,則中斷該worker對應的 * 線程。所以,一個worker是有可能被多個線程訪問的,好比worker自己 * 對應的線程,又或者關閉線程池的線程。 */ w.lock(); /* * 若是線程池的運行狀態>=STOP,則中斷當前線程。若是運行狀態<STOP, * 則確保線程沒有被中斷。 */ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //空方法,在執行任務前執行 beforeExecute(wt, task); try { task.run();//開始執行任務 afterExecute(task, null);//空方法,在執行任務完畢後執行。 } catch (Throwable ex) { afterExecute(task, ex); throw ex; } } finally { /* * 設置當前任務爲空,這樣就能夠在下一次循環中獲取新的任務。 * 對worker執行的任務數+1,並釋放鎖。 */ task = null; w.completedTasks++; w.unlock();//<2> } } completedAbruptly = false;//<3> } finally { processWorkerExit(w, completedAbruptly); } }
在上面的runWorker(Worker w)中若是執行完worker的首要任務,或者首要任務爲null,便會調用getTask()嘗試從任務隊列中獲取任務,但調用getTask()可能會使當前線程陷入等待或者阻塞直到有任務入隊。getTask()也有可能返回null致使當前worker對應的線程退出,有如下幾個緣由可能致使工做線程退出:線程
private Runnable getTask() { //超時標誌,默認爲false,獲取任務若是超時則會在<5>賦值爲true。 boolean timedOut = false; for (; ; ) { int c = ctl.get(); /* * 若是線程池處於RUNNING狀態,則runStateAtLeast(c, SHUTDOWN) * 爲false,不會再判斷以後的邏輯爲true或者false。 * 若是線程池處於SHUTDOWN狀態,且workQueue.isEmpty()爲true,即 * 任務隊列爲空,則直接返回。若是任務隊列不爲空,則沒法進入if分支, * 依然要返回任務,按照SHUTDOWN的要求再也不接受新任務,但仍要處理隊列 * 中的任務。 * 若是線程池處於STOP狀態,即使任務隊列不爲空,也再也不處理,則直接進入 * if分支後返回。 * 因此總結一下,只有兩種狀況不會進入此分支: * 1.線程池處於RUNNING狀態。 * 2.線程池處於SHUTDOWN狀態且任務隊列不爲空。 */ if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {//<1> decrementWorkerCount(); return null; } int wc = workerCountOf(c); /* * timed決定是若是任務隊列沒有任務的話,是以無限期的方式 * 等待任務入隊,或者一旦等待時間超過keepAliveTime,則 * 返回null。 * 若是allowCoreThreadTimeOut爲true,則核心線程等待 * 任務時間超過keepAliveTime後會被回收。 * 若是當前工做線程數量workerCount大於核心線程數,也會在 * 線程等待任務超過keepAliveTime後回收線程。 */ boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; /* * 咱們先看進入此分支後會作的事,再分析如何進入這個分支。進入 * 此分支後,會用CAS減小workerCount的數量,成功則返回null。 * 不然continue從新開始新一輪的for循環。 * 如今,咱們來分析下進入此分支的邏輯: * 首先是wc > maximumPoolSize,通常workerCount不會大於 * maximumPoolSize,除非線程池運行期間經過 * setMaximumPoolSize(int maximumPoolSize)將線程池 * 的最大線程數改小。 * (timed && timedOut)在第一輪for循環永遠爲false,由於 * timedOut要爲true的條件,首先是timed爲true,即線程池內存在空閒後 * 可回收的線程,不論是線程池容許回收核心線程,或者線程數大於核心線程數。 * 只有在<4>獲取任務超時後workQueue返回null,纔有可能到達<5>處將 * timedOut賦值爲true,而且開始新一輪的循環。 * 以後的兩個判斷wc>1和workQueue.isEmpty(),判斷隊列爲空還好理解, * 爲何要判斷wc>1?首先咱們要知道maximumPoolSize必須大於等於1,當咱們 * 往線程池傳入的maximumPoolSize<=0會拋出異常。其次,若是咱們將<2>處改成: * (wc >= 1 || workQueue.isEmpty()),有可能出現線程池內只有一個線程, * 但任務隊列不爲null。依舊會進入此分支內部執行<3>的代碼以CAS的方式對 * workerCount-1,從而出現一個尷尬的狀況,任務隊列中有任務,但工做線程數 * 爲0。因此<2>處必須保證wc>1。 * 思考一種狀況:假設一個線程池核心線程數爲3,最大線程數爲5, * allowCoreThreadTimeOut爲false,線程池當前工做線程數量也爲5,5個線程 * 同時完成任務,並執行getTask()獲取任務,可想而知timed爲true,由於工做 * 線程數(5)大於核心線程數(3),5個工做線程都是調用<4>處workQueue.poll(...) * 等待任務,超時則返回null。若是超時時間到達,3個核心線程如何從新進入等待狀態, * 剩餘2個線程如何被回收? * 當5個線程超時返回後,會將timedOut賦值爲true,而後從新開始新一輪的for循環,一直 * 執行到此分支,此時(timed && timedOut)都爲true,隊列也都爲null,因此5個線程會 * 進入此分支。用CAS成功對workerCount-1的線程將被回收,失敗的線程則continue又開始 * 新一輪的for循環,直到wc<=corePoolSize,timed爲false,最後剩餘的工做線程數調用 * workQueue.take()無限期地等待任務的到來,除非線程被中斷。 */ if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {//<2> if (compareAndDecrementWorkerCount(c))//<3> return null; continue; } try { /* * workQueue.poll(...)和workQueue.take()都有可能使當前線程陷入 * 等待,直到返回任務,只不過前者相比後者多了一個超時時間,到達超時時間 * 若是有任務入隊,則r不爲null,直接返回任務。 * 線程等待期間,若是線程被中斷,則會拋出InterruptedException異常。 * 通常關閉線程池時,會嘗試中斷空閒線程,而處於等待任務的空閒線程會跳到<6>處, * 從新開始新一輪的for循環,而且在<1>處判斷線程池處於SHUTDOWN或者STOP, * 從而退出。 */ Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://<4> workQueue.take(); if (r != null) return r; /* * 只有等待任務超時控制流纔會執行到這裏,將超時標誌賦值爲true, * 從新開始新一輪的for循環。 */ timedOut = true;//<5> } catch (InterruptedException retry) { timedOut = false;//<6> } } }
在runWorker(Worker w)中有兩種方式能夠進到下面的處理線程退出processWorkerExit(Worker w, boolean completedAbruptly)方法:對象
上面兩種方式傳遞給processWorkerExit(...)的completedAbruptly是不一樣的,第一個方式傳入的completedAbruptly爲true,第二個方式爲false,雖然worker是同一個。那麼當completedAbruptly爲true或者false,processWorkerExit(...)的流程又是怎麼走的呢?
private void processWorkerExit(Worker w, boolean completedAbruptly) { /* * 從runWorker(Worker w)傳遞而來的變量,標誌worker是否意外完成, * 當worker執行任務時拋出異常,該變量爲true。若是是意外完成,則代表 * workerCount還沒有-1。若是worker獲取任務超時從而要讓線程被回收,在 * getTask()方法中會對workerCount-1。 */ if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { /* * 獲取可重入鎖後,將worker已完成的任務數加到線程池已完成任務數, * 並將worker從workers集合中移除。 */ completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } //嘗試終止線程。 tryTerminate(); int c = ctl.get(); //若是線程池運行狀態處於RUNNING或SHUTDOWN,則進入此分支。 if (runStateLessThan(c, STOP)) { //若是線程是正常退出,則進入此分支 if (!completedAbruptly) { /* * min爲線程池建立核心線程後,容許最小的核心線程數,若是 * allowCoreThreadTimeOut爲true則表明核心線程能夠被回收, * 則min爲0,不然min爲核心線程數量。 * 若是線程池容許回收線程,且隊列不爲空,則判斷在移除當前worker * 後,線程池工做線程的數量是否還大於等於1,避免出現隊列裏有任務 * 但沒有線程執行的狀況,若是工做線程數大於等於1,則退出線程線程。 * 不然調用addWorker(Runnable firstTask, boolean core) * 嘗試增長新的線程。 */ int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && !workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
最後,咱們還須要介紹關閉線程池以後作的操做。關閉線程池會修改線程池運行狀態,在advanceRunState(int targetState)會使用CAS自旋的方式,將線程池狀態修改成SHUTDOWN。以後調用interruptIdleWorkers()中斷空閒線程,這裏咱們看到中斷的時候調用worker的tryLock()和unlock()。Worker之因此繼承AQS就是爲了方便區分哪些worker正在執行任務,哪些worker處於空閒中,以便在關閉線程池時中斷全部空閒的worker。
/** * 調用此方法後再也不接受新任務,但會執行現有隊列任務。 * 若是調用此方法前該方法已被調用,則不會有任何效果。 * 該方法不會等待全部任務完成,須要調用: * awaitTermination(long timeout, TimeUnit unit) */ public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN);//設置線程池狀態爲SHUTDOWN interruptIdleWorkers();//中斷空閒線程 onShutdown(); //鉤子方法,按用戶須要實現。 } finally { mainLock.unlock(); } /* * 嘗試終止線程池,可能線程池有任務在執行,當前線程終止失敗。 * 但隨着工做線程逐個退出,最後一個工做線程將成功終止線程池。 */ tryTerminate(); } private void advanceRunState(int targetState) { for (; ; ) { int c = ctl.get(); /* * 若是線程池運行狀態>=targetState,則 * runStateAtLeast(c, targetState)爲true直接退出。 * 不然調用ctlOf(int rs, int wc),根據運行狀態和工做 * 線程數生成的值以CAS自旋的方式set進ctl。 */ if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } } //調用shutdown()會進而調用此方法,中斷空閒線程。 private void interruptIdleWorkers() { interruptIdleWorkers(false); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; /* * 若是線程未被中斷,則嘗試獲取worker的鎖,若是能 * 成功獲取,表明worker線程處於空閒中。 */ if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } //若是onlyOne爲true表明最多隻中斷一個線程 if (onlyOne) break; } } finally { mainLock.unlock(); } }
在中斷空閒線程後shutdown()還會調用tryTerminate(),若是查看tryTerminate()的引用,能夠發現不僅僅shutdown()有調用,像addWorkerFailed(Worker w)、processWorkerExit(...)……都有調用。這個方法旨在終止線程池,若是當前有線程關閉了線程池,線程池若是沒有存活線程,或者線程都處於空閒狀態,天然而然執行嘗試終止線程池方法;若是關閉線程池時,線程池仍然有線程處於執行任務狀態,沒法終止線程池,就要靠這些工做線程在退出時終止線程池。
final void tryTerminate() { for (; ; ) { int c = ctl.get(); /* * 若是線程池運行狀態處於RUNNING、TIDYING則退出, * 或者運行狀態小於STOP(即處於RUNNING、SHUTDOWN) * 且隊列不爲空,則退出。 * 總結一下,只有當運行狀態處於STOP時或者狀態處於SHUTDOWN * 但隊列爲空,纔不會進此分支。 */ if (isRunning(c) || runStateAtLeast(c, TIDYING) ||//<1> (runStateLessThan(c, STOP) && !workQueue.isEmpty())) return; //若是工做線程數不爲0,則嘗試最多中斷一個空閒線程後退出。 if (workerCountOf(c) != 0) { interruptIdleWorkers(ONLY_ONE); return; } /* * 在processWorkerExit(...)方法中若是worker是 * 異常退出會對workerCount-1,若是是正常退出,則 * workerCount在getTask()中-1。以後在processWorkerExit(...) * 中移除worker。 * 不論worker是正常退出仍是異常退出,終歸workerCount會慢慢回到0。 * 而processWorkerExit(...)中在對workerCount-1後,還會調用 * tryTerminate()。所以一個被關閉的線程池,它的最後一個線程,會 * 執行到此處,處理終止線程池的工做。 */ final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { /* * 用CAS設置線程池運行狀態爲TIDYING,可能存在多個線程同時 * 調用shutdown()後並依次執行到這一步(由於要得到可重入鎖), * 但只有一個線程能夠CAS成功,其餘線程CAS失敗後返回<1>處後 * 判斷運行狀態>=TIDYING則退出。 */ if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { //空函數,具體由用戶實現,主要用於終止線程池後的操做。 terminated(); } finally { //最後設置線程池的狀態爲TERMINATED ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } } }