線程池運行模型源碼全解析

在上一篇文章《從0到1玩轉線程池》中,咱們瞭解了線程池的使用方法,以及向線程池中提交任務的完整流程和ThreadPoolExecutor.execute方法的源代碼。在這篇文章中,咱們將會從頭閱讀線程池ThreadPoolExecutor類的源代碼,深刻剖析線程池從提交任務到執行任務的完整流程,從而創建起完整的線程池運行模型。java

查看JDK源碼的方式

在IDE中,例如IDEA裏,咱們能夠點擊咱們樣例代碼裏的ThreadPoolExecutor類跳轉到JDK中ThreadPoolExecutor類的源代碼。在源代碼中咱們能夠看到不少java.util.concurrent包的締造者大牛「Doug Lea」所留下的各類註釋,下面的圖片就是該類源代碼的一個截圖。數組

這些註釋的內容很是有參考價值,建議有能力的讀者朋友能夠本身閱讀一遍。下面,咱們就開始閱讀ThreadPoolExecutor的源代碼吧。安全

控制變量與線程池生命週期

ThreadPoolExecutor類定義的開頭,咱們能夠看到以下的幾行代碼:bash

// 控制變量,前3位表示狀態,剩下的數據位表示有效的線程數
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Integer的位數減去3位狀態位就是線程數的位數
private static final int COUNT_BITS = Integer.SIZE - 3;
// CAPACITY就是線程數的上限(含),即2^COUNT_BITS - 1個
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
複製代碼

第一行是一個用來做爲控制變量的整型值,即一個Integer。之因此要用AtomicInteger類是由於要保證多線程安全,在本系列以後的文章中會對AtomicInteger進行具體介紹。一個整型通常是32位,可是這裏的代碼爲了保險起見,仍是使用了Integer.SIZE來表示整型的總位數。這裏的「位」指的是數據位(bit),在計算機中,8bit = 1字節,1024字節 = 1KB,1024KB = 1MB。每一位都是一個0或1的數字,咱們若是把整型想象成一個二進制(0或1)的數組,那麼一個Integer就是32個數字的數組。其中,前三個被用來表示狀態,那麼咱們就能夠表示2^3 = 8個不一樣的狀態了。剩下的29位二進制數字都會被用於表示當前線程池中有效線程的數量,上限就是(2^29 - 1)個,即常量CAPACITY數據結構

以後的部分列出了線程池的全部狀態:多線程

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
複製代碼

在這裏能夠忽略數字後面的<< COUNT_BITS,能夠把狀態簡單地理解爲前面的數字部分,這樣的簡化基本不影響結論。ui

各個狀態的解釋以下:this

  • RUNNING,正常運行狀態,能夠接受新的任務和處理隊列中的任務
  • SHUTDOWN,關閉中狀態,不能接受新任務,可是能夠處理隊列中的任務
  • STOP,中止中狀態,不能接受新任務,也不處理隊列中的任務,會中斷進行中的任務
  • TIDYING,待結束狀態,全部任務已經結束,線程數歸0,進入TIDYING狀態後將會運行terminated()方法
  • TERMINATED,結束狀態,terminated()方法調用完成後進入

這幾個狀態所對應的數字值是按照順序排列的,也就是說線程池的狀態只能從小到大變化,這也方便了經過數字比較來判斷狀態所在的階段,這種經過數字大小來比較狀態值的方法在ThreadPoolExecutor的源碼中會有大量的使用。spa

