Java踩坑記系列之線程池

線程池你們都很熟悉,不管是平時的業務開發仍是框架中間件都會用到,大部分都是基於JDK線程池ThreadPoolExecutor作的封裝,好比tomcat的線程池,固然也有單獨開發的,但都會牽涉到這幾個核心參數的設置:核心線程數等待隊列最大線程數拒絕策略等。html

先說下咱們項目組在使用線程池時踩到的坑:前端

  1. 線程池的參數設置必定要結合具體的業務場景,區分I/O密集和CPU密集,若是是I/O密集型業務,核心線程數,workQueue等待隊列,最大線程數等參數設置不合理不只不能發揮線程池的做用,反而會影響現有業務
  2. 等待隊列workQueue填滿後,新建立的線程會優先處理新請求進來的任務,而不是去處理隊列裏的任務,隊列裏的任務只能等核心線程數忙完了才能被執行。有可能形成隊列裏的任務長時間等待,致使隊列積壓,尤爲是I/O密集場景
  3. 若是須要獲得線程池裏的線程執行結果,使用future的方式,拒絕策略不能使用DiscardPolicy,這種丟棄策略雖然不執行子線程的任務,可是仍是會返回future對象(其實在這種狀況下咱們已經不須要線程池返回的結果了),而後後續代碼即便判斷了future!=null也沒用,這樣的話仍是會走到future.get()方法,若是get方法沒有設置超時時間會致使一直阻塞下去!

僞代碼以下:java

// 若是線程池已滿,新的請求會直接執行拒絕策略
Future<String> future = executor.submit(() -> {
    // 業務邏輯,好比調用第三方接口等耗時操做放在線程池裏執行
    return result;
});

// 主流程調用邏輯
if(future != null) // 若是拒絕策略設置不合理仍是會走到下面代碼
  future.get(超時時間); // 調用方阻塞等待結果返回,直到超時
複製代碼

下面就結合實際業務狀況逐一進行分析。tomcat

固然這些問題一部分是對線程池理解不夠致使的,還有一部分是線程池自己的問題。app

一. 背景

公司有個接口部分功能使用了線程池,這個功能不依賴核心接口,但有必定的耗時,因此放在線程池裏和主線程並行執行,等線程池裏的任務執行完經過future.get的方式獲取線程池裏的線程執行結果,而後合併到主流程的結果裏返回給前端,業務場景很簡單,大體流程以下:框架

初衷也是爲了避免影響主流程的性能,不增長總體響應時間。異步

可是以前使用的線程池jdk的newCachedThreadPool,由於sonar掃描提示說有內存溢出的風險(最大線程數是Integer.MAX_VALUE)因此當時改爲使用原生的ThreadPoolExecutor,經過指定核心線程數和最大線程數,來解決sonar問題。ide

可是改過的線程池並不適合咱們這種I/O密集型的業務場景(大部分業務都是經過調用接口實現的),當時設置的核心線程數是cpu核數(線上機器是4核),等待隊列是2048,最大線程數是cpu核數*2,從而引起了一系列問題。。。性能

二. 排查過程

上線後的現象是使用線程池的接口總體響應時間變長,有的甚至到10秒才返回數據,經過線程dump分析發現有大量的線程都阻塞在future.get方法上,以下:ui

future.get方法會阻塞當前主流程,在超時時間內等待子線程返回結果,若是超時還沒結果則結束等待繼續執行後續的代碼,超時時間設置的是默認接口超時時間10秒(後面已改成200ms),至此能夠肯定接口總耗時是由於流程都卡在了future.get這一步了。

但這不是根本緣由,future是線程池返回的,僞代碼以下:

Future<String> future = executor.submit(() -> {
    // 業務邏輯,好比調用第三方接口等耗時操做放在線程池裏執行
    return result;
});
複製代碼

經過上面的代碼可知future沒有結果的緣由是提交到線程池裏的任務遲遲沒有被執行。

那爲何沒有執行呢?繼續分析線程池的dump文件發現,線程池裏的線程數已達到最大數量,滿負荷運行,如圖:

