Java併發包中線程池ThreadPoolExecutor原理探究

1、線程池簡介

  線程池的使用主要是解決兩個問題:①當執行大量異步任務的時候線程池可以提供更好的性能,在不使用線程池時候,每當須要執行異步任務的時候直接new一個線程來運行的話,線程的建立和銷燬都是須要開銷的。而線程池中的線程是可複用的,不須要每次執行異步任務的時候從新建立和銷燬線程;②線程池提供一種資源限制和管理的手段,好比能夠限制線程的個數,動態的新增線程等等。html

  在下面的分析中,咱們能夠看到,線程池使用一個Integer的原子類型變量來記錄線程池狀態和線程池中的線程數量,經過線程池狀態來控制任務的執行,每一個工做線程Worker線程能夠處理多個任務。數組

2、ThreadPoolExecutor類

一、咱們先簡單看一下關於ThreadPoolExecutor的一些成員變量以及其所表示的含義

   ThreadPoolExecutor繼承了AbstractExecutorService,其中的成員變量ctl是一個Integer類型的原子變量,用來記錄線程池的狀態和線程池中的線程的個數,相似於前面講到的讀寫鎖中使用一個變量保存兩種信息。這裏(Integer看作32位)ctl高三位表示線程池的狀態,後面的29位表示線程池中的線程個數。以下所示是ThreadPoolExecutor源碼中的成員變量安全

 1 //(高3位)表示線程池狀態,(低29位)表示線程池中線程的個數;
 2 // 默認狀態是RUNNING,線程池中線程個數爲0
 3 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 4 
 5 //表示具體平臺下Integer的二進制位數-3後的剩餘位數表示的數纔是線程的個數;
 6 //其中Integer.SIZE=32,-3以後的低29位表示的就是線程的個數了
 7 private static final int COUNT_BITS = Integer.SIZE - 3;
 8 
 9 //線程最大個數(低29位)00011111111111111111111111111111(1<<29-1)
10 private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
11 
12 //線程池狀態(高3位表示線程池狀態)
13 //111 00000000000000000000000000000
14 private static final int RUNNING    = -1 << COUNT_BITS;
15 
16 //000 00000000000000000000000000000
17 private static final int SHUTDOWN   =  0 << COUNT_BITS;
18 
19 //001 00000000000000000000000000000
20 private static final int STOP       =  1 << COUNT_BITS;
21 
22 //010 00000000000000000000000000000
23 private static final int TIDYING    =  2 << COUNT_BITS;
24 
25 //011 00000000000000000000000000000
26 private static final int TERMINATED =  3 << COUNT_BITS;
27 
28 //獲取高3位(運行狀態)==> c & 11100000000000000000000000000000
29 private static int runStateOf(int c)     { return c & ~CAPACITY; }
30 
31 //獲取低29位(線程個數)==> c &  00011111111111111111111111111111
32 private static int workerCountOf(int c)  { return c & CAPACITY; }
33 
34 //計算原子變量ctl新值(運行狀態和線程個數)
35 private static int ctlOf(int rs, int wc) { return rs | wc; }

  下面咱們簡單解釋一下上面的線程狀態的含義:多線程

  ①RUNNING:接受新任務並處理阻塞隊列中的任務併發

  ②SHUTDOWN:拒絕新任務可是處理阻塞隊列中的任務異步

  ③STOP:拒絕新任務並拋棄阻塞隊列中的任務,同時會中斷當前正在執行的任務函數

  ④TIDYING:全部任務執行完以後(包含阻塞隊列中的任務)當前線程池中活躍的線程數量爲0,將要調用terminated方法源碼分析

  ⑥TERMINATED:終止狀態。terminated方法調用以後的狀態post

二、下面初步瞭解一下ThreadPoolExecutor的參數以及實現原理

  ①corePoolSize:線程池核心現車個數性能

  ②workQueue:用於保存等待任務執行的任務的阻塞隊列(好比基於數組的有界阻塞隊列ArrayBlockingQueue、基於鏈表的無界阻塞隊列LinkedBlockingQueue等等)

  ③maximumPoolSize:線程池最大線程數量

  ④ThreadFactory:建立線程的工廠

  ⑤RejectedExecutionHandler:拒絕策略,表示當隊列已滿而且線程數量達到線程池最大線程數量的時候對新提交的任務所採起的策略,主要有四種策略:AbortPolicy(拋出異常)、CallerRunsPolicy(只用調用者所在線程來運行該任務)、DiscardOldestPolicy(丟掉阻塞隊列中最近的一個任務來處理當前提交的任務)、DiscardPolicy(不作處理,直接丟棄掉)

  ⑥keepAliveTime:存活時間,若是當前線程池中的數量比核心線程數量多,而且當前線程是閒置狀態,該變量就是這些線程的最大生存時間

  ⑦TimeUnit:存活時間的時間單位。

  根據上面的參數介紹,簡單瞭解一下線程池的實現原理,以提交一個新任務爲開始點,分析線程池的主要處理流程

