讓人抓頭的Java併發(二) 線程池ThreadPoolExecutor分析

線程的建立須要開闢虛擬機棧、本地方法棧、程序計數器等線程私有的內存空間,在線程銷燬時須要回收這些系統資源。頻繁的建立銷燬線程會浪費大量資源,使用線程池能夠更好的管理和協調線程的工做。緩存

線程池的好處

  • 下降資源消耗,經過重複利用已有線程下降線程建立和銷燬形成的消耗
  • 提升響應速度,任務到達沒必要等待線程的建立
  • 管理複用線程,限制最大併發數
  • 實現定時執行或週期執行的任務(ScheduledThreadPoolExecutor)
  • 隔離線程環境,避免不一樣服務線程相互影響,防止服務發生雪崩(在SpringCloud的hystrix中也是這樣作的,不一樣服務調用採用不一樣線程池)

線程池的使用

ThreadPoolExecutor的構造方法以下:
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        …………
    }
複製代碼
1)構造參數分析:
  • corePoolSize:表示常駐的核心線程數量。若是爲0執行任務以後沒有任何請求進入時將被銷燬,若是大於0則不會被銷燬。
  • maximumPoolSize:表示線程池最大可以容納同時執行的線程數,必須大於等於1。若是和corePoolSize相等便是固定大小線程池,若是待執行線程數大於此數則按照參數handler處理。
  • keepAliveTime:表示線程池中的線程空閒時間,當空閒時間達到此值時,線程會被銷燬直到剩下corePoolSize個線程。默認當線程數大於corePoolSize時纔會起做用,可是當ThreadPoolExecutor的allowCoreThreadTimeOut設置爲true時核心線程超時後也會被銷燬。
  • unit:keepAliveTime的時間單位
  • workQueue:表示緩存隊列,當請求線程數大於corePoolSize時,線程將進入BlockingQueue。
  • threadFactory:線程工廠,它用來生產一組相同任務的線程。經過給這個factory增長組名前綴來實現線程池命名,以方便在虛擬機棧分析時知道線程任務是由哪一個線程工程產生的。
  • handler:執行拒絕策略的對象。當workQueue滿了以後而且活動線程數大於maximumPoolSize的時候,線程池經過該策略處理請求。

2)拒絕策略分析: ThreadPoolExecutor中提供了四個RejectedExecutionHandler策略。bash

  • AbortPolicy(默認):丟棄任務並拋出RejectedExecutionException異常。
  • DiscardPolicy:丟棄當前任務。
  • DiscardOldestPolicy:丟棄任務中等待最久的任務,而後把當前任務加入隊列。
  • CallerRunsPolicy:調用任務的run()方法繞過線程池直接執行。

3)建立線程池的其餘方式(不推薦):Executors這個線程池靜態工廠能夠建立三個線程池的包裝對象:ForkJoinPool、ThreadPoolExecutor、ScheduledThreadPoolExecutor。Executors中關於ThreadPoolExecutor的核心方法以下:併發

// SynchronousQueue是不存儲元素的阻塞隊列,而且maximumPoolSize爲Integer.MAX_VALUE
便是無界,當主線程提交任務速度高於CachedThreadPool的處理速度時會不斷建立線程,
極端狀況下會發生OOM
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
                                      
                                      
// keepAliveTime爲0意味着多餘的空閒線程會被馬上終止,LinkedBlockingQueue的默認容量
是Integer.MAX_VALUE即無界,極端狀況下會發生OOM
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
                                      
// LinkedBlockingQueue的默認容量是Integer.MAX_VALUE即無界,極端狀況下會發生OOM
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
複製代碼

看過了上述方法以後能夠發現這三個方法構造出來的線程池都存在OOM的風險。而且不能靈活的配置線程工廠和拒絕策略,因此不推薦使用Executors來建立線程池。ui

4)向線程池提交任務:有兩個方法execute()和submit()能夠向線程池提交任務。execute()方法用於提交不須要返回值的任務,沒法判斷任務是否被線程池執行成功。submit()方法用於提交有返回值的任務(Callable)。線程池會返回一個future類型對象,經過future的get()方法能夠獲取返回值,值得注意的是get()方法會阻塞當前線程直到任務完成。spa

5)關閉線程池:有兩個方法shutdown()和shutdownNow()能夠關閉線程池。它們的原理是遍歷線程池中的工做線程,而後逐個的調用線程的interrupt()方法來中斷線程(沒法響應中斷的線程沒法終止)。它們的區別在於shutdownNow()首先將線程池狀態設置爲STOP,而後嘗試中止全部線程;shutdown()是將線程池狀態設置爲SHOTDOWN,而後中斷全部沒有正在執行任務的線程。線程

線程池的原理解析

當線程池接收到一個任務以後,執行流程以下圖:code

  1. 判斷當前工做線程數是否達到核心線程數,若是沒有則建立一個新的線程來執行任務,若是達到了則進行下一個判斷。
  2. 判斷工做隊列是否已經滿了,若是工做隊列沒有滿,則將任務加入工做隊列中,不然進行下一個判斷。
  3. 判斷線程池是否已經滿了,若是沒有則建立新線程執行任務,不然按照飽和策略處理任務。

ThreadPoolExecutor執行示意圖:cdn

下面是ThreadPoolExecutor中execute()方法的核心代碼:對象

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // 獲取用於返回線程數和線程池狀態的integer數值
        int c = ctl.get();
        // 一、若是工做線程數小於核心線程數,則建立任務並執行
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 二、若是線程池處於RUNNING狀態則將任務加入隊列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 三、核心線程池和隊列都滿了,嘗試建立一個新的線程
        else if (!addWorker(command, false))
            // 四、若是建立失敗則執行拒絕策略
            reject(command);
    }
複製代碼

addWorker()主要是建立工做線程 -- 將任務包裝成Worker類。在一、3兩個步驟中建立線程時須要獲取全局鎖ReentrantLock避免被幹擾,噹噹前工做線程數大於等於corePoolSize以後幾乎全部的execute()都是在執行步驟2。 Worker在執行完任務以後還會循環獲取工做隊列的任務來執行while (task != null || (task = getTask()) != null),getTask()方法中獲取阻塞隊列中的任務(poll()或take(),若是核心線程會被銷燬或者當前線程數大於核心線程數則用poll()超時獲取)blog

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 
                    : workQueue.take();
複製代碼

線程池的工做原理代碼在這裏就不具體分析了,下圖直觀的展現了線程池的工做原理。

線程池工做原理



合理配置線程池

想要合理的配置線程池首先須要分析任務特性:CPU密集型任務、IO密集型任務、混合型任務 .

  • CPU密集型任務:儘可能使用較小的線程池,通常爲CPU核心數+1。CPU密集型任務的CPU使用率很高,過多的線程數運行只能增長上下文切換的次數,所以會帶來額外的開銷。

  • IO密集型任務:使用稍大的線程池,通常爲2*CPU核心數。IO密集型任務CPU使用率並不高,可讓CPU在等待IO的時候去處理別的任務,充分利用CPU。

  • 混合型任務:能夠將任務分紅IO密集型和CPU密集型任務,而後分別用不一樣的線程池去處理。只要分完以後兩個任務的執行時間相差不大,那麼就會比串行執行高效。若是劃分以後兩個任務執行時間相差甚遠,那麼最終的時間仍然取決於後執行完的任務,並且還要加上任務拆分與合併的開銷。



在線程池的實現中還涉及了不少併發包中的知識好比BlockingQueue、ReentrantLock、Condition等,在這裏就暫時不進行介紹了,後續會介紹它們。

相關文章
相關標籤/搜索