Java併發編程--ThreadPoolExecutor

概述

  爲何要使用線程池?  java

  合理利用線程池可以帶來三個好處。第一:下降資源消耗。經過重複利用已建立的線程下降線程建立和銷燬形成的消耗。第二:提升響應速度。當任務到達時,任務能夠不須要等到線程建立就能當即執行。第三:提升線程的可管理性。線程是稀缺資源,若是無限制的建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行統一的分配,調優和監控。可是要作到合理的利用線程池,必須對其原理了如指掌。——摘自http://www.infoq.com/cn/articles/java-threadPool。node

類圖

   

使用

  線程池的監控

    能夠經過線程池的如下屬性監控線程池的當前狀態:程序員

      getTaskCount():線程池已經執行的和未執行的任務總數,由於統計的過程當中可能會發生變化,該值是個近似值;算法

      getCompletedTaskCount():已完成的任務數量,是個近似值,該值小於等於TaskCount;數組

      getLargestPoolSize():線程池曾經的最大線程數量,能夠經過該值判斷線程池是否滿過。如該數值等於線程池的最大大小,則表示線程池曾經滿過;安全

      getPoolSize():線程池當前的線程數量;多線程

      getActiveCount():線程池中活動的線程數(正在執行任務),是個近似值。併發

    還能夠經過重寫線程池提供的hook方法(beforeExecute、afterExecute和terminated)進行監控,例如監控任務的平均執行時間、最大執行時間和最小執行時間等。異步

    程序員能夠經過重寫鉤子 hook 方法(如beforeExecute)實現ThreadPoolExecutor的擴展。ide

    擴展現例:添加了簡單的暫停/恢復功能的子類

 1 class PausableThreadPoolExecutor extends ThreadPoolExecutor {
 2     private boolean isPaused;    //標誌是否被暫停
 3     private ReentrantLock pauseLock = new ReentrantLock();    //訪問isPaused時須要加鎖,保證線程安全
 4     private Condition unpaused = pauseLock.newCondition();
 5 
 6     public PausableThreadPoolExecutor(...) { super(...); }
 7     
 8     //beforeExecute爲ThreadPoolExecutor提供的hood方法
 9     protected void beforeExecute(Thread t, Runnable r) {
10         super.beforeExecute(t, r);
11         pauseLock.lock();
12         try {
13             while (isPaused) 
14                 unpaused.await();
15         } catch(InterruptedException ie) {
16             t.interrupt();
17         } finally {
18             pauseLock.unlock();
19         }
20     }
21     //暫停
22     public void pause() {
23         pauseLock.lock();
24         try {
25             isPaused = true;
26         } finally {
27             pauseLock.unlock();
28         }
29     }
30     //取消暫停
31     public void resume() {
32         pauseLock.lock();
33         try {
34             isPaused = false;
35             unpaused.signalAll();
36         } finally {
37             pauseLock.unlock();
38         }
39     }
40 }

實現原理

  ThreadPoolExecutor源碼分析

    域

 1 //ctl是控制線程池狀態的一個變量,包含有效的線程數(workerCount)和線程池的運行狀態(runState)兩部分信息。高3位表示runState,低29位表示workerCount。
 2 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 3 private static final int COUNT_BITS = Integer.SIZE - 3;    //表示workerCount的位數,29位。
 4 private static final int CAPACITY   = (1 << COUNT_BITS) - 1;    //線程數的上限,(2^29)-1,大約5億
 5 
 6 // runState is stored in the high-order bits
 7 private static final int RUNNING    = -1 << COUNT_BITS;    //能接收新任務和處理隊列中的任務
 8 private static final int SHUTDOWN   =  0 << COUNT_BITS;    //不能接收新任務,但能夠處理隊列中的任務
 9 private static final int STOP       =  1 << COUNT_BITS;    //不能接收新任務,不能處理隊列中的任務,中斷正在執行的任務
10 private static final int TIDYING    =  2 << COUNT_BITS;    //全部的線程都被終止,workerCount爲0時會進入該狀態.
11 private static final int TERMINATED =  3 << COUNT_BITS;    //terminated()方法完成後將進入該狀態。

      以上ThreadPoolExecutor的成員變量表示線程池的狀態,狀態信息存儲在ctl變量中,ctl包含有效線程數(workerCount)和線程池運行狀態(runState)兩部分信息,ctl的高3位表示runState,低29位表示workerCount。ctl初始值爲RUNNING狀態且線程數爲0。

      線程池運行狀態的轉換以下:

        1)線程池在RUNNING狀態下調用shutdown()方法會進入到SHUTDOWN狀態,(finalize()方法也會調用shutdownNow())。

        2)在RUNNING和SHUTDOWN狀態下調用 shutdownNow() 方法會進入到STOP狀態。

        3)在SHUTDOWN狀態下,當阻塞隊列爲空且線程數爲0時進入TIDYING狀態;在STOP狀態下,當線程數爲0時進入TIDYING狀態。

        4)在TIDYING狀態,調用terminated()方法完成後進入TERMINATED狀態。

 

 1 //阻塞隊列
 2 private final BlockingQueue<Runnable> workQueue;
 3 //可重入鎖。訪問woker線程和相關記錄信息時須要獲取該鎖
 4 private final ReentrantLock mainLock = new ReentrantLock();
 5 //包含所有worker線程集合,Accessed only under mainLock,HashSet是非線程安全的.
 6 private final HashSet<Worker> workers = new HashSet<Worker>();
 7 private final Condition termination = mainLock.newCondition();
 8 //記錄最大的線程數量,Accessed only under mainLock.
 9 private int largestPoolSize;