下圖是這五個狀態之間的變化過程: 線程

  1. 當線程池被建立時會處於RUNNING狀態,正常接受和處理任務;
  2. shutdown()方法被直接調用,或者在線程池對象被GC回收時經過finalize()方法隱式調用了shutdown()方法時,線程池會進入SHUTDOWN狀態。該狀態下線程池仍然會繼續執行完阻塞隊列中的任務,只是再也不接受新的任務了。當隊列中的任務被執行完後,線程池中的線程也會被回收。當隊列和線程都被清空後,線程池將進入TIDYING狀態;
  3. 在線程池處於RUNNING或者SHUTDOWN狀態時,若是有代碼調用了shutdownNow()方法,則線程池會進入STOP狀態。在STOP狀態下,線程池會直接清空阻塞隊列中待執行的任務,而後中斷全部正在進行中的任務並回收線程。當線程都被清空之後,線程池就會進入TIDYING狀態;
  4. 當線程池進入TIDYING狀態時,將會運行terminated()方法,該方法執行完後,線程池就會進入最終的TERMINATED狀態,完全結束。

到這裏咱們就已經清楚地瞭解了線程從剛被建立時的RUNNING狀態一直到最終的TERMINATED狀態的整個生命週期了。那麼當咱們要向一個RUNNING狀態的線程池提交任務時會發生些什麼呢?

execute方法的實現

咱們通常會使用execute方法提交咱們的任務,那麼線程池在這個過程當中作了什麼呢?在ThreadPoolExecutor類的execute()方法的源代碼中,咱們主要作了四件事:

  1. 若是當前線程池中的線程數小於核心線程數corePoolSize,則經過threadFactory建立一個新的線程,並把入參中的任務做爲第一個任務傳入該線程;
  2. 若是當前線程池中的線程數已經達到了核心線程數corePoolSize,那麼就會經過阻塞隊列workerQueueoffer方法來將任務添加到隊列中保存,並等待線程空閒後進行執行;
  3. 若是線程數已經達到了corePoolSize且阻塞隊列中沒法插入該任務(好比已滿),那麼線程池就會再增長一個線程來執行該任務,除非線程數已經達到了最大線程數maximumPoolSize
  4. 若是確實已經達到了最大線程數,那麼就會經過拒絕策略對象handler拒絕這個任務。

整體上的執行流程以下,下方的黑色同心圓表明流程結束:

這裏解釋一下阻塞隊列的定義,方便你們閱讀:

線程池中的阻塞隊列專門用於存放須要等待線程空閒的待執行任務,而阻塞隊列是這樣的一種數據結構,它是一個隊列(相似於一個List),能夠存放0到N個元素。咱們能夠對這個隊列進行插入和彈出元素的操做,彈出操做能夠理解爲是一個獲取並從隊列中刪除一個元素的操做。當隊列中沒有元素時,對這個隊列的獲取操做將會被阻塞,直到有元素被插入時纔會被喚醒;當隊列已滿時,對這個隊列的插入操做將會被阻塞,直到有元素被彈出後纔會被喚醒。

這樣的一種數據結構很是適合於線程池的場景,當一個工做線程沒有任務可處理時就會進入阻塞狀態,直到有新任務提交後才被喚醒。

線程池中經常使用的阻塞隊列通常有三種類型:直連隊列、無界隊列、有界隊列。不一樣的阻塞隊列類型會被線程池的行爲產生不一樣的影響,有興趣的讀者能夠在上一篇文章《從0到1玩轉線程池》中找到不一樣類型阻塞隊列的具體解釋。

下面是帶有註釋的源代碼,你們能夠和上面的流程對照起來參考一下:

public void execute(Runnable command) {
    // 檢查提交的任務是否爲空
    if (command == null)
        throw new NullPointerException();
    
    // 獲取控制變量值
    int c = ctl.get();
    // 檢查當前線程數是否達到了核心線程數
    if (workerCountOf(c) < corePoolSize) {
        // 未達到核心線程數,則建立新線程
        // 並將傳入的任務做爲該線程的第一個任務
        if (addWorker(command, true))
            // 添加線程成功則直接返回,不然繼續執行
            return;

        // 由於前面調用了耗時操做addWorker方法
        // 因此線程池狀態有可能發生了改變,從新獲取狀態值
        c = ctl.get();
    }

    // 判斷線程池當前狀態是不是運行中
    // 若是是則調用workQueue.offer方法將任務放入阻塞隊列
    if (isRunning(c) && workQueue.offer(command)) {
        // 由於執行了耗時操做「放入阻塞隊列」,因此從新獲取狀態值
        int recheck = ctl.get();
        // 若是當前狀態不是運行中,則將剛纔放入阻塞隊列的任務拿出,若是拿出成功,則直接拒絕這個任務
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            // 若是線程池中沒有線程了,那就建立一個
            addWorker(null, false);
    }
    // 若是放入阻塞隊列失敗(如隊列已滿),則添加一個線程
    else if (!addWorker(command, false))
        // 若是添加線程失敗(如已經達到了最大線程數),則拒絕任務
        reject(command);
}
複製代碼

從上面的源碼中咱們能夠知道,當一個任務被經過ThreadPoolExecutorexecute方法提交到線程池中執行時,這個任務有可能以兩種方式被執行:

  1. 直接在建立一個新的Worker時被做爲第一個任務傳入,由這個新建立的線程來執行;
  2. 把任務放入一個阻塞隊列,等待線程池中的工做線程Worker撈取任務進行執行。

這裏的這個Worker指的就是ThreadPoolExecutor.Worker類,這是一個ThreadPoolExecutor的內部類,用於對基礎線程類Thread進行包裝和對線程進行管理。那麼線程池究竟是怎麼利用Worker類來實現持續不斷地接收提交的任務並執行的呢?接下來,咱們經過ThreadPoolExecutor的源代碼來一步一步抽絲剝繭,揭開線程池運行模型的神祕面紗。

addWorker方法

在上文中的execute方法的代碼中咱們能夠看到線程池是經過addWorker方法來向線程池中添加新線程的,那麼新的線程又是如何運行起來的呢?

這裏咱們暫時跳過addWorker方法的詳細源代碼,由於雖然這個方法的代碼行數較多,可是功能相對比較直接,只是經過new Worker(firstTask)建立了一個表明線程的Worker對象,而後調用了這個對象所包含的Thread對象的start()方法。

咱們知道一旦調用了Thread類的start()方法,則這個線程就會開始執行建立線程時傳入的Runnable對象。從下面的Worker類構造器源代碼能夠看出,Worker類正是把本身(this引用)傳入了線程的構造器當中,因此這個線程啓動後就會執行Worker類的run()方法了,而在Workerrun()方法中只執行了一行很簡單的代碼runWorker(this)

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

public void run() {
    runWorker(this);
}
複製代碼

runWorker方法的實現

咱們看到線程池中的線程在啓動時會調用對應的Worker類的runWorker方法,而這裏就是整個線程池任務執行的核心所在了。runWorker方法中包含有一個相似無限循環的while語句,讓worker對象能夠一直持續不斷地執行提交到線程池中的新任務或者等待下一個新任務的提交。

