Java併發(三)線程池原理

Java中的線程池是運用場景最多的併發框架,幾乎全部須要異步或併發執行任務的程序均可以使用線程池。在開發過程當中,合理地使用線程池可以帶來3個好處。java

1. 下降資源消耗。經過重複利用已建立的線程下降線程建立和銷燬形成的消耗;安全

2. 提升響應速度。當任務到達時,任務能夠不須要等到線程建立就能當即執行;多線程

3. 提升線程的可管理性。線程是稀缺資源,若是無限制地建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行統一分配、調優和監控。可是,要作到合理利用線程池,必須對其實現原理了如指掌。併發

線程池實現原理

當向線程池提交一個任務以後,線程池是如何處理這個任務的呢?本節來看一下線程池的主要處理流程,處理流程圖以下圖所示:框架

從圖中能夠看出,當提交一個新任務到線程池時,線程池的處理流程以下。less

1. 線程池判斷核心線程池裏的線程是否都在執行任務。若是不是,則建立一個新的工做線程來執行任務。若是核心線程池裏的線程都在執行任務,則進入下個流程。異步

2. 線程池判斷工做隊列是否已經滿。若是工做隊列沒有滿,則將新提交的任務存儲在這個工做隊列裏。若是工做隊列滿了,則進入下個流程。ide

3. 線程池判斷線程池的線程是否都處於工做狀態。若是沒有,則建立一個新的工做線程來執行任務。若是已經滿了,則交給飽和策略來處理這個任務。oop

ThreadPoolExecutor執行execute()方法的示意圖,以下圖所示源碼分析

 

ThreadPoolExecutor執行execute方法分下面4種狀況。

1)若是當前運行的線程少於corePoolSize,則建立新線程來執行任務(注意,執行這一步驟須要獲取全局鎖)。

2)若是運行的線程等於或多於corePoolSize,則將任務加入BlockingQueue。

3)若是沒法將任務加入BlockingQueue(隊列已滿),則建立新的線程來處理任務(注意,執行這一步驟須要獲取全局鎖)。

4)若是建立新線程將使當前運行的線程超出maximumPoolSize,任務將被拒絕,並調用RejectedExecutionHandler.rejectedExecution()方法。

ThreadPoolExecutor採起上述步驟的整體設計思路,是爲了在執行execute()方法時,儘量地避免獲取全局鎖(那將會是一個嚴重的可伸縮瓶頸)。在ThreadPoolExecutor完成預熱以後(當前運行的線程數大於等於corePoolSize),幾乎全部的execute()方法調用都是執行步驟2,而步驟2不須要獲取全局鎖。

源碼分析:上面的流程分析讓咱們很直觀地瞭解了線程池的工做原理,讓咱們再經過源代碼來看看是如何實現的。

1、變量

