Java Executor併發框架(三)ThreadPoolExecutor 隊列緩存策略

前面兩篇講解了線程池中線程建立後的運行狀況,其中有一系列的策略來保證線程正常運行。可是咱們知道線程池是能夠設置容量的,並且這容量的設置也是相當重要的,若是容量設置的過小,那麼將會影響系統的運行效率,若是設置的過大,也可能形成無止盡的線程堆積,最終形成系統內存溢出。對於此,線程池也提供了一些設置來防止這些現象。下面咱們將會介紹。java

線程初始化

當咱們建立線程池後,若是沒有新任務進來的話,默認是沒有線程的,提交任務後線程池纔會建立新的線程。若是你想建立線程池時就初始化corePoolSize數量的線程的話,線程池提供瞭如下兩個方法:數據庫

  • prestartCoreThread() : 當即初始化一個線程
  • prestartAllCoreThreads():當即初始化corePoolSize數量的線程

如下是具體方法實現:數組

public int prestartAllCoreThreads() {
        int n = 0;
        while (addIfUnderCorePoolSize(null))
            ++n;
        return n;
    }

public boolean prestartCoreThread() {
        return addIfUnderCorePoolSize(null);
    }

底層都是調用 addIfUnderCorePoolSize() 方法,上一篇有講過,若是傳入的參數爲null的話,則最後執行線程會阻塞在getTask方法中的,由於要等待堵塞隊列中有任務到達。緩存

任務堵塞隊列

當線程池池建立的線程數量大於 corePoolSize 後,新來的任務將會加入到堵塞隊列(workQueue)中等待有空閒線程來執行。workQueue的類型爲BlockingQueue ,一般能夠取下面三種類型: 服務器

  1. ArrayBlockingQueue:基於數組的FIFO隊列,是有界的,建立時必須指定大小
  2. LinkedBlockingQueue: 基於鏈表的FIFO隊列,是無界的,默認大小是 Integer.MAX_VALUE
  3. synchronousQueue:一個比較特殊的隊列,雖然它是無界的,但它不會保存任務,每個新增任務的線程必須等待另外一個線程取出任務,也能夠把它當作容量爲0的隊列

全部 BlockingQueue 均可用於傳輸和保持提交的任務。可使用此隊列與池大小進行交互:多線程

若是運行的線程少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。(若是當前運行的線程小於corePoolSize,則任務根本不會存放,添加到queue中,而是直接抄傢伙(thread)開始運行)操作系統

若是運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程線程

若是沒法將請求加入隊列,則建立新的線程,除非建立此線程超出 maximumPoolSize,在這種狀況下,任務將被拒絕。rest

排隊有三種通用策略:code

直接提交。工做隊列的默認選項是 SynchronousQueue,它將任務直接提交給線程而不保持它們。在 此,若是不存在可用於當即運行任務的線程,則試圖把任務加入隊列將失敗,所以會構造一個新的線程。此策略能夠避免在處理可能具備內部依賴性的請求集時出現 鎖。直接提交一般要求無界 maximumPoolSizes 以免拒絕新提交的任務。當命令以超過隊列所能處理的平均數連續到達時,此策略容許無界線 程具備增加的可能性。

無界隊列。使用無界隊列(例如,不具備預約義容量的 LinkedBlockingQueue)將致使在所 有 corePoolSize 線程都忙時新任務在隊列中等待。這樣,建立的線程就不會超過 corePoolSize。(因 此,maximumPoolSize 的值也就無效了。)當每一個任務徹底獨立於其餘任務,即任務執行互不影響時,適合於使用無界隊列;例如, 在 Web 頁服務器中。這種排隊可用於處理瞬態突發請求,當命令以超過隊列所能處理的平均數連續到達時,此策略容許無界線程具備增加的可能性。

有界隊列。當使用有限的 maximumPoolSizes 時,有界隊列 (如 ArrayBlockingQueue)有助於防止資源耗盡,可是可能較難調整和控制。隊列大小和最大池大小可能須要相互折衷:使用大型隊列和小型 池能夠最大限度地下降 CPU 使用率、操做系統資源和上下文切換開銷,可是可能致使人工下降吞吐量。若是任務頻繁阻塞(例如,若是它們是 I/O 邊 界),則系統可能爲超過您許可的更多線程安排時間。使用小型隊列一般要求較大的池大小,CPU 使用率較高,可是可能遇到不可接受的調度開銷,這樣也會降 低吞吐量。

BlockingQueue的選擇。

例子一:使用直接提交策略,也即SynchronousQueue。

首先SynchronousQueue是無界的,也就是說他存數任務的能力是沒有限制的,可是因爲該Queue自己的特性,在某次添加元素後必須等待其餘線程取走後才能繼續添加。在這裏不是核心線程即是新建立的線程,可是咱們試想同樣下,下面的場景。

咱們使用一下參數構造ThreadPoolExecutor:

new ThreadPoolExecutor(
   2, 3, 30, TimeUnit.SECONDS,
   new SynchronousQueue<Runnable>(),
   new RecorderThreadFactory("CookieRecorderPool"),
   new ThreadPoolExecutor.CallerRunsPolicy());

當核心線程已經有2個正在運行.

  1. 此時繼續來了一個任務(A),根據前面介紹的「若是運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。」,因此A被添加到queue中。
  2. 又來了一個任務(B),且核心2個線程尚未忙完,OK,接下來首先嚐試1中描述,可是因爲使用的SynchronousQueue,因此必定沒法加入進去。
  3. 此時便知足了上面提到的「若是沒法將請求加入隊列,則建立新的線程,除非建立此線程超出maximumPoolSize,在這種狀況下,任務將被拒絕。」,因此必然會新建一個線程來運行這個任務。
  4. 暫時還能夠,可是若是這三個任務都還沒完成,連續來了兩個任務,第一個添加入queue中,後一個呢?queue中沒法插入,而線程數達到了maximumPoolSize,因此只好執行異常策略了。

因此在使用SynchronousQueue一般要求maximumPoolSize是無界的,這樣就能夠避免上述狀況發生(若是但願限制就直接使 用有界隊列)。對於使用SynchronousQueue的做用jdk中寫的很清楚:此策略能夠避免在處理可能具備內部依賴性的請求集時出現鎖。

什麼意思?若是你的任務A1,A2有內部關聯,A1須要先運行,那麼先提交A1,再提交A2,當使用SynchronousQueue咱們能夠保證,A1一定先被執行,在A1麼有被執行前,A2不可能添加入queue中。

例子二:使用無界隊列策略,即LinkedBlockingQueue

這個就拿newFixedThreadPool來講,根據前文提到的規則:

若是運行的線程少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。那麼當任務繼續增長,會發生什麼呢?

若是運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。OK,此時任務變加入隊列之中了,那何時纔會添加新線程呢?

若是沒法將請求加入隊列,則建立新的線程,除非建立此線程超出 maximumPoolSize,在這種狀況下,任務將被拒絕。這裏就頗有意思了, 可能會出現沒法加入隊列嗎?不像SynchronousQueue那樣有其自身的特色,對於無界隊列來講,老是能夠加入的(資源耗盡,固然另當別論)。換 句說,永遠也不會觸發產生新的線程!corePoolSize大小的線程數會一直運行,忙完當前的,就從隊列中拿任務開始運行。因此要防止任務瘋長,好比 任務運行的實行比較長,而添加任務的速度遠遠超過處理任務的時間,並且還不斷增長,不一下子就爆了。

例子三:有界隊列,使用ArrayBlockingQueue。

這個是最爲複雜的使用,因此JDK不推薦使用也有些道理。與上面的相比,最大的特色即是能夠防止資源耗盡的狀況發生。

舉例來講,請看以下構造方法:

new ThreadPoolExecutor(
     2, 4, 30, TimeUnit.SECONDS,
     new ArrayBlockingQueue<Runnable>(2),
     new RecorderThreadFactory("CookieRecorderPool"),
     new ThreadPoolExecutor.CallerRunsPolicy());

假設,全部的任務都永遠沒法執行完。

對於首先來的A,B來講直接運行,接下來,若是來了C,D,他們會被放到queue中,若是接下來再來E,F,則增長線程運行E,F。可是若是再來任務,隊列沒法再接受了,線程數也到達最大的限制了,因此就會使用拒絕策略來處理。

keepAliveTime

jdk中的解釋是:當線程數大於核心時,此爲終止前多餘的空閒線程等待新任務的最長時間。

有點拗口,其實這個不難理解,在使用了「池」的應用中,大多都有相似的參數須要配置。好比數據庫鏈接池,DBCP中的maxIdle,minIdle參數。

什麼意思?接着上面的解釋,後來向老闆派來的工人始終是「借來的」,俗話說「有借就有還」,但這裏的問題就是何時還了,若是借來的工人剛完成一個任務就還回去,後來發現任務還有,那豈不是又要去借?這一來一往,老闆確定頭也大死了。

合理的策略:既然借了,那就多借一下子。直到「某一段」時間後,發現再也用不到這些工人時,即可以還回去了。這裏的某一段時間即是keepAliveTime的含義,TimeUnit爲keepAliveTime值的度量。

任務拒絕策略

  線程池堵塞隊列容量滿以後,將會直接新建線程,數量等於 maximumPoolSize 後,將會執行任務拒絕策略不在接受任務,有如下四種拒絕策略:

  1. ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。
  2. ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,可是不拋出異常。
  3. ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,而後從新嘗試執行任務(重複此過程)
  4. ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務

線程池的關閉

  ThreadPoolExecutor提供了兩個方法,用於線程池的關閉,分別是shutdown()和shutdownNow(),其中:

  • shutdown():不會當即終止線程池,而是要等全部任務緩存隊列中的任務都執行完後才終止,但不再會接受新的任務
  • shutdownNow():當即終止線程池,並嘗試打斷正在執行的任務,而且清空任務緩存隊列,返回還沒有執行的任務

線程池容量的動態調整

ThreadPoolExecutor提供了動態調整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

  • setCorePoolSize:設置核心池大小
  • setMaximumPoolSize:設置線程池最大能建立的線程數目大小

  當上述參數從小變大時,ThreadPoolExecutor進行線程賦值,還可能當即建立新的線程來執行任務。

相關文章
相關標籤/搜索