Java併發系列[10]----ThreadPoolExecutor源碼分析

在平常的開發調試中,咱們常常會直接new一個Thread對象來執行某個任務。這種方式在任務數較少的狀況下比較簡單實用,可是在併發量較大的場景中卻有着致命的缺陷。例如在訪問量巨大的網站中,若是每一個請求都開啓一個線程來處理的話,即便是再強大的服務器也支撐不住。一臺電腦的CPU資源是有限的,在CPU較爲空閒的狀況下,新增線程能夠提升CPU的利用率,達到提高性能的效果。可是在CPU滿載運行的狀況下,再繼續增長線程不只不能提高性能,反而由於線程的競爭加大而致使性能降低,甚至致使服務器宕機。所以,在這種狀況下咱們能夠利用線程池來使線程數保持在合理的範圍內,使得CPU資源被充分的利用,且避免因過載而致使宕機的危險。在Executors中爲咱們提供了多種靜態工廠方法來建立各類特性的線程池,其中大多數是返回ThreadPoolExecutor對象。所以本篇咱們從ThreadPoolExecutor類着手,深刻探究線程池的實現機制。緩存

1. 線程池狀態和線程數的表示服務器

 1 //高3位表示線程池狀態, 後29位表示線程個數
 2 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 3 private static final int COUNT_BITS = Integer.SIZE - 3;
 4 private static final int CAPACITY = (1 << COUNT_BITS) - 1;
 5 
 6 //運行狀態  例:11100000000000000000000000000000
 7 private static final int RUNNING = -1 << COUNT_BITS;
 8 
 9 //關閉狀態  例:00000000000000000000000000000000
10 private static final int SHUTDOWN = 0 << COUNT_BITS;
11 
12 //中止狀態  例:00100000000000000000000000000000
13 private static final int STOP = 1 << COUNT_BITS;
14 
15 //整理狀態  例:01000000000000000000000000000000
16 private static final int TIDYING = 2 << COUNT_BITS;
17 
18 //終止狀態  例:01100000000000000000000000000000
19 private static final int TERMINATED = 3 << COUNT_BITS;
20 
21 private static int runStateOf(int c) { return c & ~CAPACITY; }
22 private static int workerCountOf(int c) { return c & CAPACITY; }
23 private static int ctlOf(int rs, int wc) { return rs | wc; }

在繼續接下來的探究以前,咱們先來搞清楚ThreadPoolExecutor是怎樣存放狀態信息和線程數信息的。ThreadPoolExecutor利用原子變量ctl來同時存儲運行狀態和線程數的信息,其中高3位表示線程池的運行狀態(runState),後面的29位表示線程池中的線程數(workerCount)。上面代碼中,runStateOf方法是從ctl取出狀態信息,workerCountOf方法是從ctl取出線程數信息,ctlOf方法是將狀態信息和線程數信息糅合進ctl中。具體的計算過程以下圖所示。併發

2. 線程池各個狀態的具體含義性能

就像人的生老病死同樣,線程池也有本身的生命週期,從建立到終止,線程池在每一個階段所作的事情是不同的。新建一個線程池時它的狀態爲Running,這時它不斷的從外部接收並處理任務,當處理不過來時它會把任務放到任務隊列中;以後咱們可能會調用shutdown()來終止線程池,這時線程池的狀態從Running轉爲Shutdown,它開始拒絕接收從外部傳過來的任務,可是會繼續處理完任務隊列中的任務;咱們也可能調用shutdownNow()來馬上中止線程池,這時線程池的狀態從Running轉爲Stop,而後它會快速排空任務隊列中的任務並轉到Tidying狀態,處於該狀態的線程池須要執行terminated()來作相關的掃尾工做,執行完terminated()以後線程池就轉爲Terminated狀態,表示線程池已終止。這些狀態的轉換圖以下所示。網站

