線程池看懂了也很簡單

理論知識

週末上海下起了雨也降溫了,無事打開電腦看看源碼,就想到了線程池。線程池的技術網絡上已經有不少文章都已經寫過了,並且理論都是同樣的。java

可是理論歸理論,面試的時候也許你恰好看了一篇能應付過去,可是若是深究細節可能就會懵逼。因此我很建議任何理論咱們都須要本身去探究一下才好,本身實踐過的纔有本身的理解而不是死記硬背,這樣纔會經久不忘。程序員

線程池屬於開發中常見的一種池化技術,這類的池化技術的目的都是爲了提升資源的利用率和提升效率,相似的HttpClient鏈接池,數據庫鏈接池等。面試

在沒有線程池的時候,咱們要建立多線程的併發,通常都是經過繼承 Thread 類或實現 Runnable 接口或者實現 Callable 接口,咱們知道線程資源是很寶貴的,並且線程之間切換執行時須要記住上下文信息,因此過多的建立線程去執行任務會形成資源的浪費並且對CPU影響較大。數據庫

爲了方便, JDK 1.5 以後爲咱們提供了幾種建立線程池的方法:緩存

  • Executors.newFixedThreadPool(nThreads):建立一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。
  • Executors.newCachedThreadPool():建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程,若無可回收,則新建線程。
  • Executors.newSingleThreadExecutor():建立一個單線程化的線程池,它只會用惟一的工做線程來執行任務, 保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行。
  • Executors.newScheduledThreadPool(nThreads):建立一個定長線程池,支持定時及週期性任務執行。

雖然這些都是 JDK 默認提供的,可是仍是要說它們的定製性太差了並且有點雞肋,不少時候不能知足咱們的需求。例如經過 newFixedThreadPool 方式建立的固定線程池,它內部使用的隊列是 LinkedBlockingQueue,可是它的隊列大小默認是 Integer.MAX_VALUE,這會有什麼問題?安全

當核心線程滿了的時候,任務會進入隊列中等待,直到隊列滿了爲止。可是也許任務還未達到 Integer.MAX_VALUE 這個值的時候,內存就已經 OOM 了,由於內存放不下這麼多的任務,畢竟內存大小有限。markdown

因此更多的時候咱們都是自定義線程池,也就是使用 new ThreadPoolExecutor 的方式,其實你看源碼你能夠發現以上的4個線程池技術底層都是經過 ThreadPoolExecutor 來建立的,只不過它們本身爲咱們填充了這些參數的固定值而已。網絡

ThreadPoolExecutor 的構造函數以下所示:多線程

ThreadPoolExecutor(int corePoolSize,
                   int maximumPoolSize,
                   long keepAliveTime,
                   TimeUnit unit,
                   BlockingQueue<Runnable> workQueue,
                   ThreadFactory threadFactory,
                   RejectedExecutionHandler handler);
複製代碼

咱們來看下這幾個核心參數的涵義和做用:併發

  • corePoolSize: 爲線程池的核心線程基本大小。
  • maximumPoolSize: 爲線程池最大線程大小。
  • keepAliveTimeunit 則是線程空閒後的存活時間。
  • workQueue: 用於存聽任務的阻塞隊列。
  • handler: 當隊列和最大線程池都滿了以後的飽和策略。

經過這些參數的配置使得整個線程池的工做流程以下:

前幾年通常普通的技術面試瞭解了以上的知識內容也差很少就夠了,可是目前的大環境的影響或者面試更高級的開發上面的知識點是經不起深度考問的。例如如下幾個問題你是否瞭解:線程池的內部有哪些狀態?是如何判斷核心線程數是否已滿的?最大線程數是否包含核心線程數?當線程池中的線程數恰好達到 maximumPoolSize 這個值的時候,這個任務可否正常被執行?......,想要了解這些問題的答案咱們只能在線程池的源碼中尋找了。

實戰模擬測試

咱們自定義一個線程池,而後經過 for 循環連續建立10個任務並打印線程執行信息,總體代碼以下所示:

