Java Executor併發框架(二)剖析ThreadPoolExecutor運行過程

上一篇從總體上介紹了Executor接口,從上一篇咱們知道了Executor框架的最頂層實現是ThreadPoolExecutor類,Executors工廠類中提供的newScheduledThreadPoolnewFixedThreadPoolnewCachedThreadPool方法其實也只是ThreadPoolExecutor的構造函數參數不一樣而已。經過傳入不一樣的參數,就能夠構造出適用於不一樣應用場景下的線程池,那麼它的底層原理是怎樣實現的呢,這篇就來介紹下ThreadPoolExecutor線程池的運行過程。java


1.線程池狀態

既然要講運行過程,那麼首先要了解下線程池的狀態分爲哪些?spring

volatile int runState;
static final int RUNNING    = 0;
static final int SHUTDOWN   = 1;
static final int STOP       = 2;
static final int TERMINATED = 3;

ThreadPoolExecutor代碼中定義了上面幾個變量:定義了一個volatile變量runState,以及其餘幾個表示狀態的常量。
runState:初始狀態,表示當前線程池的運行狀態,它的值就是上面的那4個常量值之一緩存

RUNNING:線程池接受新任務並執行隊列任務中...框架

SHUTDOWN:再也不接受新任務,可是會繼續執行等待隊列Queued中的任務。當調用了shutdown()方法,會從 RUNNING -> SHUTDOWN函數

STOP:再也不接受新任務,同時也不執行等待隊列Queued中的任務,而且會嘗試終止正在執行中的任務。當調用了shutdownNow()方法, 會從(RUNNING or SHUTDOWN) -> STOP學習

TERMINATED:線程池中全部線程已經中止運行,其餘行爲同 STOP狀態。this

  • 當等待隊列和線程池爲空時,會從SHUTDOWN -> TERMINATED
  • 當線程池爲空時,會從STOP -> TERMINATED

2.線程池運行任務

2.1變量介紹

在講解運行過程前,咱們先看下ThreadPoolExecutor中的幾個比較重要的成員變量:線程

private final BlockingQueue<Runnable> workQueue; //任務緩存隊列,用來保存等待中的任務,等待worker線程空閒時執行任務
private final ReentrantLock mainLock = new ReentrantLock(); //更新 poolSize, corePoolSize,maximumPoolSize, runState, and workers set 時須要持有這個鎖
private final HashSet<Worker> workers = new HashSet<Worker>(); //用來保存工做中的執行線程
private volatile long  keepAliveTime; //超過corePoolSize外的線程空閒存活之間
private volatile boolean allowCoreThreadTimeOut; //是否對corePoolSize內的線程設置空閒存活時間
private volatile int   corePoolSize; //核心線程數
private volatile int   maximumPoolSize; //最大線程數(即線程池中的線程數目大於這個參數時,提交的任務會被放進任務緩存隊列)
private volatile int   poolSize; //線程池中的當前線程數
private volatile RejectedExecutionHandler handler; //任務拒絕策略
private volatile ThreadFactory threadFactory; //線程工廠,用來新建線程
private int largestPoolSize; //記錄線程池中出現過的最大線程數大小
private long completedTaskCount; //已經執行完的線程數

這邊重點解釋下 corePoolSizemaximumPoolSizeworkQueue兩個變量,這兩個變量涉及到線程池中建立線程個數的一個策略。
corePoolSize: 這個變量咱們能夠理解爲線程池的核心大小,舉個例子來講明(corePoolSize假設等於10,maximumPoolSize等於20):設計

  1. 有一個部門,其中有10(corePoolSize)名工人,當有新任務來了後,領導就分配任務給工人去作,每一個工人只能作一個任務。
  2. 當10個工人都在忙時,新來的任務就要放到隊列(workQueue)中等待。
  3. 當任務越積累越多,遠遠超過工人作任務的速度時,領導就想了一個辦法:從其餘部門借10個工人來,借的數量有一個公式(maximumPoolSize - corePoolSize)來計算。而後把新來的任務分配給借來的工人來作。
  4. 可是若是速度仍是還不急的話,可能就要採起措施來放棄一些任務了(RejectedExecutionHandler)。
    等到必定時間後,任務都完成了,工人比較閒的狀況下,就考慮把借來的10個工人還回去(根據keepAliveTime判斷)
  5. 也就是說corePoolSize就是線程池大小,maximumPoolSize在我看來是線程池的一種補救措施,即任務量忽然過大時的一種補救措施。

