前面咱們在java線程池ThreadPoolExecutor類使用詳解中對ThreadPoolExector線程池類的使用進行了詳細闡述,這篇文章咱們對其具體的源碼進行一下分析和總結;
html
首先咱們看下ThreadPoolExecutor用來表示線程池狀態的核心變量java
//用來標記線程池狀態(高3位),線程個數(低29位) //默認是RUNNING狀態,線程個數爲0 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //線程個數掩碼位數 private static final int COUNT_BITS = Integer.SIZE - 3; //線程最大個數(低29位)00011111111111111111111111111111 private static final int CAPACITY = (1 << COUNT_BITS) - 1; //(高3位):11100000000000000000000000000000 //接受新任務而且處理阻塞隊列裏的任務 private static final int RUNNING = -1 << COUNT_BITS; //(高3位):00000000000000000000000000000000 //拒絕新任務可是處理阻塞隊列裏的任務 private static final int SHUTDOWN = 0 << COUNT_BITS; //(高3位):00100000000000000000000000000000 //拒絕新任務而且拋棄阻塞隊列裏的任務同時會中斷正在處理的任務 private static final int STOP = 1 << COUNT_BITS; //(高3位):01000000000000000000000000000000 //全部任務都執行完(包含阻塞隊列裏面任務)當前線程池活動線程爲0,將要調用terminated方法 private static final int TIDYING = 2 << COUNT_BITS; //(高3位):01100000000000000000000000000000 //終止狀態。terminated方法調用完成之後的狀態 private static final int TERMINATED = 3 << COUNT_BITS;// // 獲取高三位 運行狀態 private static int runStateOf(int c) { return c & ~CAPACITY; } //獲取低29位 線程個數 private static int workerCountOf(int c) { return c & CAPACITY; } //計算ctl新值,線程狀態 與 線程個數 private static int ctlOf(int rs, int wc) { return rs | wc; }
ThreadPoolExecutor核心函數less
//提交任務函數 void execute(Runnable command) //執行拒絕策略的函數 void reject(Runnable command) //新增WOKER線程函數 boolean addWorker(Runnable firstTask, boolean core) //WOKER線程執行函數 void runWorker(Worker w) //獲取執行任務函數 Runnable getTask() //線程池中止函數 void shutdown() //線程池當即中止函數 void shutdownNow()
接下來咱們圍繞這幾個核心函數對ThreadPoolExector線程池類的執行流程和源碼進行分享ide
execute 作爲ThreadPoolExector的提交任務的函數,註解上已經說明了其實現的主要三步操做:函數
一、若是運行的線程小於corePoolSize,則嘗試使用用戶定義的Runnalbe對象建立一個新的線程,調用addWorker函數會原子性的檢查runState和workCount,經過返回false來防止在不該該添加線程時添加了線程。
2. 若是一個任務可以成功入隊列,在添加一個線城時仍須要進行雙重檢查(由於在前一次檢查後該線程死亡了),或者當進入到此方法時,線程池已經shutdown了,因此須要再次檢查狀態,如有必要,當中止時還須要回滾入隊列操做,或者當線程池沒有線程時須要建立一個新線程。
3. 若是沒法入隊列,那麼須要增長一個新線程,若是此操做失敗,那麼就意味着線程池已經shutdown或者已經飽和了,因此拒絕任務
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. 若是運行的線程小於corePoolSize,則嘗試使用用戶定義的Runnalbe對象建立一個新的線程, 調用addWorker函數會原子性的檢查runState和workCount, 經過返回false來防止在不該該添加線程時添加了線程。 * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. 若是一個任務可以成功入隊列,在添加一個線城時仍須要進行雙重檢查(由於在前一次檢查後該線程死亡了), 或者當進入到此方法時,線程池已經shutdown了,因此須要再次檢查狀態, 如有必要,當中止時還須要回滾入隊列操做, 或者當線程池沒有線程時須要建立一個新線程。 * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. 若是沒法入隊列,那麼須要增長一個新線程, 若是此操做失敗,那麼就意味着線程池已經shutdown或者已經飽和了,因此拒絕任務 */ //獲取線程池的運行狀態 int c = ctl.get(); //檢查覈心線程數量 if (workerCountOf(c) < corePoolSize) { //若是小於corePoolSize 則執行addWorker函數 if (addWorker(command, true)) return; c = ctl.get(); } //判斷當前線程是否處於Running狀態其任務是否其任務是否能夠繼續加入workQueue隊列(有界仍是無界任務隊列) if (isRunning(c) && workQueue.offer(command)) { //若是知足條件,則再次進行狀態檢查 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) //若是線程池已經不是Running狀態,在隊列中移除任務,切執行絕交策略 reject(command); else if (workerCountOf(recheck) == 0) //若是worker數量是0,則添加worker addWorker(null, false); } else if (!addWorker(command, false)) //添加worker失敗,執行拒絕策略 reject(command); }
經過上面的代碼咱們能夠看到execute函數自己並不執行提交的Runnable任務 主要做用是根據固然線程池的狀態,選擇執行addWorker函數仍是執行reject拒絕策略
final void reject(Runnable command) { handler.rejectedExecution(command, this); }
handler爲RejectedExecutionHandler接口的具體實現,執行相應的拒絕策略
下面代碼爲各拒絕策略的具體實現
//若是線程池的線程數量達到上限,該策略會把任務隊列中的任務放在調用者線程當中運行; public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { } /** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { //若是線程池仍在運行狀態,執行Runnable的run方法 r.run(); } } } //該策略會直接拋出異常,阻止系統正常工做; public static class AbortPolicy implements RejectedExecutionHandler { /** * Creates an {@code AbortPolicy}. */ public AbortPolicy() { } /** * 直接拋出RejectedExecutionException異常 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } } //靜默策略,不予任何處理。 public static class DiscardPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardPolicy}. */ public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } //該策略會丟棄任務隊列中最老的一個任務,也就是當前任務隊列中最早被添加進去,立刻要被執行的那個任務,並嘗試再次提交; public static class DiscardOldestPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardOldestPolicy} for the given executor. */ public DiscardOldestPolicy() { } /** * Obtains and ignores the next task that the executor * would otherwise execute, if one is immediately available, * and then retries execution of task r, unless the executor * is shut down, in which case task r is instead discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { //直接移除任務隊列中第一個任務 e.getQueue().poll(); //再次提交 e.execute(r); } } }
addworker函數主要完成了兩部分功能:oop
一、經過cas的方式檢查線程池狀態與當前線程數量,若是符合條件 增長線程個數;post
二、若是上一部分cas檢查成功,線程數已經加一,那麼建立Worker對象並綁定經過線程工廠分配與啓動線程;ui
咱們看下具體的代碼this
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { //獲取線程池狀態 int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && // 線程池狀態大於等於SHUTDOWN,初始的ctl爲RUNNING,小於SHUTDOWN ! (rs == SHUTDOWN && //線程池狀態等於SHUTDOWN firstTask == null && //傳入的任務爲空 ! workQueue.isEmpty())) //workQueue隊列不爲空 return false; for (;;) { int wc = workerCountOf(c); //獲取當前的worker線程數量 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) //若是當前worker數量大於最大容量或大於設置的最大線程數,返回false return false; if (compareAndIncrementWorkerCount(c))//cas的方式增長woker線程數量 //cas增長線程數量成功,跳出循序 break retry; //cas失敗了,則看線程池狀態是否變化了 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) //若是發生變化則跳到外層循環從新獲取線程池狀態,不然內層循環從新cas。 continue retry; // else CAS failed due to workerCount change; retry inner loop } } //到這裏表明CAS成功了,也就是說線程個數加一了,可是如今任務還沒開始執行 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //建立一個worker對象,並經過線程工廠建立線程 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock;//加鎖,保證添加worker動做的同步,由於同一時間可能會有多個線程操做 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. //從新檢查,保證線程狀態正常沒有關閉 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable //若是線程池狀態異常,拋出異常 throw new IllegalThreadStateException(); //添加worker workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //添加worker成功,啓動線程 t.start(); workerStarted = true; } } } finally { if (!workerStarted) //若是添加失敗,作失敗處理 addWorkerFailed(w); } return workerStarted; }
經過上面的代碼能夠看到,線程池新增的線程最終會封裝爲一個Worker對象,這個對象會經過輪詢的方式不斷從任務隊列中獲取任務,並經過其綁定的線程執行,咱們看下Worker類的具體內容atom
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ //構造函數,綁定任務並經過線程工廠建立線程 Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ //run方法執行runWorker函數 public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } // protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { //線程中斷 t.interrupt(); } catch (SecurityException ignore) { } } } }
在Woker類的構造函數中經過線程工廠建立了線程,當線程start時就開始執行runWorker函數。
runWork函數主要實現的功能就是while輪詢方式經過getTask函數獲取執行任務,咱們來看下具體的代碼分析
final void runWorker(Worker w) { //獲取當前線程 Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //輪詢,經過getTask()獲取執行的任務 while (task != null || (task = getTask()) != null) { w.lock(); //(若是線程池至少是stop狀態 或 (線程池至少是stop狀態且線程是否處於中斷))且wt線程是否處於中斷 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run();//執行任務 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
若是沒有獲取到task任務的話,執行processWorkerExit函數
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 若是completedAbruptly值爲true,則說明線程執行時出現了異常,須要將workerCount減1; // 若是線程執行時沒有出現異常,說明在getTask()方法中已經已經對workerCount進行了減1操做,這裏就沒必要再減了。 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //統計完成的任務數 completedTaskCount += w.completedTasks; // 從workers中移除,也就表示着從線程池中移除了一個工做線程 workers.remove(w); } finally { mainLock.unlock(); } // 根據線程池狀態進行判斷是否結束線程池 tryTerminate(); int c = ctl.get(); /* * 當線程池是RUNNING或SHUTDOWN狀態時,若是worker是異常結束,那麼會直接addWorker; * 若是allowCoreThreadTimeOut=true,而且等待隊列有任務,至少保留一個worker; * 若是allowCoreThreadTimeOut=false,workerCount很多於corePoolSize。 */ if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
processWorkerExit執行完以後,工做線程被銷燬,以上就是整個工做線程的生命週期,從execute方法開始,Worker使用ThreadFactory建立新的工做線程,runWorker經過getTask獲取任務,而後執行任務,若是getTask返回null,進入processWorkerExit方法 而其中tryTerminate()函數的主要做用就是判斷是否有空閒線程,並設置中斷;
final void tryTerminate() { for (;;) { int c = ctl.get(); /* * 當前線程池的狀態爲如下幾種狀況時,直接返回: * 1. RUNNING,由於還在運行中,不能中止; * 2. TIDYING或TERMINATED,由於線程池中已經沒有正在運行的線程了; * 3. SHUTDOWN而且等待隊列非空,這時要執行完workQueue中的task; */ if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 若是線程數量不爲0,則中斷一個空閒的工做線程,並返回 if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 這裏嘗試設置狀態爲TIDYING,若是設置成功,則調用terminated方法 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // terminated方法默認什麼都不作,留給子類實現 terminated(); } finally { // 設置狀態爲TERMINATED ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
getTask函數經過workQueue.take()獲取任務
時,若是不執行中斷會一直阻塞。在下面介紹的shutdown方法中,會中斷全部空閒的工做線程,若是在執行shutdown時工做線程沒有空閒,而後又去調用了getTask方法,這時若是workQueue中沒有任務了,調用workQueue.take()
時就會一直阻塞。因此每次在工做線程結束時調用tryTerminate方法來嘗試中斷一個空閒工做線程,避免在隊列爲空時取任務一直阻塞的狀況。
getTask函數主要完成了三塊功能:
一、檢查線程池及隊列任務狀態;
二、根據maximumPoolSize、超時時間和隊列任務,控制線程數量;
三、知足條件從workQueue隊列中獲取任務;
private Runnable getTask() { //超時標誌 boolean timedOut = false; // Did the last poll() time out? for (;;) { //獲取線程池狀態 int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { //若是線程池狀態異常或任務隊列爲空,返回NULL decrementWorkerCount(); return null; } //獲取當前線程數量 int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; /* * wc > maximumPoolSize的狀況是由於可能在此方法執行階段同時執行了setMaximumPoolSize方法; * timed && timedOut 若是爲true,表示當前操做須要進行超時控制,而且上次從阻塞隊列中獲取任務發生了超時 * 接下來判斷,若是有效線程數量大於1,或者阻塞隊列是空的,那麼嘗試將workerCount減1; * 若是減1失敗,則返回重試。 * 若是wc == 1時,也就說明當前線程是線程池中惟一的一個線程了。 */ if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //取出任務 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
shutdown函數是用來中止線程池的,調用shutdown後,線程池就不會在接受新的任務了,可是工做隊列裏面的任務仍是要執行的,可是該方法馬上返回的,並不等待隊列任務完成在返回。
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock();//加鎖 try { //檢查是否容許Shutdown checkShutdownAccess(); //設置線程池狀態爲SHUTDOWN advanceRunState(SHUTDOWN); //中斷當前空閒的woker線程 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
調用shutdownNow函數後,會當即中止線程池,並丟棄和返回任務隊列中的任務
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //檢查是否容許Shutdown checkShutdownAccess(); //設置線程池狀態爲STOP advanceRunState(STOP); //中斷當前空閒的woker線程 interruptWorkers(); //獲取當前隊列中的任務 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
下面仔細分析一下:
workQueue.take()
進行阻塞;workQueue.take()
後會一直阻塞而不會被銷燬,由於在SHUTDOWN狀態下不容許再有新的任務添加到workQueue中,這樣一來線程池永遠都關閉不了了;workQueue.take()
時,若是發現當前線程在執行以前或者執行期間是中斷狀態,則會拋出InterruptedException,解除阻塞的狀態;下面就來分析一下interruptIdleWorkers方法。
private void interruptIdleWorkers() { interruptIdleWorkers(false); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
interruptIdleWorkers遍歷workers中全部的工做線程,若線程沒有被中斷tryLock成功,就中斷該線程。
本文對線程池的源碼進行了基本的分析與總結,大致歸納爲如下幾點:一、線程池經過CAS的方式記錄自己的運行狀態和線程池線程個數二、每一個線程都會被封裝爲一個WOKER線程對象,每一個worker線程能夠處理多個任務;三、線程池執行流程中要經過自身的狀態來判斷應該結束工做線程仍是阻塞線程等待新的任務,也解釋了爲何關閉線程池時要中斷工做線程以及爲何每個worker都須要lock。