線程池的使用

線程池的使用

一、線程池的使用場景

  • 等待返回任務的結果的多步驟的處理場景, 批量併發執行任務,總耗時是單個步驟耗時最長的那個,提供總體的執行效率,程序員

  • 最終一致性,異步執行任務,無需等待,快速返回數據庫

二、線程池的關鍵參數說明

通常狀況下咱們是經過ThreadPoolExecutor來構造咱們的線程池對象的。緩存

* 阿里巴巴的開發規範文檔是禁止直接使用Executors靜態工廠類來建立線程池的,緣由是 服務器

【強制】線程池不容許使用 Executors 去建立,而是經過 ThreadPoolExecutor 的方式,這樣
的處理方式讓寫的同窗更加明確線程池的運行規則,規避資源耗盡的風險。
說明: Executors 返回的線程池對象的弊端以下:
(1) FixedThreadPool 和 SingleThreadPool :
容許的請求隊列長度爲 Integer.MAX_VALUE ,可能會堆積大量的請求,從而致使 OOM 。
(2) CachedThreadPool 和 ScheduledThreadPool :
容許的建立線程數量爲 Integer.MAX_VALUE ,可能會建立大量的線程,從而致使 OOM 。併發

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
       
  }

參數說明:異步

  • corePoolSize:核心線程數,線程池最低的線程數
  • maximumPoolSize:容許的最大的線程數
  • keepAliveTime:當前線程數超過corePoolSize的時候,空閒線程保留的時間
  • unit: keepAliveTime線程保留的時間的單位
  • workQueue: 任務緩衝區
  • threadFactory: 線程的構造工廠
  • handler: 線程池飽含時候的處理策略

三、線程池的分類

Java經過Executors提供四種線程池,分別爲:async

  • newCachedThreadPool建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程,若無可回收,則新建線程。
  • newFixedThreadPool 建立一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。
  • newScheduledThreadPool 建立一個定長線程池,支持定時及週期性任務執行。
  • newSingleThreadExecutor 建立一個單線程化的線程池,它只會用惟一的工做線程來執行任務,保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行。
3.一、newCachedThreadPool
public static ExecutorService newCachedThreadPool(){
    return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.MILLISECONDS,new SynchronousQueue<Runnable>());
}

它是一個能夠無限擴大的線程池;優化

  • 它比較適合處理執行時間比較小的任務;
  • corePoolSize爲0,maximumPoolSize爲無限大,意味着線程數量能夠無限大;ui

  • keepAliveTime爲60S,意味着線程空閒時間超過60S就會被殺死;線程

  • 採用SynchronousQueue裝等待的任務,這個阻塞隊列沒有存儲空間,這意味着只要有請求到來,就必需要找到一條工做線程處理他,若是當前沒有空閒的線程,那麼就會再建立一條新的線程。

