Java併發(六)線程池監控

目錄

  1、線程池監控參數java

  2、線程池監控類緩存

  3、注意事項服務器

 

在上一篇博文中,咱們介紹了線程池的基本原理和使用方法。瞭解了基本概念以後,咱們可使用 Executors 類建立線程池來執行大量的任務,使用線程池的併發特性提升系統的吞吐量。可是,線程池使用不當也會使服務器資源枯竭,致使異常狀況的發生,好比固定線程池的阻塞隊列任務數量過多、緩存線程池建立的線程過多致使內存溢出、系統假死等問題。所以,咱們須要一種簡單的監控方案來監控線程池的使用狀況,好比完成任務數量、未完成任務數量、線程大小等信息。併發

1、線程池監控參數

上一篇博文提到,線程池提供瞭如下幾個方法能夠監控線程池的使用狀況:框架

方法 含義
getActiveCount() 線程池中正在執行任務的線程數量
getCompletedTaskCount() 線程池已完成的任務數量,該值小於等於taskCount
getCorePoolSize() 線程池的核心線程數量
getLargestPoolSize() 線程池曾經建立過的最大線程數量。經過這個數據能夠知道線程池是否滿過,也就是達到了maximumPoolSize
getMaximumPoolSize() 線程池的最大線程數量
getPoolSize() 線程池當前的線程數量
getTaskCount() 線程池已經執行的和未執行的任務總數

經過這些方法,能夠對線程池進行監控,在 ThreadPoolExecutor 類中提供了幾個空方法,如 beforeExecute 方法, afterExecute 方法和 terminated 方法,能夠擴展這些方法在執行前或執行後增長一些新的操做,例如統計線程池的執行任務的時間等,能夠繼承自 ThreadPoolExecutor 來進行擴展。ide

2、線程池監控類

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 繼承ThreadPoolExecutor類,覆蓋了shutdown(), shutdownNow(), beforeExecute() 和 afterExecute()
 * 方法來統計線程池的執行狀況
 * <p>
 * Created by on 2019/4/19.
 */
public class ThreadPoolMonitor extends ThreadPoolExecutor {

    private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolMonitor.class);

    /**
     * 保存任務開始執行的時間,當任務結束時,用任務結束時間減去開始時間計算任務執行時間
     */
    private ConcurrentHashMap<String, Date> startTimes;

    /**
     * 線程池名稱,通常以業務名稱命名,方便區分
     */
    private String poolName;

    /**
     * 調用父類的構造方法,並初始化HashMap和線程池名稱
     *
     * @param corePoolSize    線程池核心線程數
     * @param maximumPoolSize 線程池最大線程數
     * @param keepAliveTime   線程的最大空閒時間
     * @param unit            空閒時間的單位
     * @param workQueue       保存被提交任務的隊列
     * @param poolName        線程池名稱
     */
    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                             TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                Executors.defaultThreadFactory(), poolName);
    }


    /**
     * 調用父類的構造方法,並初始化HashMap和線程池名稱
     *
     * @param corePoolSize    線程池核心線程數
     * @param maximumPoolSize 線程池最大線程數
     * @param keepAliveTime   線程的最大空閒時間
     * @param unit            空閒時間的單位
     * @param workQueue       保存被提交任務的隊列
     * @param threadFactory   線程工廠
     * @param poolName        線程池名稱
     */
    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                             TimeUnit unit, BlockingQueue<Runnable> workQueue,
                             ThreadFactory threadFactory, String poolName) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.startTimes = new ConcurrentHashMap<>();
        this.poolName = poolName;
    }

    /**
     * 線程池延遲關閉時(等待線程池裏的任務都執行完畢),統計線程池狀況
     */
    @Override
    public void shutdown() {
        // 統計已執行任務、正在執行任務、未執行任務數量
        LOGGER.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
                this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
        super.shutdown();
    }

    /**
     * 線程池當即關閉時,統計線程池狀況
     */
    @Override
    public List<Runnable> shutdownNow() {
        // 統計已執行任務、正在執行任務、未執行任務數量
        LOGGER.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
                this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
        return super.shutdownNow();
    }

    /**
     * 任務執行以前,記錄任務開始時間
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        startTimes.put(String.valueOf(r.hashCode()), new Date());
    }

    /**
     * 任務執行以後,計算任務結束時間
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        Date startDate = startTimes.remove(String.valueOf(r.hashCode()));
        Date finishDate = new Date();
        long diff = finishDate.getTime() - startDate.getTime();
        // 統計任務耗時、初始線程數、核心線程數、正在執行的任務數量、
        // 已完成任務數量、任務總數、隊列裏緩存的任務數量、池中存在的最大線程數、
        // 最大容許的線程數、線程空閒時間、線程池是否關閉、線程池是否終止
        LOGGER.info("{}-pool-monitor: " +
                        "Duration: {} ms, PoolSize: {}, CorePoolSize: {}, Active: {}, " +
                        "Completed: {}, Task: {}, Queue: {}, LargestPoolSize: {}, " +
                        "MaximumPoolSize: {},  KeepAliveTime: {}, isShutdown: {}, isTerminated: {}",
                this.poolName,
                diff, this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(),
                this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(),
                this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated());
    }

    /**
     * 建立固定線程池,代碼源於Executors.newFixedThreadPool方法,這裏增長了poolName
     *
     * @param nThreads 線程數量
     * @param poolName 線程池名稱
     * @return ExecutorService對象
     */
    public static ExecutorService newFixedThreadPool(int nThreads, String poolName) {
        return new ThreadPoolMonitor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), poolName);
    }

    /**
     * 建立緩存型線程池,代碼源於Executors.newCachedThreadPool方法,這裏增長了poolName
     *
     * @param poolName 線程池名稱
     * @return ExecutorService對象
     */
    public static ExecutorService newCachedThreadPool(String poolName) {
        return new ThreadPoolMonitor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), poolName);
    }

    /**
     * 生成線程池所用的線程,只是改寫了線程池默認的線程工廠,傳入線程池名稱,便於問題追蹤
     */
    static class EventThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        /**
         * 初始化線程工廠
         *
         * @param poolName 線程池名稱
         */
        EventThreadFactory(String poolName) {
            SecurityManager s = System.getSecurityManager();
            group = Objects.nonNull(s) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
            namePrefix = poolName + "-pool-" + poolNumber.getAndIncrement() + "-thread-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
}

