ThreadPoolExecutor策略配置以及應用場景

ThreadPoolExecutor 是用來處理異步任務的一個接口,能夠將其理解成爲一個線程池和一個任務隊列,提交到 ExecutorService 對象的任務會被放入任務隊或者直接被線程池中的線程執行。ThreadPoolExecutor 支持經過調整構造參數來配置不一樣的處理策略,本文主要介紹經常使用的策略配置方法以及應用場景。異步

ThreadPoolExecutor 的處理邏輯

首先看一下 ThreadPoolExecutor 構造函數的定義:ide

public ThreadPoolExecutor(int corePoolSize,  //線程池核心線程數量
                              int maximumPoolSize,   //線程池最大線程數量
                              long keepAliveTime,      //線程KeepAlive時間,當線程池數量超過核心線程數量之後,idle時間超過這個值的線程會被終止
                              TimeUnit unit,              //線程KeepAlive時間單位
                              BlockingQueue<Runnable> workQueue,    //任務隊列
                              ThreadFactory threadFactory,                  //建立線程的工廠對象
                              RejectedExecutionHandler handler)          //任務被拒絕後調用的handler

ThreadPoolExecutor 對線程池和隊列的使用方式以下:函數

  1. 從線程池中獲取可用線程執行任務,若是沒有可用線程則使用ThreadFactory建立新的線程,直到線程數達到corePoolSize限制
  2. 線程池線程數達到corePoolSize之後,新的任務將被放入隊列,直到隊列不能再容納更多的任務
  3. 當隊列不能再容納更多的任務之後,會建立新的線程,直到線程數達到maxinumPoolSize限制
  4. 線程數達到maxinumPoolSize限制之後新任務會被拒絕執行,調用 RejectedExecutionHandler 進行處理

三種經常使用的 ThreadPoolExecutor

Executors 是提供了一組工廠方法用於建立經常使用的 ExecutorService ,分別是 FixedThreadPool,CachedThreadPool 以及 SingleThreadExecutor。這三種ThreadPoolExecutor都是調用 ThreadPoolExecutor 構造函數進行建立,區別在於參數不一樣。線程

FixedThreadPool - 線程池大小固定,任務隊列無界

下面是 Executors 類 newFixedThreadPool 方法的源碼:日誌

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

能夠看到 corePoolSize 和 maximumPoolSize 設置成了相同的值,此時不存在線程數量大於核心線程數量的狀況,因此KeepAlive時間設置不會生效。任務隊列使用的是不限制大小的 LinkedBlockingQueue ,因爲是無界隊列因此容納的任務數量沒有上限。code

所以,FixedThreadPool的行爲以下:對象

  1. 從線程池中獲取可用線程執行任務,若是沒有可用線程則使用ThreadFactory建立新的線程,直到線程數達到nThreads
  2. 線程池線程數達到nThreads之後,新的任務將被放入隊列

FixedThreadPool的優勢是可以保證全部的任務都被執行,永遠不會拒絕新的任務;同時缺點是隊列數量沒有限制,在任務執行時間無限延長的這種極端狀況下會形成內存問題。接口

SingleThreadExecutor - 線程池大小固定爲1,任務隊列無界

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

這個工廠方法中使用無界LinkedBlockingQueue,並的將線程數設置成1,除此之外還使用FinalizableDelegatedExecutorService類進行了包裝。這個包裝類的主要目的是爲了屏蔽ThreadPoolExecutor中動態修改線程數量的功能,僅保留ExecutorService中提供的方法。雖然是單線程處理,一旦線程由於處理異常等緣由終止的時候,ThreadPoolExecutor會自動建立一個新的線程繼續進行工做。隊列

輸入圖片說明

SingleThreadExecutor 適用於在邏輯上須要單線程處理任務的場景,同時無界的LinkedBlockingQueue保證新任務都可以放入隊列,不會被拒絕;缺點和FixedThreadPool相同,當處理任務無限等待的時候會形成內存問題。圖片

CachedThreadPool - 線程池無限大(MAX INT),等待隊列長度爲1

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

SynchronousQueue是一個只有1個元素的隊列,入隊的任務須要一直等待直到隊列中的元素被移出。核心線程數是0,意味着全部任務會先入隊列;最大線程數是Integer.MAX_VALUE,能夠認爲線程數量是沒有限制的。KeepAlive時間被設置成60秒,意味着在沒有任務的時候線程等待60秒之後退出。CachedThreadPool對任務的處理策略是提交的任務會當即分配一個線程進行執行,線程池中線程數量會隨着任務數的變化自動擴張和縮減,在任務執行時間無限延長的極端狀況下會建立過多的線程。

三種ExecutorService特性總結

| 類型 | 核心線程數 | 最大線程數 | Keep Alive 時間 | 任務隊列 | 任務處理策略 | |--|---|-------------------|-----------------|---------------------| - | | FixedThreadPool | 固定大小 | 固定大小(與核心線程數相同) | 0 | LinkedBlockingQueue | 線程池大小固定,沒有可用線程的時候任務會放入隊列等待,隊列長度無限制 | | SingleThreadExecutor | 1 | 1 | 0 | LinkedBlockingQueue | 與 FixedThreadPool 相同,區別在於線程池的大小爲1,適用於業務邏輯上只容許1個線程進行處理的場景 | | CachedThreadPool | 0 | Integer.MAX_VALUE | 1分鐘 | SynchronousQueue | 線程池的數量無限大,新任務會直接分配或者建立一個線程進行執行 |

自定義ThreadPoolExecutor

咱們也能夠經過修改 ThreadPoolExecutor 的構造函數來自定義任務處理策略。例如面對的業務是將數據異步寫入HBase,當HBase嚴重超時的時候容許寫入失敗並記錄日誌以便過後補寫。對於這種應用場景,若是使用FixedThreadPool,在HBase服務嚴重超時的時候會致使隊列無限增加,引起內存問題;若是使用CachedThreadPool,會致使線程數量無限增加。對於這種場景,咱們能夠設置ExecutorService使用帶有長度限制的隊列以及限定最大線程個數的線程池,同時經過設置RejectedExecutionHandler處理任務被拒絕的狀況。

首先定義 RejectedExecutionHandler:

public class MyRejectedExecutionHandler implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // 處理任務被拒絕的狀況,例如記錄日誌等
        }
    }

建立 ThreadPoolExecutor:

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                10,                                                               //核心線程數設置成10
                30,                                                              //線程池最大線程數爲30
                30, TimeUnit.SECONDS,                              //超過核心線程數量的線程idle 30秒以後會退出
                new ArrayBlockingQueue<Runnable>(100),       //隊列長度爲100
                new MyRejectedExecutionHandler()                //任務被拒絕之後的處理類
        );

這樣設置之後,若是任務處理函數出現長時間掛起的狀況,會依次發生下列現象:

  1. 線程池線程數量達到核心線程數,向ArrayBlockingQueue中放入任務
  2. ArrayBlockingQueue達到上限,建立新的線程進行處理
  3. 線程池中的線程數量達到30個,調用MyRejectedExecutionHandler處理新提交的任務

總結

  • 對於須要保證全部提交的任務都要被執行的狀況,使用FixedThreadPool
  • 若是限定只能使用一個線程進行任務處理,使用SingleThreadExecutor
  • 若是但願提交的任務儘快分配線程執行,使用CachedThreadPool
  • 若是業務上容許任務執行失敗,或者任務執行過程可能出現執行時間過長進而影響其餘業務的應用場景,能夠經過使用限定線程數量的線程池以及限定長度的隊列進行容錯處理。
相關文章
相關標籤/搜索