三、關於一些線程池的使用類型

  ①newFixedThreadPool:建立一個核心線程個數和最大線程個數均爲nThreads的線程池,而且阻塞隊列長度爲Integer.MAX_VALUE,keepAliveTime=0說明只要線程個數比核心線程個數多而且當前空閒即回收。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

  ②newSingleThreadExecutor:建立一個核心線程個數和最大線程個數都爲1 的線程池,而且阻塞隊列長度爲Integer.MAX_VALUE,keepAliveTime=0說明只要線程個數比核心線程個數多而且當前線程空閒即回收該線程。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

   ③newCachedThreadPoolExecutor:建立一個按需建立線程的線程池,初始線程個數爲0,最多線程個數爲Integer.MAX_VALUE,而且阻塞隊列爲同步隊列(最多隻有一個元素),keepAliveTime=60說明只要當前線程在60s內空閒則回收。這個類型的線程池的特色就是:加入同步隊列的任務會被立刻執行,同步隊列中最多隻有一個任務

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

   四、ThreadPoolExecutor中的其餘成員

  其中的ReentrantLock可參考前面寫到的Java中的鎖——Lock和synchronized,其中降到了ReentrantLock的具體實現原理;

  關於AQS部分可參考前面說到的Java中的隊列同步器AQS,也講到了關於AQS的具體實現原理分析;

  關於條件隊列的相關知識可參考前面寫的Java中的線程協做之Condition,裏面說到了關於Java中線程協做Condition的實現原理;

//獨佔鎖,用來控制新增工做線程Worker操做的原子性
private final ReentrantLock mainLock = new ReentrantLock();

//工做線程集合,Worker繼承了AQS接口和Runnable接口,是具體處理任務的線程對象
//Worker實現AQS,並本身實現了簡單不可重入獨佔鎖,其中state=0表示當前鎖未被獲取狀態,state=1表示鎖被獲取,
//state=-1表示Work建立時候的默認狀態,建立時候設置state=-1是爲了防止runWorker方法運行前被中斷
private final HashSet<Worker> workers = new HashSet<Worker>();

//termination是該鎖對應的條件隊列,在線程調用awaitTermination時候用來存放阻塞的線程
private final Condition termination = mainLock.newCondition();

3、execute(Runnable command)方法實現

  executor方法的做用是提交任務command到線程池執行,能夠簡單的按照下面的圖進行理解,ThreadPoolExecutor的實現相似於一個生產者消費者模型,當用戶添加任務到線程池中至關於生產者生產元素,workers工做線程則直接執行任務或者從任務隊列中獲取任務,至關於消費之消費元素。

 1 public void execute(Runnable command) {
 2     //(1)首先檢查任務是否爲null,爲null拋出異常,不然進行下面的步驟
 3     if (command == null)
 4         throw new NullPointerException();
 5     //(2)ctl值中包含了當前線程池的狀態和線程池中的線程數量
 6     int c = ctl.get();
 7     //(3)workerCountOf方法是獲取低29位,即獲取當前線程池中的線程個數,若是小於corePoolSize,就開啓新的線程運行
 8     if (workerCountOf(c) < corePoolSize) {
 9         if (addWorker(command, true))
10             return;
11         c = ctl.get();
12     }
13     //(4)若是線程池處理RUNNING狀態,就添加任務到阻塞隊列中
14     if (isRunning(c) && workQueue.offer(command)) {
15         //(4-1)二次檢查,獲取ctl值
16         int recheck = ctl.get();
17         //(4-2)若是當前線程池不是出於RUNNING狀態,就從隊列中刪除任務,並執行拒絕策略
18         if (! isRunning(recheck) && remove(command))
19             reject(command);
20         //(4-3)不然,若是線程池爲空,就添加一個線程
21         else if (workerCountOf(recheck) == 0)
22             addWorker(null, false);
23     }
24     //(5)若是隊列滿,則新增線程,若是新增線程失敗,就執行拒絕策略
25     else if (!addWorker(command, false))
26         reject(command);
27 }

  咱們在看一下上面代碼的執行流程,按照標記的數字進行分析:

  ①步驟(3)判斷當前線程池中的線程個數是否小於corePoolSize,若是小於核心線程數,會向workers裏面新增一個核心線程執行任務。

  ②若是當前線程池中的線程數量大於核心線程數,就執行(4)。(4)首先判斷當前線程池是否處於RUNNING狀態,若是處於該狀態,就添加任務到任務隊列中,這裏須要判斷線程池的狀態是由於線程池可能已經處於非RUNNING狀態,而在非RUNNING狀態下是須要拋棄新任務的。

  ③若是想任務隊列中添加任務成功,須要進行二次校驗,由於在添加任務到任務隊列後,可能線程池的狀態發生了變化,因此這裏須要進行二次校驗,若是當前線程池已經不是RUNNING狀態了,須要將任務從任務隊列中移除,而後執行拒絕策略;若是二次校驗經過,則執行4-3代碼從新判斷當前線程池是否爲空,若是線程池爲空沒有線程,那麼就須要新建立一個線程。

  ④若是上面的步驟(4)建立添加任務失敗,說明隊列已滿,那麼(5)會嘗試再開啓新的線程執行任務(類比上圖中的thread3和thread4,即不是核心線程的那些線程),若是當前線程池中的線程個數已經大於最大線程數maximumPoolSize,表示不能開啓新的線程。這就屬於線程池滿而且任務隊列滿,就須要執行拒絕策略了。

  下面咱們在看看addWorker方法的實現

 1 private boolean addWorker(Runnable firstTask, boolean core) {
 2     retry:
 3     for (;;) {
 4         int c = ctl.get();
 5         int rs = runStateOf(c);
 6 
 7         //(6)檢查隊列是否只在必要時候爲空
 8         if (rs >= SHUTDOWN &&
 9             ! (rs == SHUTDOWN &&
10                firstTask == null &&
11                ! workQueue.isEmpty()))
12             return false;
13 
14         //(7)使用CAS增長線程個數
15         for (;;) {
16             //根據ctl值得到當前線程池中的線程數量
17             int wc = workerCountOf(c);
18             //(7-1)若是線程數量超出限制,返回false
19             if (wc >= CAPACITY ||
20                 wc >= (core ? corePoolSize : maximumPoolSize))
21                 return false;
22             //(7-2)CAS增長線程數量,同時只有一個線程能夠成功
23             if (compareAndIncrementWorkerCount(c))
24                 break retry;
25             c = ctl.get();  // 從新讀取ctl值
26             //(7-3)CAS失敗了,須要查看當前線程池狀態是否發生變化,若是發生變化須要跳轉到外層循環嘗試從新獲取線程池狀態,不然內層循環從新進行CAS增長線程數量
27             if (runStateOf(c) != rs)
28                 continue retry;
29         }
30     }
31 
32     //(8)執行到這裏說明CAS增長新線程個數成功了,咱們須要開始建立新的工做線程Worker
33     boolean workerStarted = false;
34     boolean workerAdded = false;
35     Worker w = null;
36     try {
37         //(8-1)建立新的worker
38         w = new Worker(firstTask);
39         final Thread t = w.thread;
40         if (t != null) {
41             final ReentrantLock mainLock = this.mainLock;
42             //(8-2)加獨佔鎖,保證workers的同步,可能線程池中的多個線程調用了線程池的execute方法
43             mainLock.lock();
44             try {
45                 // (8-3)從新檢查線程池狀態,以避免在獲取鎖以前調用shutdown方法改變線程池狀態
46                 int rs = runStateOf(ctl.get());
47 
48                 if (rs < SHUTDOWN ||
49                     (rs == SHUTDOWN && firstTask == null)) {
50                     if (t.isAlive()) // precheck that t is startable
51                         throw new IllegalThreadStateException();
52                     //(8-4)添加新任務
53                     workers.add(w);
54                     int s = workers.size();
55                     if (s > largestPoolSize)
56                         largestPoolSize = s;
57                     workerAdded = true;
58                 }
59             } finally {
60                 mainLock.unlock();
61             }
62             //(8-6)添加新任務成功以後,啓動任務
63             if (workerAdded) {
64                 t.start();
65                 workerStarted = true;
66             }
67         }
68     } finally {
69         if (! workerStarted)
70             addWorkerFailed(w);
71     }
72     return workerStarted;
73 }

  簡單再分析說明一下上面的代碼,addWorker方法主要分爲兩部分,第一部分是使用CAS線程安全的添加線程數量,第二部分則是建立新的線程而且將任務併發安全的添加到新的workers之中,而後啓動線程執行。

  ①代碼(6)中檢查隊列是否只在必要時候爲空,只有線程池狀態符合條件纔可以進行下面的步驟,從(6)中的判斷條件來看,下面的集中狀況addWorker會直接返回false

  ( I )當前線程池狀態爲STOP,TIDYING或者TERMINATED ; (I I)當前線程池狀態爲SHUTDOWN而且已經有了第一個任務;   (I I I)當前線程池狀態爲SHUTDOWN而且任務隊列爲空

  ②外層循環中判斷條件經過以後,在內層循環中使用CAS增長線程數,當CAS成功就退出雙重循環進行(8)步驟代碼的執行,若是失敗須要查看當前線程池的狀態是否發生變化,若是發生變化須要進行外層循環從新判斷線程池狀態而後在進入內層循環從新進行CAS增長線程數,若是線程池狀態沒有發生變化可是上一次CAS失敗就繼續進行CAS嘗試。

  ③執行到(8)代碼處,代表當前已經成功增長 了線程數,可是尚未線程執行任務。ThreadPoolExecutor中使用全局獨佔鎖mainLock來控制將新增的工做線程Worker線程安全的添加到工做者線程集合workers中。

  ④(8-2)獲取了獨佔鎖,可是在獲取到鎖以後,還須要進行從新檢查線程池的狀態,這是爲了不在獲取全局獨佔鎖以前其餘線程調用了shutDown方法關閉了線程池。若是線程池已經關閉須要釋放鎖。不然將新增的線程添加到工做集合中,釋放鎖啓動線程執行任務。

  上面的addWorker方法最後幾行中,會判斷添加工做線程是否成功,若是失敗,會執行addWorkerFailed方法,將任務從workers中移除,而且workerCount作-1操做。

 1 private void addWorkerFailed(Worker w) {
 2     final ReentrantLock mainLock = this.mainLock;
 3     //獲取鎖
 4     mainLock.lock();
 5     try {
 6       //若是worker不爲null
 7       if (w != null)
 8           //workers移除worker
 9           workers.remove(w);
10       //經過CAS操做,workerCount-1
11       decrementWorkerCount();
12       tryTerminate();
13     } finally {
14       //釋放鎖
15       mainLock.unlock();
16     }
17 }

