面試官:來!聊聊線程池的實現原理以及使用時的問題

掃描下方二維碼或者微信搜索公衆號菜鳥飛呀飛,便可關注微信公衆號,閱讀更多Spring源碼分析Java併發編程文章。java

微信公衆號

前言

  不管是在工做中,仍是在書本中,咱們均可以聽到或者看到關於線程在使用時的一些建議:不要在代碼中本身直接建立線程,而是經過線程池的方式來使用線程。使用線程池的理由大體能夠總結爲如下幾點。面試

  • 1. 下降資源消耗。線程是操做系統十分寶貴的資源,當多我的同時開發一個項目時,在互不知情的狀況下,都本身在代碼中建立了線程,這樣就會致使線程數過多,並且線程的建立和銷燬,在操做系統層面,須要由用戶態切換到內核態,這是一個費時費力的過程。而使用線程池能夠避免頻繁的建立線程和銷燬線程,線程池中線程能夠重複使用。
  • 2. 提升響應速度。當請求到達時,因爲線程池中的線程已經建立好了,使用線程池,能夠省去線程建立的這段時間。
  • 3. 提升線程的可管理性。線程是稀缺資源,當建立過多的線程時,會形成系統性能的降低,而使用線程池,能夠對線程進行統一分配、調優和監控。     線程池的使用十分簡單,可是會用不表明用得好。在面試中,基本不會問線程池應該怎麼用,而是問線程池在使用不當時會形成哪些問題,實際上就是考察線程池的實現原理。所以搞明白線程池的實現原理是頗有必要的一件事,不只僅對面試會有幫助,也會讓咱們在平時工做中避過好多坑。

實現原理

  在線程池中存在幾個概念:核心線程數、最大線程數、任務隊列。核心線程數指的是線程池的基本大小;最大線程數指的是,同一時刻線程池中線程的數量最大不能超過該值;任務隊列是當任務較多時,線程池中線程的數量已經達到了核心線程數,這時候就是用任務隊列來存儲咱們提交的任務。   與其餘池化技術不一樣的是,線程池是基於生產者-消費者模式來實現的,任務的提交方是生產者,線程池是消費者。當咱們須要執行某個任務時,只須要把任務扔到線程池中便可。線程池中執行任務的流程以下圖以下。編程

線程池原理

  • 1. 先判斷線程池中線程的數量是否超過核心線程數,若是沒有超過核心線程數,就建立新的線程去執行任務;若是超過了核心線程數,就進入到下面流程。
  • 2. 判斷任務隊列是否已經滿了,若是沒有滿,就將任務添加到任務隊列中;若是已經滿了,就進入到下面的流程。
  • 3. 再判斷若是建立一個線程後,線程數是否會超過最大線程數,若是不會超過最大線程數,就建立一個新的線程來執行任務;若是會,則進入到下面的流程。
  • 4. 執行拒絕策略。

  在沒看線程池的具體實現以前,我一直存在這樣的疑惑:爲何是先判斷任務隊列有沒有滿,再判斷有沒有超過最大線程數?正常邏輯不是應該先儘量的建立線程,讓線程去處理任務嗎?當任務實在是太多了,線程處理不過來了,再將任務添加到任務隊列嗎?知道我看了線程池的具體代碼實現後,我才知道答案。問題答案在文末。數組

ThreadPoolExecutor

  在JUC包下,已經提供了線程池的具體的實現:ThreadPoolExecutor。ThreadPoolExecutor提供了不少的構造方法,其中最複雜的構造方法有7個參數。安全

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
}
複製代碼
  • corePoolSize:該參數表示的是線程池的核心線程數。當任務提交到線程池時,若是線程池的線程數量尚未達到corePoolSize,那麼就會新建立的一個線程來執行任務,若是達到了,就將任務添加到任務隊列中。微信

  • maximumPoolSize:該參數表示的是線程池中容許存在的最大線程數量。當任務隊列滿了之後,再有新的任務進入到線程池時,會判斷再新建一個線程是否會超過maximumPoolSize,若是會超過,則不建立線程,而是執行拒絕策略。若是不會超過maximumPoolSize,則會建立新的線程來執行任務。併發

  • keepAliveTime:當線程池中的線程數量大於corePoolSize時,那麼大於corePoolSize這部分的線程,若是沒有任務去處理,那麼就表示它們是空閒的,這個時候是不容許它們一直存在的,而是容許它們最多空閒一段時間,這段時間就是keepAliveTime,時間的單位就是TimeUnit。工具

  • unit:空閒線程容許存活時間的單位,TimeUnit是一個枚舉值,它能夠是納秒、微妙、毫秒、秒、分、小時、天。oop

  • workQueue:任務隊列,用來存聽任務。該隊列的類型是阻塞隊列,經常使用的阻塞隊列有ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、PriorityBlockingQueue等。   ArrayBlockingQueue是一個基於數組實現的阻塞隊列,元素按照先進先出(FIFO)的順序入隊、出隊。由於底層實現是數組,數組在初始化時必須指定大小,所以ArrayBlockingQueue是有界隊列。   LinkedBlockingQueue是一個基於鏈表實現的阻塞隊列,元素按照先進先出(FIFO)的順序入隊、出隊。由於頂層是鏈表,鏈表是基於節點之間的指針指向來維持先後關係的,若是不指鏈表的大小,它默認的大小是Integer.MAX_VALUE,即2^{32}-1,這個數值太大了,所以一般稱LinkedBlockingQueue是一個無界隊列。固然若是在初始化的時候,就指定鏈表大小,那麼它就是有界隊列了。   SynchronousQueue是一個不存儲元素的阻塞隊列。每一個插入操做必須得等到另外一個線程調用了移除操做後,該線程纔會返回,不然將一直阻塞。吞吐量一般要高於LinkedBlockingQueue。   PriorityBlockingQueue是一個將元素按照優先級排序的阻塞的阻塞隊列,元素的優先級越高,將會越先出隊列。這是一個無界隊列。源碼分析

  • threadFactory:線程池工廠,用來建立線程。一般在實際項目中,爲了便於後期排查問題,在建立線程時須要爲線程賦予必定的名稱,經過線程池工廠,能夠方便的爲每個建立的線程設置具備業務含義的名稱。

  • handler:拒絕策略。當任務隊列已滿,線程數量達到maximumPoolSize後,線程池就不會再接收新的任務了,這個時候就須要使用拒絕策略來決定最終是怎麼處理這個任務。默認狀況下使用AbortPolicy,表示沒法處理新任務,直接拋出異常。在ThreadPoolExecutor類中定義了四個內部類,分別表示四種拒絕策略。咱們也能夠經過實現RejectExecutionHandler接口來實現自定義的拒絕策略。

AbortPocily:再也不接收新任務,直接拋出異常。 CallerRunsPolicy:提交任務的線程本身處理。 DiscardPolicy:不處理,直接丟棄。 DiscardOldestPolicy:丟棄任務隊列中排在最前面的任務,並執行當前任務。(排在隊列最前面的任務並不必定是在隊列中待的時間最長的任務,由於有多是按照優先級排序的隊列)

  在使用ThreadPoolExecutor時,可使用execute(Runnable task)方法向線程池中提交一個任務,沒有返回值。也可使用submit()方法向線程池中添加任務,有返回值,返回值對象是Future。submit()方法有三個重載的方法。

  • submit(Runnable task),該方法雖然返回值對象是Future,可是使用Future.get()獲取結果是null。
  • submit(Runnable task,T result),方法的返回值對象是Future,經過Future.get()獲取具體的返回值時,結果與方法的第二個參數result相等。
  • submit(Callable task),該方法的參數是一個Callable類型的對象,方法有返回值。
  • 關於Future的原理有興趣的朋友能夠本身先了解下,後面一篇文章會專門介紹。

