【重回基礎】線程池框架與核心源碼解讀

1、前言

目錄:html

1、 前言
2、 線程池框架
     2.1 Executor 接口
     2.2 ExecutorService 接口
     2.3 ScheduledExecutorService 接口
3、 核心源碼
     3.1 ctl:線程池狀態和線程數量
     3.2 execute:任務提交
     3.3 Worker 工做線程結構體
     3.4 runWorker:Worker工做主循環
     3.5 getTtask():Worker獲取任務方法
     3.6 processWorkerExit:Worker工做結束處理方法
     3.7 addWorker:建立Worker線程
參考
複製代碼

2、 線程池框架

線程池的主要框架以下:java

在這裏插入圖片描述

  • Executor 接口:只是簡單定義了任務提交execute方法,該任務可能執行於新建線程,也多是調用線程。
  • ExecutorService 接口:主要是定義線程池的生命週期管理方法(如shutdown、shutdownNow等),和返回Future 的可供追溯的任務提交方法(如submit、invokeAny等)。
  • AbstractExecutorServie:主要提供了ExecutorService接口的submitinvokeAnyinvokeAll 三個方法的實現。
  • ThreadPoolExecutor:線程池的具體實現。
  • ScheduledExecutorService接口:繼承了ExecutorService 接口,定義了一些延遲,或週期性執行任務的方法。
  • ScheduledThreadPoolExecutor:繼承了ThreadPoolExecutor,實現了ScheduledExecutorService 接口,主要提供了延遲,或週期性執行任務方法的實現。
  • Executors:提供許多靜態工廠方法,包括構建ExecutorServiceScheduledExecutorServiceThreadFactoryCallable

2.1 Executor 接口

僅定義了一個接口,肯定了任務提交與執行解藕的方式:bash

public interface Executor {
    /** * 提交執行任務,提交與執行進行解藕,任務的執行可能在新建線程,也可能在調用線程 * @param command 待執行任務 * @throws RejectedExecutionException 任務不被接收,執行拒絕邏輯後,拋出拒絕異常 * @throws NullPointerException if command is null 若無若空,拋出空指針異常 */
    void execute(Runnable command);
}
複製代碼

2.2 ExecutorService 接口

主要定義了線程池生命週期管理方法和帶返回值的任務提交方法:框架

如下爲生命週期管理方法:dom

/** * 將線程池狀態置爲SHUTDOWN狀態,不接受新任務,執行已提交任務。 * 該方法並不阻塞等待全部已提交任務執行完畢,而是當即返回。 * 若須要返回結果,可利用{@link #awaitTermination awaitTermination} */
void shutdown();

/** * 將線程池狀態置爲STOP狀態,不接受新任務,嘗試中止全部執行任務中的線程, * 取消全部任務隊列中待執行的任務,並返回這些任務。 * 該方法不阻塞等待執行任務中的線程關閉,而是當即返回。 * 若須要返回結果,可利用{@link #awaitTermination awaitTermination} * * 須要注意的是,因爲Java並不提供搶佔的方式,而是經過線程協做, * 所以,該方法只能保證盡最大努力中止執行任務中的線程。 * 可是並不保證全部的線程都可以及時響應中斷。 */
List<Runnable> shutdownNow();

/** * 返回當前線程池是否處於SHUTDOWN狀態 */
boolean isShutdown();

/** * 返回當前線程池是否處於TERMINATED狀態 */
boolean isTerminated();

/** * 阻塞等待線程池狀態變動爲TERMINATED,或超時。 * 能夠用來阻塞等待執行shutdown()後,全部已提交任務執行完畢, * 或等待執行shutdownNow()後,全部線程回收,任務列表清空完成。 */
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
複製代碼

如下爲任務提交方法:函數

/** * 提交帶返回值的執行任務,數據類型爲Future,執行get()獲取執行結果,若執行未完成會阻塞。 * 所以,若須要同步執行等待結果,可利用exec.submit(task).get();方式調用。 */
<T> Future<T> submit(Callable<T> task);

/** * 提交實現Runnable接口的任務,若任務執行成功,返回給定的執行結果result。 */
<T> Future<T> submit(Runnable task, T result);

/** * 提交實現Runnable接口的任務,若任務執行成功,返回null。 */
Future<?> submit(Runnable task);

/** * 批量執行任務,當全部任務執行完畢(阻塞),返回帶狀態和執行結果的Future。 */
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  throws InterruptedException;

/** * 批量執行任務,當全部任務執行完畢,或超時,返回帶狀態和執行結果的Future。 * 須要注意的是,若超時,一樣會把未執行完畢的 Future 一併返回。 */
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                              long timeout, TimeUnit unit)
  throws InterruptedException;

