ThreadPoolExecutor使用和思考-線程池大小設置與BlockingQueue的三種

工做中多處接觸到了ThreadPoolExecutor。趁着如今還算空,學習總結一下。java

前記:程序員

  1. jdk官方文檔(javadoc)是學習的最好,最權威的參考。
  2. 文章分上中下。上篇中主要介紹ThreadPoolExecutor接受任務相關的兩方面入參的意義和區別,池大小參數corePoolSize和maximumPoolSize,BlockingQueue選型(SynchronousQueue,LinkedBlockingQueue,ArrayBlockingQueue);中篇中主要聊聊與keepAliveTime這個參數相關的話題;下片中介紹一下一些比較少用的該類的API,及他的近親:ScheduledThreadPoolExecutor。
  3. 若是理解錯誤,請直接指出。

查看JDK幫助文檔,能夠發現該類比較簡單,繼承自AbstractExecutorService,而AbstractExecutorService實現了ExecutorService接口。服務器

ThreadPoolExecutor的完整構造方法的簽名是:多線程

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 函數

先記着,後面慢慢解釋。性能

===============================神奇分割線==================================學習

其實對於ThreadPoolExecutor的構造函數網上有N多的解釋的,大多講得都很好,不過我想先換個方式,從Executors這個類入手。由於他的幾個構造工廠構造方法名字取得使人很容易瞭解有什麼特色。可是其實Executors類的底層實現即是ThreadPoolExecutor!spa

ThreadPoolExecutor是Executors類的底層實現。操作系統

在JDK幫助文檔中,有如此一段話:線程

強烈建議程序員使用較爲方便的 Executors 工廠方法 Executors.newCachedThreadPool()(無界線程池,能夠進行自動線程回收)、Executors.newFixedThreadPool(int)(固定大小線程池)和Executors.newSingleThreadExecutor()(單個後臺線程),它們均爲大多數使用場景預約義了設置。

能夠推斷出ThreadPoolExecutor與Executors類必然關係密切。

===============================神奇分割線==================================

OK,那就來看看源碼吧,從newFixedThreadPool開始。

ExecutorService newFixedThreadPool(int nThreads):固定大小線程池。

能夠看到,corePoolSize和maximumPoolSize的大小是同樣的(實際上,後面會介紹,若是使用無界queue的話maximumPoolSize參數是沒有意義的),keepAliveTime和unit的設值表名什麼?-就是該實現不想keep alive!最後的BlockingQueue選擇了LinkedBlockingQueue,該queue有一個特色,他是無界的

Java代碼  

public static ExecutorService newFixedThreadPool(int nThreads) {  

        return new ThreadPoolExecutor(nThreads, nThreads,  

                                      0L, TimeUnit.MILLISECONDS,  

                                      new LinkedBlockingQueue<Runnable>());  

      }  

ExecutorService newSingleThreadExecutor():單線程。

能夠看到,與fixedThreadPool很像,只不過fixedThreadPool中的入參直接退化爲1

Java代碼   收藏代碼

  1. public static ExecutorService newSingleThreadExecutor() {  
  2.         return new FinalizableDelegatedExecutorService  
  3.             (new ThreadPoolExecutor(11,  
  4.                                     0L, TimeUnit.MILLISECONDS,  
  5.                                     new LinkedBlockingQueue<Runnable>()));  
  6.     }  

ExecutorService newCachedThreadPool():無界線程池,能夠進行自動線程回收。

這個實現就有意思了。首先是無界的線程池,因此咱們能夠發現maximumPoolSize爲big big。其次BlockingQueue的選擇上使用SynchronousQueue。可能對於該BlockingQueue有些陌生,簡單說:該QUEUE中,每一個插入操做必須等待另外一個

線程的對應移除操做。好比,我先添加一個元素,接下來若是繼續想嘗試添加則會阻塞,直到另外一個線程取走一個元素,反之亦然。(想到什麼?就是緩衝區爲1的生產者消費者模式^_^)