public static void main(String[] args) {

    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 6, 5L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(4));
    
    for (int i = 0; i < 10; i++) {
        threadPoolExecutor.execute(() -> {
             System.out.println("測試線程池:" + Thread.currentThread().getName() + "," + threadPoolExecutor.toString());
        });
    }
}
複製代碼

當 corePoolSize = 3,maximumPoolSize = 6,workQueue 大小爲4的時候,咱們的打印信息爲:

能夠發現總的建立了6個線程來執行完成了10個任務,其實很好理解,c=3個核心線程執行了3個任務,而後4個任務在隊列中等待覈心線程執行,最後額外建立了e=3個線程執行了剩下的3個任務,總建立的線程數就是 c + e = 6 <= 6(最大線程數)。

若是咱們調整對象建立的時候的構造函數參數,例如

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 5, 5L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(2));
複製代碼

咱們再次執行上述的代碼,則會報錯,拋出以下 RejectedExecutionException 異常信息,能夠看到是由於拒絕策略攔截的異常信息。

仍是按照上面的邏輯分析,這時核心線程數是 c = 3,而阻塞隊列的大小是 2,所以核心線程會處理掉其中5個任務,而剩下的5個任務會額外建立 e=5個線程去執行,那麼總線程數就是 c + e = 8,可是這時的最大線程數 maximumPoolSize = 5,所以超過了最大線程數的限制,這時就執行了默認的拒絕策略拋出異常。其實它在準備建立第6個線程的時候就已經報錯了,從這裏也能夠得知只要建立的總線程數 >= maximumPoolSize 的時候,線程池就不會繼續執行任務了而會去執行拒絕策略的邏輯

技術來源於生活

人們經常在生活中遇到一些困難的時候會進行頭腦風暴從而產生一些意想不到的解決方案,這些都是思想和智慧的結晶。咱們不少技術的解決方案也都來源於生活。

我常常想若是之後不作程序員應該作什麼?餐飲彷佛是最大衆的了,畢竟民以食爲天。

開餐館前期確定不能作太大,一是本金的問題,還有就是須要市場試水。在市場需求不明確的狀況下租個小店面仍是靠譜的,就算虧也不會太多。

店面租個幾十平的,就作香辣烤魚,餐桌大概15桌的樣子。而後就是員工了,除了廚師主要是服務員了,可是我不能招15個服務員啊,每桌分配一個太浪費了,須要提升資源利用率控制成本,因此員工不能招太多,我只須要招5個固定服務員負責在大廳招呼顧客和傳菜就能夠了,每一個人負責3個餐桌。

可是我沒想到咱們餐館作的烤魚很合大衆口味,很受歡迎又加上營銷效果好,成了一家網紅餐館。生意更是蒸蒸日上,天天座無虛席。可是空間有限啊,因此咱們只能讓後來無座的顧客稍微等候了,因而咱們安排了一個取號排隊等候區,顧客等待叫號有序就餐。

這時候餐館的人員不變,仍然是5個服務員負責處理大廳的主要服務工做,同時排隊等候區面積也不能過大,有個範圍限制,不能影響咱們的正常人員活動,同時也不能超過餐館的範圍排到餐館外,若是顧客排隊站到門外馬路上了,這是就很危險的。隨着口碑的發酵,一傳十,十傳百,咱們的顧客絡繹不絕,同時咱們爲了提升消費率又作起了外賣的服務,能夠打包外帶。

爲了不發生上述這種危險的狀況和提升訂單處理率,咱們只能額外請一些臨時工了,讓他們來幫忙處理咱們的外賣訂單從而提升業務處理能力。

可是也不是請的越多越好,咱們有成本控制,由於請的臨時工咱們也須要付工資。那怎麼辦呢?最終只能忍痛了啊,對於超出咱們處理能力的訂單,咱們就採起必定的拒絕策略,例如告知顧客當天的份額已經售罄,請改天再來。

