ThreadPoolExecutor線程池

一:類繼承結構安全

                            繼承關係服務器

二:構造函數併發

                                                   構造函數less

(1)線程池的大小除了顯示的限制外,還可能因爲其餘資源上的約束而存在一些隱式限制。好比JDBC鏈接池。函數

(2)運行時間較長的任務。工具

    若是任務阻塞的時間過長,即便不出現死鎖,線程池的響應性也會變得糟糕。執行時間較長的任務不只會形成線程池阻塞,甚至還會增長執行時間。若是線程池中線程的數量遠小於在穩定狀態下執行時間較長任務的數量,那麼到最後可能全部的線程都會運行這些執行時間較長的任務,從而影響總體的響應性。性能

    有一項技術能夠緩解執行時間較長任務形成的影響,即限定任務等待資源的時間,而不要無限制地等待。在平臺類庫的大多數可阻塞方法中,都同時定義了限時版本和無限時版本,例如:Thread.join,BlockingQueue.put,CountDownLatch.await以及Selector.select等。若是等待超時,能夠把任務標識爲失敗,而後停止任何或者將任務從新放回隊列以便隨後執行。若是在線程池中老是充滿了唄阻塞的任務,那麼也可能表示線程池的規模太小。spa

 

(3)設置線程池的大小線程

(3.1)線程池的理想大小取決於被提交任務的類型以及所部署系統的特性。在代碼中不會固定線程池的大小,而應該經過某種配置機制來提供,或者根據Runtime.availableProcessors來動態計算。調試

(3.2)要設置線程池的大小也並不困難,只須要避免「過大」或「太小」這兩種極端狀況。若是設置過大,那麼大量的線程將在相對不多的CPU和內存資源上發生競爭,這不只會致使更高的內存使用量,並且還可能耗盡資源。若是設置太小,那麼將致使不少空閒的處理器沒法執行工做,從而下降吞吐率。

(3.3)要想正確設置線程池的大小,必須分析計算環境,資源預算和任務的特性。在部署的系統中有多少CPU?多大的內存?計算是計算密集型、I/O密集型仍是兩者皆可?它們是否須要像JDBC鏈接這樣的稀缺資源?若是須要執行不一樣類別的任務,而且它們之間的行爲相差很大,那麼應該考慮使用多個線程池,從而使每一個線程池能夠根據本身的工做負載來調整。

(3.4)對於計算密集型的任務,在擁有Ncpu個處理器的系統上,當線程池的大小爲Ncpu+1時,一般能實現最優的利用率。(計算當計算密集型的線程偶爾因爲頁缺失故障或者其餘緣由而暫停時,這個「額外」的線程也能確保CPU的時鐘週期不會被浪費)

(3.5)對於包含I/O操做或者其餘阻塞操做的任務,你必須估算出任務的等待時間與計算時間的比值。這種估算不須要很精確,而且能夠經過一些分析或者監控工具來得到。你還能夠經過另外一種方法來調節線程池的大小:在某個基準負載下,分別設置不一樣大小的線程池來運行應用程序,並觀察CPU利用率的水平。給定以下列定義:

                      Ncpu = number of CPUs

                     Ucpu = target CPU utilization,0 <= Ucpu <= 1

                     W/C = ratio of wait time to compute time

      要使處理器達到指望的使用率,線程池的最優大小等於:

                    Nthreads = Ncpu * Ucpu * (1 + W/C)

     能夠經過Runtime來得到CPU的數目:

                   int N_CPUS = Runtime.getRuntime().availableProcessors();

 

(4)參數解析

  • corePoolSize

    線程池的基本大小(線程池的目標大小),默認狀況下會一直存活,即便處於閒置狀態也不會受keepAliveTime限制,除非將allowCoreThreadTimeOut設置爲true。

  • maximumPoolSize

    最大線程池大小,表示可同時活動的線程數量的上限。

  • keepAliveTime

    若是某個線程的空閒時間超過了存活時間,那麼將被標記爲可回收的,而且當線程池的當前大小超過了基本大小時,這個線程將被終止。

分析:線程池的基本大小(corePoolSize)、最大大小(maximumPoolSize)以及存活時間等因素共同負責線程的建立與銷燬。經過調節線程池的基本大小和存活時間,能夠幫助線程池回收空閒線程佔用的資源,從而使得這些資源能夠用於執行其餘工做。

 

三:基本實現

(1)newCachedThreadPool()

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

 

    線程池的基本大小設置爲零,最大大小設置爲Integer.MAX_VALUE,線程池能夠被無限擴展,需求下降時自動收縮,最大大小設置過大在某些狀況下也是缺點。