public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * The main pool control state, ctl, is an atomic integer packing
     * two conceptual fields
     *   workerCount, indicating the effective number of threads
     *   runState,    indicating whether running, shutting down etc
     *
     * In order to pack them into one int, we limit workerCount to
     * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
     * billion) otherwise representable. If this is ever an issue in
     * the future, the variable can be changed to be an AtomicLong,
     * and the shift/mask constants below adjusted. But until the need
     * arises, this code is a bit faster and simpler using an int.
     *
     * The workerCount is the number of workers that have been
     * permitted to start and not permitted to stop.  The value may be
     * transiently different from the actual number of live threads,
     * for example when a ThreadFactory fails to create a thread when
     * asked, and when exiting threads are still performing
     * bookkeeping before terminating. The user-visible pool size is
     * reported as the current size of the workers set.
     *
     * The runState provides the main lifecycle control, taking on values:
     *
     *   RUNNING:  Accept new tasks and process queued tasks
     *   SHUTDOWN: Don't accept new tasks, but process queued tasks
     *   STOP:     Don't accept new tasks, don't process queued tasks,
     *             and interrupt in-progress tasks
     *   TIDYING:  All tasks have terminated, workerCount is zero,
     *             the thread transitioning to state TIDYING
     *             will run the terminated() hook method
     *   TERMINATED: terminated() has completed
     *
     * The numerical order among these values matters, to allow
     * ordered comparisons. The runState monotonically increases over
     * time, but need not hit each state. The transitions are:
     *
     * RUNNING -> SHUTDOWN
     *    On invocation of shutdown(), perhaps implicitly in finalize()
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()
     * SHUTDOWN -> TIDYING
     *    When both queue and pool are empty
     * STOP -> TIDYING
     *    When pool is empty
     * TIDYING -> TERMINATED
     *    When the terminated() hook method has completed
     *
     * Threads waiting in awaitTermination() will return when the
     * state reaches TERMINATED.
     *
     * Detecting the transition from SHUTDOWN to TIDYING is less
     * straightforward than you'd like because the queue may become
     * empty after non-empty and vice versa during SHUTDOWN state, but
     * we can only terminate if, after seeing that it is empty, we see
     * that workerCount is 0 (which sometimes entails a recheck -- see
     * below).
     */
    /**
     * ctl 爲原子類型的變量, 有兩個概念
     * workerCount, 表示有效的線程數
     * runState, 表示線程狀態, 是否正在運行, 關閉等
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 容量 2²⁹-1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    // 線程池的五種狀態
    // 即高3位爲111, 接受新任務並處理排隊任務
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 即高3位爲000, 不接受新任務, 但處理排隊任務
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 即高3位爲001, 不接受新任務, 不處理排隊任務, 並中斷正在進行的任務
    private static final int STOP       =  1 << COUNT_BITS;
    // 即高3位爲010, 全部任務都已終止, 工做線程爲0, 線程轉換到狀態TIDYING, 將運行terminate()鉤子方法
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 即高3位爲011, 標識terminate()已經完成
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl 用來計算線程的方法
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    ... ...
}

 ctl 是對線程池的運行狀態和線程池中有效線程的數量進行控制的一個字段, 它包含兩部分的信息:線程池的運行狀態 (runState) 和線程池內有效線程的數量 (workerCount),這裏能夠看到,使用了Integer類型來保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位減1(29個1),這個常量表示workerCount的上限值,大約是5億。

下面再介紹下線程池的運行狀態,線程池一共有五種狀態,分別是:

狀態 描述
RUNNING  能接受新提交的任務,而且也能處理阻塞隊列中的任務
SHUTDOWN 關閉狀態,再也不接受新提交的任務,但卻能夠繼續處理阻塞隊列中已保存的任務。在線程池處於 RUNNING 狀態時,調用 shutdown()方法會使線程池進入到該狀態。(finalize() 方法在執行過程當中也會調用shutdown()方法進入該狀態)
STOP 不能接受新任務,也不處理隊列中的任務,會中斷正在處理任務的線程。在線程池處於 RUNNING 或 SHUTDOWN 狀態時,調用 shutdownNow() 方法會使線程池進入到該狀態
TIDYING 若是全部的任務都已終止了,workerCount (有效線程數) 爲0,線程池進入該狀態後會調用 terminated() 方法進入TERMINATED 狀態
TERMINATED 在terminated() 方法執行完後進入該狀態,默認terminated()方法中什麼也沒有作

進入TERMINATED的條件以下:

  • 線程池不是RUNNING狀態;
  • 線程池狀態不是TIDYING狀態或TERMINATED狀態;
  • 若是線程池狀態是SHUTDOWN而且workerQueue爲空;
  • workerCount爲0;
  • 設置TIDYING狀態成功。

下圖爲線程池的狀態轉換過程:

計算線程的幾個方法:

方法 描述
runStateOf 獲取運行狀態
workerCountOf 獲取活動線程數
ctlOf 獲取運行狀態和活動線程數的值

 

2、execute方法

/**
 * Executes the given task sometime in the future.  The task
 * may execute in a new thread or in an existing pooled thread.
 *
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 *
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 *         {@code RejectedExecutionHandler}, if the task
 *         cannot be accepted for execution
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command) {
    // 空則拋出異常
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    /*
     * 獲取當前線程池的狀態
     * clt記錄着runState和workerCount
     *
     */
    int c = ctl.get();
    /*
     * 計算工做線程數 並判斷是否小於核心線程數
     * workerCountOf方法取出低29位的值,表示當前活動的線程數;
     * 若是當前活動線程數小於corePoolSize,則新建一個線程放入線程池中;
     * 並把任務添加到該線程中。
     *
     */
    if (workerCountOf(c) < corePoolSize) {
        // addWorker提交任務, 提交成功則結束
        /*
         * addWorker中的第二個參數表示限制添加線程的數量是根據corePoolSize來判斷仍是maximumPoolSize來判斷;
         * 若是爲true,根據corePoolSize來判斷;
         * 若是爲false,則根據maximumPoolSize來判斷
         */
        if (addWorker(command, true))
            return;
        // 提交失敗再次獲取當前狀態
        c = ctl.get();
    }
    // 判斷線程狀態, 並插入隊列, 失敗則移除
    /*
     * 若是當前線程池是運行狀態而且任務添加到隊列成功
     */
    if (isRunning(c) && workQueue.offer(command)) {
        // 再次獲取狀態
        int recheck = ctl.get();
        // 若是狀態不是RUNNING, 並移除失敗
        /*
         * 再次判斷線程池的運行狀態,若是不是運行狀態,因爲以前已經把command添加到workQueue中了,
         * 這時須要移除該command
         * 執行事後經過handler使用拒絕策略對該任務進行處理,整個方法返回
         */
        if (! isRunning(recheck) && remove(command))
            // 調用拒絕策略
            reject(command);
        // 若是工做線程爲0 則調用 addWorker
        /*
         * 獲取線程池中的有效線程數,若是數量是0,則執行addWorker方法
         * 這裏傳入的參數表示:
         * 1. 第一個參數爲null,表示在線程池中建立一個線程,但不去啓動;
         * 2. 第二個參數爲false,將線程池的有限線程數量的上限設置爲maximumPoolSize,添加線程時根據maximumPoolSize來判斷;
         * 若是判斷workerCount大於0,則直接返回,在workQueue中新增的command會在未來的某個時刻被執行。
         */
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 提交任務失敗 走拒絕策略
    /*
     * 若是執行到這裏,有兩種狀況:
     * 1. 線程池已經不是RUNNING狀態;
     * 2. 線程池是RUNNING狀態,但workerCount >= corePoolSize而且workQueue已滿。
     * 這時,再次調用addWorker方法,但第二個參數傳入爲false,將線程池的有限線程數量的上限設置爲maximumPoolSize;
     * 若是失敗則拒絕該任務
     */
    else if (!addWorker(command, false))
        reject(command);
}

