咱們知道,線程池幫咱們重複管理線程,避免建立大量的線程增長開銷。
合理的使用線程池可以帶來3個很明顯的好處:
1.下降資源消耗:經過重用已經建立的線程來下降線程建立和銷燬的消耗
2.提升響應速度:任務到達時不須要等待線程建立就能夠當即執行。
3.提升線程的可管理性:線程池能夠統一管理、分配、調優和監控。
java源生的線程池,實現於ThreadPoolExecutor類,這也是咱們今天討論的重點java
Jdk使用ThreadPoolExecutor類來建立線程池,咱們來看看它的構造方法。安全
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
TimeUnit unit, //存活時間的單位,有以下幾種選擇併發
TimeUnit.DAYS; //天 TimeUnit.HOURS; //小時 TimeUnit.MINUTES; //分鐘 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //納秒
BlockingQueue<Runnable> workQueue, //保存待執行任務的隊列,常見的也有以下幾種:less
ArrayBlockingQueue; LinkedBlockingQueue; SynchronousQueue; PriorityBlockingQueue
ThreadPoolExecutor中有一個控制狀態的屬性叫ctl,它是一個AtomicInteger類型的變量,它一個int值能夠儲存兩個概念的信息:ide
runState:代表當前線程池的狀態,經過workerCountOf方法得到,最後存放在ctl的高3bit中,他們是整個線程池的運行生命週期,有以下取值,分別的含義是:函數
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
COUNT_BITS=32(integer的size)-3=29,因而五種狀態左移29位分別是:oop
而ThreadPoolExecutor是經過runStateOf和workerCountOf得到者兩個概念的值的。ui
runStateOf和workerCountOf方法是如何剝離出ctl變量的兩個有效值呢?這其中咱們能夠看到CAPACITY是實現一個字段存兩個值的最重要的字段。this
CAPACITY=(1 << COUNT_BITS) – 1 轉成二進制爲:000 11111111111111111111111111111,他是線程池理論上能夠容許的最大的線程數。
因此很明顯,它的重點在於,其高3bit爲0,低29bit爲1;
這樣,workderCountOf方法中,CAPACITY和ctl進行&運算時,它能得到高3位都是0,低29位和ctl低29位相同的值,這個值就是workerCount;
同理,runStateOf方法,CAPACITY的取反和ctl進行&操做,得到高3位和ctl高三位相等,低29位都爲0的值,這個值就是runState;atom
/** * The queue used for holding tasks and handing off to worker * threads. We do not require that workQueue.poll() returning * null necessarily means that workQueue.isEmpty(), so rely * solely on isEmpty to see if the queue is empty (which we must * do for example when deciding whether to transition from * SHUTDOWN to TIDYING). This accommodates special-purpose * queues such as DelayQueues for which poll() is allowed to * return null even if it may later return non-null when delays * expire. */ private final BlockingQueue<Runnable> workQueue;
一個BlockingQueue<Runnable>隊列,自己的結構能夠保證訪問的線程安全(這裏不展開了)。這是一個排隊等待隊列。當咱們線程池裏線程達到corePoolSize的時候,一些須要等待執行的線程就放在這個隊列裏等待。
/** * Set containing all worker threads in pool. Accessed only when * holding mainLock. */ private final HashSet<Worker> workers = new HashSet<Worker>();
一個HashSet<Worker>的集合。線程池裏全部能夠當即執行的線程都放在這個集合裏。這也是咱們直觀理解的線程的池子。
private final ReentrantLock mainLock = new ReentrantLock();
mainLock是線程池的主鎖,是可重入鎖,當要操做workers set這個保持線程的HashSet時,須要先獲取mainLock,還有當要處理largestPoolSize、completedTaskCount這類統計數據時須要先獲取mainLock
private int largestPoolSize; //用來記錄線程池中曾經出現過的最大線程數 private long completedTaskCount; //用來記錄已經執行完畢的任務個數 private volatile boolean allowCoreThreadTimeOut; //是否容許爲核心線程設置存活時間
Worker類是線程池中具化一個線程的對象,是線程池的核心,咱們來看看源碼:
/** * Class Worker mainly maintains interrupt control state for * threads running tasks, along with other minor bookkeeping. * This class opportunistically extends AbstractQueuedSynchronizer * to simplify acquiring and releasing a lock surrounding each * task execution. This protects against interrupts that are * intended to wake up a worker thread waiting for a task from * instead interrupting a task being run. We implement a simple * non-reentrant mutual exclusion lock rather than use * ReentrantLock because we do not want worker tasks to be able to * reacquire the lock when they invoke pool control methods like * setCorePoolSize. Additionally, to suppress interrupts until * the thread actually starts running tasks, we initialize lock * state to a negative value, and clear it upon start (in * runWorker). */ private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { //設置AQS的同步狀態private volatile int state,是一個計數器,大於0表明鎖已經被獲取 // 在調用runWorker()前,禁止interrupt中斷,在interruptIfStarted()方法中會判斷 getState()>=0 setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this);//根據當前worker建立一個線程對象 //當前worker自己就是一個runnable任務,也就是不會用參數的firstTask建立線程,而是調用當前worker.run()時調用firstTask.run() //後面在addworker中,咱們會啓動worker對象中組合的Thread,而咱們的執行邏輯runWorker方法是在worker的run方法中被調用。 //爲何執行thread的run方法會調用worker的run方法呢,緣由就是在這裏進行了注入,將worker自己this注入到了thread中 } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); }//runWorker()是ThreadPoolExecutor的方法 // Lock methods // // The value 0 represents the unlocked state. 0表明「沒被鎖定」狀態 // The value 1 represents the locked state. 1表明「鎖定」狀態 protected boolean isHeldExclusively() { return getState() != 0; } /** * 嘗試獲取鎖 * 重寫AQS的tryAcquire(),AQS原本就是讓子類來實現的 */ protected boolean tryAcquire(int unused) { //嘗試一次將state從0設置爲1,即「鎖定」狀態,但因爲每次都是state 0->1,而不是+1,那麼說明不可重入 //且state==-1時也不會獲取到鎖 if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } /** * 嘗試釋放鎖 * 不是state-1,而是置爲0 */ protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } /** * 中斷(若是運行) * shutdownNow時會循環對worker線程執行 * 且不須要獲取worker鎖,即便在worker運行時也能夠中斷 */ void interruptIfStarted() { Thread t; //若是state>=0、t!=null、且t沒有被中斷 //new Worker()時state==-1,說明不能中斷 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
咱們看worker類時,會發現最重要的幾個部分在於它裏面定義了一個Thread thread和Runnable firstTask。看到這裏,咱們可能會比較奇怪,咱們只是要一個能夠執行的線程,這裏放一個Thread和一個Runnable的變量作什麼呢?
其實之因此Worker本身實現Runnable,並建立Thread,在firstTask外包一層,是由於要經過Worker負責控制中斷,而firstTask這個工做任務只是負責執行業務,worker的run方法調用了runWorker方法,在這裏面,worker裏的firstTask的run方法被執行。稍後咱們會聚焦這個執行任務的runWorker方法。
好了,基本上咱們將線程池的幾個主角,ctl,workQueue,workers,Worker簡單介紹了一遍,如今,咱們來看看線程池是怎麼玩的。
這是線程池實現類外露供給外部實現提交線程任務command的核心方法,對於無需瞭解線程池內部的使用者來講,這個方法就是把某個任務交給線程池,正常狀況下,這個任務會在將來某個時刻被執行,實現和註釋以下:
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * 在將來的某個時刻執行給定的任務。這個任務用一個新線程執行,或者用一個線程池中已經存在的線程執行 * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * 若是任務沒法被提交執行,要麼是由於這個Executor已經被shutdown關閉,要麼是已經達到其容量上限,任務會被當前的RejectedExecutionHandler處理 * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * 若是運行的線程少於corePoolSize,嘗試開啓一個新線程去運行command,command做爲這個線程的第一個任務 * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * 若是任務成功放入隊列,咱們仍須要一個雙重校驗去確認是否應該新建一個線程(由於可能存在有些線程在咱們上次檢查後死了) * 或者 從咱們進入這個方法後,pool被關閉了 * 因此咱們須要再次檢查state,若是線程池中止了須要回滾入隊列,若是池中沒有線程了,新開啓 一個線程 * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. * 若是沒法將任務入隊列(可能隊列滿了),須要新開區一個線程(本身:往maxPoolSize發展) * 若是失敗了,說明線程池shutdown 或者 飽和了,因此咱們拒絕任務 */ int c = ctl.get(); // 一、若是當前線程數少於corePoolSize(多是因爲addWorker()操做已經包含對線程池狀態的判斷,如此處沒加,而入workQueue前加了) if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; /** * 沒有成功addWorker(),再次獲取c(凡是須要再次用ctl作判斷時,都會再次調用ctl.get()) * 失敗的緣由多是: * 一、線程池已經shutdown,shutdown的線程池再也不接收新任務 * 二、workerCountOf(c) < corePoolSize 判斷後,因爲併發,別的線程先建立了worker線程,致使workerCount>=corePoolSize */ c = ctl.get(); } /** * 二、若是線程池RUNNING狀態,且入隊列成功 */ if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); /** * 再次校驗放入workerQueue中的任務是否能被執行 * 一、若是線程池不是運行狀態了,應該拒絕添加新任務,從workQueue中刪除任務 * 二、若是線程池是運行狀態,或者從workQueue中刪除任務失敗(恰好有一個線程執行完畢,並消耗了這個任務), * 確保還有線程執行任務(只要有一個就夠了) */ //若是再次校驗過程當中,線程池不是RUNNING狀態,而且remove(command)--workQueue.remove()成功,拒絕當前command if (! isRunning(recheck) && remove(command)) reject(command); //若是當前worker數量爲0,經過addWorker(null, false)建立一個線程,其任務爲null //爲何只檢查運行的worker數量是否是0呢?? 爲何不和corePoolSize比較呢?? //只保證有一個worker線程能夠從queue中獲取任務執行就好了?? //由於只要還有活動的worker線程,就能夠消費workerQueue中的任務 else if (workerCountOf(recheck) == 0) addWorker(null, false);//第一個參數爲null,說明只爲新建一個worker線程,沒有指定firstTask ////第二個參數爲true表明佔用corePoolSize,false佔用maxPoolSize } /** * 三、若是線程池不是running狀態 或者 沒法入隊列 * 嘗試開啓新線程,擴容至maxPoolSize,若是addWork(command, false)失敗了,拒絕當前command */ else if (!addWorker(command, false)) reject(command); }
咱們能夠簡單概括以下(注:圖來源見水印,謝謝大神的概括):
在execute方法中,咱們看到核心的邏輯是由addWorker方法來實現的,當咱們將一個任務提交給線程池,線程池會如何處理,就是主要由這個方法加以規範:
該方法有兩個參數:
排列組合,addWorker方法有4種傳參的方式:
一、addWorker(command, true) 二、addWorker(command, false) 三、addWorker(null, false) 四、addWorker(null, true)
在execute方法中就使用了前3種,結合這個核心方法進行如下分析
第一個:線程數小於corePoolSize時,放一個須要處理的task進Workers Set。若是Workers Set長度超過corePoolSize,就返回false 第二個:當隊列被放滿時,就嘗試將這個新來的task直接放入Workers Set,而此時Workers Set的長度限制是maximumPoolSize。若是線程池也滿了的話就返回false 第三個:放入一個空的task進workers Set,長度限制是maximumPoolSize。這樣一個task爲空的worker在線程執行的時候會去任務隊列裏拿任務,這樣就至關於建立了一個新的線程,只是沒有立刻分配任務 第四個:這個方法就是放一個null的task進Workers Set,並且是在小於corePoolSize時,若是此時Set中的數量已經達到corePoolSize那就返回false,什麼也不幹。實際使用中是在prestartAllCoreThreads()方法,這個方法用來爲線程池預先啓動corePoolSize個worker等待從workQueue中獲取任務執行
/** * Checks if a new worker can be added with respect to current * pool state and the given bound (either core or maximum). If so, * the worker count is adjusted accordingly, and, if possible, a * new worker is created and started, running firstTask as its * first task. This method returns false if the pool is stopped or * eligible to shut down. It also returns false if the thread * factory fails to create a thread when asked. If the thread * creation fails, either due to the thread factory returning * null, or due to an exception (typically OutOfMemoryError in * Thread.start()), we roll back cleanly. * 檢查根據當前線程池的狀態和給定的邊界(core or maximum)是否能夠建立一個新的worker * 若是是這樣的話,worker的數量作相應的調整,若是可能的話,建立一個新的worker並啓動,參數中的firstTask做爲worker的第一個任務 * 若是方法返回false,可能由於pool已經關閉或者調用過了shutdown * 若是線程工廠建立線程失敗,也會失敗,返回false * 若是線程建立失敗,要麼是由於線程工廠返回null,要麼是發生了OutOfMemoryError * * @param firstTask the task the new thread should run first (or * null if none). Workers are created with an initial first task * (in method execute()) to bypass queuing when there are fewer * than corePoolSize threads (in which case we always start one), * or when the queue is full (in which case we must bypass queue). * Initially idle threads are usually created via * prestartCoreThread or to replace other dying workers. * * @param core if true use corePoolSize as bound, else * maximumPoolSize. (A boolean indicator is used here rather than a * value to ensure reads of fresh values after checking other pool * state). * @return true if successful */ private boolean addWorker(Runnable firstTask, boolean core) { //外層循環,負責判斷線程池狀態 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. /** * 線程池的state越小越是運行狀態,runnbale=-1,shutdown=0,stop=1,tidying=2,terminated=3 * 要想這個if爲true,線程池state必須已經至少是shutdown狀態了 * 這時候如下3個條件任意一個是false都會進入if語句,即沒法addWorker(): * 1,rs == SHUTDOWN (隱含:rs>=SHUTDOWN)false狀況: 線程池狀態已經超過shutdown, * 多是stop、tidying、terminated其中一個,即線程池已經終止 * 2,firstTask == null (隱含:rs==SHUTDOWN)false狀況: firstTask不爲空,rs==SHUTDOWN 且 firstTask不爲空, * return false,場景是在線程池已經shutdown後,還要添加新的任務,拒絕 * 3,! workQueue.isEmpty() (隱含:rs==SHUTDOWN,firstTask==null)false狀況: workQueue爲空, * 當firstTask爲空時是爲了建立一個沒有任務的線程,再從workQueue中獲取任務, * 若是workQueue已經爲空,那麼就沒有添加新worker線程的必要了 * return false, */ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; //內層循環,負責worker數量+1 for (;;) { int wc = workerCountOf(c); //入參core在這裏起做用,表示加入的worker是加入corePool仍是非corepool,換句話說,受到哪一個size的約束 //若是worker數量>線程池最大上限CAPACITY(即便用int低29位能夠容納的最大值) //或者( worker數量>corePoolSize 或 worker數量>maximumPoolSize ),即已經超過了給定的邊界,不添加worker if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //CAS嘗試增長線程數,,若是成功加了wc,那麼break跳出檢查 //若是失敗,證實有競爭,那麼從新到retry。 if (compareAndIncrementWorkerCount(c)) break retry; //若是不成功,從新獲取狀態繼續檢查 c = ctl.get(); // Re-read ctl //若是狀態不等於以前獲取的state,跳出內層循環,繼續去外層循環判斷 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop // else CAS失敗時由於workerCount改變了,繼續內層循環嘗試CAS對worker數量+1 } } //worker數量+1成功的後續操做 // 添加到workers Set集合,並啓動worker線程 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //新建worker//構造方法作了三件事//一、設置worker這個AQS鎖的同步狀態state=-1 w = new Worker(firstTask); //二、將firstTask設置給worker的成員變量firstTask //三、使用worker自身這個runnable,調用ThreadFactory建立一個線程,並設置給worker的成員變量thread final Thread t = w.thread; if (t != null) { //獲取重入鎖,而且鎖上 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); // rs!=SHUTDOWN ||firstTask!=null // 若是線程池在運行running<shutdown 或者 // 線程池已經shutdown,且firstTask==null(多是workQueue中仍有未執行完成的任務,建立沒有初始任務的worker線程執行) // worker數量-1的操做在addWorkerFailed() if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // // precheck that t is startable 線程已經啓動,拋非法線程狀態異常 throw new IllegalThreadStateException(); workers.add(w); //設置最大的池大小largestPoolSize,workerAdded設置爲true int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) {//若是往HashSet中添加worker成功,啓動線程 //經過t.start()方法正式執行線程。在這裏一個線程纔算是真正的執行起來了。 t.start(); workerStarted = true; } } } finally { //若是啓動線程失敗 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
一樣的,咱們能夠概括一下:
在addWorker方法中,咱們將一個新增進去的worker所組合的線程屬性thread啓動了,但咱們知道,在worker的構造方法中,它將本身自己注入到了thread的target屬性裏,因此繞了一圈,線程啓動後,調用的仍是worker的run方法,而在這裏面,runWorker定義了線程執行的邏輯:
/** * Main worker run loop. Repeatedly gets tasks from queue and * executes them, while coping with a number of issues: * * 1. We may start out with an initial task, in which case we * don't need to get the first one. Otherwise, as long as pool is * running, we get tasks from getTask. If it returns null then the * worker exits due to changed pool state or configuration * parameters. Other exits result from exception throws in * external code, in which case completedAbruptly holds, which * usually leads processWorkerExit to replace this thread. * 咱們可能使用一個初始化任務開始,即firstTask爲null * 而後只要線程池在運行,咱們就從getTask()獲取任務 * 若是getTask()返回null,則worker因爲改變了線程池狀態或參數配置而退出 * 其它退出由於外部代碼拋異常了,這會使得completedAbruptly爲true,這會致使在processWorkerExit()方法中替換當前線程 * * 2. Before running any task, the lock is acquired to prevent * other pool interrupts while the task is executing, and then we * ensure that unless pool is stopping, this thread does not have * its interrupt set. * 在任何任務執行以前,都須要對worker加鎖去防止在任務運行時,其它的線程池中斷操做 * clearInterruptsForTaskRun保證除非線程池正在stoping,線程不會被設置中斷標示 * * 3. Each task run is preceded by a call to beforeExecute, which * might throw an exception, in which case we cause thread to die * (breaking loop with completedAbruptly true) without processing * the task. * 每一個任務執行前會調用beforeExecute(),其中可能拋出一個異常,這種狀況下會致使線程die(跳出循環,且completedAbruptly==true),沒有執行任務 * 由於beforeExecute()的異常沒有cache住,會上拋,跳出循環 * * 4. Assuming beforeExecute completes normally, we run the task, * gathering any of its thrown exceptions to send to afterExecute. * We separately handle RuntimeException, Error (both of which the * specs guarantee that we trap) and arbitrary Throwables. * Because we cannot rethrow Throwables within Runnable.run, we * wrap them within Errors on the way out (to the thread's * UncaughtExceptionHandler). Any thrown exception also * conservatively causes thread to die. * * 5. After task.run completes, we call afterExecute, which may * also throw an exception, which will also cause thread to * die. According to JLS Sec 14.20, this exception is the one that * will be in effect even if task.run throws. * * The net effect of the exception mechanics is that afterExecute * and the thread's UncaughtExceptionHandler have as accurate * information as we can provide about any problems encountered by * user code. * * @param w the worker */
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts //標識線程是否是異常終止的 boolean completedAbruptly = true; try { //task不爲null狀況是初始化worker時,若是task爲null,則去隊列中取線程--->getTask() //能夠看到,只要getTask方法被調用且返回null,那麼worker一定被銷燬,而肯定一個線程是否應該被銷燬的邏輯,在getTask方法中 while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //線程開始執行以前執行此方法,能夠實現Worker未執行退出,本類中未實現 beforeExecute(wt, task); Throwable thrown = null; try { task.run();//runWorker方法最本質的存在乎義,就是調用task的run方法 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //線程執行後執行,能夠實現標識Worker異常中斷的功能,本類中未實現 afterExecute(task, thrown); } } finally { task = null;//運行過的task標null w.completedTasks++; w.unlock(); } } //標識線程不是異常終止的,是由於不知足while條件,被迫銷燬的 completedAbruptly = false; } finally { //處理worker退出的邏輯 processWorkerExit(w, completedAbruptly); } }
咱們概括:
runWorker方法中的getTask()方法是線程處理完一個任務後,從隊列中獲取新任務的實現,也是處理判斷一個線程是否應該被銷燬的邏輯所在:
/** * Performs blocking or timed wait for a task, depending on * current configuration settings, or returns null if this worker * must exit because of any of: 如下狀況會返回null * 1. There are more than maximumPoolSize workers (due to * a call to setMaximumPoolSize). * 超過了maximumPoolSize設置的線程數量(由於調用了setMaximumPoolSize()) * 2. The pool is stopped. * 線程池被stop * 3. The pool is shutdown and the queue is empty. * 線程池被shutdown,而且workQueue空了 * 4. This worker timed out waiting for a task, and timed-out * workers are subject to termination (that is, * {@code allowCoreThreadTimeOut || workerCount > corePoolSize}) * both before and after the timed wait. * 線程等待任務超時 * * @return task, or null if the worker must exit, in which case * workerCount is decremented * 返回null表示這個worker要結束了,這種狀況下workerCount-1 */ private Runnable getTask() { // timedOut 主要是判斷後面的poll是否要超時 boolean timedOut = false; // Did the last poll() time out? /** * 用於判斷線程池狀態 */ for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. /** * 對線程池狀態的判斷,兩種狀況會workerCount-1,而且返回null * 1,線程池狀態爲shutdown,且workQueue爲空(反映了shutdown狀態的線程池仍是要執行workQueue中剩餘的任務的) * 2,線程池狀態爲>=stop(只有TIDYING和TERMINATED會大於stop)(shutdownNow()會致使變成STOP)(此時不用考慮workQueue的狀況) */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount();//循環的CAS減小worker數量,直到成功 return null; } int wc = workerCountOf(c); // Are workers subject to culling? //allowCoreThreadTimeOut字段,表示是否容許核心線程超過閒置時間後被摧毀,默認爲false //咱們前面說過,若是getTask方法返回null,那麼這個worker只有被銷燬一途 //因而這個timed有3種狀況 //(1)當線程數沒有超過核心線程數,且默認allowCoreThreadTimeOut爲false時 // timed值爲false。看下面if的判斷邏輯,除非目前線程數大於最大值,不然下面的if始終進不去,該方法不可能返回null,worker也就不會被銷燬。 // 由於前提"線程數不超過核心線程數"與"線程數大於最大值"兩個命題互斥,因此(1)狀況,邏輯進入下面的if(返回null的線程銷燬邏輯)的可能性不存在。 // 也就是說,當線程數沒有超過核心線程數時,線程不會被銷燬。 //(2)噹噹前線程數超過核心線程數,且默認allowCoreThreadTimeOut爲false時//timed值爲true。 //(3)若是allowCoreThreadTimeOut爲true,則timed始終爲true boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //wc > maximumPoolSize則必銷燬,由於這狀況下,wc>1也確定爲true //wc <= maximumPoolSize,且(timed && timedOut) = true,這種狀況下通常也意味着worker要被銷燬,由於超時通常是由阻塞隊列爲空形成的,因此workQueue.isEmpty()也大機率爲真,進入if邏輯。 //通常狀況是這樣,那不通常的狀況呢?阻塞隊列沒有爲空,可是由於一些緣由,仍是超時了,這時候取決於wc > 1,它爲真就銷燬,爲假就不銷燬。 // 也就是說,若是阻塞隊列還有任務,可是wc=1,線程池裏只剩下本身這個線程了,那麼就不能銷燬,這個if不知足,咱們的代碼繼續往下走 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //若是timed爲true那麼使用poll取線程。不然使用take() Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //workQueue.poll():若是在keepAliveTime時間內,阻塞隊列仍是沒有任務,返回null workQueue.take(); //workQueue.take():若是阻塞隊列爲空,當前線程會被掛起等待;當隊列中有任務加入時,線程被喚醒,take方法返回任務 //若是正常返回,那麼返回取到的task。 if (r != null) return r; //不然,設爲超時,從新執行循環, timedOut = true; } catch (InterruptedException retry) { //在阻塞從workQueue中獲取任務時,能夠被interrupt()中斷,代碼中捕獲了InterruptedException,重置timedOut爲初始值false,再次執行第1步中的判斷,知足就繼續獲取任務,不知足return null,會進入worker退出的流程 timedOut = false; } }
概括:
在runWorker方法中,咱們看到當不知足while條件後,線程池會執行退出線程的操做,這個操做,就封裝在processWorkerExit方法中。
/** * Performs cleanup and bookkeeping for a dying worker. Called * only from worker threads. Unless completedAbruptly is set, * assumes that workerCount has already been adjusted to account * for exit. This method removes thread from worker set, and * possibly terminates the pool or replaces the worker if either * it exited due to user task exception or if fewer than * corePoolSize workers are running or queue is non-empty but * there are no workers. * * @param w the worker * @param completedAbruptly if the worker died due to user exception */ private void processWorkerExit(Worker w, boolean completedAbruptly) { //參數: //worker: 要結束的worker //completedAbruptly: 是否忽然完成(是否由於異常退出) /** * 一、worker數量-1 * 若是是忽然終止,說明是task執行時異常狀況致使,即run()方法執行時發生了異常,那麼正在工做的worker線程數量須要-1 * 若是不是忽然終止,說明是worker線程沒有task可執行了,不用-1,由於已經在getTask()方法中-1了 */ if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted 代碼和註釋正好相反啊 decrementWorkerCount(); /** * 二、從Workers Set中移除worker */ final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; //把worker的完成任務數加到線程池的完成任務數 workers.remove(w); //從HashSet<Worker>中移除 } finally { mainLock.unlock(); } /** * 三、在對線程池有負效益的操做時,都須要「嘗試終止」線程池 * 主要是判斷線程池是否知足終止的狀態 * 若是狀態知足,但線程池還有線程,嘗試對其發出中斷響應,使其能進入退出流程 * 沒有線程了,更新狀態爲tidying->terminated */ tryTerminate(); /** * 四、是否須要增長worker線程 * 線程池狀態是running 或 shutdown * 若是當前線程是忽然終止的,addWorker() * 若是當前線程不是忽然終止的,但當前線程數量 < 要維護的線程數量,addWorker() * 故若是調用線程池shutdown(),直到workQueue爲空前,線程池都會維持corePoolSize個線程,而後再逐漸銷燬這corePoolSize個線程 */ int c = ctl.get(); //若是狀態是running、shutdown,即tryTerminate()沒有成功終止線程池,嘗試再添加一個worker if (runStateLessThan(c, STOP)) { //不是忽然完成的,即沒有task任務能夠獲取而完成的,計算min,並根據當前worker數量判斷是否須要addWorker() if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //allowCoreThreadTimeOut默認爲false,即min默認爲corePoolSize //若是min爲0,即不須要維持核心線程數量,且workQueue不爲空,至少保持一個線程 if (min == 0 && ! workQueue.isEmpty()) min = 1; //若是線程數量大於最少數量,直接返回,不然下面至少要addWorker一個 if (workerCountOf(c) >= min) return; // replacement not needed } //添加一個沒有firstTask的worker //只要worker是completedAbruptly忽然終止的,或者線程數量小於要維護的數量,就新添一個worker線程,即便是shutdown狀態 addWorker(null, false); } }
總而言之:若是線程池尚未徹底終止,就仍須要保持必定數量的線程。
線程池狀態是running 或 shutdown的狀況下:
A、若是當前線程是忽然終止的,addWorker() B、若是當前線程不是忽然終止的,但當前線程數量 < 要維護的線程數量,addWorker() 故若是調用線程池shutdown(),直到workQueue爲空前,線程池都會維持corePoolSize個線程,而後再逐漸銷燬這corePoolSize個線程
前面咱們講過execute方法,其做用是將一個任務提交給線程池,以期在將來的某個時間點被執行。
submit方法在做用上,和execute方法是同樣的,將某個任務提交給線程池,讓線程池調度線程去執行它。
那麼它和execute方法有什麼區別呢?咱們來看看submit方法的源碼:
submit方法的實如今ThreadPoolExecutor的父類AbstractExecutorService類中,有三種重載方法:
/** * 提交一個 Runnable 任務用於執行,並返回一個表示該任務的 Future。該Future的get方法在成功完成時將會返回null。 * submit 參數: task - 要提交的任務 返回:表示任務等待完成的 Future * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } /** * 提交一個Runnable 任務用於執行,並返回一個表示該任務的 Future。該 Future 的 get 方法在成功完成時將會返回給定的結果。 * submit 參數: task - 要提交的任務 result - 完成任務時要求返回的結果 * 返回: 表示任務等待完成的 Future * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } /** * 提交一個Callable的任務用於執行,返回一個表示任務的未決結果的 Future。該 Future 的 get 方法在成功完成時將會返回該任務的結果。 * 若是想當即阻塞任務的等待,則可使用 result = exec.submit(aCallable).get(); 形式的構造。 * 參數: task - 要提交的任務 返回: 表示任務等待完成的Future * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
源碼很簡單,submit方法,將任務task封裝成FutureTask(newTaskFor方法中就是new了一個FutureTask),而後調用execute。因此submit方法和execute的全部區別,都在這FutureTask所帶來的差別化實現上。
總而言之,submit方法將一個任務task用future模式封裝成FutureTask對象,提交給線程執行,並將這個FutureTask對象返回,以供主線程在該任務被線程池執行以後獲得執行結果。
注意,得到執行結果的方法FutureTask.get(),會阻塞執行該方法的線程,尤爲是當任務被DiscardPolicy策略和DiscardOldestPolicy拒絕的時候,get方法會一直阻塞在那裏,因此咱們最好使用自帶超時時間的future。
講完了線程池的基本運轉過程,在方法章的最後,咱們來看看負責線程池生命週期最後收尾工做的幾個重要方法,首先是shutdown方法。
/** * Initiates an orderly shutdown in which previously submitted * tasks are executed, but no new tasks will be accepted. * Invocation has no additional effect if already shut down. * * <p>This method does not wait for previously submitted tasks to * complete execution. Use {@link #awaitTermination awaitTermination} * to do that. * 開始一個順序的shutdown操做,shutdown以前被執行的已提交任務,新的任務不會再被接收了。若是線程池已經被shutdown了,該方法的調用沒有其餘任何效果了。 * **該方法不會等待以前已經提交的任務執行完畢**,awaitTermination方法纔有這個效果。 * * @throws SecurityException {@inheritDoc} */ public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //判斷是否能夠操做關閉目標線程。 checkShutdownAccess(); //advanceRunState方法,參數:目標狀態;做用:一直執行,直到成功利用CAS將狀態置爲目標值。 //設置線程池狀態爲SHUTDOWN,此處以後,線程池中不會增長新Task advanceRunState(SHUTDOWN); //中斷全部的空閒線程 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } //嘗試進行terminate操做,但其實咱們上面將狀態置爲shutdown,就已經算是「停止」了一個線程池了,它不會再執行任務,於外部而言,已經失去了做用。而這裏,也只是嘗試去將線程池的狀態一擼到底而已,並非必定要terminate掉。該方法咱們後面會說到。 tryTerminate(); }
咱們能夠看到,shutdown方法只不過是中斷喚醒了全部阻塞的線程,而且把線程池狀態置爲shutdown,正如註釋所說的,它沒有等待全部正在執行任務的線程執行完任務,把狀態置爲shutdown,已經足夠線程池喪失基本的功能了。
在該方法中,線程池如何中斷線程是咱們最須要關心的,咱們來看一下interruptIdleWorkers方法:
private void interruptIdleWorkers(boolean onlyOne) {//參數onlyOne表示是否值中斷一個線程就退出,在shutdown中該值爲false。 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //遍歷workers 對全部worker作中斷處理。 for (Worker w : workers) { Thread t = w.thread; // w.tryLock()對Worker獲取鎖,由於正在執行的worker已經加鎖了(見runWorker方法,w.lock()語句) //因此這保證了正在運行執行Task的Worker不會被中斷。只有阻塞在getTask方法的空閒線程纔會進這個if判斷(被中斷),但中斷不表明線程馬上中止,它要繼續處理到阻塞隊列爲空時纔會被銷燬。 if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
咱們能夠看到,在中斷方法中,咱們調用了worker的tryLock方法去嘗試獲取worker的鎖,因此咱們說,worker類這一層的封裝,是用來控制線程中斷的,正在執行任務的線程已經上了鎖,沒法被中斷,只有在獲取阻塞隊列中的任務的線程(咱們稱爲空閒線程)纔會有被中斷的可能。
以前咱們看過getTask方法,在這個方法中, worker是不加鎖的,因此能夠被中斷。咱們爲何說「中斷不表明線程馬上中止,它要繼續處理到阻塞隊列爲空時纔會被銷燬」呢?具體邏輯,咱們再來看一下getTask的源碼,以及咱們的註釋(咱們模擬中斷髮生時的場景):
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? /** * 當執行過程當中拋出InterruptedException 的時候,該異常被catch住,邏輯從新回到這個for循環 * catch塊在getTask方法的最後。 */ for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. /** * 由於邏輯是在拋出中斷異常後來到這裏的,那說明線程池的狀態已經在shutdown方法中被置爲shutdown了,rs >= SHUTDOWN爲true,rs >=STOP爲false(只有TIDYING和TERMINATED狀態會大於stop) * 這時候,若是workQueue爲空,判斷爲真,線程被銷燬。 * 不然,workQueue爲非空,判斷爲假,線程不會進入銷燬邏輯。 */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount();//循環的CAS減小worker數量,直到成功 return null; } int wc = workerCountOf(c); // Are workers subject to culling? //由於在catch塊中,timeOut已經爲false了。 //因此只要不發生當前線程數超過最大線程數這種極端狀況,命題(wc > maximumPoolSize || (timed && timedOut)必定爲false,線程依舊不被銷燬。 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //繼續執行正常的從阻塞隊列中取任務的邏輯,直到阻塞隊列完全爲空,這時候,上面第一個if判斷符合,線程被銷燬,壽命完全結束。 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); //若是正常返回,那麼返回取到的task。 if (r != null) return r; //不然,設爲超時,從新執行循環, timedOut = true; } catch (InterruptedException retry) { //捕獲中斷異常 timedOut = false; } } }
總結:正阻塞在getTask()獲取任務的worker在被中斷後,會拋出InterruptedException,再也不阻塞獲取任務。捕獲中斷異常後,將繼續循環到getTask()最開始的判斷線程池狀態的邏輯,當線程池是shutdown狀態,且workQueue.isEmpty時,return null,進行worker線程退出邏輯。
因此,這就是咱們爲何說,shutdown方法不會馬上中止線程池,它的做用是阻止新的任務被添加進來(邏輯在addWorker方法的第一個if判斷中,能夠返回去看一下),而且繼續處理完剩下的任務,而後tryTerminated,嘗試關閉。
/** * Transitions to TERMINATED state if either (SHUTDOWN and pool * and queue empty) or (STOP and pool empty). If otherwise * eligible to terminate but workerCount is nonzero, interrupts an * idle worker to ensure that shutdown signals propagate. This * method must be called following any action that might make * termination possible -- reducing worker count or removing tasks * from the queue during shutdown. The method is non-private to * allow access from ScheduledThreadPoolExecutor. * 在如下狀況將線程池變爲TERMINATED終止狀態 * shutdown 且 正在運行的worker 和 workQueue隊列 都empty * stop 且 沒有正在運行的worker * * 這個方法必須在任何可能致使線程池終止的狀況下被調用,如: * 減小worker數量 * shutdown時從queue中移除任務 * * 這個方法不是私有的,因此容許子類ScheduledThreadPoolExecutor調用 */ final void tryTerminate() { for (;;) { int c = ctl.get(); /** * 線程池是否須要終止 * 若是如下3中狀況任一爲true,return,不進行終止 * 一、還在運行狀態 * 二、狀態是TIDYING、或 TERMINATED,已經終止過了 * 三、SHUTDOWN 且 workQueue不爲空 */ if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; /** * 只有shutdown狀態 且 workQueue爲空,或者 stop狀態能執行到這一步 * 若是此時線程池還有線程(正在運行任務或正在等待任務,總之count不等於0) * 中斷喚醒一個正在等任務的空閒worker *(中斷喚醒的意思就是讓阻塞在阻塞隊列中的worker拋出異常,而後從新判斷狀態,getTask方法邏輯) * 線程被喚醒後再次判斷線程池狀態,會return null,進入processWorkerExit()流程(runWorker邏輯) */ if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE);//中斷workers集合中的空閒任務,參數爲true,只中斷一個。(該邏輯的意義應該在於通知被阻塞在隊列中的線程:別瞎jb等了,這個線程池都要倒閉了,趕忙收拾鋪蓋準備銷燬吧你個逼玩意兒)。 //嘗試終止失敗,返回。可能你們會有疑問,shutdown只調用了一次tryTerminate方法,若是一次嘗試失敗了,是否是就意味着shutdown方法極可能最終沒法終止線程池? //其實看註釋,咱們知道線程池在進行全部負面效益的操做時都會調用該方法嘗試終止,上面咱們中斷了一個阻塞線程讓他被銷燬,他銷燬時也會嘗試終止(這其中又喚醒了一個阻塞線程去銷燬),以此類推,直到最後一個線程執行tryTerminate時,邏輯纔有可能走到下面去。 return; } /** * 若是狀態是SHUTDOWN,workQueue也爲空了,正在運行的worker也沒有了,開始terminated */ final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //CAS:將線程池的ctl變成TIDYING(全部的任務被終止,workCount爲0,爲此狀態時將會調用terminated()方法),期間ctl有變化就會失敗,會再次for循環 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { //方法爲空,需子類實現 terminated(); } finally { //將狀態置爲TERMINATED ctl.set(ctlOf(TERMINATED, 0)); //最後執行termination.signalAll(),並喚醒全部等待線程池終止這個Condition的線程(也就是調用了awaitTermination方法的線程,這個方法的做用是阻塞調用它的線程,直到調用該方法的線程池真的已經被終止了。) termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
總結一下:tryTerminate被調用的時機主要有:
1,shutdown方法時
2,processWorkerExit方法銷燬一個線程時
3,addWorkerFailed方法添加線程失敗或啓動線程失敗時
4,remove方法,從阻塞隊列中刪掉一個任務時
咱們知道,shutdown後線程池將變成shutdown狀態,此時不接收新任務,但會處理完正在運行的 和 在阻塞隊列中等待處理的任務。
咱們接下來要說的shutdownNow方法,做用是:shutdownNow後線程池將變成stop狀態,此時不接收新任務,再也不處理在阻塞隊列中等待的任務,還會嘗試中斷正在處理中的工做線程。
代碼以下:
/** * Attempts to stop all actively executing tasks, halts the * processing of waiting tasks, and returns a list of the tasks * that were awaiting execution. These tasks are drained (removed) * from the task queue upon return from this method. * 嘗試中止全部活動的正在執行的任務,中止等待任務的處理,並返回正在等待被執行的任務列表 * 這個任務列表是從任務隊列中排出(刪除)的 * <p>This method does not wait for actively executing tasks to * terminate. Use {@link #awaitTermination awaitTermination} to * do that. * 這個方法不用等到正在執行的任務結束,要等待線程池終止可以使用awaitTermination() * <p>There are no guarantees beyond best-effort attempts to stop * processing actively executing tasks. This implementation * cancels tasks via {@link Thread#interrupt}, so any task that * fails to respond to interrupts may never terminate. * 除了盡力嘗試中止運行中的任務,沒有任何保證 * 取消任務是經過Thread.interrupt()實現的,因此任何響應中斷失敗的任務可能永遠不會結束 * @throws SecurityException {@inheritDoc} */ public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //判斷調用者是否有權限shutdown線程池 checkShutdownAccess(); //CAS+循環設置線程池狀態爲stop advanceRunState(STOP); //中斷全部線程,包括正在運行任務的 interruptWorkers(); //將workQueue中的元素放入一個List並返回 tasks = drainQueue(); } finally { mainLock.unlock(); } //嘗試終止線程池 tryTerminate(); //返回workQueue中未執行的任務 return tasks; }
interruptWorkers 很簡單,循環對全部worker調用 interruptIfStarted,其中會判斷worker的AQS state是否大於0,即worker是否已經開始運做,再調用Thread.interrupt
須要注意的是,對於運行中的線程調用Thread.interrupt並不能保證線程被終止,task.run內部可能捕獲了InterruptException,沒有上拋,致使線程一直沒法結束
該方法的做用是等待線程池終止,參數是timeout:超時時間和unit: timeout超時時間的單位,返回結果:true:線程池終止,false:超過timeout指定時間
// public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { //是否terminated終止 if (runStateAtLeast(ctl.get(), TERMINATED)) return true; //是否已經超過超時時間 if (nanos <= 0) return false; //核心邏輯:看註釋咱們能知道,該方法讓調用線程等待一段時間,直到被喚醒(有且僅有以前咱們說過的tryTerminate方法中的 termination.signalAll()),或者被異常中斷,或者傳入了nanos時間參數流逝完。 nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } }
termination.awaitNanos() 是經過 LockSupport.parkNanos(this, nanosTimeout)實現的阻塞等待
阻塞等待過程當中發生如下具體狀況會解除阻塞(對上面3種狀況的解釋):
一、若是發生了 termination.signalAll()(內部實現是 LockSupport.unpark())會喚醒阻塞等待,且因爲ThreadPoolExecutor只有在 tryTerminated()嘗試終止線程池成功,將線程池更新爲terminated狀態後纔會signalAll(),故awaitTermination()再次判斷狀態會return true退出
二、若是達到了超時時間 termination.awaitNanos() 也會返回,此時nano==0,再次循環判斷return false,等待線程池終止失敗
三、若是當前線程被 Thread.interrupt(),termination.awaitNanos()會上拋InterruptException,awaitTermination()繼續上拋給調用線程,會以異常的形式解除阻塞
綜上,要想優雅的關閉線程池,咱們應該:
executorService.shutdown(); try{ while(!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) { LOGGER.debug("Waiting for terminate"); } } catch (InterruptedException e) { //中斷處理 }