源碼分析

  瞭解了ThreadPoolExecutor的基本用法,下面將結合源碼來分析下線程池的代碼實現。從上面的線程池的原理中,咱們能夠發現,線程池的原理相對比較簡單,代碼實現起來應該不難,看源碼主要是爲了學習他人寫的優秀代碼,尤爲是編程大師Doug Lea寫的代碼。     對於一個線程池,除了上面介紹的幾個重要屬性之外,咱們還須要一個變量來表示線程池狀態,線程池也須要有運行中、關閉中、已關閉等狀態。若是要咱們去實現一個線程池,可能第一反應就是用一個單獨的變量來表示線程池的狀態,再用另外一個變量來表示線程池中線程的數量。這樣的確能夠,不過Doug Lea並非這樣實現的,他將二者用一個變量來表示(不得不感嘆下,大佬就是大佬,想法果真和他人不同)。那麼問題來了,如何用一個變量來表示兩個值?閱讀過讀寫鎖源碼的朋友可能就能立想到另外這一中方案(關於讀寫鎖的介紹能夠閱讀這一篇文章: 讀寫鎖ReadWriteLock的實現原理)。在讀寫鎖的實現中將一個int型的數值,按照高低位來拆分,高位表示一個數,低位再表示另外一個數,在線程池的實現中,Doug Lea再次使用了按位拆分的技巧。     在線程池中,使用了一個原子類AtomicInteger的變量來表示線程池狀態和線程數量,該變量在內存中會佔用4個字節,也就是32bit,其中高3位用來表示線程池的狀態,低29位用來表示線程的數量。線程池的狀態一共有5中狀態,用3bit最多能夠表示8種狀態,所以採用高3位來表示線程池的狀態徹底能知足需求。示意圖以下。

線程池狀態設計

  在看線程池的核心實現邏輯以前,先簡單看下ThreadPoolExecutor中關於各類變量和方法的定義。由於在覈心邏輯中,常常會用到它們,並且這些方法和變量大量用到了位運算,看起來不是特別直觀,因此提早熟悉它們的功能對於看核心邏輯有很大的幫助。相關源碼和註釋以下。

// 高2位表示線程池的狀態,其餘29位表示線程的數量,即worker的數量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 計數的位數,29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 線程池的最大大小,2^29 -1,
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
// 線程池的狀態,高三位表示線程的運行狀態,
// RUNNING: 111
private static final int RUNNING    = -1 << COUNT_BITS;
// SHUTDOWN: 000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// STOP: 001
private static final int STOP       =  1 << COUNT_BITS;
// TIFYING(整理): 010
private static final int TIDYING    =  2 << COUNT_BITS;
// TERMINATED: 011
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
// 計算線程池的狀態,計算結果的低29位全爲0,所以最終結果就是線程池的狀態
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 工做線程的數量,計算結果的高3位全是0,所以最終結果就是工做線程的數量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 根據線程池的狀態和工做線程的數量,計算ctl,實際上就是將二者合併成ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }

/* * Bit field accessors that don't require unpacking ctl. * These depend on the bit layout and on workerCount being never negative. */

// 線程池的狀態是否小於s
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}

// 線程池的狀態大於等於s
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

