線程池底層原理

目錄java

概述

JAVA經過多線程的方式實現併發,爲了方便線程池的管理,JAVA採用線程池的方式對線線程的整個生命週期進行管理。1.5後引入的Executor框架的最大優勢是把任務的提交和執行解耦編程

要執行任務的人只需把Task描述清楚,而後提交便可。這個Task是怎麼被執行的,被誰執行的,何時執行的,提交的人就不用關心了。數組

線程池同時能夠避免建立大量線程的開銷,提升響應速度。最近在閱讀JVM相關的東西,一個對象的建立須要如下過程:安全

  1. 檢查對應的類是否已經被加載、解析和初始化
  2. 類加載後,爲新生對象分配內存
  3. 將分配到的內存空間初始爲 0
  4. 對對象進行關鍵信息的設置,好比對象的hashcode等
  5. 而後執行 init 方法初始化對象

若是每次都是如此的建立線程->執行任務->銷燬線程,會形成很大的性能開銷。複用已建立好的線程能夠提升系統的性能,藉助池化技術的思想,經過預先建立好多個線程,放在池中,這樣能夠在須要使用線程的時候直接獲取,避免屢次重複建立、銷燬帶來的開銷。服務器

線程池的「池」

ThreadPoolExecutor

前面提到一個名詞——池化技術,那麼到底什麼是池化技術呢?池化技術簡單點來講,就是提早保存大量的資源,以備不時之需。在機器資源有限的狀況下,使用池化技術能夠大大的提升資源的利用率,提高性能等。多線程

在編程領域,比較典型的池化技術有:併發

線程池、鏈接池、內存池、對象池等。框架

在Java中建立線程池可使用ThreadPoolExecutor,其繼承關係以下圖函數

img

其構造函數爲:源碼分析

代碼塊

Java

public ThreadPoolExecutor(int corePoolSize,    //核心線程的數量
                          int maximumPoolSize,    //最大線程數量
                          long keepAliveTime,    //超出核心線程數量之外的線程空餘存活時間
                          TimeUnit unit,    //存活時間的單位
                          BlockingQueue<Runnable> workQueue,    //保存待執行任務的隊列
                          ThreadFactory threadFactory,    //建立新線程使用的工廠
                          RejectedExecutionHandler handler // 當任務沒法執行時的處理器
                          ) {...}
  • corePoolSize:核心線程池數量

在線程數少於核心數量時,有新任務進來就新建一個線程,即便有的線程沒事幹

等超出核心數量後,就不會新建線程了,空閒的線程就得去任務隊列裏取任務執行了

  • maximumPoolSize:最大線程數量

包括核心線程池數量 + 核心之外的數量

若是任務隊列滿了,而且池中線程數小於最大線程數,會再建立新的線程執行任務

  • keepAliveTime:核心池之外的線程存活時間,即沒有任務的外包的存活時間

若是給線程池設置 allowCoreThreadTimeOut(true),則核心線程在空閒時頭上也會響起死亡的倒計時

若是任務是多而容易執行的,能夠調大這個參數,那樣線程就能夠在存活的時間裏有更大可能接受新任務

  • workQueue:保存待執行任務的阻塞隊列

不一樣的任務類型有不一樣的選擇,下一小節介紹

  • threadFactory:每一個線程建立的地方

能夠給線程起個好聽的名字,設置個優先級啥的

  • handler:飽和策略,你們都很忙,咋辦呢,有四種策略

    • AbortPolicy:直接拋出 RejectedExecutionException 異常,本策略也是默認的飽和策略
    • CallerRunsPolicy:只要線程池沒關閉,就直接用調用者所在線程來運行任務
    • DiscardPolicy:悄悄把任務放生,不作了
    • DiscardOldestPolicy:把隊列裏待最久的那個任務扔了,而後再調用 execute() 嘗試執行
    • 咱們也能夠實現本身的 RejectedExecutionHandler 接口自定義策略,好比如記錄日誌什麼的

若是把線程比做員工,那麼線程池能夠比做一個團隊,核心池比做團隊中正式員工數,核心池外的比做外包員工。

線程池中任務的執行順序

經過Executors靜態工廠也能夠構建經常使用的線程池,在詳細介紹以前,還須要先了解線程池中任務的執行順序

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

從註釋中能夠看處處理邏輯,從判斷條件中能夠看到核心模塊

  • 第一個紅框:workerCountOf方法根據ctl的低29位,獲得線程池的當前線程數,若是線程數小於corePoolSize,則執行addWorker方法建立新的線程執行任務;
  • 第二個紅框:判斷線程池是否在運行,若是在,任務隊列是否容許插入,插入成功再次驗證線程池是否運行,若是不在運行,移除插入的任務,而後拋出拒絕策略。若是在運行,沒有線程了,就啓用一個線程。
  • 第三個紅框:若是添加非核心線程失敗,就直接拒絕了。

概略圖:

img

詳細流程圖:

img

Executors

