■ 線程池的建立安全
在Java中,您能夠經過調整-Xss參數來調節每一個線程棧的大小(64bit系統默認1024KB),當減少該值時意味着能夠建立更多的線程數,但問題是JVM資源是有限的,線程不能無限建立!併發
從筆者開發經驗來看,線程池應該是併發包中使用頻率和運用場景最多的併發框架,幾乎全部併發/異步執行任務的需求都須要用到線程池,線程複用,之內部線程池的形式對外提供管理任務執行,線程調度,線程池管理等等服務。合理的使用線程池能夠帶來以下三個好處:框架
1.下降資源消耗:經過重用已建立的線程來下降線程建立和銷燬的消耗異步
2.提升響應速度:任務到達時不須要等待線程建立就能夠當即執行優化
3.提升線程的可管理性:線程池能夠統一管理、分配、調優和監控ui
■ ThreadPoolExecutor —— 線程池最核心的類this
- 類定義: 實現了 AbstractExecutorService 類,ExecutorService,Executor 接口spa
public class ThreadPoolExecutor extends AbstractExecutorService implements ExecutorService,Executor {
- 構造器:經過觀察每一個構造器的源碼具體實現,發現前面三個構造器都是調用的第四個構造器進行的初始化工做線程
/** * 線程工廠默認爲DefaultThreadFactory * 飽和策略默認爲AbortPolicy */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } /** * 線程工廠可配置 * 飽和策略默認爲AbortPolicy */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } /** * 線程工廠默認爲DefaultThreadFactory * 飽和策略可配置 */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } /** * 線程工廠可配置 * 飽和策略可配置 */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
- 重要變量設計
//線程池控制器 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //任務隊列 private final BlockingQueue<Runnable> workQueue; //全局鎖 private final ReentrantLock mainLock = new ReentrantLock(); //工做線程集合 private final HashSet<Worker> workers = new HashSet<Worker>(); //終止條件 - 用於等待任務完成後才終止線程池 private final Condition termination = mainLock.newCondition(); //曾建立過的最大線程數 private int largestPoolSize; //線程池已完成總任務數 private long completedTaskCount; //工做線程建立工廠 private volatile ThreadFactory threadFactory; //飽和拒絕策略執行器 private volatile RejectedExecutionHandler handler; //工做線程活動保持時間(超時後會被回收) - 納秒 private volatile long keepAliveTime; /** * 容許核心工做線程響應超時回收 * false:核心工做線程即便空閒超時依舊存活 * true:核心工做線程一旦超過keepAliveTime仍然空閒就被回收 */ private volatile boolean allowCoreThreadTimeOut; //核心工做線程數 private volatile int corePoolSize; //最大工做線程數 private volatile int maximumPoolSize; //默認飽和策略執行器 - AbortPolicy -> 直接拋出異常 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
■ ThreadPoolExecutor 的使用
- 建立線城池實際上就是實例化一個線程池對象,這裏咱們使用最完整的構造器來描述最完整的建立過程:
1. corePoolSize(核心工做線程數):無任務時,線程池容許(維護)的最小空閒線程池數;當一個任務被提交到線程池就新建一個工做線程來執行任務(即便此時有空閒的核心工做線程)直到(實際工做線程數 >= 核心工做線程數)爲止;調用 prestartAllCoreThreads()方法會提早建立並啓動全部核心工做線程
2. maximumPoolSize(最大工做線程數):線程池容許建立的最大工做線程數;當(隊列已滿 && 實際工做線程數 < 最大工做線程數)時,線程池會建立新的工做線程(即便此時仍有空閒的工做線程)執行任務直到最大工做線程數爲止;設置無界隊列時該參數其實無效
3. keepAliveTime(工做線程最大空閒時間):單位納秒,知足超時條件且空閒的工做線程會被回收;超時的非核心工做線程會被回收,核心工做線程不會被回收;當allowCoreThreadTimeOut=true 時,則超時的核心工做線程也會被回收;若該值沒有設置則線程會永遠存活;建議當場景爲任務短而多時,能夠調高時間以提升線程利用率
4. unit(線程活動保持時間單位): 線程活動保持時間單位,可選的包括NANOSECONDS納秒、MICROSECONDS微秒、MILLISECONDS毫秒、SECONDS秒、MINUTES分、HOURS時、DAYS天
5. workQueue(任務隊列): 用來保存等待執行的任務的阻塞隊列;當 (實際工做線程數 >= 核心工做線程數) && (任務數 < 任務隊列長度)時,任務會offer()入隊等待;關於任務隊列詳見下文的任務隊列與排隊策略
6. threadFactory(線程建立工廠): 顧名思義,就是用於建立線程的工廠,容許自定義建立工廠,能夠線程進行初始化配置,好比名字、守護線程、異常處理等等
7. handler(飽和策略執行器): 當線程池和隊列都已滿,此時說明線程已無力再接收更多的任務,即任務數飽和,無法接單了;此時須要使用一種飽和策略處理新提交的任務,默認是Abort(直拋Reject異常),還包括Discard(LIFO規則丟棄)、DiscardOldest(LRU規則丟棄) 以及 CallerRuns(調用者線程執行),容許自定義執行器
從上面給出的 ThreadPoolExecutor 類的代碼能夠知道,ThreadPoolExecutor 繼承了 AbstractExecutorService,咱們來看一下 AbstractExecutorService 的實現:
public abstract class AbstractExecutorService implements ExecutorService { protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { }; protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
public Future<?> submit(Runnable task) {}; public <T> Future<T> submit(Runnable task, T result) { }; public <T> Future<T> submit(Callable<T> task) { };
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { }; public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { }; public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { };
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { }; public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { }; }
AbstractExecutorService 是一個抽象類,它實現了ExecutorService 接口:
public interface ExecutorService extends Executor { void shutdown(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
而ExecutorService 又是繼承了Executor 頂層接口:
public interface Executor { void execute(Runnable command); }
- 提交、執行和關閉任務 (重要方法)
1. execute(): 適用於提交無須返回值的任務
- 該方法是沒法判斷任務是否被線程池執行成功
2. submit(): 適用於提交須要返回值的任務
-能夠經過返回的Future對象得知任務是否已經執行成功
-get() 方法會阻塞當前線程直到任務完成,但要注意防範無限阻塞!!!
-使用 get(long timeout,TimeUnit unit) 方法會阻塞當前線程直到任務完成或超時,不會有無限阻塞的發生但須要注意超時後任務可能還沒完成!!!
3. shutdown() : 有序地關閉線程池,已提交的任務會被執行(包含正在執行和任務隊列中的),但會拒絕新任務
shutdownNow(): 當即(嘗試)中止執行全部任務(包含正在執行和任務隊列中的),並返回待執行任務列表
■ ThreadPoolExecutor 實現原理
- 流程圖
- 線程池的狀態
線程狀態的流轉遵循以下順序,即由小到大順序排列:
RUNNING -> SHUTDOWN -> STOP -> TIDYING -> TERMINATED
* 補充:數值的變遷感受就比如咱們的年齡,越大離上帝就越近 = =
//線程池狀態控制器,用於保證線程池狀態和工做線程數 ps:低29位爲工做線程數量,高3位爲線程池狀態 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //設定偏移量 Integer.SIZE = 32 -> 即COUNT_BITS = 29 private static final int COUNT_BITS = Integer.SIZE - 3; //肯定最大的容量2^29-1 private static final int CAPACITY = (1 << COUNT_BITS) - 1; //獲取線程池狀態,取高3位 private static int runStateOf(int c) { return c & ~CAPACITY; } //獲取工做線程數量,取低29位 private static int workerCountOf(int c) { return c & CAPACITY; } /** * 獲取線程池狀態控制器 * @param rs 表示runState 線程池狀態 * @param wc 表示workerCount 工做線程數量 */ private static int ctlOf(int rs, int wc) { return rs | wc; }
這裏補充一點二進制運算符基礎知識方便忘卻的讀者理解一下:
&:與運算符,同位都爲1才爲1,不然爲0
|:或運算符,同位有一個爲1即爲1,不然爲0
~:非運算符,0和1互換,即如果0變成1,1則變成0
^:異或運算符,同位相同則爲0,不一樣則爲1
- 工人生產(生產者與消費者模式)
以前每一個變量的做用都已經標明出來了,這裏經過實例展現其做用:
/** 假若有一個工廠,工廠裏面有10個工人,每一個工人同時只能作一件任務。 所以只要當10個工人中有工人是空閒的,來了任務就分配給空閒的工人作; 當10個工人都有任務在作時,若是還來了任務,就把任務進行排隊等待; 若是說新任務數目增加的速度遠遠大於工人作任務的速度,那麼此時工廠主管可能會想補救措施,好比從新招4個臨時工人進來; 而後就將任務也分配給這4個臨時工人作; 若是說着14個工人作任務的速度仍是不夠,此時工廠主管可能就要考慮再也不接收新的任務或者拋棄前面的一些任務了。 當這14個工人當中有人空閒時,而新任務增加的速度又比較緩慢,工廠主管可能就考慮辭掉4個臨時工了,只保持原來的10個工人,畢竟請額外的工人是要花錢的。 **/
那麼咱們知道其實線程就至關於工人,因此咱們來看下線程池的內部類 Worker:
/**
Worker類封裝了 ( 鎖 + 線程 + 任務 ) 這三個部分,從而成爲了一個多面手的存在
*/
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ /** 實際上真正的工做線程 - 幕後大佬,但可能因線程工廠建立失敗而爲null */ final Thread thread; /** 待執行任務,可能爲null */ Runnable firstTask; /** 該工做線程已完成的任務數 -- 論KPI的重要性 */ volatile long completedTasks; Worker(Runnable firstTask) { //設置鎖狀態爲-1,目的是爲了阻止在runWorker()以前被中斷 setState(-1); /** * 新任務,任務來源有兩個: * 1.調用addWorker()方法新建線程時傳入的第一個任務 * 2.調用runWorker()方法時內部循環調用getTask() -- 這就是線程複用的具現 */ this.firstTask = firstTask; /** * 建立一個新的線程 -> 這個是真正的工做線程 * 注意Worker自己就是個Runnable對象 * 所以newThread(this)中的this也是個Runnable對象 */ this.thread = getThreadFactory().newThread(this); } }
- 執行任務
/** * 工做線程運行 * runWorker方法內部會經過輪詢的方式 * 不停地獲取任務和執行任務直到線程被回收 */ public void run() { runWorker(this); }
(重點) 這裏簡單介紹一下線程在線程池執行任務的工做流程:
1.工做線程開始執行前,需先對worker加鎖,任務完成解鎖
2.任務執行先後分別執行beforeExecute()和afterExecute()方法
3.執行中遇到異常會向外拋出,線程是否死亡取決於您對於異常的處理
4.每一個任務執行完後,當前工做線程任務完成數自增,同時會循環調用getTask()從任務隊列中反覆獲取任務並執行,無任務可執行時線程會阻塞在該方法上
5.當工做線程因各類理由退出時,會執行processWorkerExit()回收線程(核心是將該worker從workers集合中移除,注意以前worker已經退出任務循環,所以已經再也不作工了,從集合移除後就方便gc了)
- 鎖方法
// Lock methods // The value 0 represents the unlocked state. 0表示未鎖定 // The value 1 represents the locked state. 1表示已鎖定 protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { //鎖狀態非0即1,即不可重入 //特殊狀況:只有初始化時才爲-1,目的是防止線程初始化階段被中斷 if (compareAndSetState(0, 1)) { //當前線程佔有鎖 setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { //釋放鎖 setExclusiveOwnerThread(null); //狀態恢復成未鎖定狀態 setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()){ try { t.interrupt(); } catch (SecurityException ignore) { } } }
- 動態控制
/** * 設置核心工做線程數 * 1.若新值<當前值時,將調用interruptIdleWorkers()處理超出部分線程 * 2.若新值>當前值時,新建立的線程(如有必要)直接會處理隊列中的任務 */ public void setCorePoolSize(int corePoolSize) /** * 設置是否響應核心工做線程超時處理 * 1.設置false時,核心工做線程不會由於任務數不足(空閒)而被終止 * 2.設置true時,核心工做線程和非核心工做線程待遇同樣,會由於超時而終止 * 注意:爲了禁止出現持續性的線程替換,當設置true時,超時時間必須>0 * 注意:該方法一般應在線程池被使用以前調用 */ public void allowCoreThreadTimeOut(boolean value) /** * 設置最大工做線程數 * 1.若新值<當前值時,將調用interruptIdleWorkers()處理超出部分線程 * 注意:當新值>當前值時是無需作任何處理的,跟設置核心工做線程數不同 */ public void setMaximumPoolSize(int maximumPoolSize) /** * 設置超時時間,超時後工做線程將被終止 * 注意:若實際工做線程數只剩一個,除非線程池被終止,不然無須響應超時 */ public void setKeepAliveTime(long time, TimeUnit unit)
■ 任務提交與執行
- execute() - 提交任務
/** * 在將來的某個時刻執行給定的任務 * 這個任務由一個新線程執行,或者用一個線程池中已經存在的線程執行 * 若是任務沒法被提交執行,要麼是由於這個Executor已經被shutdown關閉 * 要麼是已經達到其容量上限,任務會被當前的RejectedExecutionHandler處理 */ public void execute(Runnable command) { //新任務不容許爲空,空則拋出NPE if (command == null) throw new NullPointerException(); /** * 1.若實際工做線程數 < 核心工做線程數,會嘗試建立一個工做線程去執行該 * 任務,即該command會做爲該線程的第一個任務,即第一個firstTask * * 2.若任務入隊成功,仍須要執行雙重校驗,緣由有兩點: * - 第一個是去確認是否須要新建一個工做線程,由於可能存在 * 在上次檢查後已經死亡died的工做線程 * - 第二個是可能在進入該方法後線程池被關閉了, * 好比執行shutdown() * 所以須要再次檢查state狀態,並分別處理以上兩種狀況: * - 若線程池中已無可用工做線程了,則須要新建一個工做線程 * - 若線程池已被關閉,則須要回滾入隊列(如有必要) * * 3.若任務入隊失敗(好比隊列已滿),則須要新建一個工做線程; * 若新建線程失敗,說明線程池已中止或者已飽和,必須執行拒絕策略 */ int c = ctl.get(); /** * 狀況一:當實際工做線程數 < 核心工做線程數時 * 執行方案:會建立一個新的工做線程去執行該任務 * 注意:此時即便有其餘空閒的工做線程也仍是會新增工做線程, * 直到達到核心工做線程數爲止 */ if (workerCountOf(c) < corePoolSize) { /** * 新增工做線程,true表示要對比的是核心工做線程數 * 一旦新增成功就開始執行當前任務 * 期間也會經過自旋獲取隊列任務進行執行 */ if (addWorker(command, true)) return; /** * 須要從新獲取控制器狀態,說明新增線程失敗 * 線程失敗的緣由可能有兩種: * - 1.線程池已被關閉,非RUNNING狀態的線程池是不容許接收新任務的 * - 2.併發時,假如都經過了workerCountOf(c) < corePoolSize校驗,但其餘線程 * 可能會在addWorker先建立出線程,致使workerCountOf(c) >= corePoolSize, * 即實際工做線程數 >= 核心工做線程數,此時須要進入狀況二 */ c = ctl.get(); } /** * 狀況二:當實際工做線程數>=核心線程數時,新提交任務須要入隊 * 執行方案:一旦入隊成功,仍須要處理線程池狀態突變和工做線程死亡的狀況 */ if (isRunning(c) && workQueue.offer(command)) { //雙重校驗 int recheck = ctl.get(); /** * recheck的目的是爲了防止線程池狀態的突變 - 即被關閉 * 一旦線程池非RUNNING狀態時,除了從隊列中移除該任務(回滾)外 * 還須要執行任務拒絕策略處理新提交的任務 */ if (!isRunning(recheck) && remove(command)) //執行任務拒絕策略 reject(command); /** * 若線程池仍是RUNNING狀態 或 隊列移除失敗(可能正好被一個工做線程拿處處理了) * 此時須要確保至少有一個工做線程還能夠幹活 * 補充一句:之全部無須與核心工做線程數或最大線程數相比,而只是比較0的緣由是 * 只要保證有一個工做線程能夠幹活就行,它會自動去獲取任務 */ else if (workerCountOf(recheck) == 0) /** * 若工做線程都已死亡,須要新增一個工做線程去幹活 * 死亡緣由多是線程超時或者異常等等複雜狀況 * * 第一個參數爲null指的是傳入一個空任務, * 目的是建立一個新工做線程去處理隊列中的剩餘任務 * 第二個參數爲false目的是提示能夠擴容到最大工做線程數 */ addWorker(null, false); } /** * 狀況三:一旦線程池被關閉 或者 新任務入隊失敗(隊列已滿) * 執行方案:會嘗試建立一個新的工做線程,並容許擴容到最大工做線程數 * 注意:一旦建立失敗,好比超過最大工做線程數,須要執行任務拒絕策略 */ else if (!addWorker(command, false)) //執行任務拒絕策略 reject(command); }
- addWorker() - 新增工做線程
/** * 新增工做線程須要遵照線程池控制狀態規定和邊界限制 * * @param core core爲true時容許擴容到核心工做線程數,不然爲最大工做線程數 * @return 新增成功返回true,失敗返回false */ private boolean addWorker(Runnable firstTask, boolean core) { //重試標籤 retry: /*** * 外部自旋 -> 目的是確認是否可以新增工做線程 * 容許新增線程的條件有兩個: * 1.知足線程池狀態條件 -> 條件一 * 2.實際工做線程知足數量邊界條件 -> 條件二 * 不知足條件時會直接返回false,表示新增工做線程失敗 */ for (;;) { //讀取原子控制量 - 包含workerCount(實際工做線程數)和runState(線程池狀態) int c = ctl.get(); //讀取線程池狀態 int rs = runStateOf(c); /** * 條件一.判斷是否知足線程池狀態條件 * 1.只有兩種狀況容許新增線程: * 1.1 線程池狀態==RUNNING * 1.2 線程池狀態==SHUTDOWN且firstTask爲null同時隊列非空 * * 2.線程池狀態>=SHUTDOWN時不容許接收新任務,具體以下: * 2.1 線程池狀態>SHUTDOWN,即爲STOP、TIDYING、TERMINATED * 2.2 線程池狀態==SHUTDOWN,但firstTask非空 * 2.3 線程池狀態==SHUTDOWN且firstTask爲空,但隊列爲空 * 補充:針對1.二、2.二、2.3的狀況具體請參加後面的"小問答"環節 */ if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; /*** * 內部自旋 -> 條件二.判斷實際工做線程數是否知足數量邊界條件 * -數量邊界條件知足會對嘗試workerCount實現CAS自增,不然新增失敗 * -當CAS失敗時會再次從新判斷是否知足新增條件: * 1.若此期間線程池狀態突變(被關閉),從新判斷線程池狀態條件和數量邊界條件 * 2.若此期間線程池狀態一致,則只需從新判斷數量邊界條件 */ for (;;) { //讀取實際工做線程數 int wc = workerCountOf(c); /** * 新增工做線程會因兩種實際工做線程數超標狀況而失敗: * 1.實際工做線程數 >= 最大容量 * 2.實際工做線程數 > 工做線程比較邊界數(當前最大擴容數) * -若core = true,比較邊界數 = 核心工做線程數 * -若core = false,比較邊界數 = 最大工做線程數 */ if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; /** * 實際工做線程計數CAS自增: * 1.一旦成功直接退出整個retry循環,代表新增條件都知足 * 2.因併發競爭致使CAS更新失敗的緣由有三種: * 2.1 線程池恰好已新增一個工做線程 * -> 計數增長,只需從新判斷數量邊界條件 * 2.2 恰好其餘工做線程運行期發生錯誤或因超時被回收 * -> 計數減小,只需從新判斷數量邊界條件 * 2.3 恰好線程池被關閉 * -> 計數減小,工做線程被回收, * 需從新判斷線程池狀態條件和數量邊界條件 */ if (compareAndIncrementWorkerCount(c)) break retry; //從新讀取原子控制量 -> 緣由是在此期間可能線程池被關閉了 c = ctl.get(); /** * 快速檢測是否發生線程池狀態突變 * 1.若狀態突變,從新判斷線程池狀態條件和數量邊界條件 * 2.若狀態一致,則只需從新判斷數量邊界條件 */ if (runStateOf(c) != rs) continue retry; } } /** * 這裏是addWorker方法的一個分割線 * 前面的代碼的做用是決定了線程池接受仍是拒絕新增工做線程 * 後面的代碼的做用是真正開始新增工做線程並封裝成Worker接着執行後續操做 * PS:雖然筆者以爲這個方法其實能夠拆分紅兩個方法的(在break retry的位置) */ //記錄新增的工做線程是否開始工做 boolean workerStarted = false; //記錄新增的worker是否成功添加到workers集合中 boolean workerAdded = false; Worker w = null; try { //將新提交的任務和當前線程封裝成一個Worker w = new Worker(firstTask); //獲取新建立的實際工做線程 final Thread t = w.thread; /** * 檢測是否有可執行任務的線程,便是否成功建立了新的工做線程 * 1.若存在,則選擇執行任務 * 2.若不存在,則須要執行addWorkerFailed()方法 */ if (t != null) { /** * 新增工做線程須要加全局鎖 * 目的是爲了確保安全更新workers集合和largestPoolSize */ final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { /** * 得到全局鎖後,需再次檢測當前線程池狀態 * 緣由在於預防兩種非法狀況: * 1.線程工廠建立線程失敗 * 2.在鎖被獲取以前,線程池就被關閉了 */ int rs = runStateOf(ctl.get()); /** * 只有兩種狀況是容許添加work進入works集合的 * 也只有進入workers集合後纔是真正的工做線程,並開始執行任務 * 1.線程池狀態爲RUNNING(即rs<SHUTDOWN) * 2.線程池狀態爲SHUTDOWN且傳入一個空任務 * (理由參見:小問答之快速檢測線程池狀態?) */ if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { /** * 若線程處於活動狀態時,說明線程已啓動,須要當即拋出"線程狀態非法異常" * 緣由是線程是在後面才被start的,已被start的不容許再被添加到workers集合中 * 換句話說該方法新增線程時,而線程是新的,自己應該是初始狀態(new) * 可能出現的場景:自定義線程工廠newThread有可能會提早啓動線程 */ if (t.isAlive()) throw new IllegalThreadStateException(); //因爲加鎖,因此能夠放心的加入集合 workers.add(w); int s = workers.size(); //更新最大工做線程數,因爲持有鎖,因此無需CAS if (s > largestPoolSize) largestPoolSize = s; //確認新建的worker已被添加到workers集合中 workerAdded = true; } } finally { //千萬不要忘記主動解鎖 mainLock.unlock(); } /** * 一旦新建工做線程被加入工做線程集合中,就意味着其能夠開始幹活了 * 有心的您確定發如今線程start以前已經釋放鎖了 * 緣由在於一旦workerAdded爲true時,說明鎖的目的已經達到 * 根據最小化鎖做用域的原則,線程執行任務無須加鎖,這是種優化 * 也但願您在使用鎖時儘可能保證鎖的做用域最小化 */ if (workerAdded) { /** * 啓動線程,開始幹活啦 * 若您看過筆者的"併發番@Thread一文通"確定知道start()後, * 一旦線程初始化完成便會當即調用run()方法 */ t.start(); //確認該工做線程開始幹活了 workerStarted = true; } } } finally { //若新建工做線程失敗或新建工做線程後沒有成功執行,須要作新增失敗處理 if (!workerStarted) addWorkerFailed(w); } //返回結果代表新建的工做線程是否已啓動執行 return workerStarted; }
結論之啓動調用會經歷一下過程:
(1) worker = new Worker(Runnable) --> (2) thread = newThread(worker) --> (3) thread.start() --> (4) thread.run()[JVM自動調用] --> (5) worker.run() --> (6) threadPoolExecuter.runWorker(worker)
- runWorker() - 執行任務
final void runWorker(Worker w) { //讀取當前線程 -即調用execute()方法的線程(通常是主線程) Thread wt = Thread.currentThread(); //讀取待執行任務 Runnable task = w.firstTask; //清空任務 -> 目的是用來接收下一個任務 w.firstTask = null; /** * 注意Worker自己也是一把不可重入的互斥鎖! * 因爲Worker初始化時state=-1,所以此處的解鎖的目的是: * 將state-1變成0,由於只有state>=0時才容許中斷; * 同時也側面說明在worker調用runWorker()以前是不容許被中斷的, * 即運行前不容許被中斷 */ w.unlock(); //記錄是否因異常/錯誤忽然完成,默認有異常/錯誤發生 boolean completedAbruptly = true; try { /** * 獲取任務並執行任務,取任務分兩種狀況: * 1.初始任務:Worker被初始化時賦予的第一個任務(firstTask) * 2.隊列任務:當firstTask任務執行好後,線程不會被回收,而是以後自動自旋從任務隊列中取任務(getTask) * 此時即體現了線程的複用 */ while (task != null || (task = getTask()) != null) { /** * Worker加鎖的目的是爲了在shutdown()時不要當即終止正在運行的worker, * 由於須要先持有鎖才能終止,而不是爲了處理併發狀況(注意不是全局鎖) * 在shutdownNow()時會當即終止worker,由於其無須持有鎖就能終止 * 關於關閉線程池下文會再具體詳述 */ w.lock(); /** * 當線程池被關閉且主線程非中斷狀態時,須要從新中斷它 * 因爲調用線程通常是主線程,所以這裏是主線程代指調用線程 */ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { /** * 每一個任務執行前都會調用"前置方法", * 在"前置方法"可能會拋出異常, * 結果是退出循環且completedAbruptly=true, * 從而線程死亡,任務未執行(並被丟棄) */ beforeExecute(wt, task); Throwable thrown = null; try { //執行任務 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { /** * 任務執行結束後,會調用"後置方法" * 該方法也可能拋異常從而致使線程死亡 * 但值得注意的是任務已經執行完畢 */ afterExecute(task, thrown); } } finally { //清空任務 help gc task = null; //不管成功失敗任務數都要+1,因爲持有鎖因此無須CAS w.completedTasks++; //必需要主動釋放鎖 w.unlock(); } } //無異常時須要清除異常狀態 completedAbruptly = false; } finally { /** * 工做線程退出循環的緣由有兩個: * 1.因意外的錯誤/異常退出 * 2.getTask()返回空 -> 緣由有四種,下文會詳述 * 工做線程退出循環後,須要執行相對應的回收處理 */ processWorkerExit(w, completedAbruptly); } }
- getTask() - 獲取任務
形成getTask()方法返回null的緣由有5種:
1.線程池被關閉,狀態爲(STOP || TIDYING || TERMINATED)
2.線程池被關閉,狀態爲SHUTDOWN且任務隊列爲空
3.實際工做線程數超過最大工做線程數
4.工做線程知足超時條件後,同時符合下述的任意一種狀況:
4.1 線程池中還存在至少一個其餘可用的工做線程
4.2 線程池中已沒有其餘可用的工做線程但任務隊列爲空
private Runnable getTask() { // 記錄任務隊列的poll()是否超時,默認未超時 boolean timedOut = false; //自旋獲取任務 for (;;) { /** * 線程池會依次判斷五種狀況,知足任意一種就返回null: * 1.線程池被關閉,狀態爲(STOP || TIDYING || TERMINATED) * 2.線程池被關閉,狀態爲SHUTDOWN且任務隊列爲空 * 3.實際工做線程數超過最大工做線程數 * 4.工做線程知足超時條件後,同時符合下述的任意一種狀況: * 4.1 線程池中還存在至少一個其餘可用的工做線程 * 4.2 線程池中已沒有其餘可用的工做線程但任務隊列爲空 */ int c = ctl.get(); int rs = runStateOf(c); /** * 判斷線程池狀態條件,有兩種狀況直接返回null * 1.線程池狀態大於SHUTDOWN(STOP||TIDYING||TERMINATED),說明不容許再執行任務 * - 由於>=STOP以上狀態時不容許接收新任務同時會中斷正在執行中的任務,任務隊列的任務也不執行了 * * 2.線程池狀態爲SHUTDOWN且任務隊列爲空,說明已經無任務可執行 * - 由於SHUTDOWN時還須要執行任務隊列的剩餘任務,只有當無任務纔可退出 */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { /** * 減小一個工做線程數 * 值得注意的是工做線程的回收是放在processWorkerExit()中進行的 * decrementWorkerCount()方法是內部不斷循環執行CAS的,保證最終必定會成功 * 補充:因線程池被關閉而計數減小可能與addWorker()的 * 計數CAS自增發生併發競爭 */ decrementWorkerCount(); return null; } //讀取實際工做線程數 int wc = workerCountOf(c); /** * 判斷是否須要處理超時: * 1.allowCoreThreadTimeOut = true 表示須要回收空閒超時的核心工做線程 * 2.wc > corePoolSize 表示存在空閒超時的非核心工做線程須要回收 */ boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; /** * 有三種狀況會實際工做線程計數-1且直接返回null * * 1.實際工做線程數超過最大線程數 * 2.該工做線程知足空閒超時條件須要被回收: * 2.1 當線程池中還存在至少一個其餘可用的工做線程 * 2.2 線程池中已沒有其餘可用的工做線程但任務隊列爲空 * * 結合2.1和2.2咱們能夠推導出: * * 1.當任務隊列非空時,線程池至少須要維護一個可用的工做線程, * 所以此時即便該工做線程超時也不會被回收掉而是繼續獲取任務 * * 2.當實際工做線程數超標或獲取任務超時時,線程池會由於 * 一直沒有新任務可執行,而逐漸減小線程直到核心線程數爲止; * 若設置allowCoreThreadTimeOut爲true,則減小到1爲止; * * 提示:因爲wc > maximumPoolSize時一定wc > 1,所以無須比較 * (wc > maximumPoolSize && workQueue.isEmpty()) 這種狀況 */ if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { /** * CAS失敗的緣由仍是出現併發競爭,具體參考上文 * 當CAS失敗後,說明實際工做線程數已經發生變化, * 必須從新判斷實際工做線程數和超時狀況 * 所以須要countinue */ if (compareAndDecrementWorkerCount(c)) return null; /** */ continue; } //若知足獲取任務條件,根據是否須要超時獲取會調用不一樣方法 try { /** * 從任務隊列中取任務分兩種: * 1.timed=true 代表須要處理超時狀況 * -> 調用poll(),超過keepAliveTime返回null * 2.timed=fasle 代表無須處理超時狀況 * -> 調用take(),無任務則掛起等待 */ Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); //一旦獲取到任務就返回該任務並退出循環 if (r != null) return r; //當任務爲空時說明poll超時 timedOut = true; /** * 關於中斷異常獲取簡單講一些超出本章範疇的內容 * take()和poll(long timeout, TimeUnit unit)都會throws InterruptedException * 緣由在LockSupport.park(this)不會拋出異常但會響應中斷; * 但ConditionObject的await()會經過reportInterruptAfterWait()響應中斷 * 具體內容筆者會在阻塞隊列相關番中進一步介紹 */ } catch (InterruptedException retry) { /** * 一旦該工做線程被中斷,須要清除超時標記 * 這代表當工做線程在獲取隊列任務時被中斷, * 若您不對中斷異常作任務處理,線程池就默認 * 您但願線程繼續執行,這樣就會重置以前的超時標記 */ timedOut = false; } } }
■ 關閉線程池
- 使用shutdown()關閉線程池最主要執行5個操做:
1.獲取全局鎖
2.CAS自旋變動線程池狀態爲SHUTDOWN
3.中斷全部空閒工做線程(設置中斷標記) -> 注意是空閒
4.釋放全局鎖
5.嘗試終止線程池
/** * 有序關閉線程池 * 在關閉過程當中,以前已提交的任務將被執行(包括正在和隊列中的), * 但新提交的任務會被拒絕 * 若是線程池已經被關閉,調用該方法不會有任何附加效果 */ public void shutdown() { //1.獲取全局鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); //2.CAS自旋變動線程池狀態爲SHUTDOWN advanceRunState(SHUTDOWN); //3.中斷全部空閒工做線程 interruptIdleWorkers(); //專門提供給ScheduledThreadPoolExecutor的鉤子方法 onShutdown(); } finally { //4.釋放全局鎖 mainLock.unlock(); } /** * 5.嘗試終止線程池,此時線程池知足兩個條件: * 1.線程池狀態爲SHUTDOWN * 2.全部空閒工做線程已被中斷 */ tryTerminate(); }
- 使用shutdownNow()關閉線程池最主要執行六個操做:
1.獲取全局鎖
2.CAS自旋變動線程池狀態爲SHUTDOWN
3.中斷全部工做線程(設置中斷標記)
4.將剩餘任務從新放入一個list中並清空任務隊列
5.釋放全局鎖
6.嘗試終止線程池
/** * 嘗試中斷全部工做線程,並返回待處理任務列表集合(從任務隊列中移除) * * 1.若想等待執行中的線程完成任務,可以使用awaitTermination() * 2.因爲取消任務操做是經過Thread#interrupt實現,所以 * 響應中斷失敗的任務可能永遠都不會被終止(謹慎使用!!!) * 響應中斷失敗指的是您選擇捕獲但不處理該中斷異常 */ public List<Runnable> shutdownNow() { List<Runnable> tasks; //1.獲取全局鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); //2.CAS自旋更新線程池狀態爲STOP advanceRunState(STOP); //3.中斷全部工做線程 interruptWorkers(); //4.將剩餘任務從新放入一個list中並清空任務隊列 tasks = drainQueue(); } finally { //5.釋放全局鎖 mainLock.unlock(); } /** * 6.嘗試終止線程池,此時線程池知足兩個條件: * 1.線程池狀態爲STOP * 2.任務隊列爲空 * 注意:此時不必定全部工做線程都被中斷回收,詳述見 * 7.3 tryTerminate */ tryTerminate(); //5.返回待處理任務列表集合 return tasks; }
■ 飽和拒絕策略
線程池的飽和拒絕策略主要用於拒絕任務(但這並不意味着該任務不會被執行),線程池原生提供了四種飽和拒絕策略,基本涵蓋常見的飽和處理場景:
AbortPolicy:默認策略,直接拋出異常
CallerRunsPolicy:只用調用線程執行該任務
DiscardPolicy:直接丟棄任務
DiscardOldestPolicy:丟棄隊尾任務並用線程池從新嘗試執行該任務
全部的拒絕策略都須要實現該拒絕處理器接口,以統一口徑:
/** * 用於拒絕線程池任務的處理器 */ public interface RejectedExecutionHandler { /** * 該方法用於拒絕接受線程池任務 * * 有三種狀況可能調用該方法: * 1.沒有更多的工做線程可用 * 2.任務隊列已滿 * 3.關閉線程池 * * 當沒有其餘處理選擇時,該方法會選擇拋出RejectedExecutionException異常 * 該異常會向上拋出直到execute()的調用者 */ void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
- CallerRunsPolicy
處理規則:新提交任務由調用者線程直接執行
推薦:拒絕策略推薦使用CallerRunsPolicy,理由是該策略不會拋棄任務,也不會拋出異常,而是將任務回退到調用者線程中執行
/** * 不會直接丟棄,而是直接用調用execute()方法的線程執行該方法 * 固然一旦線程池已經被關閉,仍是要丟棄的 * * 補充:值得注意的是全部策略類都是public的靜態內部類, * 其目的應該是告知使用者 -> 該類與線程池相關但無需線程池實例即可直接使用 */ public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } /** * 直接使用調用該方法的線程執行任務 * 除非線程池被關閉時纔會丟棄該任務 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { //一旦線程池被關閉,丟棄該任務 if (!e.isShutdown()) { //注意此時不是線程池執行該任務 r.run(); } } }
- AbortPolicy
處理規則:直接拋出RejectedExecutionException異常
/** * 簡單、粗暴的直接拋出RejectedExecutionException異常 */ public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } /** * 直接拋出異常,但r.toString()方法會告訴你哪一個任務失敗了 * 更人性化的一點是 e.toString()方法還會告訴你: * 線程池的狀態、工做線程數、隊列長度、已完成任務數 * 建議如果不處理異常起碼也要在日誌裏面打印一下,留個案底 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException( "Task " + r.toString() + " rejected from " + e.toString()); } }
- DiscardPolicy
處理規則:根據LIFO(後進先出)規則直接丟棄最新提交的任務
/** * 直接丟棄任務 * 這個太狠了,連個案底都沒有,慎用啊 */ public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } /** * 無做爲即爲丟棄 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
- DiscardOldestPolicy
處理規則:根據LRU(最近最少使用)規則丟棄最後一個任務,而後嘗試執行新提交的任務
/** * 比起直接丟棄,該類會丟棄隊列裏最後一個但仍未被處理的任務, * 而後會從新調用execute()方法處理當前任務 * 除非線程池被關閉時纔會丟棄該任務 * 此類充分證實了"來得早不如來的巧" */ public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } /** * 丟棄隊列裏最近的一個任務,並執行當前任務 * 除非線程池被關閉時纔會丟棄該任務 * 緣由是隊列是遵循先進先出FIFO原則,poll()會彈出隊尾元素 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { //一旦線程池被關閉,直接丟棄 if (!e.isShutdown()) { //彈出隊尾元素 e.getQueue().poll(); //直接用線程池執行當前任務 e.execute(r); } } }