一塊兒共讀,共同閱步。java
源碼閱讀本就是要貫注全神、戒驕戒躁的沉浸式過程。我本着浮躁至極的心態,單刀直入,從入口方法先殺入「敵軍」內部,讓你們短期內享受到最大的學習成就感,而後再橫向鋪開,帶你們一窺源碼的究竟。有不對之處,輕噴、指出。安全
java1.5引入的線程池的標準類,ThreadPoolExecutor。bash
咱們一般是經過Executors這個工廠類來建立具體的實例,如:多線程
Executors.newCachedThreadPool(...)
Executors.newScheduledThreadPool(...)
複製代碼
前者建立的就是咱們要講的ThreadPoolExecutor實例。後者是有延遲功能的線程池,ScheduledThreadPoolExecutor,有機會再講吧。ThreadPoolExecutor
這個線程池實例,內部維護了一個線程的集合,用來存放線程;有一個存放待執行任務的隊列,在池內線程數達到最大值時,任務就暫時入隊,等待線程取走運行。因此,目前來看,ThreadPoolExecutor
的結構以下: 函數
我會先列一下該源碼涉及到的重要的邏輯方法,而後按使用時一般的調用順序,挨個講解,最後合併總結。oop
execute
:執行任務方法,內部封裝了新建線程、任務入隊等重要邏輯addWorker
:新建線程方法getTask
:從任務隊列內獲取一個任務runWorker
:池內線程的主循環邏輯。提醒一下,多線程都會調用這同一個方法,因此尤爲注意同步問題。workQueue
[BlockingQueue
],任務隊列,是一個BlockingQueue對象,線程安全ctl
[Integer
],記錄了線程池的運行狀態值跟池內的線程數workers
[HashSet<Worker>
],具體存放線程的set
對象corePoolSize
[volatile int
],線程池核心線程數配置,低於這個數值時,新進來的任務一概以新啓動線程處理ctl
基礎知識準備ThreadPoolExecutor實例表明一個線程池,相應地,這個池子就有一些運行狀態,以及池子內線程數量的配置。這二者的實現以下:學習
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
複製代碼
線程池狀態跟池內線程數的統計都記錄在ctl
這個AtomicInteger
(它是誰,先本身查吧。後面估計會專門寫下,這裏只要記住它的加減是線程安全的就好)中。具體實現是: java內一個整型量是32位,這裏AtomicInteger
也是。源碼做者將ctl
的32位中高3位用來記錄線程池狀態,低29位用來記錄線程數量。 驗證來看,COUNT_BITS
的值是29,方法 runStateOf(int c) {return c & ~CAPACITY;}
這裏的c
就是ctl
變量,而CAPACITY
就是一個mask面紗,用來從 ctl
中提取上面兩個變量的,它是這樣的: ui
因此,runState
就是ctl
與取反後的 CAPACITY
相與,也就是隻有高4位有效,正好對應線程池狀態的記錄位。 因此,各類狀態下,ctl
的值以下: RUNNING
:1001 x(28個),-1,最高位是符號位,這個<<
位移操做是忽略符號位的位移 SHUTDOWN
:0000 x(28), 0 STOP
: 0001 x(28), 1 TIDYING
: 0010 x(28), 2 TERMINATED
: 0011 x(28), 3this
execute(Runnable command)
用過線程池的人應該都用過這個入口函數,它就是用來將一個Runnable
任務體放入線程池,下面讓咱們來具體看看它的邏輯(代碼塊無法高亮了,你們看下代碼段中註釋的翻譯部分):atom
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
* 主要的邏輯註釋,在這
* 1. If fewer than corePoolSize threads are running, try to
* 若是當前池內線程數小於corePoolSize,那就嘗試去新建一
* start a new thread with the given command as its first
* 條線程,傳進來的command參數做爲該線程的第一個任務
* task. The call to addWorker atomically checks runState and
* 體。調用addWorker函數會自動檢查線程池的狀態和池內活躍的線程數
* workerCount, and so prevents false alarms that would add
* 若是在不應或不能新建線程時新建了,那不會拋出異常,會返回false
* threads when it shouldn't, by returning false. * * 即便任務體成功入隊,咱們仍須要再去檢查一遍,咱們是否應該是新建線程而不是入隊, * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (由於可能自第一次檢查後,原池內活躍的線程 * (because existing ones died since last checking) or that * 又死掉啦)又或者線程池的狀態發生了變化 * the pool shut down since entry into this method. So we * 因此咱們去二次檢查池內狀態,若線程池狀態變爲中止,就去回滾,或者線程池此時一個活躍線程都沒有的話, * recheck state and if necessary roll back the enqueuing if * 新建一個線程去執行任務體。(PS:由於ThreadPoolExecutor * stopped, or start a new thread if there are none. * 主要是經過addWorker來建立線程,因此,若是池內一個活躍線程都沒有, * 這時咱們任務體入隊了,也沒有線程去跑...固然爲何只檢查一遍?我是想,可 * 能就只是做者單純地在這裏想檢查一遍,稍微確保下。由於即便這個二次檢查 * 沒問題,後續的,到池內線程確切地去跑這個任務體以前的代碼,每一行 * 代碼,都仍有發生這種狀況的可能。這,就是多線程...) * 3. If we cannot queue task, then we try to add a new * 若是咱們任務體入隊失敗,那我嘗試新建線程,若是還失敗 * 那就說明線程池已經被shutdown了,或者整個池子已經滿了,那咱們 * 就去拒絕這個任務體。這個拒絕,就會用到所謂的RejectPolicy對象 * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ // 獲取ctl對象 int c = ctl.get(); // 若是池內活躍線程數小於corePoolSize if (workerCountOf(c) < corePoolSize) { // 新建線程,第二個參數true能夠先忽略 if (addWorker(command, true)) return; // 新建線程失敗,那咱們獲取最新的線程池狀態變量ctl c = ctl.get(); } // 若是當前線程池仍在運行並且任務體入隊成功。 // (workQueue就是ThreadPoolExecutor具體的任務隊列。 // 而這裏就是咱們上面註釋提到的那段二次檢查的邏輯) if (isRunning(c) && workQueue.offer(command)) { // 二次檢查。獲取最新的線程池狀態字段 int recheck = ctl.get(); // 若是線程不在運行狀態 而且也成功把入隊的任務體刪除了 // 那就菜哦用拒絕策略來拒絕 if (! isRunning(recheck) && remove(command)) reject(command); // 或者,在線程池內活躍的線程數爲0時,新建一個線程 // 這裏傳參跟上面不同,先忽略。記錄這個新啓動一個線程就夠了 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 若是上面的If失敗了,就嘗試新啓動線程,啓動失敗了,那說明 // 上面的失敗,是isRunning形成的,因此拒絕任務體。啓動成功了,那就是成功了。 else if (!addWorker(command, false)) reject(command); } 複製代碼
這裏涉及到ThreadPoolExecutor
線程池增長線程的一個判斷邏輯: 每當ThreadPoolExecutor.execute
執行一個任務時,先判斷corePoolSize
,當池內線程數小於這個時,直接新增線程,若大於這個,則向workQueue
任務隊列入隊,隊列滿了時,則以maximumPoolSize
爲界開始繼續新建線程,當超過maximumPoolSize
時,就採用最後的RejectPolicy進行拒絕處理。
addWorker(Runnable firstTask, boolean core)
這個函數主要邏輯是新啓動一個線程,firstTask
是新啓動線程的第一個任務,能夠爲null
,爲null
時,就是單純地啓動一個線程,記得咱們以前在execute(Runnable command)
方法中,在線程池內沒有有效線程時,調用firstTask
爲null
的方法來啓動一條線程。 第二個參數core
是用來辨別,啓動一個新線程時,是以corePoolSize
這個線程數配置量來做爲限制,仍是以maximumPoolSize
這個線程數配置量做爲限制。 看下源碼(邏輯主要放註釋裏了):
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
// 獲取到線程池的運行狀態
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 檢查任務隊列在它必要的時候才能爲空。這裏
// 只能直接翻譯下,具體的,if裏的判斷邏輯也能推斷出來
// 可是目前我也不能肯定說出在這種狀況下要false退出的
// 緣由。若是想搞清它,可能只能完全把線程池的運行狀態、
// 線程池內的線程數、任務隊列內的任務數三者全部可能的狀況的
// 前提下才能肯定。這裏待大神指出來了。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 第二個參數的做用就在這裏產生了!
// 這裏在確保池內線程數不超過ctl極限CAPACITY
// 以及不超過相應的xxxPoolSize的狀況下,經過
// CAI操做去給線程數加1,成功了,則跳出retry標記後
// 的循環。至於CAI是什麼?先記住它是線程安全的給數值+1的操做就好
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 到此位置,線程池內的線程數標記字段已經加1了
// 接下來的,就是具體添加一個線程的操做了
//
// 這裏就不可避免的涉及到了ThreadPoolSize中的
// Worker這個內部類了,這個類就是具體的ThreadPoolSize
// 內部用來表明一個線程的封裝對象,他封裝了一個線程實例
// ,用來跑具體的任務;封裝了一個Runnable 實例,表明具體的任務;
// 同時,它繼承、實現了AQS(AbstractQueuedSynchronizer)跟Runnable,因此,這個
// Worker實例能夠理解成一個小勞工,有本身的運行線程,有
// 本身的具體的執行任務體,同時,本身也有同步機制AQS。
// 這裏涉及到AQS,你們夥就暫且理解成AQS賦予Worker同步的性質便可(調用AQS的方法就能實現)
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 初始化一個Worker勞工,同時指定給他的任務。這個任務
// 能夠爲null,空。表示什麼也不作。同時,初始化的時候,也會
// 初始化Worker體內的線程對象,這條線程的對象的啓動,是
// 在worker對象的Runnable.run實現方法裏
w = new Worker(firstTask);
final Thread t = w.thread;
// 這個mainLock是ThreadPoolExecutor用來同步對
// workers線程隊列的CRUD操做的
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 當線程池處於RUNNING狀態,則能夠繼續操做;
// 或者當線程池處於SHUTDOWN,可是firstTask 爲null
// 也就是說,這裏是爲了增長一個線程的,因此,也能夠放行
// 由於SHUTDOWN狀態,是容許啓動線程將任務隊列內的任務跑完的
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 將新生成的線程勞工加入到線程集合中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 若是線程勞工添加成功,則啓動它
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 若是線程啓動失敗,則運行回滾操做
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
複製代碼
Worker
對象的線程start()
這裏講述的方法是接上面addWorker
時,成功調用的t.start()
,這裏啓動了Worker
封裝的線程。這個線程是Worker
構造函數裏生成的,以下:
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// 這裏設置了AQS的state,用來抑制interrupt直到
// runWorker方法
setState(-1); // inhibit interrupts until runWorker
// 這裏傳遞了線程的任務體,能夠爲null
this.firstTask = firstTask;
// 初始化線程時,給線程指定了worker實例自身這個Runnable,所以,線程在start後,
// 就是在運行worker當前實例自身的run方法
this.thread = getThreadFactory().newThread(this);
}
複製代碼
看完上面代碼的註釋,接着看worker
實例自身的run
方法
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
複製代碼
能夠看到這裏調用了runWorker
方法,傳參是worker
自身。runWorker
是同一個ThreadPoolExecutor
實例的方法,因此,線程池實例下的全部Worker
線程都是在跑這同一個runWorker
方法。
runWorker(Worker worker)
/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them...
*/
final void runWorker(Worker w) {
// 這裏獲取到線程對象,其實就是參數Worker對象內封裝的Thread對象。
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// allow interrupts,容許Interrupt打斷,記得Worker對象的構造函數嘛?
// 構造函數一開始就調用了setState(-1)去抑制interrupt。這裏就是去釋放它。
// 固然,這裏具體的抑制interrupt的含義,要結合AQS來了解了,我後面再加吧。
w.unlock();
boolean completedAbruptly = true;
try {
// 若是Worker中的firstTask對象不是空的,則
// 直接跑它;若否則,調用getTask從隊列中獲取一條任務來執行。這裏
// 會一直while循環,因此worker們在任務隊列中有任務時
// 會一直在這個runWorker中循環while取任務執行
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 這裏有一個很巧妙的判斷邏輯,這段代碼也主要是實現了
// 上面英文註釋中的邏輯,可是可能比較難理解,稍等
// 在後面我會專門講一下
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 任務體的前置函數,如今默認是空函數
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 任務體的執行函數
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
// 後置函數
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
// 這個參數記錄worker運行是不是被打斷的,若是不是,代碼
// 會安全地走到這裏,而後置字段爲false。
// 不然,異常狀況下就直接跳到finally中了,值仍爲初始化時的true
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
複製代碼
這段源碼上方我放了一段註釋,翻譯過來就是: worker
對象的主要Loop循環體。從隊列(workQueue
)中獲取任務體,而後執行。
getTask
函數private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
// 這個for(;;)也是個無限循環的實現,它比while(true)的好處是
// 在字節碼層面上,它比while(true)少兩行字節碼代碼,因此
// 效率更高
for (;;) {
// 獲取線程池最新狀態
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// allowCoreThreadTimeOut表示coreThread,實際上是判斷
// 線程數在這個coreThreadPoolSize範圍內時,線程是否能夠超時。
// 這裏的判斷邏輯也很巧妙,若是allowxxxTimeOut爲true,coreThread
// 能夠超時,則 || 後面判斷coreThread的邏輯也就無所謂了,是吧。
// 但若是allowxxxxTimeOut爲false,coreThread不容許超時,
// 則須要去判斷在判斷的線程是否實在coreThread範圍內,是的話,
// 則最終結果也爲false,符合coreThread不能超時的邏輯;若是大於,
// 則說明當前方法的線程不是在coreThread,
// 注意去理解這個是否是coreThread這個概念
// 因此,timed爲true,也就是能夠超時,符合邏輯
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 這裏我把代碼格式化了下,方便你們去看
// 這裏的判斷就是,在符合了一些邏輯後,就去直接
// wokerCount減一,表明當前這個woker就直接幹掉了,
// 而在方法內返回null這個邏輯,在調用getTask的代碼處
// 確實也是去幹掉當前的worker實例。可是,woker不能
// 瞎幹掉,必需要確保線程池能正常產生做用,這個正常做用
// 的實現,要麼就是幹掉當前的worker還剩下至少一個,
// 要麼就是任務隊列空了,這個邏輯就在(wc > 1 || workQueue.isEmpty)
// 實現了。再來看 && 以前,在當前線程數大於
// maximumPoolSize限制時,或者當前woker能夠超時,
// 即timed爲true,同時,上一次獲取任務體時也超時了(timedOut)
// 則,當前的worker就幹掉。這段邏輯有一個timedOut
// 判斷,即上一次當前worker獲取任務體時就超時了。
// 我猜想,加這個邏輯,可能就是純粹的統計學上的效率
// 提升。固然,歡迎更多想法。
//
// 在符合上述條件後,CAS操做來減小workerCount數
// 再返回null,去幹掉當前worker實例。
if (
(wc > maximumPoolSize || (timed && timedOut))
&&
(wc > 1 || workQueue.isEmpty())
) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 根據當前的worker是否能夠超時,調用BlockingQueue
// 的不一樣方法來獲取任務體。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 獲取到任務體,則返回
if (r != null)
return r;
// 超時了,記錄標記位
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
複製代碼
這裏咱們總結下:
ThreadPoolExecutor的實際邏輯圖
workers
線程集合中的Worker
對象,在runWorker
中循環自workQueue
中獲取Runnable
任務體進行執行。對workers
線程集合的訪問要通過mainLock
這個鎖。