Java線程池總結

前一篇文章Java中實現多線程關鍵詞整理中介紹了Java中建立多線程的各類辦法,裏面提到了線程池,這裏對Java中的線程池作一個總結。html

1. 關於ThreadPoolExecutor

爲了更好地控制多線程,JDK提供了一套Executor框架,幫助開發人員有效的進行線程控制,其本質就是一個線程池。其中ThreadPoolExecutor是線程池中最核心的一個類,後面提到的四種線程池都是基於ThreadPoolExecutor實現的。java

ThreadPoolExecutor提供了四個構造方法,咱們看下最重要的一個構造函數:spring

public class ThreadPoolExecutor extends AbstractExecutorService {
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler);
}

函數的參數含義以下:編程

  • corePoolSize: 線程池維護線程的最少數量
  • maximumPoolSize:線程池維護線程的最大數量
  • keepAliveTime: 線程池維護線程所容許的空閒時間
  • unit: 線程池維護線程所容許的空閒時間的單位
  • workQueue: 線程池所使用的緩衝隊列
  • handler: 線程池對拒絕任務的處理策略

線程池執行的過程:數組

  1. 線程池剛建立時,裏面沒有一個線程。任務隊列是做爲參數傳進來的。不過,就算隊列裏面有任務,線程池也不會立刻執行它們。
  2. 當調用 execute() 方法添加一個任務時,線程池會作以下判斷:
    ​ a. 若是正在運行的線程數量小於 corePoolSize,那麼立刻建立線程運行這個任務;
    ​ b. 若是正在運行的線程數量大於或等於 corePoolSize,那麼將這個任務放入隊列。
    ​ c. 若是這時候隊列滿了,並且正在運行的線程數量小於 maximumPoolSize,那麼仍是要建立線程運行這個任務;
    ​ d. 若是隊列滿了,並且正在運行的線程數量大於或等於 maximumPoolSize,那麼線程池會拋出異常,告訴調用者「我不能再接受任務了」。
  3. 當一個線程完成任務時,它會從隊列中取下一個任務來執行。
  4. 當一個線程無事可作,超過必定的時間(keepAliveTime)時,線程池會判斷,若是當前運行的線程數大於corePoolSize,那麼這個線程就被停掉。因此線程池的全部任務完成後,它最終會收縮到 corePoolSize 的大小。

ThreadPoolExecutor的繼承關係:
緩存

ThreadPoolExecutor中的隊列:服務器

ThreadPoolExecutor內部應用了任務緩存隊列,即workQueue,它用來存放等待執行的任務。多線程

workQueue的類型爲BlockingQueue ,一般能夠取下面三種類型: 併發

  1. ArrayBlockingQueue:基於數組的先進先出隊列,此隊列建立時必須指定大小;
  2. LinkedBlockingQueue:基於鏈表的先進先出隊列,若是建立時沒有指定此隊列大小,則默認爲Integer.MAX_VALUE;
  3. synchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。

任務拒絕策略:框架

當線程池的任務緩存隊列已滿而且線程池中的線程數目達到maximumPoolSize,若是還有任務到來就會採起任務拒絕策略,一般有如下四種策略:

ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,可是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,而後從新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務

擴展線程池(記錄任務執行日誌):

在默認的ThreadPoolExecutor實現中,提供了空的beforeExecutor和afterExecutor的實現,在實際應用中能夠對其進行擴展來實現對線程池運行狀態的追蹤,輸出一些有用的調試信息,以幫助系統故障診斷,這對於多線程程序錯誤排查是頗有幫助的。