10 //完成任務的數量,Accessed only under mainLock.
11 private long completedTaskCount;
12 
13 
14 //如下全部程序員能夠控制的參數都被聲明爲volatile變量,保證可見性。
15 
16 //建立線程的工廠
17 private volatile ThreadFactory threadFactory;
18 //線程池飽和或關閉時的處理策略(提供了四種飽和策略)
19 private volatile RejectedExecutionHandler handler;
20 //超出corePoolSize數量的空閒線程存活時間(allowCoreThreadTimeOut=true時有效)
21 private volatile long keepAliveTime;
22 //allowCoreThreadTimeOut=false,線程不會由於空閒時間超過keepAliveTime而被中止
23 private volatile boolean allowCoreThreadTimeOut;
24 //核心線程數
25 private volatile int corePoolSize;
26 //最大線程數,此變量的最大上限爲CAPACITY
27 private volatile int maximumPoolSize;

      1、線程池核心線程數和最大線程數

        ThreadPoolExecutor 將根據 corePoolSize (核心線程數)和 maximumPoolSize(最大線程數)設置的邊界自動調整線程池大小。當新任務在方法 execute(java.lang.Runnable) 中提交時,若是運行的線程少於 corePoolSize,則建立新線程來處理請求,即便其餘輔助線程是空閒的。若是運行的線程多於 corePoolSize 而少於 maximumPoolSize,則僅當隊列滿時才建立新線程。若是設置的 corePoolSize 和 maximumPoolSize 相同,則建立了固定大小的線程池。若是將 maximumPoolSize 設置爲基本的無界值(如 Integer.MAX_VALUE),則容許池適應任意數量的併發任務。在大多數狀況下,核心和最大池大小僅基於構造函數來設置,不過也可使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 進行動態更改。

 

      2、任務隊列

        workQueue是一個阻塞隊列,用來存儲執行的任務。全部的BlockingQueue均可用於workQueue。

          若是有效的線程數小於 corePoolSize,則線程池首選添加新線程,而不進行排隊。

          若是有效的線程數大於等於 corePoolSize,則線程池首選將任務加入隊列,而不添加新的線程。 

          若是隊列已滿,則建立新的線程,當線程數超出 maximumPoolSize 時,任務將被拒絕。

        經常使用的三種阻塞隊列的實現:

          1)直接提交。SynchronousQueue是一個不存儲元素的阻塞隊列。每一個插入操做必須等到另外一個線程調用移除操做,不然插入操做一直處於阻塞狀態。它將任務直接提交給線程而不存儲任務。直接提交一般要求不限制 maximumPoolSizes 以免拒絕新提交的任務。Executors.newCachedThreadPool使用了這個隊列。

          2)無界隊列。LinkedBlockingQueue是一個基於鏈表結構的阻塞隊列,默認的大小是Integer.MAX_VALUE。建立的線程就不會超過 corePoolSize,會使maximumPoolSize 的值無效。

          3)有界隊列。ArrayBlockingQueue是一個基於數組結構的有界阻塞隊列。有助於防止資源耗盡,可是可能較難調整和控制。

      3、飽和策略

        當 Executor 已經關閉,或者 Executor 將有限邊界用於最大線程和工做隊列容量且已經飽和時,在方法 execute(Runnable) 中提交的新任務將被拒絕。線程池提供了4種飽和策略:

          1)AbortPolicy。默認的飽和策略,直接拋出RejectedExecutionException異常。

          2)CallerRunsPolicy。用調用者所在的線程來執行任務,此策略提供簡單的反饋控制機制,可以減緩新任務的提交速度。

          3)DiscardPolicy。直接丟棄任務。

          4)DiscardOldestPolicy。若是執行程序還沒有關閉,則丟棄阻塞隊列中最靠前的任務,而後重試執行新任務(若是再次失敗,則重複此過程)。

        也可使用自定義的 RejectedExecutionHandler 類,但須要很是當心,尤爲是當策略僅用於特定容量或排隊策略時。

      4、threadFactory

        使用 ThreadFactory 建立新線程,默認狀況下在同一個 ThreadGroup 中一概使用 Executors.defaultThreadFactory() 建立線程,這些線程具備相同的 NORM_PRIORITY 優先級和非守護進程狀態。經過自定義的 ThreadFactory建立新線程,能夠改變線程的名稱、線程組、優先級、守護進程狀態等。

      5、workers用來存儲工做線程,注意HashSet<Worker>是非線程安全的,訪問時須要獲取mainLock;

      6、mainLock是一個獨佔式可重入鎖,用來保證訪問workers和其餘監控變量(如largestPoolSize、completedTaskCount等)的線程安全。

      7、keepAliveTime爲線程池的工做線程空閒後,保持存活的時間。因此若是任務不少,而且每一個任務執行的時間比較短,能夠調大這個時間,提升線程的利用率。allowCoreThreadTimeout變量表示是否容許核心線程超時,若是allowCoreThreadTimeOut=false,那麼當線程空閒時間達到keepAliveTime時,線程會退出,直到線程數量=corePoolSize;若是allowCoreThreadTimeOut=true,那麼當線程空閒時間達到keepAliveTime時,線程會退出,直到線程數量=0。

    執行任務(execute)

 1 public void execute(Runnable command) {
 2     if (command == null)
 3         throw new NullPointerException();
 4     /*
 5      * Proceed in 3 steps:
 6      *
 7      * 1. If fewer than corePoolSize threads are running, try to
 8      * start a new thread with the given command as its first
 9      * task.  The call to addWorker atomically checks runState and
10      * workerCount, and so prevents false alarms that would add
11      * threads when it shouldn't, by returning false.
12      *
13      * 2. If a task can be successfully queued, then we still need
14      * to double-check whether we should have added a thread
15      * (because existing ones died since last checking) or that
16      * the pool shut down since entry into this method. So we
17      * recheck state and if necessary roll back the enqueuing if
18      * stopped, or start a new thread if there are none.
19      *
20      * 3. If we cannot queue task, then we try to add a new
21      * thread.  If it fails, we know we are shut down or saturated
22      * and so reject the task.
23      */
24     int c = ctl.get();    //獲取線程池的狀態(runState和workerCount)
25     //若是線程數小於corePoolSize,新建一個線程執行該任務。
26     if (workerCountOf(c) < corePoolSize) {
27         if (addWorker(command, true))
28             return;
29         c = ctl.get();
30     }
31     //若是線程池是運行狀態,而且添加任務到隊列成功(隊列未滿)
32     if (isRunning(c) && workQueue.offer(command)) {
33         int recheck = ctl.get();
34         //再次判斷線程池的運行狀態,若是不是運行狀態,須要從隊列刪除該任務。使用拒絕策略處理該任務。
35         if (! isRunning(recheck) && remove(command))
36             reject(command);
37         //若是線程數爲0,執行addWorker方法。參數爲null的緣由是任務已經加入到隊列,新建的線程從隊列取任務執行便可。
38         else if (workerCountOf(recheck) == 0)
39             addWorker(null, false);
40     }
41     //線程池不是RUNNING狀態或隊列已滿,嘗試新建一個線程執行該任務。若是失敗則拒絕該任務。
42     else if (!addWorker(command, false))
43         reject(command);
44 }

     新增線程(addWorker)

      線程被封裝在Worker類中。

 1 //參數firstTask表示新建線程執行的第一個任務。若是firstTask爲null,表示
 2 //若是參數core=true,把corePoolSize做爲線程數上限的判斷條件;若是爲false,把maximumPoolSize做爲線程數上限的判斷條件
 3 private boolean addWorker(Runnable firstTask, boolean core) {
 4     retry:
 5     for (;;) {
 6         int c = ctl.get();
 7         int rs = runStateOf(c);
 8         /*
 9          * rs >= SHUTDOWN表示再也不接受新任務。 
10          * 1)線程池的運行狀態爲SHUTDOWN;2)firstTask == null;3)阻塞隊列不爲空,只有這三個條件同時知足纔不返回false
11          */
12         // Check if queue empty only if necessary.
13         if (rs >= SHUTDOWN &&
14             ! (rs == SHUTDOWN &&
15                firstTask == null &&
16                ! workQueue.isEmpty()))
17             return false;
18         
19         //自旋CAS遞增workerCount
20         for (;;) {
21             int wc = workerCountOf(c);
22             //若是線程數超過上限,返回false。若是參數core=true,把corePoolSize做爲線程數上限的判斷條件;若是爲false,把maximumPoolSize做爲線程數上限的判斷條件
23             if (wc >= CAPACITY ||
24                 wc >= (core ? corePoolSize : maximumPoolSize))
25                 return false;
26             //CAS遞增線程數。若是成功,跳出最外層循環;若是失敗,且運行狀態沒有改變,繼續內層循環直到成功。
27             if (compareAndIncrementWorkerCount(c))
28                 break retry;
29             //判斷runState是否改變,若是改變則繼續外層循環
30             c = ctl.get();  // Re-read ctl
31             if (runStateOf(c) != rs)
32                 continue retry;
33             // else CAS failed due to workerCount change; retry inner loop
34         }
35     }
36     
37     //走到這說明須要新建線程,且workerCount更新成功
38     //下面是新建Worker的過程。
39     boolean workerStarted = false;    //新建的Worker是否啓動標識
40     boolean workerAdded = false;    //新建的Worker是否被添加到workers標識
41     Worker w = null;
42     try {
43         final ReentrantLock mainLock = this.mainLock;
44         w = new Worker(firstTask);    //新建Worker
45         final Thread t = w.thread;
46         //什麼狀況下線程會爲null呢?在ThreadFactory建立線程失敗時可能會出現。
47         if (t != null) {
48             mainLock.lock();    //獲取mainLock鎖。對workers(HashSet非線程安全)和largestPoolSize更新必須加鎖
49             try {
50                 // Recheck while holding lock.
51                 // Back out on ThreadFactory failure or if
52                 // shut down before lock acquired.
53                 int c = ctl.get();
54                 int rs = runStateOf(c);
55                 /*
56                  *    若是運行狀態是RUNNING,或者運行狀態是SHUTDOWN且firstTask爲null,纔將新建的Worker添加到workers
57                  */
58                 if (rs < SHUTDOWN ||
59                     (rs == SHUTDOWN && firstTask == null)) {
60                     if (t.isAlive()) // precheck that t is startable
61                         throw new IllegalThreadStateException();
62                     workers.add(w);
63                     //更新largestPoolSize,標識線程池曾經出現過的最大線程數
64                     int s = workers.size();
65                     if (s > largestPoolSize)
66                         largestPoolSize = s;
67                     workerAdded = true;
68                 }
69             } finally {
70                 mainLock.unlock();    //釋放mainLock鎖
71             }
72             if (workerAdded) {
73                 //啓動線程
74                 t.start();
75                 workerStarted = true;
76             }
77         }
78     } finally {
79         //新建的Worker未啓動,進行失敗處理
80         if (! workerStarted)
81             addWorkerFailed(w);
82     }
83     return workerStarted;
84 }

    Worker類

      每一個線程被封裝爲一個Worker類實例。Worker類繼承了AbstractQueuedSynchronizer,並實現了一個互斥非重入鎖。Worker類同時繼承了Runnable,Worker類的實例也是一個線程。

 1 private final class Worker
 2     extends AbstractQueuedSynchronizer
 3     implements Runnable
 4 {
 5     /**
 6      * This class will never be serialized, but we provide a
 7      * serialVersionUID to suppress a javac warning.
 8      */
 9     private static final long serialVersionUID = 6138294804551838833L;
10 
11     /** Thread this worker is running in.  Null if factory fails. */
12     final Thread thread;    //處理任務的線程
13     /** Initial task to run.  Possibly null. */
14     Runnable firstTask;        //傳入的任務
15     /** Per-thread task counter */
16     volatile long completedTasks;    //完成的任務數
17 
18     /**
19      * Creates with given first task and thread from ThreadFactory.
20      * @param firstTask the first task (null if none)
21      */
22     Worker(Runnable firstTask) {
23         //同步狀態初始化爲-1,在執行runWorker方法前禁止中斷當前線程
24         setState(-1); // inhibit interrupts until runWorker 
25         this.firstTask = firstTask;
26         this.thread = getThreadFactory().newThread(this);    //經過ThreadFactory建立線程
27     }
28 
29     /** Delegates main run loop to outer runWorker  */
30     public void run() {
31         runWorker(this);
32     }
33 
34     // Lock methods
35     //
36     // The value 0 represents the unlocked state.
37     // The value 1 represents the locked state.
38     //實現了一個非重入互斥鎖,state=0表示解鎖狀態,state=1表示加鎖狀態
39     protected boolean isHeldExclusively() {
40         return getState() != 0;
41     }
42 
43     protected boolean tryAcquire(int unused) {
44         if (compareAndSetState(0, 1)) {
45             setExclusiveOwnerThread(Thread.currentThread());
46             return true;
47         }
48         return false;
49     }
50 
51     protected boolean tryRelease(int unused) {
52         setExclusiveOwnerThread(null);
53         setState(0);
54         return true;
55     }
56 
57     public void lock()        { acquire(1); }
58     public boolean tryLock()  { return tryAcquire(1); }
59     public void unlock()      { release(1); }
60     public boolean isLocked() { return isHeldExclusively(); }
61 
62     void interruptIfStarted() {
63         Thread t;
64         if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
65             try {
66                 t.interrupt();
67             } catch (SecurityException ignore) {
68             }
69         }
70     }
71 }

    runWorker方法

 1 final void runWorker(Worker w) {
 2     Thread wt = Thread.currentThread();
 3     Runnable task = w.firstTask;
 4     w.firstTask = null;
 5     //Worker初始化時同步狀態置爲-1,此處進行解鎖操做目的是將同步狀態置爲0,容許中斷。
 6     w.unlock(); // allow interrupts
 7     boolean completedAbruptly = true;    //是否由於異常跳出循環
 8     try {
 9         //若是firstTask爲null則經過getTask()方法從隊列中獲取。
10         //正常狀況下,會一直執行While循環,若是隊列爲空,getTask()方法中會阻塞當前線程,getTask()返回null時會跳出循環
11         while (task != null || (task = getTask()) != null) {
12             w.lock();    //加Worker鎖
13             // If pool is stopping, ensure thread is interrupted;
14             // if not, ensure thread is not interrupted.  This
15             // requires a recheck in second case to deal with
16             // shutdownNow race while clearing interrupt
17             /*
18              * 若是線程池正在中止,要保證當前線程是中斷狀態
19              * 若是不是,則要保證當前線程不是中斷狀態
20              *  STOP狀態要中斷線程池中的全部線程,而這裏使用Thread.interrupted()來判斷是否中斷是爲了確保在RUNNING或者SHUTDOWN狀態時線程是非中斷狀態的,由於Thread.interrupted()方法會復位中斷的狀態。
21              */
22             if ((runStateAtLeast(ctl.get(), STOP) ||
23                  (Thread.interrupted() &&
24                   runStateAtLeast(ctl.get(), STOP))) &&
25                 !wt.isInterrupted())
26                 wt.interrupt();
27             try {
28                 beforeExecute(wt, task);    //鉤子方法
29                 Throwable thrown = null;
30                 try {
31                     task.run();    //調用任務的run方法,而不是start()方法,由於Worker自己就是一個線程類
32                 } catch (RuntimeException x) {
33                     thrown = x; throw x;
34                 } catch (Error x) {
35                     thrown = x; throw x;
36                 } catch (Throwable x) {
37                     thrown = x; throw new Error(x);
38                 } finally {
39                     afterExecute(task, thrown);    //鉤子方法
40                 }
41             } finally {
42                 task = null;
43                 w.completedTasks++;
44                 w.unlock();        //釋放Worker鎖
45             }
46         }
47         completedAbruptly = false;
48     } finally {
49         //跳出循環,執行processWorkerExit()方法
50         processWorkerExit(w, completedAbruptly);
51     }
52 }

    getTask()方法

 1 //若是返回null,在runWorker方法中會執行processWorkerExit,即關閉該線程。
 2 private Runnable getTask() {
 3     //表示上次從隊列獲取任務是否超時
 4     boolean timedOut = false; // Did the last poll() time out?
 5 
 6     retry:
 7     for (;;) {
 8         int c = ctl.get();
 9         int rs = runStateOf(c);
10 
11         // Check if queue empty only if necessary.
12         // 若是rs >= STOP,或者 rs=SHUTDOWN且隊列爲空,此時再也不接收新任務,將WorkerCount遞減並返回null。
13         if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
14             decrementWorkerCount();    //自旋CAS遞減workerCount直到成功
15             return null;
16         }
17         
18         //timed用於判斷是否須要重試控制
19         boolean timed;      // Are workers subject to culling?
20 
21         for (;;) {
22             //allowCoreThreadTimeOut默認是false,核心線程不進行超時控制,當線程數量大於corePoolSize時須要進行超時控制
23             int wc = workerCountOf(c);
24             timed = allowCoreThreadTimeOut || wc > corePoolSize;
25             
26             //若是wc <= maximumPoolSize ,且上次從隊列獲取任務超時或本次須要進行超時控制,則跳出內層循環。
27             //timedOut=true表示上次從隊列獲取元素超時,說明隊列在上次獲取的keepAliveTime時間內是空的。
28             //timed=true說明線程數量大於corePoolSize。
29             //因此timedOut=true和timed=true同時知足則說明當前線程已經空閒了keepAliveTime時間,而且線程池的數量大於corePoolSize。這時就須要關閉多餘的空閒線程(即compareAndDecrementWorkerCount並返回null)。
30             if (wc <= maximumPoolSize && ! (timedOut && timed))
31                 break;
32             //若是線程數量大於maximumPoolSize,或者上次從隊列獲取任務超時且本次須要進行超時控制。須要遞減WorkerCount,若是遞減成功則返回null
33             if (compareAndDecrementWorkerCount(c))
34                 return null;
35             //檢查線程池運行狀態是否改變。若是改變,那麼繼續外層循環,若是未改變,那麼繼續內層循環。
36             c = ctl.get();  // Re-read ctl
37             if (runStateOf(c) != rs)
38                 continue retry;
39             // else CAS failed due to workerCount change; retry inner loop
40         }
41 
42         try {
43             Runnable r = timed ?
44                 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :    
45                 //超時方式獲取,注意keepAliveTime爲超出corePoolSize大小的線程的空閒存活時間
46                 workQueue.take();    //阻塞方式獲取,若是隊列爲空阻塞當前線程
47             if (r != null)
48                 return r;
49             timedOut = true;    //若是超時,繼續循環。
50         } catch (InterruptedException retry) {
51             //若是發生中斷,則將timedOut置爲false,繼續循環
52             timedOut = false;
53         }
54     }
55 }

    processWorkerExit方法

 1 private void processWorkerExit(Worker w, boolean completedAbruptly) {
 2     //若是completedAbruptly=false,說明是由getTask返回null致使的,WorkerCount遞減的操做已經執行。
 3     //若是completedAbruptly=true,說明是由執行任務的過程當中發生異常致使,須要進行WorkerCount遞減的操做。
 4     if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
 5         decrementWorkerCount();
 6 
 7     final ReentrantLock mainLock = this.mainLock;
 8     mainLock.lock();
 9     try {
10         completedTaskCount += w.completedTasks;
11         workers.remove(w);    //從workers中刪除當前worker,對workers更新須要加mainLock鎖。
12     } finally {
13         mainLock.unlock();
14     }
15 
16     tryTerminate();
17     
18     //若是是異常結束(completedAbruptly=true),須要從新調用addWorker()增長一個線程,保持線程數量。
19     //若是是由getTask()返回null致使的線程結束,須要進行如下判斷:
20     //    1)若是allowCoreThreadTimeOut=true且隊列不爲空,那麼須要至少保證有一個線程。
21     //    2)若是allowCoreThreadTimeOut=false,那麼須要保證線程數大於等於corePoolSize。
22     //
23     int c = ctl.get();
24     if (runStateLessThan(c, STOP)) {
25         if (!completedAbruptly) {
26             int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
27             if (min == 0 && ! workQueue.isEmpty())
28                 min = 1;
29             if (workerCountOf(c) >= min)
30                 return; // replacement not needed
31         }
32         addWorker(null, false);
33     }
34 }

    tryTerminate()方法

 1 //根據線程池狀態判斷是否結束線程池
 2 final void tryTerminate() {
 3     for (;;) {
 4         int c = ctl.get();
 5         //若是線程池運行狀態是RUNNING,或者大於等於TIDYING,或者運行狀態爲SHUTDOWN且隊列爲空,則直接return。
 6         if (isRunning(c) ||
 7             runStateAtLeast(c, TIDYING) ||
 8             (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
 9             return;
10         //若是線程數不爲0,則中斷一個空閒線程並return。爲何有這一步操做。
11         if (workerCountOf(c) != 0) { // Eligible to terminate
12             interruptIdleWorkers(ONLY_ONE);
13             return;
14         }
15 
16         final ReentrantLock mainLock = this.mainLock;
17         mainLock.lock();
18         try {
19             //嘗試將狀態設置爲TIDYING狀態,
20             if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
21                 try {
22                     //若是CAS成功,執行terminated()方法
23                     terminated();
24                 } finally {
25                     ctl.set(ctlOf(TERMINATED, 0));
26                     termination.signalAll();
27                 }
28                 return;
29             }
30         } finally {
31             mainLock.unlock();
32         }
33         // else retry on failed CAS
34     }
35 }

    shutdown()方法

      線程池運行狀態由RUNNING到SHUTDOWN的轉換。

 1 public void shutdown() {
 2     final ReentrantLock mainLock = this.mainLock;
 3     mainLock.lock();
 4     try {
 5         //安全管理,檢查方法調用者是否有權限中斷Worker線程
 6         checkShutdownAccess();
 7         //運行狀態改成SHUTDOWN
 8         advanceRunState(SHUTDOWN);    //自旋CAS
 9         //中斷空閒線程
10         interruptIdleWorkers();
11         onShutdown(); // hook for ScheduledThreadPoolExecutor
12     } finally {
13         mainLock.unlock();
14     }
15     //嘗試結束線程池
16     tryTerminate();
17 }
18 
19 private void interruptIdleWorkers() {
20     interruptIdleWorkers(false);
21 }
22 
23 private void interruptIdleWorkers(boolean onlyOne) {
24     final ReentrantLock mainLock = this.mainLock;
25     mainLock.lock();    //對workers的操做須要獲取mainLock
26     try {
27         //遍歷全部的線程,若是沒有被中斷且獲取鎖成功則中斷線程。獲取鎖失敗時極可能該線程正在執行任務(woker執行任務時須要對woker加鎖)。
28         for (Worker w : workers) {
29             Thread t = w.thread;
30             if (!t.isInterrupted() && w.tryLock()) {
31                 try {
32                     t.interrupt();
33                 } catch (SecurityException ignore) {
34                 } finally {
35                     w.unlock();
36                 }
37             }
38             if (onlyOne)
39                 break;
40         }
41     } finally {
42         mainLock.unlock();
43     }
44 }

    shutdownNow()方法

 1 public List<Runnable> shutdownNow() {
 2     List<Runnable> tasks;
 3     final ReentrantLock mainLock = this.mainLock;
 4     mainLock.lock();
 5     try {
 6         checkShutdownAccess();
 7         advanceRunState(STOP);
 8         //中斷全部線程,即便線程正在執行任務
 9         interruptWorkers();    
10         //取出隊列中的任務
11         tasks = drainQueue();
12     } finally {
13         mainLock.unlock();
14     }
15     //嘗試結束線程池
16     tryTerminate();
17     return tasks;
18 }
19 
20 private void interruptWorkers() {
21     final ReentrantLock mainLock = this.mainLock;
22     mainLock.lock();
23     try {
24         for (Worker w : workers)
25             w.interruptIfStarted();
26     } finally {
27         mainLock.unlock();
28     }
29 }
30 
31 private List<Runnable> drainQueue() {
32     BlockingQueue<Runnable> q = workQueue;
33     List<Runnable> taskList = new ArrayList<Runnable>();
34     q.drainTo(taskList);
35     if (!q.isEmpty()) {
36         for (Runnable r : q.toArray(new Runnable[0])) {
37             if (q.remove(r))
38                 taskList.add(r);
39         }
40     }
41     return taskList;
42 }

   

  FutureTask源碼分析

    利用FutureTask能夠實現獲取異步任務的返回值、取消異步任務等功能。看一下ThreadPoolExecutor的submit方法。submit方法根據任務構造一個FutureTask對象並返回,在主線程中能夠根據FutureTask提供的方法進行任務取消和獲取異步任務的返回值。

1 public <T> Future<T> submit(Callable<T> task) {
2     if (task == null) throw new NullPointerException();
3     RunnableFuture<T> ftask = newTaskFor(task);
4     execute(ftask);    //實際執行的任務是ftask
5     return ftask;
6 }

    域

private volatile int state;        //狀態,新建立時狀態爲NEW
private static final int NEW          = 0;    //新建立    
private static final int COMPLETING   = 1;    //正在執行
private static final int NORMAL       = 2;    //正常完成
private static final int EXCEPTIONAL  = 3;    //執行過程當中出現異常
private static final int CANCELLED    = 4;    //被取消
private static final int INTERRUPTING = 5;    //
private static final int INTERRUPTED  = 6;

/** The underlying callable; nulled out after running */
private Callable<V> callable;    //要執行的任務
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;    //執行callable的線程
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;    //Treiber算法實現的棧,用於存儲等待的線程

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

      狀態的轉換有如下幾種狀況:

        1)NEW -> COMPLETING -> NORMAL 正常執行並返回;

        2)NEW -> COMPLETING -> EXCEPTIONAL 執行過程當中出現異常;

        3)NEW -> CANCELLED 執行前被取消

        4)NEW -> INTERRUPTING -> INTERRUPTED 取消時被中斷。

    初始化

 1 public FutureTask(Callable<V> callable) {
 2     if (callable == null)
 3         throw new NullPointerException();
 4     this.callable = callable;
 5     this.state = NEW;       // ensure visibility of callable
 6 }
 7 
 8 public FutureTask(Runnable runnable, V result) {
 9     //因爲Runnable沒有返回值,經過Executors將Runnable轉換爲Callable。
10     this.callable = Executors.callable(runnable, result);
11     this.state = NEW;       // ensure visibility of callable
12 }

    執行任務-run()方法

 1 public void run() {
 2     //只執行state=NEW的任務。若是state!=NEW說明任務已經執行。
 3     //若是state=NEW,則經過CAS將runner置爲當前線程。若是失敗說明其餘線程已經執行。
 4     if (state != NEW ||
 5         !UNSAFE.compareAndSwapObject(this, runnerOffset,
 6                                      null, Thread.currentThread()))
 7         return;
 8     try {
 9         Callable<V> c = callable;
10         if (c != null && state == NEW) {
11             V result;    //任務執行結果
12             boolean ran;    //任務執行期間是否發生異常
13             try {
14                 result = c.call();    //執行任務
15                 ran = true;
16             } catch (Throwable ex) {
17                 result = null;
18                 ran = false;
19                 //若是發生異常,執行setException(ex)
20                 setException(ex);
21             }
22             //若是正常結束,執行set(result).
23             if (ran)
24                 set(result);
25         }
26     } finally {
27         // runner must be non-null until state is settled to
28         // prevent concurrent calls to run()
29         //無論任務執行是否正常,都須要將runner置爲null
30         runner = null;
31         // state must be re-read after nulling runner to prevent
32         // leaked interrupts
33         //防止中斷泄露,須要結合cancel方法研究
34         //若是s>=INTERRUPTING,說明狀態變換爲NEW -> INTERRUPTING -> INTERRUPTED,即在取消時被中斷。
35         int s = state;
36         if (s >= INTERRUPTING)
37             handlePossibleCancellationInterrupt(s);
38     }
39 }

      任務執行正常結束:

1 //任務正常結束,經過CAS更新state爲COMPLETING,若是成功,將state更新爲NORMAL,喚醒等待線程。
2 protected void set(V v) {
3     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
4         outcome = v;    //將運行結果result賦給outcome
5         UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
6         //刪除和喚醒全部的等待線程
7         finishCompletion();
8     }
9 }

      任務執行時發生異常:

1 //任務執行時發生異常,經過CAS更新state爲COMPLETING,若是成功,將state更新爲EXCEPTIONAL,喚醒等待線程
2 protected void setException(Throwable t) {
3     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
4         outcome = t;    //將異常信息賦給outcome
5         UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
6         finishCompletion();
7     }
8 }

      喚醒等待獲取任務運行結果的線程:

 1 private void finishCompletion() {
 2     // assert state > COMPLETING;
 3     //自旋CAS更新waiters爲null直到成功
 4     for (WaitNode q; (q = waiters) != null;) {
 5         if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
 6             for (;;) {
 7                 Thread t = q.thread;
 8                 if (t != null) {
 9                     q.thread = null;
10                     LockSupport.unpark(t);    //喚醒等待線程,WaitNode是在get方法中添加的
11                 }
12                 WaitNode next = q.next;
13                 if (next == null)
14                     break;
15                 q.next = null; // unlink to help gc
16                 q = next;
17             }
18             break;
19         }
20     }
21 
22     done();    //hook方法,默認不執行任何操做,子類能夠重寫該方法完成指定的功能(例如:回調)
23 
24     callable = null;        // to reduce footprint
25 }

      handlePossibleCancellationInterrupt方法要確保cancel(true)產生的中斷髮生在run或runAndReset方法執行的過程當中。這裏會循環的調用Thread.yield()來確保狀態在cancel方法中被設置爲INTERRUPTED。

 1 private void handlePossibleCancellationInterrupt(int s) {
 2     // It is possible for our interrupter to stall before getting a
 3     // chance to interrupt us.  Let's spin-wait patiently.
 4     if (s == INTERRUPTING)
 5         while (state == INTERRUPTING)
 6             Thread.yield(); // wait out pending interrupt
 7 
 8     // assert state == INTERRUPTED;
 9 
10     // We want to clear any interrupt we may have received from
11     // cancel(true).  However, it is permissible to use interrupts
12     // as an independent mechanism for a task to communicate with
13     // its caller, and there is no way to clear only the
14     // cancellation interrupt.
15     //
16     // Thread.interrupted();
17 }

    獲取運行結果-get()方法

 1 public V get() throws InterruptedException, ExecutionException {
 2     int s = state;
 3     //若是state爲NEW或COMPLETING,調用awaitDone方法將當前線程添加到waiters中並阻塞
 4     if (s <= COMPLETING)
 5         s = awaitDone(false, 0L);
 6     //若是已經完成(包括正常結束或異常結束),返回
 7     return report(s);
 8 }
 9 
10 //若是超時則拋出TimeoutException異常
11 public V get(long timeout, TimeUnit unit)
12     throws InterruptedException, ExecutionException, TimeoutException {
13     if (unit == null)
14         throw new NullPointerException();
15     int s = state;
16     if (s <= COMPLETING &&
17         (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
18         throw new TimeoutException();
19     return report(s);
20 }

      awaitDone方法,阻塞線程。

 1 //timed參數表示是否使用超時機制
 2 private int awaitDone(boolean timed, long nanos)
 3     throws InterruptedException {
 4     final long deadline = timed ? System.nanoTime() + nanos : 0L;
 5     WaitNode q = null;
 6     boolean queued = false;    //是否已經入棧
 7     for (;;) {
 8         //若當前線程被中斷,則刪除q並拋出InterruptedException()
 9         if (Thread.interrupted()) {
10             removeWaiter(q);
11             throw new InterruptedException();
12         }
13 
14         int s = state;
15         //若是state大於COMPLETING,代表任務已經完成,則將節點q的線程置爲null並返回狀態值。
16         if (s > COMPLETING) {
17             if (q != null)
18                 q.thread = null;
19             return s;
20         }
21         //s==COMPLETING,說明任務已經執行完成但尚未設置最終狀態。
22         //Thread.yield();讓當前正在運行的線程回到可運行狀態,以容許其餘線程(包括當前線程)得到運行的機會。注意目的是嘗試讓狀態改變,繼續下個循環。
23         else if (s == COMPLETING) // cannot time out yet
24             Thread.yield();
25         else if (q == null)
26             q = new WaitNode();    //新建WaitNode節點
27         //CAS添加到waiters棧,在阻塞以前先將節點q添加棧,入棧成功後queued更新爲true。
28         else if (!queued)
29             queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
30                                                  q.next = waiters, q);
31         else if (timed) {
32             nanos = deadline - System.nanoTime();
33             //若是已通過期,則刪除節點q並返回
34             if (nanos <= 0L) {
35                 removeWaiter(q);
36                 return state;
37             }
38             LockSupport.parkNanos(this, nanos);    //超時機制阻塞當前線程
39         }
40         else
41             LockSupport.park(this);    //阻塞當前線程
42     }
43 }
44             
45 //刪除指定節點(Treiber算法實現的棧)
46 private void removeWaiter(WaitNode node) {
47     if (node != null) {
48         node.thread = null;    //將線程置爲null,由於下面要根據thread是否爲null判斷是否要把node移出
49         retry:
50         for (;;) {          // restart on removeWaiter race
51             for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
52                 s = q.next;
53                 if (q.thread != null)
54                     pred = q;
55                 else if (pred != null) {
56                     pred.next = s;
57                     if (pred.thread == null) // check for race
58                         continue retry;
59                 }
60                 else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
61                     continue retry;
62             }
63             break;
64         }
65     }
66 }

      report方法,返回運行結果或拋出異常。

 1 //任務完成返回執行結果或拋出異常
 2 private V report(int s) throws ExecutionException {
 3     Object x = outcome;
 4     //若是任務正常完成,返回執行結果
 5     if (s == NORMAL)
 6         return (V)x;
 7     //若是s >= CANCELLED,說明任務被取消,那麼就拋出CancellationException
 8     if (s >= CANCELLED)
 9         throw new CancellationException();
10     //最後s==EXCEPTIONAL,任務執行時發生異常,拋出該異常
11     throw new ExecutionException((Throwable)x);
12 }

    取消任務-cancel方法

      試圖取消對此任務的執行。若是任務已完成、或已取消,或者因爲某些其餘緣由而沒法取消,則此嘗試將失敗。當調用 cancel 時,若是調用成功,而此任務還沒有啓動,則此任務將永不運行。若是任務已經啓動,則 mayInterruptIfRunning 參數肯定是否應該以試圖中止任務的方式來中斷執行此任務的線程。

 1 public boolean cancel(boolean mayInterruptIfRunning) {
 2     //若state != NEW,說明任務已經啓動,則直接返回失敗。
 3     if (state != NEW)
 4         return false;
 5     //若是mayInterruptIfRunning爲true,要中斷當前執行任務的線程。
 6     if (mayInterruptIfRunning) {
 7         //CAS更新state爲INTERRUPTING不成功,說明state已被改變(即state != NEW),則直接返回失敗。若是成功則中斷正在執行任務的線程,並喚醒等待獲取結果的線程。
 8         if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
 9             return false;
10         Thread t = runner;
11         if (t != null)
12             t.interrupt();    //中斷當前線程
13         //更新state爲INTERRUPTED
14         UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
15     }
16     //mayInterruptIfRunning=flase,CAS更新state爲CANCELLED,若成功則喚醒等待的線程(不中斷正在執行任務的線程),若失敗返回false。
17     else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
18         return false;
19     finishCompletion();
20     return true;
21 }

 

  Executors源碼解析

    Executors是一個工具類,提供了公共的靜態方法,例如建立默認線程工廠、建立線程池、把Runnable包裝成Callable的方法等。

    建立默認線程工廠

      DefaultThreadFactory類

 1 static class DefaultThreadFactory implements ThreadFactory {
 2     private static final AtomicInteger poolNumber = new AtomicInteger(1);    //線程池序號
 3     private final ThreadGroup group;    //線程組
 4     private final AtomicInteger threadNumber = new AtomicInteger(1);    //線程號
 5     private final String namePrefix;
 6 
 7     DefaultThreadFactory() {
 8         SecurityManager s = System.getSecurityManager();
 9         group = (s != null) ? s.getThreadGroup() :
10                               Thread.currentThread().getThreadGroup();
11         namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
12     }
13 
14     public Thread newThread(Runnable r) {
15         Thread t = new Thread(group, r,
16                               namePrefix + threadNumber.getAndIncrement(),    //線程名
17                               0);
18         //非守護線程
19         if (t.isDaemon())
20             t.setDaemon(false);
21         //相同的優先級
22         if (t.getPriority() != Thread.NORM_PRIORITY)
23             t.setPriority(Thread.NORM_PRIORITY);
24         return t;
25     }
26 }

      建立默認工廠方法:

1 public static ThreadFactory defaultThreadFactory() {
2     return new DefaultThreadFactory();
3 }

 

    建立線程池

      1) newFixedThreadPool方法

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

 

        固定線程數的線程池,corePoolSize和 maximumPoolSize 都被設置爲nThreads,keepAliveTime=0,因爲corePoolSize等於maximumPoolSize,因此keepAliveTime和maximumPoolSize參數是無效的。阻塞隊列是LinkedBlockingQueue,是一個無界隊列。正常狀況下(未執行方法shutdown()或shutdownNow()),不會調用飽和策略。

      2)newSingleThreadExecutor方法

1 public static ExecutorService newSingleThreadExecutor() {
2     return new FinalizableDelegatedExecutorService
3         (new ThreadPoolExecutor(1, 1,
4                                 0L, TimeUnit.MILLISECONDS,
5                                 new LinkedBlockingQueue<Runnable>()));
6 }

        單個線程的線程池,corePoolSize和maximumPoolSize都爲1,其餘同FixedThreadPool。能保證任務按順序執行。

      3)newCachedThreadPool方法

1 public static ExecutorService newCachedThreadPool() {
2     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
3                                   60L, TimeUnit.SECONDS,
4                                   new SynchronousQueue<Runnable>());
5 }

        線程數可改變的線程池,corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,核心線程數爲0,最大線程數爲CAPACITY(由於CAPACITY<Integer.MAX_VALUE).keepAliveTime=60L,意味着CachedThreadPool中的空閒線程等待新任務的最長時間爲60秒,空閒線程超過60秒後將會被終止。CachedThreadPool使用沒有容量的SynchronousQueue做爲線程池的工做隊列.這意味着,若是主線程提交任務的速度高於maximumPool中線程處理任務的速度時,CachedThreadPool會不斷建立新線程。極端狀況下,CachedThreadPool會由於建立過多線程而耗盡CPU和內存資源。

 

    把Runnable包裝成Callable的方法

 1 public static <T> Callable<T> callable(Runnable task, T result) {
 2     if (task == null)
 3         throw new NullPointerException();
 4     return new RunnableAdapter<T>(task, result);
 5 }
 6 
 7 public static Callable<Object> callable(Runnable task) {
 8     if (task == null)
 9         throw new NullPointerException();
10     return new RunnableAdapter<Object>(task, null);
11 }

 

 

 

 

參考資料

  深刻理解Java線程池:ThreadPoolExecutor

  聊聊併發(三)——JAVA線程池的分析和使用

  FutureTask源碼解析

  FutureTask中的waiters爲何這麼設計?

  Treiber Stack

相關文章
相關標籤/搜索