SubThread是咱們本身定義的線程池裏線程的名字,8個線程都是runnable狀態,說明等待隊列裏已經塞滿任務了,以前設置的隊列長度是2048,也就是說還有2048個任務等待執行,這無疑加重了整個接口的耗時。

線程池的執行順序是:核心線程數 -> 等待隊列 -> 最大線程數 -> 拒絕策略

若是對線程dump分析不太瞭解的能夠看下以前的一篇文章:Windows環境下如何進行線程dump分析,雖然環境不同但原理相似。

這裏基本肯定接口耗時變長的主要緣由是線程池設置不合理致使的。

另外還有一些偶發問題,就是線上日誌顯示雖然線程池執行了,可是線程池裏的任務卻沒有記錄運行日誌,線程池裏的任務是調用另一個服務的接口,和對方接口負責人確認也確實調用了他們的接口,可咱們本身的日誌裏卻沒有記錄下調用報文,通過進一步查看代碼發現當時的線程池拒絕策略也被修改過,並非默認的拋出異常不執行策略AbortPolicy,而是設置的CallerRunsPolicy策略,即交給調用方執行!

也就是說當線程池達到最大負荷時執行的拒絕策略是讓主流程去執行提交到線程池裏的任務,這樣除了進一步加重整個接口的耗時外,還會致使主流程被hang死,最關鍵的是沒法肯定是在哪一步執行提交到線程池的任務

分析日誌埋點能夠推斷出調用的時間點應該是已經調用完了記錄日誌的方法,要返回給前端結果的時才執行線程池裏任務,此時記錄日誌的方法已調用過,不會再去打印日誌了,並且子任務返回的結果也沒法合併到主流程結果裏,由於合併主流程結果和線程池任務返回結果的方法也在以前調用過,不會回過頭來再調用了,大體流程以下:

其實這種拒絕策略並不適合咱們如今的業務場景,由於線程池裏的任務不是核心任務,不該該影響主流程的執行。

三. 改進

  1. 調整線程池參數,核心線程數基於線上接口的QPS計算,最大線程數參考線上tomcat的最大線程數配置,可以cover住高峯流量,隊列設置的儘可能小,避免形成任務擠壓。關於線程數如何設置會在後續文章中單獨講解。
  2. 擴展線程池,封裝原生JDK線程池ThreadPoolExecutor,增長對線程池各項指標的監控,包括線程池運行狀態、核心線程數、最大線程數、任務等待數、已完成任務數、線程池異常關閉等信息,便於實時監控和定位問題。
  3. 重寫線程池拒絕策略,主要也是記錄超出線程池負載狀況下的各項指標狀況,以及調用線程的堆棧信息,便於排查分析,經過拋出異常方式中斷執行,避免引用的future不爲null的問題。
  4. 合理調整future.get超時時間,防止阻塞主線程時間過長。

線程池內部流程:

線程池監控和自定義拒絕策略的代碼以下,你們能夠結合本身的業務場景拿去使用:

package com.javakk;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.MessageFormat;
import java.util.List;
import java.util.concurrent.*;

/**
 * 自定義線程池<p>
 * 1.監控線程池狀態及異常關閉等狀況<p>
 * 2.監控線程池運行時的各項指標, 好比:任務等待數、已完成任務數、任務異常信息、核心線程數、最大線程數等<p>
 * author: 老K
 */
public class ThreadPoolExt extends ThreadPoolExecutor{

    private static final Logger log = LoggerFactory.getLogger(ThreadPoolExt.class);

    private TimeUnit timeUnit;

    public ThreadPoolExt(int corePoolSize,
                         int maximumPoolSize,
                         long keepAliveTime,
                         TimeUnit unit,
                         BlockingQueue<Runnable> workQueue,
                         ThreadFactory threadFactory,
                         RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.timeUnit = unit;
    }