以上就是咱們線程池運行的一個現實生活中的例子,核心線程就是咱們的5個固定服務員,而排隊等候區就是咱們的等待隊列,隊列不能設爲無限大,由於會形成OOM,若是隊列滿了線程池會另起額外線程去處理任務,也就是上述例子中的臨時工,餐館有經營成本控制因此有員工上限,不能請過多的臨時工,這就是最大線程數。若是臨時工達到最大數且隊列也滿了,那麼咱們只能經過拒絕策略暫時不接受額外的服務要求了。

一塊兒看源碼

口說無憑,理論都是這樣說的,那實際上源碼是否是真是這樣寫的呢?咱們一塊兒來看下線程池的源碼。經過 threadPoolExecutor.execute(...)的入口進入源碼,刪除了註釋信息以後的源碼內容以下,因爲封裝的好,因此只有短短几行。

public void execute(Runnable command) {
    // #1 任務非空校驗
    if (command == null)
        throw new NullPointerException();

    // #2 添加核心線程執行任務
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    // #3 任務入隊列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        //二次校驗
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    
    // #4 添加普通線程執行任務,若是失敗則執行拒絕策略
    else if (!addWorker(command, false))
        reject(command);
}
複製代碼

若是不關注細節只關注總體,從以上源碼中咱們能夠發現其中主要分爲了四個步驟來處理邏輯。排除第一步的非空校驗代碼,咱們能夠看出剩下的三步其實就是咱們線程池的運行邏輯,也就是上面的運行流程圖的邏輯內容。

  • (1) 任務的非空校驗。
  • (2) 獲取當前RUNNING的線程數,若是小於核心線程數,則建立核心線程去執行任務,不然走#3。
  • (3) 若是當前線程池處於RUNNING狀態,那麼就將任務放入隊列中。這時還會再作個雙重校驗,由於可能存在有些線程在咱們上次檢查後死了,或者從咱們進入這個方法後pool被關閉了,因此咱們須要再次檢查state。若是線程池中止了就須要回滾剛纔的添加任務到隊列中的操做並經過拒絕策略拒絕該任務,或者若是池中沒有線程了,則新開啓一個線程執行任務。
  • (4) 若是隊列滿了以後沒法在將任務加入隊列,則建立新的線程去執行任務,若是也失敗了,那麼就多是線程池關閉了或者線程池飽和了,這時執行拒絕策略再也不接受任務。

雙重校驗中有如下兩個點須要注意:

1. 爲何須要 double check 線程池的狀態?

在多線程環境下,線程池的狀態時刻在變化,而 ctl.get() 是非原子操做,頗有可能剛獲取了線程池狀態後線程池狀態就改變了。判斷是否將 command 加入 workque 是線程池以前的狀態。假若沒有 double check,萬一線程池處於非 running 狀態(在多線程環境下頗有可能發生),那麼 command 永遠不會執行。

二、爲何 addWorker(null, false) 的任務爲null?

addWorker(null, false),這個方法執行時只是建立了一個新的線程,可是沒有傳入任務,這是由於前面已經將任務添加到隊列中了,這樣能夠防止線程池處於 running 狀態,可是沒有線程去處理這個任務。

而根據以上代碼的具體步驟咱們能夠畫出詳細的執行流程,以下圖所示

以上的源碼其實只有10幾行,看起來很簡單,主要是它的封裝性比較好,其中主要有兩個點須要重點解釋,分別是:線程池的狀態addWorker()添加工做的方法,這兩個點弄明白了這段線程池的源碼差很少也就理解了。

線程池運行狀態-runState

線程有狀態,線程池也有它的運行狀態,這些狀態提供了主生命週期控制,伴隨着線程池的運行,由內部來維護,從源碼中咱們能夠發現線程池共有5個狀態:RUNNINGSHUTDOWNSTOPTIDYINGTERMINATED

各狀態值所表明的的含義和該狀態值下可執行的操做,具體信息以下:

運行狀態 狀態描述
RUNNING 接收新任務,而且也能處理阻塞隊列中的任務。
SHUTDOWN 不接收新任務,可是卻能夠繼續處理阻塞隊列中的任務。
STOP 不接收新任務,同時也不處理隊列任務,而且中斷正在進行的任務。
TIDYING 全部任務都已終止,workercount(有效線程數)爲0,線程轉向 TIDYING 狀態將會運行 terminated() 鉤子方法。
TERMINATED terminated() 方法調用完成後變成此狀態。