3. 關鍵成員變量的介紹ui

 1 //任務隊列
 2 private final BlockingQueue<Runnable> workQueue;
 3 
 4 //工做者集合
 5 private final HashSet<Worker> workers = new HashSet<Worker>();
 6 
 7 //線程達到的最大值
 8 private int largestPoolSize;
 9 
10 //已完成任務總數
11 private long completedTaskCount;
12 
13 //線程工廠
14 private volatile ThreadFactory threadFactory;
15 
16 //拒絕策略
17 private volatile RejectedExecutionHandler handler;
18 
19 //閒置線程存活時間
20 private volatile long keepAliveTime;
21 
22 //是否容許核心線程超時
23 private volatile boolean allowCoreThreadTimeOut;
24 
25 //核心線程數量
26 private volatile int corePoolSize;
27 
28 //最大線程數量
29 private volatile int maximumPoolSize;
30 
31 //默認拒絕策略
32 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

在深刻探究線程池的實現機制以前,咱們有必要了解一下各個成員變量的做用。上面列出了主要的成員變量,除了一些用於統計的變量,例如largestPoolSize和completedTaskCount,其中大部分變量的值都是能夠在構造時進行設置的。下面咱們就看一下它的核心構造器。this

 1 //核心構造器
 2 public ThreadPoolExecutor(int corePoolSize,
 3                           int maximumPoolSize,
 4                           long keepAliveTime,
 5                           TimeUnit unit,
 6                           BlockingQueue<Runnable> workQueue,
 7                           ThreadFactory threadFactory,
 8                           RejectedExecutionHandler handler) {
 9     if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) {
10         throw new IllegalArgumentException();
11     }    
12     if (workQueue == null || threadFactory == null || handler == null) {
13         throw new NullPointerException();
14     }
15     this.corePoolSize = corePoolSize;                  //設置核心線程數量
16     this.maximumPoolSize = maximumPoolSize;            //設置最大線程數量
17     this.workQueue = workQueue;                        //設置任務隊列
18     this.keepAliveTime = unit.toNanos(keepAliveTime);  //設置存活時間
19     this.threadFactory = threadFactory;                //設置線程工廠
20     this.handler = handler;                            //設置拒絕策略
21 }

ThreadPoolExecutor有多個構造器,全部的構造器都會調用上面的核心構造器。經過核心構造器咱們能夠爲線程池設置不一樣的參數,由此線程池也能表現出不一樣的特性。所以完全搞懂這幾個參數的含義能使咱們更好的使用線程池,下面咱們就來詳細看一下這幾個參數的含義。
corePoolSize:
核心線程數最大值,默認狀況下新建線程池時並不建立線程,後續每接收一個任務就新建一個核心線程來處理,直到核心線程數達到corePoolSize。這時後面到來的任務都會被放到任務隊列中等待。
maximumPoolSize:
總線程數最大值,當任務隊列被放滿了以後,將會新建非核心線程來處理後面到來的任務。當總的線程數達到maximumPoolSize後,將再也不繼續建立線程,而是對後面的任務執行拒絕策略。
workQueue:
任務隊列,當核心線程數達到corePoolSize後,後面到來的任務都會被放到任務隊列中,該任務隊列是阻塞隊列,工做線程能夠經過定時或者阻塞方式從任務隊列中獲取任務。
keepAliveTime:
閒置線程存活時間,該參數默認狀況下只在線程數大於corePoolSize時起做用,閒置線程在任務隊列上等待keepAliveTime時間後將會被終止,直到線程數減至corePoolSize。也能夠經過設置allowCoreThreadTimeOut變量爲true來使得keepAliveTime在任什麼時候候都起做用,這時線程數最後會減至0。spa