// 判斷線程池是否處於運行狀態
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}
複製代碼

  有了前面的基礎,接下來分析下核心實現。再使用線程池時咱們能夠經過execute(Runnable task)來提交一個任務到線程池,所以咱們從核心入口execute(Runnable task)方法開始分析。execute()方法的源碼以下。在源碼中我添加了部分註釋,以供參考。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    
    int c = ctl.get();
    // workerCountOf(c)方法時計算工做線程的數量,
    // 1. 工做線程數是否小於核心線程數,若是小於核心線程數,就建立新的線程去執行任務
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        // 任務執行失敗後,從新獲取ctl的值,供下面的邏輯計算
        c = ctl.get();
    }
    // 2. 當工做線程數大於等於核心線程數時,線程池是運行狀態,且任務添加到隊列成功
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 若是線程池狀態不是RUNNING狀態,就執行拒絕策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 若是線程池中線程的數量爲0,就調用addWorker()方法建立新的worker線程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. 線程池非運行狀態,或者任務入隊失敗,就嘗試建立新的worker線程
    else if (!addWorker(command, false))
        // 4. 若是建立新的worker線程失敗,就執行拒絕策略。
        reject(command);
}
複製代碼

  execute()方法的邏輯大體分爲4部分,分別對應線程池原理部分所提到的4個步驟。

  • 先經過workerCountOf(c)方法計算當前線程池中線程的數量,而後與初始化線程池時指定的核心線程數相比較,若是小於核心線程數,就調用addWorker()方法,實際上就是建立新的線程來執行任務。addWorker()方法的源碼後面分析。
  • 若是當前線程數大於等於核心線程數,那麼就經過workQueue.off(command)方法,將任務添加到任務隊列中。若是此時任務隊列尚未滿,那麼就會添加成功,workQueue.off(command)就會返回true,那麼就會進入到if邏輯塊中,進行一些其餘的判斷。若是此時任務隊列已經滿了,workQueue.off(command)方法就會返回false,那麼就會執行後面的3和4。
  • 當任務隊列滿了之後,就會再次調用addWorker()方法,在addworker()方法中會在建立新的線程以前,判斷線程數會不會超過最大線程數,若是會,addworker()就會返回false。若是不會,就會建立新的線程去執行任務。
  • 若是addWorker()返回true,表示不能再建立新的線程了,那麼此時就會執行到4,即調用reject()方法,執行拒絕策略。reject()方法的邏輯比較簡單,就是調用了咱們指定的handlerrejectedExecution()方法。其源碼以下。
final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}
複製代碼

  從上面的分析中,能夠看出,在好幾處邏輯中均調用了addWorker()方法,這說明該方法十分重要。事實也是如此,該方法不只重要,代碼實現還十分複雜。該方法須要兩個參數,第一個參數就是咱們傳入的Runnable任務,第二參數是一個boolean值,傳入true表示的是當前線程池中的線程數尚未達到核心線程數,傳false表示當前線程數已經大於等於核心線程數了。addWorker()方法的源碼很長,這裏我將其分爲兩個部分,下面先看前面一部分的源碼。

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 1. 當線程池的狀態大於SHUTDOWN時,返回false。由於線程池處於關閉狀態了,就不能再接受任務了
        // 2. 當線程池的狀態等於SHUTDOWN時,firstTask不爲空或者任務隊列爲空,返回false
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // 1. 線程數大於等於理論上的最大數(2^29-1),則直接返回false。(由於線程數不能再增長了)
            // 2. 根據core來決定線程數應該和誰比較。當線程數大於核心線程數或者最大線程數時,直接返回false。
            // (由於當大於核心線程數時,表示此時任務應該直接添加到隊列中(若是隊列滿了,可能入隊失敗);當大於最大線程數時,確定不能再新建立線程了,否則設置最大線程數有毛用)
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 線程數+1,若是設置成功,就跳出外層循環
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 再次獲取線程池的狀態,並將其與前面獲取到的值比較,
            // 若是相同,說明線程池的狀態沒有發生變化,繼續在內循環中進行循環
            // 若是不相同,說明在這期間,線程池的狀態發生了變化,須要跳到外層循環,而後再從新進行循環
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // 省略後半部分代碼
    // ......
}
複製代碼

    在這部分代碼中,咱們能夠看到一個很陌生的語法:retry...break retry...continue retry。這種寫法真的是太少見了,少見到我第一眼看到的時候,覺得是我不當心碰到鍵盤,把源碼給改了。這種語法有點相似於C語言裏面的goto(已經被遺棄),其做用就是在for循環以前定義一個retry,而後在循環中,使用break retry時,就會讓代碼跳出循環,並再也不進入循環;當使用continue retry時,表示跳出當前循環,立馬進入到下一次循環。因爲這裏使用了兩層for循環,所以爲了方便在內層循環中一會兒跳出到外層循環,就使用了retry這種語法。須要說明的是,這裏的retry並非Java裏面的關鍵字,而是隨機定義的一個字符串,咱們也能夠寫成a,b,c等,可是後面的break和continue後面的字符串須要和前面定義的這個字符串對應。   咱們能夠看到,前半部分的核心邏輯就是使用了兩個無限for和一個CAS操做來設置線程池的線程數量。若是線程池的線程數修改爲功,就中斷循環,進入後半部分代碼的邏輯,若是修改失敗,就利用for循環再一次進行修改,這樣的好處是,既實現了線程安全,也避免使用鎖,提升了效率。在這一部分代碼中,進行了不少判斷,這些判斷主要是校驗線程池的狀態以及線程數,我的認爲不是特別重要,咱們主要抓住核心邏輯便可。   當成功修改線程數量之後,就會執行addWorker()方法的後半部分代碼,其源碼以下。