/** * 給定任務列表,返回任意執行成功的任務執行結果,該方法會阻塞。 */
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

/** * 給定任務列表,返回任意,在給定時間內執行成功的任務執行結果。 */
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
複製代碼

2.3 ScheduledExecutorService 接口

ScheduledExecutorService 繼承了 ExecutorService 接口,所以,除了一樣具有線程池生命週期管理方法和帶返回值Future的任務提交方法,其自身提供了一些延遲,或週期性執行的任務提交方法:ui

/** * 給定實現Runnable接口的任務,延遲給定的時間,執行任務。 */
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay, TimeUnit unit);

/** * 給定實現Callable接口的任務,延遲給定的時間,執行任務。 */
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

/** * 給定實現Runnable的任務,在給定初始延遲時間後,根據給定period時間週期性執行。 * 提交任務後,通過initialDelay執行第一次, * 第二次任務執行於initialDelay + period,第三次在initialDelay + 2*period, * 以此類推。 * * 除非任務執行上拋異常,不然,該任務只能經過取消任務,或線程池關閉來中止。 */
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit);

/** * 給定實現Runnable的任務,在給定初始延遲時間後,首次執行任務。 * 後繼的任務,於上次任務執行完畢後,延遲給定的delay時間後執行。 * * 一樣,除非任務執行上拋異常,不然,只能經過取消任務,或線程池關閉來終止。 */
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
複製代碼

3、 核心源碼

3.1 ctl:線程池狀態和線程數量

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
複製代碼

線程池採用了一個原子變量同時表示線程池狀態和線程數量,變量共32bit,其中高位3bit表示線程池狀態,剩餘的29bit表示線程數量。this

線程池狀態分爲如下五種:spa

private static final int COUNT_BITS = Integer.SIZE - 3;
// 接收新任務
private static final int RUNNING    = -1 << COUNT_BITS;
// 不接收新任務,執行隊列中待執行任務
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 不接收新任務,不執行隊列中待執行任務
private static final int STOP       =  1 << COUNT_BITS;
// 線程池的線程、任務都已清空,喚起 terminated
private static final int TIDYING    =  2 << COUNT_BITS;
// terminated 執行完畢
private static final int TERMINATED =  3 << COUNT_BITS;	
複製代碼

狀態流程:線程

在這裏插入圖片描述

線程數量的增減,採用CAS機制:

// 線程數加一
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}
// 線程數減一
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}
複製代碼

將線程池狀態和線程數量整合爲一個原子變量,該設計很巧妙:判斷線程池的狀態,因爲狀態處於高位,因此可用大小判斷;獲取線程數量,可利用低位29bit 1 值的 mask作位與運算。可是該設計的初衷想必不是爲了節省空間,由於多一個原子整型其實也就四個字節,卻耗費更多的位運算。更多的是可能爲了維護狀態、線程數量兩個變量的原子性,防止出現一個變量改了,而另外一個變量未改的狀態不一致。

3.2 execute:任務提交

execute提交任務主要分爲三個階段,如下流程圖幫助理解:

在這裏插入圖片描述

如下爲源碼:

/** * 任務提交。若是線程池已關閉,或隊列達到上限, * 則由 RejectedExecutionHandler 執行拒絕邏輯,並拋出 RejectedExecutionException 異常。 * 主要分爲3步: * 1. 若線程數量小於核心線程數,嘗試建立新worker,該任務做爲初始化任務。若成功返回,失敗繼續。 * 2. 嘗試讓任務入隊,若成功,仍須要double-check雙重校驗,重複檢查線程池狀態和線程數量。失敗繼續。 * 3. 嘗試建立新線程,若失敗,執行拒絕邏輯。 */
public void execute(Runnable command) {
    // 提交的任務爲空,則拋出空指針異常
    if (command == null)
        throw new NullPointerException();
    
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        // 嘗試建立新worker,由於方法內部具有線程池狀態等校驗,所以這裏無需再次校驗
      	if (addWorker(command, true))
            return;
        // 若建立失敗,從新獲取 ctl
        c = ctl.get();
    }
    // 若線程池狀態爲RUNNING,任務嘗試進入阻塞隊列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // double-check雙重校驗,再次校驗狀態
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // double-check雙重校驗,再次校驗線程數量,防止線程池無工做線程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 嘗試建立新worker線程
    else if (!addWorker(command, false))
        // 失敗執行拒絕邏輯
        reject(command);
}	
複製代碼

3.3 Worker 結構體