生命週期狀態流轉以下圖所示:

不少時候咱們表示狀態都是經過簡單的 int 值來表示,例如數據庫數據的刪除標誌 delete_flag 其中0表示有效,1表示刪除。而在線程池的源碼裏咱們能夠看到它是經過以下方式來進行表示的,

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
複製代碼

線程池內部使用一個變量維護兩個值:運行狀態(runState)和線程數量 (workerCount)何作到的呢?將十進制 int 值轉換爲二進制的值,共32位,其中高3位表明運行狀態(runState ),而低29位表明工做線程數(workerCount)。

關於內部封裝的獲取生命週期狀態、獲取線程池線程數量的計算方法如如下代碼所示:

//獲取線程池狀態
private static int runStateOf(int c) { return c & ~CAPACITY; }
//獲取線程數量
private static int workerCountOf(int c) { return c & CAPACITY; }
// Packing and unpacking ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
複製代碼

經過巧妙的位運算能夠分別獲取高3位的運行狀態值低29位的線程數量值,若是感興趣的能夠去看下具體的實現代碼,這裏就再也不贅述了。

添加工做線程-addWorker

添加線程是經過 addWorker() 方法來實現的,這個方法有兩個入參,Runnable firstTaskboolean core

private boolean addWorker(Runnable firstTask, boolean core){...}
複製代碼
  • Runnable firstTask 便是當前添加的線程須要執行的首個任務.
  • boolean core 用來標記當前執行的線程是不是核心線程仍是普通線程.

返回前面的線程池的 execute() 方法的代碼中,能夠發現這個addWorker() 有三個地方在調用,分別在 #2,#3和#4。

  • #2:當工做線程數 < 核心線程數的時候,經過addWorker(command, true)添加核心線程執行command任務。
  • #3:double check的時候,若是發現線程池處於正常運行狀態可是裏面沒有工做線程,則添加個空任務和一個普通線程,這樣一個 task 爲空的 worker 在線程執行的時候會去阻塞任務隊列裏拿任務,這樣就至關於建立了一個新的線程,只是沒有立刻分配任務。
  • #4:隊列已滿的狀況下,經過添加普通線程(非核心線程)去執行當前任務,若是失敗了則執行拒絕策略。

addWorker() 方法調用的地方咱們看完了,接下來咱們一塊兒來看下它裏面究竟作了些什麼,源碼以下:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
複製代碼

這個方法稍微有點長,咱們分段來看下,將上面的代碼咱們拆分紅兩個部分來看,首先看第一部分:

retry:
for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);//獲取線程池的狀態

    // Check if queue empty only if necessary.
    if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
        return false;

    for (;;) {
        int wc = workerCountOf(c);
        if (wc >= CAPACITY ||
            wc >= (core ? corePoolSize : maximumPoolSize))
            return false;
	    // 嘗試經過CAS方式增長workerCount
        if (compareAndIncrementWorkerCount(c))
            break retry;
        c = ctl.get();  // Re-read ctl
        // 若是線程池狀態發生變化,從新從最外層循環
        if (runStateOf(c) != rs)
            continue retry;
        // else CAS failed due to workerCount change; retry inner loop
    }
}
複製代碼

這部分代碼有兩層嵌套的 for 死循環,在第一行有個retry:代碼,這個也許有些同窗沒怎麼見過,這個是至關因而一個位置標記,retry後面跟循環,標記這個循環的位置。

咱們平時寫 for 循環的時候,是經過continue;break;來跳出當前循環,可是若是咱們有多重嵌套的 for 循環,若是咱們想在裏層的某個循環體中當達到某個條件的時候直接跳出全部循環或跳出到某個指定的位置,則使用retry:來標記這個位置就能夠了。

代碼中共有4個位置有改變循環體繼續執行下去,分別是兩個return false;,一個break retry;和一個continue retry;

首先咱們來看下第一個return false;,這個 return 在最外層的一個 for 循環,

if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
   return false;
複製代碼