2.2線程執行過程

先看下前一篇文章中的一個例子:代理

ExecutorService executor = Executors.newFixedThreadPool(3);

        IntStream.range(0, 6).forEach(i -> executor.execute(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("finished: " + threadName);
        }));

上面代碼就是新建6個任務,而後扔到線程池中運行,輸出線程名稱,直到運行完畢。其中最核心的方法就是execute()方法,雖然submit()也能夠執行任務,但它底層也是調用execute()方法,因此懂了execute()的實現原理便可:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {   //1.
            if (runState == RUNNING && workQueue.offer(command)) {    //2.
                if (runState != RUNNING || poolSize == 0)   //3.
                    ensureQueuedTaskHandled(command);  //4.
            }
            else if (!addIfUnderMaximumPoolSize(command))  //5.
                reject(command); // is shutdown or saturated //6
        }
    }

上面的代碼看起來邏輯有點複雜,咱們一個一個看,首先看上面1位置處:
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
是一個或表達式,它分紅兩部分

  1. 首先判斷當前線程數是否大於等於核心線程數,是的話直接進入if語句塊中,不然判斷第二個部分
  2. 第二個部分addIfUnderCorePoolSize(command) ,這個方法是當線程數小於核心線程數時,用來新建線程執行任務(由於線程數小於corePoolSize時,直接新建線程來運行任務,無論當前線程池裏有沒有空閒的線程)。若是新建失敗,那麼進入if語句塊,成功了那麼execute方法就執行結束了,由於線程已經新建成功了,任務已經開始在線程池中運行。

進入if語句塊後,看上面代碼2.if (runState == RUNNING && workQueue.offer(command))

  1. 判斷當前線程池狀態是不是RUNNING 並且 任務放入等待隊列中成功,那麼直接進入if語句塊
  2. 不然到代碼5.處 if (!addIfUnderMaximumPoolSize(command)),判斷新任務用新線程執行是否成功(注:這裏的新線程就是咱們上面講的 「借來的工人」 maximumPoolSize)
  3. 若是「借來的工人」仍是處理不了的話,執行任務拒絕策略

繼續進到代碼塊3 的if語句塊if (runState != RUNNING || poolSize == 0), 由於新任務加入到等待隊列中了,這句判斷是爲了防止在將此任務添加進任務緩存隊列的同時其餘線程忽然調用shutdown或者shutdownNow方法關閉了線程池的一種應急措施。若是是的話,應急處理加入的新任務 ensureQueuedTaskHandled(command)


咱們看下兩個關鍵方法的實現:
##### 1.addIfUnderCorePoolSize

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        return t != null;
    }

首先獲取鎖,由於涉及到線程池狀態的變化。而後再次判斷 if (poolSize < corePoolSize && runState == RUNNING),在execute()方法中咱們已經判斷過一次,這邊再次判斷是爲了防止其餘線程又新增了新線程或者調用了shutdown、shutdownNow方法,這邊起到了雙重檢查的一個效果。若是爲true的話,進行t = addThread(firstTask)新增線程執行任務。addThread方法裏面比較簡單,就是經過線程工廠建立線程thread,而後封裝到Worker對象中,加入到 workers隊列中,並執行線程,能夠把Worker對象當作是擁有一個線程的對象。

private Thread addThread(Runnable firstTask) {
        Worker w = new Worker(firstTask);
        Thread t = threadFactory.newThread(w);
        boolean workerStarted = false;
        if (t != null) {
            w.thread = t;
            workers.add(w);
            int nt = ++poolSize;
            if (nt > largestPoolSize)
                largestPoolSize = nt;
            try {
                t.start();
                workerStarted = true;
            }
        }
        return t;
    }

這裏在介紹下Worker對象, 它實現了Runnable接口,你把它當成Runnable的一個代理類便可,最終也是執行它的run方法。只要注意一下Worker中的beforeExecuteafterExecute方法,這兩個方法在ThreadPoolExecutor中沒有具體實現,用戶能夠重寫這個方法和後面的afterExecute方法來進行一些統計信息,好比某個任務的執行時間等,而afterExecute方法還有一個Throwable t參數,用戶能夠用來記錄一些異常信息,由於新線程中的異常時捕獲不到的,須要在afterExecute中記錄。
看起來這個是否是和spring 切面有點像,能夠看到 知識都是相通的。
看一下它的run方法:

public void run() {
            try {
                hasRun = true;
                Runnable task = firstTask;
                firstTask = null;
                while (task != null || (task = getTask()) != null) {  //1
                    runTask(task);
                    task = null;
                }
            } finally {
                workerDone(this);
            }
        }

注意代碼塊1,能夠看到這邊在循環獲取任務,並執行,直到任務所有執行完畢。除了第一個任務,其餘任務都是經過getTask()方法去取,這個方法是ThreadPoolExecutor中的一個方法。咱們猜一下,整個類中只有任務緩存隊列中保存了任務,應該就是去緩存隊列中取了。

Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)  // Help drain queue
                r = workQueue.poll(); //取任務
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //若是線程數大於核心池大小或者容許爲核心池線程設置空閒時間,
                //則經過poll取任務,若等待必定的時間取不到任務,則返回null
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
                r = workQueue.take();
            if (r != null)
                return r;
            if (workerCanExit()) {    //若是沒取到任務,即r爲null,則判斷當前的worker是否能夠退出
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();   //中斷處於空閒狀態的worker
                return null;
            }
            // Else retry
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
    }
}

這裏有一個很是巧妙的設計方式,假如咱們來設計線程池,可能會有一個任務分派線程,當發現有線程空閒時,就從任務緩存隊列中取一個任務交給 空閒線程執行。可是在這裏,並無採用這樣的方式,由於這樣會要額外地對任務分派線程進行管理,無形地會增長難度和複雜度,這裏直接讓執行完任務的線程Worker去任務緩存隊列裏面取任務來執行,由於每個Worker裏面都包含了一個線程thread。


2. addIfUnderMaximumPoolSize

這個方法的實現思想和 addIfUnderCorePoolSize方法的實現思想很是類似,惟一的區別在於addIfUnderMaximumPoolSize方法是在線程 池中的線程數達到了核心池大小而且往任務隊列中添加任務失敗的狀況下執行的:

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < maximumPoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        return t != null;
    }

到這裏,大部分朋友應該對任務提交給線程池以後到被執行的整個過程有了一個基本的瞭解,下面總結一下:

  1. 首先,要清楚corePoolSize和maximumPoolSize的含義;
  2. 其次,要知道Worker是用來起到什麼做用的;
  3. 要知道任務提交給線程池以後的處理策略,這裏總結一下主要有4點:
  • 若是當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會建立一個線程去執行這個任務;
  • 若是當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閒線程將其取出去執行;若添加失敗(通常來講是任務緩存隊列已滿),則會嘗試建立新的線程去執行這個任務;
  • 若是當前線程池中的線程數目達到maximumPoolSize,則會採起任務拒絕策略進行處理;
  • 若是線程池中的線程數量大於 corePoolSize時,若是某線程空閒時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大於 corePoolSize;若是容許爲核心池中的線程設置存活時間,那麼核心池中的線程空閒時間超過keepAliveTime,線程也會被終止。

這篇寫完了,後面會介紹一下任務緩存隊列的種類已經緩存的策略以及任務拒絕策略等。若是文章有什麼問題,歡迎你們指正,你們互相溝通,互相學習。

相關文章
相關標籤/搜索