Java線程池二:線程池原理

最近精讀Netty源碼,讀到NioEventLoop部分的時候,發現對Java線程&線程池有些概念還有困惑, 因此深刻總結一下
Java線程池一:線程基礎
Java線程池二:線程池原理html

爲何須要使用線程池

Java線程映射的是系統內核線程,是稀缺資源,使用線程池主要有如下幾點好處java

  • 下降資源消耗:重複利用池中線程下降線程的建立和消耗形成的資源消耗。
  • 提升響應速度:任務到達時直接使用池總中空閒的線程,能夠不用等待線程建立。
  • 提升線程的可管理性:線程是稀缺資源,不能無限制建立,使用線程池能夠統一進行分配、監控、調優。

線程池框架簡介

  • Executor接口:提供execute方法提交任務
  • ExecutorService接口:提供能夠跟蹤任務執行結果的 submit方法 & 提供線程池關閉的方法(shutdown, shutdowNow)
  • AbstractExecutorService抽象類:實現submit方法
  • ThreadPoolExecutor: 線程池實現類
  • ScheduleThreadPoolExecutor:能夠執行定時任務的線程池

ThreadPoolExecutor原理

核心參數以及含義

  • corePoolSize:核心線程池大小
  • maximumPoolSize: 線程池最大大小
  • workQueue: 工做隊列(任務暫時存放的地方)
  • RejectedExecutionHandler:拒絕策略(線程池沒法執行該任務時的處理策略)

任務提交流程

任務提交過程見下流程圖
api

線程池的狀態

  • RUNNING:正常的線程池運行狀態
  • SHUTDOWN:調用shutdown方法到該狀態,該狀態下拒絕提交新任務,但會將已提交的任務的處理完畢
  • STOP:調用shutdownNow方法到該狀態,該狀態下拒絕新任務的提交 & 丟棄工做隊列中的任務 & 中斷正在執行任務的工做線程
  • TIDYING:工做隊列和線程池都爲空時自動到該狀態
  • TERMINATED:terminated方法返回以後自動到該狀態

工做隊列

核心線程池滿時,任務會嘗試提交到工做隊列,後續工做線程會從工做隊列中獲取任務執行。數組

由於涉及到多個線程對工做隊列的讀寫,因此工做隊列須要是線程安全的,Java提供瞭如下幾種線程安全的隊列(BlockingQueue)安全

實現類 工做機制
ArrayBlockingQueue 底層實現是數組
LinkedBlockingDeque 底層實現是鏈表
PriorityBlockingQueue 優先隊列,本質是個小頂堆
DelayQueue 延時隊列 (優先隊列 & 元素實現Delayed接口),ScheduledThreadPoolExecutor實現的關鍵
SynchronousQueue 同步隊列

BlockingQueue 多組讀寫操做API

操做 描述
add/remove 隊列已滿/隊列已空時,拋出異常
put/take 隊列已滿/隊列已空時,阻塞等待
offer/poll 隊列已滿/隊列已空時,返回特殊值(false/null)
offer(time) / poll(time) 超時時間內沒法寫入或者讀取成功,返回特殊值

拒絕策略

拒絕策略是當線程池滿負載時(任務隊列已滿 & 線程池已滿)對新提交任務的處理策略,jdk提供了以下四種實現,其中AbortPolicy是默認實現。併發

實現類 工做機制
AbortPolicy 拋出RejectedExecutionException異常
CallerRunsPolicy 調用線程執行該任務
DiscardOldestPolicy 丟棄工做隊列頭部任務,再嘗試提交該任務
DiscardPolicy 直接丟棄

固然咱們能夠有自定義的實現,好比記錄日誌、任務實例持久化,同時發送報警到開發人員。框架

跟蹤任務的執行結果

線程池提供了幾個submit方法, 調用線程能夠根據返回的Future對象獲取任務執行結果,那麼它的實現原理又是什麼吶?oop

裝飾模式對task的run方法進行加強this

1.提交任務前,會把task裝飾成一個FutureTask對象線程

public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
 }

2.FutureTask對象的run方法會存儲返回的結果或者異常。調用方能夠根據FutureTask獲取任務的執行結果。

//省略了部分代碼
public void run() {
      Callable<V> c = callable;
      if (c != null && state == NEW) {
        V result;
        boolean ran;
        try {
          //執行任務
          result = c.call();
          ran = true;
        } catch (Throwable ex) {
          result = null;
          ran = false;
          //存儲異常
          setException(ex);
        }
        if (ran)
          //存儲返回值
          set(result);
 }

線程池的關閉

shutdown

shutdown將線程池的狀態設置成SHUTDOWN,同時拒絕提交新的任務,可是已提交的任務會正常執行

shutdownNow

shutdownNow將線程池的狀態設置成STOP,該狀態下拒絕提交新的任務 & 丟棄工做隊列中的任務& 中斷當前活躍的線程(嘗試中止正在執行的任務)

須要注意的是shutdownNow對於正在執行的任務只是嘗試中止,不保證成功(取決於任務是否監聽處理中斷位)

ScheduledThreadPoolExecutor 定時調度原理

ScheduledThreadPoolExecutor在ThreadPoolExecutor之上擴展實現了定時調度的能力

1.實例化時工做隊列使用延時隊列(DelayedWorkQueue)--- 本質是個小頂堆

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), handler);
}

2.提交的任務裝飾成ScheduledFutureTask類型,並把任務加入到工做隊列(不直接調用execute)

public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        //裝飾
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
  			//任務加入工做隊列
        delayedExecute(t);
        return t;
    }

3.ScheduledFutureTask實現Delayed和Comparable接口

因此提交到工做隊列中的任務是按照任務執行時間排序的(最先執行的任務在頭部),由於工做隊列是個小頂堆。

public long getDelay(TimeUnit unit) {
    return unit.convert(time - now(), NANOSECONDS);
}

public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

4.只能從工做隊列中獲取已到執行時間的任務

public RunnableScheduledFuture<?> poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        RunnableScheduledFuture<?> first = queue[0];
      	//若是頭部的任務尚未到執行時間, 直接返回null
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return finishPoll(first);
    } finally {
        lock.unlock();
    }
}

線程池配置

假設:CPU核心數是N,每一個任務的執行時間是T,任務的超時時間是timeout,核心線程數是corePoolSize,工做隊列大小是workQueue, 最大線程數是 maxPoolSize, 任務最大併發數爲maxTasks

核心線程數配置

  1. 對於CPU密集型任務:corePoolSize 大小設置成和CPU核心數接近,如N+1 或者 N+2

  2. 對於IO密集型任務:corePoolSize能夠設置的比較大一些,如2N~3N;也能夠經過以下邏輯進行估算

    假設80%的時間是IO操做,那麼每一個任務須要佔用CPU時間大概是0.2T, 每秒每一個CPU核心最大能夠執行的任務數爲 = (1/0.2T) = 5/T;因此理論上 80%IO的狀況下corePoolSize能夠設置爲 5N (一個cpu能夠對應5個工做線程)

工做隊列大小配置

工做隊列的大小取決於任務的超時時間 & 核心線程池的吞吐量

workQueue = corePoolSize * (1/T) * timeout = (corePoolSize * timeout) / T

須要注意的是: 工做隊列不能使用無界隊列。(無界隊列異常狀況下可能耗盡系統資源,形成服務不可用)

最大線程數配置

最大線程數的大小取決於最大的任務併發數 & 工做隊列的大小 & 任務的執行時間

maxPoolSize = (maxTasks - workQueue) / T

拒絕策略配置

對於可有可無的任務,咱們能夠直接丟棄;對於一些重要的任務須要對任務進行持久化,以便後續進行補償和恢復。

線程池監控

咱們能夠有個定時腳本將線程池的最大線程數、工做隊列大小、已經執行的任務數、已經拒絕的任務數等數據推送到監控系統

這樣咱們能夠根據這些數據對線程池進行調優,也能夠即便感知線上業務異常。

相關文章
相關標籤/搜索