從0到1玩轉線程池

咱們通常不會選擇直接使用線程類Thread進行多線程編程,而是使用更方便的線程池來進行任務的調度和管理。線程池就像共享單車,咱們只要在咱們有須要的時候去獲取就能夠了。甚至能夠說線程池更棒,咱們只須要把任務提交給它,它就會在合適的時候運行了。可是若是直接使用Thread類,咱們就須要在每次執行任務時本身建立、運行、等待線程了,並且很難對線程進行總體的管理,這可不是一件輕鬆的事情。既然咱們已經有了線程池,那仍是把這些麻煩事交給線程池來處理吧。java

以前一篇介紹線程池使用及其源碼的文章篇幅太長了、跨度太大了一些,感受不是很好理解。因此我把內容從新組織了一下,拆爲了兩篇文章,而且補充了一些內容,但願能讓你們更容易地理解相關內容。編程

這篇文章將從線程池的概念與通常使用入手,首先介紹線程池的通常使用。而後詳細介紹線程池中經常使用的可配置項,例如任務隊列、拒絕策略等,最後會介紹四種經常使用的線程池配置。經過這篇文章,你們能夠熟練掌握線程池的使用方式,在實踐中游刃有餘地使用線程池對線程進行靈活的調度。緩存

閱讀本文須要對多線程編程有基本的認識,例如什麼是線程、多線程解決的是什麼問題等。不瞭解的讀者能夠參考一下我以前發佈的一篇文章《這一次,讓咱們徹底掌握Java多線程(2/10)》bash

通常咱們最經常使用的線程池實現類是ThreadPoolExecutor,咱們接下來會介紹這個類的基本使用方法。JDK已經對線程池作了比較好的封裝,相信這個過程會很是輕鬆。數據結構

線程池的基本使用

建立線程池

既然線程池是一個Java類,那麼最直接的使用方法必定是new一個ThreadPoolExecutor類的對象,例如ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>() )。那麼這個構造器的裏每一個參數是什麼意思呢?咱們能夠暫時不用關心這些細節,繼續完成線程池的使用之旅,稍後再回頭來研究這個問題。多線程

提交任務

當建立了一個線程池以後咱們就能夠將任務提交到線程池中執行了。提交任務到線程池中至關簡單,咱們只要把原來傳入Thread類構造器的Runnable對象傳入線程池的execute方法或者submit方法就能夠了。execute方法和submit方法基本沒有區別,二者的區別只是submit方法會返回一個Future對象,用於檢查異步任務的執行狀況和獲取執行結果(異步任務完成後)。異步

咱們能夠先試試如何使用比較簡單的execute方法,代碼例子以下:性能

public class ThreadPoolTest {

    private static int count = 0;

    public static void main(String[] args) throws Exception {
        Runnable task = new Runnable() {
            public void run() {
                for (int i = 0; i < 1000000; ++i) {
                    synchronized (ThreadPoolTest.class) {
                        count += 1;
                    }
                }
            }
        };

        // 重要:建立線程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 0L,
        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

        // 重要:向線程池提交兩個任務
        threadPool.execute(task);
        threadPool.execute(task);

        // 等待線程池中的全部任務完成
        threadPool.shutdown();
        while (!threadPool.awaitTermination(1L, TimeUnit.MINUTES)) {
            System.out.println("Not yet. Still waiting for termination");
        }
        
        System.out.println("count = " + count);
    }
}
複製代碼

運行以後獲得的結果是兩百萬,咱們成功實現了第一個使用線程池的程序。那麼回到剛纔的問題,建立線程池時傳入的那些參數有什麼做用的呢?ui

深刻解析線程池

建立線程池的參數

下面是ThreadPoolExecutor的構造器定義:spa

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
複製代碼

