線程池的基本概念

線程池,是一種線程的使用模式,它爲了下降線程使用中頻繁的建立和銷燬所帶來的資源消耗與代價。
經過建立必定數量的線程,讓他們時刻準備就緒等待新任務的到達,而任務執行結束以後再從新回來繼續待命。java

這就是線程池最核心的設計思路,「複用線程,平攤線程的建立與銷燬的開銷代價」。git

相比於來一個任務建立一個線程的方式,使用線程池的優點體如今以下幾點:程序員

  1. 避免了線程的重複建立與開銷帶來的資源消耗代價
  2. 提高了任務響應速度,任務來了直接選一個線程執行而無需等待線程的建立
  3. 線程的統一分配和管理,也方便統一的監控和調優

線程池的實現天生就實現了異步任務接口,容許你提交多個任務到線程池,線程池負責選用線程執行任務調度。github

異步任務在上一篇文章中已經作過一點鋪墊介紹,那麼本篇就在前一篇的基礎上深刻的去探討一下異步任務與線程池的相關內容。微信

基本介紹

在正式介紹線程池相關概念以前,咱們先看一張線程池相關接口的類圖結構,網上盜來的,但畫的仍是很全面的。異步

線程池相關類圖

右上角的幾個接口能夠先不看,等咱們介紹到組合任務的時候會繼續說的,咱們看左邊,Executor、ExecutorService 以及 AbstractExecutorService 都是咱們熟悉的,它們抽象了任務執行者的基本模型。函數

ThreadPoolExecutor 是對線程池概念的抽象,它天生實現了任務執行的相關接口,也就是說,線程池也是一個任務的執行者,容許你向其中提交多個任務,線程池將負責分配線程與調度任務。學習

至於 Schedule 線程池,它是擴展了基礎的線程池實現,提供「計劃調度」能力,定時調度任務,延時執行等。this

線程池基本原理

ThreadPoolExecutor 的建立並不複雜,直接 new 就好,只不過構造函數有很久個重載,咱們直接看最底層的那個,也就是參數最多的那個。線程

public ThreadPoolExecutor
(   int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler
)

建立一個線程池須要傳這麼多參數?是否是以爲有點喪心病狂?

不要擔憂,我說了,這是最複雜的一個構造函數重載,須要傳入最全面的構造參數。而你平常使用時,固然可使用 ThreadPoolExecutor 中的其餘較爲簡便的構造函數,只不過有些你沒傳的參數將配置爲默認值而已。

下面咱們將從這些參數的含義出發,看看線程池 ThreadPoolExecutor 具有一個怎樣的構成結構。

一、線程池容量問題

構造函數中有這麼幾個參數是用於配置線程池中線程容量與生命週期的:

  • corePoolSize
  • maximumPoolSize
  • keepAliveTime

corePoolSize 指定了線程池中的核心線程的個數,核心線程就是永遠不會被銷燬的線程,一旦被建立出來就將永遠存活在線程池之中。

maximumPoolSize 指定了線程池可以建立的最大線程數量。

keepAliveTime 是用於控制非核心線程最長空閒等待時間,若是一個非核心線程處理完任務後回到線程池待命,超過這個指定時長依然沒有新任務的分配將致使線程被銷燬。

二、任務阻塞問題

ThreadPoolExecutor 中有這麼一個字段:

private final BlockingQueue workQueue;

這個隊列的做用很明顯,就是當線程池中的線程不夠用的時候,讓任務排隊,等待有線程空閒再來取任務去執行。

三、線程工廠

線程工廠 ThreadFactory 中只定義了一個方法 newThread,子類實現它並按照本身的需求建立一個線程返回。

例如 DefaultThreadFactory 實現的該方法將建立一個線程,名稱格式: pool- <線程池編號> -thread- <線程編號> ,設置線程的優先級爲標準優先級,非守護線程等。

四、任務拒絕策略

構造函數中還有一個參數 handle 是必須傳的,它將爲 ThreadPoolExecutor 中的同名字段賦值。

private volatile RejectedExecutionHandler handler;

RejectedExecutionHandler 中定義了一個 rejectedExecution 用於描述一種任務拒絕策略。那麼哪一種狀況下才會觸發該方法的調用呢?

當線程池中的全部線程所有分配出去工做了,而且任務阻塞隊列也阻塞滿了,那麼此時新提交的任務將觸發任務拒絕策略

而拒絕策略主要有如下四個子類實現,而它們都是定義在 ThreadPoolExecutor 的內部類,咱們看一看都是哪四種策略:

  • AbortPolicy
  • CallerRunsPolicy
  • DiscardOldestPolicy
  • DiscardPolicy

AbortPolicy 是默認的拒絕策略,他的實現就是直接拋出 RejectedExecutionException 異常。

CallerRunsPolicy 暫停當前提交任務的線程返回,本身去執行本身提交過來的任務。

DiscardOldestPolicy 策略將從阻塞任務隊列對頭移除一個任務並將本身排到隊列尾部等待調度執行。

DiscardPolicy 是一種佛系策略,方法體的實現爲空,什麼也不作,也即忽略當前任務的提交。

這樣,咱們零零散散的對線程池的內部有了一個基本的認識,下面咱們要把這些都串起來,看一看源碼。從一個任務的提交,到分配到線程執行任務,一整個過程的相關邏輯作一個探究。

看一看源碼

先來看一看任務的提交方法,submit

submit

以前的文章咱們也說過,這個 submit 方法有四個重載,分別容許你傳入不一樣類型的任務,Runnable 或是 Callable。咱們這裏就之前者爲例。

這個 RunnableFuture 類型咱們以前說過,他只不過是同時繼承了 Runnable 和 Future 接口,象徵性的描述了「這是一個可監控的任務」。

而後你會發現,整個 submit 的核心邏輯在 execute 方法裏面,也就是說 execute 方法纔是真正向線程池提交任務的方法。咱們重點看一看這個 execute 方法。

先看看 ThreadPoolExecutor 中定義幾個重要的字段:

image

ctl 是一個原子變量,它用了一個 32 位的整型描述了兩個重要信息。當前線程池運行狀態(runState)和當前線程池中有效的線程個數(workCount)。

runState 佔用高 3 比特位,workCount 佔用低 29 比特位。

接着咱們來看 execute 方法的實現:

image

紅框部分:

若是當前線程池中的實際工做線程數還未達到配置的核心線程數量,那麼將調用 addWorker 爲當前任務建立一個新線程並啓動執行。

addWorker 方法代碼仍是有點多的,這裏就截圖出來進行分析了,由於並不難,咱們總結下該方法的邏輯:

  1. 死循環中判斷線程池狀態是否正常,若是不正常被關閉了等,將直接返回 false
  2. 若是正常則 CAS 嘗試爲 workerCount 增長一,並建立一個新的線程調用 start 方法執行任務。

不知道你留意到 addWorker 方法的第二個參數了沒有,這個參數用於指定線程池的上界。

若是傳的是 true,則說明使用 corePoolSize 做爲上界,也就是這次爲任務分配線程若是線程池中全部的工做線程數達到這個 corePoolSize 則將拒絕分配並返回添加失敗。

若是傳的是 false,則使用 maximumPoolSize 做爲上界,道理是同樣的。

藍框部分:

從紅框出來,你能夠認爲任務分配線程失敗了,大機率是全部正常工做的線程數達到核心線程數量了。這部分作的事情就是:

  1. 若是線程池狀態正常,就嘗試將當前任務添加到任務阻塞隊列上。
  2. 再一次檢查線程池狀態,若是異常了,將撤回剛纔添加的任務並根據咱們設定的拒絕策略予以拒絕。
  3. 若是發現線程池自上次檢查後,所喲線程所有死亡,那麼將建立一個空閒線程,適當的時候他會去從任務隊列取咱們剛剛添加的任務的

黃框部分:

到達黃色部分必然說明線程池狀態異常或是隊列添加失敗,大機率是由於隊列滿了沒法再添加了。

此時再次調用 addWorker 方法,不過此次傳入 false,意思是,我知道全部的核心線程都在忙而且任務隊列也排滿了,那麼你就額外建立一個非核心線程來執行個人任務吧。

若是失敗了,執行拒絕策略。

咱們總結一下任務的提交到分配線程,甚至阻塞到任務隊列這一系列過程:

一個任務過來,若是線程池中的線程數不足咱們配置的核心線程數,那麼會嘗試建立新線程來執行任務,不然會優先把任務往阻塞隊列上添加

若是阻塞隊列上滿員了,那麼說明當前線程池中核心線程工做量有點大,將開始建立非核心線程共同執行任務,直到達到上限或是阻塞隊列再也不滿員。

到這裏呢,咱們對於任務的提交與線程分配已經有了一個基本的認識了,相信你也必定好奇當一個線程的任務執行結束以後,他是如何去取下一個任務的。

這部分咱們也來分析分析

線程池的內部定義了一個 Worker 內部類,這個類有兩個字段,一個用於保存當前的任務,一個用於保存用於執行該任務的線程。

addWorker 中會調用線程的 start 方法,進而會執行 Worker 實例的 run 方法,這個 run 方法是這樣的:

public void run() {
    runWorker(this);
}

runWorker 很長,就不截出來一點點分析了,我總結下他的實現邏輯:

  1. 若是本身內部的任務是空,則嘗試從阻塞隊列上獲取一個任務
  2. 執行任務
  3. 循環的執行 1和2 兩個步驟,直到阻塞隊列中沒有任務可獲取
  4. 調用 processWorkerExit 方法移除當前線程在線程池中的引用,也就至關於銷燬了一個線程,由於不久後會被 GC 回收

可是這裏有一個細節和你們說一下,第一個步驟從任務隊列中取一個任務調用的是 getTask 方法。

這個方法設定了一個邏輯,若是線程池中正在工做的線程數大於設定的核心線程數,也就是說線程池中存在非核心線程,那麼當前線程獲取任務時,若是超過指定時長依然沒有獲取,就將返回跳過循環執行咱們 runWorker 的第四個步驟,移除對該線程的引用。

反之,若是此時有效工做線程數少於規定的核心線程數,則認定當前線程是一個核心線程,因而對於獲取任務失敗的處理是「阻塞到條件隊列上,等待其餘線程喚醒」。

何時喚醒也很容易想到了,就是當任務隊列有新任務添加時,會喚醒全部的核心線程,他們會去隊列上取任務,沒搶到的依然回去阻塞。

至此,線程池相關的內容介紹完畢,有些方法的實現我只是總結了大概的邏輯,具體的尤待大家本身去探究,有問題也歡迎你和我討論。

關注公衆不迷路,一個愛分享的程序員。

公衆號回覆「1024」加做者微信一塊兒探討學習!

每篇文章用到的全部案例代碼素材都會上傳我我的 github

https://github.com/SingleYam/overview_java

歡迎來踩!

YangAM 公衆號

相關文章
相關標籤/搜索