深刻理解線程池--JDK1.8

線程池的參數

/*
這個是用一個int來表示workerCount和runState的,其中runState佔int的高3位,
其它29位爲workerCount的值。

workerCount:當前活動的線程數;
runState:線程池的當前狀態。

用AtomicInteger是由於其在併發下使用compareAndSet效率很是高;
當改變當前活動的線程數時只對低29位操做,如每次加一減一,workerCount的值變了,
但不會影響高3位的runState的值。當改變當前狀態的時候,只對高3位操做,不會改變低29位的計數值。
這裏有一個假設,就是當前活動的線程數不會超過29位能表示的值,即不會超過536870911,
就目前以及可預見的很長一段時間來說,這個值是足夠用了
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//COUNT_BITS,就是用來表示workerCount佔用一個int的位數,其值爲前面說的29
private static final int COUNT_BITS = Integer.SIZE - 3;

/*
CAPACITY爲29位能表示的最大容量,即workerCount實際能用的最大值。
其值的二進制爲:00011111111111111111111111111111(佔29位,29個1)
*/
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

/*
如下常量是線程池的狀態,狀態存儲在int的高3位,因此要左移29位。
騰出的低29位來表示workerCount
注意,這5個狀態是有大小關係的。RUNNING<shutdown<stop<tidying<terminated 當須要判斷多個狀態時,只須要用<或="">來判斷就能夠了
*/

/*
RUNNING的含義:線程池能接受新任務,而且能夠運行隊列中的任務
-1的二進制爲32個1,移位後爲:11100000000000000000000000000000
*/
private static final int RUNNING    = -1 << COUNT_BITS;

/*
SHUTDOWN的含義:再也不接受新任務,但仍能夠執行隊列中的任務
0的二進制爲32個0,移位後仍是全0
*/
private static final int SHUTDOWN   =  0 << COUNT_BITS;

/*
STOP的含義:再也不接受新任務,再也不執行隊列中的任務,並且要中斷正在處理的任務
1的二進制爲前面31個0,最後一個1,移位後爲:00100000000000000000000000000000
*/
private static final int STOP       =  1 << COUNT_BITS;

/*
TIDYING的含義:全部任務均已終止,workerCount的值爲0,
轉到TIDYING狀態的線程即將要執行terminated()鉤子方法.
2的二進制爲00000000000000000000000000000010
移位後01000000000000000000000000000000
*/
private static final int TIDYING    =  2 << COUNT_BITS;

/*
TERMINATED的含義:terminated()方法執行結束.
3的二進制爲00000000000000000000000000000011
移位後01100000000000000000000000000000
*/
private static final int TERMINATED =  3 << COUNT_BITS;
各狀態之間可能的轉變有如下幾種:
RUNNING -> SHUTDOWN
	調用了shutdown方法,線程池實現了finalize方法,在裏面調用了shutdown方法,所以shutdown多是在finalize中被隱式調用的
(RUNNING or SHUTDOWN) -> STOP
	調用了shutdownNow方法
SHUTDOWN -> TIDYING
	當隊列和線程池均爲空的時候
STOP -> TIDYING
	當線程池爲空的時候
TIDYING -> TERMINATED
	terminated()鉤子方法調用完畢
/*
傳入的參數爲存儲runState和workerCount的int值,這個方法用於取出runState的值。
~爲按位取反操做,~CAPACITY值爲:11100000000000000000000000000000,
再同參數作&操做,就將低29位置0了,而高3位仍是保持原先的值,也就是runState的值
*/
private static int runStateOf(int c)     { return c & ~CAPACITY; }

/*
傳入的參數爲存儲runState和workerCount的int值,這個方法用於取出workerCount的值。
由於CAPACITY值爲:00011111111111111111111111111111,因此&操做將參數的高3位置0了,
保留參數的低29位,也就是workerCount的值。
*/
private static int workerCountOf(int c)  { return c & CAPACITY; }

/*
將runState和workerCount存到同一個int中,這裏的rs就是runState,
是已經移位過的值,填充返回值的高3位,wc填充返回值的低29位
*/
private static int ctlOf(int rs, int wc) { return rs | wc; }