這是一個判斷線程池狀態和線程隊列狀況的代碼,這個邏輯判斷有點繞能夠改爲

rs >= shutdown && (rs != shutdown || firstTask != null || workQueue.isEmpty())
複製代碼

這樣就好理解了,邏輯判斷成立能夠分爲如下幾種狀況直接返回 false,表示添加工做線程失敗。

  • rs > shutdown:線程池狀態處於 STOPTIDYINGTERMINATED時,添加工做線程失敗,不接受新任務。
  • rs >= shutdown && firstTask != null:線程池狀態處於 SHUTDOWNSTOPTIDYINGTERMINATED狀態且worker的首個任務不爲空時,添加工做線程失敗,不接受新任務。
  • rs >= shutdown && workQueue.isEmppty:線程池狀態處於 SHUTDOWNSTOPTIDYINGTERMINATED狀態且阻塞隊列爲空時,添加工做線程失敗,不接受新任務。

這樣看來,最外層的 for 循環是不斷的校驗當前的線程池狀態是否能接受新任務,若是校驗經過了以後才能繼續往下運行。

而後接下來看第二個return false;,這個 return 是在內層的第二個 for 循環中,是判斷線程池中當前的工做線程數量的,不知足條件的話直接返回 false,表示添加工做線程失敗。

  • 工做線程數量是否超過可表示的最大容量(CAPACITY).
  • 若是添加核心工做線程,是否超過最大核心線程容量(corePoolSize).
  • 若是添加普通工做線程,是否超過線程池最大線程容量(maximumPoolSize).

後面的break retry; ,表示若是嘗試經過CAS方式增長工做線程數workerCount成功,則跳出這個雙循環,往下執行後面第二部分的代碼,而continue retry;是再次校驗下線程池狀態是否發生變化,若是發生了變化則從新從最外層 for 開始繼續循環執行。

經過第一部分代碼的解析,咱們發現只有break retry;的時候才能執行到後面第二部分的代碼,然後面第二部分代碼作了些什麼呢?

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    //建立Worker對象實例
    w = new Worker(firstTask);
    //獲取Worker對象裏的線程
    final Thread t = w.thread;
    if (t != null) {
        //開啓可重入鎖,獨佔
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Recheck while holding lock.
            // Back out on ThreadFactory failure or if
            // shut down before lock acquired.
            //獲取線程池運行狀態
            int rs = runStateOf(ctl.get());

            //知足 rs < SHUTDOWN 判斷線程池是不是RUNNING,或者
            //rs == SHUTDOWN && firstTask == null 線程池若是是SHUTDOWN,
            //且首個任務firstTask爲空,
            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                //將Worker實例加入線程池workers
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                //線程添加成功標誌位 -> true
                workerAdded = true;
            }
        } finally {
            //釋放鎖
            mainLock.unlock();
        }
        //若是worker實例加入線程池成功,則啓動線程,同時修改線程啓動成功標誌位 -> true
        if (workerAdded) {
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
        //添加線程失敗
        addWorkerFailed(w);
}
return workerStarted;
複製代碼

這部分代碼主要的目的其實就是啓動一個線程,前面是一堆的條件判斷,看是否可以啓動一個工做線程。它由兩個try...catch...finally內容組成,能夠將他們拆開來看,這樣就很容易看懂。

咱們先看裏面一層的try...catch...finally,當Worker實例中的 Thread 線程不爲空的時候,開啓一個獨佔鎖ReentrantLock mainLock,防止其餘線程也來修改操做。

try {
   //獲取線程池運行狀態
   int rs = runStateOf(ctl.get());

   if (rs < SHUTDOWN ||
       (rs == SHUTDOWN && firstTask == null)) {
       if (t.isAlive()) // precheck that t is startable
           throw new IllegalThreadStateException();
       workers.add(w);
       int s = workers.size();
       if (s > largestPoolSize)
           largestPoolSize = s;
       workerAdded = true;
   }
} finally {
   mainLock.unlock();
}
複製代碼
  • 首先檢查線程池的狀態,當線程池處於 RUNNING 狀態或者線程池處於 SHUTDOWN 狀態可是當前線程的 firstTask 爲空,知足以上條件時才能將 worker 實例添加進線程池,即workers.add(w);
  • 同時修改 largestPoolSize,largestPoolSize變量用於記錄出現過的最大線程數。
  • 將標誌位 workerAdded 設置爲 true,表示添加工做線程成功。
  • 不管成功與否,在 finally 中都必須執行 mainLock.unlock()來釋放鎖。

