源碼分析—ThreadPoolExecutor線程池三大問題及改進方案

前言

在一次聚會中,我和一個騰訊大佬聊起了池化技術,說起到java的線程池實現問題,我說這個我懂啊,而後巴拉巴拉說了一大堆,而後騰訊大佬問我說,那你知道線程池有什麼缺陷嗎?我頓時啞口無言,甘拜下風,因此此次我再回來思考一下線程池的實現原理java

源碼分析

ThreadPoolExecutor構造器

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
      //校驗幾個參數不能小於零,不然拋出異常
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  • corePoolSize:核心線程數
  • maximumPoolSize:最大線程數,線程池容許建立的最大線程數
  • workQueue:任務隊列,BlockingQueue 接口的某個實現(常使用 ArrayBlockingQueue 和 LinkedBlockingQueue)
  • keepAliveTime:空閒線程的保活時間,若是某線程的空閒時間超過這個值都沒有任務給它作,那麼能夠被關閉了。注意這個值並不會對全部線程起做用,若是線程池中的線程數少於等於核心線程數 corePoolSize,那麼這些線程不會由於空閒太長時間而被關閉,固然,也能夠經過調用 allowCoreThreadTimeOut(true)使核心線程數內的線程也能夠被回收
  • threadFactory:用於生成線程,通常咱們使用Executors.defaultThreadFactory()
  • handler:當線程池已經滿了,可是又有新的任務提交的時候,該採起什麼策略由這個來指定

默認的幾個屬性:數據庫

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 這裏 COUNT_BITS 設置爲 29(32-3),意味着前三位用於存放線程狀態,後29位用於存放線程數
private static final int COUNT_BITS = Integer.SIZE - 3;
//最大線程數是 2^29-1=536870911
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
//111 00000000000000000000000000000
private static final int RUNNING    = -1 << COUNT_BITS;
// 000 00000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 001 00000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;
// 010 00000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;
// 011 00000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
//將CAPACITY取費後和c進行取與運算,能夠獲得高3位的值,即線程池的狀態
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//將c和CAPACITY取與運算,能夠獲得低29位的值,即線程池的個數
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

採用一個 32 位的整數來存放線程池的狀態和當前池中的線程數,其中高 3 位用於存放線程池狀態,低 29 位表示線程數(CAPACITY)緩存

  • RUNNING:這個沒什麼好說的,這是最正常的狀態:接受新的任務,處理等待隊列中的任務
  • SHUTDOWN:不接受新的任務提交,可是會繼續處理等待隊列中的任務
  • STOP:不接受新的任務提交,再也不處理等待隊列中的任務,中斷正在執行任務的線程
  • TIDYING:全部的任務都銷燬了,workCount 爲 0。線程池的狀態在轉換爲 TIDYING 狀態時,會執行鉤子方法 terminated()
  • TERMINATED:terminated() 方法結束後,線程池的狀態就會變成這個

execute方法

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
     
    int c = ctl.get();
      //若是當前的線程數小於corePoolSize
    if (workerCountOf(c) < corePoolSize) {
          //調用addWorker新建一個線程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
       // 到這裏說明,要麼當前線程數大於等於核心線程數,要麼剛剛 addWorker 失敗了
      //校驗當前線程狀態是RUNNING,並將command入隊
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
          //若是不是運行狀態,那麼移除隊列,並執行拒絕策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 若是線程池仍是 RUNNING 的,而且線程數爲 0,那麼開啓新的線程
          //防止任務提交到隊列中了,可是線程都關閉了
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
      //到這裏說明隊列已經滿了,因此新建一個線程,若是新建的線程數已經超過了maximumPoolSize,那麼執行拒絕策略
    else if (!addWorker(command, false))
        reject(command);
}

用一張圖來歸納一下上面的內容:
ide

  • 若是線程池中的線程數少於 coreThreadCount 時,處理新的任務時會建立新的線程;
  • 若是線程數大於 coreThreadCount 則把任務丟到一個隊列裏面,由當前空閒的線程執行;
  • 當隊列中的任務堆積滿了的時候,則繼續建立線程,直到達到 maxThreadCount;
  • 當線程數達到 maxTheadCount 時還有新的任務提交,那麼咱們就不得不將它們丟棄了。

咱們下面看一下addWorker是如何建立線程的:oop

addWorker源碼分析

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
          //獲取當前線程池狀態
        int rs = runStateOf(c);
           
          //1
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
               //2. 校驗傳入的線程數是否超過了容量大小, 或者是否超過了corePoolSize或maximumPoolSize
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
              //到了這裏說明線程數沒有超,那麼就用CAS將線程池的個數加1
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
              //3 說明有其餘的線程搶先更新了狀態,繼續下一輪的循環,跳到外層循環
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
          //建立一個線程
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                  //4 若是線程是沒有問題的話,那麼將worker加入到隊列中
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize 用於記錄 workers 中的個數的最大值
                    // 由於 workers 是不斷增長減小的,經過這個值能夠知道線程池的大小曾經達到的最大值
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
               //若是worker入隊成功,那麼啓動線程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
          //若是worker啓動失敗,那麼就回滾woker線程建立的狀態
        if (! workerStarted)
            addWorkerFailed(w);
    }
      // 返回線程是否啓動成功
    return workerStarted;
}
  1. 這裏主要是列舉了幾個條件不能建立新的worker的狀況
    1. 線程池狀態大於 SHUTDOWN,其實也就是 STOP, TIDYING, 或 TERMINATED
    2. firstTask != null
    3. workQueue.isEmpty()
      若是線程池處於 SHUTDOWN,可是 firstTask 爲 null,且 workQueue 非空,那麼是容許建立 worker 的
  2. 若是傳入的core參數是true表明使用核心線程數 corePoolSize 做爲建立線程的界限,也就說建立這個線程的時候,若是線程池中的線程總數已經達到 corePoolSize,那麼不能響應此次建立線程的請求;若是是false,表明使用最大線程數 maximumPoolSize 做爲界限
  3. 若是CAS失敗並非由於有其餘線程在嘈雜哦致使的,那麼就直接在裏層循環繼續下一次的循環就行了,若是是由於其餘線程的操做,致使線程池的狀態發生了變動,若有其餘線程關閉了這個線程池,那麼須要回到外層的for循環
  4. 若是是 小於 SHUTTDOWN 那就是 RUNNING,則繼續往下繼續,或者狀態是SHUTDOWN可是傳入的firstTask爲空,表明繼續處理隊列中的任務

addWorkerFailed學習

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);
        decrementWorkerCount();
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

addWorkerFailed的處理就是將workers集合裏面的worker移除,而後count減1,優化

worker對象

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
      
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;
 
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
      ....
}

Worker是繼承AQS對象的,在建立Worker對象的時候會傳入一個Runnable對象,並設置AQS的state狀態爲-1,並從線程工廠中新建一個線程ui

調用thread.start方法會調用到Worker的run方法中this

public void run() {
    runWorker(this);
}

Worker的run方法會調用到ThreadPoolExecutor的runWorker方法

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
          //若是task爲空,那麼就從workQueue裏面獲取task
        while (task != null || (task = getTask()) != null) {
            w.lock();
              // 若是線程池狀態大於等於 STOP,那麼意味着該線程也要中斷
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                  // 這是一個鉤子方法
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                       //執行任務
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                       // 這是一個鉤子方法
                    afterExecute(task, thrown);
                }
            } finally {
                   // 置空 task,準備 getTask 獲取下一個任務
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
          //異常狀況或getTask獲取不到任務時會執行關閉
        processWorkerExit(w, completedAbruptly);
    }
}

傳入一個Worker首先去校驗firstTask是否是null,若是是那麼就調用getTask方法從workQueue隊列裏面獲取,而後判斷一下當前的線程是否須要中斷,如須要的話執行鉤子方法,而後調用task的run方法執行task;
若是while循環裏面getTask獲取不到任務的話,就結束循環調用processWorkerExit方法執行關閉;
若是是異常緣由致使的while循環退出,那麼會調用processWorkerExit並傳入爲true

getTask

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        //要麼狀態大於STOP,要麼狀態等於SHUTDOWN而且隊列是空的,那麼線程數減一後返回null
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);
        // 容許核心線程數內的線程回收,或當前線程數超過了核心線程數,那麼有可能發生超時關閉
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
          //校驗線程數是否超了,或者是否超時
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
              // 到 workQueue 中獲取任務
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

這個方法返回null有以下幾種狀況:

  1. 當前狀態是SHUTDOWN而且workQueue隊列爲空
  2. 當前狀態是STOP及以上
  3. 池中有大於 maximumPoolSize 個 workers 存在(經過調用 setMaximumPoolSize 進行設置)

processWorkerExit

private void processWorkerExit(Worker w, boolean completedAbruptly) {
      //若是是異常緣由中斷,那麼須要將運行線程數減一
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
          //設置完成任務數
        completedTaskCount += w.completedTasks;
          //將worker從集合裏移除
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
      //判斷當前的線程池是否處於SHUTDOWN狀態,判斷是否要終止線程
    tryTerminate();

    int c = ctl.get();
      //若是是RUNNING或SHUTDOWN則會進入這個方法
    if (runStateLessThan(c, STOP)) {
          //如不是之外中斷則會往下走
        if (!completedAbruptly) {
              //判斷是否保留最少核心線程數
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
          //若是當前運行的Worker數比當前所須要的Worker數少的話,那麼就會調用addWorker,添加新的Worker
        addWorker(null, false);
    }
}
  1. 判斷是不是意外退出的,若是是意外退出的話,那麼就須要把WorkerCount--
  2. 加完鎖後,同步將completedTaskCount進行增長,表示總共完成的任務數,而且從WorkerSet中將對應的Worker移除
  3. 調用tryTemiate,進行判斷當前的線程池是否處於SHUTDOWN狀態,判斷是否要終止線程
  4. 判斷當前的線程池狀態,若是當前線程池狀態比STOP大的話,就不處理
  5. 判斷是不是意外退出,若是不是意外退出的話,那麼就會判斷最少要保留的核心線程數,若是allowCoreThreadTimeOut被設置爲true的話,那麼說明核心線程在設置的KeepAliveTime以後,也會被銷燬。
  6. 若是最少保留的Worker數爲0的話,那麼就會判斷當前的任務隊列是否爲空,若是任務隊列不爲空的話並且線程池沒有中止,那麼說明至少還須要1個線程繼續將任務完成
  7. 判斷當前的Worker是否大於min,也就是說當前的Worker總數大於最少須要的Worker數的話,那麼就直接返回,由於剩下的Worker會繼續從WorkQueue中獲取任務執行
  8. 若是當前運行的Worker數比當前所須要的Worker數少的話,那麼就會調用addWorker,添加新的Worker,也就是新開啓線程繼續處理任務

線程池的三大問題

這個任務處理流程看似簡單,實際上有不少坑,你在使用的時候必定要注意。

  1. JDK 實現的這個線程池優先把任務放入隊列暫存起來,而不是建立更多的線程,它比較適用於執行 CPU 密集型的任務,也就是須要執行大量 CPU 運算的任務。因此噹噹前線程數超過核心線程數時,線程池不會增長線程,而是放在隊列裏等待覈心線程空閒下來。

可是,咱們平時開發的 Web 系統一般都有大量的 IO 操做,比方說查詢數據庫、查詢緩存等等。任務在執行 IO 操做的時候 CPU 就空閒了下來,這時若是增長執行任務的線程數而不是把任務暫存在隊列中,就能夠在單位時間內執行更多的任務,大大提升了任務執行的吞吐量。因此你看 Tomcat 使用的線程池就不是 JDK 原生的線程池,而是作了一些改造,當線程數超過 coreThreadCount 以後會優先建立線程,直到線程數到達 maxThreadCount,這樣就比較適合於 Web 系統大量 IO 操做的場景了,你在實際運用過程當中也能夠參考借鑑。

  1. 線程池中使用的隊列的堆積量也是咱們須要監控的重要指標,對於實時性要求比較高的任務來講,這個指標尤其關鍵。

我在實際項目中就曾經遇到過任務被丟給線程池以後,長時間都沒有被執行的詭異問題。最初,我認爲這是代碼的 Bug 致使的,後來通過排查發現,是由於線程池的 coreThreadCount 和 maxThreadCount 設置的比較小,致使任務在線程池裏面大量的堆積,在調大了這兩個參數以後問題就解決了。跳出這個坑以後,我就把重要線程池的隊列任務堆積量,做爲一個重要的監控指標放到了系統監控大屏上。

  1. 若是你使用線程池請必定記住不要使用無界隊列(即沒有設置固定大小的隊列)。也許你會以爲使用了無界隊列後,任務就永遠不會被丟棄,只要任務對實時性要求不高,反正遲早有消費完的一天。可是,大量的任務堆積會佔用大量的內存空間,一旦內存空間被佔滿就會頻繁地觸發 Full GC,形成服務不可用,我以前排查過的一次 GC 引發的宕機,原由就是系統中的一個線程池使用了無界隊列。

線程池的改造方案

咱們這裏直接學習Tomcat是如何優化線程池的,在咱們平時的使用中若是使用LinkedBlockingQueue的話,默認是使用Integer.MAX_VALUE,即無界隊列(這種狀況下若是沒有配置隊列的capacity的話,隊列始終不會滿,那麼始終沒法進入開啓新線程到達maxThreads個數的地步,則此時配置maxThreads實際上是沒有意義的)。

而在Tomcat中使用的是TaskQueue,TaskQueue的隊列capacity爲maxQueueSize,默認也是Integer.MAX_VALUE。可是,其重寫offer方法,當其線程池大小小於maximumPoolSize的時候,返回false,即在必定程度改寫了隊列滿的邏輯,修復了使用LinkedBlockingQueue默認的capacity爲Integer.MAX_VALUE的時候,maxThreads失效的"bug"。從而能夠繼續增加線程到maxThreads,超過以後,繼續放入隊列。

因此綜上,Tomcat的線程池使用了本身擴展的taskQueue,修改了offer的邏輯,以作到最小的改動實現了線程池的改造。

咱們再來回顧一下ThreadPoolExecutor的execute方法是怎麼寫的:
ThreadPoolExecutor#execute

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
      //這裏,若是使用workQueue的offer成功的話,那麼就不會建立新的線程,若是失敗的話,就會走到else if方法進行建立新的線程
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

TaskQueue

public class TaskQueue extends LinkedBlockingQueue<Runnable> {
   private ThreadPoolExecutor parent = null;
    @Override
    public boolean offer(Runnable o) {
        //we can't do any checks
        if (parent==null) return super.offer(o);
        //we are maxed out on threads, simply queue the object
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
        //we have idle threads, just add it to the queue
        if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
        //當其線程池大小小於maximumPoolSize的時候,返回false
        if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
        //if we reached here, we need to add it to the queue
        return super.offer(o);
    }
}

咱們從這裏能夠看到

  1. 若是當前線程數已達到MaximumPoolSize,那麼就放入到隊列裏去
  2. 若是當前線程池的數量大於正在運行的線程數,說明有空閒的線程,那麼就將任務放入到隊列中去
  3. 若當其線程池大小小於maximumPoolSize的時候,返回false
相關文章
相關標籤/搜索