各個參數分別表示下面的含義:

  1. corePoolSize,核心線程池大小,通常線程池會至少保持這麼多的線程數量;
  2. maximumPoolSize,最大線程池大小,也就是線程池最大的線程數量;
  3. keepAliveTime和unit共同組成了一個超時時間,keepAliveTime是時間數量,unit是時間單位,單位加數量組成了最終的超時時間。這個超時時間表示若是線程池中包含了超過corePoolSize數量的線程,則在有線程空閒的時間超過了超時時間時該線程就會被銷燬;
  4. workQueue是任務的阻塞隊列,在沒有線程池中沒有足夠的線程可用的狀況下會將任務先放入到這個阻塞隊列中等待執行。這裏傳入的隊列類型就決定了線程池在處理這些任務時的策略,具體類型會在下文中介紹;
  5. threadFactory,線程的工廠對象,線程池經過該對象建立線程。咱們能夠經過傳入自定義的實現了ThreadFactory接口的類來修改線程的建立邏輯,能夠不傳,默認使用Executors.defaultThreadFactory()做爲默認的線程工廠;
  6. handler,拒絕策略,在線程池沒法執行或保存新提交的任務時進行處理的對象,經常使用的有如下幾種策略類:
    • ThreadPoolExecutor.AbortPolicy,默認策略,行爲是直接拋出RejectedExecutionException異常
    • ThreadPoolExecutor.CallerRunsPolicy,用調用者所在的線程來執行任務
    • ThreadPoolExecutor.DiscardOldestPolicy,丟棄阻塞隊列中最先提交的任務,並重試execute方法
    • ThreadPoolExecutor.DiscardPolicy,靜默地直接丟棄任務,不返回任何錯誤

看到這裏可能大部分讀者並不能理解每一個參數具體的做用,接下來咱們就經過線程池源代碼中使用了這些參數配置的代碼來深刻理解每個參數的意義。

execute方法的實現

咱們通常會使用execute方法提交咱們的任務,那麼線程池在這個過程當中作了什麼呢?在ThreadPoolExecutor類的execute()方法的源代碼中,咱們主要作了四件事:

  1. 若是當前線程池中的線程數小於核心線程數corePoolSize,則經過threadFactory建立一個新的線程,並把入參中的任務做爲第一個任務傳入該線程;
  2. 若是當前線程池中的線程數已經達到了核心線程數corePoolSize,那麼就會經過阻塞隊列workerQueueoffer方法來將任務添加到隊列中保存,並等待線程空閒後進行執行;
  3. 若是線程數已經達到了corePoolSize且阻塞隊列中沒法插入該任務(好比已滿),那麼線程池就會再增長一個線程來執行該任務,除非線程數已經達到了最大線程數maximumPoolSize
  4. 若是確實已經達到了最大線程數,那麼就會經過拒絕策略對象handler拒絕這個任務。

整體上的執行流程以下,左側的實心黑點表明流程開始,下方的黑色同心圓表明流程結束:

上面提到了線程池構造器參數中除了超時時間以外的全部參數的做用,相信你們根據上面的流程已經能夠理解每一個參數的意義了。可是有一個名詞咱們還一直沒有深刻講解,那就是阻塞隊列的含義。

線程池中的阻塞隊列

線程池中的阻塞隊列專門用於存放須要等待線程空閒的待執行任務,而阻塞隊列是這樣的一種數據結構,它是一個隊列(相似於一個List),能夠存放0到N個元素。咱們能夠對這個隊列進行插入和彈出元素的操做,彈出操做能夠理解爲是一個獲取並從隊列中刪除一個元素的操做。當隊列中沒有元素時,對這個隊列的獲取操做將會被阻塞,直到有元素被插入時纔會被喚醒;當隊列已滿時,對這個隊列的插入操做將會被阻塞,直到有元素被彈出後纔會被喚醒。

這樣的一種數據結構很是適合於線程池的場景,當一個工做線程沒有任務可處理時就會進入阻塞狀態,直到有新任務提交後才被喚醒。

