從源碼的角度解析線程池運行原理

微信公衆號「後端進階」,專一後端技術分享:Java、Golang、WEB框架、分佈式中間件、服務治理等等。
老司機傾囊相授,帶你一路進階,來不及解釋了快上車!java

在講解完線程池的構造參數和一些不經常使用的設置以後,有些同窗仍是想繼續深刻地瞭解線程池的原理,因此這篇文章我會帶你們深刻源碼,從底層吃透線程池的運行原理。面試

ThreadPoolExecutor

在深刻源碼以前先來看看J.U.C包中的線程池類圖:後端

它們的最頂層是一個Executor接口,它只有一個方法:安全

public interface Executor {
    void execute(Runnable command);
}
複製代碼

它提供了一個運行新任務的簡單方法,Java線程池也稱之爲Executor框架。微信

ExecutorService擴展了Executor,添加了操控線程池生命週期的方法,如shutDown(),shutDownNow()等,以及擴展了可異步跟蹤執行任務生成返回值Future的方法,如submit()等方法。數據結構

ThreadPoolExecutor繼承自AbstractExecutorService,同時實現了ExecutorService接口,也是Executor框架默認的線程池實現類,也是這篇文章重點分析的對象,通常咱們使用線程池,如沒有特殊要求,直接建立ThreadPoolExecutor,初始化一個線程池,若是須要特殊的線程池,則直接繼承ThreadPoolExecutor,並實現特定的功能,如ScheduledThreadPoolExecutor,它是一個具備定時執行任務的線程池。多線程

下面咱們開始ThreadPoolExecutor的源碼分析了(如下源碼爲JDK8版本):併發

ctl變量

ctl是一個Integer值,它是對線程池運行狀態和線程池中有效線程數量進行控制的字段,Integer值一共有32位,其中高3位表示"線程池狀態",低29位表示"線程池中的任務數量"。咱們看看Doug Lea大神是如何實現的:框架

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
// 經過位運算獲取線程池運行狀態
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 經過位運算獲取線程池中有效的工做線程數
private static int workerCountOf(int c) { return c & CAPACITY; }
// 初始化ctl變量值
private static int ctlOf(int rs, int wc) { return rs | wc; }
複製代碼

線程池一共有狀態5種狀態,分別是:異步

  1. Running:線程池初始化時默認的狀態,表示線程正處於運行狀態,可以接受新提交的任務,同時也可以處理阻塞隊列中的任務;
  2. SHUTDOWN:調用shutdown()方法會使線程池進入到該狀態,該狀態下再也不繼續接受新提交的任務,可是還會處理阻塞隊列中的任務;
  3. STOP:調用shutdownNow()方法會使線程池進入到該狀態,該狀態下再也不繼續接受新提交的任務,同時再也不處理阻塞隊列中的任務;
  4. TIDYING:若是線程池中workerCount=0,即有效線程數量爲0時,會進入該狀態;
  5. TERMINATED:在terminated()方法執行完後進入該狀態,只不過terminated()方法須要咱們自行實現。

咱們再來看看位運算:

COUNT_BITS表示ctl變量中表示有效線程數量的位數,這裏COUNT_BITS=29;

CAPACITY表示最大有效線程數,根據位運算得出COUNT_MASK=11111111111111111111111111111,這算成十進制大約是5億,在設計之初就已經想到不會開啓超過5億條線程,因此徹底夠用了;

線程池狀態的位運算獲得如下值:

  1. RUNNING:高三位值111
  2. SHUTDOWN:高三位值000
  3. STOP:高三位值001
  4. TIDYING:高三位值010
  5. TERMINATED:高三位值011

這裏簡單解釋一下Doug Lea大神爲何使用一個Integer變量表示兩個值:

不少人會想,一個變量表示兩個值,就節省了存儲空間,可是這裏很顯然不是爲了節省空間而設計的,即便將這輛個值拆分紅兩個Integer值,一個線程池也就多了4個字節而已,爲了這4個字節而去大費周章地設計一通,顯然不是Doug Lea大神的初衷。

在多線程的環境下,運行狀態和有效線程數量每每須要保證統一,不能出現一個改而另外一個沒有改的狀況,若是將他們放在同一個AtomicInteger中,利用AtomicInteger的原子操做,就能夠保證這兩個值始終是統一的。

Doug Lea大神牛逼!

Worker