源碼分析,線程池是如何執行任務的

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */

        /*
          *分三步進行:
         *
          * 1.若是少於corePoolSize線程正在運行,請嘗試
          *用給定的命令做爲第一個啓動一個新的線程
          *任務。 對addWorker的調用會自動檢查runState和
          * workerCount,從而防止將添加的錯誤警報
          *線程,當它不該該經過返回false。
         *
          * 2.若是任務能夠成功排隊,那麼咱們仍然須要
          *再次檢查咱們是否應該添加一個線程
          *(由於現有的自上次檢查以來死亡)或者那個
          自從進入這個方法以來,池關閉了。 因此咱們
          *從新檢查狀態,若是有必要的話回滾入隊
          *中止,或者若是沒有的話,開始一個新的線程。
         *
          * 3.若是咱們不能排隊任務,那麼咱們嘗試添加一個新的
          *線程。 若是失敗了,咱們知道咱們已經關閉了,或者已經飽和了
          *所以拒絕任務。
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

處理過程: 1.活動線程數< corePoolSize 小於核心線程數時,直接啓動新的線程,而且添加到工做線程中。(addWorker true時,會從新檢查workerCount的值) 2.活動線程數 >corePoolSize 時,若是是運行時狀態,而且隊列未滿,添加到隊列中, 須要再次檢查狀態,1.不是running,而且移出失敗,則拒絕任務。2。處於RUNNing狀態,或者移出任務失敗的時候,若是沒有活動線程,添加一個空的任務,表示不在接受新的任務。3.隱藏的狀況,若是是運行狀態,而且能夠移出成功,則正常執行。 3.不是運行狀態,而且隊列已滿,啓動新的線程失敗,則拒絕任務。併發

/**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread.start()), we roll back cleanly.
     *
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     *
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

addWorker有兩個參數:Runnable類型的firstTask,用於指定新增的線程執行的第一個任務;boolean類型的core,true表示在新增線程時會判斷當前活動線程數是否少於corePoolSize,false表示新增線程前須要判斷當前活動線程數是否少於maximumPoolSize。函數

該方法的返回值表明是否成功新增一個線程。oop

// Check if queue empty only if necessary.
            // 這條語句等價:rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null ||
            // workQueue.isEmpty())
            // 知足下列調價則直接返回false,線程建立失敗:
            // rs > SHUTDOWN:STOP || TIDYING || TERMINATED 此時再也不接受新的任務,且全部任務執行結束
            // rs = SHUTDOWN:firtTask != null 此時再也不接受任務,可是仍然會執行隊列中的任務
            // rs = SHUTDOWN:firtTask == null見execute方法的addWorker(null,
            // false),任務爲null && 隊列爲空
            // 最後一種狀況也就是說SHUTDONW狀態下,若是隊列不爲空還得接着往下執行,爲何?add一個null任務目的究竟是什麼?
            // 看execute方法只有workCount==0的時候firstTask纔會爲null結合這裏的條件就是線程池SHUTDOWN了再也不接受新任務
            // 可是此時隊列不爲空,那麼還得建立線程把任務給執行完才行。
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) 
// 等價實現
rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()),
private boolean addWorker(Runnable firstTask, boolean core) {
        retry: for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);// 當前線程池狀態

            // Check if queue empty only if necessary.
            // 這條語句等價:rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null ||
            // workQueue.isEmpty())
            // 知足下列調價則直接返回false,線程建立失敗:
            // rs > SHUTDOWN:STOP || TIDYING || TERMINATED 此時再也不接受新的任務,且全部任務執行結束
            // rs = SHUTDOWN:firtTask != null 此時再也不接受任務,可是仍然會執行隊列中的任務
            // rs = SHUTDOWN:firtTask == null見execute方法的addWorker(null,
            // false),任務爲null && 隊列爲空
            // 最後一種狀況也就是說SHUTDONW狀態下,若是隊列不爲空還得接着往下執行,爲何?add一個null任務目的究竟是什麼?
            // 看execute方法只有workCount==0的時候firstTask纔會爲null結合這裏的條件就是線程池SHUTDOWN了再也不接受新任務
            // 可是此時隊列不爲空,那麼還得建立線程把任務給執行完才行。
            if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
                return false;

            // 走到這的情形:
            // 1.線程池狀態爲RUNNING
            // 2.SHUTDOWN狀態,但隊列中還有任務須要執行
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))// 原子操做遞增workCount
                    break retry;// 操做成功跳出的重試的循環
                c = ctl.get(); // Re-read ctl
                if (runStateOf(c) != rs)// 若是線程池的狀態發生變化則重試
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        // wokerCount遞增成功

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 併發的訪問線程池workers對象必須加鎖
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    // RUNNING狀態 || SHUTDONW狀態下清理隊列中剩餘的任務
                    if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 將新啓動的線程添加到線程池中
                        workers.add(w);
                        // 更新largestPoolSize
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 啓動新添加的線程,這個線程首先執行firstTask,而後不停的從隊列中取任務執行
                // 當等待keepAlieTime尚未任務執行則該線程結束。見runWoker和getTask方法的代碼。
                if (workerAdded) {
                    t.start();// 最終執行的是ThreadPoolExecutor的runWoker方法
                    workerStarted = true;
                }
            }
        } finally {
            // 線程啓動失敗,則從wokers中移除w並遞減wokerCount
            if (!workerStarted)
                // 遞減wokerCount會觸發tryTerminate方法
                addWorkerFailed(w);
        }
        return workerStarted;
    }
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // Worker的構造函數中抑制了線程中斷setState(-1),因此這裏須要unlock從而容許中斷
        w.unlock();
        // 用於標識是否異常終止,finally中processWorkerExit的方法會有不一樣邏輯
        // 爲true的狀況:1.執行任務拋出異常;2.被中斷。
        boolean completedAbruptly = true;
        try {
            // 若是getTask返回null那麼getTask中會將workerCount遞減,若是異常了這個遞減操做會在processWorkerExit中處理
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted. This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                        && !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 任務執行前能夠插入一些處理,子類重載該方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();// 執行用戶任務
                    } catch (RuntimeException x) {
                        thrown = x;
                        throw x;
                    } catch (Error x) {
                        thrown = x;
                        throw x;
                    } catch (Throwable x) {
                        thrown = x;
                        throw new Error(x);
                    } finally {
                        // 和beforeExecute同樣,留給子類去重載
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }

            completedAbruptly = false;
        } finally {
            // 結束線程的一些清理工做
            processWorkerExit(w, completedAbruptly);
        }
    }
相關文章
相關標籤/搜索