/** * Worker 主要負責管理線程執行、中斷。 * 爲防止任務執行時中斷,每次執行任務時須要加鎖。 * 鎖的實現經過經過繼承AbstractQueuedSynchronizer簡化。 * 鎖的機制爲非重入互斥鎖,防止經過 setCorePoolSize 等方法獲取到鎖,並執行中斷等。 * 另外,Worker初始化時,state設置爲-1,防止線程未啓動卻執行中斷。 */
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    ...
   
    // 該 Worker 運行所在線程,便於執行 interrupt 等管理
    final Thread thread;
    // 初始化任務
    Runnable firstTask;
    // 完成任務數
    volatile long completedTasks;
    // 構造函數
    Worker(Runnable firstTask) {
        // 防止線程未開始就執行interrupt
        setState(-1);
        this.firstTask = firstTask;
        // 線程工廠建立線程
        this.thread = getThreadFactory().newThread(this);
    }
    // 工做線程的工做內容,包裝在 runWorker 方法
    public void run() {
        runWorker(this);
    }
		// 是否持有獨佔鎖,status 0:否,1:是
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
		// 採用CAS機制嘗試將status由0變爲1,即持有獨佔鎖
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
		// 釋放鎖
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
		// 獲取獨佔鎖,若已被獨佔,則進入FIFO隊列排隊待鎖,直到獲取到鎖
    public void lock() { acquire(1); }
    // 嘗試獲取獨佔鎖
    public boolean tryLock() { return tryAcquire(1); }
    // 釋放獨佔鎖
    public void unlock() { release(1); }
    // 判斷是否持有獨佔鎖
    public boolean isLocked() { return isHeldExclusively(); }
		// 將當前 Worker 所在線程標記爲中斷狀態
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}
複製代碼

3.4 runWorker:Worker 工做主循環

Worker 線程工做流程圖:

在這裏插入圖片描述

下面是源碼:

/** * Worker 線程的循環工做內容,就是重複不停地從隊列中獲取任務,並執行。 * * 1. 初始化任務可帶可不帶。只要線程池狀態爲 RUNNING ,那麼就循環調用 getTask() 獲取任務。 * 循環結果有兩種: * (1) getTask() 結果爲 null,通常因爲線程池狀態的變動,或線程池配置參數限制。 * (2) task.run() 出現異常,completedAbruptly 會被標記爲 true,當前線程中斷。 * * 2. 在執行任何任務以前,會對當前 Worker 加上互斥鎖,防止 shutdown() 中斷操做終止運行中的 Worker。 * 確保除非線程池狀態爲關閉中,不然線程不能別中斷。 * * 3. 每一個任務執行前會調用 beforeExecute(),該方法若拋出異常,會致使當前線程死亡,而沒有執行任務。 * * 4. task.run() 任務執行拋出來的任何 RuntimeException、Error、Throwable 都會被收集交給 * afterExecute(task, thrown) 方法,而且上拋,致使當前線程的死亡。 * * 5. afterExecute(task, thrown) 方法若拋出異常,一樣會引發當前線程的死亡。 */
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // runWorker 開始執行後,將 status 設置爲0,容許 interrupt 中斷
    w.unlock();
    boolean completedAbruptly = true;
    try {
        // 判斷 Worker 初始化任務是否爲空
        // 若空,則 getTask() 方法從阻塞隊列中嘗試獲取新任務,這裏可能陷入長久阻塞
        // 若返回爲 null,退出循環,執行 processWorkerExit() 方法處理線程終結邏輯
        while (task != null || (task = getTask()) != null) {
            // 任務執行前,會對當前 Worker 進行加鎖,固然,並非爲了防止當前線程執行多任務,
            // 由於任務的獲取也要等當前任務執行完畢,到下一個循環。
            // 這裏的鎖是爲了防止例如 shutdown() 等某些方法中斷執行任務中的線程。
            w.lock();
            // 整體思想就是,若線程狀態爲 STOP 就中斷線程,若不是 STOP,則確保線程不被中斷。
            // 具體:
            // 1. 若線程池狀態爲關閉,且當前線程未中斷,則當前線程標記中斷。
            // 2. 若未關閉,則執行 Thread.interrupted() && runStateAtLeast(ctl.get(), STOP) 
            // 即獲取當前線程狀態,並清理狀態,若獲取獲得狀態爲中斷,再次從新檢查線程池的狀態,
            // 知足則從新設置爲中斷狀態;不知足,則在 Thread.interrupted() 已清理線程狀態,直接略過。
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
          
            try {
                // 執行前調用,子類實現
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 執行任務
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 執行後調用,子類實現,傳遞收集的 thrown
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
      	// 執行結束非中斷標記
        completedAbruptly = false;
    } finally {
        // 工做線程結束處理
        processWorkerExit(w, completedAbruptly);
    }
}
複製代碼

其中,中間的判斷語句比較晦澀:

// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
     (Thread.interrupted() &&
      runStateAtLeast(ctl.get(), STOP))) &&
    !wt.isInterrupted())
  wt.interrupt();
