上一篇中已經講了線程池的原理。這一次來講說源碼執行過程。建議先看看細說線程池---入門篇
細說線程池---中級篇java
依然使用newFixedThreadPool()方法建立線程池。併發
看源碼從execute(Runnable runable)開始。app
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); ///1.當前池中線程比核心數少,新建一個線程執行任務 //workerCountOf計算出線程個數 if (workerCountOf(c) < corePoolSize) { //線程池中建立一個線程worker if (addWorker(command, true)) return; c = ctl.get(); } 2.核心池已滿,但任務隊列未滿,添加到隊列中 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //任務成功添加到隊列之後,再次檢查是否須要添加新的線程, // 由於已存在的線程可能被銷燬了 if (! isRunning(recheck) && remove(command)) 若是線程池處於非運行狀態, // 而且把當前的任務從任務隊列中 //移除成功,則拒絕該任務 reject(command); 若是以前的線程已被銷燬完,新建一個線程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } 3.核心池已滿,隊列已滿,試着建立一個新線程 else if (!addWorker(command, false)) 若是建立新線程失敗了,說明線程池被關閉或者線程池徹底滿了, 拒絕任務 reject(command); }
ctl 的做用ide
在線程池中,ctl 貫穿在線程池的整個生命週期中函數
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING,0));
它是一個原子類,主要做用是用來保存線程數量和線程池的狀態。咱們來分析一下這段代碼,其實比較有意思,他用到了位運算一個 int 數值是 32 個 bit 位,這裏採用高 3 位來保存運行狀態,低 29 位來保存線程數量。咱們來分析默認狀況下,也就是 ctlOf(RUNNING)運行狀態,調用了 ctlOf(int rs,int wc)方法;其中oop
private static int ctlOf(int rs, int wc) { return rs | wc; }
其中 RUNNING =-1 << COUNT_BITS ;-1 左移 29 位,源碼分析
-1 的二進制是 32 個 1(1111 1111 11111111 1111 1111 1111 1111);
-1 的二進制計算方法原碼是 1000…001 . 高位 1 表示符號位。而後對原碼取反,高位不變獲得 1111…110而後對反碼進行+1 ,也就是補碼操做, 最後獲得 1111…1111ui
那麼-1 <<左移 29 位, 也就是 【111】 表示;rs | wc 。二進制的 111 | 000 。this
獲得的結果仍然是 111。spa
那麼同理可得其餘的狀態的 bit 位表示
//32-3private static final int COUNT_BITS = Integer.SIZE - 3;//將 1 的二進制向右位移 29 位,再減 1 表示最大線程容量private static final int CAPACITY = (1 << COUNT_BITS) - 1;// 運行狀態保存在 int 值的高 3 位 ( 全部數值左移 29 位 )// 接收新任務,並執行隊列中的任務private static final int RUNNING = -1 << COUNT_BITS;// 不接收新任務,可是執行隊列中的任務private static final int SHUTDOWN = 0 << COUNT_BITS;// 不接收新任務,不執行隊列中的任務,中斷正在執行中的任務private static final int STOP = 1 << COUNT_BITS;// 全部的任務都已結束,線程數量爲 0,處於該狀態的線程池即將調用 terminated()方法private static final int TIDYING = 2 << COUNT_BITS;// terminated()方法執行完成private static final int TERMINATED = 3 << COUNT_BITS;
線程池狀態變化圖
addWorker
再回到上面源碼中,當線程池中線程數小於核心線程數的時候:會調用 addWorker,顧名思義,其實就是要建立一個工做線程。咱們來看看源碼的實現源碼比較長,看起來比較唬人,其實就作了兩件事。
才用循環 CAS
操做來將線程數加 1
新建一個線程並啓用
private boolean addWorker(Runnable firstTask, boolean core) { //goto 語句,避免死循環 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); //若是線程處於非運行狀態,而且 rs 不等於 SHUTDOWN 且 firstTask 不等於空且且 // workQueue 爲空,直接返回 false (表示不可添加 work 狀態) // 1. 線程池已經 shutdown 後,還要添加新的任務,拒絕 // 2. (第二個判斷) SHUTDOWN 狀態不接受新任務,但仍然會執行已經加入任務隊列的任 // 務,因此當進入 SHUTDOWN 狀態,而傳進來的任務爲空,而且任務隊列不爲空的時候,是容許添加 // 新線程的 , 若是把這個條件取反,就表示不容許添加 worker if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //自旋 int wc = workerCountOf(c);//得到 Worker 工做線程數 //若是工做線程數大於默認容量大小或者大於核心線程數大小, // 則直接返回 false 表示不能再添加 worker。 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //經過 cas 來增長工做線程數, if (compareAndIncrementWorkerCount(c)) //若是 cas 失敗,則直接重試 break retry; // 再次獲取 ctl 的值 c = ctl.get(); //這裏若是不想等,說明線程的狀態發生了變化,繼續重試 if (runStateOf(c) != rs) continue retry; } } //上面這段代碼主要是對 worker 數量作原子+1 操做, // 下面的邏輯纔是正式構建一個 worker boolean workerStarted = false; //工做線程是否啓動的標識 boolean workerAdded = false; //工做線程是否已經添加成功的標識 Worker w = null; try { //構建一個 Worker,這個 worker 是什麼呢? //咱們 能夠看到構造方法裏面傳入了一個 Runnable 對象 w = new Worker(firstTask); final Thread t = w.thread; //從 worker 對象中取出線程 if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); //這裏有個重入鎖,避免併發問題 try { int rs = runStateOf(ctl.get()); //只有當前線程池是正在運行狀態,[或是 SHUTDOWN // 且 firstTask 爲空],才能添加到 workers 集合中 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //任務剛封裝到 work 裏面,還沒 start,你封裝的線程就是 alive, // 幾個意思?確定是要拋異常出去的 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); //將新建立的 Worker 添加到 workers 集合中 int s = workers.size(); //若是集合中的工做線程數大於最大線程數, // 這個最大線程數表示線程池曾經出現過的最大線程數 if (s > largestPoolSize) largestPoolSize = s; //更新線程池出現過的最大線程數 workerAdded = true;//表示工做線程建立成功了 } } finally { mainLock.unlock(); //釋放鎖 } if (workerAdded) {//若是 worker 添加成功 t.start();//啓動線程 workerStarted = true; } } } finally { if (! workerStarted) //若是添加失敗,就須要作一件事,就是遞減實際工做線 //程數(還記得咱們最開始的時候增長了工做線程數嗎) addWorkerFailed(w); } //返回結果 return workerStarted; }
Worker說明
咱們發現 addWorker 方法只是構造了一個 Worker,而且把 firstTask 封裝到 worker 中,它是作什麼的呢?咱們來看看
每一個 worker,都是一條線程,同時裏面包含了一個 firstTask,即初始化時要被首先執行的任務.
最終執行任務的,是 runWorker()方法Worker 類繼承了 AQS,並實現了 Runnable 接口,注意其中的 firstTask 和 thread 屬性:firstTask 用它來保存傳入的任務;thread 是在調用構造方法時經過 ThreadFactory 來建立的線程,是用來處理任務的線程。
在調用構造方法時,須要傳入任務,這裏經過 getThreadFactory().newThread(this);來新建一個線程,newThread 方法傳入的參數是 this,由於 Worker 自己繼承了 Runnable 接口,也就是一個線程,因此一個 Worker 對象在啓動的時候會調用 Worker 類中的 run 方法。Worker 繼承了 AQS,使用 AQS 來實現獨佔鎖的功能。爲何不使用 ReentrantLock 來實現呢?能夠看到 tryAcquire 方法,它是不容許重入的,而 ReentrantLock 是容許重入的:lock 方法一旦獲取了獨佔鎖,表示當前線程正在執行任務中;那麼它會有如下幾個做用
若是正在執行任務,則不該該中斷線程;
若是該線程如今不是獨佔鎖的狀態,也就是空閒的狀態,說明它沒有在處理任務,這時能夠對該線程進行中斷;
線程池在執行 shutdown 方法或 tryTerminate 方法時會調用 interruptIdleWorkers 方法來中斷空閒的線程,interruptIdleWorkers 方法會使用 tryLock 方法來判斷線程池中的線程是不是空閒狀態
之因此設置爲不可重入,是由於咱們不但願任務在調用像 setCorePoolSize 這樣的線程池控制方法時從新獲取鎖,這樣會中斷正在運行的線程
//繼承了AQS還實現了Runnableprivate final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; //注意了,這纔是真正執行task的線程,從構造函數可知是由 //ThreadFactury 建立的 final Thread thread; //這就是須要執行的 task Runnable firstTask; /** 每一個線程完成任務的計數器*/ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { 初始狀態 -1, 防止在調用 runWorker() , // 也就是真正執行task前中斷thread 。 setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
addWorkerFailed
方法
addWorker
方法中,若是添加 Worker 而且啓動線程失敗,則會作失敗後的處理。這個方法主要作兩件事
若是 worker 已經構造好了,則從 workers 集合中移除這個 worker
原子遞減核心線程數(由於在 addWorker 方法中先作了原子增長)
嘗試結束線程池
private void addWorkerFailed(java.util.concurrent.ThreadPoolExecutor.Worker w) { final ReentrantLock mainLock = this.mainLock; //上鎖 mainLock.lock(); try { if (w != null) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { //釋放鎖 mainLock.unlock(); } }
runWorker 方法
前面已經瞭解了 ThreadPoolExecutor
的核心方法 addWorker
,主要做用是增長工做線程,而 Worker 簡單理解其實就是一個線程,裏面從新了 run 方法,這塊是線程池中執行任務的真正處理邏輯,也就是 runWorker
方法,這個方法主要作幾件事:
若是 task 不爲空,則開始執行 task
若是 task 爲空,則經過 getTask()
再去取任務,並賦值給 task,若是取到的 Runnable 不爲空,則執行該任務
執行完畢後,經過 while 循環繼續 getTask()
取任務
若是 getTask()
取到的任務依然是空,那麼整個runWorker()
方法執行完畢