4. execute方法的執行過程線程

 1 //核心執行方法
 2 public void execute(Runnable command) {
 3     if (command == null) throw new NullPointerException();
 4     int c = ctl.get();
 5     //線程數若小於corePoolSize則新建核心工做者
 6     if (workerCountOf(c) < corePoolSize) {
 7         if (addWorker(command, true)) return;
 8         c = ctl.get();
 9     }
10     //不然將任務放到任務隊列
11     if (isRunning(c) && workQueue.offer(command)) {
12         int recheck = ctl.get();
13         //若不是running狀態則將該任務從隊列中移除
14         if (!isRunning(recheck) && remove(command)) {
15             //成功移除後再執行拒絕策略
16           reject(command);
17         //若線程數爲0則新增一個非核心線程
18         }else if (workerCountOf(recheck) == 0) {
19           addWorker(null, false);
20         }
21     //若隊列已滿則新增非核心工做者
22     }else if (!addWorker(command, false)) {
23         //若新建非核心線程失敗則執行拒絕策略
24       reject(command);
25     }
26 }

execute方法是線程池接收任務的入口方法,當建立好一個線程池以後,咱們會調用execute方法並傳入一個Runnable交給線程池去執行。從上面代碼中能夠看到execute方法首先會去判斷當前線程數是否小於corePoolSize,若是小於則調用addWorker方法新建一個核心線程去處理該任務,不然調用workQueue的offer方法將該任務放入到任務隊列中。經過offer方法添加並不會阻塞線程,若是添加成功會返回true,若隊列已滿則返回false。在成功將任務放入到任務隊列後,還會再次檢查線程池是不是Running狀態,若是不是則將剛剛添加的任務從隊列中移除,而後再執行拒絕策略。若是從隊列中移除任務失敗,則再檢查一下線程數是否爲0(有可能恰好所有線程都被終止了),是的話就新建一個非核心線程去處理。若是任務隊列已經滿了,此時offer方法會返回false,接下來會再次調用addWorker方法新增一個非核心線程來處理該任務。若是期間建立線程失敗,則最後會執行拒絕策略。調試

5. 工做線程的實現

 1 //工做者類
 2 private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
 3     //關聯線程
 4     final Thread thread;
 5     //初始任務
 6     Runnable firstTask;
 7     //完成任務數
 8     volatile long completedTasks;
 9 
10     //構造器
11     Worker(Runnable firstTask) {
12         //抑制中斷直到runWorker
13         setState(-1);
14         //設置初始任務
15         this.firstTask = firstTask;
16         //設置關聯線程
17         this.thread = getThreadFactory().newThread(this);
18     }
19     
20     public void run() {
21         runWorker(this);
22     }
23     
24     //判斷是否佔有鎖, 0表明未佔用, 1表明已佔用
25     protected boolean isHeldExclusively() {
26         return getState() != 0;
27     }
28 
29     //嘗試獲取鎖
30     protected boolean tryAcquire(int unused) {
31         if (compareAndSetState(0, 1)) {
32             setExclusiveOwnerThread(Thread.currentThread());
33             return true;
34         }
35         return false;
36     }
37     
38     //嘗試釋放鎖
39     protected boolean tryRelease(int unused) {
40         setExclusiveOwnerThread(null);
41         setState(0);
42         return true;
43     }
44 
45     public void lock() { acquire(1); }
46     public boolean tryLock() { return tryAcquire(1); }
47     public void unlock() { release(1); }
48     public boolean isLocked() { return isHeldExclusively(); }
49 
50     //中斷關聯線程
51     void interruptIfStarted() {
52         Thread t;
53         //將活動線程和閒置線程都中斷
54         if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
55             try {
56                 t.interrupt();
57             } catch (SecurityException ignore) {
58                 //ignore
59             }
60         }
61     }
62 }

ThreadPoolExecutor內部實現了一個Worker類,用它來表示工做線程。每一個Worker對象都持有一個關聯線程和分配給它的初始任務。Worker類繼承自AQS並實現了本身的加鎖解鎖方法,說明每一個Worker對象也是一個鎖對象。同時Worker類還實現了Runnable接口,所以每一個Worker對象都是能夠運行的。Worker類有一個惟一的構造器,須要傳入一個初始任務給它,在構造器裏面首先將同步狀態設置爲-1,這個操做主要是抑制中斷直到runWorker方法運行,爲啥要這樣作?咱們繼續看下去,能夠看到在設置完初始任務以後,立刻就開始設置關聯線程,關聯線程是經過線程工廠的newThread方法來生成的,這時將Worker對象自己看成任務傳給關聯線程。所以在啓動關聯線程時(調用start方法),會運行Worker對象自身的run方法。而run方法裏面緊接着調用runWorker方法,也就是說只有在runWorker方法運行時才代表關聯線程已啓動,這時去中斷關聯線程纔有意義,所以前面要經過設置同步狀態爲-1來抑制中斷。那麼爲啥將同步狀態設置爲-1就能夠抑制中斷?每一個Worker對象都是經過調用interruptIfStarted方法來中斷關聯線程的,在interruptIfStarted方法內部會判斷只有同步狀態>=0時纔會中斷關聯線程。所以將同步狀態設置爲-1能起到抑制中斷的做用。

6. 工做線程的建立

 1 //添加工做線程
 2 private boolean addWorker(Runnable firstTask, boolean core) {
 3     retry:
 4     for (;;) {
 5         int c = ctl.get();
 6         int rs = runStateOf(c);
 7         //只有如下兩種狀況會繼續添加線程
 8         //1.狀態爲running
 9         //2.狀態爲shutdown,首要任務爲空,但任務隊列中還有任務
10         if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) {
11             return false;
12         }
13         for (;;) {
14             int wc = workerCountOf(c);
15             //如下三種狀況不繼續添加線程:
16             //1.線程數大於線程池總容量
17             //2.當前線程爲核心線程,且核心線程數達到corePoolSize
18             //3.當前線程非核心線程,且總線程達到maximumPoolSize
19             if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) {
20                 return false;
21             }
22             //不然繼續添加線程, 先將線程數加一
23             if (compareAndIncrementWorkerCount(c)) {
24                 //執行成功則跳過外循環
25                 break retry;
26             }
27             //CAS操做失敗再次檢查線程池狀態
28             c = ctl.get();
29             //若是線程池狀態改變則繼續執行外循環
30             if (runStateOf(c) != rs) {
31                 continue retry;
32             }
33             //不然代表CAS操做失敗是workerCount改變, 繼續執行內循環
34         }
35     }
36     boolean workerStarted = false;
37     boolean workerAdded = false;
38     Worker w = null;
39     try {
40         final ReentrantLock mainLock = this.mainLock;
41         w = new Worker(firstTask);
42         final Thread t = w.thread;
43         if (t != null) {
44             mainLock.lock();
45             try {
46                 int c = ctl.get();
47                 int rs = runStateOf(c);
48                 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
49                     //若是線程已經開啓則拋出異常
50                     if (t.isAlive()) throw new IllegalThreadStateException();
51                     //將工做者添加到集合中
52                     workers.add(w);
53                     int s = workers.size();
54                     //記錄線程達到的最大值
55                     if (s > largestPoolSize) {
56                         largestPoolSize = s;
57                     }
58                     workerAdded = true;
59                 }
60             } finally {
61                 mainLock.unlock();
62             }
63             //將工做者添加到集合後則啓動線程
64             if (workerAdded) {
65                 t.start();
66                 workerStarted = true;
67             }
68         }
69     } finally {
70         //若是線程啓動失敗則回滾操做
71         if (!workerStarted) {
72             addWorkerFailed(w);
73         }
74     }
75     return workerStarted;
76 }