在線程池中,不一樣的阻塞隊列類型會被線程池的行爲產生不一樣的影響,下面是三種咱們最經常使用的阻塞隊列類型:

  1. 直連隊列,以SynchronousQueue類爲表明,隊列不會存儲任何任務。當有任務提交線程試圖向隊列中添加待執行任務時會被阻塞,直到有任務處理線程試圖從隊列中獲取待執行任務時會與阻塞狀態中的任務提交線程發生直接聯繫,由任務提交線程把任務直接交給任務執行線程;
  2. 無界隊列,以LinkedBlockingQueue類爲表明,隊列中能夠存儲無限數量的任務。這種隊列永遠不會由於隊列已滿致使任務放入隊列失敗,因此結合前面介紹的流程咱們能夠發現,當使用無界隊列時,線程池中的線程最多隻能達到核心線程數就不會再增加了,最大線程數maximumPoolSize參數不會產生做用;
  3. 有界隊列,以ArrayBlockingQueue類爲表明,能夠保存固定數量的任務。這種隊列在實踐中比較經常使用,由於它既不會由於保存太多任務致使資源消耗過多(無界隊列),又不會由於任務提交線程被阻塞而影響到系統的性能(直連隊列)。整體上來講,有界隊列在實際效果上比較均衡。

閱讀execute方法的源碼

在IDE中,例如IDEA裏,咱們能夠點擊咱們樣例代碼裏的ThreadPoolExecutor類跳轉到JDK中ThreadPoolExecutor類的源代碼。在源代碼中咱們能夠看到不少java.util.concurrent包的締造者大牛「Doug Lea」所留下的各類註釋,下面的圖片就是該類源代碼的一個截圖。

這些註釋的內容很是有參考價值,建議有能力的讀者朋友能夠本身閱讀一遍。下面,咱們就一步步地抽絲剝繭,來揭開線程池類ThreadPoolExecutor源代碼的神祕面紗。不過這一步並非必須的,能夠跳過。

下面是ThreadPoolExecutorexecute方法帶有中文解釋的源代碼,有興趣的朋友能夠和上面的流程對照起來參考一下:

public void execute(Runnable command) {
    // 檢查提交的任務是否爲空
    if (command == null)
        throw new NullPointerException();
    
    // 獲取控制變量值
    int c = ctl.get();
    // 檢查當前線程數是否達到了核心線程數
    if (workerCountOf(c) < corePoolSize) {
        // 未達到核心線程數,則建立新線程
        // 並將傳入的任務做爲該線程的第一個任務
        if (addWorker(command, true))
            // 添加線程成功則直接返回,不然繼續執行
            return;

        // 由於前面調用了耗時操做addWorker方法
        // 因此線程池狀態有可能發生了改變,從新獲取狀態值
        c = ctl.get();
    }

    // 判斷線程池當前狀態是不是運行中
    // 若是是則調用workQueue.offer方法將任務放入阻塞隊列
    if (isRunning(c) && workQueue.offer(command)) {
        // 由於執行了耗時操做「放入阻塞隊列」,因此從新獲取狀態值
        int recheck = ctl.get();
        // 若是當前狀態不是運行中,則將剛纔放入阻塞隊列的任務拿出,若是拿出成功,則直接拒絕這個任務
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            // 若是線程池中沒有線程了,那就建立一個
            addWorker(null, false);
    }
    // 若是放入阻塞隊列失敗(如隊列已滿),則添加一個線程
    else if (!addWorker(command, false))
        // 若是添加線程失敗(如已經達到了最大線程數),則拒絕任務
        reject(command);
}
複製代碼

在這段源代碼中,咱們能夠看到,線程池是經過addWorker方法來建立線程的,這裏的這個Worker指的就是ThreadPoolExecutor類中用來對線程進行包裝和管理的Worker類對象。若是想了解Worker類的具體執行流程能夠閱讀一下下一篇深刻剖析線程池的任務執行流程的文章。

超時時間

那麼還有一個咱們沒有提到的超時時間在這個過程當中發揮了什麼做用呢?從前面咱們能夠看出,線程數量被劃分爲了核心線程數和最大線程數。當線程沒有任務可執行時會阻塞在從隊列中獲取新任務這個操做上,這時咱們稱這個線程爲空閒線程,一旦有新任務被提交,則該線程就會退出阻塞狀態並開始執行這個新任務。

若是當前線程池中的線程總數大於核心線程數,那麼只要有線程的空閒時間超過了超時時間,那麼這個線程就會被銷燬;若是線程池中的線程總數小於等於核心線程數,那麼超時線程就不會被銷燬了(除了一些特殊狀況外)。這也就是超時時間參數所發揮的做用了。

其餘線程池操做

關閉線程池

在以前使用線程池執行任務的代碼中爲了等待線程池中的全部任務執行完已經使用了shutdown()方法,這是關閉線程池的一種方法。對於ThreadPoolExecutor,關閉線程池的方法主要有兩個:

  1. shutdown(),有序關閉線程池,調用後線程池會讓已經提交的任務完成執行,可是不會再接受新任務。
  2. shutdownNow(),直接關閉線程池,線程池中正在運行的任務會被中斷,正在等待執行的任務不會再被執行,可是這些還在阻塞隊列中等待的任務會被做爲返回值返回。

監控線程池運行狀態

咱們能夠經過調用線程池對象上的一些方法來獲取線程池當前的運行信息,經常使用的方法有:

  • getTaskCount,線程池中已完成、執行中、等待執行的任務總數估計值。由於在統計過程當中任務會發生動態變化,因此最後的結果並非一個準確值;
  • getCompletedTaskCount,線程池中已完成的任務總數,這一樣是一個估計值;
  • getLargestPoolSize,線程池曾經建立過的最大線程數量。經過這個數據能夠知道線程池是否充滿過,也就是達到過maximumPoolSize;
  • getPoolSize,線程池當前的線程數量;
  • getActiveCount,當前線程池中正在執行任務的線程數量估計值。

四種經常使用線程池

不少狀況下咱們也不會直接建立ThreadPoolExecutor類的對象,而是根據須要經過Executors的幾個靜態方法來建立特定用途的線程池。目前經常使用的線程池有四種:

  1. 可緩存線程池,使用Executors.newCachedThreadPool方法建立
  2. 定長線程池,使用Executors.newFixedThreadPool方法建立
  3. 延時任務線程池,使用Executors.newScheduledThreadPool方法建立
  4. 單線程線程池,使用Executors.newSingleThreadExecutor方法建立

下面經過這些靜態方法的源碼來具體瞭解一下不一樣類型線程池的特性與適用場景。

可緩存線程池

JDK中的源碼咱們經過在IDE中進行跳轉能夠很方便地進行查看,下面就是Executors.newCachedThreadPool方法中的源代碼。從代碼中咱們能夠看到,可緩存線程池其實也是經過直接建立ThreadPoolExecutor類的構造器建立的,只是其中的參數都已經被設置好了,咱們能夠不用作具體的設置。因此咱們要觀察的重點就是在這個方法中具體產生了一個怎樣配置的ThreadPoolExecutor對象,以及這樣的線程池適用於怎樣的場景。

從下面的代碼中,咱們能夠看到,傳入ThreadPoolExecutor構造器的值有: - corePoolSize核心線程數爲0,表明線程池中的線程數能夠爲0 - maximumPoolSize最大線程數爲Integer.MAX_VALUE,表明線程池中最多能夠有無限多個線程 - 超時時間設置爲60秒,表示線程池中的線程在空閒60秒後會被回收 - 最後傳入的是一個SynchronousQueue類型的阻塞隊列,表明每個新添加的任務都要立刻有一個工做線程進行處理

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
複製代碼

因此可緩存線程池在添加任務時會優先使用空閒的線程,若是沒有就建立一個新線程,線程數沒有上限,因此每個任務都會立刻被分配到一個工做線程進行執行,不須要在阻塞隊列中等待;若是線程池長期閒置,那麼其中的全部線程都會被銷燬,節約系統資源。

  • 優勢
    • 任務在添加後能夠立刻執行,不須要進入阻塞隊列等待
    • 在閒置時不會保留線程,能夠節約系統資源
  • 缺點
    • 對線程數沒有限制,可能會過量消耗系統資源
  • 適用場景
    • 適用於大量短耗時任務和對響應時間要求較高的場景