複製代碼

對語句進行拆解,方便閱讀:

在這裏插入圖片描述

  1. 首先,編號1若爲true時,即線程池狀態大於 STOP ,出於關閉狀態。那麼驗證編號3,若當前線程爲非中斷狀態,則中斷,若中斷則不用處理了。

  2. 若編號1爲false,那麼驗證編號2.1和2.2,獲取當前線程中斷狀態,並將中斷狀態清理爲false:

    若編號2.1爲true,則驗證編號2.2,即二次檢查線程池狀態,若關閉狀態,則驗證編號3,這時編號3必然經過,由於在編號2.1已進行清理。

    若編號2.1位false,即線程池非關閉狀態,且當前線程非中斷狀態,不處理。

總結起來,就是確保:線程池爲關閉狀態時,中斷線程;若非關閉狀態,線程不被中斷。

3.5 getTask() :Worker 獲取任務方法

/** * 從阻塞隊列中獲取待執行任務,根據線程池的狀態,可能限時或不限時阻塞。出現如下任何狀況會返回 null: * 1. 當前線程數量大於最大線程數。 * 2. 線程池狀態爲 STOP。 * 3. 線程池狀態爲 SHUTDOWN,且阻塞隊列爲空。 * 4. 在阻塞隊列執行 poll 操做超時,且獲取不到任務。 * 能夠注意到,方法若返回 null,runWorker 便再也不循環,所以,這裏返回 null 的地方,都對線程數量進行扣減。 */
private Runnable getTask() {
    boolean timedOut = false;

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 2和3點:若線程池狀態爲STOP,或爲SHUTDOWN且阻塞隊列爲空時,減小線程數計數,返回null待終結。
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // 不斷嘗試線程數量減一,直到成功
            decrementWorkerCount();
            return null;
        }
        int wc = WorkerCountOf(c);
        // 是否須要關注超時:容許核心線程超時回收,或線程數量大於核心線程數量
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
				// 1和4點:線程數量大於最大線程數,或執行 poll 超時。
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // 嘗試線程數量減一,不成功則重試
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            // 若須要關注超時,則調用 poll,給予時限。若無需關注超時,則調用 take,長時間等待任務。
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            // 若任務不爲空,返回;若空,則標記超時
            if (r != null)
                return r;
            timedOut = true;
        // poll 和 take 上拋的等待中斷異常
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
複製代碼

3.6 processWorkerExit:Worker 工做結束處理方法