上面咱們知道在execute方法裏面會調用addWorker方法來添加工做線程。經過代碼能夠看到進入addWorker方法裏面會有兩層自旋循環,在外層循環中獲取線程池當前的狀態,若是線程池狀態不符合就直接return,在內層循環中獲取線程數,若是線程數超過限定值也直接return。只有通過這兩重判斷以後纔會使用CAS方式來將線程數加1。成功將線程數加1以後就跳出外層循環去執行後面的邏輯,不然就根據不一樣條件來進行自旋,若是是線程池狀態改變就執行外層循環,若是是線程數改變就執行內層循環。當線程數成功加1以後,後面就是去新建一個Worker對象,並在構造時傳入初始任務給它。而後將這個Worker對象添加到工做者集合當中,添加成功後就調用start方法來啓動關聯線程。

7. 工做線程的執行

 1 //運行工做者
 2 final void runWorker(Worker w) {
 3     //獲取當前工做線程
 4     Thread wt = Thread.currentThread();
 5     //獲取工做者的初始任務
 6     Runnable task = w.firstTask;
 7     //將工做者的初始任務置空
 8     w.firstTask = null;
 9     //將同步狀態從-1設爲0
10     w.unlock();
11     boolean completedAbruptly = true;
12     try {
13         //初始任務不爲空則執行初始任務, 不然從隊列獲取任務
14         while (task != null || (task = getTask()) != null) {
15             //確保獲取到任務後才加鎖
16             w.lock(); 
17             //若狀態大於等於stop, 保證當前線程被中斷
18             //若狀態小於stop, 保證當前線程未被中斷
19             //在清理中斷狀態時可能有其餘線程在修改, 因此會再檢查一次
20             if ((runStateAtLeast(ctl.get(), STOP) || 
21                 (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) {
22                 wt.interrupt();
23             }
24             try {
25                 //任務執行前作些事情
26                 beforeExecute(wt, task);
27                 Throwable thrown = null;
28                 try {
29                     //執行當前任務
30                     task.run();
31                 } catch (RuntimeException x) {
32                     thrown = x; throw x;
33                 } catch (Error x) {
34                     thrown = x; throw x;
35                 } catch (Throwable x) {
36                     thrown = x; throw new Error(x);
37                 } finally {
38                     //任務執行後作一些事情
39                     afterExecute(task, thrown);
40                 }
41             } finally {
42                 //將執行完的任務置空
43                 task = null;
44                 //將完成的任務數加一
45                 w.completedTasks++;
46                 w.unlock();
47             }
48         }
49         //設置該線程爲正常完成任務
50         completedAbruptly = false;
51     } finally {
52         //執行完全部任務後將線程刪除
53         processWorkerExit(w, completedAbruptly);
54     }
55 }

上面咱們知道,將Worker對象添加到workers集合以後就會去調用關聯線程的start方法,因爲傳給關聯線程的Runnable就是Worker對象自己,所以會調用Worker對象實現的run方法,最後會調用到runWorker方法。咱們看到上面代碼,進入到runWorker方法裏面首先獲取了Worker對象的初始任務,而後調用unlock方法將同步狀態加1,因爲在構造Worker對象時將同步狀態置爲-1了,因此這裏同步狀態變回0,所以在這以後才能夠調用interruptIfStarted方法來中斷關聯線程。若是初始任務不爲空就先去執行初始任務,不然就調用getTask方法去任務隊列中獲取任務,能夠看到這裏是一個while循環,也就是說工做線程在執行完本身的任務以後會不斷的從任務隊列中獲取任務,直到getTask方法返回null,而後工做線程退出while循環最後執行processWorkerExit方法來移除本身。若是須要在全部任務執行以前或以後作些處理,能夠分別實現beforeExecute方法和afterExecute方法。

8. 任務的獲取

 1 //從任務隊列中獲取任務
 2 private Runnable getTask() {
 3     //上一次獲取任務是否超時
 4     boolean timedOut = false;
 5     retry:
 6     //在for循環裏自旋
 7     for (;;) {
 8         int c = ctl.get();
 9         int rs = runStateOf(c);
10         //如下兩種狀況會將工做者數減爲0並返回null,並直接使該線程終止:
11         //1.狀態爲shutdown而且任務隊列爲空
12         //2.狀態爲stop, tidying或terminated
13         if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
14             decrementWorkerCount();
15             return null;
16         }
17         
18         boolean timed;
19         //判斷是否要剔除當前線程
20         for (;;) {
21             int wc = workerCountOf(c);
22             //如下兩種狀況會在限定時間獲取任務:
23             //1.容許核心線程超時
24             //2.線程數大於corePoolSize
25             timed = allowCoreThreadTimeOut || wc > corePoolSize;
26             //如下兩種狀況不執行剔除操做:
27             //1.上次任務獲取未超時
28             //2.上次任務獲取超時, 但沒要求在限定時間獲取
29             if (wc <= maximumPoolSize && !(timedOut && timed)) {
30                 break;
31             }
32             //若上次任務獲取超時, 且規定在限定時間獲取, 則將線程數減一
33             if (compareAndDecrementWorkerCount(c)) {
34                 //CAS操做成功後直接返回null
35                 return null;
36             }
37             //CAS操做失敗後再次檢查狀態
38             c = ctl.get();
39             //若狀態改變就從外層循環重試
40             if (runStateOf(c) != rs) {
41                 continue retry;
42             }
43             //不然代表是workerCount改變, 繼續在內循環重試
44         }
45         
46         try {
47             //若timed爲true, 則在規定時間內返回
48             //若timed爲false, 則阻塞直到獲取成功
49             //注意:閒置線程會一直在這阻塞
50             Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
51             //獲取任務不爲空則返回該任務
52             if (r != null) {
53                 return r;
54             }
55             //不然將超時標誌設爲true
56             timedOut = true;
57         } catch (InterruptedException retry) {
58             timedOut = false;
59         }
60     }
61 }

工做線程在while循環裏不斷的經過getTask方法來從任務隊列中獲取任務,咱們看一下getTask方法是怎樣獲取任務的。進入第一個for循環以後有一個if判斷,從這裏咱們能夠看到,若是線程池狀態爲shutdown,會繼續消費任務隊列裏面的任務;若是線程池狀態爲stop,則中止消費任務隊列裏剩餘的任務。進入第二個for循環後會給timed變量賦值,因爲allowCoreThreadTimeOut變量默認是false,因此timed的值取決於線程數是否大於corePoolSize,小於爲false,大於則爲true。從任務隊列裏面獲取任務的操做在try塊裏面,若是timed爲true,則調用poll方法進行定時獲取;若是timed爲flase,則調用take方法進行阻塞獲取。也就是說默認狀況下,若是線程數小於corePoolSize,則調用take方法進行阻塞獲取,即便任務隊列爲空,工做線程也會一直等待;若是線程數大於corePoolSize,則調用poll方法進行定時獲取,在keepAliveTime時間內獲取不到任務則會返回null,對應的工做線程也會被移除,但線程數會保持在corePoolSize上。固然若是設置allowCoreThreadTimeOut爲true,則會一直經過調用poll方法來從任務隊列中獲取任務,若是任務隊列長時間爲空,則工做線程會減小至0。

9. 工做線程的退出

 1 //刪除工做線程
 2 private void processWorkerExit(Worker w, boolean completedAbruptly) {
 3     //若非正常完成則將線程數減爲0
 4     if (completedAbruptly) {
 5         decrementWorkerCount();
 6     }
 7     final ReentrantLock mainLock = this.mainLock;
 8     mainLock.lock();
 9     try {
10         //統計完成的任務總數
11         completedTaskCount += w.completedTasks;
12         //在這將工做線程移除
13         workers.remove(w);
14     } finally {
15         mainLock.unlock();
16     }
17     //嘗試終止線程池
18     tryTerminate();
19     //再次檢查線程池狀態
20     int c = ctl.get();
21     //若狀態爲running或shutdown, 則將線程數恢復到最小值
22     if (runStateLessThan(c, STOP)) {
23         //線程正常完成任務被移除
24         if (!completedAbruptly) {
25             //容許核心線程超時最小值爲0, 不然最小值爲核心線程數
26             int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
27             //若是任務隊列還有任務, 則保證至少有一個線程
28             if (min == 0 && !workQueue.isEmpty()) {
29                 min = 1;
30             }
31             //若線程數大於最小值則不新增了
32             if (workerCountOf(c) >= min) {
33                 return;
34             }
35         }
36         //新增工做線程
37         addWorker(null, false);
38     }
39 }

工做線程若是從getTask方法中得到null,則會退出while循環並隨後執行processWorkerExit方法,該方法會在這個工做線程終止以前執行一些操做,咱們看到它會去統計該工做者完成的任務數,而後將其從workers集合中刪除,每刪除一個工做者以後都會去調用tryTerminate方法嘗試終止線程池,但並不必定會真的終止線程池。從tryTerminate方法返回後再次去檢查一遍線程池的狀態,若是線程池狀態爲running或者shutdown,而且線程數小於最小值,則恢復一個工做者。這個最小值是怎樣計算出來的呢?咱們來看看。若是allowCoreThreadTimeOut爲true則最小值爲0,不然最小值爲corePoolSize。但還有一個例外狀況,就是雖然容許核心線程超時了,可是若是任務隊列不爲空的話,那麼必須保證有一個線程存在,所以這時最小值設爲1。後面就是判斷若是工做線程數大於最小值就不新增線程了,不然就新增一個非核心線程。從這個方法能夠看到,每一個線程退出時都會去判斷要不要再恢復一個線程,所以線程池中的線程總數也是動態增減的。

10. 線程池的終止

 1 //平緩關閉線程池
 2 public void shutdown() {
 3     final ReentrantLock mainLock = this.mainLock;
 4     mainLock.lock();
 5     try {
 6         //檢查是否有關閉的權限
 7         checkShutdownAccess();
 8         //將線程池狀態設爲shutdown
 9         advanceRunState(SHUTDOWN);
10         //中斷閒置的線程
11         interruptIdleWorkers();
12         //對外提供的鉤子
13         onShutdown();
14     } finally {
15         mainLock.unlock();
16     }
17     //嘗試終止線程池
18     tryTerminate();
19 }
20 
21 //馬上關閉線程池
22 public List<Runnable> shutdownNow() {
23     List<Runnable> tasks;
24     final ReentrantLock mainLock = this.mainLock;
25     mainLock.lock();
26     try {
27         //檢查是否有關閉的權限
28         checkShutdownAccess();
29         //將線程池狀態設爲stop
30         advanceRunState(STOP);
31         //中斷全部工做線程
32         interruptWorkers();
33         //排幹任務隊列
34         tasks = drainQueue();
35     } finally {
36         mainLock.unlock();
37     }
38     //嘗試終止線程池
39     tryTerminate();
40     return tasks;
41 }

能夠經過兩個方法來終止線程池,經過調用shutdown方法能夠平緩的終止線程池,經過調用shutdownNow方法能夠當即終止線程池。調用shutdown()方法後首先會將線程池狀態設置爲shutdown,這時線程池會拒絕接收外部傳過來的任務,而後調用interruptIdleWorkers()方法中斷閒置線程,剩餘的線程會繼續消費完任務隊列裏的任務以後纔會終止。調用shutdownNow()方法會將線程池狀態設置爲stop,這是線程池也再也不接收外界的任務,而且立刻調用interruptWorkers()方法將全部工做線程都中斷了,而後排幹任務隊列裏面沒有被處理的任務,最後返回未被處理的任務集合。調用shutdown()和shutdownNow()方法後還未真正終止線程池,這兩個方法最後都會調用tryTerminate()方法來終止線程池。咱們看看該方法的代碼。

 1 //嘗試終止線程池
 2 final void tryTerminate() {
 3     for (;;) {
 4         int c = ctl.get();
 5         //如下兩種狀況終止線程池,其餘狀況直接返回:
 6         //1.狀態爲stop
 7         //2.狀態爲shutdown且任務隊列爲空
 8         if (isRunning(c) || runStateAtLeast(c, TIDYING) ||
 9             (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty())) {
10             return;
11         }
12         //若線程不爲空則中斷一個閒置線程後直接返回
13         if (workerCountOf(c) != 0) {
14             interruptIdleWorkers(ONLY_ONE);
15             return;
16         }
17         final ReentrantLock mainLock = this.mainLock;
18         mainLock.lock();
19         try {
20             //將狀態設置爲tidying
21             if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
22                 try {
23                     //線程池終止後作的事情
24                     terminated();
25                 } finally {
26                     //將狀態設置爲終止狀態(TERMINATED)
27                     ctl.set(ctlOf(TERMINATED, 0));
28                     //喚醒條件隊列全部線程
29                     termination.signalAll();
30                 }
31                 return;
32             }
33         } finally {
34             mainLock.unlock();
35         }
36         //若狀態更改失敗則再重試
37     }
38 }