簡單來講,在執行 execute() 方法時若是狀態一直是RUNNING時,的執行過程以下:

  1. 若是workerCount < corePoolSize,則建立並啓動一個線程來執行新提交的任務;
  2. 若是workerCount >= corePoolSize,且線程池內的阻塞隊列未滿,則將任務添加到該阻塞隊列中;
  3. 若是workerCount >= corePoolSize && workerCount < maximumPoolSize,且線程池內的阻塞隊列已滿,則建立並啓動一個線程來執行新提交的任務;
  4. 若是workerCount >= maximumPoolSize,而且線程池內的阻塞隊列已滿, 則根據拒絕策略來處理該任務, 默認的處理方式是直接拋異常。

這裏要注意一下 addWorker(null, false) ,也就是建立一個線程,但並無傳入任務,由於任務已經被添加到workQueue中了,因此worker在執行的時候,會直接從workQueue中獲取任務。因此,在 workerCountOf(recheck) == 0 時執行 addWorker(null, false) 也是爲了保證線程池在RUNNING狀態下必需要有一個線程來執行任務。

execute方法執行流程以下:

3、addWorker方法

addWorker方法的主要工做是在線程池中建立一個新的線程並執行,firstTask參數 用於指定新增的線程執行的第一個任務,core參數爲true表示在新增線程時會判斷當前活動線程數是否少於corePoolSize,false表示新增線程前須要判斷當前活動線程數是否少於maximumPoolSize,代碼以下:

/**
 * Checks if a new worker can be added with respect to current
 * pool state and the given bound (either core or maximum). If so,
 * the worker count is adjusted accordingly, and, if possible, a
 * new worker is created and started, running firstTask as its
 * first task. This method returns false if the pool is stopped or
 * eligible to shut down. It also returns false if the thread
 * factory fails to create a thread when asked.  If the thread
 * creation fails, either due to the thread factory returning
 * null, or due to an exception (typically OutOfMemoryError in
 * Thread.start()), we roll back cleanly.
 *
 * @param firstTask the task the new thread should run first (or
 * null if none). Workers are created with an initial first task
 * (in method execute()) to bypass queuing when there are fewer
 * than corePoolSize threads (in which case we always start one),
 * or when the queue is full (in which case we must bypass queue).
 * Initially idle threads are usually created via
 * prestartCoreThread or to replace other dying workers.
 *
 * @param core if true use corePoolSize as bound, else
 * maximumPoolSize. (A boolean indicator is used here rather than a
 * value to ensure reads of fresh values after checking other pool
 * state).
 * @return true if successful
 */
/**
 * 檢查任務是否能夠提交
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // 外層循環
    for (;;) {
        // 獲取運行狀態
        int c = ctl.get();
        int rs = runStateOf(c);

        /*
         * 這個if判斷
         * 若是rs >= SHUTDOWN,則表示此時再也不接收新任務;
         * 接着判斷如下3個條件,只要有1個不知足,則返回false:
         * 1. rs == SHUTDOWN,這時表示關閉狀態,再也不接受新提交的任務,但卻能夠繼續處理阻塞隊列中已保存的任務
         * 2. firsTask爲空
         * 3. 阻塞隊列不爲空
         * 
         * 首先考慮rs == SHUTDOWN的狀況
         * 這種狀況下不會接受新提交的任務,因此在firstTask不爲空的時候會返回false;
         * 而後,若是firstTask爲空,而且workQueue也爲空,則返回false,
         * 由於隊列中已經沒有任務了,不須要再添加線程了
         */
        // Check if queue empty only if necessary. 檢查線程池是否關閉
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        // 內層循環
        for (;;) {
            // 獲取線程數
            int wc = workerCountOf(c);
            // 工做線程大於容量 或者大於 核心或最大線程數
            /*
             * 若是wc超過CAPACITY,也就是ctl的低29位的最大值(二進制是29個1),返回false;
             * 這裏的core是addWorker方法的第二個參數,若是爲true表示根據corePoolSize來比較,
             * 若是爲false則根據maximumPoolSize來比較。
             */
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // CAS 線程數增長, 成功則調到外層循環
            /*
             * 嘗試增長workerCount,若是成功,則跳出第一個for循環
             */
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 若是增長workerCount失敗,則從新獲取ctl的值
            c = ctl.get();  // Re-read ctl
            // 若是當前的運行狀態不等於rs,說明狀態已被改變,返回第一個for循環繼續執行
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    /**
     * 建立新worker 開始新線程
     */
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 根據firstTask來建立Worker對象
        w = new Worker(firstTask);
        // 每個Worker對象都會建立一個線程
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // 加鎖
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                /*
                 * rs < SHUTDOWN表示是RUNNING狀態;
                 * 若是rs是RUNNING狀態或者rs是SHUTDOWN狀態而且firstTask爲null,向線程池中添加線程。
                 * 由於在SHUTDOWN時不會在添加新的任務,但仍是會執行workQueue中的任務
                 */
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // 判斷線程是否存活, 已存活拋出非法異常
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //  設置包含池中的全部工做線程。僅在持有mainLock時訪問 workers是 HashSet 集合
                    workers.add(w);
                    int s = workers.size();
                    // 設置池最大大小, 並將 workerAdded設置爲 true
                    // largestPoolSize記錄着線程池中出現過的最大線程數量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                // 解鎖
                mainLock.unlock();
            }
            // 添加成功 開始啓動線程 並將 workerStarted 設置爲 true
            if (workerAdded) {
                // 啓動線程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 啓動線程失敗
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

注意一下這裏的 t.start() 這個語句,啓動時會調用Worker類中的run方法,Worker自己實現了Runnable接口,因此一個Worker類型的對象也是一個線程。

4、Worker類

工做線程:線程池建立線程時,會將線程封裝成工做線程Worker,接下來看看源碼:

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

Worker類繼承了AQS,並實現了Runnable接口,注意其中的 firstTask 和 thread 屬性: firstTask 用它來保存傳入的任務; thread 是在調用構造方法時經過 ThreadFactory 來建立的線程,是用來處理任務的線程。

在調用構造方法時,須要把任務傳入,這裏經過 getThreadFactory().newThread(this)來新建一個線程, newThread 方法傳入的參數是this,由於Worker自己繼承了Runnable接口,也就是一個線程,因此一個Worker對象在啓動的時候會調用Worker類中的run方法。

Worker繼承了AQS,使用AQS來實現獨佔鎖的功能。爲何不使用ReentrantLock來實現呢?能夠看到tryAcquire方法,它是不容許重入的,而ReentrantLock是容許重入的:

  1. lock方法一旦獲取了獨佔鎖,表示當前線程正在執行任務中;
  2. 若是正在執行任務,則不該該中斷線程;
  3. 若是該線程如今不是獨佔鎖的狀態,也就是空閒的狀態,說明它沒有在處理任務,這時能夠對該線程進行中斷;
  4. 線程池在執行shutdown方法或tryTerminate方法時會調用interruptIdleWorkers方法來中斷空閒的線程,interruptIdleWorkers方法會使用tryLock方法來判斷線程池中的線程是不是空閒狀態;
  5. 之因此設置爲不可重入,是由於咱們不但願任務在調用像setCorePoolSize這樣的線程池控制方法時從新獲取鎖。若是使用ReentrantLock,它是可重入的,這樣若是在任務中調用瞭如setCorePoolSize這類線程池控制的方法,會中斷正在運行的線程。

因此,Worker繼承自AQS,用於判斷線程是否空閒以及是否能夠被中斷。

此外,在構造方法中執行了 setState(-1) ,把state變量設置爲-1,爲何這麼作呢?是由於AQS中默認的state是0,若是剛建立了一個Worker對象,尚未執行任務時,這時就不該該被中斷,看一下tryAquire方法:

protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }

tryAcquire方法是根據state是不是0來判斷的,因此,setState(-1);將state設置爲-1是爲了禁止在執行任務前對線程進行中斷。

正由於如此,在runWorker方法中會先調用Worker對象的unlock方法將state設置爲0。

5、runWorker方法

在Worker類中的run方法調用了runWorker方法來執行任務,runWorker方法的代碼以下:

/**
 * Main worker run loop.  Repeatedly gets tasks from queue and
 * executes them, while coping with a number of issues:
 *
 * 1. We may start out with an initial task, in which case we
 * don't need to get the first one. Otherwise, as long as pool is
 * running, we get tasks from getTask. If it returns null then the
 * worker exits due to changed pool state or configuration
 * parameters.  Other exits result from exception throws in
 * external code, in which case completedAbruptly holds, which
 * usually leads processWorkerExit to replace this thread.
 *
 * 2. Before running any task, the lock is acquired to prevent
 * other pool interrupts while the task is executing, and then we
 * ensure that unless pool is stopping, this thread does not have
 * its interrupt set.
 *
 * 3. Each task run is preceded by a call to beforeExecute, which
 * might throw an exception, in which case we cause thread to die
 * (breaking loop with completedAbruptly true) without processing
 * the task.
 *
 * 4. Assuming beforeExecute completes normally, we run the task,
 * gathering any of its thrown exceptions to send to afterExecute.
 * We separately handle RuntimeException, Error (both of which the
 * specs guarantee that we trap) and arbitrary Throwables.
 * Because we cannot rethrow Throwables within Runnable.run, we
 * wrap them within Errors on the way out (to the thread's
 * UncaughtExceptionHandler).  Any thrown exception also
 * conservatively causes thread to die.
 *
 * 5. After task.run completes, we call afterExecute, which may
 * also throw an exception, which will also cause thread to
 * die. According to JLS Sec 14.20, this exception is the one that
 * will be in effect even if task.run throws.
 *
 * The net effect of the exception mechanics is that afterExecute
 * and the thread's UncaughtExceptionHandler have as accurate
 * information as we can provide about any problems encountered by
 * user code.
 *
 * @param w the worker
 */
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // 獲取第一個任務
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 容許中斷
    w.unlock(); // allow interrupts
    // 是否由於異常退出循環
    boolean completedAbruptly = true;
    try {
        // 若是task爲空,則經過getTask來獲取任務
        // getTask()方法循環獲取工做隊列的任務
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

這裏說明一下第一個if判斷,目的是:

  • 若是線程池正在中止,那麼要保證當前線程是中斷狀態;
  • 若是不是的話,則要保證當前線程不是中斷狀態;

這裏要考慮在執行該if語句期間可能也執行了shutdownNow方法,shutdownNow方法會把狀態設置爲STOP,回顧一下STOP狀態:

不能接受新任務,也不處理隊列中的任務,會中斷正在處理任務的線程。在線程池處於 RUNNING 或 SHUTDOWN 狀態時,調用 shutdownNow() 方法會使線程池進入到該狀態。

STOP狀態要中斷線程池中的全部線程,而這裏使用 Thread.interrupted() 來判斷是否中斷是爲了確保在RUNNING或者SHUTDOWN狀態時線程是非中斷狀態的,由於 Thread.interrupted() 方法會復位中斷的狀態。

總結一下runWorker方法的執行過程:

  1. while循環不斷地經過getTask()方法獲取任務;
  2. getTask()方法從阻塞隊列中取任務;
  3. 若是線程池正在中止,那麼要保證當前線程是中斷狀態,不然要保證當前線程不是中斷狀態;
  4. 調用task.run()執行任務;
  5. 若是task爲null則跳出循環,執行processWorkerExit()方法;
  6. runWorker方法執行完畢,也表明着Worker中的run方法執行完畢,銷燬線程。

這裏的beforeExecute方法和afterExecute方法在ThreadPoolExecutor類中是空的,留給子類來實現。

completedAbruptly變量來表示在執行任務過程當中是否出現了異常,在processWorkerExit方法中會對該變量的值進行判斷。

6、getTask方法

getTask方法用來從阻塞隊列中取任務,代碼以下:

/**
 * Performs blocking or timed wait for a task, depending on
 * current configuration settings, or returns null if this worker
 * must exit because of any of:
 * 1. There are more than maximumPoolSize workers (due to
 *    a call to setMaximumPoolSize).
 * 2. The pool is stopped.
 * 3. The pool is shutdown and the queue is empty.
 * 4. This worker timed out waiting for a task, and timed-out
 *    workers are subject to termination (that is,
 *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
 *    both before and after the timed wait, and if the queue is
 *    non-empty, this worker is not the last thread in the pool.
 *
 * @return task, or null if the worker must exit, in which case
 *         workerCount is decremented
 */
private Runnable getTask() {
    // timeOut變量的值表示上次從阻塞隊列中取任務時是否超時
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        /*
         * 若是線程池狀態rs >= SHUTDOWN,也就是非RUNNING狀態,再進行如下判斷:
         * 1. rs >= STOP,線程池是否正在stop;
         * 2. 阻塞隊列是否爲空。
         * 若是以上條件知足,則將workerCount減1並返回null。
         * 由於若是當前線程池狀態的值是SHUTDOWN或以上時,不容許再向阻塞隊列中添加任務。
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // 容許核心線程超時 或者當前線程數大於核心線程數
        /* timed變量用於判斷是否須要進行超時控制。
         * allowCoreThreadTimeOut默認是false,也就是核心線程不容許進行超時;
         * wc > corePoolSize,表示當前線程池中的線程數量大於核心線程數量;
         * 對於超過核心線程數量的這些線程,須要進行超時控制
         */
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        /*
         * wc > maximumPoolSize的狀況是由於可能在此方法執行階段同時執行了setMaximumPoolSize方法;
         * timed && timedOut 若是爲true,表示當前操做須要進行超時控制,而且上次從阻塞隊列中獲取任務發生了超時
         * 接下來判斷,若是有效線程數量大於1,或者阻塞隊列是空的,那麼嘗試將workerCount減1;
         * 若是減1失敗,則返回重試。
         * 若是wc == 1時,也就說明當前線程是線程池中惟一的一個線程了。
         */
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            /*
             * 根據timed來判斷,若是爲true,則經過阻塞隊列的poll方法進行超時控制,若是在keepAliveTime時間內沒有獲取到任務,則返回null;
             * 不然經過take方法,若是這時隊列爲空,則take方法會阻塞直到隊列不爲空。
             * 
             */
            Runnable r = timed ?
                    // 從工做隊列poll任務,不阻塞
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    // 阻塞等待任務
                workQueue.take();
            if (r != null)
                return r;
            // 若是 r == null,說明已經超時,timedOut設置爲true
            timedOut = true;
        } catch (InterruptedException retry) {
            // 若是獲取任務時當前線程發生了中斷,則設置timedOut爲false並返回循環重試
            timedOut = false;
        }
    }
}

