多線程知識梳理(6) 線程池四部曲之 ThreadPoolExecutor

1、ThreadPoolExecutor 簡介

1.1 優勢

多線程知識梳理(5) - 線程池四部曲之 Executor 框架 中,咱們對Executor框架以及它的調度模型進行了簡要的介紹,其中用於對線程進行調度和管理的線程池是整個框架的核心,經過線程池咱們能夠:數組

  • 重複利用已經建立的線程下降線程建立和銷燬形成的消耗。
  • 當任務到達時,任務能夠不須要等到線程建立就可以當即執行,提升響應速度。
  • 利用線程池對線程進行統一分配、調優和監控,提供線程的可管理性。

1.2 處理流程

JDK包中,ThreadPoolExecutor就是線程池的具體實現,在閱讀源碼以前,咱們先對它的處理流程進行簡要介紹,當咱們經過execute/submit方法提交一個任務到線程池後,會通過如下的處理流程: bash

  1. 若是當前運行的線程小於 corePoolSize,則建立新線程來執行任務。
  2. 若是運行的線程等於或多於 corePoolSize,則將任務加入到等待隊列中。
  3. 若是沒法將任務加入到等待隊列,則繼續建立新的線程來執行任務。
  4. 若是建立新線程使得當前運行的線程超過maximumPoolSize,任務將被拒絕。

以上就是ThreadPoolExecutor對於任務的處理流程,其中有幾點須要說明:多線程

  • 當建立一個新線程來執行任務時,須要獲取全局鎖,而若是僅僅是將任務加入到等待隊列中則不須要,
  • 當新線程執行完建立它時所指派的第一個任務以後,並不會立刻退出,它會反覆從等待隊列中獲取新的任務來執行。
  • 若是一個線程在指定的時間內一直沒有獲取到新任務,那麼咱們會根據當前線程池當中活動的線程數量來決定是否銷燬它:若是當前線程池數量大於corePoolSize,那麼銷燬該線程,不然只有當設置了allowCoreThreadTimeOut,纔會銷燬該線程。

2、ThreadPoolExecutor 實現

2.1 參數配置

從上面的處理流程能夠看出,ThreadPoolExecutor對於任務的處理流程,會受到corePoolSize、等待隊列、maximumPoolSize等參數的影響,而這些參數都是能夠由ThreadPoolExecutor的建立者去指定的,正是鑑於這種靈活性,使得咱們僅僅經過簡單的配置就能夠實現適用於不一樣的場景的ThreadPoolExecutor,下面,咱們就來介紹一一介紹這些參數的含義:框架

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
複製代碼

(1) int corePoolSize函數

指定核心線程池的大小,當線程的數量沒有大於corePoolSize以前,始終會建立新線程來執行分配的任務。若是調用了preStartAllCoreThreads()方法,線程池會提早建立並啓動全部核心線程。oop

(2) int maximumPoolSizeui

指定線程池的最大數量,當加入新任務時,若是發現等待隊列已經滿了,那麼咱們會嘗試經過建立新線程的方式來執行該任務,而若是此時線程池內線程的數量已經等於maximumPoolSize,那麼會採用指定的拒絕策略來處理該任務。this

(3) long keepAliveTime 和 TimeUnit unitspa

當一個線程在執行完分配給它的任務以後,會嘗試從等待隊列中取出任務去執行,若是通過keepAliveTime以後仍然不能從隊列中獲取到任務,說明此時系統中可能並無那麼多的任務須要去處理,那麼就會根據線程池此時的狀態來決定是否銷燬該線程,以保證在可以迅速響應任務的同時,又不至於有太多空閒的存活線程。線程

(4) BlockingQueue workQueue

指定等待隊列的實現方式,咱們能夠根據須要選擇如下幾種等待隊列:

  • ArrayBlockingQueue:基於數組結構的有界等待隊列,按先進先出原則排序任務
  • LinkedBlockingQueue:基於鏈表結構的阻塞隊列,一樣按照先進先出原則排序任務,吞吐量要高於ArrayBlockingQueue
  • SynchronousQueue:對於這種阻塞隊列而言,每一個插入操做必需要等到另外一個線程調用移除操做,不然插入操做一直處於阻塞狀態。
  • PriorityBlockingQueue:一個具備優先級的無限阻塞隊列。

(5) ThreadFactory threadFactory

用於建立線程的工廠。

(6) RejectedExecutionHandler handler

系統內置瞭如下幾種策略,用於隊列和線程池都滿了的狀況:

  • AbortPolicy:拋出異常,這也是默認的策略
  • CallerRunsPolicy:使用調用者所在線程來執行任務
  • DiscardOldestPolicy:先丟棄隊列中最末尾的任務,再從新經過execute方法執行該任務。
  • DiscardPolicy:不作任何處理,直接丟棄

2.2 內置 ThreadPoolExecutor

多線程知識梳理(5) - 線程池四部曲之 Executor 框架 中,咱們介紹了幾種ThreadPoolExecutor的實現:

它們其實都是 ThreadPoolExecutor,只是在構造時傳入了不一樣的參數,以下表所示:
結合以前對於處理流程和核心參數的分析,對它們進行進一步的介紹:

(1) FixedThreadPool

  • 傳入的nThread參數將被做爲核心線程數和最大線程數,當線程池的數量達到nThread後,以後的任務將會被加入到無界的等待隊列當中
  • 除非某個線程由於異常而結束,不然當線程池的數量達到nThread以後將會一直保持不變
  • 因爲使用的是無界隊列,所以線程池不會拒絕任務

(2) SingleThreadPoolExecutor

  • 若是當前線程池中無運行的線程時,將建立一個新線程來執行任務
  • 因爲最大線程數被設置爲1,所以以後的任務都被加入到無界隊列當中,而且由線程池中這個惟一的線程從等待隊列中,按照添加的順序依次執行任務

(3) CachedThreadPool

  • 因爲等待隊列使用的是SynchonousQueue,它的每一個插入操做都必須等待另外一個線程的移除操做,對於線程池而言,也就是說:在添加任務到等待隊列時,必需要有一個空閒線程正在嘗試從等待隊列獲取任務,纔有可能添加成功。
  • 所以,當一個任務被添加進入線程池時,會有如下兩種狀況:
  • 若是當前有空閒線程正在嘗試從等待隊列中獲取任務,那麼這個任務將會被交給這個空閒線程進行處理
  • 若是當前沒有空閒線程嘗試從等待隊列中獲取任務,那麼將會建立一個新線程來執行任務
  • 因爲設置了等待超時時間,所以某個線程在60s內都沒法獲取到新的任務,那麼它將會被銷燬。

3、ThreadPoolExecutor 源碼走讀

3.1 ctl

ThreadPoolExector中,有一個關鍵變量 - ctl,理解它是咱們進行源碼走讀的基礎。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0))
複製代碼

這個原子操做類包含了兩部分信息:線程池的狀態線程池中的存活線程數目,它們用32位的整型數來表示,其中高3位表示線程池的狀態,低位表示當前線程池中存活的線程數。

在某一時刻,線程池會處於如下五種狀態之一:

  • RUNNINGAccept new tasks and process queued tasks
  • SHUTDOWNDon't accept new tasks, but process queued tasks
  • STOPDon't accept new tasks, don't process queued tasks, and interrupt in-progress tasks
  • TIDYINGAll tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method
  • TERMINATEDterminated() has completed

這五種狀態之間轉換轉換圖爲:

對於ctl變量,如下三個函數能夠用來拆解和組裝:

//獲取線程池的狀態信息
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//獲取線程池的存活線程數
private static int workerCountOf(int c)  { return c & CAPACITY; }
//將狀態信息和存活線程數進行組合
private static int ctlOf(int rs, int wc) { return rs | wc; }
複製代碼

3.2 任務執行過程

下面,咱們經過模擬一個任務的執行來對ThreadPoolExecutor的源碼進行簡單的走讀,整個流程以下圖所示,紅色字部分爲咱們所要關注的關鍵方法:

(1) public void execute(Runnable command)

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        } else if (!addWorker(command, false)) {
            reject(command);
        }
    }
複製代碼

前面咱們說過,向一個線程池中提交任務有兩種方法,execute/submit,它們最終都會調用到上面的這個execute當中,這一函數的邏輯分爲三步:

  • 當線程池中的存活線程數小於指定的核心線程數時,嘗試經過addWorker(firstTask, core)建立一個新的線程來執行任務,這裏將傳入的runnable做爲該線程的第一個任務,而且core參數爲true,若是建立成功,那麼直接返回,不然從新獲取一次ctl變量,跳轉到步驟2
  • 接着經過ctl變量判斷若是當前線程池處於running狀態,那麼將runnable添加到等待隊列workQueue當中,若是添加失敗跳轉到步驟3,添加成功則進行二次檢查,當發現了下面這兩種狀況之一,那麼還須要進行額外的處理:
  • 若是發現線程池變爲了非running狀態,那麼會將該任務從等待隊列中移除;
  • 若是當前線程池已經沒有存活的線程,那麼爲了讓等待隊列中的任務能夠運行,咱們須要經過addWorker方法啓動一個新線程,與第一步不一樣的是,該線程的第一個任務爲空。
  • 經過addWorker方法建立新線程來執行該任務,和第一步的惟一區別就是core參數爲false,若是建立失敗,那麼執行拒絕策略。

(2) private boolean addWorker(Runnable firstTask, boolean core) 下面,咱們再來看一下這個核心的函數addWorker,它的最終目的就是建立一個新的線程來執行任務:

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            //第一部分:當前線程池的狀態是否知足加入的條件
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            //第二部分:當前線程池的容量是否知足加入的條件
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //第三部分:建立工做類Worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) 
                            throw new IllegalThreadStateException();
                        //第四部分:將Worker加入到線程池中
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //第五部分:啓動線程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
複製代碼

整個addWorker進行了如下幾步操做:

  • 根據當前線程池的狀態,判斷是否容許新建線程
  • 根據當前線程池的工做線程數,判斷是否容許新建線程
  • 建立一個Worker對象,這個Worker類中包含了一個線程
Worker(Runnable firstTask) {
            setState(-1); 
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
複製代碼
  • 將新建的Worker對象加入到線程池中
  • 啓動Worker中的線程

(3) final void runWorker(Worker w)

在第(2)步中,咱們啓動了Worker對象中的線程t,它會調用Worker對象的run()函數,接着會執行runWorker方法:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); 
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
複製代碼

這裏面的task就是咱們經過execute方法傳入的Runnable,若是Worker的第一個任務不爲空,那麼會首先執行該任務,若是第一個任務執行完畢,那麼會調用getTask()方法來嘗試去獲取下一個任務,當getTask()方法不返回(等待隊列爲空)時,會一直阻塞在這裏,而當這個while循環退出的時候,那麼Worker所對應的線程就會被銷燬。

(4) private Runnable getTask()

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
複製代碼

getTask()方法會去在第(1)步中的等待隊列workerQueue取任務,在獲取任務的時候會考慮超時時間keepAliveTime,若是超時時間到了仍然沒有獲取到任務,那麼getTask()方法就會返回null,從而runWorker()中的while循環就會結束,以後在finally代碼塊中經過processWorkerExit(w, completedAbruptly)銷燬該線程。

4、關閉線程池

關閉線程池有兩種方法:shutdownshutdownNow

  • shutdown:將線程池的狀態設置成SHUTDOWN狀態,而後中斷全部沒有正在執行任務的線程。
  • shutdownNow:將線程池的狀態設置爲STOP,嘗試中止全部正在執行或暫停任務的線程,並返回等待執行任務的列表。
相關文章
相關標籤/搜索