自定義線程池影響的線上事故

  做爲一個牛逼的程序員,相信你們確定是接觸過多線程的概念的。而且可能會在實際的工做中由於一些業務場景須要使用自定義線程池來執行批量的任務或對線程進行管理。一樣,咱們項目中也存在一個兩個場景須要使用線程池。而這兩個場景分別爲:java

一、持續監聽某個外部接口的不間斷的返回信息,其實就是長連接的阻塞接口,總共12種資源須要監聽,因此就意味須要12個不間斷的線程執行阻塞任務。程序員

二、RabbitMQ的消費者,由於須要應用啓動的時候就執行消息的消費,因此也經過線程池中獲取線程執行消費任務。多線程

1、先看線程池的定義

public class ThreadPoolUtil {

    private static Logger logger = LoggerFactory.getLogger(ThreadPoolUtil.class);

    private static volatile ThreadPoolExecutor threadPoolExecutor = null;
    /**
     * 建立
     * @return
     */
    private static AtomicInteger nextId = new AtomicInteger(0);
    public static ThreadPoolExecutor createExecutor(){

        int corePoolSize = 12; // 核心線程12個
        int maxPoolSize = 16; // 最大線程數 16個
        int keepAliveSeconds = 60; //閒置存活時間60秒
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue(500); // 臨時隊列500個
        RejectedExecutionHandler rejectedExecutionHandler = (r, executor) -> logger.error("隊列已經滿了{},直接拒絕吧", executor.getTaskCount()); 

    // 同步代碼塊
synchronized (ThreadPoolUtil.class){ if (threadPoolExecutor != null){ return threadPoolExecutor; }         //  建立單例的線程池 threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveSeconds, TimeUnit.SECONDS, queue, r -> { String fileName = Thread.currentThread().getStackTrace()[5].getFileName(); // 獲取外部用戶層的調用棧信息 String threadName = fileName.substring(0,fileName.indexOf("."))+"-"; // 獲取調用棧的名稱,做爲線程的名稱 Thread thread = new Thread(r, threadName+nextId.incrementAndGet()); return thread; }, rejectedExecutionHandler); } return threadPoolExecutor; } }

  看看上面的線程池設計,好像是沒有啥問題的。若是是放在普通的可終結的任務使用當前線程池,理論上是沒有太大問題。可是!咱們的應用恰好這幾個任務都是阻塞的。阻塞就意味着線程是沒法回收的,其餘的任務使用這個線程池以後,就只能先放到隊列中,而後一直得不到釋放的線程資源執行。最終隊列積壓,任務被拋棄。ide

 

2、線上事故描述

  由於在初始化的時候,已經將 12 個監聽都啓動了,而且使用的是當前線程池構造工具。啓動完成以後,12個核心線程就一直被阻塞佔用。這12個資源的監聽仍是比較正常的,而且可以對監聽數據進行處理和執行。工具

  由於須要MQ消費端啓動的時候就能夠執行消費,因此在啓動的時候,設置了啓動配置類中調用上述工具建立線程池,至關於用新的線程執行消息監聽的動做。然而MQ卻遲遲不見消費的過程,致使消息隊列一直積壓。而且沒法完成正確的數據處理。測試

3、問題猜想及理論支撐

  猜想:沒有被消費,應該就是咱們的線程池中沒有空閒的線程進行消息監聽處理。初始化的時候的消費監聽的任務被直接丟棄到了線程池的任務隊列中,而這個線程池的任務隊列中數數據只有在兩種狀況下才可能被執行。spa

  第一種:線程池中有空閒的線程,能夠進行執行線程

  第二種:消息隊列滿了,開啓了加大了線程池的線程數以便執行堆積的任務設計

  而咱們的這個一步開啓MQ消費監聽的任務被髮送到線程池的時候,由於核心線程數就是 12 ,而咱們前面的資源監聽接口已經開啓了12個阻塞任務,因此就沒有了可用線程。因此被存放到了線程池待執行任務隊列中。可怕的是,咱們這個線程池的隊列大小爲500 ,很顯然 1 < 500 ,因此就沒法觸發線程加大的動做,致使這個隊列的任務「被遺忘」。3d

 

 理論支撐:

  線程池的核心參數包括: coreSize , maxSize, quauaSize,RejectedExecutionHandler

  分別爲:核心線程數,最大線程數,可積壓的任務數,拒絕策略

  當建立線程的時候,首先會先建立核心線程數等量的線程,好比上面就是 12個核心線程, 而當咱們的核心線程都在執行階段的時候,再次加入的任務就會被存放到任務隊列中。當任務不斷的增長而且幅度遠遠大於核心線程的處理速度,致使任務隊列存放到最大值,好比上面的500,那麼就須要增長線程數,此時就是須要增長線程數到最大值,好比上面的16,然而,增大了以後,發現已然不能處理消化任務的投放數量,這個時候就用不一樣的處理策略,好比上面的  rejectedExecutionHandler 就是直接丟棄。

  

  猜想和理論匹配一下的話就是:核心線程是12 ,這12個線程被資源監聽的阻塞任務佔用沒法釋放,而開啓消費監聽的任務被丟到了待執行的任務隊列中,此時,任務隊列又不知足益處的條件,因此就沒有增長新的線程來處理,以致於,這個建立消費監聽的任務就「被遺忘」了。

 

  如何進行論證呢?使用以下測試代碼

  

 public static void main(String[] args) {
        ThreadPoolExecutor executor = createExecutor();
      // 臨界值 分別設置12 16 512 518
        for (int i =0; i < $臨界值;i++){

            int finalI = i;
            executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("當前任務序號爲:"finalI +" ,活躍線程數"+ executor.getActiveCount());
                    Thread.sleep(10000*1000); // 這就看成是持久的任務在執行
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
    }

   測試結果:

  臨界值爲 12, 核心線程恰好夠用

    

  臨界值爲 16 , 雖然任務數大於了核心線程,可是並無新建線程數。因此驗證任務被放到了隊列中,先使用隊列存放,隊列滿了再開新線程

   

  臨界值爲 512 任務數量大於  核心線程數  因此新任務放到了隊列中,且恰好不會有超出,不觸發新的線程建立

    

  臨界值爲 516 任務數量大於  ( 核心線程數 + 隊列大小 ) 全部活躍線程被加到最大

  

  臨界值爲 518, 任務數量大於  ( 隊列大小 + 最大線程數) 全部產生丟棄

  

 

 

4、如何解決當前事故

  出現這個問題以後,咱們直接就增長了核心線程的數量,以保證總體大於在阻塞任務的數量。好比咱們這個就是從新設置爲核心線程數量 16 > 12,

  同時,咱們將阻塞任務同非阻塞任務所建立的線程池進行隔離,以減小共用線程池形成的 正常任務被遺忘的可能性。

 

5、如何設置你的線程池大小

  那麼在開發中,如何設置i線程池的大小?其實這沒有特定的規範,須要結合本身任務的執行時間而考慮,

  可是最好提早考慮好,任務是否爲阻塞性任務,若是是的話,建議作好線程隔離。

  在咱們通常將核心線程設置爲 n + 1  (n 爲內核數量)

  最大線程數量設置 2n + 1  (n 爲內核數量) 

相關文章
相關標籤/搜索