Spring中ThreadPoolTaskExecutor的線程調度及問題


  • 問題現象spring

  • 緣由分析併發

  • 任務調度邏輯異步

  • 彙總分析ide

  • 解決方案ui


問題現象

在咱們的系統中,使用了這樣的配置來開啓異步操做:this

spring配置atom

<task:annotation-driven executor="executor"url

    scheduler="scheduler" />spa

<task:executor id="executor" pool-size="16-128"線程

    keep-alive="60" rejection-policy="CALLER_RUNS" queue-capacity="1000" />

客戶端開啓異步代碼

@Async()

public Future<Result4Calculate> calculateByLendId(intdid) {

    // 標記1

    // 調用REST服務;監控調用時間。

}

獲取Future後的處理

try {

    keplerOverdue = summay4Overdue.get(5, TimeUnit.SECONDS);

    // 後續處理

catch (Exception e) {

    // 標記2

    // 異常報警

}

 

然而在這種配置下,客戶端在標記1處監控到的調用時間廣泛在4s之內(平均時間不到1s,個別峯值請求會突破5s,全天超過5s的請求不到10個)。然而,在標記2處捕獲到的超時異常卻很是多(一天接近700+)。

問題出在哪兒?


 

緣由分析

上述問題相關代碼的調用時序以下圖所示。

https://www.processon.com/view/link/585d381ee4b02e6c0ac86d66spacer.gif


其中,rest client 與rest server間的交互時間能夠明確監控到,用時超過5s的很是少。可是,get方法卻常常拋出超時異常。通過初步分析,問題出如今ThreadPoolTaskExecutor的任務調度過程當中。


 

任務調度邏輯

使用<task:executor>註解獲得的bean是ThreadPoolTaskExecutor的實例。這個類自己並不作調度,而是將調度工做委託給了ThreadPoolExecutor。後者的任務調度代碼以下:

 

ThreadPoolExecutor任務調度代碼

/**

 * Executes the given task sometime in the future.  The task

 * may execute in a new thread or in an existing pooled thread.

 *

 * If the task cannot be submitted for execution, either because this

 * executor has been shutdown or because its capacity has been reached,

 * the task is handled by the current {@code RejectedExecutionHandler}.

 *

 * @param command the task to execute

 * @throws RejectedExecutionException at discretion of

 *         {@code RejectedExecutionHandler}, if the task

 *         cannot be accepted for execution

 * @throws NullPointerException if {@code command} is null

 */

public void execute(Runnable command) {

    if (command == null)

        throw new NullPointerException();

    /*

     * Proceed in 3 steps:

     *

     * 1. If fewer than corePoolSize threads are running, try to

     * start a new thread with the given command as its first

     * task.  The call to addWorker atomically checks runState and

     * workerCount, and so prevents false alarms that would add

     * threads when it shouldn't, by returning false.

     *

     * 2. If a task can be successfully queued, then we still need

     * to double-check whether we should have added a thread

     * (because existing ones died since last checking) or that

     * the pool shut down since entry into this method. So we

     * recheck state and if necessary roll back the enqueuing if

     * stopped, or start a new thread if there are none.

     *

     * 3. If we cannot queue task, then we try to add a new

     * thread.  If it fails, we know we are shut down or saturated

     * and so reject the task.

     */

    int c = ctl.get();

    if (workerCountOf(c) < corePoolSize) {

        if (addWorker(command, true))

            return;

        c = ctl.get();

    }

    if (isRunning(c) && workQueue.offer(command)) {

        int recheck = ctl.get();

        if (! isRunning(recheck) && remove(command))

            reject(command);

        else if (workerCountOf(recheck) == 0)

            addWorker(nullfalse);

    }

    else if (!addWorker(command, false))

        reject(command);

}

經過其中的註釋,咱們能夠知道它的核心調度邏輯以下(省略了一些檢查等方法):

  1. 若是正在運行的線程數量小於corePoolSize(最小線程數),則嘗試啓動一個新線程,並將當入參command做爲該線程的第一個task。不然進入步驟二。

  2. 若是沒有按步驟1執行,那麼嘗試把入參command放入workQueue中。若是能成功入隊,作後續處理;不然,進入步驟三。

  3. 若是沒有按步驟2執行,那麼將嘗試建立一個新線程,而後作後續處理。

 

簡單的說,當向ThreadPoolExecutor提交一個任務時,它會優先交給線程池中的現有線程;若是暫時沒有可用的線程,那麼它會將任務放到隊列中;通常只有在隊列滿了的時候(致使沒法成功入隊),纔會建立新線程來處理隊列中的任務。

順帶一說,任務入隊後,在某些條件下也會建立新線程。但新線程不會當即執行當前任務,而是從隊列中獲取一個任務並開始執行。


 

彙總分析

綜上所述,咱們能夠肯定如下信息:

  1. 根據系統配置,ThreadPoolExecutor中的corePoolSize = 16。


  2. 當併發數超過16時,ThreadPoolExecutor會按照步驟二進行任務調度,即把任務放入隊列中,但沒有及時建立新線程來執行這個任務

    這一點是推測。但同時,經過日誌中的線程名稱確認的線程池內線程數量沒有增加。日誌中,異步線程的id從executor-一、executor-2一直到executor-16,但17及以上的都沒有出現過。


  3. 隊列中的任務出現積壓、時間累積,致使某一個任務超時後,後續大量任務都超時。可是超時並無阻止任務執行;任務仍然會繼續經過rest client調用rest server,並被監控代碼記錄下時間。
    任務在隊列中積壓、累積,是引起一天數百次異常、報警的緣由。而監控代碼並未監控到任務調度的時間,所以都沒有出現超時。

spacer.gif


 

解決方案

初步考慮方案有三:

  1. 提升初始線程數。
    提升步併發的初始線程數(如將16-128調整爲32-128)。以此減小新任務進入隊列的概率。
    可是這個方案只是下降隊列積壓的風險,並不解決問題。


  2. 關閉隊列。
    將隊列大小調整爲0,以此保證每個新任務都有一個新線程來執行。
    這個方案的問題在於,併發壓力大時,可能致使線程不夠用。此時的異步調用會根據rejection-policy="CALLER_RUNS"的配置而變爲同步調用。


  3. 更換線程池。使用優先建立新線程(而非優先入隊列)的線程池。改動最大的方案。

相關文章
相關標籤/搜索