外面一層的try...catch...finally主要是爲了判斷工做線程是否啓動成功,若是內層try...catch...finally代碼執行成功,即 worker 添加進線程池成功,workerAdded 標誌位置爲true,則啓動 worker 中的線程 t.start(),同時將標誌位 workerStarted 置爲 true,表示線程啓動成功。

if (workerAdded) {
    t.start();
    workerStarted = true;
}
複製代碼

若是失敗了,即 workerStarted == false,則在 finally 裏面必須執行addWorkerFailed(w)方法,這個方法至關因而用來回滾操做的,前面增的這裏移除,前面加的這裏減去。

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            //從線程池中移除worker實例
            workers.remove(w);
        //經過CAS,將工做線程數量workerCount減1
        decrementWorkerCount();
        //
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}
複製代碼

Worker類

上面咱們分析了addWorker 方法的源碼,而且看到了 Thread t = w.threadworkers.add(w)t.start()等代碼,知道了線程池的運行狀態和添加工做線程的流程,那麼咱們還有一些疑問:

  • 這裏的 Worker 是什麼?和 Thread 有什麼區別?
  • 線程啓動後是如何拿任務?在哪拿任務去執行的?
  • 阻塞隊列滿後,額外新建立的線程是去隊列裏拿任務的嗎?若是不是那它是去哪拿的?
  • 核心線程會一直存在於線程池中嗎?額外建立的普通線程執行完任務後會銷燬嗎?

Worker 是 ThreadPoolExecutor的一個內部類,主要是用來維護線程執行任務的中斷控制狀態,它實現了Runnable 接口同時繼承了AQS,實現 Runnable 接口意味着 Worker 就是一個線程,繼承 AQS 是爲了實現獨佔鎖這個功能。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        /** Thread this worker is running in. Null if factory fails. */
        final Thread thread;
        /** Initial task to run. Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
        
        //構造函數,初始化AQS的state值爲-1
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
}
複製代碼

至於爲何沒有使用可重入鎖 ReentrantLock,而是使用AQS,爲的就是實現不可重入的特性去反應線程如今的執行狀態。

  1. lock方法一旦獲取了獨佔鎖,表示當前線程正在執行任務中。
  2. 若是正在執行任務,則不該該中斷線程。
  3. 若是該線程如今不是獨佔鎖的狀態,也就是空閒的狀態,說明它沒有在處理任務,這時能夠對該線程進行中斷。
  4. 線程池在執行 shutdown 方法或 tryTerminate 方法時會調用 interruptIdleWorkers 方法來中斷空閒的線程,interruptIdleWorkers 方法會使用 tryLock 方法來判斷線程池中的線程是不是空閒狀態;若是線程是空閒狀態則能夠安全回收。

Worker 類有一個構造方法,構造參數爲給定的首個任務 firstTask,並持有一個線程thread。thread是在調用構造方法時經過 ThreadFactory 來建立的線程,能夠用來執行任務;

firstTask用它來初始化時傳入的第一個任務,這個任務能夠有也能夠爲null。若是這個值是非空的,那麼線程就會在啓動初期當即執行這個任務;若是這個值是null,那麼就須要建立一個線程去執行阻塞隊列中的任務,也就是非核心線程的建立。

任務運行-runWorker

上面咱們一塊兒看過線程的啓動t.start(),具體運行是在 Worker 的 run() 方法中

public void run() {
    runWorker(this);
}
複製代碼

run() 方法中又調用了runWorker() 方法,全部的實現都在這裏

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
複製代碼

不少人看到這樣的代碼就感受頭痛,其實你細看,這裏面咱們能夠看關鍵點,裏面有三塊try...catch...finally代碼,咱們將這三塊分別單獨拎出來看而且將拋異常的地方暫時刪掉或註釋掉,這樣它看起來就清爽了不少

Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//因爲Worker初始化時AQS中state設置爲-1,這裏要先作一次解鎖把state更新爲0,容許線程中斷
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
    // 循環的判斷任務(firstTask或從隊列中獲取的task)是否爲空
    while (task != null || (task = getTask()) != null) {
        // Worker加鎖,本質是AQS獲取資源而且嘗試CAS更新state由0更變爲1
        w.lock();
        // 若是線程池運行狀態是stopping, 確保線程是中斷狀態;
        // 若是不是stopping, 確保線程是非中斷狀態. 
        if ((runStateAtLeast(ctl.get(), STOP) ||
             (Thread.interrupted() &&
              runStateAtLeast(ctl.get(), STOP))) &&
            !wt.isInterrupted())
            wt.interrupt();
            
            //此處省略了第二個try...catch...finally
    }
    // 走到這裏說明某一次getTask()返回爲null,線程正常退出
    completedAbruptly = false;
} finally {
    //處理線程退出
    processWorkerExit(w, completedAbruptly);
}
複製代碼

第二個try...catch...finally

try {
   beforeExecute(wt, task);
   Throwable thrown = null;
    
    //此處省略了第三個try...catch...finally
    
} finally {
    task = null;
    w.completedTasks++;
    w.unlock();
}
複製代碼

第三個try...catch...finally

try {
    // 運行任務
    task.run();
} catch (RuntimeException x) {
    thrown = x; throw x;
} catch (Error x) {
    thrown = x; throw x;
} catch (Throwable x) {
    thrown = x; throw new Error(x);
} finally {
    afterExecute(task, thrown);
}
複製代碼

上面的代碼中能夠看到有beforeExecuteafterExecuteterminaerd三個函數,它們都是鉤子函數,能夠分別在子類中重寫它們用來擴展ThreadPoolExecutor,例如添加日誌、計時、監視或者統計信息收集的功能。

  • beforeExecute():線程執行以前調用
  • afterExecute():線程執行以後調用
  • terminaerd():線程池退出時候調用

這樣拆分完以後發現,其實主要注意兩個點就好了,分別是getTask()task.run()task.run()就是運行任務,那咱們繼續來看下getTask()是如何獲取任務的。

獲取任務-getTask

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        //1.線程池狀態是STOP,TIDYING,TERMINATED
        //2.線程池shutdown而且隊列是空的.
        //知足以上兩個條件之一則工做線程數wc減去1,而後直接返回null
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        //容許核心工做線程對象銷燬淘汰或者工做線程數 > 最大核心線程數corePoolSize
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        //1.工做線程數 > 最大線程數maximumPoolSize 或者timed == true && timedOut == true
        //2.工做線程數 > 1 或者隊列爲空 
        //同時知足以上兩個條件則經過CAS把線程數減去1,同時返回null。CAS把線程數減去1失敗會進入下一輪循環作重試
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            /// 若是timed爲true,經過poll()方法作超時拉取,keepAliveTime時間內沒有等待到有效的任務,則返回null
            // 若是timed爲false,經過take()作阻塞拉取,會阻塞到有下一個有效的任務時候再返回(通常不會是null)
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
複製代碼

裏面有個關鍵字allowCoreThreadTimeOut,它的默認值爲false,在Java1.6開始你能夠經過threadPoolExecutor.allowCoreThreadTimeOut(true)方式來設置爲true,經過字面意思就能夠明白這個字段的做用是什麼了,便是否容許核心線程超時銷燬。

默認的狀況下核心線程數量會一直保持,即便這些線程是空閒的它也是會一直存在的,而當設置爲 true 時,線程池中 corePoolSize 線程空閒時間達到 keepAliveTime 也將銷燬關閉。

結尾

經過整片分析下來,線程池裏面有不少細節處須要注意,閱讀完源碼以後也理解了更多,解開了不少困惑,獲取到了更多的知識點,因此源碼的閱讀是很重要的。

相關文章
相關標籤/搜索