4、工做線程Worker的執行

(1)工做線程Worker類源碼分析

  上面查看addWorker方法在CAS更新線程數成功以後,下面就是建立新的Worker線程執行任務,因此咱們這裏先查看Worker類,下面是Worker類的源碼,咱們能夠看出,Worker類繼承了AQS並實現了Runnable接口,因此他既是一個自定義的同步組件,也是一個執行任務的線程類。下面咱們分析Worker類的執行

 1 private final class Worker
 2     extends AbstractQueuedSynchronizer
 3     implements Runnable
 4 {
 5 
 6     /** 使用線程工廠建立的線程,執行任務 */
 7     final Thread thread;
 8     /** 初始化執行任務 */
 9     Runnable firstTask;
10     /** 計數 */
11     volatile long completedTasks;
12 
13     /**
14      * 給出初始firstTask,線程建立工廠建立新的線程
15      */
16     Worker(Runnable firstTask) {
17         setState(-1); // 防止在調用runWorker以前被中斷
18         this.firstTask = firstTask;
19         this.thread = getThreadFactory().newThread(this); //使用threadFactory建立線程
20     }
21 
22     /** run方法實際上執行的是runWorker方法  */
23     public void run() {
24         runWorker(this);
25     }
26 
27     // 關於同步狀態(鎖)
28     //
29     // 同步狀態state=0表示鎖未被獲取
30     // 同步狀態state=1表示鎖被獲取
31 
32     protected boolean isHeldExclusively() {
33         return getState() != 0;
34     }
35 
36     //下面都是重寫AQS的方法,Worker爲自定義的同步組件
37     protected boolean tryAcquire(int unused) {
38         if (compareAndSetState(0, 1)) {
39             setExclusiveOwnerThread(Thread.currentThread());
40             return true;
41         }
42         return false;
43     }
44 
45     protected boolean tryRelease(int unused) {
46         setExclusiveOwnerThread(null);
47         setState(0);
48         return true;
49     }
50 
51     public void lock()        { acquire(1); }
52     public boolean tryLock()  { return tryAcquire(1); }
53     public void unlock()      { release(1); }
54     public boolean isLocked() { return isHeldExclusively(); }
55 
56     void interruptIfStarted() {
57         Thread t;
58         if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
59             try {
60                 t.interrupt();
61             } catch (SecurityException ignore) {
62             }
63         }
64     }
65 }

  在構造函數中咱們能夠看出,首先將同步狀態state置爲-1,而Worker這個同步組件的state有三個值,其中state=-1表示Work建立時候的默認狀態,建立時候設置state=-1是爲了防止runWorker方法運行前被中斷 前面說到過這個結論,這裏置爲-1是爲了不當前Worker在調用runWorker方法以前被中斷(當其餘線程調用線程池的shutDownNow時候,若是Worker的state>=0則會中斷線程),設置爲-1就不會被中斷了。而Worker實現Runnable接口,那麼須要重寫run方法,在run方法中,咱們能夠看到,實際上執行的是runWorker方法,在runWorker方法中,會首先調用unlock方法,該方法會將state置爲0,因此這個時候調用shutDownNow方法就會中斷當前線程,而這個時候已經進入了runWork方法了,就不會在尚未執行runWorker方法的時候就中斷線程。