3.二、newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads){
    return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
  • 它是一種固定大小的線程池;corePoolSize和maximunPoolSize都爲用戶設定的線程數量nThreads;
  • keepAliveTime爲0,意味着一旦有多餘的空閒線程,就會被當即中止掉;但這裏keepAliveTime無效;
  • 阻塞隊列採用了LinkedBlockingQueue,它是一個無界隊列;因爲阻塞隊列是一個無界隊列,所以永遠不可能拒絕任務;
  • 因爲採用了無界隊列,實際線程數量將永遠維持在nThreads,所以maximumPoolSize和keepAliveTime將無效。
3.三、ScheduledThreadPool
public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
  • 定時任務的使用
3.四、SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor(){
    return new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
  • 它只會建立一條工做線程處理任務;
  • 採用的阻塞隊列爲LinkedBlockingQueue;
3.五、總結
線程池 特色 建議使用場景
newCachedThreadPool 一、線程數無上限
二、空閒線程存活60s
三、阻塞隊列
一、任務執行時間短
二、任務要求響應時間短
newFixedThreadPool 一、線程數固定
二、無界隊列
一、任務比較平緩
二、控制最大的線程數
newScheduledThreadPool 核心線程數量固定、非核心線程數量無限制(閒置時立刻回收) 執行定時 / 週期性 任務
newSingleThreadExecutor 只有一個核心線程(保證全部任務按照指定順序在一個線程中執行,不須要處理線程同步的問題) 不適合併發但可能引發IO阻塞性及影響UI線程響應的操做,如數據庫操做,文件操做等

四、使用線程池容易出現的問題

現象 緣由
整個系統影響緩慢,大部分504 一、爲設置最大的線程數,任務積壓過多,線程數用盡
oom 一、隊列無界或者size設置過大
使用線程池對效率並無明顯的提高 一、線程池的參數設置太小,線程數太小或者隊列太小,或者是服務器的cpu核數過低

五、線程池的監控

5.一、爲何要對線程池進行監控

  • 線程池中線程數和隊列的類型及長度對線程會形成很大的影響,並且會爭奪系統稀有資源,線程數。設置不當,或是沒有最大的利用系統資源,提升系統的總體運行效率,或是致使整個系統的故障。典型的場景是線程數被佔滿,其餘的請求無響應。或是任務積壓過多,直接oom
  • 方便的排查線程中的故障以及優化線程池的使用

5.二、監控的原理

  • 另起一個定時單線程數的線程池newSingleThreadScheduledExecutor

  • 調用scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)定時執行監控任務;

  • 定時任務內 經過ThreadPoolExecutor對象獲取監控的對象信息,好比t線程池須要執行的任務數、線程池在運行過程當中已完成的任務數、曾經建立過的最大線程數、線程池裏的線程數量、線程池裏活躍的線程數量、當前排隊線程數

  • 根據預設的日誌或報警策略,進行規則控制

5.三、實現的細節

定義線程池並啓動監控

/**
     * 定義線程池的隊列的長度
     */
    private final Integer queueSize = 1000;

    /**
     * 定義一個定長的線程池
     */
    private ExecutorService executorService;

    @PostConstruct
    private void initExecutorService() {
        log.info(
                "executorService init with param: threadcount:{} ,queuesize:{}",
                systemConfig.getThreadCount(),
                systemConfig.getThreadQueueSize());
        executorService =
                new ThreadPoolExecutor(
                        systemConfig.getThreadCount(),
                        systemConfig.getThreadCount(),
                        0,
                        TimeUnit.MILLISECONDS,
                        new ArrayBlockingQueue(systemConfig.getThreadQueueSize()),
                        new BasicThreadFactory.Builder()
                                .namingPattern("async-sign-thread-%d")
                                .build(),
                        (r, executor) -> log.error("the async executor pool is full!!"));

        /** 啓動線程池的監控 */
        ThreadPoolMonitoring threadPoolMonitoring = new ThreadPoolMonitoring();
        threadPoolMonitoring.init();
    }

線程池的監控

/**
     * 功能說明:線程池監控
     *
     * @params
     * @return <br>
     *     修改歷史<br>
     *     [2019年06月14日 10:20:10 10:20] 建立方法by fengqingyang
     */
    public class ThreadPoolMonitoring {
        /** 用於週期性監控線程池的運行狀態 */
        private final ScheduledExecutorService scheduledExecutorService =
                Executors.newSingleThreadScheduledExecutor(
                        new BasicThreadFactory.Builder()
                                .namingPattern("async thread executor monitor")
                                .build());

        /**
         * 功能說明:自動運行監控
         *
         * @return <br>
         *     修改歷史<br>
         *     [2019年06月14日 10:26:51 10:26] 建立方法by fengqingyang
         * @params
         */
        public void init() {
            scheduledExecutorService.scheduleAtFixedRate(
                    () -> {
                        try {
                            ThreadPoolExecutor threadPoolExecutor =
                                    (ThreadPoolExecutor) executorService;
                            /** 線程池須要執行的任務數 */
                            long taskCount = threadPoolExecutor.getTaskCount();
                            /** 線程池在運行過程當中已完成的任務數 */
                            long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
                            /** 曾經建立過的最大線程數 */
                            long largestPoolSize = threadPoolExecutor.getLargestPoolSize();
                            /** 線程池裏的線程數量 */
                            long poolSize = threadPoolExecutor.getPoolSize();
                            /** 線程池裏活躍的線程數量 */
                            long activeCount = threadPoolExecutor.getActiveCount();
                            /** 當前排隊線程數 */
                            int queueSize = threadPoolExecutor.getQueue().size();
                            log.info(
                                    "async-executor monitor. taskCount:{}, completedTaskCount:{}, largestPoolSize:{}, poolSize:{}, activeCount:{},queueSize:{}",
                                    taskCount,
                                    completedTaskCount,
                                    largestPoolSize,
                                    poolSize,
                                    activeCount,
                                    queueSize);

                            /** 超過閥值的80%報警 */
                            if (activeCount >= systemConfig.getThreadCount() * 0.8) {
                                log.error(
                                        "async-executor monitor. taskCount:{}, completedTaskCount:{}, largestPoolSize:{}, poolSize:{}, activeCount:{},queueSize:{}",
                                        taskCount,
                                        completedTaskCount,
                                        largestPoolSize,
                                        poolSize,
                                        activeCount,
                                        queueSize);
                                ;
                            }
                        } catch (Exception ex) {
                            log.error("ThreadPoolMonitoring service error,{}", ex.getMessage());
                        }
                    },
                    0,
                    30,
                    TimeUnit.SECONDS);
        }
    }

六、須要注意的事項

  • 線程數要合理設置,通常建議值是核數的2倍。
  • 線程池隊列的類型和長度要根據業特性合理設置
  • 不一樣的業務須要線程池隔離,避免相互影響
  • 未每一個線程池增長特有的命名規範以及關鍵的日誌,方便出問題排查和優化

七、後續

更多精彩,敬請關注, 程序員導航網 https://chenzhuofan.top

相關文章
相關標籤/搜索