Java 線程池框架核心代碼分析

前言

多線程編程中,爲每一個任務分配一個線程是不現實的,線程建立的開銷和資源消耗都是很高的。線程池應運而生,成爲咱們管理線程的利器。Java 經過Executor接口,提供了一種標準的方法將任務的提交過程和執行過程解耦開來,並用Runnable表示任務。html

下面,咱們來分析一下 Java 線程池框架的實現ThreadPoolExecutorjava

下面的分析基於JDK1.7nginx

生命週期

ThreadPoolExecutor中,使用CAPACITY的高3位來表示運行狀態,分別是:編程

  1. RUNNING:接收新任務,而且處理任務隊列中的任務swift

  2. SHUTDOWN:不接收新任務,可是處理任務隊列的任務緩存

  3. STOP:不接收新任務,不出來任務隊列,同時中斷全部進行中的任務多線程

  4. TIDYING:全部任務已經被終止,工做線程數量爲 0,到達該狀態會執行terminated()框架

  5. TERMINATED:terminated()執行完畢ide

狀態轉換圖

狀態轉換圖函數

ThreadPoolExecutor中用原子類來表示狀態位

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

線程池模型

核心參數

  • corePoolSize:最小存活的工做線程數量(若是設置allowCoreThreadTimeOut,那麼該值爲 0)

  • maximumPoolSize:最大的線程數量,受限於CAPACITY

  • keepAliveTime:對應線程的存活時間,時間單位由TimeUnit指定

  • workQueue:工做隊列,存儲待執行的任務

  • RejectExecutionHandler:拒絕策略,線程池滿後會觸發

線程池的最大容量CAPACITY中的前三位用做標誌位,也就是說工做線程的最大容量爲(2^29)-1

四種模型

  • CachedThreadPool:一個可緩存的線程池,若是線程池的當前規模超過了處理需求時,那麼將回收空閒的線程,當需求增長時,則能夠添加新的線程,線程池的規模不存在任何的限制。

  • FixedThreadPool:一個固定大小的線程池,提交一個任務時就建立一個線程,直到達到線程池的最大數量,這時線程池的大小將再也不變化。

  • SingleThreadPool:一個單線程的線程池,它只有一個工做線程來執行任務,能夠確保按照任務在隊列中的順序來串行執行,若是這個線程異常結束將建立一個新的線程來執行任務。

  • ScheduledThreadPool:一個固定大小的線程池,而且以延遲或者定時的方式來執行任務,相似於Timer。

執行任務 execute

核心邏輯:

  1. 當前線程數量 < corePoolSize,直接開啓新的核心線程執行任務addWorker(command, true)

  2. 當前線程數量 >= corePoolSize,且任務加入工做隊列成功

    1. 檢查線程池當前狀態是否處於RUNNING

    2. 若是否,則拒絕該任務

    3. 若是是,判斷當前線程數量是否爲 0,若是爲 0,就增長一個工做線程。

  3. 開啓普通線程執行任務addWorker(command, false),開啓失敗就拒絕該任務

從上面的分析能夠總結出線程池運行的四個階段:

  1. poolSize < corePoolSize 且隊列爲空,此時會新建線程來處理提交的任務

  2. poolSize == corePoolSize,此時提交的任務進入工做隊列,工做線程從隊列中獲取任務執行,此時隊列不爲空且未滿。

  3. poolSize == corePoolSize,而且隊列已滿,此時也會新建線程來處理提交的任務,可是poolSize < maxPoolSize

  4. poolSize == maxPoolSize,而且隊列已滿,此時會觸發拒絕策略

拒絕策略

前面咱們提到任務沒法執行會被拒絕,RejectedExecutionHandler是處理被拒絕任務的接口。下面是四種拒絕策略。

  • AbortPolicy:默認策略,終止任務,拋出RejectedException

  • CallerRunsPolicy:在調用者線程執行當前任務,不拋異常

  • DiscardPolicy: 拋棄策略,直接丟棄任務,不拋異常

  • DiscardOldersPolicy:拋棄最老的任務,執行當前任務,不拋異常

線程池中的 Worker

Worker繼承了AbstractQueuedSynchronizerRunnable,前者給Worker提供鎖的功能,後者執行工做線程的主要方法runWorker(Worker w)(從任務隊列撈任務執行)。Worker 引用存在workers集合裏面,用mainLock守護。

private final ReentrantLock mainLock = new ReentrantLock();private final HashSet<Worker> workers = new HashSet<Worker>();

核心函數 runWorker

下面是簡化的邏輯,注意:每一個工做線程的run都執行下面的函數

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;    while (task != null || (task = getTask()) != null) {
        w.lock();
        beforeExecute(wt, task);
        task.run();
        afterExecute(task, thrown);
        w.unlock();
    }
    processWorkerExit(w, completedAbruptly);
}
  1. getTask()中獲取任務

  2. 鎖住 worker

  3. 執行beforeExecute(wt, task),這是ThreadPoolExecutor提供給子類的擴展方法

  4. 運行任務,若是該worker有配置了首次任務,則先執行首次任務且只執行一次。

  5. 執行afterExecute(task, thrown);

  6. 解鎖 worker

  7. 若是獲取到的任務爲 null,關閉 worker

獲取任務 getTask

線程池內部的任務隊列是一個阻塞隊列,具體實如今構造時傳入。

private final BlockingQueue<Runnable> workQueue;

getTask()從任務隊列中獲取任務,支持阻塞和超時等待任務,四種狀況會致使返回null,讓worker關閉。

  1. 現有的線程數量超過最大線程數量

  2. 線程池處於STOP狀態

  3. 線程池處於SHUTDOWN狀態且工做隊列爲空

  4. 線程等待任務超時,且線程數量超過保留線程數量

核心邏輯:根據timed在阻塞隊列上超時等待或者阻塞等待任務,等待任務超時會致使工做線程被關閉。

timed = allowCoreThreadTimeOut || wc > corePoolSize;Runnable r = timed ?
    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    workQueue.take();

在如下兩種狀況下等待任務會超時:

  1. 容許核心線程等待超時,即allowCoreThreadTimeOut(true)

  2. 當前線程是普通線程,此時wc > corePoolSize

工做隊列使用的是BlockingQueue,這裏就不展開了,後面再寫一篇詳細的分析。

總結

  • ThreadPoolExecutor基於生產者-消費者模式,提交任務的操做至關於生產者,執行任務的線程至關於消費者。

  • Executors提供了四種基於ThreadPoolExecutor構造線程池模型的方法,除此以外,咱們還能夠直接繼承ThreadPoolExecutor,重寫beforeExecuteafterExecute方法來定製線程池任務執行過程。

  • 使用有界隊列仍是×××隊列須要根據具體狀況考慮,工做隊列的大小和線程的數量也是須要好好考慮的。

  • 拒絕策略推薦使用CallerRunsPolicy,該策略不會拋棄任務,也不會拋出異常,而是將任務回退到調用者線程中執行。

相關文章
相關標籤/搜索