    @Override
    public void shutdown() {
        // 線程池將要關閉事件,此方法會等待線程池中正在執行的任務和隊列中等待的任務執行完畢再關閉
        monitor("ThreadPool will be shutdown:");
        super.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        // 線程池當即關閉事件,此方法會當即關閉線程池,可是會返回隊列中等待的任務
        monitor("ThreadPool going to immediately be shutdown:");
        // 記錄被丟棄的任務, 暫時只記錄日誌, 後續可根據業務場景作進一步處理
        List<Runnable> dropTasks = null;
        try {
            dropTasks = super.shutdownNow();
            log.error(MessageFormat.format("ThreadPool discard task count:{0}", dropTasks.size()));
        } catch (Exception e) {
            log.error("ThreadPool shutdownNow error", e);
        }
        return dropTasks;
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        // 監控線程池運行時的各項指標
        monitor("ThreadPool monitor data:");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable ex) {
        if (ex != null) { // 監控線程池中的線程執行是否異常
            log.error("unknown exception caught in ThreadPool afterExecute:", ex);
        }
    }

    /**
     * 監控線程池運行時的各項指標, 好比:任務等待數、任務異常信息、已完成任務數、核心線程數、最大線程數等<p>
     */
    private void monitor(String title){
        try {
            // 線程池監控信息記錄, 這裏須要注意寫ES的時機,尤爲是多個子線程的日誌合併到主流程的記錄方式
            String threadPoolMonitor = MessageFormat.format(
                    "{0}{1}core pool size:{2}, current pool size:{3}, queue wait size:{4}, active count:{5}, completed task count:{6}, " +
                            "task count:{7}, largest pool size:{8}, max pool size:{9}, keep alive time:{10}, is shutdown:{11}, is terminated:{12}, " +
                            "thread name:{13}{14}",
                    System.lineSeparator(), title, this.getCorePoolSize(), this.getPoolSize(),
                    this.getQueue().size(), this.getActiveCount(), this.getCompletedTaskCount(), this.getTaskCount(), this.getLargestPoolSize(),
                    this.getMaximumPoolSize(), this.getKeepAliveTime(timeUnit != null ? timeUnit : TimeUnit.SECONDS), this.isShutdown(),
                    this.isTerminated(), Thread.currentThread().getName(), System.lineSeparator());
            log.info(threadPoolMonitor);
        } catch (Exception e) {
            log.error("ThreadPool monitor error", e);
        }
    }
}
複製代碼

自定義拒絕策略代碼:

package com.javakk;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.management.*;
import java.text.MessageFormat;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 自定義線程池拒絕策略:<p>
 * 1.記錄線程池的核心線程數,活躍數,已完成數等信息,以及調用線程的堆棧信息,便於排查<p>
 * 2.拋出異常中斷執行<p>
 * author: 老K
 */
public class RejectedPolicyWithReport implements RejectedExecutionHandler {

    private static final Logger log = LoggerFactory.getLogger(RejectedPolicyWithReport.class);

    private static volatile long lastPrintTime = 0;

    private static final long TEN_MINUTES_MILLS = 10 * 60 * 1000;

    private static Semaphore guard = new Semaphore(1);
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        try {
            String title = "thread pool execute reject policy!!";
            String msg = MessageFormat.format(
                    "{0}{1}core pool size:{2}, current pool size:{3}, queue wait size:{4}, active count:{5}, completed task count:{6}, " +
                            "task count:{7}, largest pool size:{8}, max pool size:{9}, keep alive time:{10}, is shutdown:{11}, is terminated:{12}, " +
                            "thread name:{13}{14}",
                    System.lineSeparator(), title, e.getCorePoolSize(), e.getPoolSize(), e.getQueue().size(), e.getActiveCount(),
                    e.getCompletedTaskCount(), e.getTaskCount(), e.getLargestPoolSize(), e.getMaximumPoolSize(), e.getKeepAliveTime(TimeUnit.SECONDS),
                    e.isShutdown(), e.isTerminated(), Thread.currentThread().getName(), System.lineSeparator());
            log.info(msg);
 threadDump(); // 記錄線程堆棧信息包括鎖爭用信息
        } catch (Exception ex) {
            log.error("RejectedPolicyWithReport rejectedExecution error", ex);
        }
        throw new RejectedExecutionException("thread pool execute reject policy!!");
    }

    /**
     * 獲取線程dump信息<p>
     * 注意: 該方法默認會記錄全部線程和鎖信息雖然方便debug, 使用時最好加開關和間隔調用, 不然可能會增長latency<p>
     * 1.當前線程的基本信息:id,name,state<p>
     * 2.堆棧信息<p>
     * 3.鎖相關信息(能夠設置不記錄)<p>
     *  默認在log記錄<p>
     * @return
     */
    private void threadDump() {
        long now = System.currentTimeMillis();
        // 每隔10分鐘dump一次
        if (now - lastPrintTime < TEN_MINUTES_MILLS) { 
            return; 
        } 
        if (!guard.tryAcquire()) { 
            return; 
        } 
        // 異步dump線程池信息 
        ExecutorService pool = Executors.newSingleThreadExecutor(); 
        pool.execute(() -> {
            try {
                ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean();
                StringBuilder sb = new StringBuilder();
                for (ThreadInfo threadInfo : threadMxBean.dumpAllThreads(true, true)) {
                    sb.append(getThreadDumpString(threadInfo));
                }
                log.error("thread dump info:", sb.toString());
            } catch (Exception e) {
                log.error("thread dump error", e);
            } finally {
                guard.release();
            }
            lastPrintTime = System.currentTimeMillis();
        });
        pool.shutdown();
    }

    @SuppressWarnings("all")
    private String getThreadDumpString(ThreadInfo threadInfo) {
        StringBuilder sb = new StringBuilder(""" + threadInfo.getThreadName() + """ +
                " Id=" + threadInfo.getThreadId() + " " +
                threadInfo.getThreadState());
        if (threadInfo.getLockName() != null) {
            sb.append(" on " + threadInfo.getLockName());
        }
        if (threadInfo.getLockOwnerName() != null) {
            sb.append(" owned by "" + threadInfo.getLockOwnerName() +
                    "" Id=" + threadInfo.getLockOwnerId());
        }
        if (threadInfo.isSuspended()) {
            sb.append(" (suspended)");
        }
        if (threadInfo.isInNative()) {
            sb.append(" (in native)");
        }
        sb.append('n');
        int i = 0;

        StackTraceElement[] stackTrace = threadInfo.getStackTrace();
        MonitorInfo[] lockedMonitors = threadInfo.getLockedMonitors();
        for (; i < stackTrace.length && i < 32; i++) {
            StackTraceElement ste = stackTrace[i];
            sb.append("tat " + ste.toString());
            sb.append('n');
            if (i == 0 && threadInfo.getLockInfo() != null) {
                Thread.State ts = threadInfo.getThreadState();
                switch (ts) {
                    case BLOCKED:
                        sb.append("t-  blocked on " + threadInfo.getLockInfo());
                        sb.append('n');
                        break;
                    case WAITING:
                        sb.append("t-  waiting on " + threadInfo.getLockInfo());
                        sb.append('n');
                        break;
                    case TIMED_WAITING:
                        sb.append("t-  waiting on " + threadInfo.getLockInfo());
                        sb.append('n');
                        break;
                    default:
                }
            }

            for (MonitorInfo mi : lockedMonitors) {
                if (mi.getLockedStackDepth() == i) {
                    sb.append("t-  locked " + mi);
                    sb.append('n');
                }
            }
        }
        if (i < stackTrace.length) {
            sb.append("t...");
            sb.append('n');
        }

        LockInfo[] locks = threadInfo.getLockedSynchronizers();
        if (locks.length > 0) {
            sb.append("ntNumber of locked synchronizers = " + locks.length);
            sb.append('n');
            for (LockInfo li : locks) {
                sb.append("t- " + li);
                sb.append('n');
            }
        }
        sb.append('n');
        return sb.toString();
    }
}
複製代碼

文章來源:javakk.com/188.html

相關文章
相關標籤/搜索