private boolean addWorker(Runnable firstTask, boolean core) {
    // 省略前半部分代碼
    // ......
    
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 建立一個新的worker線程
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                // 再次獲取線程池的狀態,由於在獲取鎖期間,線程池的狀態可能改變了
                int rs = runStateOf(ctl.get());

                // 若是線程池狀態時運行狀態或者是關閉狀態可是firstTask是空,就將worker線程添加到線程池
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // 判斷worker線程是否已經啓動了,若是已經啓動,就拋出異常
                    // 我的以爲這一步沒有任何意義,由於worker線程是剛new出來的,沒有在任何地方啓動
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 啓動線程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {

        // 若是啓動失敗,就將worker線程從線程池移除,並將線程數減1
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
複製代碼

  在這一部分代碼中,經過new Worker(firstTask)建立了一個Worker對象,Worker對象繼承了AQS,同時實現了Runnable接口,它是線程池中真正幹活的人。咱們提交到線程的任務,最終都是封裝成Worker對象,而後由Worker對象來完成任務。先簡單看下Woker的構造方法,其源碼以下。在構造方法中,首先設置了同步變量state爲-1,而後經過ThreadFactory建立了一個線程,注意在經過ThreadFactory建立線程時,將Worker自身也就是this,傳入了進去,也就是說最後建立出來的線程對象,它裏面的target屬性就是指向這個Worker對象

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{

    /** Thread this worker is running in. Null if factory fails. */
    final Thread thread;
    /** Initial task to run. Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 將this傳入進去,最後就是使線程的target屬性等於當前的worker對象
        this.thread = getThreadFactory().newThread(this);
    }
}
複製代碼

  當經過new Worker(firstTask)建立完worker對象後,此時線程已經被建立好了,在啓動線程以前,先經過t.isAlive()判斷線程是已經啓動,若是沒有啓動,纔會調用線程的start()方法來啓動線程。這裏有一點須要注意的是,建立完worker對象後,調用了mainLock.lock()來保證線程安全,由於這一步workers.add(w)存在併發的可能,因此須要經過獲取鎖來保證線程安全。   當調用線程的start()方法以後,若是線程獲取到CPU的執行權,那麼就會執行線程的run()方法,在線程的run()方法中,會執行線程中target屬性的run()方法。這裏線程的target屬性就是咱們建立的worker對象,所以最終會執行到Worker的run()方法。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{

    public void run() {
        runWorker(this);
    }
}
複製代碼

  在Worker類的run()中,直接調用了runWorker()方法。因此Worker執行任務的核心邏輯就是在runWorker()方法中實現的。其源碼以下。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 若是worker線程的firstTask不爲空或者能從任務隊列中獲取到任務,就執行
        // 不然就會一直阻塞到getTask()方法處
        while (task != null || (task = getTask()) != null) {
            // 保證worker串行的執行線程
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // beforeExecute()是空方法,由子類具體實現
                // 該方法的目的是爲了讓任務執行前作一些其餘操做
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 真正執行任務
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // afterExecute()空方法,由子類具體實現
                    // 該方法的目的是爲了讓任務執行後作一些其餘操做
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
複製代碼

    runWorker()的源碼看起來也比較長,但核心邏輯就一行,即:task.run(),最終執行了咱們提交的Runnable任務。在runWorker()方法中經過一個while循環來讓Worker對象一直來執行任務。當傳入的task對象不爲空或者經過getTask()方法能從任務隊列中獲取到任務時,worker就會一直執行。不然將在finally語句塊中調用processWorkerExit退出,讓線程中斷,最終銷燬。   getTask()方法是一個阻塞的方法,當能從任務隊列中獲取到任務時,就會當即返回一個任務。若是獲取不到任務,就會阻塞。它支持超時,當超過線程池初始化時指定的線程最大存活時間後,就會返回null,從而致使worker線程退出while循環,最終線程銷燬。   到這兒線程池的execute()方法就分析完了。最後簡單分析下線程池的shutdown()方法和shutdownNow()方法。當調用shutdown()方法時,會令線程池的狀態爲SHUTDOWN,而後中斷空閒的線程,對於已經在執行任務的線程並不會中斷。當調用shutdownNow()方法時,會令線程池的狀態爲STOP,而後在中斷全部的線程,包括正在執行任務的線程

  • shutdown()方法的源碼以下。
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 權限校驗
        checkShutdownAccess();
        // 將線程池的狀態設置爲SHUTDOWN狀態
        advanceRunState(SHUTDOWN);
        // 中斷空閒的worker線程
        interruptIdleWorkers();
        // 空方法,由子類具體去實現,例如ScheduledThreadPoolExecutor
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // 嘗試將線程池的狀態設置爲TERMINATED
    tryTerminate();
}
複製代碼
  • shutdownNow()方法的源碼以下。
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 線程池狀態設置爲STOP
        advanceRunState(STOP);
        // 中斷全部線程,包括正在執行任務的線程
        interruptWorkers();
        // 清除任務隊列中的全部任務
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
複製代碼

 

  • 在實際工做當中,對於究竟應該使用哪種方法去中斷線程池,應該結合具體的任務來決定,若是要求任務必須執行完成,那麼就是用shutdown()方法。一般也建議使用shutdown()方法,更加優雅。

總結

  • 本文主要介紹了線程池的實現原理,其原理主要分爲4個核心步驟,先判斷線程數量是否超過核心線程數,而後再判斷任務隊列是否已經滿了,再判斷線程數會不會超過設置的最大線程數,最後執行拒絕策略。接着本文詳細介紹了線程池的幾個核心參數corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler以及它們的各自的意義,而後結合源碼實現,詳細分析了任務的執行過程。
  • 不管是讀寫鎖的實現,仍是線程池的實現,Doug Lea都使用了將一個int型的變量按照高低位拆分的技巧,這種思想很值得學習,不只是由於設計巧妙,還由於在計算機中位運算的執行效率更高。
  • 最後解釋下關於文章開頭提到的一點疑惑:爲何是先判斷任務隊列有沒有滿,再判斷線程數有沒有超過最大線程數?而不是先判斷最大線程數,再判斷任務隊列是否已滿?
  • 答案和具體的源碼實現有關。由於當須要建立線程的時候,都會調用addWorker()方法,在addWorker()的後半部分的邏輯中,會調用mainLock.lock()方法來獲取全局鎖,而獲取鎖就會形成必定的資源爭搶。若是先判斷最大線程數,再判斷任務隊列是否已滿,這樣就會形成線程池原理的4個步驟中,第1步判斷核心線程數時要獲取全局鎖,第2步判斷最大線程數時,又要獲取全局鎖,這樣相比於先判斷任務隊列是否已滿,再判斷最大線程數,就可能會多出一次獲取全局鎖的過程。所以在設計線程池,爲了儘量的避免由於獲取全局鎖而形成資源的爭搶,因此會先判斷任務隊列是否已滿,再判斷最大線程數。
  • 另一個疑惑就是:LinkedBlockingQueue的吞吐量比ArrayBlockingQueue的吞吐量要高。前者是基於鏈表實現的,後者是基於數組實現的,正常狀況下,不該該是數組的性能要高於鏈表嗎?
  • 而後看了一下這兩個阻塞隊列的源碼才發現,這是由於LinkedBlockingQueue的讀和寫操做使用了兩個鎖,takeLock和putLock,讀寫操做不會形成資源的爭搶。而ArrayBlockingQueue的讀和寫使用的是同一把鎖,讀寫操做存在鎖的競爭。所以LinkedBlockingQueue的吞吐量高於ArrayBlockingQueue。

推薦

微信公衆號
相關文章
相關標籤/搜索