面試官:線程池如何按照core、max、queue的執行循序去執行?(內附詳細解析)

前言

這是一個真實的面試題。html

前幾天一個朋友在羣裏分享了他剛剛面試候選者時問的問題:"線程池如何按照core、max、queue的執行循序去執行?"java

咱們都知道線程池中代碼執行順序是:corePool->workQueue->maxPool,源碼我都看過,你如今問題讓我改源碼?? git

一時間羣裏炸開了鍋,小夥伴們紛紛打聽他所在的公司,而後拉黑避坑。(手動狗頭,你們一塊兒調侃٩(๑❛ᴗ❛๑)۶)github

關於線程池他一共問了這麼幾個問題:面試

  • 線程池如何按照core、max、queue的順序去執行?
  • 子線程拋出的異常,主線程能感知到麼?
  • 線程池發生了異常改怎樣處理?

全是一些有意思的問題,我以前也寫過一篇很詳細的圖文教程:【萬字圖文-原創】 | 學會Java中的線程池,這一篇也許就夠了! ,不瞭解的小夥伴能夠再回顧下~apache

可是針對這幾個問題,可能你們一時間也有點懵。今天的文章咱們以源碼爲基礎來分析下該如何回答這三個問題。(以前沒閱讀過源碼也不要緊,全部的分析都會貼出源碼及圖解)異步

線程池如何按照core、max、queue的順序執行?

問題思考

對於這個問題,不少小夥伴確定會疑惑:"別人源碼中寫好的執行流程你爲啥要改?這面試官腦子有病吧......"ide

這裏來思考一下現實工做場景中是否有這種需求?以前也看到過一份簡歷也寫到過這個問題:源碼分析

場景描述.png

一個線程池執行的任務屬於IO密集型,CPU大多屬於閒置狀態,系統資源未充分利用。若是一瞬間來了大量請求,若是線程池數量大於coreSize時,多餘的請求都會放入到等待隊列中。等待着corePool中的線程執行完成後再來執行等待隊列中的任務。學習

試想一下,這種場景咱們該如何優化?

咱們能夠修改線程池的執行順序爲corePool->maxPool->workQueue。 這樣就可以充分利用CPU資源,提交的任務會被優先執行。當線程池中線程數量大於maxSize時纔會將任務放入等待隊列中。

你就說巧不巧?面試官的這個問題顯然是通過認真思考來提問的,這是一個頗有意思的溫恩提,下面就一塊兒看看如何解決吧。

線程池運行流程

咱們都知道線程池執行流程是先corePoolworkQueue,最後纔是maxPool的一個執行流程。

執行流程.png

線程池核心參數

在回顧下ThreadPoolExecutor.execute()源碼前咱們先回顧下線程池中的幾個重要參數:

線程池核心參數.png

咱們來看下這幾個參數的定義:
corePoolSize: 線程池中核心線程數量
maximumPoolSize: 線程池中最大線程數量
keepAliveTime: 非核心的空閒線程等待新任務的時間
unit: 時間單位。配合allowCoreThreadTimeOut也會清理核心線程池中的線程。
workQueue: 基於Blocking的任務隊列,最好選用有界隊列,指定隊列長度
threadFactory: 線程工廠,最好自定義線程工廠,能夠自定義每一個線程的名稱
handler: 拒絕策略,默認是AbortPolicy

ThreadPoolExecutor.execute()源碼分析

咱們能夠看下execute()以下:

execute執行源碼.png

接着來分析下執行過程:

  1. 第一步:workerCountOf(c)時間計算當前線程池中線程的個數,當線程個數小於核心線程數
  2. 第二步:線程池線程數量大於核心線程數,此時提交的任務會放入workQueue中,使用offer()進行操做
  3. 第三步:workQueue.offer()執行失敗,新提交的任務會直接執行,addWorker()會判斷若是當前線程池數量大於最大線程數,則執行拒絕策略

好了,到了這裏咱們都已經很清楚了,關鍵在於第二步和第三步如何交換順序執行呢?

解決思路

仔細想想,若是修改workQueue.offer()的實現不就能夠達到目的了?咱們先來畫圖來看一下:

問題思路.png