tryTerminate()方法在其餘不少地方也被調用過,好比processWorkerExit()和addWorkerFailed()。調用該方法來嘗試終止線程池,在進入for循環後第一個if判斷過濾了不符合條件的終止操做,只有狀態爲stop,或者狀態爲shutdown且任務隊列爲空這兩種狀況才能繼續執行。第二個if語句判斷工做者數量是否爲0,不爲0的話也直接返回。通過這兩重判斷以後才符合終止線程池的條件,因而先經過CAS操做將線程池狀態設置爲tidying狀態,在tidying狀態會調用用戶本身實現的terminated()方法來作一些處理。到了這一步,無論terminated()方法是否成功執行最後都會將線程池狀態設置爲terminated,也就標誌着線程池真正意義上的終止了。最後會喚醒全部等待線程池終止的線程,讓它們繼續執行。

11. 經常使用線程池參數配置

Executors中有許多靜態工廠方法來建立線程池,在平時使用中咱們都是經過Executors的靜態工廠方法來建立線程池的。這其中有幾個使用線程池的典型例子咱們來看一下。

 1 //固定線程數的線程池
 2 //注:該線程池將corePoolSize和maximumPoolSize都設置爲同一數值,線程池剛建立時線程數爲0,
 3 //以後每接收一個任務建立一個線程,直到線程數達到nThreads,此後線程數再也不增加。若是其中有某個
 4 //線程由於發生異常而終止,線程池將補充一個新的線程。
 5 public static ExecutorService newFixedThreadPool(int nThreads) {
 6     return new ThreadPoolExecutor(nThreads, nThreads,
 7                                   0L, TimeUnit.MILLISECONDS,
 8                                   new LinkedBlockingQueue<Runnable>());
 9 }
10 
11 //單個線程的線程池
12 //注:該線程池將corePoolSize和maximumPoolSize都設置爲1,所以線程池中永遠只有一個線程,
13 //若是該線程由於不可預知的異常而被終止,線程池將會補充一個新的線程。
14 public static ExecutorService newSingleThreadExecutor() {
15     return new FinalizableDelegatedExecutorService
16         (new ThreadPoolExecutor(1, 1,
17                                 0L, TimeUnit.MILLISECONDS,
18                                 new LinkedBlockingQueue<Runnable>()));
19 }
20 
21 //可緩存的線程池
22 //注:該線程池將corePoolSize設置爲0,將maximumPoolSize設置爲Integer.MAX_VALUE,
23 //空閒線程存活時間設置爲60S。也就是說該線程池一開始線程數爲0,隨着任務數的增長線程數也相應
24 //增長,線程數的上限爲Integer.MAX_VALUE。當任務數減小時線程數也隨之減小,最後會減小至0。
25 public static ExecutorService newCachedThreadPool() {
26     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
27                                   60L, TimeUnit.SECONDS,
28                                   new SynchronousQueue<Runnable>());
29 }
相關文章
相關標籤/搜索