Worker類繼承了AQS,並實現了Runnable接口,它有兩個重要的成員變量:firstTask和thread。firstTask用於保存第一次新建的任務;thread是在調用構造方法時經過ThreadFactory來建立的線程,是用來處理任務的線程。

如何在線程池中添加任務?

線程池要執行任務,那麼必須先添加任務,execute()雖然說是執行任務的意思,但裏面也包含了添加任務的步驟在裏面,下面源碼:

java.util.concurrent.ThreadPoolExecutor#execute:

public void execute(Runnable command) {
  // 若是添加訂單任務爲空,則空指針異常
  if (command == null)
    throw new NullPointerException();
  // 獲取ctl值
  int c = ctl.get();
  // 1.若是當前有效線程數小於核心線程數,調用addWorker執行任務(即建立一條線程執行該任務)
  if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
      return;
    c = ctl.get();
  }
  // 2.若是當前有效線程大於等於核心線程數,而且當前線程池狀態爲運行狀態,則將任務添加到阻塞隊列中,等待空閒線程取出隊列執行
  if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
      reject(command);
    else if (workerCountOf(recheck) == 0)
      addWorker(null, false);
  }
  // 3.若是阻塞隊列已滿,則調用addWorker執行任務(即建立一條線程執行該任務)
  else if (!addWorker(command, false))
    // 若是建立線程失敗,則調用線程拒絕策略
    reject(command);
}
複製代碼

能夠發現,源碼的解讀對應「你都瞭解線程池的參數嗎?」裏面那道面試題的解析是同樣的,我在這裏畫一下execute執行任務的流程圖:

繼續往下看,addWorker添加任務,方法源碼有點長,我按照邏輯拆分紅兩部分講解:

java.util.concurrent.ThreadPoolExecutor#addWorker:

retry:
for (;;) {
  int c = ctl.get();
  // 獲取線程池當前運行狀態
  int rs = runStateOf(c);

  // 若是rs大於SHUTDOWN,則說明此時線程池不在接受新任務了
  // 若是rs等於SHUTDOWN,同時知足firstTask爲空,且阻塞隊列若是有任務,則繼續執行任務
  // 也就說明了若是線程池處於SHUTDOWN狀態時,能夠繼續執行阻塞隊列中的任務,但不能繼續往線程池中添加任務了
  if (rs >= SHUTDOWN &&
      ! (rs == SHUTDOWN &&
         firstTask == null &&
         ! workQueue.isEmpty()))
    return false;

  for (;;) {
    // 獲取有效線程數量
    int wc = workerCountOf(c);
    // 若是有效線程數大於等於線程池所容納的最大線程數(基本不可能發生),不能添加任務
    // 或者有效線程數大於等於當前限制的線程數,也不能添加任務
    // 限制線程數量有任務是否要核心線程執行決定,core=true使用核心線程執行任務
    if (wc >= CAPACITY ||
        wc >= (core ? corePoolSize : maximumPoolSize))
      return false;
    // 使用AQS增長有效線程數量
    if (compareAndIncrementWorkerCount(c))
      break retry;
    // 若是再次獲取ctl變量值
    c = ctl.get();  // Re-read ctl
    // 再次對比運行狀態,若是不一致,再次循環執行
    if (runStateOf(c) != rs)
      continue retry;
    // else CAS failed due to workerCount change; retry inner loop
  }
}
複製代碼

這裏特別強調,firstTask是開啓線程執行的首個任務,以後常駐在線程池中的線程執行的任務都是從阻塞隊列中取出的,須要注意。

以上for循環代碼主要做用是判斷ctl變量當前的狀態是否能夠添加任務,特別說明了若是線程池處於SHUTDOWN狀態時,能夠繼續執行阻塞隊列中的任務,但不能繼續往線程池中添加任務了;同時增長工做線程數量使用了AQS做同步,若是同步失敗,則繼續循環執行。

// 任務是否已執行
boolean workerStarted = false;
// 任務是否已添加
boolean workerAdded = false;
// 任務包裝類,咱們的任務都須要添加到Worker中
Worker w = null;
try {
  // 建立一個Worker
  w = new Worker(firstTask);
  // 獲取Worker中的Thread值
  final Thread t = w.thread;
  if (t != null) {
    // 操做workers HashSet 數據結構須要同步加鎖
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
      // Recheck while holding lock.
      // Back out on ThreadFactory failure or if
      // shut down before lock acquired.
      // 獲取當前線程池的運行狀態
      int rs = runStateOf(ctl.get());
      // rs < SHUTDOWN表示是RUNNING狀態;
      // 若是rs是RUNNING狀態或者rs是SHUTDOWN狀態而且firstTask爲null,向線程池中添加線程。
      // 由於在SHUTDOWN時不會在添加新的任務,但仍是會執行workQueue中的任務
      // rs是RUNNING狀態時,直接建立線程執行任務
      // 當rs等於SHUTDOWN時,而且firstTask爲空,也能夠建立線程執行任務,也說說明了SHUTDOWN狀態時再也不接受新任務
      if (rs < SHUTDOWN ||
          (rs == SHUTDOWN && firstTask == null)) {
        if (t.isAlive()) // precheck that t is startable
          throw new IllegalThreadStateException();
        workers.add(w);
        int s = workers.size();
        if (s > largestPoolSize)
          largestPoolSize = s;
        workerAdded = true;
      }
    } finally {
      mainLock.unlock();
    }
    // 啓動線程執行任務
    if (workerAdded) {
      t.start();
      workerStarted = true;
    }
  }
} finally {
  if (! workerStarted)
    addWorkerFailed(w);
}
return workerStarted;
}
複製代碼

以上源碼主要的做用是建立一個Worker對象,並將新的任務裝進Worker中,開啓同步將Worker添加進workers中,這裏須要注意workers的數據結構爲HashSet,非線程安全,因此操做workers須要加同步鎖。添加步驟作完後就啓動線程來執行任務了,繼續往下看。

如何執行任務?

咱們注意到上面的代碼中:

// 啓動線程執行任務
if (workerAdded) {
  t.start();
  workerStarted = true;
}
複製代碼

這裏的t是w.thread獲得的,便是Worker中用於執行任務的線程,該線程由ThreadFactory建立,咱們再看看生成Worker的構造方法:

Worker(Runnable firstTask) {
  setState(-1); // inhibit interrupts until runWorker
  this.firstTask = firstTask;
  this.thread = getThreadFactory().newThread(this);
}
複製代碼

newThread傳的參數是Worker自己,而Worker實現了Runnable接口,因此當咱們執行t.start()時,執行的是Worker的run()方法,找到入口了:

java.util.concurrent.ThreadPoolExecutor.Worker#run:

public void run() {
  runWorker(this);
}
複製代碼

java.util.concurrent.ThreadPoolExecutor#runWorker:

final void runWorker(Worker w) {
  Thread wt = Thread.currentThread();
  Runnable task = w.firstTask;
  w.firstTask = null;
  w.unlock(); // allow interrupts
  boolean completedAbruptly = true;
  try {
    // 循環從workQueue阻塞隊列中獲取任務並執行
    while (task != null || (task = getTask()) != null) {
      // 加同步鎖的目的是爲了防止同一個任務出現多個線程執行的問題
      w.lock();
      // 若是線程池正在關閉,須確保中斷當前線程
      if ((runStateAtLeast(ctl.get(), STOP) ||
           (Thread.interrupted() &&
            runStateAtLeast(ctl.get(), STOP))) &&
          !wt.isInterrupted())
        wt.interrupt();
      try {
        // 執行任務前能夠作一些操做
        beforeExecute(wt, task);
        Throwable thrown = null;
        try {
          // 執行任務
          task.run();
        } catch (RuntimeException x) {
          thrown = x; throw x;
        } catch (Error x) {
          thrown = x; throw x;
        } catch (Throwable x) {
          thrown = x; throw new Error(x);
        } finally {
          // 執行任務後能夠作一些操做
          afterExecute(task, thrown);
        }
      } finally {
        // 將task置爲空,讓線程自行調用getTask()方法從workQueue阻塞隊列中獲取任務
        task = null;
        // 記錄Worker執行了多少次任務
        w.completedTasks++;
        w.unlock();
      }
    }
    completedAbruptly = false;
  } finally {
    // 線程回收過程
    processWorkerExit(w, completedAbruptly);
  }
}
複製代碼

這一步是執行任務的核心方法,首次執行不爲空的firstTask任務,以後便一直從workQueue阻塞隊列中獲取任務並執行,若是你想在任務執行先後作點啥不可告人的小動做,你能夠實現ThreadPoolExecutor如下兩個方法:

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
複製代碼

這樣一來,咱們就能夠對任務的執行進行實時監控了。

這裏還須要注意,在finally塊中,將task置爲空,目的是爲了讓線程自行調用getTask()方法從workQueue阻塞隊列中獲取任務。

如何保證核心線程不被銷燬?

咱們以前已經知道線程池中可維持corePoolSize數量的常駐核心線程,那麼它們是如何保證執行完任務而不被線程池回收的呢?在前面的章節中你可能已經到從workQueue隊列中會阻塞式地獲取任務,若是沒有獲取任務,那麼就會一直阻塞下去,很聰明,你已經知道答案了,如今咱們來看Doug Lea大神是如何實現的。

java.util.concurrent.ThreadPoolExecutor#getTask:

private Runnable getTask() {
  // 超時標記,默認爲false,若是調用workQueue.poll()方法超時了,會標記爲true
  // 這個標記很是之重要,下面會說到
  boolean timedOut = false;

  for (;;) {
    // 獲取ctl變量值
    int c = ctl.get();
    int rs = runStateOf(c);

    // 若是當前狀態大於等於SHUTDOWN,而且workQueue中的任務爲空或者狀態大於等於STOP
    // 則操做AQS減小工做線程數量,而且返回null,線程被回收
    // 也說明假設狀態爲SHUTDOWN的狀況下,若是workQueue不爲空,那麼線程池仍是能夠繼續執行剩下的任務
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
      // 操做AQS將線程池中的線程數量減一
      decrementWorkerCount();
      return null;
    }

    // 獲取線程池中的有效線程數量
    int wc = workerCountOf(c);

    // 若是開發者主動開啓allowCoreThreadTimeOut而且獲取當前工做線程大於corePoolSize,那麼該線程是能夠被超時回收的
    // allowCoreThreadTimeOut默認爲false,即默認不容許核心線程超時回收
    // 這裏也說明了在覈心線程之外的線程都爲「臨時」線程,隨時會被線程池回收
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
    // 這裏說明了兩點銷燬線程的條件:
    // 1.原則上線程池數量不可能大於maximumPoolSize,但可能會出現併發時操做了setMaximumPoolSize方法,若是此時將最大線程數量調少了,極可能會出現當前工做線程大於最大線程的狀況,這時就須要線程超時回收,以維持線程池最大線程小於maximumPoolSize,
    // 2.timed && timedOut 若是爲true,表示當前操做須要進行超時控制,這裏的timedOut爲true,說明該線程已經從workQueue.poll()方法超時了
    // 以上兩點知足其一,均可以觸發線程超時回收
    if ((wc > maximumPoolSize || (timed && timedOut))
        && (wc > 1 || workQueue.isEmpty())) {
      // 嘗試用AQS將線程池線程數量減一
      if (compareAndDecrementWorkerCount(c))
        // 減一成功後返回null,線程被回收
        return null;
      // 不然循環重試
      continue;
    }

    try {
      // 若是timed爲true,阻塞超時獲取任務,不然阻塞獲取任務
      Runnable r = timed ?
        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
      workQueue.take();
      if (r != null)
        return r;
      // 若是poll超時獲取任務超時了, 將timeOut設置爲true
      // 繼續循環執行,若是碰巧開發者開啓了allowCoreThreadTimeOut,那麼該線程就知足超時回收了
      timedOut = true;
    } catch (InterruptedException retry) {
      timedOut = false;
    }
  }
}
複製代碼

我把我對getTask()方法源碼的深度解析寫在源碼對應的地方了,該方法就是實現默認的狀況下核心線程不被銷燬的核心實現,其實現思路大體是:

  1. 將timedOut超時標記默認設置爲false;
  2. 計算timed的值,該值決定了線程的生死大權,(timed && timedOut) 便是線程超時回收的條件之一,須要注意的是第一次(timed && timedOut) 爲false,由於timedOut默認值爲false,此時還沒到poll超時獲取的操做;
  3. 根據timed值來決定是用阻塞超時獲取任務仍是阻塞獲取任務,若是用阻塞超時獲取任務,超時後timedOut會被設置爲true,接着繼續循環,此時(timed && timedOut) 爲true,知足線程超時回收。

嘔心瀝血的一篇源碼解讀到此結束,但願能助同窗們完全吃透線程池的底層原理,之後遇到面試官問你線程池的問題,你就說看過「後端進階」的線程池源碼解讀,面試官這時就會誇你:

這同窗基礎真紮實!

公衆號「後端進階」,專一後端技術分享!
相關文章
相關標籤/搜索