併發系列(6)之 ThreadPoolExecutor 詳解

本文將主要介紹咱們平時最經常使用的線程池 ThreadPoolExecutor ,有可能你平時沒有直接使用這個類,而是使用 Executors 的工廠方法建立線程池,雖然這樣很簡單,可是極可能由於這個線程池發生 OOM ,具體狀況文中會詳細介紹;html

2、ThreadPoolExecutor 概覽

ThreadPoolExecutor 的繼承關係如圖所示:java

executor

其中:異步

  • Executor:定義了 executor(Runnable command) 異步接口,可是沒有強制要求異步;
  • ExecutorService:提供了生命週期管理的方法,以及有返回值的任務提交;
  • AbstractExecutorService:提供了 ExecutorService 的默認實現;


1. 主體結構

public class ThreadPoolExecutor extends AbstractExecutorService {
  private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));  // 狀態控制變量,核心
  private final BlockingQueue<Runnable> workQueue;                         // 任務等待隊列
  private final HashSet<Worker> workers = new HashSet<Worker>();           // 工做線程集合
  private volatile ThreadFactory threadFactory;       // 線程構造工廠
  private volatile RejectedExecutionHandler handler;  // 拒絕策略
  private volatile long keepAliveTime;                // 空閒線程的存活時間(非核心線程)
  private volatile int corePoolSize;                  // 核心線程大小
  private volatile int maximumPoolSize;               // 工做線程最大容量

  public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
                            RejectedExecutionHandler handler) {
    if (corePoolSize < 0 || maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize || keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
  }
  ...
}

這裏已經能夠大體看出 ThreadPoolExecutor 的結構了:源碼分析

threadpool1


2. Worker 結構

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
  final Thread thread;  // 持有線程,只有在線程工廠運行失敗時爲空
  Runnable firstTask;   // 初始化任務,不爲空的時候,任務直接運行,不在添加到隊列
  volatile long completedTasks;  // 完成任務計數

  Worker(Runnable firstTask) {
    setState(-1);   // AQS 初始化狀態
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
  }

  public void run() {
    runWorker(this);  // 循環取任務執行
  }
  ...
  // AQS 鎖方法
}

這裏很容易理解的是 threadfirstTask;可是 Worker 還繼承了 AQS 作了一個簡易的互斥鎖,主要是在中斷或者 worker 狀態改變的時候使用;具體 AQS 的詳細說明能夠參考,AbstractQueuedSynchronizer 源碼分析優化


3. ctl 控制變量

ctl 控制變量(簡記 c)是一個 AtomicInteger 類型的變量,由兩部分信息組合而成(兩個值互補影響,又能夠經過簡單的大小比較判斷狀態):this

  • 線程池的運行狀態 (runState,簡記 rs),由 int 高位的前三位表示;
  • 線程池內有效線程的數量 (workerCount,簡記 wc),由 int 地位的29位表示;

源碼以下:線程

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;       // 用來表示線程數量的位數
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // 線程最大容量

                                                         // 狀態量
private static final int RUNNING    = -1 << COUNT_BITS;  // 高位 111,第一位是符號位,1表示負數
private static final int SHUTDOWN   =  0 << COUNT_BITS;  // 高位 000
private static final int STOP       =  1 << COUNT_BITS;  // 高位 001
private static final int TIDYING    =  2 << COUNT_BITS;  // 高位 010
private static final int TERMINATED =  3 << COUNT_BITS;  // 高位 011

private static int runStateOf(int c)     { return c & ~CAPACITY; }  // 運行狀態,取前3位
private static int workerCountOf(int c)  { return c & CAPACITY; }   // 線程數量,取後29位
private static int ctlOf(int rs, int wc) { return rs | wc; }        // 狀態和數量合成

private static boolean runStateLessThan(int c, int s) { return c < s; } // 狀態比較
private static boolean runStateAtLeast(int c, int s) { return c >= s; } 
private static boolean isRunning(int c) { return c < SHUTDOWN; } // RUNNING 是負數,必然小於 SHUTDOWN