(2)runWorker方法的源碼分析

 1 final void runWorker(Worker w) {
 2     Thread wt = Thread.currentThread();
 3     Runnable task = w.firstTask;
 4     w.firstTask = null;
 5     w.unlock(); // 這個時候調用unlock方法,將state置爲0,就能夠被中斷了
 6     boolean completedAbruptly = true;
 7     try {
 8         //(10)若是當前任務爲null,或者從任務隊列中獲取到的任務爲null,就跳轉到(11)處執行清理工做
 9         while (task != null || (task = getTask()) != null) {
10             //task不爲null,就須要線程執行任務,這個時候,須要獲取工做線程內部持有的獨佔鎖
11             w.lock();
12             /**若是線程池已被中止(STOP)(至少大於STOP狀態),要確保線程都被中斷
13              * 若是狀態不對,檢查當前線程是否中斷並清除中斷狀態,而且再次檢查線程池狀態是否大於STOP
14              * 若是上述知足,檢查該對象是否處於中斷狀態,不清除中斷標記
15              */
16             if ((runStateAtLeast(ctl.get(), STOP) ||
17                  (Thread.interrupted() &&
18                   runStateAtLeast(ctl.get(), STOP))) &&
19                 !wt.isInterrupted())
20                 //中斷該對象
21                 wt.interrupt();
22             try {
23                 //執行任務以前要作的事情
24                 beforeExecute(wt, task);
25                 Throwable thrown = null;
26                 try {
27                     task.run(); //執行任務
28                 } catch (RuntimeException x) {
29                     thrown = x; throw x;
30                 } catch (Error x) {
31                     thrown = x; throw x;
32                 } catch (Throwable x) {
33                     thrown = x; throw new Error(x);
34                 } finally {
35                     //執行任務以後的方法
36                     afterExecute(task, thrown);
37                 }
38             } finally {
39                 task = null;
40                 //更新當前已完成任務數量
41                 w.completedTasks++;
42                 //釋放鎖
43                 w.unlock();
44             }
45         }
46         completedAbruptly = false;
47     } finally {
48         //執行清理工做:處理並退出當前worker
49         processWorkerExit(w, completedAbruptly);
50     }
51 }

  咱們梳理一下runWorker方法的執行流程

  ①首先先執行unlock方法,將Worker的state置爲0,這樣工做線程就能夠被中斷了(後續的操做若是線程池關閉就須要線程被中斷)

  ②首先判斷判斷當前的任務(當前工做線程中的task,或者從任務隊列中取出的task)是否爲null,若是不爲null就往下執行,爲null就執行processWorkerExit方法。

  ③獲取工做線程內部持有的獨佔鎖(避免在執行任務期間,其餘線程調用shutdown後正在執行的任務被中斷,shutdown只會中斷當前被阻塞掛起的沒有執行任務的線程)

  ④而後執行beforeExecute()方法,該方法爲擴展接口代碼,表示在具體執行任務以前所作的一些事情,而後執行task.run()方法執行具體任務,執行完以後會調用afterExecute()方法,用以處理任務執行完畢以後的工做,也是一個擴展接口代碼。

  ⑤更新當前線程池完成的任務數,並釋放鎖