你們能夠配合代碼上帶有的註釋來理解該方法的具體實現:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 將worker的狀態重置爲正常狀態,由於state狀態值在構造器中被初始化爲-1
    w.unlock();
    // 經過completedAbruptly變量的值判斷任務是否正常執行完成
    boolean completedAbruptly = true;
    try {
        // 若是task爲null就經過getTask方法獲取阻塞隊列中的下一個任務
        // getTask方法通常不會返回null,因此這個while相似於一個無限循環
        // worker對象就經過這個方法的持續運行來不斷處理新的任務
        while (task != null || (task = getTask()) != null) {
            // 每一次任務的執行都必須獲取鎖來保證下方臨界區代碼的線程安全
            w.lock();
            
            // 若是狀態值大於等於STOP(狀態值是有序的,即STOP、TIDYING、TERMINATED)
            // 且當前線程尚未被中斷,則主動中斷線程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();

            // 開始
            try {
                // 執行任務前處理操做,默認是一個空實現
                // 在子類中能夠經過重寫來改變任務執行前的處理行爲
                beforeExecute(wt, task);

                // 經過thrown變量保存任務執行過程當中拋出的異常
                // 提供給下面finally塊中的afterExecute方法使用
                Throwable thrown = null;
                try {
                    // *** 重要:實際執行任務的代碼
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    // 由於Runnable接口的run方法中不能拋出Throwable對象
                    // 因此要包裝成Error對象拋出
                    thrown = x; throw new Error(x);
                } finally {
                    // 執行任務後處理操做,默認是一個空實現
                    // 在子類中能夠經過重寫來改變任務執行後的處理行爲
                    afterExecute(task, thrown);
                }
            } finally {
                // 將循環變量task設置爲null,表示已處理完成
                task = null;
                // 累加當前worker已經完成的任務數
                w.completedTasks++;
                // 釋放while體中第一行獲取的鎖
                w.unlock();
            }
        }

        // 將completedAbruptly變量設置爲false,表示任務正常處理完成
        completedAbruptly = false;
    } finally {
        // 銷燬當前的worker對象,並完成一些諸如完成任務數量統計之類的輔助性工做
        // 在線程池當前狀態小於STOP的狀況下會建立一個新的worker來替換被銷燬的worker
        processWorkerExit(w, completedAbruptly);
    }
}
複製代碼

runWorker方法的源代碼中有兩個比較重要的方法調用,一個是while條件中對getTask方法的調用,一個是在方法的最後對processWorkerExit方法的調用。下面是對這兩個方法更詳細的解釋。

getTask方法在阻塞隊列中有待執行的任務時會從隊列中彈出一個任務並返回,若是阻塞隊列爲空,那麼就會阻塞等待新的任務提交到隊列中直到超時(在一些配置下會一直等待而不超時),若是在超時以前獲取到了新的任務,那麼就會將這個任務做爲返回值返回。因此通常getTask方法是不會返回null的,只會阻塞等待下一個任務並在以後將這個新任務做爲返回值返回。

getTask方法返回null時會致使當前Worker退出,當前線程被銷燬。在如下狀況下getTask方法纔會返回null:

  1. 當前線程池中的線程數超過了最大線程數。這是由於運行時經過調用setMaximumPoolSize修改了最大線程數而致使的結果;
  2. 線程池處於STOP狀態。這種狀況下全部線程都應該被當即回收銷燬;
  3. 線程池處於SHUTDOWN狀態,且阻塞隊列爲空。這種狀況下已經不會有新的任務被提交到阻塞隊列中了,因此線程應該被銷燬;
  4. 線程能夠被超時回收的狀況下等待新任務超時。線程被超時回收通常有如下兩種狀況:
    • 超出核心線程數部分的線程等待任務超時
    • 容許核心線程超時(線程池配置)的狀況下線程等待任務超時

processWorkerExit方法會銷燬當前線程對應的Worker對象,並執行一些累加總處理任務數等輔助操做,但在線程池當前狀態小於STOP的狀況下會建立一個新的Worker來替換被銷燬的Worker。

getTaskprocessWorkerExit方法源代碼感興趣的讀者能夠閱讀下一節來具體瞭解一下,不過跳過這一節也是徹底能夠的。

getTask與processWorkerExit方法源代碼

如下是getTaskprocessWorkerExit兩個方法的帶有中文解釋的源代碼:

private Runnable getTask() {
    // 經過timeOut變量表示線程是否空閒時間超時了
    boolean timedOut = false;

    // 無限循環
    for (;;) {
        // 獲取線程池狀態
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 若是 線程池狀態>=STOP
        //    或者 (線程池狀態==SHUTDOWN && 阻塞隊列爲空)
        // 則直接減小一個worker計數並返回null(返回null會致使當前worker被銷燬)
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        // 獲取線程池中的worker計數
        int wc = workerCountOf(c);

        // 判斷當前線程是否會被超時銷燬
        // 會被超時銷燬的狀況:線程池容許核心線程超時 或 當前線程數大於核心線程數
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 若是 (當前線程數大於最大線程數 或 (容許超時銷燬 且 當前發生了空閒時間超時))
        //   且 (當前線程數大於1 或 阻塞隊列爲空) —— 該條件在阻塞隊列不爲空的狀況下保證至少會保留一個線程繼續處理任務
        // 則 減小worker計數並返回null(返回null會致使當前worker被銷燬)
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 從阻塞隊列中取出一個任務(若是隊列爲空會進入阻塞等待狀態)
            // 若是容許空閒超時銷燬線程的話則帶有一個等待的超時時間
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            // 若是獲取到了任務就直接返回該任務,返回後會開始執行該任務
            if (r != null)
                return r;
            // 若是任務爲null,則說明發生了等待超時,將空閒時間超時標誌設置爲true
            timedOut = true;
        } catch (InterruptedException retry) {
            // 若是等待被中斷了,那說明空閒時間(等待任務的時間)尚未超時
            timedOut = false;
        }
    }
}
複製代碼

processWorkerExit方法的源代碼:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 若是completedAbruptly爲true則表示任務執行過程當中拋出了未處理的異常
    // 因此尚未正確地減小worker計數,這裏須要減小一次worker計數
    if (completedAbruptly)
        decrementWorkerCount();

    // 獲取線程池的主鎖
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 把將被銷燬的線程已完成的任務數累計到線程池的完成任務總數上
        completedTaskCount += w.completedTasks;
        // 從worker集合中去掉將會銷燬的worker
        workers.remove(w);
    } finally {
        // 釋放線程池主鎖
        mainLock.unlock();
    }

    // 嘗試結束線程池
    // 這裏是爲了在關閉線程池時等到全部worker都被回收後再結束線程池
    tryTerminate();

    int c = ctl.get();
    // 若是線程池狀態 < STOP,即RUNNING或SHUTDOWN
    // 則須要考慮建立新線程來代替被銷燬的線程
    if (runStateLessThan(c, STOP)) {
        // 若是worker是正常執行完的,則要判斷一下是否已經知足了最小線程數要求
        // 不然直接建立替代線程
        if (!completedAbruptly) {
            // 若是容許核心線程超時則最小線程數是0,不然最小線程數等於核心線程數
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 若是阻塞隊列非空,則至少要有一個線程繼續執行剩下的任務
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // 若是當前線程數已經知足最小線程數要求
            // 那麼就不建立替代線程了
            if (workerCountOf(c) >= min)
                return;
        }

        // 從新建立一個worker來代替被銷燬的線程
        addWorker(null, false);
    }
}
複製代碼

總結

到這裏咱們的線程池源代碼之旅就結束了,在這篇文章中咱們首先了解了線程池中的控制變量與狀態變換流程,以後咱們經過線程池的源代碼深刻解析了從提交任務到執行任務的全過程,相信經過這些知識咱們已經能夠在腦海中創建起一套完整的線程池運行模型了。若是你們有一些細節感受還不是特別清晰的話,建議不妨再返回到文章的開頭多讀幾遍,相信第二遍的閱讀能給你們帶來不同的體驗,由於我本身也是在第三次讀ThreadPoolExecutor類的源代碼時才真正打通了其中的一些重要關節的。

引子

在瀏覽ThreadPoolExexutor源碼的過程當中,有幾個點咱們其實並無徹底說清楚,好比對鎖的加鎖操做、對控制變量的屢次獲取、控制變量的AtomicInteger類型。在下一篇文章中,我將會介紹這些以鎖、volatile變量、CAS操做、AQS抽象類爲表明的一系列線程同步方法,歡迎感興趣的讀者繼續關注我後續發佈的文章~

相關文章
相關標籤/搜索