ThreadPoolMonitor 類繼承了 ThreadPoolExecutor 類,重寫了shutdown() 、shutdownNow() 、beforeExecute() 和 afterExecute()方法來統計線程池的執行狀況,這四個方法是 ThreadPoolExecutor 類預留給開發者進行擴展的方法,具體以下:高併發

方法 含義
shutdown() 線程池延遲關閉時(等待線程池裏的任務都執行完畢),統計已執行任務、正在執行任務、未執行任務數量
shutdownNow() 線程池當即關閉時,統計已執行任務、正在執行任務、未執行任務數量
beforeExecute(Thread t, Runnable r) 任務執行以前,記錄任務開始時間,startTimes這個HashMap以任務的hashCode爲key,開始時間爲值
afterExecute(Runnable r, Throwable t) 任務執行以後,計算任務結束時間。統計任務耗時、初始線程數、核心線程數、正在執行的任務數量、已完成任務數量、任務總數、隊列裏緩存的任務數量、池中存在的最大線程數、最大容許的線程數、線程空閒時間、線程池是否關閉、線程池是否終止信息

監控日誌:this

22:50:25.376 [cellphone-pool-1-thread-3] INFO org.cellphone.common.pool.ThreadPoolMonitor - cellphone-pool-monitor: Duration: 1009 ms, PoolSize: 3, CorePoolSize: 3, Active: 1, Completed: 17, Task: 18, Queue: 0, LargestPoolSize: 3, MaximumPoolSize: 3,  KeepAliveTime: 0, isShutdown: false, isTerminated: false

通常咱們會依賴 beforeExecute 和 afterExecute 這兩個方法統計的信息,具體緣由請參考須要注意部分的最後一項。有了這些信息以後,咱們能夠根據業務狀況和統計的線程池信息合理調整線程池大小,根據任務耗時長短對自身服務和依賴的其餘服務進行調優,提升服務的可用性。atom

3、注意事項

1. 在 afterExecute 方法中須要注意,須要調用 ConcurrentHashMap 的 remove 方法移除並返回任務的開始時間信息,而不是調用 get 方法,由於在高併發狀況下,線程池裏要執行的任務不少,若是隻獲取值不移除的話,會使 ConcurrentHashMap 愈來愈大,引起內存泄漏或溢出問題。該行代碼以下:spa

Date startDate = startTimes.remove(String.valueOf(r.hashCode()));

2. 有了ThreadPoolMonitor類以後,咱們能夠經過 newFixedThreadPool(int nThreads, String poolName) 和 newCachedThreadPool(String poolName) 方法建立兩個平常咱們使用最多的線程池,跟默認的 Executors 裏的方法不一樣的是,這裏須要傳入 poolName 參數,該參數主要是用來給線程池定義一個與業務相關並有具體意義的線程池名字,方便咱們排查線上問題。

3. 在生產環境中,謹慎調用 shutdown() 和 shutdownNow() 方法,由於調用這兩個方法以後,線程池會被關閉,再也不接收新的任務,若是有新任務提交到一個被關閉的線程池,會拋出 java.util.concurrent.RejectedExecutionException 異常。其實在使用Spring等框架來管理類的生命週期的條件下,也沒有必要調用這兩個方法來關閉線程池,線程池的生命週期徹底由該線程池所屬的Spring管理的類決定。 

相關文章
相關標籤/搜索