ThreadPoolExecutor 是用來處理異步任務的一個接口,能夠將其理解成爲一個線程池和一個任務隊列,提交到 ExecutorService 對象的任務會被放入任務隊或者直接被線程池中的線程執行。ThreadPoolExecutor 支持經過調整構造參數來配置不一樣的處理策略,本文主要介紹經常使用的策略配置方法以及應用場景。異步
首先看一下 ThreadPoolExecutor 構造函數的定義:ide
public ThreadPoolExecutor(int corePoolSize, //線程池核心線程數量 int maximumPoolSize, //線程池最大線程數量 long keepAliveTime, //線程KeepAlive時間,當線程池數量超過核心線程數量之後,idle時間超過這個值的線程會被終止 TimeUnit unit, //線程KeepAlive時間單位 BlockingQueue<Runnable> workQueue, //任務隊列 ThreadFactory threadFactory, //建立線程的工廠對象 RejectedExecutionHandler handler) //任務被拒絕後調用的handler
ThreadPoolExecutor 對線程池和隊列的使用方式以下:函數
Executors 是提供了一組工廠方法用於建立經常使用的 ExecutorService ,分別是 FixedThreadPool,CachedThreadPool 以及 SingleThreadExecutor。這三種ThreadPoolExecutor都是調用 ThreadPoolExecutor 構造函數進行建立,區別在於參數不一樣。線程
下面是 Executors 類 newFixedThreadPool 方法的源碼:日誌
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
能夠看到 corePoolSize 和 maximumPoolSize 設置成了相同的值,此時不存在線程數量大於核心線程數量的狀況,因此KeepAlive時間設置不會生效。任務隊列使用的是不限制大小的 LinkedBlockingQueue ,因爲是無界隊列因此容納的任務數量沒有上限。code
所以,FixedThreadPool的行爲以下:對象
FixedThreadPool的優勢是可以保證全部的任務都被執行,永遠不會拒絕新的任務;同時缺點是隊列數量沒有限制,在任務執行時間無限延長的這種極端狀況下會形成內存問題。接口
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
這個工廠方法中使用無界LinkedBlockingQueue,並的將線程數設置成1,除此之外還使用FinalizableDelegatedExecutorService類進行了包裝。這個包裝類的主要目的是爲了屏蔽ThreadPoolExecutor中動態修改線程數量的功能,僅保留ExecutorService中提供的方法。雖然是單線程處理,一旦線程由於處理異常等緣由終止的時候,ThreadPoolExecutor會自動建立一個新的線程繼續進行工做。隊列
SingleThreadExecutor 適用於在邏輯上須要單線程處理任務的場景,同時無界的LinkedBlockingQueue保證新任務都可以放入隊列,不會被拒絕;缺點和FixedThreadPool相同,當處理任務無限等待的時候會形成內存問題。圖片
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
SynchronousQueue是一個只有1個元素的隊列,入隊的任務須要一直等待直到隊列中的元素被移出。核心線程數是0,意味着全部任務會先入隊列;最大線程數是Integer.MAX_VALUE,能夠認爲線程數量是沒有限制的。KeepAlive時間被設置成60秒,意味着在沒有任務的時候線程等待60秒之後退出。CachedThreadPool對任務的處理策略是提交的任務會當即分配一個線程進行執行,線程池中線程數量會隨着任務數的變化自動擴張和縮減,在任務執行時間無限延長的極端狀況下會建立過多的線程。
| 類型 | 核心線程數 | 最大線程數 | Keep Alive 時間 | 任務隊列 | 任務處理策略 | |--|---|-------------------|-----------------|---------------------| - | | FixedThreadPool | 固定大小 | 固定大小(與核心線程數相同) | 0 | LinkedBlockingQueue | 線程池大小固定,沒有可用線程的時候任務會放入隊列等待,隊列長度無限制 | | SingleThreadExecutor | 1 | 1 | 0 | LinkedBlockingQueue | 與 FixedThreadPool 相同,區別在於線程池的大小爲1,適用於業務邏輯上只容許1個線程進行處理的場景 | | CachedThreadPool | 0 | Integer.MAX_VALUE | 1分鐘 | SynchronousQueue | 線程池的數量無限大,新任務會直接分配或者建立一個線程進行執行 |
咱們也能夠經過修改 ThreadPoolExecutor 的構造函數來自定義任務處理策略。例如面對的業務是將數據異步寫入HBase,當HBase嚴重超時的時候容許寫入失敗並記錄日誌以便過後補寫。對於這種應用場景,若是使用FixedThreadPool,在HBase服務嚴重超時的時候會致使隊列無限增加,引起內存問題;若是使用CachedThreadPool,會致使線程數量無限增加。對於這種場景,咱們能夠設置ExecutorService使用帶有長度限制的隊列以及限定最大線程個數的線程池,同時經過設置RejectedExecutionHandler處理任務被拒絕的狀況。
首先定義 RejectedExecutionHandler:
public class MyRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // 處理任務被拒絕的狀況,例如記錄日誌等 } }
建立 ThreadPoolExecutor:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 10, //核心線程數設置成10 30, //線程池最大線程數爲30 30, TimeUnit.SECONDS, //超過核心線程數量的線程idle 30秒以後會退出 new ArrayBlockingQueue<Runnable>(100), //隊列長度爲100 new MyRejectedExecutionHandler() //任務被拒絕之後的處理類 );
這樣設置之後,若是任務處理函數出現長時間掛起的狀況,會依次發生下列現象: