@TOCjava
前面一遍文章 咱們看了下FutureTask的源碼,知道了怎麼樣去獲取一個任務的返回值,今天咱們看下ThreadPoolExecutor。面試
ThreadPoolExecutor 看名詞 咱們就能夠 看作是ThreadPool 和Executor的結合,大概意思咱們也能知道就是線程池執行器,哈哈這翻譯 真棒。這篇博文 會從源碼的角度去分析下 一個線程任務 加入的線程池之後 是怎麼被執行的~編程
上面 說線程的時候 咱們也說過 線程是系統中極其珍貴的資源,那咱們要合理的使用他,因此有了線程池的出現,那線程池能帶來哪些好處呢安全
首先 咱們看下ThreadPoolExecutor 的繼承關係多線程
public class ThreadPoolExecutor extends AbstractExecutorService{}
public abstract class AbstractExecutorService implements ExecutorService{}
public interface ExecutorService extends Executor {
<!--中止線程池,狀態設置爲SHUTDOWN,而且不在接受新的任務,已經提交的任務會繼續執行-->
void shutdown();
<!--中止線程池,狀態設置爲STOP,不在接受先任務,嘗試中斷正在執行的任務,返回還未執行的任務-->
List<Runnable> shutdownNow();
<!--是不是SHUTDOWN狀態-->
boolean isShutdown();
<!--是否全部任務都已經終止-->
boolean isTerminated();
<!--超時時間內,去等待任務執行任務-->
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
<!--Callable 去提交任務-->
<T> Future<T> submit(Callable<T> task);
<!--Runnable 去提交任務-->
<T> Future<T> submit(Runnable task, T result);
<!--Runnable 去提交任務-->
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
public interface Executor {
void execute(Runnable command);
}
複製代碼
咱們先從最下面的接口Executor 來看,這個接口就是一個實現,就是執行execute方法,這個接口就是線程執行的入口併發
ExecutorService接口繼承了Executor接口,裏面的的方法比較多,咱們常見的shutdownNow,shutdown 就是在這個接口裏面的,還有就是咱們常見往線程池裏面提交任務的時候submit方法。ExecutorService豐富了對任務執行和管理的功能函數
AbstractExecutorService是一個抽象類,實現了ExecutorService接口,這邊順帶說下,爲何java 源碼裏面存在大量 抽象類實現接口,而後類再繼承抽象類,爲何類不直接實現接口呢?還要套一層呢,以前我也不明白,後來我才清楚,抽象類去實現接口,就是去實現一些公共的接口方法,這樣類再次去實現接口的時候,只要關心我不一樣的實現就行了,由於 咱們知道接口的實現類不止一個,抽象類就是把這些要實現接口的類的公共的實現再次抽取出來,避免了大量的重複實現,尤爲List,Set 接口 你看下 幾乎都有響應的抽象類實現!高併發
<!--ctl 存儲了線程池狀態和線程的數量-->
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;//32-3=29
private static final int CAPACITY = (1 << COUNT_BITS) - 1;//2的29次方-1
// runState is stored in the high-order bits
<!--表示線程池正在運行,能夠接受任務 處理線程池中任務-->
private static final int RUNNING = -1 << COUNT_BITS;
<!--不接受新的任務,可是任然會處理隊列中的任務-->
private static final int SHUTDOWN = 0 << COUNT_BITS;
<!--不接受新的任務,不會處理隊裏中任務,對正在執行的任務進行中斷-->
private static final int STOP = 1 << COUNT_BITS;
<!--任務被中斷,正在處理整理狀態-->
private static final int TIDYING = 2 << COUNT_BITS;
<!--表示終結狀態-->
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
<!--獲取當前線程池的運行狀態-->
private static int runStateOf(int c) { return c & ~CAPACITY; }
<!--獲取當前線程池中工做線程的數量->
private static int workerCountOf(int c) { return c & CAPACITY; }
<!--獲取ctl的值 ->
private static int ctlOf(int rs, int wc) { return rs | wc; }
複製代碼
關於Ctl是怎麼處理線程狀態和線程數的數量的,能夠看下個人另一篇博文:blog.csdn.net/zxlp520/art…oop
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
複製代碼
這個構造函數 是全部構造函數最終調用的方法,那咱們就說下 這些具體的參數源碼分析
爲何要先講worker呢?由於咱們提交的任務Runnabale是以Worker這個對象去包裝後運行的,這個後面我我講addWorker方法的時候在細聊
先看下Worker的代碼:
/** Worker 繼承了AQS 和實現了Runnable接口 */
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
private static final long serialVersionUID = 6138294804551838833L;
/** worker 運行的主體線程 就是在哪一個線程裏面運行任務的 */
final Thread thread;
/** 須要運行的任務 */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);//這邊的this 就是當前的Worker 對象
}
/** 運行 當前的任務 runWorker是ThreadPoolExecutor裏面的方法 */
public void run() {
runWorker(this);
}
// Lock methods
// 0 表示 沒有鎖住狀態
// 1 表示 鎖住狀態
protected boolean isHeldExclusively() {
return getState() != 0;
}
<!--這個方法咱們應該很熟悉 我在將AQS的時候聊過這個方法,這邊作的就是嘗試修改state的狀態,這樣就是表示加鎖的意思,表示這個worker 是鎖住狀態,別的線程不能執行,-->
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {//CAS 去修改State的值,1表示 已經被上鎖
setExclusiveOwnerThread(Thread.currentThread());設置當前鎖的佔用者線程是當前線程
return true;
}
return false;
}
<!--釋放鎖,也就是修改State的值 爲0 unused這個字段命名也挺有意思,意思是說 沒用的意思-->
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);//設置當前鎖的佔用者線程是null
setState(0);
return true;
}
<!--給當前的Worker加鎖,若是獲取不到 就加入等待隊裏中,阻塞當前執行線程-->
public void lock() { acquire(1); }
<!--這邊至關於一個非公平鎖的實現 去嘗試下加鎖-->
public boolean tryLock() { return tryAcquire(1); }
<!--釋放鎖-->
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
<!--嘗試去中斷運行的線程任務,就是咱們調用shutdownNow 的時候-->
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
複製代碼
首先看下 這個Worker的繼承結構,首先是實現了Runnable,又了這樣的關係,Worker就能夠被Thread去執行了,另一個還有一個繼承了一個抽象類AbstractQueuedSynchronizer,簡稱AQS,這個類 哈哈 真的是好久不見了,我以前花了5篇文章解釋了這個AQS,可想而知其重要性,JUC 中不少實現都是 基於這個去作的,仍是不清楚的小夥伴能夠去到個人博客裏面去找下。
這邊又一行代碼 咱們須要留意下,挺有意思的,this.thread = getThreadFactory().newThread(this);這邊 的this 就是咱們構建的Worker,thread 就是用ThreadFactory去建立的一個線程而且執行的任務就是Worker,也就是調用thread.start()就能夠執行Worker了
execute是實現Executor接口的方法,就是執行的任務的入口方法,咱們看下一個任務的提交進來是怎麼作的
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */
int c = ctl.get();//獲取當前的ctl 值
/* * workerCountOf方法我上面也講過,就是獲取當前的工做線程數 * 若是當前的工做線程數小於設置的核心線程數量,就調用addWorker去新增一個工做線程,ture是表示要添加核心工做線程 * addWorker 若是添加成功就直接返回,若是添加失敗就繼續後去下ctl,這邊重寫獲取是爲了 防止在addWorker過程當中 ctl發生了改變 */
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
/* * 走到這步 說明當前的工做線程數大於核心線程數或者是addWorker發生了失敗 * 首先去判斷了下 當前的線程狀態是不是Running 而後把當前任務加入到阻塞隊列workQueue中 * 若是都成功了 那就再次獲取下ctl,由於咱們在offer Runnable的時候可能ctl也會發生變化 *這邊的多重驗證 考慮到高併發的狀況,代碼邏輯很是的嚴謹 * 繼續走下去的邏輯是 再次判斷下線程池狀態 若是是非Running,那就移除當前的任務,最後執行reject方法 根據不一樣的拒絕策略,作不一樣的行爲 * 最後走到 判斷當前線程數量若是是0,仍是回去調用addWorker方法,傳入一個空的Runnalbe,false 是表示建立一個非核心的工做線程 */
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/* * 走到這個判斷 說明當前線程池狀態是非Running或者入隊任務失敗,隊列多是滿了 * 這邊是去建立非核心線程去處理任務,若是建立失敗 就執行拒絕策略 */
else if (!addWorker(command, false))
reject(command);
}
複製代碼
這邊的英文註釋 我沒捨得刪除,讀者能夠去本身翻譯下 描述的可能比我準確,我相信 你們能看的懂,而後再對比下 我下面的中文註釋,我相信能清楚 一個任務新增進來 是怎麼個處理流程!
看完本身再回想下,何時去建立核心線程?何時去建立非核心線程?何時任務會加入的阻塞隊列中?最後執行拒絕策略 有那幾種狀況?知道這些答案 那麼execute方法你應該瞭然於心了!
下面咱們看下一個重點的方法,這個方法 調用的頻次很高,咱們進入去看下
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//這個是一個自旋 套了一個自旋 其目的就是CAS 新增線程池的數量
for (;;) {
int c = ctl.get();//獲取ctl的值
int rs = runStateOf(c);//獲取當前的線程狀態
// 這邊這個條件看上去很繞頭,可是仔細看看就能知道
// 第一個條件rs >= SHUTDOWN 說明線程池狀態不正常
// 後面有一個非的判斷 其實就是括號裏面的條件有一個不成立 整個條件就是false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 下面是獲取線程裏面的工做線程 若是大於最大值或者設置的閾值,就返回直接返回false 方法結束
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//這個的意思是 若是CAS修改workerCount成功 整個最外層的自旋就結束
if (compareAndIncrementWorkerCount(c))
break retry;
// 這邊爲何要用2個自旋 主要是這邊又判斷了下 當前這個自旋CAS修改WorkerCount失敗後,ctl會發生變化
//若是和外層的不相等,就要返回外層的自旋 去重寫作
這邊就是爲何用的是 continue retry
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;//worker是否開始執行了
boolean workerAdded = false;//worker 是否添加成功
Worker w = null;
try {
w = new Worker(firstTask);//將Runnable 傳入到worker的構造函數中,上面也講過,其實就是用firstTask去構造了先的Thread
final Thread t = w.thread;//當前的t就是執行Runnable的線程,在worker中建立
if (t != null) {
final ReentrantLock mainLock = this.mainLock;//重入鎖
mainLock.lock();//保證添加workder時候的線程安全
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);//添加worker到一個工做worker集合中HashSet存儲的
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();//釋放鎖
}
if (workerAdded) {//若是添加成功
t.start();//這個是真正執行Worker的地方 就是這兒
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);//若是最終Worker沒有運行,那就清理掉他 修改對應的WorkerCount
}
return workerStarted;
}
複製代碼
方法最開始的地方 用了2個自旋去解決併發狀況下的CAS修改workerCount失敗的狀況,這邊每一個細節,每種狀況都考慮的很到位,狀態判斷的特別的嚴謹,真正看明白,感受多線程狀況下的編程是多麼的麻煩,辛虧幫咱們作了封裝!
咱們看下 t.Start() 這邊方法,咱們知道t就是Worker裏面建立線程主體,是以本身爲任務傳入到Thread中的,咱們知道start是開始運行線程,最終是會調用到run方法的,那麼就是說會調到Worker裏面的run 方法,咱們在回看下Worker裏面的run方法
public void run() {
runWorker(this);//ThreadPoolExecutor裏面的方法
}
複製代碼
上面我也說了 線程start後會調用run方法,那麼也就是調用 runWorker方法,咱們在看下這個裏面寫的時候什麼
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;//獲取Worker裏面的任務
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//一直while循環
while (task != null || (task = getTask()) != null) {
w.lock();//鎖住Worker
//判斷若是當前的線程池狀態是stop 而且檢測當前線程的中斷狀態若是是false 就幫助當前線程執行中斷調用interrupt()
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);//執行任務的前置Action
Throwable thrown = null;
try {
task.run();//執行最終的Runnable任務
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);///執行任務的後置Action
}
} finally {
task = null;
w.completedTasks++;//Worker完成的任務+1
w.unlock();//釋放鎖
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);//Worker執行結束後退出
}
}
複製代碼
RunWorker方法是整個線程池運行任務的核心方法,線程會使用While循環 不斷的從阻塞隊裏裏面去獲取任務,而後去執行任務,若是阻塞隊列裏面沒有任務,這個時候 getTask() 方法就會阻塞線程,直至新任務的到來,因此咱們在作單元測試的時候,用到線程池,若是你不調用Shutdown 方法 ,你的debug 小紅點就一直在運行,就是這個緣由!
這個方法就是從阻塞隊列中取獲取任務
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//判斷線程池的狀態若是是SHUTDOWN而且隊列爲空 或者直接狀態就是null 就不會從阻塞隊列中 取出任務 直接返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
//timed 就是用來控制 獲取阻塞隊列中的任務 是否有等待時間,咱們設置的keepAliveTime值就會在這邊用到,若是一個工做線程在等待任務超過了設置的值就會退出等待,回收線程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))//工做線程數減1
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();// 獲取任務
if (r != null)
return r;
timedOut = true;//設置等待超時標誌 應該在自旋中,下次判斷會用到此值
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
複製代碼
咱們都知道 當咱們調用shutdown的時候 線程池狀態是ShUTDOWN,調用shutdownnow的時候線程狀態是Stop,那麼這2種狀態是怎麼處理阻塞隊列裏面的任務的呢,看了上文咱們應該能找到答案,當狀態是stop的時候,咱們獲取隊列中的任務是直接返回的null的也就是說隊列中的任務不會在執行了,可是當狀態是shutdown的時候 只有 隊列爲空的時候 纔會返回null,也就是隊列不空 仍是能夠獲取隊列中的任務的,這種問題 在面試題中常常出現,若是要正在知道答案,仍是要經過從源碼中去真正理解,光是被答案我相信你很快仍是會忘記的!
掌握了execute方法 在看submit方法 其實就很簡單了,submit通常是用於添加 帶返回值的任務,咱們看下 代碼
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);//將Runnable 包裝成FutureTask任務 去讓線程執行
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);//將Runnable 包裝成FutureTask任務 去讓線程執行
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
複製代碼
看到這邊的代碼,應該有點兒熟悉的味道,應該上篇文章聊FutureTask的時候 不少已經將過了,包括Runnable和Callable怎麼轉換的,Future是怎麼獲取返回值的? 不清楚的小夥伴 能夠去看下我以前的文章!blog.csdn.net/zxlp520/art…
上面三個構造函數,就是對應着FutureTask的構造函數,說白了就是咱們使用execute的時候都是用FutureTask去傳入的,由於FutureTask也是實現了Runable接口的
最後 用一張流程圖,來描述下一個任務從添加到運行結束,經歷了哪些方法!
ThreadPoolExecutor 雖然裏面執行方法不少,可是你若是掌握了常見的邏輯運算符,AQS,線程,FutureTask 等相關知識的基礎前提下 去看源碼,也不會那麼的累。最後我畫的流程圖,就是一個任務在新增到線程池中執行的整個流程!
最後分享下最近看到的一段話: 什麼是危機?
真正的危機,來源於在正確的時間作不正確的事。沒有在正確的時間,爲下一步作出積累,這纔是危機的根源。
若是你正在這條成長路上的朋友,晚醒不如早醒,這就是我想說的。千萬別等到中年才發現本身沒有創建好本身的護城河,這個時候才知道努力。在本身努力的階段,不只不努力反了選擇了縱容本身,這纔是危機的根源。
但願你們會有所收穫,不負時光,不負卿!