如今的問題就在於,若是當前線程池中coreSize < workCount < maxSize時,必定會先執行offer()操做。

咱們若是修改offer的實現是否能夠完成執行順序的更換呢?這裏也是畫圖來展現一下:

解決方式.png

Dubbo中EagerThreadPool解決方案

湊巧Dubbo中也有相似的實現,在DubboEagerThreadPool自定義了一個BlockingQueue,在offer()方法中,若是當前線程池數量小於最大線程池時,直接返回false,這裏就達到了調節線程池執行順序的目的。

dubbo中解決方案.png

源碼直達https://github.com/apache/dub...

看到這裏一切都真相大白了,解決思路以及方案都很簡單,學會了沒有?

這個問題背後還隱藏了一些場景的優化、源碼的擴展等等知識,果真是一個值得思考的好問題。

子線程拋出的異常,主線程能感知到麼?

問題思考

這個問題其實也很容易回答,也僅僅是一個面試題而已,實際工做中子線程的異常不該該由主線程來捕獲。

針對這個問題,但願你們清楚的是: 咱們要明確線程代碼的邊界,異步化過程當中,子線程拋出的異常應該由子線程本身去處理,而不是須要主線程感知來協助處理。

解決方案

解決方案很簡單,在虛擬機中,當一個線程若是沒有顯式處理異常而拋出時會將該異常事件報告給該線程對象的 java.lang.Thread.UncaughtExceptionHandler 進行處理,若是線程沒有設置 UncaughtExceptionHandler,則默認會把異常棧信息輸出到終端而使程序直接崩潰。

因此若是咱們想在線程意外崩潰時作一些處理就能夠經過實現 UncaughtExceptionHandler 來知足需求。

咱們使用線程池設置ThreadFactory時能夠指定UncaughtExceptionHandler,這樣就能夠捕獲到子線程拋出的異常了。

代碼示例

具體代碼以下:

/**
 * 測試子線程異常問題
 *
 * @author wangmeng
 * @date 2020/6/13 18:08
 */
public class ThreadPoolExceptionTest {

    public static void main(String[] args) throws InterruptedException {
        MyHandler myHandler = new MyHandler();
        ExecutorService execute = new ThreadPoolExecutor(10, 10,
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder().setUncaughtExceptionHandler(myHandler).build());

        TimeUnit.SECONDS.sleep(5);
        for (int i = 0; i < 10; i++) {
            execute.execute(new MyRunner());
        }
    }


    private static class MyRunner implements Runnable {
        @Override
        public void run() {
            int count = 0;
            while (true) {
                count++;
                System.out.println("我要開始生產Bug了============");
                if (count == 10) {
                    System.out.println(1 / 0);
                }

                if (count == 20) {
                    System.out.println("這裏是不會執行到的==========");
                    break;
                }
            }
        }
    }
}

class MyHandler implements Thread.UncaughtExceptionHandler {
    private final static Logger LOGGER = LoggerFactory.getLogger(MyHandler.class);
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        LOGGER.error("threadId = {}, threadName = {}, ex = {}", t.getId(), t.getName(), e.getMessage());
    }
}

執行結果:
執行結果.png

UncaughtExceptionHandler 解析

咱們來看下Thread中的內部接口UncaughtExceptionHandler

public class Thread {
    ......
    /**
     * 當一個線程因未捕獲的異常而即將終止時虛擬機將使用 Thread.getUncaughtExceptionHandler()
     * 獲取已經設置的 UncaughtExceptionHandler 實例,並經過調用其 uncaughtException(...) 方
     * 法而傳遞相關異常信息。
     * 若是一個線程沒有明確設置其 UncaughtExceptionHandler,則將其 ThreadGroup 對象做爲其
     * handler,若是 ThreadGroup 對象對異常沒有什麼特殊的要求,則 ThreadGroup 會將調用轉發給
     * 默認的未捕獲異常處理器(即 Thread 類中定義的靜態未捕獲異常處理器對象)。
     *
     * @see #setDefaultUncaughtExceptionHandler
     * @see #setUncaughtExceptionHandler
     * @see ThreadGroup#uncaughtException
     */
    @FunctionalInterface
    public interface UncaughtExceptionHandler {
        /**
         * 未捕獲異常崩潰時回調此方法
         */
        void uncaughtException(Thread t, Throwable e);
    }

    /**
     * 靜態方法,用於設置一個默認的全局異常處理器。
     */
    public static void setDefaultUncaughtExceptionHandler(UncaughtExceptionHandler eh) {
         defaultUncaughtExceptionHandler = eh;
     }

    /**
     * 針對某個 Thread 對象的方法,用於對特定的線程進行未捕獲的異常處理。
     */
    public void setUncaughtExceptionHandler(UncaughtExceptionHandler eh) {
        checkAccess();
        uncaughtExceptionHandler = eh;
    }

    /**
     * 當 Thread 崩潰時會調用該方法獲取當前線程的 handler,獲取不到就會調用 group(handler 類型)。
     * group 是 Thread 類的 ThreadGroup 類型屬性,在 Thread 構造中實例化。
     */
    public UncaughtExceptionHandler getUncaughtExceptionHandler() {
        return uncaughtExceptionHandler != null ?
            uncaughtExceptionHandler : group;
    }

    /**
     * 線程全局默認 handler。
     */
    public static UncaughtExceptionHandler getDefaultUncaughtExceptionHandler() {
        return defaultUncaughtExceptionHandler;
    }
    ......
}

部份內容參考自:https://mp.weixin.qq.com/s/gh...

線程池發生了異常改怎樣處理?

線程池中線程運行過程當中出現了異常該怎樣處理呢?線程池提交任務有兩種方式,分別是execute()submit(),這裏會依次說明。

ThreadPoolExecutor.runWorker()實現

不論是使用execute()仍是submit()提交任務,最終都會執行到ThreadPoolExecutor.runWorker(),咱們來看下源碼(源碼基於JDK1.8):

runWorker().png

咱們看到在執行task.run()時,出現異常會直接向上拋出,這裏處理的最好的方式就是在咱們業務代碼中使用try...catch()來捕獲異常。

FutureTask.run()實現

若是咱們使用submit()來提交任務,在ThreadPoolExecutor.runWorker()方法執行時最終會調用到FutureTask.run()方法裏面去,不清楚的小夥伴也能夠看下我以前的文章:

線程池續:你必需要知道的線程池submit()實現原理之FutureTask!

FutureTask.run().png

這裏能夠看到,若是業務代碼拋出異常後,會被catch捕獲到,而後調用setExeception()方法:

FutureTask.setException().png

能夠看到其實相似於直接吞掉了,當咱們調用get()方法的時候異常信息會包裝到FutureTask內部的變量outcome中,咱們也會獲取到對應的異常信息。

ThreadPoolExecutor.runWorker()最後finally中有一個afterExecute()鉤子方法,若是咱們重寫了afterExecute()方法,就能夠獲取到子線程拋出的具體異常信息Throwable了。

結論

對於線程池、包括線程的異常處理推薦如下方式:

  1. 直接使用try/catch,這個也是最推薦的方式
  2. 在咱們構造線程池的時候,重寫uncaughtException()方法,上面示例代碼也有提到:
public class ThreadPoolExceptionTest {

    public static void main(String[] args) throws InterruptedException {
        MyHandler myHandler = new MyHandler();
        ExecutorService execute = new ThreadPoolExecutor(10, 10,
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder().setUncaughtExceptionHandler(myHandler).build());

        TimeUnit.SECONDS.sleep(5);
        for (int i = 0; i < 10; i++) {
            execute.execute(new MyRunner());
        }
    }
}

class MyHandler implements Thread.UncaughtExceptionHandler {
    private final static Logger LOGGER = LoggerFactory.getLogger(MyHandler.class);
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        LOGGER.error("threadId = {}, threadName = {}, ex = {}", t.getId(), t.getName(), e.getMessage());
    }
}

3 直接重寫afterExecute()方法,感知異常細節

總結

這篇文章到這裏就結束了,不知道小夥伴們有沒有一些感悟或收穫?

經過這幾個面試問題,我也深入的感覺到學習知識要多思考,看源碼的過程當中要多設置一些場景,這樣纔會收穫更多。

原創乾貨分享.png

相關文章
相關標籤/搜索