代碼中能夠看到狀態判斷的時候都是直接比較的,這是由於 TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING ;他們的狀態變遷關係以下:code

threadpool2

其中:htm

  • RUNNING:運行狀態,可接收新任務;
  • SHUTDOWN:不可接收新任務,繼續處理已提交的任務;
  • STOP:不接收、不處理任務,中斷正在進行的任務
  • TIDYING:全部任務清空,線程中止;
  • TERMINATED:鉤子方法,執行後的最終狀態;


3、ThreadPoolExecutor 源碼分析

1. 增長工做線程

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 這裏正常狀況下,只要大於SHUTDOWN,則必然不能添加線程;可是這裏作了一個優化,
            // 若是線程池還在繼續處理任務,則能夠添加線程加速處理,
            // SHUTDOWN 表示不接收新任務,可是還在繼續處理,
            // firstTask 不爲空時,是在添加線程的時候,firstTask 不入隊,直接處理
            // workQueue 不爲空時,則還有任務須要處理
            // 因此連起來就是 rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||  // 容量超出,則返回
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;  // 線程數增長成功,則跳出循環
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)  // 若是線程狀態改變時,重頭開始重試
                    continue retry;
            }
        }
        // 此時線程計數,增長成功
      
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {  // 線程建立失敗時,直接退出
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) { // 這裏一樣檢查上面的優化條件
                        if (t.isAlive()) // 若是線程已經啓動,則狀態錯誤;
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize) largestPoolSize = s;  // 記錄工做線程的最大數,統計峯值用
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();  // 啓動線程
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted) addWorkerFailed(w); // 添加失敗清除
        }
        return workerStarted;
    }

2. 提交任務

public void execute(Runnable command) {
  if (command == null) throw new NullPointerException();
  int c = ctl.get();
  if (workerCountOf(c) < corePoolSize) {   // 若是小於核心線程,直接添加
    if (addWorker(command, true)) return;
    c = ctl.get();
  }
  if (isRunning(c) && workQueue.offer(command)) {  // 任務入隊
    int recheck = ctl.get();
    if (!isRunning(recheck) && remove(command))  // 再次檢查,狀態不是RUNNING的時候,拒絕並移除任務
      reject(command);
    else if (workerCountOf(recheck) == 0)  // 這裏是防止狀態爲SHUTDOWN時,已經添加的任務沒法執行
      addWorker(null, false);
  }
  else if (!addWorker(command, false))  // 任務入隊失敗時,直接添加線程,並運行
    reject(command);
}

流程圖以下:blog

threadpool2

因此影響任務提交的因數就有:

  • 核心線程的大小;
  • 是否爲阻塞隊列;
  • 線程池的大小;


3. 處理任務

工做線程啓動以後,首先處理 firstTask 任務(特別注意,這個任務是沒有入隊的),而後從 workQueue 中取出任務處理,隊列爲空時,超時等待 keepAliveTime

final void runWorker(Worker w) {
  Thread wt = Thread.currentThread();
  Runnable task = w.firstTask;
  w.firstTask = null;
  w.unlock(); // allow interrupts
  boolean completedAbruptly = true;
  try {
    while (task != null || (task = getTask()) != null) {  // 獲取任務
      w.lock();
      // 整體條件表示線程池中止的時候,須要中斷線程,
      // 若是沒有中止,則清除中斷狀態,確保未中斷
      if ((runStateAtLeast(ctl.get(), STOP) ||
         (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
        !wt.isInterrupted()) 
        wt.interrupt();
      try {
        beforeExecute(wt, task); // 回調方法
        Throwable thrown = null;
        try {
          task.run();
        } catch (RuntimeException x) {
          thrown = x; throw x;
        } catch (Error x) {
          thrown = x; throw x;
        } catch (Throwable x) {
          thrown = x; throw new Error(x);
        } finally {
          afterExecute(task, thrown);  // 回調方法
        }
      } finally {
        task = null;
        w.completedTasks++;
        w.unlock();
      }
    }
    completedAbruptly = false;
  } finally {
    processWorkerExit(w, completedAbruptly);  // 退出時清理
  }
}
private Runnable getTask() {
  boolean timedOut = false; // Did the last poll() time out?

  for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // 此處保證 SHUTDOWN 狀態繼續處理任務,STOP 狀態中止處理
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
      decrementWorkerCount();
      return null;
    }
    int wc = workerCountOf(c);
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;  // 是否關閉空閒線程

    if ((wc > maximumPoolSize || (timed && timedOut))  // 若是線程大於最大容量,或者容許關閉,且第一次沒取到
      && (wc > 1 || workQueue.isEmpty())) {            // 返回空,最後由 processWorkerExit 清理
      if (compareAndDecrementWorkerCount(c))
        return null;
      continue;
    }

    try {
      // 是否超時獲取
      Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
      if (r != null)
        return r;
      timedOut = true;
    } catch (InterruptedException retry) {
      timedOut = false;
    }
  }
}

4. 中止線程池

public void shutdown() {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    checkShutdownAccess();     // 檢查中止權限
    advanceRunState(SHUTDOWN); // 設置線程池狀態
    interruptIdleWorkers();    // 設置全部線程中斷
    onShutdown();              // hook for ScheduledThreadPoolExecutor
  } finally {
    mainLock.unlock();
  }
  tryTerminate();              // 繼續執行等待隊列中的任務,完畢後設置 TERMINATED 狀態
}
public List<Runnable> shutdownNow() {
  List<Runnable> tasks;
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    checkShutdownAccess();
    advanceRunState(STOP);
    interruptWorkers();
    tasks = drainQueue();   // 清空全部等待隊列的任務,並返回
  } finally {
    mainLock.unlock();
  }
  tryTerminate();
  return tasks;
}

能夠看到 shutdownNow 只比 shutdown 多了,清空等待隊列,可是正在執行的任務仍是會繼續執行;

4、拒絕策略

以前提到了,提交任務失敗的時候,會執行拒絕操做,在 JDk 中爲咱們提供了四種策略:

  • AbortPolicy:直接拋出 RejectedExecutionException 異常,這是默認的拒絕策略;
  • CallerRunsPolicy:由調用線程自己運行任務,以減緩提交速度;
  • DiscardPolicy:不處理,直接丟棄掉;
  • DiscardOldestPolicy:丟棄最老的任務,並執行當前任務;

5、Executors 工廠方法

另外就是根據線程池參數的不一樣,Executors 爲咱們提供了4種典型的用法:

SingleThreadExecutor:單線程的線程池,提交任務順序執行;

public static ExecutorService newSingleThreadExecutor() {
  return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}

如代碼所示,就是最大線程、核心線程都是1,和無界隊列組成的線程池,提交任務的時候就會,直接將任務加入隊列順序執行;

FixedThreadPool:固定線程數量線程池:

public static ExecutorService newFixedThreadPool(int nThreads) {
  return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, 
                                new LinkedBlockingQueue<Runnable>());
}

SingleThreadExecutor 同樣,只是線程數量由用戶決定;

CachedThreadPool:動態調節線程池;

public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, 
                                new SynchronousQueue<Runnable>());
}

這裏核心線程爲0,隊列是 SynchronousQueue 容量爲1的阻塞隊列,而線程數最大,存活60s,因此有任務的時候直接建立新的線程,超時空閒60s;

ScheduledThreadPool:定時任務線程池,功能同 Timer 相似,具體細節後續還會講到;

總結

  • 決定線程池運行邏輯的主要有三個變量,核心線程大小,隊列容量,線程池容量
  • 最後發現其實 Executors 提供的幾種實現,都很典型;可是卻容易發生 OOM ,因此最好仍是本身手動建立比較好;
相關文章
相關標籤/搜索