java多線程之ThreadPoolExecutor原理

「本文已參與好文召集令活動,點擊查看:後端、大前端雙賽道投稿,2萬元獎池等你挑戰!」前端

ThreadPoolExecutor是Java的線程池併發代名詞,多線程開發基本都是基於這個去作具體的業務開發。雖然以爲本身回了,網上帖子已經有不少的文章寫這個,可是是本身一一點寫的,終歸是要比看別人的理解更加深入,因此最近本身在對java知識的系統梳理。 那麼接下來主要分析下這個多線程框架的原理。java

ThreadPoolExecutor的構造函數以成員變量介紹

public ThreadPoolExecutor(int corePoolSize,
                           int maximumPoolSize,
                           long keepAliveTime,
                           TimeUnit unit,
                           BlockingQueue<Runnable> workQueue,
                           ThreadFactory threadFactory,
                           RejectedExecutionHandler handler) {

複製代碼

面試靠的最可能是這個構造函數中7個參數的做用,面試

  • corePoolSize 是核心線程數, 即便線程是空閒的,線程池一直保持的的線程數,除非 allowCoreThreadTimeOut參數設置爲true
  • maximumPoolSize 線程池最大線程數
  • keepAliveTime unit 線程存活時間 和 時間單位
  • workQueue 是任務隊列,是用來保持task,該隊列保持住了Runnable的任務,經過調用 線程池的execute的方法.
  • threadFactory 建立線程的工廠
  • RejectedExecutionHandler 是當線程數超過限制以及隊列也滿了,須要執行的拒絕策 略.

成員變零後端

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1; //線程容量

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
複製代碼

面試最喜歡問的是 ctl變量的表明什麼意義? ctl變量的的的用高3位表示線程池的狀態,用低29位表示線程個數,二者經過 | 操做,拼接出ctl變量,也就是線程池的最大線程數capacity是 (2^29)-1。markdown

線程池狀態

  • RUNNING 運行狀態 -1 << 29 表示線程池能夠接新的任務而且處理隊列任務
  • SHUTDOWN 關閉狀態態 -1 << 29 表示線程池不接受新的線程池任務可是能夠處理隊列中的任務
  • STOP 中止狀態 1 << 29 表示線程池不接受新的線程池任務也不處理隊列中的任務而且中斷線程池裏中正在執行的任務
  • TIDYING 2 << 29 表示全部的線程池都已經中斷了,線程數爲0,線程狀態轉爲爲TIDYING, 將執行terminated鉤子函數
  • TERMINATED 3 << 29 表示全部terminated方法都已經執行完成。

線程狀態之間裝換圖

image.png

線程池的提交任務執行流程

首先咱們來看平時業務代碼是提交任務到線程池執行的函數是經過execute或者submit方法, 區別就是submit返回具備Future,execute返回void,的、那麼接下來咱們主要分析execute 的執行流程,submit涉及到線程異步返回,以後會另外單獨分析,那麼下面這個execute函數 就能看出線程池的整個執行流程,多線程

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
         // 當線程池的核心線程數設置爲0狀況下,那麼這時workerCountOf(recheck)爲0,這時就開啓非線程數處理隊列任務
          addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
複製代碼

線程池執行任務流程圖以下: image.png 我相信大概的流程通常同窗是清楚的:併發

  1. 當線程數的Worker線程 < corePoolSize 建立核心線程數執行
  2. 當線程數的Worker線程 > corePoolSize,將任務加入任務隊列中
  3. 則當corePoolSize< maxPoolsize,則新增非核心線程執行任務
  4. 當隊列滿了,線程數也已經達到maxPoolsize,則執行拒絕策略

實際源碼中執行流程還有一些小細節容易被忽略的地點框架

  • 從新檢查線程的狀態以及檢查 線程池的線程數的流程

線程池新增工做線程的流程

線程池新增工做任務主要addWorker方法。因爲代碼比較長,我就在 代碼裏寫好註釋less

private boolean addWorker(Runnable firstTask, boolean core) {
   retry:
   for (;;) {
       int c = ctl.get();
       int rs = runStateOf(c);

       // Check if queue empty only if necessary.
       if (rs >= SHUTDOWN &&
          //第一個條件: 線程至少不是運行狀態,那麼就是shutdown stop tidying,terminated狀態
           ! (rs == SHUTDOWN &&
              firstTask == null &&
              ! workQueue.isEmpty()))
            //第二個條件: 當前線程池是shutdown狀態且任務隊列非空而且工做任務第一個任務是空的取反條件,這個含義是當除了SHUTDOWN狀態且第一個任務爲空且任務隊列不爲空
           // 狀況下,直接返回false,增長Work線程失敗
           return false;

       for (;;) {
           int wc = workerCountOf(c);
           if (wc >= CAPACITY ||
               wc >= (core ? corePoolSize : maximumPoolSize))
               return false;
           if (compareAndIncrementWorkerCount(c))
               break retry;
           c = ctl.get();  // Re-read ctl
           if (runStateOf(c) != rs)
               continue retry;
           // else CAS failed due to workerCount change; retry inner loop
       }
   }

   boolean workerStarted = false;
   boolean workerAdded = false;
   Worker w = null;
   try {
       w = new Worker(firstTask);
       final Thread t = w.thread;
       if (t != null) {
           final ReentrantLock mainLock = this.mainLock;
           mainLock.lock();
           try {
               // Recheck while holding lock.
               // Back out on ThreadFactory failure or if
               // shut down before lock acquired.
               int rs = runStateOf(ctl.get());

               if (rs < SHUTDOWN ||
               // 線程池是running狀態
                   (rs == SHUTDOWN && firstTask == null)) {
                   //線程池處於shutdown狀態而且第一個task爲空
                   if (t.isAlive()) // precheck that t is startable
                       throw new IllegalThreadStateException();
                   //加入工做線程的集合
                   workers.add(w);
                   int s = workers.size();
                   if (s > largestPoolSize)
                      //記錄最大線程數
                       largestPoolSize = s;
                   workerAdded = true;
               }
           } finally {
-                 mainLock.unlock();
-             }
-             if (workerAdded) {
-                 t.start();
                  workerStarted = true;
           }
       }
   } finally {
       if (! workerStarted)
           addWorkerFailed(w);
   }
   return workerStarted;
}
複製代碼

添加工做線程主要步驟異步

  • 檢查線程池的運行狀態以及隊列是不是空,增長線程。
    爲何增長這個判斷,主要是由於線程池是多線程的隨即可能另外調用shutdown等方法關閉線程池,因此作每一步以前都要再次check線程池的狀態,其中比較重要的點是線程池在除了Running狀態,其餘的只有shutdow狀態,且隊列任務非空的狀況,才能增長work線程處理任務。
  • 判斷線程池的線程是核心線程數,而後就判斷大於核心線程數, 若是不是增長的核心線程數,而後經過 CAS增長線程數加1,而後re-read的ctl的如今的狀態是否剛開始進入循環的狀態保持一致。
  • 建立Worker對象,它的第一個參數Runable就是執行的第一個task,而後獲取mainLock的重入鎖, 而後再次判斷線程池的狀態是不是shutdown狀態,而後將Worker對象加入工做線程的Set集合中, 判斷是大於largePoolSize,則將workSet的size賦值largePoolSize,而後賦值workerAdded爲true,接下來在finnally中workerAdded爲true,則調用Worker的start方法啓動該Worker線程,

若是WorkerAdded失敗,則從Worder的Set移除剛纔加入Worker線程,並將線程池的線程數減1,

工做線程Worker的執行流程

首先來看下Work的類的成員變量的構造函數,從下面的Work的代碼,能夠看到它是實現了 RUnnable接口,上一節Worker啓動是調用了它的start方法,真正由操做系統調度執行 的其run方法,那麼接下來重點看下run的工做流程。

private final class Worker
   extends AbstractQueuedSynchronizer
   implements Runnable
{
   /**
    * This class will never be serialized, but we provide a
    * serialVersionUID to suppress a javac warning.
    */
   private static final long serialVersionUID = 6138294804551838833L;

   /** Thread this worker is running in.  Null if factory fails. */
   final Thread thread;
   /** Initial task to run.  Possibly null. */
   Runnable firstTask;
   /** Per-thread task counter */
   volatile long completedTasks;

   /**
    * Creates with given first task and thread from ThreadFactory.
    * @param firstTask the first task (null if none)
    */
   Worker(Runnable firstTask) {
        //初始化狀態爲-1,表示不能被中斷
       setState(-1); // inhibit interrupts until runWorker
       this.firstTask = firstTask;
       this.thread = getThreadFactory().newThread(this);
   }
複製代碼

下面代碼中Work的run直接調用runWork,並傳入自身對象, 開始一個循環判斷 第一個任務後者從任務隊列中取任務不爲空,就開始上鎖,而後執行任務,若是任務 隊列爲空了,則處理Work的退出。

/** Delegates main run loop to outer runWorker  */
public void run() {
    //直接調用runWorker函數
    runWorker(this);
}

final void runWorker(Worker w) {
    // Wokder當前線程
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    //將state值賦值爲0,這樣就運行中斷
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 循環判斷第一個Task獲取從獲取任務
        while (task != null || (task = getTask()) != null) {
           //獲取當前Work的鎖,處理任務,也就是當前Work線程處理是同步處理任務的
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
             //線程池的狀態至少是stop,即便stop,tidying.terminated狀態
           if ((runStateAtLeast(ctl.get(), STOP)
             //檢查線程是否中斷且清楚中斷
           || (Thread.interrupted()
                 &&
                  //再次檢查線程池的狀態至少是STOP
                  runStateAtLeast(ctl.get(), STOP))) &&
                  //再次判斷是否中斷
                !wt.isInterrupted())
                 //中斷線程
                 wt.interrupt();
            try {
               //執行業務任務前處理(鉤子函數)
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                   // 這裏就是執行提交線程池的Runnable的任務的run方法                               task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //執行業務任務後處理(鉤子函數)
                    afterExecute(task, thrown);
                }
            } finally {
                //執行結束重置爲空,回到while循環拿下一個
                task = null;
                //處理任務加1
                w.completedTasks++;
                //釋放鎖,處理下一個任務
                w.unlock();
            }
        }
        //代碼執行到這裏,表明業務的任務沒有異常,否則不會走到這裏,
        //由於上一層try沒有catch異常的,而業務執行出現異常,最裏層
        //雖然catch了異常,可是也都經過throw向外拋出
        completedAbruptly = false;
    } finally {
     //若是循環結束,則處理Work退出工做,表明任務拿不到任務,即任務隊列沒有任務了
      processWorkerExit(w, completedAbruptly);
    }
}
複製代碼

下面就來看下getTask獲取任務隊列的處理邏輯 、 若是這裏返回null,即runWorker循環退出,則會處理finnaly中processWorkExit, 處理Work線程的退出,下面是getWork返回null的狀況:

  1. 若是線程池狀態值至少是SHUTDOWN狀態,而且 線程池狀態值至少是STOP狀態,或者是任務隊列是空,則將線程池的workcout減1,並返回null,
  2. 計算線程池中線程池的數量,若是線程數量大於最大線程數量, 或者 allowCoreThreadTimeOut參數爲true 或者 線程數大於而且任務隊列爲空,則將線程池減1,並返回null,
private Runnable getTask() {
    //超時標誌
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
         //獲取線程狀態
        int c = ctl.get();
        //線程狀態
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
       // 若是線程池狀態值至少是SHUTDOWN狀態,
        if (rs >= SHUTDOWN
        線程池狀態值至少是STOP狀態,或者是任務隊列是空
        && (rs >= STOP || workQueue.isEmpty())) {
            // CAS將worker線程數減1
            decrementWorkerCount();
            return null;
        }
        //計算線程池線程數量
        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // allowCoreThreadTimeOut參數設置爲true,或則線程池的線程數大於corePoolSize, 表示須要超時的Worker須要退出,
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
         //線程數大於最大線程數 || 已經超時
        if ((wc > maximumPoolSize || (timed && timedOut))
            // 線程數大於1 或者 任務隊列爲空
            && (wc > 1 || workQueue.isEmpty())) {
            // CAS將線程數減1
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 須要處理超時的Worker,則獲取任務隊列中任務等待的時間
            //就是線程池構造函數中keepAliveTime時間,若是不處理超時的Worker
            //則直接調用take一直阻塞等待任務隊列中有任務,拿到就返回Runnale任務
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
複製代碼

Worker的退出處理: 1 從上面分析知道completedAbruptly是任務執行時是否出現異常標誌, 若是任務執行過程出錯,則將線程池的線程數量減1 2.加線程池的mainLock的全局鎖,這裏主要區分Worker執行任務中,拿的是Worker內部的鎖,完成任務加1,將worker從Worker的集合移除, 3. 執行tryTerminate函數,是否線程池線程池是否關閉 4. 根據線程池狀態是否補充非核心的Worker線程去處理

private void processWorkerExit(Worker w, boolean completedAbruptly) {
     //任務執行時出現異常,則減去工做
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
     //拿到線程池的主鎖
    final ReentrantLock mainLock = this.mainLock;
    //加鎖
    mainLock.lock();
    try {
       //完成任務加1
        completedTaskCount += w.completedTasks;
        //將worker從Worker的集合移除
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    //嘗試線程池關閉
    tryTerminate();
    //獲取線程池的ctl
    int c = ctl.get();
     //若是線程池的狀態值小於STOP,即便SHUTDOWN RUNNING
    if (runStateLessThan(c, STOP)) {
         //任務執行沒有異常
        if (!completedAbruptly) {
            //allowCoreThreadTimeOut參數true,則min=0,表示不須要線程常駐。
            //負責是有corePoolSize個線程常駐線程池
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
                //若是線程池數大於最小,也就是不須要補充線程執行任務隊列的任務
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 走到這裏表示線程池的線程數爲0,而任務隊列又不爲空,得補充一個線程處理任務   addWorker(null, false);
    }
}
複製代碼

tryTerminate的邏輯是處理線程池關閉的場景

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        //線程池是RUNNING狀態
        if (isRunning(c) ||
           //線程池狀態至少是TIDYING
            runStateAtLeast(c, TIDYING) ||
            //線程池狀態是SHUTDOWN可是隊列不爲空
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
             //中斷一個空閒線程
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        //只有最後一個線程才能走到這裏,處理線程池從TIDYIING狀態
        //到TERMINATED狀態
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try { 
                   //鉤子函數
                    terminated();
                } finally {
                   //設置線程池TERMINATED狀態
                    ctl.set(ctlOf(TERMINATED, 0));
                    //喚醒調用awaitTermination的線程
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}
複製代碼

線程池的拒絕策略 RejectedExecutionHandler

當線程池沒法處理任務時的處理策略:
1.默認拒絕策略是AbortPolicy 直接拋出RejectedExecutionException異常
2.DiscardPolicy 直接丟棄任務
3.DiscardOldestPolicy 丟棄任務隊列中最老的任務,這裏以前理解是直接丟棄,其實看了源碼以後,其實它仍是當線程池還咩有關閉時,嘗試去提交該任務到線程池去執行

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardOldestPolicy} for the given executor.
     */
    public DiscardOldestPolicy() { }

    /**
     * Obtains and ignores the next task that the executor
     * would otherwise execute, if one is immediately available,
     * and then retries execution of task r, unless the executor
     * is shut down, in which case task r is instead discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
複製代碼
  1. CallerRunsPolicy 直接調用方去執行這個任務,也就是直接Runnable的run函數。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy() { }

    /**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
複製代碼

總結 本文主要就線程池的狀態轉換、工做線程Worker建立以及執行任務隊列中任務的流程、拒絕策略的詳細分析。

相關文章
相關標籤/搜索