按照上面的總結,能夠逐一分析Executors工廠類提供的現成的線程池:

img

1.newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

不招外包,有固定數量核心成員的正常互聯網團隊。

能夠看到,FixedThreadPool 的核心線程數和最大線程數都是指定值,也就是說當線程池中的線程數超過核心線程數後,任務都會被放到阻塞隊列中。

此外 keepAliveTime 爲 0,也就是多餘的空餘線程會被當即終止(因爲這裏沒有多餘線程,這個參數也沒什麼意義了)。

而這裏選用的阻塞隊列是 LinkedBlockingQueue,使用的是默認容量 Integer.MAX_VALUE,至關於沒有上限。

所以這個線程池執行任務的流程以下:

線程數少於核心線程數,也就是設置的線程數時,新建線程執行任務

線程數等於核心線程數後,將任務加入阻塞隊列

因爲隊列容量很是大,能夠一直加加加

執行完任務的線程反覆去隊列中取任務執行

FixedThreadPool 用於負載比較重的服務器,爲了資源的合理利用,須要限制當前線程數量。

2.newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

不招外包,只有一個核心成員的創業團隊。

從參數能夠看出來,SingleThreadExecutor 至關於特殊的 FixedThreadPool,它的執行流程以下:

線程池中沒有線程時,新建一個線程執行任務

有一個線程之後,將任務加入阻塞隊列,不停加加加

惟一的這一個線程不停地去隊列裏取任務執行

聽起來很可憐的樣子 - -。

SingleThreadExecutor 用於串行執行任務的場景,每一個任務必須按順序執行,不須要併發執行。

3.newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

所有外包,沒活最多待 60 秒的外包團隊。

能夠看到,CachedThreadPool 沒有核心線程,非核心線程數無上限,也就是所有使用外包,可是每一個外包空閒的時間只有 60 秒,超事後就會被回收。

CachedThreadPool 使用的隊列是 SynchronousQueue,這個隊列的做用就是傳遞任務,並不會保存。

所以當提交任務的速度大於處理任務的速度時,每次提交一個任務,就會建立一個線程。極端狀況下會建立過多的線程,耗盡 CPU 和內存資源。

它的執行流程以下:

沒有核心線程,直接向 SynchronousQueue 中提交任務

若是有空閒線程,就去取出任務執行;若是沒有空閒線程,就新建一個

執行完任務的線程有 60 秒生存時間,若是在這個時間內能夠接到新任務,就能夠繼續活下去,不然就拜拜

因爲空閒 60 秒的線程會被終止,長時間保持空閒的 CachedThreadPool 不會佔用任何資源。

CachedThreadPool 用於併發執行大量短時間的小任務,或者是負載較輕的服務器。

4.newScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue());
}
private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;

按期維護的 2B 業務團隊,核心與外包成員都有。

ScheduledThreadPoolExecutor 繼承自 ThreadPoolExecutor, 最多線程數爲 Integer.MAX_VALUE ,使用 DelayedWorkQueue 做爲任務隊列。

ScheduledThreadPoolExecutor 添加任務和執行任務的機制與ThreadPoolExecutor 有所不一樣。

ScheduledThreadPoolExecutor 添加任務提供了另外兩個方法:

scheduleAtFixedRate() :按某種速率週期執行

scheduleWithFixedDelay():在某個延遲後執行

它倆的代碼以下:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
      throw new NullPointerException();
    if (period <= 0L)
      throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
      new ScheduledFutureTask<Void>(command,
                                    null,
                                    triggerTime(initialDelay, unit),
                                    unit.toNanos(period),
                                    sequencer.getAndIncrement());
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
      throw new NullPointerException();
    if (delay <= 0L)
      throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
      new ScheduledFutureTask<Void>(command,
                                    null,
                                    triggerTime(initialDelay, unit),
                                    -unit.toNanos(delay),
                                    sequencer.getAndIncrement());
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

能夠看到,這兩種方法都是建立了一個 ScheduledFutureTask 對象,調用 decorateTask() 方法轉成 RunnableScheduledFuture 對象,而後添加到隊列中。

看下 ScheduledFutureTask 的主要屬性:

private class ScheduledFutureTask<V>
        extends FutureTask<V> implements RunnableScheduledFuture<V> {
    //添加到隊列中的順序
    private final long sequenceNumber;
    //什麼時候執行這個任務
    private volatile long time;
    //執行的間隔週期
    private final long period;
    //實際被添加到隊列中的 task
    RunnableScheduledFuture<V> outerTask = this;
    //在 delay queue 中的索引,便於取消時快速查找
    int heapIndex;
    //...
}

DelayQueue 中封裝了一個優先級隊列,這個隊列會對隊列中的 ScheduledFutureTask 進行排序,兩個任務的執行 time 不一樣時,time 小的先執行;不然比較添加到隊列中的順序 sequenceNumber ,先提交的先執行。

ScheduledThreadPoolExecutor 的執行流程以下:

調用上面兩個方法添加一個任務

線程池中的線程從 DelayQueue 中取任務

而後執行任務

具體執行任務的步驟也比較複雜:

線程從 DelayQueue 中獲取 time 大於等於當前時間的 ScheduledFutureTask

DelayQueue.take()

執行完後修改這個 task 的 time 爲下次被執行的時間

而後再把這個 task 放回隊列中

DelayQueue.add()

ScheduledThreadPoolExecutor 用於須要多個後臺線程執行週期任務,同時須要限制線程數量的場景。

「不容許使用」Executors

阿里巴巴Java開發手冊中明確指出,『不容許』使用Executors建立線程池。
img
經過上面的例子,咱們知道了Executors建立的線程池存在OOM的風險,那麼究竟是什麼緣由致使的呢?咱們須要深刻Executors的源碼來分析一下。

其實,在上面的報錯信息中,咱們是能夠看出蛛絲馬跡的,在以上的代碼中其實已經說了,真正的致使OOM的實際上是LinkedBlockingQueue.offer方法。

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
    at com.hollis.ExecutorsDemo.main(ExecutorsDemo.java:16)

若是對Java中的阻塞隊列有所瞭解的話,看到這裏或許就可以明白緣由了。

Java中的BlockingQueue主要有兩種實現,分別是ArrayBlockingQueue 和 LinkedBlockingQueue。

ArrayBlockingQueue是一個用數組實現的有界阻塞隊列,必須設置容量。

LinkedBlockingQueue是一個用鏈表實現的有界阻塞隊列,容量能夠選擇進行設置,不設置的話,將是一個無邊界的阻塞隊列,最大長度爲Integer.MAX_VALUE。

這裏的問題就出在:不設置的話,將是一個無邊界的阻塞隊列,最大長度爲Integer.MAX_VALUE。也就是說,若是咱們不設置LinkedBlockingQueue的容量的話,其默認容量將會是Integer.MAX_VALUE。

而newFixedThreadPool中建立LinkedBlockingQueue時,並未指定容量。此時,LinkedBlockingQueue就是一個無邊界隊列,對於一個無邊界隊列來講,是能夠不斷的向隊列中加入任務的,這種狀況下就有可能由於任務過多而致使內存溢出問題。

上面提到的問題主要體如今newFixedThreadPool和newSingleThreadExecutor兩個工廠方法上,並非說newCachedThreadPool和newScheduledThreadPool這兩個方法就安全了,這兩種方式建立的最大線程數多是Integer.MAX_VALUE,而建立這麼多線程,必然就有可能致使OOM。

說回ThreadPoolService

addWorker

從方法execute的實現能夠看出:addWorker主要負責建立新的線程並執行任務,代碼以下(這裏代碼有點長,不要緊,也是分塊的,總共有5個關鍵的代碼塊):

img

  • 第一個紅框:作是否可以添加工做線程條件過濾:

    • 判斷線程池的狀態,若是線程池的狀態值大於或等SHUTDOWN,則不處理提交的任務,直接返回;
  • 第二個紅框:作自旋,更新建立線程數量:

    • 經過參數core判斷當前須要建立的線程是否爲核心線程,若是core爲true,且當前線程數小於corePoolSize,則跳出循環,開始建立新的線程。retry 是什麼?這個是java中的goto語法。只能運用在break和continue後面。

接着看後面的代碼:

img

  • 第一個紅框:獲取線程池主鎖。

    • 線程池的工做線程經過Woker類實現,經過ReentrantLock鎖保證線程安全。
  • 第二個紅框:添加線程到workers中(線程池中)。
  • 第三個紅框:啓動新建的線程。

接下來,咱們看看workers是什麼。

img

一個hashSet。因此,線程池底層的存儲結構其實就是一個HashSet

worker線程處理隊列任務

img

  • 第一個紅框:是不是第一次執行任務,或者從隊列中能夠獲取到任務。
  • 第二個紅框:獲取到任務後,執行任務開始前操做鉤子。
  • 第三個紅框:執行任務。
  • 第四個紅框:執行任務後鉤子。

這兩個鉤子(beforeExecute,afterExecute)容許咱們本身繼承線程池,作任務執行先後處理。

總結

到這裏,源代碼分析到此爲止。接下來作一下簡單的總結。

所謂線程池本質是一個hashSet。多餘的任務會放在阻塞隊列中。

只有當阻塞隊列滿了後,纔會觸發非核心線程的建立。因此非核心線程只是臨時過來打雜的。直到空閒了,而後本身關閉了。

線程池提供了兩個鉤子(beforeExecute,afterExecute)給咱們,咱們繼承線程池,在執行任務先後作一些事情。

線程池原理關鍵技術:鎖(lock,cas)、阻塞隊列、hashSet(資源池)

img

參考文檔

Java中線程池,你真的會用嗎?

深刻源碼分析Java線程池的實現原理

線程池的使用與執行流程

相關文章
相關標籤/搜索