爲了不頻繁重複的建立和銷燬線程,咱們可讓這些線程進行復用,在線程池中,總會有活躍的線程在佔用,可是線程池中也會存在沒有佔用的線程,這些線程處於空閒狀態,當有任務的時候會從池子裏面拿去一個線程來進行使用,當完成工做後,並無銷燬線程,而是將線程放回到池子中去。java
線程池主要解決兩個問題:編程
一是當執行大量異步任務時線程池可以提供很好的性能。數組
二是線程池提供了一種資源限制和管理的手段,好比能夠限制現成的個數,動態新增線程等。緩存
-《Java併發編程之美》安全
上面內容出自《Java併發編程之美》這本書,第一個問題上面已經提到過,線程的頻繁建立和銷燬是很損耗性能的,可是線程池中的線程是能夠複用的,能夠較好的提高性能問題,線程池內部是採用了阻塞隊列來維護Runnable對象。多線程
JDK爲咱們封裝了一套操做多線程的框架Executors,幫助咱們能夠更好的控制線程池,Executors下提供了一些線程池的工廠方法:併發
SchemeExecutorService
對象,線程池大小爲1,SchemeExecutorService
接口在ThreadPoolExecutor
類和 ExecutorService
接口之上的擴展,在給定時間執行某任務。SchemeExecutorService
對象,可指定線程池線程數量。對於核心的線程池來講,它內部都是使用了ThreadPoolExecutor
對象來實現的,只不過內部參數信息不同,咱們先來看兩個例子:nexFixedThreadPool
和newSingleThreadExecutor
以下所示:框架
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
複製代碼
由上面的線程池的建立過程能夠看到它們都是ThreadPoolExecutor
的封裝,接下來咱們來看一下ThreadPoolExecutor
的參數說明:異步
參數名稱 | 參數描述 |
---|---|
corePoolSize | 指定線程池線程的數量 |
maximumPoolSize | 指定線程池中線程的最大數量 |
keepAliveTime | 當線程池線程的數量超過corePoolSize的時候,多餘的空閒線程存活的時間,若是超過了corePoolSize,在keepAliveTime的時間以後,銷燬線程 |
unit | keepAliveTime的單位 |
workQueue | 工做隊列,將被提交但還沒有執行的任務緩存起來 |
threadFactory | 線程工廠,用於建立線程,不指定爲默認線程工廠DefaultThreadFactory |
handler | 拒絕策略 |
其中workQueue表明的是提交但未執行的隊列,它是BlockingQueue接口的對象,用於存放Runable對象,主要分爲如下幾種類型:ide
直接提交的隊列:SynchronousQueue
隊列,它是一個沒有容量的隊列,前面我有對其進行講解,當線程池進行入隊offer操做的時候,自己是無容量的,因此直接返回false,並無保存下來,而是直接提交給線程來進行執行,若是沒有空餘的線程則執行拒絕策略。
有界的任務隊列:可使用ArrayBlockingQueue
隊列,由於它內部是基於數組來進行實現的,初始化時必須指定容量參數,當使用有界任務隊列時,當有任務進行提交時,線程池的線程數量小於corePoolSize則建立新的線程來執行任務,當線程池的線程數量大於corePoolSize的時候,則將提交的任務放入到隊列中,當提交的任務塞滿隊列後,若是線程池的線程數量沒有超過maximumPoolSize,則建立新的線程執行任務,若是超過了maximumPoolSize則執行拒絕策略。
無界的任務隊列:可使用LinkedBlockingQueue
隊列,它內部是基於鏈表的形式,默認隊列的長度是Integer.MAX_VALUE
,也能夠指定隊列的長度,當隊列滿時進行阻塞操做,固然線程池中採用的是offer
方法並不會阻塞線程,當隊列滿時則返回false,入隊成功則則返回true,當使用LinkedBlockingQueue
隊列時,有任務提交到線程池時,若是線程池的數量小於corePoolSize,線程池會產生新的線程來執行任務,當線程池的線程數量大於corePoolSize時,則將提交的任務放入到隊列中,等待執行任務的線程執行完以後進行消費隊列中的任務,若後續仍有新的任務提交,而沒有空閒的線程時,它會不斷往隊列中入隊提交的任務,直到資源耗盡。
優先任務隊列:t有限任務隊列是帶有執行優先級的隊列,他可使用PriorityBlockingQueue
隊列,能夠控制任務的執行前後順序,它是一個無界隊列,該隊列能夠根據任務自身的優先級順序前後執行,在確保性能的同時,也能有很好的質量保證。
上面講解了關於線程池內部都是經過ThreadPoolExecutor
來進行實現的,那麼下面我以一個例子來進行源碼分析:
public class ThreadPoolDemo1 {
public static void main(String[] args) {
ExecutorService executorService = new ThreadPoolExecutor(5,
10,
60L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(5), new CustomThreadFactory());
for (int i = 0; i < 15; i++) {
executorService.execute(() -> {
try {
Thread.sleep(50000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("由線程:" + Thread.currentThread().getName() + "執行任務完成");
});
}
}
}
複製代碼
上面定義了一個線程池,線程池初始化的corePoolSize爲5,也就是線程池中線程的數量爲5,最大線程maximumThreadPoolSize爲10,空餘的線程存活的時間是60s,使用ArrayBlockingQueue來做爲阻塞隊列,這裏還發現我自定義了ThreadFactory
線程池工廠,這裏我真是針對線程建立的時候輸出線程池的名稱,源碼以下所示:
/** * 自定義的線程池構造工廠 */
public class CustomThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
public CustomThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
@Override
public Thread newThread(Runnable r) {
String name = namePrefix + threadNumber.getAndIncrement();
Thread t = new Thread(group, r,
name,
0);
System.out.println("線程池建立,線程名稱爲:" + name);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
複製代碼
代碼和DefaultThreadFactory
同樣,只是在newThread
新建線程的動做的時候輸出了線程池的名稱,方便查看線程建立的時機,上面main
方法中提交了15個任務,調用了execute
方法來進行提交任務,在分析execute
方法以前咱們先了解一下線程的狀態:
//假設Integer類型是32位的二進制表示。
//高3位表明線程池的狀態,低29位表明的是線程池的數量
//默認是RUNNING狀態,線程池的數量爲0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//線程個數位數,表示的Integer中除去最高的3位以後剩下的位數表示線程池的個數
private static final int COUNT_BITS = Integer.SIZE - 3;
//線程池的線程的最大數量
//這裏舉例是32爲機器,表示爲00011111111111111111111111111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//線程池的狀態
// runState is stored in the high-order bits
//11100000000000000000000000000000
//接受新任務而且處理阻塞隊列裏面任務
private static final int RUNNING = -1 << COUNT_BITS;
//00000000000000000000000000000000
//拒絕新任務可是處理阻塞隊列的任務
private static final int SHUTDOWN = 0 << COUNT_BITS;
//00100000000000000000000000000000
//拒接新任務而且拋棄阻塞隊列裏面的任務,同時會中斷正在處理的任務
private static final int STOP = 1 << COUNT_BITS;
//01000000000000000000000000000000
//全部任務都執行完(包括阻塞隊列中的任務)後當線程池活動線程數爲0,將要調用terminated方法。
private static final int TIDYING = 2 << COUNT_BITS;
//01100000000000000000000000000000
//終止狀態,terminated方法調用完成之後的狀態
private static final int TERMINATED = 3 << COUNT_BITS;
複製代碼
經過上面內容能夠看到ctl其實存放的是線程池的狀態和線程數量的變量,默認是RUNNING
,也就是11100000000000000000000000000000
,這裏咱們來假設運行的機器上的Integer的是32位的,由於有些機器上可能Integer並非32位,下面COUNT_BITS來控制位數,也就是先獲取Integer在該平臺上的位數,好比說是32位,而後32位-3位=29位,也就是低29位表明的是現成的數量,高3位表明線程的狀態,能夠清晰看到下面的線程池的狀態都是經過低位來進行向左位移的操做的,除了上面的變量,還提供了操做線程池狀態的方法:
// 操做ctl變量,主要是進行分解或組合線程數量和線程池狀態。
// 獲取高3位,獲取線程池狀態
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 獲取低29位,獲取線程池中線程的數量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 組合ctl變量,rs=runStatue表明的是線程池的狀態,wc=workCount表明的是線程池線程的數量
private static int ctlOf(int rs, int wc) { return rs | wc; }
/* * Bit field accessors that don't require unpacking ctl. * These depend on the bit layout and on workerCount being never negative. */
//指定的線程池狀態c小於狀態s
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
//指定的線程池狀態c至少是狀態s
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// 判斷線程池是否運行狀態
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/** * CAS增長線程池線程數量. */
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/** * CAS減小線程池線程數量 */
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/** * 將線程池的線程數量進行較少操做,若是競爭失敗直到競爭成功爲止。 */
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
複製代碼
下來咱們看一下ThreadPoolExecutor
對象下的execute
方法:
public void execute(Runnable command) {
// 判斷提交的任務是否是爲空,若是爲空則拋出NullPointException異常
if (command == null)
throw new NullPointerException();
// 獲取線程池的狀態和線程池的數量
int c = ctl.get();
// 若是線程池的數量小於corePoolSize,則進行添加線程執行任務
if (workerCountOf(c) < corePoolSize) {
//添加線程修改線程數量而且將command做爲第一個任務進行處理
if (addWorker(command, true))
return;
// 獲取最新的狀態
c = ctl.get();
}
// 若是線程池的狀態是RUNNING,將命令添加到隊列中
if (isRunning(c) && workQueue.offer(command)) {
//二次檢查線程池狀態和線程數量
int recheck = ctl.get();
//線程不是RUNNING狀態,從隊列中移除當前任務,而且執行拒絕策略。
//這裏說明一點,只有RUNNING狀態的線程池纔會接受新的任務,其他狀態所有拒絕。
if (! isRunning(recheck) && remove(command))
reject(command);
//若是線程池的線程數量爲空時,表明線程池是空的,添加一個新的線程。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//若是隊列是滿的,或者是SynchronousQueue隊列時,則直接添加新的線程執行任務,若是添加失敗則進行拒絕
//可能線程池的線程數量大於maximumPoolSize則採起拒絕策略。
else if (!addWorker(command, false))
reject(command);
}
複製代碼
經過分析execute方法總結如下幾點:
corePoolSize
時,直接添加線程到線程池而且將當前任務作爲第一個任務執行。RUNNING
,則能夠接受任務,將任務放入到阻塞隊列中,內部進行二次檢查,有可能在運行下面內容時線程池狀態已經發生了變化,在這個時候若是線程池狀態變成不是RUNNING
,則將當前任務從隊列中移除,而且進行拒絕策略。SynchronousQueue
這種特殊隊列無空間的時候,直接添加新的線程執行任務,當線程池的線程數量大於maximumPoolSize
時相應拒絕策略。offer
方法,該方法不會阻塞隊列,若是隊列已經滿時或超時致使入隊失敗,返回false,若是入隊成功返回true。針對上面例子源碼咱們來作一下分析,咱們源碼中阻塞隊列採用的是ArrayBlockingQueue
隊列,而且指定隊列的長度是5,咱們看下面提交的線程池的任務是15個,並且corePoolSize設置的是5個核心線程,最大線程數(maximumPoolSzie)是10個(包括核心線程數),假設全部任務都同時提交到了線程池中,其中有5個任務會被提交到線程中做爲第一個任務進行執行,會有5個任務被添加到阻塞隊列中,還有5個任務提交到到線程池中的時候發現阻塞隊列已經滿了,這時候會直接提交任務,發現當前線程數是5小於最大線程數,能夠進行新建線程來執行任務。
processWorkerExit
來處理線程的退出,接下來咱們來分析下
addWorker
都作了什麼內容:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//獲取線程池的狀態和線程池線程的數量
int c = ctl.get();
//單獨獲取線程池的狀態
int rs = runStateOf(c);
//檢查隊列是否只在必要時爲空
if (rs >= SHUTDOWN && //線程池的狀態是SHUTDOWN、STOP、TIDYING、TERMINATED
! (rs == SHUTDOWN && //能夠看作是rs!=SHUTDOWN,線程池狀態爲STOP、TIDYING、TERMINATED
firstTask == null && //能夠看作firstTask!=null,而且rs=SHUTDOWN
! workQueue.isEmpty())) //能夠看作rs=SHUTDOWN,而且workQueue.isEmpty()隊列爲空
return false;
//循環CAS增長線程池中線程的個數
for (;;) {
//獲取線程池中線程個數
int wc = workerCountOf(c);
//若是線程池線程數量超過最大線程池數量,則直接返回
if (wc >= CAPACITY ||
//若是指定使用corePoolSize做爲限制則使用corePoolSize,反之使用maximumPoolSize,最爲工做線程最大線程線程數量,若是工做線程大於相應的線程數量則直接返回。
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS增長線程池中線程的數量
if (compareAndIncrementWorkerCount(c))
//跳出增長線程池數量。
break retry;
//若是修改失敗,則從新獲取線程池的狀態和線程數量
c = ctl.get(); // Re-read ctl
//若是最新的線程池狀態和原有縣城出狀態不同時,則跳轉到外層retry中,不然在內層循環從新進行CAS
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//工做線程是否開始啓動標誌
boolean workerStarted = false;
//工做線程添加到線程池成功與否標誌
boolean workerAdded = false;
Worker w = null;
try {
//建立一個Worker對象
w = new Worker(firstTask);
//獲取worker中的線程,這裏線程是經過ThreadFactory線程工廠建立出來的,詳細看下面源碼信息。
final Thread t = w.thread;
//判斷線程是否爲空
if (t != null) {
//添加獨佔鎖,爲添加worker進行同步操做,防止其餘線程同時進行execute方法。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//獲取線程池的狀態
int rs = runStateOf(ctl.get());
//若是線程池狀態爲RUNNING或者是線程池狀態爲SHUTDOWN而且第一個任務爲空時,當線程池狀態爲SHUTDOWN時,是不容許添加新任務的,因此他會從隊列中獲取任務。
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//添加worker到集合中
workers.add(w);
int s = workers.size();
//跟蹤最大的線程池數量
if (s > largestPoolSize)
largestPoolSize = s;
//添加worker成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//若是添加worker成功就啓動任務
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//若是沒有啓動,w不爲空就已出worker,而且線程池數量進行減小。
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
複製代碼
經過上面addWorker
方法能夠分爲兩個部分來進行講解,第一部分是對線程池中線程數量的經過CAS的方式進行增長,其中第一部分中上面有個if語句,這個地方着重分析下:
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
複製代碼
能夠當作下面的樣子,將!
放到括號裏面,變成下面的樣子:
if (rs >= SHUTDOWN &&
(rs != SHUTDOWN ||
firstTask != null ||
workQueue.isEmpty()))
return false;
複製代碼
上半部分分爲內外兩個循環,外循環對線程池狀態的判斷,用於判斷是否須要添加工做任務線程,經過上面講的內容進行判斷,後面內循環則是經過CAS操做增長線程數,若是指定了core
參數爲true,表明線程池中線程的數量沒有超過corePoolSize
,當指定爲false時,表明線程池中線程數量達到了corePoolSize
,而且隊列已經滿了,或者是SynchronousQueue
這種無空間的隊列,可是尚未達到最大的線程池maximumPoolSize
,因此它內部會根據指定的core
參數來判斷是否已經超過了最大的限制,若是超過了就不能進行添加線程了,而且進行拒絕策略,若是沒有超過就增長線程數量。
第二部分主要是把任務添加到worker中,並啓動線程,這裏咱們先來看一下Worker對象。
// 這裏發現它是實現了AQS,是一個不可重入的獨佔鎖模式
// 而且它還集成了Runable接口,實現了run方法。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
private static final long serialVersionUID = 6138294804551838833L;
/** 執行任務的線程,經過ThreadFactory建立 */
final Thread thread;
/** 初始化第一個任務*/
Runnable firstTask;
/** 每一個線程完成任務的數量 */
volatile long completedTasks;
/** * 首先現將state值設置爲-1,由於在AQS中state=0表明的是鎖沒有被佔用,並且在線程池中shutdown方法會判斷可否爭搶到鎖,若是能夠得到鎖則對線程進行中斷操做,若是調用了shutdownNow它會判斷state>=0會被中斷。 * firstTask第一個任務,若是爲空則會從隊列中獲取任務,後面runWorker中。 */
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** 委託調用外部的runWorker方法 */
public void run() {
runWorker(this);
}
//是否獨佔鎖
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
//這裏就是上面shutdownNow中調用的線程中斷的方法,getState()>=0
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
複製代碼
能夠看到Worker是一個實現了AQS的鎖,它是一個不可重入的獨佔鎖,而且他也實現了Runnable
接口,實現了run
方法,在構造函數中將AQS的state
設置爲-1
,爲了不線程尚未進入runWorker
方法前,就調用了shutdown
或shutdownNow
方法,會被中斷,設置爲-1則不會被中斷。後面咱們看到run
方法,它調用的是ThreadPoolExecutor
的runWorker
方法,咱們這裏回想一下,在addWorker
方法中,添加worker
到HashSet<Worker>
中後,他會將workerAdded
設置爲true,表明添加worker
成功,後面有調用了下面代碼:
if (workerAdded) {
t.start();
workerStarted = true;
}
複製代碼
這個t表明的就是在Worker構造函數中的使用ThreadFactory
建立的線程,而且將本身(Worker本身)傳遞了當前線程,建立的線程就是任務線程,任務線程啓動的時候會調用Worker
下的run
方法,run
方法內部又委託給外部方法runWorker
來進行操做,它的參數傳遞的是調用者本身,Worker
中的run
方法以下所示:
public void run() {
runWorker(this); //this指Worker對象自己
}
複製代碼
這裏簡單畫一張圖來表示下調用的邏輯。
總體的邏輯是先進行建立線程,線程將Worker
設置爲執行程序,並將線程塞到Worker
中,而後再addWorker中將Worker中的線程取出來,進行啓動操做,啓動後他會調用Worker中的run方法,而後run方法中將調用ThreadPoolExecutor的runWorker,而後runWorker又會調用Worker中的任務firstTask,這個fistTask是要真正執行的任務,也是用戶本身實現的代碼邏輯。
接下來咱們就要看一下runWorker方法裏面具體內容:
final void runWorker(Worker w) {
//調用者也就是Worker中的線程
Thread wt = Thread.currentThread();
//獲取Worker中的第一個任務
Runnable task = w.firstTask;
//將Worker中的任務清除表明執行了第一個任務了,後面若是再有任務就從隊列中獲取。
w.firstTask = null;
//這裏還記的咱們在new Worker的時候將AQS的state狀態設置爲-1,這裏先進行解鎖操做,將state設置爲0
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//循環進行獲取任務,若是第一個任務不爲空,或者是若是第一個任務爲空,從任務隊列中獲取任務,若是有任務則返回獲取的任務信息,若是沒有任務能夠獲取則進行阻塞,阻塞也分兩種第一種是阻塞直到任務隊列中有內容,第二種是阻塞隊列必定時間以後仍是沒有任務就直接返回null。
while (task != null || (task = getTask()) != null) {
//先獲取worker的獨佔鎖,防止其餘線程調用了shutdown方法。
w.lock();
// 若是線程池正在中止,確保線程是被中斷的,若是沒有則確保線程不被中斷操做。
if ((runStateAtLeast(ctl.get(), STOP) || //若是線程池狀態爲STOP、TIDYING、TERMINATED直接拒絕任務中斷當前線程
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//執行任務以前作一些操做,可進行自定義
beforeExecute(wt, task);
Throwable thrown = null;
try {
//運行任務在這裏嘍。
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//執行任務以後作一些操做,可進行自定義
afterExecute(task, thrown);
}
} finally {
//將任務清空爲了下次任務獲取
task = null;
//統計當前Worker完成了多少任務
w.completedTasks++;
//獨佔鎖釋放
w.unlock();
}
}
completedAbruptly = false;
} finally {
//處理Worker的退出操做,執行清理工做。
processWorkerExit(w, completedAbruptly);
}
}
複製代碼
咱們看到若是Worker是第一次被啓動,它會從Worker中獲取firstTask任務來執行,而後執行成功後,它會getTask()來從隊列中獲取任務,這個地方比較有意思,它是分狀況進行獲取任務的,咱們都直到BlockingQueue中提供了幾種從隊列中獲取的方法,這個getTask中使用了兩種方式,第一種是使用poll進行獲取隊列中的信息,它採用的是過一點時間若是隊列中仍沒有任務時直接返回null,而後還有一個就是take方法,take方法是若是隊列中沒有任務則將當前線程進行阻塞,等待隊列中有任務後,會通知等待的隊列線程進行消費任務,讓咱們看一下getTask方法:
private Runnable getTask() {
boolean timedOut = false; //poll獲取超時
for (;;) {
//獲取線程池的狀態和線程數量
int c = ctl.get();
//獲取線程池的狀態
int rs = runStateOf(c);
//線程池狀態大於等於SHUTDOWN
//1.線程池若是是大於STOP的話減小工做線程池數量
//2.若是線程池狀態爲SHUTDOW而且隊列爲空時,表明隊列任務已經執行完,返回null,線程數量減小1
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//獲取線程池數量。
int wc = workerCountOf(c);
//若是allowCoreThreadTimeOut爲true,則空閒線程在必定時間未得到任務會清除
//或者若是線程數量大於corePoolSize的時候會進行清除空閒線程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//1.若是線程池數量大於最大的線程池數量或者對(空餘線程進行清除操做而且poll超時了,意思是隊列中沒有內容了,致使poll間隔一段時間後沒有獲取內容超時了。
//2.若是線程池的數量大於1或者是隊列已是空的
//總之意思就是當線程池的線程池數量大於corePoolSize,或指定了allowCoreThreadTimeOut爲true,當隊列中沒有數據或者線程池數量大於1的狀況下,嘗試對線程池的數量進行減小操做,而後返回null,用於上一個方法進行清除操做。
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//若是timed表明的是清除空閒線程的意思
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //等待一段時間若是沒有獲取到返回null。
workQueue.take(); //阻塞當前線程
//若是隊列中獲取到內容則返回
if (r != null)
return r;
//若是沒有獲取到超時了則設置timeOut狀態
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
複製代碼
咱們還記得第一張圖中有標記出來是core線程和普通線程,其實這樣標記不是很準確,準確的意思是若是線程池的數量超過了corePoolSize而且沒有特別指定allowCoreThreadTimeOut的狀況下,它會清除掉大於corePoolSize而且小於等於maximumPoolSize的一些線程,標記出core線程的意思是有corePoolSize不會被清除,可是會清除大於corePoolSize的線程,也就是線程池中的線程對獲取任務的時候進行判斷,也就是getTask中進行判斷,若是當前線程池的線程數量大於corePoolSize就使用poll方式獲取隊列中的任務,當過一段時間尚未任務就會返回null,返回null以後設置timeOut=true,而且獲取getTask也會返回null,到此會跳到調用者runWorker方法中,一直在while (task != null || (task = getTask()) != null)
此時的getTask返回null跳出while循環語句,設置completedAbruptly = false,表示不是忽然完成的而是正常完成,退出後它會執行finally的processWorkerExit(w, completedAbruptly)
,執行清理工做。咱們來看下源碼:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 若是忽然完成則調整線程數量
decrementWorkerCount(); // 減小線程數量1
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); //獲取鎖,同時只有一個線程得到鎖
try {
completedTaskCount += w.completedTasks; //統計整個線程池完成的數量
workers.remove(w); //將完成任務的worker從HashSet中移除
} finally {
mainLock.unlock(); //釋放鎖
}
//嘗試設置線程池狀態爲TERMINATED
//1.若是線程池狀態爲SHUTDOWN而且線程池線程數量與工做隊列爲空時,修改狀態。
//2.若是線程池狀態爲STOP而且線程池線程數量爲空時,修改狀態。
tryTerminate();
// 獲取線程池的狀態和線程池的數量
int c = ctl.get();
// 若是線程池的狀態小於STOP,也就是SHUTDOWN或RUNNING狀態
if (runStateLessThan(c, STOP)) {
//若是不是忽然完成,也就是正常結束
if (!completedAbruptly) {
//若是指定allowCoreThreadTimeOut=true(默認false)則表明線程池中有空餘線程時須要進行清理操做,不然線程池中的線程應該保持corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//這裏判斷若是線程池中隊列爲空而且線程數量最小爲0時,將最小值調整爲1,由於隊列中還有任務沒有完成須要增長隊列,因此這裏增長了一個線程。
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//若是當前線程數效益核心個數,就增長一個Worker
addWorker(null, false);
}
複製代碼
經過上面的源碼能夠得出,若是線程數超過核心線程數後,在runWorker
中就不會等待隊列中的消息,而是會進行清除操做,上面的清除代碼首先是先對線程池的數量進行較少操做,其次是統計整個線程池中完成任務的數量,而後就是嘗試修改線程池的狀態由SHUTDOWN->TIDYING->TERMINATED
或者是由STOP->TIDYING->TERMINATED
,修改線程池狀態爲TERMINATED
,須要有兩個條件:
當線程池線程數量和工做隊列爲空,而且線程池的狀態爲SHUTDOWN
時,纔會將狀態進行修改,修改的過程是SHUTDOWN->TIDYING->TERMINATED
當線程池的狀態爲STOP
而且線程池數量爲空時,纔會嘗試修改狀態,修改過程是STOP->TIDYING->TERMINATED
若是設置爲TERMINATED
狀態,還須要調用條件變量termination
的signalAll()
方法來喚醒全部由於調用awaitTermination
方法而被阻塞的線程,換句話說當調用awaitTermination
後,只有線程池狀態變成TERMINATED纔會被喚醒。
接下來咱們就來分析一下這個tryTerminate
方法,看一下他到底符不符合咱們上述說的內容:
final void tryTerminate() {
for (;;) {
// 獲取線程池的狀態和線程池的數量組合狀態
int c = ctl.get();
//這裏單獨下面進行分析,這裏說明兩個問題,須要反向來想這個問題。
//1.若是線程池狀態STOP則不進入if語句
//2.若是線程池狀態爲SHUTDOWN而且工做隊列爲空時,不進入if語句
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//若是線程池數量不爲空時,進行中斷操做。
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//修改狀態爲TIDYING,而且將線程池的數量進行清空
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//執行一些邏輯,默認是空的
terminated();
} finally {
//修改狀態爲TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
//喚醒調用awaitTermination方法的線程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
複製代碼
咱們單獨將上面的if語句摘出來進行分析,將上面的第一個if判斷進行修改以下,能夠看到return在else裏面,這時候內部if判斷進行轉換,轉換成以下所示:
if (!isRunning(c) &&
!runStateAtLeast(c, TIDYING) && //只能是SHUTDOWN和STOP
(runStateOf(c) != SHUTDOWN || workQueue.isEmpty())){
//這裏執行邏輯
}else {
return;
}
複製代碼
逐一分析分析內容以下:
!isRunning(c)
表明不是RUNNING,則可能的是SHUTDOWN
,STOP
,TIDYING
,TERMINATED
這四種狀態
中間的鏈接符是而且的意思,跟着runStateAtLeast(c, TIDYING)
這句話的意思是至少是TIDYING
,TERMINATED
這兩個,反過來就是多是RUNNING
,SHUTDOWN
,STOP
,可是前面已經判斷了不能是RUNINNG
狀態,因此前面兩個連在一塊兒就是隻能是狀態爲SHUTDOWN
,STOP
runStateOf(c) != SHUTDOWN || workQueue.isEmpty()
當前面的狀態是SHUTDOWN
時,則會出發workQueue.isEmpty()
,連在一塊兒就是狀態是SHUTDOWN
並工做隊列爲空,當線程池狀態爲STOP
時,則會進入到runStateOf(c) != SHUTDOWN
,直接返回true,就表明線程池狀態爲STOP
後面還有一個語句一個if語句將其轉換一下邏輯就是下面的內容:
if (workerCountOf(c) == 0) {
//執行下面的邏輯
}else{
interruptIdleWorkers(ONLY_ONE);
return;
}
複製代碼
這裏咱們也進行轉換下,就能夠看出來當線程池的數量爲空時,纔會進行下面的邏輯,下面的邏輯就是修改線程池狀態爲TERMINATED
,兩個連在一塊兒就是上面分析的修改狀態爲TERMINATED
的條件,這裏畫一張圖來表示線程池狀態的信息:
其實上面圖中咱們介紹了關於從SHUTDOWN
或STOP
到TERMINATED
的變化,沒有講解關於如何從RUNNING
狀態轉變成SHUTDOWN
或STOP
狀態,實際上是調用了shutdown()
或shutdownNow
方法對其進行狀態的變換,下面來看一下shutdown
方法源碼:
public void shutdown() {
//獲取全局鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//權限檢查
checkShutdownAccess();
//設置線程池狀態爲SHUTDOWN,若是狀態已是大於等於SHUTDOWN則直接返回
advanceRunState(SHUTDOWN);
//若是線程沒有設置中斷標識而且線程沒有運行則設置中斷標識
interruptIdleWorkers();
//空的能夠實現的內容
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//嘗試修改線程池狀態爲TERMINATED
tryTerminate();
}
複製代碼
SecurityException
或NullPointException
異常。接下來咱們來看一下advanceRunState
內容以下所示:
private void advanceRunState(int targetState) {
for (;;) {
//獲取線程池狀態和線程池的線程數量
int c = ctl.get();
if (runStateAtLeast(c, targetState) || //若是線程池的狀態>=SHUTDOWN
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) //設置線程池狀態爲SHUTDOWN
//返回
break;
}
}
複製代碼
interruptIdleWorkers
代碼以下所示:
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
複製代碼
private void interruptIdleWorkers(boolean onlyOne) {
//獲取全局鎖,同時只能有一個線程可以調用shutdown方法
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//遍歷工做線程
for (Worker w : workers) {
Thread t = w.thread;
//若是當前線程沒有設置中斷標誌而且能夠獲取Worker本身的鎖
if (!t.isInterrupted() && w.tryLock()) {
try {
//設置中斷標誌
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
//執行一次,清理空閒線程。
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
複製代碼
咱們看到當咱們調用shutdown方法的時候,只是將空閒的線程給設置了中斷標識,也就是活躍正在執行任務的線程並無設置中斷標識,直到將任務所有執行完後纔會逐步清理線程操做,咱們還記的在getTask中的方法裏面有這樣一段代碼:
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
複製代碼
判斷是不是狀態>=SHUTDOWN,而且隊列爲空時,將線程池數量進行減小操做,內部進行CAS操做,直到CAS操做成功爲止,而且返回null,返回null後,會調用processWorkerExit(w, false);
清理Workers線程信息,而且嘗試將線程設置爲TERMINATED
狀態,上面是對全部shutdown
方法的分析,下面來看一下shutdownNow
方法而且比較兩個之間的區別:
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//權限檢查
checkShutdownAccess();
//設置線程池狀態爲STOP,若是狀態已是大於等於STOP則直接返回
advanceRunState(STOP);
//這裏是和SHUTDOWN區別的地方,這裏是強制進行中斷操做
interruptWorkers();
//將爲完成任務複製到list集合中
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//嘗試修改線程池狀態爲TERMINATED
tryTerminate();
return tasks;
}
複製代碼
shutdownNow
方法返回了未完成的任務信息列表tasks = drainQueue();
,其實該方法和shutdown
方法主要的區別在於一下幾點內容:
shutdownNow
方法將線程池狀態設置爲STOP
,而shutdown
則將狀態修改成SHUTDOWN
shutdownNow
方法將工做任務進行中斷操做,也就是說若是工做線程在工做也會被中斷,而shutdown
則是先嚐試獲取鎖若是得到鎖成功則進行中斷標誌設置,也就是中斷操做,若是沒有獲取到鎖則等待進行完成後自動退出。shutdownNow
方法返回未完成的任務列表。下面代碼是shutDownNow
的interruptWorkers
方法:
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
//直接進行中斷操做。
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
複製代碼
內部調用了Worker
的interruptIfStarted
方法,方法內部是針對線程進行中斷操做,可是中斷的前提條件是AQS的state狀態必須大於等於0,若是狀態爲-1的則不會被中斷,可是若是任務運行起來的時候在runWorker
中則不會執行任務,由於線程池狀態爲STOP
,若是線程池狀態爲STOP則會中斷線程,下面代碼是Worker中的interruptIfStarted
:
void interruptIfStarted() {
Thread t;
//當前Worker鎖狀態大於等於0而且線程沒有被中斷
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
複製代碼
JDK內置的拒絕策略以下:
首先先上一張圖,針對這張圖來進行總結:
addWorker
進行建立線程,並將線程放入到線程池中,這裏咱們看到第二步是將線程添加到核心線程中,其實線程池內部不分核心線程和非核心線程,只是根據corePoolSize和maximumPoolSize設置的大小來進行區分,由於超過corePoolSize的線程會被回收,至於回收那些線程,是根據線程獲取任務的時候進行判斷,當前線程池數量大於corePoolSize,或者指定了allowCoreThreadTimeOut
爲true,則他等待必定時間後會返回,不會一直等待