/* 這個是用一個int來表示workerCount和runState的,其中runState佔int的高3位, 其它29位爲workerCount的值。 workerCount:當前活動的線程數; runState:線程池的當前狀態。 用AtomicInteger是由於其在併發下使用compareAndSet效率很是高; 當改變當前活動的線程數時只對低29位操做,如每次加一減一,workerCount的值變了, 但不會影響高3位的runState的值。當改變當前狀態的時候,只對高3位操做,不會改變低29位的計數值。 這裏有一個假設,就是當前活動的線程數不會超過29位能表示的值,即不會超過536870911, 就目前以及可預見的很長一段時間來說,這個值是足夠用了 */ private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //COUNT_BITS,就是用來表示workerCount佔用一個int的位數,其值爲前面說的29 private static final int COUNT_BITS = Integer.SIZE - 3; /* CAPACITY爲29位能表示的最大容量,即workerCount實際能用的最大值。 其值的二進制爲:00011111111111111111111111111111(佔29位,29個1) */ private static final int CAPACITY = (1 << COUNT_BITS) - 1; /* 如下常量是線程池的狀態,狀態存儲在int的高3位,因此要左移29位。 騰出的低29位來表示workerCount 注意,這5個狀態是有大小關係的。RUNNING<shutdown<stop<tidying<terminated 當須要判斷多個狀態時,只須要用<或="">來判斷就能夠了 */ /* RUNNING的含義:線程池能接受新任務,而且能夠運行隊列中的任務 -1的二進制爲32個1,移位後爲:11100000000000000000000000000000 */ private static final int RUNNING = -1 << COUNT_BITS; /* SHUTDOWN的含義:再也不接受新任務,但仍能夠執行隊列中的任務 0的二進制爲32個0,移位後仍是全0 */ private static final int SHUTDOWN = 0 << COUNT_BITS; /* STOP的含義:再也不接受新任務,再也不執行隊列中的任務,並且要中斷正在處理的任務 1的二進制爲前面31個0,最後一個1,移位後爲:00100000000000000000000000000000 */ private static final int STOP = 1 << COUNT_BITS; /* TIDYING的含義:全部任務均已終止,workerCount的值爲0, 轉到TIDYING狀態的線程即將要執行terminated()鉤子方法. 2的二進制爲00000000000000000000000000000010 移位後01000000000000000000000000000000 */ private static final int TIDYING = 2 << COUNT_BITS; /* TERMINATED的含義:terminated()方法執行結束. 3的二進制爲00000000000000000000000000000011 移位後01100000000000000000000000000000 */ private static final int TERMINATED = 3 << COUNT_BITS; 各狀態之間可能的轉變有如下幾種: RUNNING -> SHUTDOWN 調用了shutdown方法,線程池實現了finalize方法,在裏面調用了shutdown方法,所以shutdown多是在finalize中被隱式調用的 (RUNNING or SHUTDOWN) -> STOP 調用了shutdownNow方法 SHUTDOWN -> TIDYING 當隊列和線程池均爲空的時候 STOP -> TIDYING 當線程池爲空的時候 TIDYING -> TERMINATED terminated()鉤子方法調用完畢 /* 傳入的參數爲存儲runState和workerCount的int值,這個方法用於取出runState的值。 ~爲按位取反操做,~CAPACITY值爲:11100000000000000000000000000000, 再同參數作&操做,就將低29位置0了,而高3位仍是保持原先的值,也就是runState的值 */ private static int runStateOf(int c) { return c & ~CAPACITY; } /* 傳入的參數爲存儲runState和workerCount的int值,這個方法用於取出workerCount的值。 由於CAPACITY值爲:00011111111111111111111111111111,因此&操做將參數的高3位置0了, 保留參數的低29位,也就是workerCount的值。 */ private static int workerCountOf(int c) { return c & CAPACITY; } /* 將runState和workerCount存到同一個int中,這裏的rs就是runState, 是已經移位過的值,填充返回值的高3位,wc填充返回值的低29位 */ private static int ctlOf(int rs, int wc) { return rs | wc; }
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ /* *分三步進行: * * 1.若是少於corePoolSize線程正在運行,請嘗試 *用給定的命令做爲第一個啓動一個新的線程 *任務。 對addWorker的調用會自動檢查runState和 * workerCount,從而防止將添加的錯誤警報 *線程,當它不該該經過返回false。 * * 2.若是任務能夠成功排隊,那麼咱們仍然須要 *再次檢查咱們是否應該添加一個線程 *(由於現有的自上次檢查以來死亡)或者那個 自從進入這個方法以來,池關閉了。 因此咱們 *從新檢查狀態,若是有必要的話回滾入隊 *中止,或者若是沒有的話,開始一個新的線程。 * * 3.若是咱們不能排隊任務,那麼咱們嘗試添加一個新的 *線程。 若是失敗了,咱們知道咱們已經關閉了,或者已經飽和了 *所以拒絕任務。 */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
處理過程: 1.活動線程數< corePoolSize 小於核心線程數時,直接啓動新的線程,而且添加到工做線程中。(addWorker true時,會從新檢查workerCount的值) 2.活動線程數 >corePoolSize 時,若是是運行時狀態,而且隊列未滿,添加到隊列中, 須要再次檢查狀態,1.不是running,而且移出失敗,則拒絕任務。2。處於RUNNing狀態,或者移出任務失敗的時候,若是沒有活動線程,添加一個空的任務,表示不在接受新的任務。3.隱藏的狀況,若是是運行狀態,而且能夠移出成功,則正常執行。 3.不是運行狀態,而且隊列已滿,啓動新的線程失敗,則拒絕任務。併發
/** * Checks if a new worker can be added with respect to current * pool state and the given bound (either core or maximum). If so, * the worker count is adjusted accordingly, and, if possible, a * new worker is created and started, running firstTask as its * first task. This method returns false if the pool is stopped or * eligible to shut down. It also returns false if the thread * factory fails to create a thread when asked. If the thread * creation fails, either due to the thread factory returning * null, or due to an exception (typically OutOfMemoryError in * Thread.start()), we roll back cleanly. * * @param firstTask the task the new thread should run first (or * null if none). Workers are created with an initial first task * (in method execute()) to bypass queuing when there are fewer * than corePoolSize threads (in which case we always start one), * or when the queue is full (in which case we must bypass queue). * Initially idle threads are usually created via * prestartCoreThread or to replace other dying workers. * * @param core if true use corePoolSize as bound, else * maximumPoolSize. (A boolean indicator is used here rather than a * value to ensure reads of fresh values after checking other pool * state). * @return true if successful */ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
addWorker有兩個參數:Runnable類型的firstTask,用於指定新增的線程執行的第一個任務;boolean類型的core,true表示在新增線程時會判斷當前活動線程數是否少於corePoolSize,false表示新增線程前須要判斷當前活動線程數是否少於maximumPoolSize。函數
該方法的返回值表明是否成功新增一個線程。oop
// Check if queue empty only if necessary. // 這條語句等價:rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || // workQueue.isEmpty()) // 知足下列調價則直接返回false,線程建立失敗: // rs > SHUTDOWN:STOP || TIDYING || TERMINATED 此時再也不接受新的任務,且全部任務執行結束 // rs = SHUTDOWN:firtTask != null 此時再也不接受任務,可是仍然會執行隊列中的任務 // rs = SHUTDOWN:firtTask == null見execute方法的addWorker(null, // false),任務爲null && 隊列爲空 // 最後一種狀況也就是說SHUTDONW狀態下,若是隊列不爲空還得接着往下執行,爲何?add一個null任務目的究竟是什麼? // 看execute方法只有workCount==0的時候firstTask纔會爲null結合這裏的條件就是線程池SHUTDOWN了再也不接受新任務 // 可是此時隊列不爲空,那麼還得建立線程把任務給執行完才行。 if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) // 等價實現 rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()),
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c);// 當前線程池狀態 // Check if queue empty only if necessary. // 這條語句等價:rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || // workQueue.isEmpty()) // 知足下列調價則直接返回false,線程建立失敗: // rs > SHUTDOWN:STOP || TIDYING || TERMINATED 此時再也不接受新的任務,且全部任務執行結束 // rs = SHUTDOWN:firtTask != null 此時再也不接受任務,可是仍然會執行隊列中的任務 // rs = SHUTDOWN:firtTask == null見execute方法的addWorker(null, // false),任務爲null && 隊列爲空 // 最後一種狀況也就是說SHUTDONW狀態下,若是隊列不爲空還得接着往下執行,爲何?add一個null任務目的究竟是什麼? // 看execute方法只有workCount==0的時候firstTask纔會爲null結合這裏的條件就是線程池SHUTDOWN了再也不接受新任務 // 可是此時隊列不爲空,那麼還得建立線程把任務給執行完才行。 if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false; // 走到這的情形: // 1.線程池狀態爲RUNNING // 2.SHUTDOWN狀態,但隊列中還有任務須要執行 for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c))// 原子操做遞增workCount break retry;// 操做成功跳出的重試的循環 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs)// 若是線程池的狀態發生變化則重試 continue retry; // else CAS failed due to workerCount change; retry inner loop } } // wokerCount遞增成功 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 併發的訪問線程池workers對象必須加鎖 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c); // RUNNING狀態 || SHUTDONW狀態下清理隊列中剩餘的任務 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 將新啓動的線程添加到線程池中 workers.add(w); // 更新largestPoolSize int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 啓動新添加的線程,這個線程首先執行firstTask,而後不停的從隊列中取任務執行 // 當等待keepAlieTime尚未任務執行則該線程結束。見runWoker和getTask方法的代碼。 if (workerAdded) { t.start();// 最終執行的是ThreadPoolExecutor的runWoker方法 workerStarted = true; } } } finally { // 線程啓動失敗,則從wokers中移除w並遞減wokerCount if (!workerStarted) // 遞減wokerCount會觸發tryTerminate方法 addWorkerFailed(w); } return workerStarted; }
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // Worker的構造函數中抑制了線程中斷setState(-1),因此這裏須要unlock從而容許中斷 w.unlock(); // 用於標識是否異常終止,finally中processWorkerExit的方法會有不一樣邏輯 // 爲true的狀況:1.執行任務拋出異常;2.被中斷。 boolean completedAbruptly = true; try { // 若是getTask返回null那麼getTask中會將workerCount遞減,若是異常了這個遞減操做會在processWorkerExit中處理 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 任務執行前能夠插入一些處理,子類重載該方法 beforeExecute(wt, task); Throwable thrown = null; try { task.run();// 執行用戶任務 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 和beforeExecute同樣,留給子類去重載 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 結束線程的一些清理工做 processWorkerExit(w, completedAbruptly); } }