/** * 主要作三件事情: * 1. 維護Worker線程結束後的線程池狀態,好比移出woker集合,統計完成任務數。 * 2. 檢測線程池是否知足 TIDYING 狀態,知足則調整狀態,觸發 terminated()。 * 3. 當線程池狀態爲RUNNING或SHUTDOWN時,檢測如下三種狀況從新建立新的Worker: * (1) 任務執行異常引發的Worker線程死亡。 * (2) 線程數量爲0且任務隊列不爲空。 * (3) 若不容許核心線程超時回收,線程數量少於核心線程時。 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
		// 若因爲任務執行異常引發的線程終結,線程數量減一。
    // 非任務執行異常引發,說明是因爲getTask()方法返回null,線程數量減一已在返回時處理。
    // 所以,這裏只須要處理用戶任務執行異常引發的線程終結。
  	if (completedAbruptly)
        decrementWorkerCount();
		// 操做線程池共享變量加鎖
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        Workers.remove(w);
    } finally {
        mainLock.unlock();
    }
		// 嘗試進入 TIDYING 狀態
    tryTerminate();
  
    int c = ctl.get();
    // 若線程池爲 RUNNING 或 SHUTDOWN
    if (runStateLessThan(c, STOP)) {
        // 若因爲任務執行異常引發則直接跳過,建立新的Worker代替
        if (!completedAbruptly) {
            // 若容許核心線程超時回收,則最低線程數量爲0,不然爲核心線程數
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 若最低值爲0,檢測任務隊列是否非空,非空最低改成1
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // 若當前線程數量大於最低值則跳過,不然建立新的Worker代替
            if (WorkerCountOf(c) >= min)
                return;
        }
        // 建立新 Worker
        addWorker(null, false);
    }
}
複製代碼

3.7 addWorker:建立Worker線程

/** * 主要負責檢查是否知足線程建立條件,若知足則新建Worker線程。線程建立成功返回true; * 若線程池狀態爲STOP,或爲不知足條件的SHUTDOWN時,或線程工廠建立失敗時,返回false。 * 線程建立失敗也可能拋出異常,尤爲是內存不足時。 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // 外圈循環,主要判斷線程池狀態
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 判斷是否容許建立新的Worker線程,看着比較拗口,實際主要拒絕如下三種場景下,進行建立線程:
        // 1. 線程池狀態爲STOP、TIDYING、TERMINATE。
        // 2. 線程池狀態爲SHUTDOWN,新任務試圖進入線程池並建立新線程。
        // 3. 線程池狀態爲SHUTDOWN,任務隊列爲空。
        // 後繼對該判斷語句進行拆解解析
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        
        // 內圈循環,主要判斷線程數量
        for (;;) {
            int wc = WorkerCountOf(c);
            // 若線程數量超越了ctl的bit數,或者核心線程數量已滿時建立核心線程,或線程已達最大線程數
            // 則返回false,拒絕建立
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 採用CAS機制嘗試線程數量加一,成功則再也不進行retry外圈循環
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // CAS操做線程數量加一失敗,說明線程池ctl在當時已發生變化,所以從新獲取
            c = ctl.get();
            // 若ctl變化的是線程池狀態,則循環外圈,從新判斷線程池狀態
            // 若ctl變化的只是線程數量,則無需外圈循環從新判斷線程池狀態,只須要內圈循環,嘗試線程數量加一
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
	  // 線程數成功加一,開始建立Worker
    boolean WorkerStarted = false;
    boolean WorkerAdded = false;
    Worker w = null;
    try {
        // 調用Worker構造方法,內部採用了線程工廠建立線程,可能返回null,也可能拋出異常,一般由於內存不足
        w = new Worker(firstTask);
        final Thread t = w.thread;
        // 線程建立成功
        if (t != null) {
            // 操做線程池共享變量時取鎖
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 獲取到鎖後,從新檢查線程池狀態
                int rs = runStateOf(ctl.get());
								// 拿到鎖後從新檢查線程池狀態,只容許爲RUNNING或SHUTDOWN且非新建任務開闢線程時容許繼續
                // 不然,釋放鎖,回滾線程數量
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // 線程工廠建立出來的新線程已經start,則拋出線程狀態異常
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException();
                    // 新Worker進入集合
                  	Workers.add(w);
                    int s = Workers.size();
                    // 更新線程池最大線程數(區別於最大線程數,這個變量更多的是統計)
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    WorkerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // Worker入列成功,開啓線程
            if (WorkerAdded) {
                t.start();
                WorkerStarted = true;
            }
        }
    } finally {
        // 若線程建立失敗,則回滾
        if (! WorkerStarted)
            addWorkerFailed(w);
    }
    // 返回是否新線程啓動成功
    return WorkerStarted;
}
複製代碼

其中,對中間那句比較拗口的判斷語句剖析一下:

// 判斷是否容許建立新的Worker線程,看着比較拗口,實際主要拒絕如下三種場景下,進行建立線程:
// 1. 線程池狀態爲STOP、TIDYING、TERMINATE。
// 2. 線程池狀態爲SHUTDOWN,新任務試圖進入線程池並建立新線程。
// 3. 線程池狀態爲SHUTDOWN,任務隊列爲空。
// 後繼對該判斷語句進行拆解解析
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
		return false;
複製代碼

語句能夠轉換成:

if (rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()))
		return false;
複製代碼

即當線程池狀態大於等於SHUTDOWN時,若後續條件 (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()) 知足任意一個,則不容許建立。

  1. 先看 rs!=SHUTDOWN,若爲true,即意味着線程池狀態爲STOP、TIDYING、TERMINATE,那麼皆不容許建立新線程。
  2. rs!=SHUTDOWN 爲false,即rs=SHUTDOWN。從addWorker 方法的調用可知,只有當任務提交新建線程時會帶有 firstTask 參數。所以,第二個條件 firstTask!=null,用來拒絕線程池狀態爲SHUTDOWN時,新任務想建立線程。
  3. 若前兩個都不知足,即rs=SHUTDOWNfirstTask=null,那麼驗證第三個條件workQueue.isEmpty(),若任務線程爲空,則知足拒絕建立;若非空則容許建立。

參考

  1. Java線程池和ThreadPoolExecutor使用和分析(一)
  2. Java線程池和ThreadPoolExecutor使用和分析(二)
相關文章
相關標籤/搜索