(2)newFixedThreadPool(int nThreads)

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

 

    缺點是LinkedBlockingQueue是無界隊列,有些狀況下排隊的任務會不少。

(3)newScheduledThreadExecutor()

   

/**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

/**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given core pool size.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

 

(4)newSingleThreadExecutor()

    

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

 

總結:都由Executors類的靜態方法統一提供,如Executors.newCachedThreadPool(),底層經過ThreadPoolExecutor來實現。ThreadPoolExecutor提供了不少構造函數,它是一個靈活的、穩定的線程池,容許進行各類定製。

 

四:管理隊列任務

(1)單線程的Executor是一種值得注意的特例:它們能確保不會有任務併發執行,由於它們經過線程封閉來實現線程安全性。

(2)若是無限制地建立線程,將會致使不穩定性。能夠經過採用固定大小的線程池(而不是每收到一個請求就建立一個新線程)來解決這個問題。然而,這個方案並不完整。在高負載的狀況下,應用程序仍可能耗盡資源,知識問題的機率較小。若是新請求的到達超過了線程池的處理效率,那麼新到來的請求將累計起來。在線程池中,這些請求會在一個由Executor管理的Runable隊列中等待,而不會像線程那樣去競爭CPU資源,經過一個Runnable和一個鏈表節點來表現一個等待中的任務,固然比使用線程來表示的開銷低得多,但若是客戶提交給服務器請求的效率超過了服務器的處理效率,那麼仍可能會耗盡資源。

(3)ThreadPoolExecutor容許提供一個BlockingQueue來保存等待執行的任務。基本的任務排隊方法有3種:無界隊列、有界隊列和同步移交(Synchronous Handoff)。隊列的選擇與其餘的配置參數有關,例如線程池的大小。

(4)newFixedThreadPool和newSingleThreadExecutor在默認的狀況下將使用一個無界的LinkedBlockingQueue。若是全部工做者線程都處於忙碌狀態,那麼任務將在隊列中等候。若是任務持續快速地到達,而且超過了線程池處理它們的速度,那麼隊列將無限制地增長。

(5)一種更穩妥的資源管理策略是使用有界隊列,例如ArrayBlockingQueue、有界的LinkedBlockQueue、PriorityBlockingQueue。有界隊列有助於避免資源耗盡的狀況發生,但它又帶來了新的問題:當隊列填滿後,新的任務該怎麼辦?(有許多飽和策略能夠解決這個問題)在使用有界

的工做隊列時,隊列的大小和線程池的大小必須一塊兒調節。若是線程池較小而隊列較大,那麼有助於減小內存使用量,下降CPU的使用率,同時還能夠減小上下文切換,但付出的代價是可能會限制吞吐量。

(6)對於很是大的或者無界的線程池,能夠經過使用SynchronousQueue來避免任務排隊,以及直接將任務從生產者移交給工做者線程。SynchronousQueue不是一個真正的隊列,而是一種在線程之間進行移交的機制。要將一個元素放入SynchronousQueue中,必須有另外一個線程正在等待接受這個元素。

(7)當使用像LinkedBlockingQueue或ArrayBlockingQueue這樣的FIFO(先進先出)隊列時,任務的執行順序與它們的到達順序相同。若是想進一步控制任務執行順序,還可使用PriorityBlockingQueue,這個隊列將根據優先級來安排任務。

(8)只有當任務相互獨立時,爲線程池或工做隊列設置界限纔是合理的。若是任務之間存在依賴性,那麼有界的線程池或隊列就可能致使線程「飢餓」死鎖問題。此時應該使用無界的線程池,例如newCacheThreadPool。

 

五:飽和策略

(1)當有界隊列被填滿後,飽和策略開始發揮做用。ThreadPoolExecutor的飽和策略能夠經過調用setRejectedExecutionHander來修改。(若是某個任務被提交到一個已被關閉的Executor時,也會用到飽和策略。)JDK提供了幾種不一樣的RejectedExecutionHandler實現,每種實現都包含不一樣的飽和策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy和DiscardOldestPolicy。

(2)停止(Abort)策略是默認的飽和策略,該策略將拋出未檢查的RejectedExecutionException。調用者能夠捕獲這個異常,而後根據須要編寫本身的處理代碼。

(3)當新提交的任務沒法保存到隊列中等待執行時,「拋棄(Discard)」策略會悄悄拋棄該任務。

(4)「拋棄最舊的(Discard-Oldest)」策略則會拋棄下一個將被執行的任務,而後嘗試從新提交新的任務。(若是工做隊列是一個優先隊列,那麼「拋棄最舊的」策略將致使拋棄優先級最高的任務,所以最好不要將「拋棄最舊的」飽和策略和優先隊列放在一塊兒使用。)

(5)「調用者運行(Caller-Runs)」策略實現了一種調度機制,該策略既不會拋棄任務,也不會拋出異常,而是將某些任務回退到調用者,從而下降新任務的流量。它不會在線程池的某個線程中執行新提交的任務,而是在一個調用了execute的線程中執行該任務。若是採用有界隊列和「調用者運行」飽和策略,當線程池中的全部線程都被佔用,而且工做隊列被填滿後,下一個任務會在調用execute時在主線程中執行。因爲執行任務須要必定的時間,所以主線程至少在一段時間內不能提交任何任務,從而使得工做者線程有時間來處理完正在執行的任務。在此期間,主線程不會調用accept,所以到達的請求將被保存在TCP層的隊列中而不是在應用程序的隊列中。若是持續過載,那麼TCP層將最終發現它的請求隊列被填滿,所以一樣會開始拋棄請求。當服務器過載時,這種過載狀況會逐漸向外蔓延開來——從線程池到工做隊列到應用程序再到TCP層,最終達到客戶端,致使服務器在高負載下實現一種平緩的性能下降。

    可經過以下方式設置飽和策略:

          executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

 

六:線程工廠

(1)每當線程池須要建立一個線程時,都是經過線程工廠方法來完成的。默認的線程工廠方法將建立一個新的、非守護的線程,而且不包含特殊的配置信息。經過指定一個線程工廠方法,能夠定製線程池的配置信息。在ThreadFactory中只定義了一個方法newThread,每當線程池須要建立一個新線程時都會調用這個方法。

(2)許多狀況下都須要使用定製的線程工廠方法。例如,你但願爲線程池中的線程指定一個UncaughtExceptionHandler,或者實例化一個定製的Thread類用於執行調試信息的記錄。你還可能但願修改線程的優先級(這一般並非一個好主意)或者守護狀態(一樣,這也不是一個好主意)。或許你只是但願給線程取一個更有意義的名稱,用來解釋線程的轉儲信息和錯誤日誌。

public interface ThreadFactory {

    /**
     * Constructs a new {@code Thread}.  Implementations may also initialize
     * priority, name, daemon status, {@code ThreadGroup}, etc.
     *
     * @param r a runnable to be executed by new thread instance
     * @return constructed thread, or {@code null} if the request to
     *         create a thread is rejected
     */
    Thread newThread(Runnable r);
}

 