ThreadPoolExecutor例子:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ThreadPool {
    private int corePoolSize = 1; // 線程池維護線程的最少數量
    private int maximumPoolSize = 10;// 線程池維護線程的最大數量
    private long keepAliveTime = 3; // 線程池維護線程所容許的空閒時間
    private TimeUnit unit = TimeUnit.SECONDS;// 線程池維護線程所容許的空閒時間的單位
    private BlockingQueue<Runnable> workQueue; // 線程池所使用的緩衝隊列
    private RejectedExecutionHandler handler; // 線程池對拒絕任務的處理策略
    private static AtomicLong along = new AtomicLong(0);

    public void run() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize,
                maximumPoolSize, keepAliveTime, unit,
                new LinkedBlockingQueue<Runnable>(),
                new ThreadPoolExecutor.DiscardOldestPolicy()) {

            // 線程執行以前運行
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("...............beforeExecute");
            }

            // 線程執行以後運行
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("...............afterExecute");
            }

            // 整個線程池中止以後
            protected void terminated() {
                System.out.println("...............thread stop");
            }
        };
        for (int i = 1; i <= 10; i++) {
            pool.execute(new ThreadPoolTask(i, along));
        }
        for (int i = 1; i <= 10; i++) {
            pool.execute(new ThreadPoolTask(-i, along));
        }
        pool.shutdown();
        Thread.sleep(25000);
        System.out.println(along.get());

    }

    public static void main(String[] args) {
        try {
            new ThreadPool().run();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class ThreadPoolTask implements Runnable {
    private int i = 0;
    private AtomicLong along;

    ThreadPoolTask(int i, AtomicLong along) {
        this.i = i;
        this.along = along;
    }
    
    @Override
    public void run() {
        try {
            // 模擬業務邏輯
            Thread.sleep(1000);
            along.addAndGet(i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "  " + i);
    }
}

咱們能夠利用這個特性實如今線程池中打印出異常堆棧信息(正常是不會打印出來的),這裏就不演示了。

2. 關於Executors提供的四種線程池

Executors 提供了一系列工廠方法用於創先線程池,返回的線程池都實現了 ExecutorService 接口。

// 建立固定數目線程的線程池。
public static ExecutorService newFixedThreadPool(int nThreads)

// 建立一個可緩存的線程池,調用execute將重用之前構造的線程(若是線程可用)。
// 若是現有線程沒有可用的,則建立一個新線 程並添加到池中。
// 終止並從緩存中移除那些已有 60 秒鐘未被使用的線程。
public static ExecutorService newCachedThreadPool()

// 建立一個單線程化的Executor。
public static ExecutorService newSingleThreadExecutor()

// 建立一個支持定時及週期性的任務執行的線程池,多數狀況下可用來替代Timer類。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

這四種方法都是用的 Executors 中的 ThreadFactory 創建的線程。

newCachedThreadPool()

  • 緩存型池子,先查看池中有沒有之前創建的線程,若是有,就 reuse 若是沒有,就建一個新的線程加入池中
  • 緩存型池子一般用於執行一些生存期很短的異步型任務 所以在一些面向鏈接的 daemon 型 SERVER 中用得很少。但對於生存期短的異步任務,它是 Executor 的首選。
  • 能 reuse 的線程,必須是 timeout IDLE 內的池中線程,缺省 timeout 是 60s,超過這個 IDLE 時長,線程實例將被終止及移出池。

newFixedThreadPool(int)

  • newFixedThreadPool 與 cacheThreadPool 差很少,也是能 reuse 就用,但不能隨時建新的線程。
  • 其獨特之處:任意時間點,最多隻能有固定數目的活動線程存在,此時若是有新的線程要創建,只能放在另外的隊列中等待,直到當前的線程中某個線程終止直接被移出池子。
  • 和 cacheThreadPool 不一樣,FixedThreadPool 沒有 IDLE 機制(可能也有,但既然文檔沒提,確定很是長,相似依賴上層的 TCP 或 UDP IDLE 機制之類的),因此 FixedThreadPool 多數針對一些很穩定很固定的正規併發線程,多用於服務器。
  • 從方法的源代碼看,cache池和fixed 池調用的是同一個底層 池,只不過參數不一樣:
    • fixed 池線程數固定,而且是0秒IDLE(無IDLE)。
    • cache 池線程數支持 0-Integer.MAX_VALUE(顯然徹底沒考慮主機的資源承受能力),60 秒 IDLE 。

newScheduledThreadPool(int)

  • 調度型線程池
  • 這個池子裏的線程能夠按 schedule 依次 delay 執行,或週期執行

SingleThreadExecutor()

  • 單例線程,任意時間池中只能有一個線程
  • 用的是和 cache 池和 fixed 池相同的底層池,但線程數目是 1-1,0 秒 IDLE(無 IDLE)

通常來講,CachedTheadPool 在程序執行過程當中一般會建立與所需數量相同的線程,而後在它回收舊線程時中止建立新線程,所以它是合理的 Executor 的首選,只有當這種方式會引起問題時(好比須要大量長時間面向鏈接的線程時),才須要考慮用 FixedThreadPool。

----《Thinking in Java》第四版

以上引用自極客學院,總結的太精彩了。

3. Spring中的線程池管理

Spring的TaskExecutor接口等同於java.util.concurrent.Executor接口。 實際上,它存在的主要緣由是爲了在使用線程池的時候,將對Java 5的依賴抽象出來。 這個接口只有一個方法execute(Runnable task),它根據線程池的語義和配置,來接受一個執行任務。最初建立TaskExecutor是爲了在須要時給其餘Spring組件提供一個線程池的抽象。例如ApplicationEventMulticaster組件、JMS的 AbstractMessageListenerContainer和對Quartz的整合都使用了TaskExecutor抽象來提供線程池。 固然,若是你的bean須要線程池行爲,你也可使用這個抽象層。

介紹下使用比較多的ThreadPoolTaskExecutor 類,這個實現只能在Java 5以上環境使用(如今應該沒有低於1.5的老環境了吧~),它暴露的bean properties能夠用來配置一個java.util.concurrent.ThreadPoolExecutor,把它包裝到一個TaskExecutor中。

spring中ThreadPoolTaskExecutor最經常使用方式就是作爲BEAN注入到容器中,其暴露的各個屬性實際上是ThreadPoolExecutor的屬性,並且這體現了DI容器的優點:

<bean id="threadPoolTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">  
    <property name="corePoolSize" value="2"/>  
    <property name="keepAliveSeconds" value="200"/>  
    <property name="maxPoolSize" value="10"/>  
    <property name="queueCapacity" value="60"/>  
</bean>

4. 優化線程池線程數量

線程池的理想大小取決於被提交任務的類型以及所部署系統的特性。在代碼中不會固定線程池的大小,而應該經過某種配置機制來來提供,或者根據Runtime.getRuntime().availableProcessors()來動態計算。

若是一臺服務器上只部署這一個應用而且只有一個線程池(N爲CPU總核數):

  • 若是是CPU密集型應用,則線程池大小設置爲N+1
  • 若是是IO密集型應用,則線程池大小設置爲2N+1

線程等待時間所佔比例越高,須要越多線程。線程CPU時間所佔比例越高,須要越少線程。

【黃金公式】最佳線程數目 = (線程等待時間與線程CPU時間之比 + 1)* CPU數目

一個實際的計算過程(慕課網):

假設值

  • tasks :每秒的任務數,假設爲500~1000
  • taskcost:每一個任務花費時間,假設爲0.1s
  • responsetime:系統容許容忍的最大響應時間,假設爲1s

計算

  • corePoolSize = 每秒須要多少個線程處理?

​ threadcount = tasks/(1/taskcost) =taskstaskcout = (500~1000)0.1 = 50~100 個線程。corePoolSize設置應該大於50

​ 根據8020原則,若是80%的每秒任務數小於800,那麼corePoolSize設置爲80便可

  • queueCapacity = (coreSizePool/taskcost)*responsetime

​ 計算可得 queueCapacity = 80/0.1*1 = 80。意思是隊列裏的線程能夠等待1s,超過了的須要新開線程來執行

​ 切記不能設置爲Integer.MAX_VALUE,這樣隊列會很大,線程數只會保持在corePoolSize大小,當任務陡增時,不能新開線程來執行,響應時間會隨之陡增。

  • maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)

​ 計算可得 maxPoolSize = (1000-80)/10 = 92

​ (最大任務數-隊列容量)/每一個線程每秒處理能力 = 最大線程數

  • rejectedExecutionHandler:根據具體狀況來決定,任務不重要可丟棄,任務重要則要利用一些緩衝機制來處理
  • keepAliveTimeallowCoreThreadTimeout採用默認一般能知足

參考博文:

Java併發編程:線程池的使用

線程池ThreadPoolExecutor使用簡介

參考書籍:

《Java併發編程實戰》

《Java高併發程序設計》

相關文章
相關標籤/搜索