注意到介紹中的自動回收線程的特性嗎,爲何呢?先不說,但注意到該實現中corePoolSize和maximumPoolSize的大小不一樣。

Java代碼   收藏代碼

public static ExecutorService newCachedThreadPool() {          

  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  60L, TimeUnit.SECONDS,                                        new SynchronousQueue<Runnable>());      

}  

===============================神奇分割線==================================

到此若是有不少疑問,那是必然了(除非你也很瞭解了)

先從BlockingQueue<Runnable> workQueue這個入參開始提及。在JDK中,其實已經說得很清楚了,一共有三種類型的queue。如下爲引用:(我會稍微修改一下,並用紅色突出顯示)

全部  BlockingQueue 均可用於傳輸和保持提交的任務。可使用此隊列與池大小進行交互:

  • 若是運行的線程少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。(什麼意思?若是當前運行的線程小於corePoolSize,則任務根本不會存放,添加到queue中,而是直接抄傢伙(thread)開始運行
  • 若是運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列而不添加新的線程
  • 若是沒法將請求加入隊列,則建立新的線程,除非建立此線程超出 maximumPoolSize,在這種狀況下,任務將被拒絕。

先不着急舉例子,由於首先須要知道queue上的三種類型。

排隊有三種通用策略:

  1. 直接提交。工做隊列的默認選項是 SynchronousQueue,它將任務直接提交給線程而不保持它們。在此,若是不存在可用於當即運行任務的線程,則試圖把任務加入隊列將失敗,所以會構造一個新的線程。此策略能夠避免在處理可能具備內部依賴性的請求集時出現鎖。直接提交一般要求無界 maximumPoolSizes 以免拒絕新提交的任務。當命令以超過隊列所能處理的平均數連續到達時,此策略容許無界線程具備增加的可能性。
  2. 無界隊列。使用無界隊列(例如,不具備預約義容量的 LinkedBlockingQueue)將致使在全部 corePoolSize 線程都忙時新任務在隊列中等待。這樣,建立的線程就不會超過 corePoolSize。(所以,maximumPoolSize 的值也就無效了。)當每一個任務徹底獨立於其餘任務,即任務執行互不影響時,適合於使用無界隊列;例如,在 Web 頁服務器中。這種排隊可用於處理瞬態突發請求,當命令以超過隊列所能處理的平均數連續到達時,此策略容許無界線程具備增加的可能性。
  3. 有界隊列。當使用有限的 maximumPoolSizes 時,有界隊列(如 ArrayBlockingQueue)有助於防止資源耗盡,可是可能較難調整和控制。隊列大小和最大池大小可能須要相互折衷:使用大型隊列和小型池能夠最大限度地下降 CPU 使用率、操做系統資源和上下文切換開銷,可是可能致使人工下降吞吐量。若是任務頻繁阻塞(例如,若是它們是 I/O 邊界),則系統可能爲超過您許可的更多線程安排時間。使用小型隊列一般要求較大的池大小,CPU 使用率較高,可是可能遇到不可接受的調度開銷,這樣也會下降吞吐量。  

===============================神奇分割線==================================

到這裏,該瞭解的理論已經夠多了,能夠調節的就是corePoolSize和maximumPoolSizes 這對參數還有就是BlockingQueue的選擇。

例子一:使用直接提交策略,也即SynchronousQueue。

首先SynchronousQueue是無界的,也就是說他存數任務的能力是沒有限制的,可是因爲該Queue自己的特性在某次添加元素後必須等待其餘線程取走後才能繼續添加。在這裏不是核心線程即是新建立的線程,可是咱們試想同樣下,下面的場景。

咱們使用一下參數構造ThreadPoolExecutor:

Java代碼   收藏代碼

  1. new ThreadPoolExecutor(  
  2.             2330, TimeUnit.SECONDS,   
  3.             new <span style="white-space: normal;">SynchronousQueue</span><Runnable>(),   
  4.             new RecorderThreadFactory("CookieRecorderPool"),   
  5.             new ThreadPoolExecutor.CallerRunsPolicy());  

 當核心線程已經有2個正在運行.

  1. 此時繼續來了一個任務(A),根據前面介紹的「若是運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列而不添加新的線程。」,因此A被添加到queue中。
  2. 又來了一個任務(B),且核心2個線程尚未忙完,OK,接下來首先嚐試1中描述,可是因爲使用的SynchronousQueue,因此必定沒法加入進去。
  3. 此時便知足了上面提到的「若是沒法將請求加入隊列,則建立新的線程,除非建立此線程超出maximumPoolSize,在這種狀況下,任務將被拒絕。」,因此必然會新建一個線程來運行這個任務。
  4. 暫時還能夠,可是若是這三個任務都還沒完成,連續來了兩個任務,第一個添加入queue中,後一個呢?queue中沒法插入,而線程數達到了maximumPoolSize,因此只好執行異常策略了。

因此在使用SynchronousQueue一般要求maximumPoolSize是無界的,這樣就能夠避免上述狀況發生(若是但願限制就直接使用有界隊列)。對於使用SynchronousQueue的做用jdk中寫的很清楚: 此策略能夠避免在處理可能具備內部依賴性的請求集時出現鎖

什麼意思?若是你的任務A1,A2有內部關聯,A1須要先運行,那麼先提交A1,再提交A2,當使用SynchronousQueue咱們能夠保證,A1一定先被執行,在A1麼有被執行前,A2不可能添加入queue中

例子二:使用無界隊列策略,即LinkedBlockingQueue

這個就拿 newFixedThreadPool來講,根據前文提到的規則:

 寫道

若是運行的線程少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。

 那麼當任務繼續增長,會發生什麼呢?

 寫道

若是運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。

 OK,此時任務變加入隊列之中了,那何時纔會添加新線程呢?

 寫道

若是沒法將請求加入隊列,則建立新的線程,除非建立此線程超出 maximumPoolSize,在這種狀況下,任務將被拒絕。

這裏就頗有意思了,可能會出現沒法加入隊列嗎?不像SynchronousQueue那樣有其自身的特色,對於無界隊列來講,老是能夠加入的(資源耗盡,固然另當別論)。換句說,永遠也不會觸發產生新的線程!corePoolSize大小的線程數會一直運行,忙完當前的,就從隊列中拿任務開始運行。因此要防止任務瘋長,好比任務運行的實行比較長,而添加任務的速度遠遠超過處理任務的時間,並且還不斷增長,若是任務內存大一些,不一下子就爆了,呵呵。

能夠仔細想一想哈。

例子三:有界隊列,使用ArrayBlockingQueue。

這個是最爲複雜的使用,因此JDK不推薦使用也有些道理。與上面的相比,最大的特色即是能夠防止資源耗盡的狀況發生。

舉例來講,請看以下構造方法:

Java代碼   收藏代碼

  1. new ThreadPoolExecutor(  
  2.             2430, TimeUnit.SECONDS,   
  3.             new ArrayBlockingQueue<Runnable>(2),   
  4.             new RecorderThreadFactory("CookieRecorderPool"),   
  5.             new ThreadPoolExecutor.CallerRunsPolicy());  

假設,全部的任務都永遠沒法執行完。

對於首先來的A,B來講直接運行,接下來,若是來了C,D,他們會被放到queu中,若是接下來再來E,F,則增長線程運行E,F。可是若是再來任務,隊列沒法再接受了,線程數也到達最大的限制了,因此就會使用拒絕策略來處理。

總結:

  1. ThreadPoolExecutor的使用仍是頗有技巧的。
  2. 使用無界queue可能會耗盡系統資源。
  3. 使用有界queue可能不能很好的知足性能,須要調節線程數和queue大小
  4. 線程數天然也有開銷,因此須要根據不一樣應用進行調節。

一般來講對於靜態任務能夠歸爲:

  1. 數量大,可是執行時間很短
  2. 數量小,可是執行時間較長
  3. 數量又大執行時間又長
  4. 除了以上特色外,任務間還有些內在關係

看完這篇問文章後,但願可以能夠選擇合適的類型了

相關文章
相關標籤/搜索