經過實現該接口,能夠定製本身的線程池工廠方法。

 

七:調用構造函數後再定製ThreadPoolExecutor

(1)在調用完ThreadPoolExecutor的構造函數後,仍然能夠經過設置函數(setter)來修改大多數傳遞給它的構造函數的參數(例如線程池的基本大小、最大大小、存活時間、線程工廠以及拒絕執行處理器)。若是Executor是經過Executors中的某個(newSingleThreadExecutor除外)工廠方法建立的,那麼能夠將結果的類型轉換爲ThreadPoolExecutor以訪問設置器,以下:

    ExecutorService exec = Executors.newCachedThreadPool();

       if ( exec instanceof ThreadPoolExecutor)

              ((ThreadPoolExecutor) exec).setCorePoolSize(10);

      else

              throw new AssertionError("Oops, bad assumption");

 

八:擴展ThreadPoolExecutor

(1)ThreadPoolExecutor是可擴展的,它提供了幾個能夠在子類化中改寫的方法:beforeExecute、afterExecute和 terminated,這些方法能夠用於擴展ThreadPoolExecutor的行爲。

(2)在執行任務的線程中將調用beforeExecute 和 afterExecute等方法,在這些方法中還能夠添加日誌、計時、監視或統計信息收集的功能。不管任務是從run中正常返回仍是拋出一個異常而返回,afterExecute都會被調用。(若是任何在調用完成後帶有一個Error,那麼就不會調用afterExecute。)若是beforeExecute拋出一個RuntimeException,那麼任務將不被執行,而且afterExecute也不會被調用。

(3)在線程池完成關閉操做時調用terminated,也就是在全部任務都已經完成而且全部工做者線程也已經關閉後。terminated能夠用來釋放Executor在其生命週期裏分配的各類資源,此外還能夠執行發送通知、記錄日誌或者收集finalize統計信息等操做。

相關文章
相關標籤/搜索