這裏重要的地方是第二個if判斷,目的是控制線程池的有效線程數量。由上文中的分析能夠知道,在執行execute方法時,若是當前線程池的線程數量超過了corePoolSize且小於maximumPoolSize,而且workQueue已滿時,則能夠增長工做線程,但這時若是超時沒有獲取到任務,也就是timedOut爲true的狀況,說明workQueue已經爲空了,也就說明了當前線程池中不須要那麼多線程來執行任務了,能夠把多於corePoolSize數量的線程銷燬掉,保持線程數量在corePoolSize便可。

何時會銷燬?固然是runWorker方法執行完以後,也就是Worker中的run方法執行完,由JVM自動回收。

getTask方法返回null時,在runWorker方法中會跳出while循環,而後會執行processWorkerExit方法。

ThreadPoolExecutor中線程執行任務的示意圖以下圖所示。

線程池中的線程執行任務分兩種狀況,以下。

1)在execute()方法中建立一個線程時,會讓這個線程執行當前任務。

2)這個線程執行完上圖中1的任務後,會反覆從BlockingQueue獲取任務來執行。

7、processWorkerExit方法

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 若是completedAbruptly值爲true,則說明線程執行時出現了異常,須要將workerCount減1;
    // 若是線程執行時沒有出現異常,說明在getTask()方法中已經已經對workerCount進行了減1操做,這裏就沒必要再減了。  
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //統計完成的任務數
        completedTaskCount += w.completedTasks;
        // 從workers中移除,也就表示着從線程池中移除了一個工做線程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    // 根據線程池狀態進行判斷是否結束線程池
    tryTerminate();
    int c = ctl.get();
    /*
     * 當線程池是RUNNING或SHUTDOWN狀態時,若是worker是異常結束,那麼會直接addWorker;
     * 若是allowCoreThreadTimeOut=true,而且等待隊列有任務,至少保留一個worker;
     * 若是allowCoreThreadTimeOut=false,workerCount很多於corePoolSize。
     */
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

至此,processWorkerExit執行完以後,工做線程被銷燬,以上就是整個工做線程的生命週期,從execute方法開始,Worker使用ThreadFactory建立新的工做線程,runWorker經過getTask獲取任務,而後執行任務,若是getTask返回null,進入processWorkerExit方法,整個線程結束,如圖所示:

8、tryTerminate方法

tryTerminate方法根據線程池狀態進行判斷是否結束線程池,代碼以下:

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        /*
         * 當前線程池的狀態爲如下幾種狀況時,直接返回:
         * 1. RUNNING,由於還在運行中,不能中止;
         * 2. TIDYING或TERMINATED,由於線程池中已經沒有正在運行的線程了;
         * 3. SHUTDOWN而且等待隊列非空,這時要執行完workQueue中的task;
         */
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        // 若是線程數量不爲0,則中斷一個空閒的工做線程,並返回
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 這裏嘗試設置狀態爲TIDYING,若是設置成功,則調用terminated方法
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // terminated方法默認什麼都不作,留給子類實現
                    terminated();
                } finally {
                    // 設置狀態爲TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

 interruptIdleWorkers(ONLY_ONE); 的做用是由於在getTask方法中執行 workQueue.take() 時,若是不執行中斷會一直阻塞。在下面介紹的shutdown方法中,會中斷全部空閒的工做線程,若是在執行shutdown時工做線程沒有空閒,而後又去調用了getTask方法,這時若是workQueue中沒有任務了,調用 workQueue.take() 時就會一直阻塞。因此每次在工做線程結束時調用tryTerminate方法來嘗試中斷一個空閒工做線程,避免在隊列爲空時取任務一直阻塞的狀況。

9、shutdown方法

shutdown方法要將線程池切換到SHUTDOWN狀態,並調用interruptIdleWorkers方法請求中斷全部空閒的worker,最後調用tryTerminate嘗試結束線程池。

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 安全策略判斷
        checkShutdownAccess();
        // 切換狀態爲SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 中斷空閒線程
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // 嘗試結束線程池
    tryTerminate();
}