(3)執行清理工做的方法processWorkerExit

  下面是方法processWorkerExit的源碼,在下面的代碼中

  ①首先(1-1)處統計線程池完成的任務個數,而且在此以前獲取全局鎖,而後更新當前的全局計數器,而後從工做線程集合中移除當前工做線程,完成清理工做。

  ②代碼(1-2)調用了tryTerminate 方法,在該方法中,判斷了當前線程池狀態是SHUTDOWN而且隊列不爲空或者當前線程池狀態爲STOP而且當前線程池中沒有活動線程,則置線程池狀態爲TERMINATED。若是設置稱爲了TERMINATED狀態,還須要調用全局條件變量termination的signalAll方法喚醒全部由於調用線程池的awaitTermination方法而被阻塞住的線程,使得線程池中的全部線程都中止,從而使得線程池爲TERMINATED狀態。

  ③代碼(1-3)處判斷當前線程池中的線程個數是否小於核心線程數,若是是,須要新增一個線程保證有足夠的線程能夠執行任務隊列中的任務或者提交的任務。

 1 private void processWorkerExit(Worker w, boolean completedAbruptly) {
 2     /*
 3     *completedAbruptly:是由runWorker傳過來的參數,表示是否忽然完成的意思
 4     *當在就是在執行任務過程中出現異常,就會忽然完成,傳true
 5     *
 6     *若是是忽然完成,須要經過CAS操做,更新workerCount(-1操做)
 7     *不是忽然完成,則不須要-1,由於getTask方法當中已經-1(getTask方法中執行了decrementWorkerCount()方法)
 8     */
 9     if (completedAbruptly)
10         decrementWorkerCount();
11     //(1-1)在統計完成任務個數以前加上全局鎖,而後統計線程池中完成的任務個數並更新全局計數器,並從工做集中刪除當前worker
12     final ReentrantLock mainLock = this.mainLock;
13     mainLock.lock(); //得到全局鎖
14     try {
15         completedTaskCount += w.completedTasks; //更新已完成的任務數量
16         workers.remove(w); //將完成該任務的線程worker從工做線程集合中移除
17     } finally {
18         mainLock.unlock(); //釋放鎖
19     }
20     /**(1-2)
21      * 這一個方法調用完成了下面的事情:
22      * 判斷若是當前線程池狀態是SHUTDOWN而且工做隊列爲空,
23      * 或者當前線程池狀態是STOP而且當前線程池裏面沒有活動線程,
24      * 則設置當前線程池狀態爲TERMINATED,若是設置成了TERMINATED狀態,
25      * 還須要調用條件變量termination的signAll方法激活全部由於調用線程池的awaitTermination方法而被阻塞的線程
26      */
27     tryTerminate();
28 
29     //(1-3)若是當前線程池中線程數小於核心線程,則增長核心線程數
30     int c = ctl.get();
31     //判斷當前線程池的狀態是否小於STOP(RUNNING或者SHUTDOWN)
32     if (runStateLessThan(c, STOP)) {
33         //若是任務突然完成,執行後續的代碼
34         if (!completedAbruptly) {
35             //allowCoreThreadTimeOut表示是否容許核心線程超時,默認爲false
36             //min這裏當默認爲allowCoreThreadTimeOut默認爲false的時候,min置爲coorPoolSize
37             int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
38             //這裏說明:若是容許核心線程超時,那麼allowCoreThreadTimeOut可爲true,那麼min值爲0,不須要維護核心線程了
39             //若是min爲0而且任務隊列不爲空
40             if (min == 0 && ! workQueue.isEmpty())
41                 min = 1; //這裏表示若是min爲0,且隊列不爲空,那麼至少須要一個核心線程存活來保證任務的執行
42             //若是工做線程數大於min,表示當前線程數知足,直接返回
43             if (workerCountOf(c) >= min)
44                 return; // replacement not needed
45         }
46         addWorker(null, false);
47     }
48 }

   在tryTerminate 方法中,咱們簡單說明了該方法的做用,下面是該方法的源碼,能夠看出源碼實現上和上面所總結的功能是差很少的

 1 final void tryTerminate() {
 2     for (;;) {
 3         //獲取線程池狀態
 4         int c = ctl.get();
 5         //若是線程池狀態爲RUNNING
 6         //或者狀態大於TIDYING
 7         //或者狀態==SHUTDOWN並未任務隊列不爲空
 8         //直接返回,不能調用terminated方法
 9         if (isRunning(c) ||
10             runStateAtLeast(c, TIDYING) ||
11             (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
12             return;
13         //若是線程池中工做線程數不爲0,須要中斷線程
14         if (workerCountOf(c) != 0) { // Eligible to terminate
15             interruptIdleWorkers(ONLY_ONE);
16             return;
17         }
18         //得到線程池的全局鎖
19         final ReentrantLock mainLock = this.mainLock;
20         mainLock.lock();
21         try {
22             //經過CAS操做,將線程池狀態設置爲TIDYING
23             if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { //private static int ctlOf(int rs, int wc) { return rs | wc; }
24                 try {
25                     //調用terminated方法
26                     terminated();
27                 } finally {
28                     //最終將線程狀態設置爲TERMINATED
29                     ctl.set(ctlOf(TERMINATED, 0));
30                     //調用條件變量termination的signaAll方法喚醒全部由於
31                     //調用線程池的awaitTermination方法而被阻塞的線程
32                     //private final Condition termination = mainLock.newCondition();
33                     termination.signalAll();
34                 }
35                 return;
36             }
37         } finally {
38             mainLock.unlock();
39         }
40         // else retry on failed CAS
41     }
42 }

 5、補充(shutdown、shutdownNow、awaitTermination方法)

(1)shutdown操做

  咱們在使用線程池的時候知道,調用shutdown方法以後線程池就不會再接受新的任務了,可是任務隊列中的任務仍是須要執行完的。調用該方法會馬上返回,並非等到線程池的任務隊列中的全部任務執行完畢在返回的。

 1 public void shutdown() {
 2     //得到線程池的全局鎖
 3     final ReentrantLock mainLock = this.mainLock;
 4     mainLock.lock();
 5     try {
 6         //進行權限檢查
 7         checkShutdownAccess();
 8         
 9         //設置當前線程池的狀態的SHUTDOWN,若是線程池狀態已是該狀態就會直接返回,下面咱們會分析這個方法的源碼
10         advanceRunState(SHUTDOWN);
11         
12         //設置中斷 標誌
13         interruptIdleWorkers();
14         onShutdown(); // hook for ScheduledThreadPoolExecutor
15     } finally {
16         mainLock.unlock();
17     }
18     //嘗試將狀態變爲TERMINATED,上面已經分析過該方法的源碼
19     tryTerminate();
20 }

  該方法的源碼比較簡短,首先檢查了安全管理器,是查看當前調用shutdown命令的線程是否有關閉線程的權限,若是有權限還須要看調用線程是否有中斷工做線程的權限,若是沒有權限將會拋出SecurityException異常或者空指針異常。下面咱們查看一下advanceRunState   方法的源碼。

 1 private void advanceRunState(int targetState) {
 2     for (;;) {
 3         //下面的方法執行的就是:
 4         //首先獲取線程的ctl值,而後判斷當前線程池的狀態若是已是SHUTDOWN,那麼if條件第一個爲真就直接返回
 5         //若是不是SHUTDOWN狀態,就須要CAS的設置當前狀態爲SHUTDOWN
 6         int c = ctl.get();
 7         if (runStateAtLeast(c, targetState) ||
 8             //private static int ctlOf(int rs, int wc) { return rs | wc; }
 9             ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
10             break;
11     }
12 }

  咱們能夠看出advanceRunState   方法實際上就是判斷當前線程池的狀態是否爲SHUTDWON,若是是那麼就返回,不然就須要設置當前狀態爲SHUTDOWN。

  咱們再來看看shutdown方法中調用線程中斷的方法interruptIdleWorkers源碼

 1 private void interruptIdleWorkers() {
 2     interruptIdleWorkers(false);
 3 }
 4 private void interruptIdleWorkers(boolean onlyOne) {
 5     final ReentrantLock mainLock = this.mainLock;
 6     mainLock.lock();
 7     try {
 8         for (Worker w : workers) {
 9             Thread t = w.thread;
10             //若是工做線程沒有被中斷,而且沒有正在運行設置中斷標誌
11             if (!t.isInterrupted() && w.tryLock()) {
12                 try {
13                     //須要中斷當前線程
14                     t.interrupt();
15                 } catch (SecurityException ignore) {
16                 } finally {
17                     w.unlock();
18                 }
19             }
20             if (onlyOne)
21                 break;
22         }
23     } finally {
24         mainLock.unlock();
25     }
26 }

  上面的代碼中,須要設置全部空閒線程的中斷標誌。首先獲取線程池的全局鎖,同時只有一個線程能夠調用shutdown方法設置中斷標誌。而後嘗試獲取工做線程Worker本身的鎖,獲取成功則能夠設置中斷標誌(這是因爲正在執行任務的線程須要獲取本身的鎖,而且不可重入,因此正在執行的任務沒有被中斷),這裏要中斷的那些線程是阻塞到getTask()方法並嘗試從任務隊列中獲取任務的線程即空閒線程。

(2)shutdownNow操做

   在使用線程池的時候,若是咱們調用了shutdownNow方法,線程池不只不會再接受新的任務,還會將任務隊列中的任務丟棄,正在執行的任務也會被中斷,而後馬上返回該方法,不會等待激活的任務完成,返回值爲當前任務隊列中被丟棄的任務列表

 1 public List<Runnable> shutdownNow() {
 2     List<Runnable> tasks;
 3     final ReentrantLock mainLock = this.mainLock;
 4     mainLock.lock();
 5     try {
 6         checkShutdownAccess(); //仍是進行權限檢查
 7         advanceRunState(STOP); //設置線程池狀態臺STOP
 8         interruptWorkers(); //中斷全部線程
 9         tasks = drainQueue(); //將任務隊列中的任務移動到task中
10     } finally {
11         mainLock.unlock();
12     }
13     tryTerminate();
14     return tasks; //返回tasks
15 }

   從上面的代碼中,咱們能夠能夠發現,shutdownNow方法也是首先須要檢查調用該方法的線程的權限,以後不一樣於shutdown方法之處在於須要即刻設置當前線程池狀態爲STOP,而後中斷全部線程(空閒線程+正在執行任務的線程),移除任務隊列中的任務

 1 private void interruptWorkers() {
 2     final ReentrantLock mainLock = this.mainLock;
 3     mainLock.lock();
 4     try {
 5         for (Worker w : workers) //不須要判斷當前線程是否在執行任務(即不須要調用w.tryLock方法),中斷全部線程
 6             w.interruptIfStarted();
 7     } finally {
 8         mainLock.unlock();
 9     }
10 }

(3)awaitTermination操做

   當線程調用該方法以後,會阻塞調用者線程,直到線程池狀態爲TERMINATED狀態纔會返回,或者等到超時時間到以後會返回,下面是該方法的源碼。

 1 //調用該方法以後,會阻塞調用者線程,直到線程池狀態爲TERMINATED狀態纔會返回,或者等到超時時間到以後會返回
 2 public boolean awaitTermination(long timeout, TimeUnit unit)
 3     throws InterruptedException {
 4     long nanos = unit.toNanos(timeout);
 5     final ReentrantLock mainLock = this.mainLock;
 6     mainLock.lock();
 7     try {
 8         //阻塞當前線程,(獲取了Worker本身的鎖),那麼當前線程就不會再執行任務(由於獲取不到鎖)
 9         for (;;) {
10             //當前線程池狀態爲TERMINATED狀態,會返回true
11             if (runStateAtLeast(ctl.get(), TERMINATED)) 
12                 return true;
13             //超時時間到返回false
14             if (nanos <= 0) 
15                 return false;
16             nanos = termination.awaitNanos(nanos);
17         }
18     } finally {
19         mainLock.unlock();
20     }
21 }

  在上面的代碼中,調用者線程須要首先獲取線程Worker 本身的獨佔鎖,而後在循環判斷當前線程池是否已是TERMINATED狀態,若是是則直接返回,不然說明當前線程池中還有線程正在執行任務,這時候須要查看當前設置的超時時間是否小於0,小於0說明不須要等待就直接返回,若是大於0就須要調用條件變量termination的awaitNanos方法等待設置的時間,並在這段時間以內等待線程池的狀態變爲TERMINATED。

  咱們在前面說到清理線程池的方法processWorkerExit的時候,須要調用tryTerminated方法,在該方法中會查看當前線程池狀態是否爲TERMINATED,若是是該狀態也會調用termination.signalAll()方法喚醒全部線程池中因調用awaitTermination而被阻塞住的線程。

  若是是設置了超時時間,那麼termination的awaitNanos方法也會返回,這時候須要從新檢查線程池狀態是否爲TERMINATED,若是是則返回,不是就繼續阻塞本身。

相關文章
相關標籤/搜索