定長線程池

傳入ThreadPoolExecutor構造器的值有:

  • corePoolSize核心線程數和maximumPoolSize最大線程數都爲固定值nThreads,即線程池中的線程數量會保持在nThreads,因此被稱爲「定長線程池」
  • 超時時間被設置爲0毫秒,由於線程池中只有核心線程,因此不須要考慮超時釋放
  • 最後一個參數使用了無界隊列,因此在全部線程都在處理任務的狀況下,能夠無限添加任務到阻塞隊列中等待執行
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
複製代碼

定長線程池中的線程數會逐步增加到nThreads個,而且在以後空閒線程不會被釋放,線程數會一直保持在nThreads個。若是添加任務時全部線程都處於忙碌狀態,那麼就會把任務添加到阻塞隊列中等待執行,阻塞隊列中任務的總數沒有上限。

  • 優勢
    • 線程數固定,對系統資源的消耗可控
  • 缺點
    • 在任務量暴增的狀況下線程池不會彈性增加,會致使任務完成時間延遲
    • 使用了無界隊列,在線程數設置太小的狀況下可能會致使過多的任務積壓,引發任務完成時間過晚和資源被過分消耗的問題
  • 適用場景
    • 任務量峯值不會太高,且任務對響應時間要求不高的場景

延時任務線程池

與以前的兩個方法不一樣,Executors.newScheduledThreadPool返回的是ScheduledExecutorService接口對象,能夠提供延時執行、定時執行等功能。在線程池配置上有以下特色:

  • maximumPoolSize最大線程數爲無限,在任務量較大時能夠建立大量新線程執行任務
  • 超時時間爲0,線程空閒後會被當即銷燬
  • 使用了延時工做隊列,延時工做隊列中的元素都有對應的過時時間,只有過時的元素纔會被彈出
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
複製代碼

延時任務線程池實現了ScheduledExecutorService接口,主要用於須要延時執行和定時執行的狀況。

單線程線程池

單線程線程池中只有一個工做線程,能夠保證添加的任務都以指定順序執行(先進先出、後進先出、優先級)。可是若是線程池裏只有一個線程,爲何咱們還要用線程池而不直接用Thread呢?這種狀況下主要有兩種優勢:一是咱們能夠經過共享的線程池很方便地提交任務進行異步執行,而不用本身管理線程的生命週期;二是咱們可使用任務隊列並指定任務的執行順序,很容易作到任務管理的功能。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
複製代碼

總結

在這篇文章中咱們從線程池的概念和基本使用方法提及,經過execute方法的源碼深刻剖析了任務提交的全過程和各個線程池構造器參數在線程池實際運行過程當中所發揮的做用,還真正閱讀了線程池類ThreadPoolExecutor的execute方法的源代碼。最後,咱們介紹了線程池的其餘經常使用操做和四種經常使用的線程池。

到這裏咱們的線程池源代碼之旅就結束了,但願你們在看完這篇文章以後能對線程池的使用和運行流程有了一個大概的印象。爲何說只是有了一個大概的印象呢?由於我以爲不少沒有相關基礎的讀者讀到這裏可能還只是對線程池有了一個本身的認識,對其中的一些細節可能尚未徹底捕捉到。因此我建議你們在看完這篇文章後不妨再返回到文章的開頭多讀幾遍,相信第二遍的閱讀能給你們帶來不同的體驗,由於我本身也是在第三次讀ThreadPoolExecutor類的源代碼時才真正打通了其中的一些重要關節的。

引子

在這篇文章中,咱們還只是探究了線程池的基本使用方法,以及提交任務方法execute的源代碼。那麼在任務提交之後是怎麼被線程池所執行的呢?在下一篇文章中咱們就能夠找到答案,在下一篇文章中,咱們會深刻剖析線程池的任務執行流程。

相關文章
相關標籤/搜索