這裏思考一個問題:在runWorker方法中,執行任務時對Worker對象w進行了lock操做,爲何要在執行任務的時候對每一個工做線程都加鎖呢?

下面仔細分析一下:

  • 在getTask方法中,若是這時線程池的狀態是SHUTDOWN而且workQueue爲空,那麼就應該返回null來結束這個工做線程,而使線程池進入SHUTDOWN狀態須要調用shutdown方法;
  • shutdown方法會調用interruptIdleWorkers來中斷空閒的線程,interruptIdleWorkers持有mainLock,會遍歷workers來逐個判斷工做線程是否空閒。但getTask方法中沒有mainLock;
  • 在getTask中,若是判斷當前線程池狀態是RUNNING,而且阻塞隊列爲空,那麼會調用 workQueue.take() 進行阻塞;
  • 若是在判斷當前線程池狀態是RUNNING後,這時調用了shutdown方法把狀態改成了SHUTDOWN,這時若是不進行中斷,那麼當前的工做線程在調用了 workQueue.take() 後會一直阻塞而不會被銷燬,由於在SHUTDOWN狀態下不容許再有新的任務添加到workQueue中,這樣一來線程池永遠都關閉不了了;
  • 由上可知,shutdown方法與getTask方法(從隊列中獲取任務時)存在競態條件;
  • 解決這一問題就須要用到線程的中斷,也就是爲何要用interruptIdleWorkers方法。在調用 workQueue.take() 時,若是發現當前線程在執行以前或者執行期間是中斷狀態,則會拋出InterruptedException,解除阻塞的狀態;
  • 可是要中斷工做線程,還要判斷工做線程是不是空閒的,若是工做線程正在處理任務,就不該該發生中斷;
  • 因此Worker繼承自AQS,在工做線程處理任務時會進行lock,interruptIdleWorkers在進行中斷時會使用tryLock來判斷該工做線程是否正在處理任務,若是tryLock返回true,說明該工做線程當前未執行任務,這時才能夠被中斷。

下面就來分析一下interruptIdleWorkers方法。

10、interruptIdleWorkers方法

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

interruptIdleWorkers遍歷workers中全部的工做線程,若線程沒有被中斷tryLock成功,就中斷該線程。

爲何須要持有mainLock?由於workers是HashSet類型的,不能保證線程安全。

11、shutdownNow方法

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        // 中斷全部工做線程,不管是否空閒
        interruptWorkers();
        // 取出隊列中沒有被執行的任務
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

shutdownNow方法與shutdown方法相似,不一樣的地方在於:

  1. 設置狀態爲STOP;
  2. 中斷全部工做線程,不管是不是空閒的;
  3. 取出阻塞隊列中沒有被執行的任務並返回。

shutdownNow方法執行完以後調用tryTerminate方法,該方法在上文已經分析過了,目的就是使線程池的狀態設置爲TERMINATED。

 

參考:深刻理解Java線程池:ThreadPoolExecutor

相關文章
相關標籤/搜索