在ThreadPoolExecutor的屬性定義中頻繁地用位移運算來表示線程池狀態,位移運算是改變當前值的一種高效手段,包括左移和右移。下面從屬性定義開始閱讀ThreadPoolExecutor的源碼。併發
1 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 2 //Integer 共有32位,最右邊29位表示工做線程數,最左邊3位表示線程池狀態。 3 //注:簡單的說,3個二進制位能夠表示從0-7的8個不一樣的數值(第1處) 4 private static final int COUNT_BITS = Integer.SIZE - 3; 5 //000-11111111111111111111111111111,相似於子網掩碼,用於位的與運算 6 //獲得最左邊的3位,仍是右邊的29位 7 private static final int CAPACITY = (1 << COUNT_BITS) - 1; 8 9 // runState is stored in the high-order bits 10 //用左邊3位,實現5種線程池狀態(在左邊3位以後加入中畫線有助於理解) 11 //111-00000000000000000000000000000,十進制值:-563,870,912 12 //此狀態即是線程池可以接受的新任務 13 private static final int RUNNING = -1 << COUNT_BITS; 14 15 //000-00000000000000000000000000000,十進制值:0 16 //此狀態再也不接受新任務,但能夠繼續執行隊列中的任務 17 private static final int SHUTDOWN = 0 << COUNT_BITS; 18 19 //001-00000000000000000000000000000,十進制值:563,870,912 20 //此狀態全面拒絕,並終止正在處理的任務 21 private static final int STOP = 1 << COUNT_BITS; 22 23 //010-00000000000000000000000000000,十進制值:1073,741,824 24 //此狀態表示全部的任務已經被終止 25 private static final int TIDYING = 2 << COUNT_BITS; 26 27 //011-00000000000000000000000000000 28 //此狀態表示已清理完現場 29 private static final int TERMINATED = 3 << COUNT_BITS; 30 31 // Packing and unpacking ctl 32 // 與運算,好比 001-00000000000000000000000100011,表示67個工做線程 33 // 掩碼取反 111-00000000000000000000000000000,即獲得最左邊3位 001 34 // 表示線程處於stop 狀態 35 private static int runStateOf(int c) { return c & ~CAPACITY; } 36 //同理掩碼 000-11111111111111111111111111111,獲得右邊29位,即工做線程數 37 private static int workerCountOf(int c) { return c & CAPACITY; } 38 //把左邊3位與右邊29位按或運算,合併成一個值 39 private static int ctlOf(int rs, int wc) { return rs | wc; }
第一處說明,線程池狀態用高3位表示,其中包括了符號位。五種狀態的十進制值按小道大依次排序爲:oop
RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED源碼分析
這樣設計的好處是能夠經過比較值的大小來肯定線程池的狀態,例如程序中常常出現isRuning的判斷。ui
private static boolean isRunning(int c) { return c < SHUTDOWN; }
咱們都知道Exexutor 接口有且只有一個方法execute,經過參數傳入待執行線程對象。下面分析ThreadPoolExecutor關於execute的實現。this
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //返回包含線程數及線程池狀態的Integer類型數值 int c = ctl.get(); //若是工做線程數小於核心線程數,則建立線程任務並執行 if (workerCountOf(c) < corePoolSize) { //addWorker 是另外一個極爲重要的方法,見下一段源碼分析(第1處) if (addWorker(command, true)) return; //若是建立失敗,防止外部已經在線程池中加入新任務,從新獲取一下 c = ctl.get(); } // 只有線程池處於RUNNING 狀態,才執行後半句:置入隊列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 若是線程池不是 RUNNING 狀態 則將剛加入隊列的任務移除 if (! isRunning(recheck) && remove(command)) reject(command); //若是以前的線程已經被消費完,新建一個線程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //核心池和隊列都已經滿,嘗試建立一個新線程 else if (!addWorker(command, false)) //若是addWorker返回false,即建立失敗,則喚醒拒絕策略(第2處) reject(command); }
第2處 發生拒絕的緣由有兩個(1)線程池狀態非Runing (2)等待隊列已滿。spa
下面繼續分析addWorker 源碼。線程
/** * 根據當前線程池狀態,檢查是否能夠添加新的任務線程,若是能夠則建立並啓動任務 * 若是一切正常則返回true。返回false 的可能以下: * 1.線程池沒有處於RUNNING狀態 * 2.線程工程建立新的任務線程失敗 * @param firstTask 外部啓動線程池時須要構造的第一個線程,它是線程的母體 * @param core 新增工做線程時的判斷指標,解釋以下 * true:表示新增線程時,須要判斷當前RUNNING 狀態的線程是否少於corePoolsize * false:表示新增線程時,須要判斷當前RUNNING 狀態的線程是否少於maxmemPoolsize * @return */ private boolean addWorker(Runnable firstTask, boolean core) { //不須要任務預約義的語法標籤,響應下文的continue retry.快速退出多層嵌套循環(第1處) retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 當前線程池狀態 // Check if queue empty only if necessary. // 這條語句等價:rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null ||workQueue.isEmpty()) // 知足下列條件則直接返回false,線程建立失敗: // rs > SHUTDOWN:STOP || TIDYING || TERMINATED 此時再也不接受新的任務,且全部任務執行結束 // rs = SHUTDOWN:firtTask != null 此時再也不接受任務,可是仍然會執行隊列中的任務 // rs = SHUTDOWN:firtTask == null見execute方法的addWorker(null, false),任務爲null && 隊列爲空 // 最後一種狀況也就是說SHUTDONW狀態下,若是隊列不爲空還得接着往下執行,爲何?add一個null任務目的究竟是什麼? // 看execute方法只有workCount==0的時候firstTask纔會爲null結合這裏的條件就是線程池SHUTDOWN了再也不接受新任務 // 可是此時隊列不爲空,那麼還得建立線程把任務給執行完才行。 //第2處 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // 走到這的情形: // 1.線程池狀態爲RUNNING // 2.SHUTDOWN狀態,但隊列中還有任務須要執行 for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) // 最大線程數不能超過2^29,不然影響左邊3位的線程池狀態值 return false; //(第3處) if (compareAndIncrementWorkerCount(c)) //當前活動線程數+1(第3處) 原子操做遞增workCount break retry; //操做成功跳出重試循環 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) //若是線程池的狀態發生變化則重試(第5處) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //開始建立工做線程 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //利用worker構造方法中的線程池工廠建立線程,並封裝成工做線程worker對象 w = new Worker(firstTask); //注意這是worker 中屬性對象thread (第6處) final Thread t = w.thread; if (t != null) { //對ThreadExecutor 的敏感操做時,都須要持有主鎖,避免在添加和啓動線程時被幹擾。 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); //整個線程池在運行期間的最大併發線程數 if (s > largestPoolSize) largestPoolSize = s;// 更新largestPoolSize workerAdded = true; } } finally { mainLock.unlock(); } // 啓動新添加的線程,這個線程首先執行firstTask,而後不停的從隊列中取任務執行 // 當等待keepAlieTime尚未任務執行則該線程結束。見runWoker和getTask方法的代碼。 if (workerAdded) { t.start(); //最終執行的是ThreadPoolExecutor的runWoker方法 並不是線程池execute 的command參數指向的線程 workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }