Java併發之ThreadPoolExecutor源碼解析(二)

ThreadPoolExecutor

ThreadPoolExecutor是ExecutorService的一種實現,能夠用若干已經池化的線程執行被提交的任務。使用線程池能夠幫助咱們限定和整合程序資源,儘量避免建立新的線程來執行任務從而下降任務調用的開銷,在執行大量異步任務的時候反而能得到更好的性能。此外,ThreadPoolExecutor還會維護一些統計信息,好比已完成的任務數量。java

juc包的做者Doug Lea推薦程序員儘可能使用更爲便利的Executors類的工廠方法來配置線程池:程序員

  1. Executors.newCachedThreadPool():建立一個線程池,能夠根據線程池的須要來建立任務。這個線程池比較適用執行週期較短且量大的異步任務,在調用execute(...)方法時若是線程池中存在閒置的線程,將複用閒置線程,不然建立一個新線程執行任務。若是線程的閒置時間超過60s,線程將被終止並從線程池內移除。所以,該線程池即使空閒時間再長,也不會有資源的消耗。
  2. Executors.newFixedThreadPool(int nThreads):建立一個線程池,nThreads爲池內線程的數量,池內最多同時有nThreads個線程並行處理任務,若是有新的任務提交到線程池,會先暫存在線程池中的無邊界任務隊列進行等待,直到有線程可用。若是有線程在執行期間由於錯誤提早終止,線程池將啓動一個新的線程代替原先的線程繼續處理任務隊列中處於等待的任務。除非顯式調用shutdown(),不然線程池中的線程將一直存在。
  3. Executors.newSingleThreadExecutor():建立一個Executor,該Executor使用一個工做線程處理任務。若是線程在執行期間由於錯誤而終止,將啓動一個新的線程代替原先的線程繼續處理無邊界任務隊列中處於等待的任務。隊列中的任務是按順序執行,任什麼時候刻都不會有多個任務處於活躍狀態。與newFixedThreadPool(1)不一樣,newFixedThreadPool(int nThreads)生成的線程池,能夠強轉爲ThreadPoolExecutor類型,再調用setCorePoolSize(int corePoolSize)方法設置核心線程數,而newSingleThreadExecutor()的實現類爲FinalizableDelegatedExecutorService,沒法直接設置核心線程數。

上面三種是較爲常見的配置線程池的工廠方法,若是有須要根據業務場景特殊配置線程池的,請看下面的參數:安全

核心線程數和最大線程數

ThreadPoolExecutor將根據corePoolSize(核心線程數)和maximumPoolSize(最大線程數)設置的邊界自動調整線程池內工做線程的數量(經過getPoolSize()),corePoolSize能夠經過getCorePoolSize()、setCorePoolSize(int corePoolSize)獲取和設置核心線程數,maximumPoolSize能夠經過getMaximumPoolSize()、setMaximumPoolSize(int maximumPoolSize)獲取和設置最大線程數。當有新的任務提交時,若是工做線程少於核心線程數,將會建立一個新線程來執行該任務,即使其餘工做線程處於閒置狀態。若是工做線程多於corePoolSize但少於maximumPoolSize,則當任務隊列滿的時候纔會建立新線程。若是corePoolSize和maximumPoolSize數值同樣,則建立一個固定大小的線程池;若是將maximumPoolSize設置爲Integer.MAX_VALUE,則線程池能夠容納任意數量的併發任務。併發

按需構造

核心線程只有當有新任務到達時纔會建立,但咱們能夠重寫prestartCoreThread() 或者prestartAllCoreThreads()來預先啓動核心線程。若是在構造一個線程池時,傳入的任務隊列已經存在任務,則須要線程池初始化完畢後,預先啓動線程。異步

建立新線程

使用ThreadFactory(線程工廠)建立新線程。若是沒有特別指定,則使用Executors.defaultThreadFactory()做爲默認的線程工廠,該線程工廠所建立的線程都位於相同的線程組(ThreadGroup)中,線程的優先級都是NORM_PRIORITY,線程守護狀態都爲false。經過提供不一樣線程工廠的實現,你能夠修改線程名、線程組、線程優先級和守護狀態等等。函數

活躍時間

若是線程池中的線程數超過核心線程數,多出的線程若是空閒時間超出keepAliveTime(活躍時間)將會終止,回收再也不活躍的線程。當有須要時,新的線程會從新建立。能夠經過setKeepAliveTime(long time, TimeUnit unit)動態設置活躍時間。若是time設置爲Long.MAX_VALUE,unit設置爲TimeUnit.NANOSECONDS,那麼多餘的空閒線程將不會在關閉線程池以前回收。若是調用allowCoreThreadTimeOut(boolean value)傳入的value爲true,那麼keepAliveTime將適用於核心線程,若是allowCoreThreadTimeOut爲true且keepAliveTime不爲0,核心線程的空閒時間超出活躍時間,核心線程也會被回收。oop

隊列

阻塞隊列(BlockingQueue)容許在獲取元素時陷入等待,直到有元素加入到隊列中。調用阻塞隊列方法時,有些方法不必定立刻返回,可能會在將來某個時刻達成某些條件時返回。阻塞隊列的方法伴隨四種形式:佈局

  1. 拋出異常。
  2. 返回特殊值,null或者false,具體視操做而定。
  3. 調用線程無限期陷入阻塞直到某些條件達成。
  4. 限定阻塞時長。
  拋異常 特殊值 阻塞 超時
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
檢查(獲取但不移除隊列頭部元素) element() peek()    

阻塞隊列不接受null元素,若是調用add、put、offer嘗試添加一個null元素,將會拋出NullPointerException異常,當調用poll操做失敗時也會返回null。阻塞隊列可能有容量限制,不管什麼時候都不能向隊列添加超過剩餘容量的元素,不然只能調用put方法陷入阻塞,直到有剩餘的空間能夠容納元素。若是對隊列的容納空間沒有限制,則剩餘容量返回Integer.MAX_VALUE。阻塞隊列的實現通常用於生產者-消費者隊列的場景,此外阻塞隊列還實現了Collection接口,所以,隊列還可使用remove(x)來移除元素。性能

阻塞隊列是線程安全的,全部排隊方法的實現都是用內部鎖或者其餘併發控制手段來實現原子性的。然而,除非是特殊規定,不然大部分集合操做,如:addAll、containsAll、retainAll 、removeAll不必定要保證原子性。所以,可能出如今調用addAll(c)時,只添加c中一部分的元素就拋出異常。阻塞隊列本質上並不支持關閉的操做,如:close或shutdown,當有須要讓隊列再也不接受新元素。若是有這種須要或者特性更傾向於以來隊列的實現。一種常見的策略是生產者往隊列插入具備特殊標識的對象,當消費者使用對象時,會對特殊標識進行解釋。ui

注意,阻塞隊列容許多個生產者和消費者同時使用,以下:

 class Producer implements Runnable {
   private final BlockingQueue queue;
   Producer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while (true) { queue.put(produce()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   Object produce() { ... }
 }

 class Consumer implements Runnable {
   private final BlockingQueue queue;
   Consumer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while (true) { consume(queue.take()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   void consume(Object x) { ... }
 }

 class Setup {
   void main() {
     BlockingQueue q = new SomeQueueImplementation();
     Producer p = new Producer(q);
     Consumer c1 = new Consumer(q);
     Consumer c2 = new Consumer(q);
     new Thread(p).start();
     new Thread(c1).start();
     new Thread(c2).start();
   }
 }

  

public interface BlockingQueue<E> extends Queue<E> {
    /**
     * 在不超過隊列容量的狀況下插入一個元素將返回true,,若是隊列沒有多餘的空間拋出
     * IllegalStateException異常,當使用容量受限的隊列時最好使用offer。
     *
     * @param e 待添加進隊列的元素
     * @return 返回true表明元素加入隊列成功
     */
    boolean add(E e);

    /**
     * 相比add(E)若是隊列滿時插入元素不報錯,只是返回false。
     *
     * @param e 待添加進隊列的元素
     * @return 返回true表明元素加入隊列成功,隊列滿時沒法插入返回false
     */
    boolean offer(E e);

    /**
     * 將一個元素插入到隊列,若是有必要會等待隊列有多餘空間能夠插入。若是調用
     * put(E)的線程被中斷,將拋出中斷異常InterruptedException
     *
     * @param e 待添加進隊列的元素
     */
    void put(E e) throws InterruptedException;

    /**
     * 相比offer(E)多了插入元素時陷入等待,若是等待期間隊列依舊
     * 沒有多餘的空間容納元素,則返回false,若是等待期間能插入則返回true。
     * 若是等待期間線程被中斷,則拋出中斷異常InterruptedException。
     *
     * @param e       待添加進隊列的元素
     * @param timeout 等待時長
     * @param unit    等待時長單位
     */
    boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException;

    /**
     * 取出並刪除隊列的頭部元素,若是隊列爲空,則會陷入等待,直到隊列有新的元素加入,
     * 若是等待期間線程被中斷,將拋出中斷異常
     *
     * @return 對頭元素
     */
    E take() throws InterruptedException;

    /**
     * 相比take()多了一個等待時長,若是隊列自己有元素,或者隊列原先有空但等待期間有元素
     * 加入則返回頭部元素,不然隊列爲空且等待期間沒有元素加入,則返回null。若是等待期間調用線程
     * 被中斷,則拋出InterruptedException異常。
     */
    E poll(long timeout, TimeUnit unit)
            throws InterruptedException;

    /**
     * 返回隊列理想情況下可無阻塞容納元素的容量。注意:咱們不能經過此方法判斷元素是否插入成功,
     * 由於可能存在別的線程插入或刪除隊列中的元素。
     */
    int remainingCapacity();

    /**
     * 從隊列中移除指定的元素,若是隊列中存在一個或多個相同的元素,即:o.equal(e),則刪除並返回true。
     */
    boolean remove(Object o);

    /**
     * 若是隊列存在一個或多個相同的元素,即:o.equal(e),則返回true。
     */
    boolean contains(Object o);

    /**
     * 刪除此隊列中全部可用元素,並將它們移動到給定的集合c中。當咱們把元素從原隊列取出時添加到集合c時,
     * 可能出現異常致使元素既不在原隊列,也不在集合中。隊列一樣實現了Collection接口,若是將原隊列當作
     * 參數傳入將拋出IllegalArgumentException異常
     * @param c 將隊列元素傳輸到給定的集合c。
     * @return 加入到集合c中元素的數量。
     */
    int drainTo(Collection<? super E> c);

    /**
     * 最多將maxElements個元素從隊列傳輸到給定集合,其餘和drainTo(Collection<? super E> c)同樣。
     */
    int drainTo(Collection<? super E> c, int maxElements);
}

  

任何阻塞隊列均可以用來獲取和保存任務,如何使用隊列視當前線程池的大小而定:

  1. 若是工做線程的數量小於corePoolSize,那麼Executor更傾向於添加新線程執行而非讓任務排隊。
  2. 若是工做線程的數量大於等於corePoolSize,那麼Executor更傾向於把任務添加到隊列等待執行,而非建立新線程。
  3. 若是隊列已滿,且線程池內的線程數小於maximumPoolSize,Executor會建立一個新線程來執行任務,不然任務會被拒絕。

一般有三種排隊策略:

  1. 同步隊列(SynchronousQueue):若是有線程從同步隊列獲取任務,則移交給線程,不然持有任務,若是有新的任務嘗試入隊將返回失敗,能夠根據入隊結果判斷是否要構造一個新線程。同步隊列能夠避免當處理多個請求時內部依賴出現鎖定,直接交接任務要求對maximumPoolSize這一參數不作限制,即maximumPoolSize爲Integer.MAX_VALUE,避免線程池拒絕提交任務。但若是線程池處理任務的速度不夠快,可能出現線程無限增加。
  2. 無界隊列(LinkedBlockingQueue):若是線程池核心工做線程都在執行任務時,新提交的任務將在隊列中等待。若是任務提交速度過快而執行任務的速度又慢,將致使隊列中的任務無限增加。
  3. 有界隊列(ArrayBlockingQueue):當與有限的maximumPoolSize配合使用時,有界隊列能夠防止資源耗盡,但如何設定有界隊列的大小是一個很難的問題。若是隊列過大而maximumPoolSize太小,能夠減小CPU的使用、操做系統資源和上下文切換的開銷,這有可能致使低吞吐量。若是隊列太小而maximumPoolSize過大,這會使得CPU十分繁忙,甚至出現巨大的調度開銷,一樣也會下降吞吐量。

拒絕任務

若是線程池關閉後有新任務提交、或者在任務隊列已滿的狀況下,線程池到達最大線程數且全部線程都在執行任務,將調用RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor)拒絕任務。默認提供四種預約義拒絕策略:

  1. ThreadPoolExecutor.AbortPolicy:默認狀況下使用的策略,拋出RejectedExecutionException異常。
  2. ThreadPoolExecutor.CallerRunsPolicy:使用調用線程執行任務,這種機制能夠下降任務提交的速度。
  3. ThreadPoolExecutor.DiscardPolicy:將沒法入隊也不能執行的任務直接丟棄。
  4. ThreadPoolExecutor.DiscardOldestPolicy:若是線程池未關閉,則獲取並丟棄隊列的頭部元素,再嘗試用線程池執行任務,這一策略可能會失敗,若是失敗則重複以前的步驟。

除了上述四種步驟,咱們也能夠自定義拒絕策略。

鉤子函數

ThreadPoolExecutor提供了可重寫函數beforeExecute(java.lang.Thread, java.lang.Runnable)、afterExecute(java.lang.Runnable, java.lang.Throwable),分別容許咱們在執行任務前和執行任務後作一些操做,這些方法能夠控制執行環境,例如:初始化ThreadLocals、收集統計信息、添加日誌等等。此外,也能夠重寫terminated()方法,當線程池徹底終止後會調用此方法。

若是鉤子函數或者回調函數拋出異常,工做線程可能會終止。

隊列維護

ThreadPoolExecutor提供了getQueue()容許外部獲取隊列進行監控和調試,但無論出於什麼目的儘可能少使用此方法。此外ThreadPoolExecutor還提供了remove(java.lang.Runnable) 和purge()用於刪除任務,purge()能夠取消大量處於排隊等待的任務。

銷燬

當一個線程池再也不有引用指向,且線程池內沒有存活線程將會自動關閉。若是你指望一個未手動調用shutdown()方法的線程池會被回收,你要設置合理的線程存活時間(keep-alive times)、設置核心線程數爲0,或者設置allowCoreThreadTimeOut爲true,當核心線程空閒時間超過存活時間將被回收,當線程池沒有引用指向,且無存活線程,就會被自動關閉並回收。

源碼解析

ThreadPoolExecutor的ctl變量類型爲AtomicInteger,這個數值有32位,包含兩個部分:

  1. runState(運行狀態):線程池是否處於運行中、是否已關閉等等。
  2. workerCount(工做線程數):線程池當前有多少個存活(忙碌或空閒)的線程。

爲了將運行狀態和工做線程數放在一個int字段,咱們劃分前3位存儲運行狀態,後29位存儲存活線程數量(2^29)-1(約5億)。將來有可能調整ctl爲AtomicLong類型,這可能須要調整移位和掩碼,但若是使用AtomicInteger,ThreadPoolExecutor的代碼會更簡單也更高效一些。

workerCount是線程池中還存活的線程數,該值有時候可能會短暫不一樣於池內實際的存活線程數。當須要增長工做線程時,會先用CAS的方式對workerCount+1,而後才向ThreadFactory申請建立一個線程。

runState爲線程池提供了生命週期控制,有如下幾種狀態:

  • RUNNING:容許接受新任務和處理隊列中任務。
  • SHUTDOWN:不接受新任務,但處理隊列中任務。
  • STOP:不接受新任務,不處理隊列中任務,同時嘗試中斷正在執行的任務。
  • TIDYING:全部任務都終止,workerCount爲0,線程池狀態過分到TIDYING,將要執行terminated()鉤子函數。
  • TERMINATED:terminated()函數執行完畢。

下面,咱們來看看線程池狀態的轉換:

  • RUNNING->SHUTDOWN:調用shutdown()。
  • (RUNNING or SHUTDOWN)->STOP:調用shutdownNow()。
  • SHUTDOWN->TIDYING:當線程池再也不有存活線程且隊列爲空。
  • STOP->TIDYING:當線程池再也不有存活線程。
  • TIDYING->TERMINATED:調用terminated()。

檢測線程池的狀態從SHUTDOWN過分到TIDYING並不是易事,由於在SHUTDOWN狀態下,隊列可能從非空變爲空,即仍然有存活的線程處理隊列中的任務。只有workerCount爲0且隊列爲空,才能結束線程池。 

public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * 類型爲AtomicInteger的ctl能夠保證線程安全,該數值分兩個部分:
     * 前3位爲runState表明線程池當前的狀態:RUNNING~TERMINATED,
     * 後29位爲workerCount表明線程池內存活線程數量。
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    /**
     * Integer.SIZE=32,COUNT_BITS=32-3=29,用COUNT_BITS來劃分
     * runState和workerCount。
     */
    private static final int COUNT_BITS = Integer.SIZE - 3;
    /**
     * 1 << COUNT_BITS = 0010 0000 0000 0000 0000 0000 0000 0000
     * COUNT_MASK = (1 << COUNT_BITS) - 1 = 0001 1111 1111 1111 1111 1111 1111 1111
     * ~COUNT_MASK = 1110 0000 0000 0000 0000 0000 0000 0000
     * COUNT_MASK能夠幫助咱們計算線程池當前的runState和workerCount。
     * 假設ctl的值爲:0000 0000 0000 0000 0000 0000 0000 0011
     * 調用runStateOf(ctl.get()),將作ctl.get() & ~COUNT_MASK運算:
     * 0000 0000 0000 0000 0000 0000 0000 0011
     * & 1110 0000 0000 0000 0000 0000 0000 0000
     * = 0000 0000 0000 0000 0000 0000 0000 0000
     * 由此咱們能夠獲得,線程池當前狀態爲0,即SHUTDOWN。
     * 調用workerCountOf(ctl.get()),將作ctl.get() & COUNT_MASK運算:
     * 0000 0000 0000 0000 0000 0000 0000 0011
     * & 0001 1111 1111 1111 1111 1111 1111 1111
     * = 0000 0000 0000 0000 0000 0000 0000 0011
     * 由此咱們能夠獲得,線程池還有3個存活的線程。
     */
    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

    /**
     * -1的二進制表示爲:1111 1111 1111 1111 1111 1111 1111 1111,
     * 左移29位爲:1110 0000 0000 0000 0000 0000 0000 0000
     */
    private static final int RUNNING = -1 << COUNT_BITS;
    /**
     * 0的二進制表示爲:0000 0000 0000 0000 0000 0000 0000 0000,
     * 左移29位和原先沒有變化。
     */
    private static final int SHUTDOWN = 0 << COUNT_BITS;
    /**
     * 0的二進制表示爲:0000 0000 0000 0000 0000 0000 0000 0001,
     * 左移29位爲:0010 0000 0000 0000 0000 0000 0000 0000
     */
    private static final int STOP = 1 << COUNT_BITS;
    /**
     * 2的二進制表示爲:0000 0000 0000 0000 0000 0000 0000 0010,
     * 左移29位爲:0100 0000 0000 0000 0000 0000 0000 0000
     */
    private static final int TIDYING = 2 << COUNT_BITS;
    /**
     * 3的二進制表示爲:0000 0000 0000 0000 0000 0000 0000 0011,
     * 左移29位爲:0110 0000 0000 0000 0000 0000 0000 0000
     */
    private static final int TERMINATED = 3 << COUNT_BITS;

    private static int runStateOf(int c) {
        return c & ~COUNT_MASK;
    }

    private static int workerCountOf(int c) {
        return c & COUNT_MASK;
    }

    /**
     * 根據runState和workerCount生成ctl,好比初始化線程池時,
     * ctl = new AtomicInteger(ctlOf(RUNNING, 0)),表明線程池
     * 的狀態爲RUNNING,存活線程數量爲0。
     * @param rs 線程池狀態
     * @param wc 存活線程數量
     * @return
     */
    private static int ctlOf(int rs, int wc) {
        return rs | wc;
    }

    /**
     * 經過位運算在進行一些狀態的判斷時,咱們不須要解析ctl的運行狀態,
     * 假設當前ctl:1110 0000 0000 0000 0000 0000 0000 0011,
     * 咱們要判斷線程池狀態是否小於STOP,因爲ctl開頭爲1110,以補碼的
     * 方式來計算,ctl的值必然爲負,STOP開頭爲0010,以補碼方式計算爲正數,
     * 因此ctl必然小於STOP。
     * ctl的佈局還能保證workerCount永遠不會爲負數。
     */
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    /**
     * 判斷線程池至少處於某個狀態,假設線程池如今隊列爲空且無任何存活線程,
     * 因此能保證線程池處於TIDYING狀態,若是s咱們傳入STOP,TIDYING的開頭
     * 爲0100,STOP的開頭爲0010,TIDYING>STOP,因此咱們能知道,線程池至少
     * 處於STOP以上包含STOP的狀態。
     */
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    //線程池是否處於RUNNING狀態
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }
    //CAS增長worker數量
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }
    //CAS減小worker數量
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }
}

  

咱們知道AbstractExecutorService.submit(...)方法最終會調用execute(Runnable command)方法,而AbstractExecutorService類中並無實現execute(Runnable command)方法,它將execute(Runnable command)的實現交由子類。那麼咱們來看看ThreadPoolExecutor又是如何實現execute(Runnable command)方法呢?當一個任務提交到線程池,它的執行流程又是如何呢?來看下面的代碼註釋:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        //若是工做線程的數量小於核心線程的數量,則嘗試增長工做線程
        if (workerCountOf(c) < corePoolSize) {//<1>
            /*
             * 若是成功增長工做線程,工做線程會執行咱們提交的任務,咱們就能夠安心退出,
             * 但線程池能夠併發提交任務,可能存在在<1>處時工做線程小於核心線程數,
             * 執行<2>處的addWorker(Runnable firstTask, boolean core)時,
             * 其餘線程先當前線程提交任務並增長工做線程,線程池內工做線程數超過核心線程數,
             * 當前線程增長工做線程失敗,不能直接退出。
             * 注:addWorker(Runnable firstTask, boolean core),core爲true
             * 表明增長核心線程,會將任務做爲firstTask傳入;而false表明增長非核心線程,
             * 若是傳入firstTask爲null,則表明讓工做線程去隊列中拉取任務。
             */
            if (addWorker(command, true))//<2>
                return;
            //若是<2>處增長工做線程失敗,則從新獲取ctl的值。
            c = ctl.get();
        }
        //判斷線程池是否處於運行中,且任務能夠入隊成功,若是二者成立,則進入<3>分支
        if (isRunning(c) && workQueue.offer(command)) {//<3>
            int recheck = ctl.get();
            /*
             * 從新獲取ctl,由於可能在進入<3>分支的時候,線程池被關閉,
             * 因此要從新判斷線程池狀態,若是線程池不是處於運行狀態,且
             * 任務成功被移除,則進入<4>分支,拒絕任務。
             */
            if (!isRunning(recheck) && remove(command))//<4>
                reject(command);
            /*
             * 爲何這裏要判斷工做線程數量是否爲0?由於若是設置allowCoreThreadTimeOut
             * 爲true的話,核心線程是能夠爲0的,可能代碼執行到<3>處workQueue.offer(command)以前,
             * 即任務還未入隊,工做線程數量已經爲0了,因此這裏要從新根據ctl判斷工做線程是否爲0,
             * 若是爲0得再增長非核心線程去隊列拉取並執行任務。
             */
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        /*
         * 若是沒有進入<3>分支,而到達<5>分支,通常分兩種狀況:
         * 1.線程池被關閉,<3>處isRunning(c)爲false,此時調用<5>處的
         * addWorker(...)必然返回false,而後執行拒絕策略。
         * 2.線程池處於運行狀態,<3>處isRunning(c)爲true,但隊列已滿
         * workQueue.offer(command)返回false,入隊失敗。
         * 這時候應該嘗試建立非核心工做線程執行任務,若是工做線程數量沒到達最大線程數,
         * 則建立線程並執行任務,若是工做線程到達最大線程數,則addWorker(...)返回
         * false,執行拒絕策略。
         */
        else if (!addWorker(command, false))//<5>
            reject(command);
    }

    

從ThreadPoolExecutor.execute(Runnable command)的實現,咱們能夠知道addWorker(Runnable firstTask, boolean core)方法是相當重要的,它決定了是否將任務添加進線程池執行。下面,咱們再來看看addWorker(Runnable firstTask, boolean core)方法:

    /**
     * 此方法會根據線程池當前的運行狀態、線程池所設定的邊界(核心線程數和最大線程數)。
     * 若是線程池容許建立線程執行任務,則建立線程執行firstTask並相應調整工做線程的數量。
     * 若是線程池狀態處於已中止(STOP)、關閉(SHUTDOWN)則會返回false。若是向線程工
     * 廠請求建立線程失敗,也會返回false。線程建立失敗分兩種狀況,一種是線程工廠返回null,
     * 或者執行Thread.start()時出現異常(一般爲OOM異常)。
     *
     * @param firstTask:新線程首要執行任務,若是沒有則傳入null。當工做線程數少於核心    線程數,線程池老是建立一個新線程來執行firstTask。
     * @param core:根據core爲true或者false,決定是以核心線程數或者最大線程數做爲界限, 判斷當前線程池的工做線程池是否小於界限,若是小於則容許建立線程。
     * @return 若是成功添加工做線程則返回true。
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (int c = ctl.get(); ; ) {//<1>
            // Check if queue empty only if necessary.
            /*
             * 在這個地方addWorker(...)會返回false,即添加工做線程失敗,
             * 咱們來看看是什麼狀況下會進入這個分支:
             * runStateAtLeast(c, SHUTDOWN)表明線程池運行狀態至少處於
             * SHUTDOWN,若是線程池還處於RUNNING運行狀態,此方法不會當即
             * 返回失敗。因此咱們知道,要進入此分支,首要條件就是運行狀態大於
             * 等於SHUTDOWN。
             * 以後若是runStateAtLeast(c, STOP)、firstTask != null、
             * workQueue.isEmpty())這三個條件其一爲true,則添加線程失敗。
             * 首先是runStateAtLeast(c, STOP),若是線程池當前處於STOP
             * 狀態,這時候既不接受新任務,也不處理隊列裏的任務,因此無論
             * firstTask是否爲null,都返回false。
             * 若是runStateAtLeast(c, STOP)爲false,那運行狀態只能是
             * SHUTDOWN,SHUTDOWN狀態下會處理隊列裏的任務,但再也不接受新
             * 任務,因此firstTask不爲null,也直接返回false。
             * 若是運行狀態既處於SHUTDOWN、firstTask也會空,且任務隊列也
             * 爲空,則毫無必要增長工做線程,也直接返回false。
             * 因此總結一下有兩種狀況不會進入此分支:
             * 1.線程池處於RUNNING狀態的時候。
             * 2.線程池處於SHUTDOWN,但firstTask爲空隊列不爲空時。
             */
            if (runStateAtLeast(c, SHUTDOWN)
                    && (runStateAtLeast(c, STOP)
                    || firstTask != null
                    || workQueue.isEmpty()))
                return false;

            for (; ; ) {//<2>
                //根據core判斷工做線程的上限,若是大於上限則返回false。
                if (workerCountOf(c)
                        >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                /*
                 * 若是用CAS的方式成功增長工做線程的數量,則用break retry的方式
                 * 結束了retry對應的外層循環(即<1>處for循環),而不是break所在
                 * 的本層循環(即<2>處循環),代碼會從<3>處開始執行。
                 */
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                /*
                 * 若是上面用CAS的方式增長工做線程失敗,則會從新判斷線程池當前
                 * 狀態是否至少處於SHUTDOWN,若是線程池已關閉,代碼會跳到retry
                 * 處從新執行<1>處的for循環。若是線程池仍然處於RUNNING狀態,則
                 * 重複執行<2>處的循環。
                 */
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        //<3>
        boolean workerStarted = false;//若是工做線程啓動成功,則賦值爲true
        boolean workerAdded = false;//若是工做線程添加成功則賦值爲true
        Worker w = null;
        try {
            //建立一個Worker對象,Worker對象會向線程工廠申請建立一個線程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            //若是線程工廠建立的Thread對象不爲null,則進入此分支
            if (t != null) {
                /*
                 * 這裏用可重入鎖鎖住try模塊代碼,由於要將以前建立好的
                 * w對象放進workers集合。
                 * 注:重入鎖ReentrantLock的概念筆者會在之後的文章裏
                 * 單獨介紹,這裏先簡單理解,可重入鎖就是禁止其餘線程同時
                 * 訪問mainLock.lock()到mainLock.unlock()之間的代碼,
                 * 和synchronized有些相似。
                 */
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    /*
                     * 從新獲取ctl,判斷線程池當前是否處於運行狀態,或小於STOP狀態,
                     * 即線程池處於RUNNING或SHUTDOWN,若是處於RUNNING則直接進入分支,
                     * 若是處於SHUTDOWN且首要執行任務爲空,表明可能要啓動一個工做線程
                     * 來執行隊列中的任務。
                     */
                    if (isRunning(c) ||
                            (runStateLessThan(c, STOP) && firstTask == null)) {
                        /*
                         * 判斷線程是否已經啓動,若是使用的是Executors.DefaultThreadFactory
                         * 默認的線程工廠,正常來講建立出來的Thread對象都是線程未啓動的,即:還沒有
                         * 調用Thread.start()。但ThreadPoolExecutor容許咱們傳入定製化的線程
                         * 工廠,因此會存在線程工廠建立出Thread對象,但Thread對象已調用過start()
                         * 方法的可能。
                         */
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //將建立好的worker添加進集合workers。
                        workers.add(w);
                        //更新歷史上最大的工做線程數,即workers.size()。
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        //將worker添加進workers後,更新workerAdded的值爲true。
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //若是worker成功加入集合,則啓動線程,並更新workerStarted爲true。
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            /*
             * 若是worker沒有啓動,表明worker沒有加入到workers集合,
             * 可能線程池狀態>=STOP,則須要執行添加工做線程失敗操做。
             */
            if (!workerStarted)
                addWorkerFailed(w);
        }
        //返回工做線程是否啓動成功
        return workerStarted;
    }
相關文章
相關標籤/搜索