ThreadPoolExecutor源碼分析

threadpoolexecutor源碼分析


文章轉自:threadpoolexecutor源碼分析
前段時間學習java.util.concurrent的源碼,學習線程池這一塊的時候發現了一篇不錯的文章,就記錄下來。同時,文章之中加入了本身的一些看法。廢話很少說,直接開始。html

ThreadPoolExecutor做爲Java.util.concurrent包中核心的類,先看下類型的結構: java

image

核心的接口實際上是Executor,它只有一個execute方法抽象爲對任務(Runnable接口)的執行, ExecutorService接口在Executor的基礎上提供了對任務執行的生命週期的管理,主要是submit和shutdown方法, AbstractExecutorService對ExecutorService一些方法作了默認的實現,主要是submit和invoke方法,而真正的任務執行 的Executor接口execute方法是由子類實現,就是ThreadPoolExecutor,它實現了基於線程池的任務執行框架,因此要了解 JDK的線程池,那麼就得先看這個類。併發

再看execute方法以前須要先介幾個變量或類。框架

ctl

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

這個變量是整個類的核心,AtomicInteger保證了對這個變量的操做是原子的,經過巧妙的操做,ThreadPoolExecutor用這一個變量保存了兩個內容:函數

  • 全部有效線程的數量oop

  • 各個線程的狀態(runState)源碼分析

低29位存線程數,高3位存runState,這樣runState有5個值:學習

  • RUNNING:-536870912ui

  • SHUTDOWN:0this

  • STOP:536870912

  • TIDYING:1073741824

  • TERMINATED:1610612736

線程池中各個狀態間的轉換比較複雜,主要記住下面內容就能夠了:

  • RUNNING狀態:線程池正常運行,能夠接受新的任務並處理隊列中的任務;

  • SHUTDOWN狀態:再也不接受新的任務,可是會執行隊列中的任務;

  • STOP狀態:再也不接受新任務,不處理隊列中的任務

圍繞rtc有一些操做和變量:

/**
 * 這個方法用於取出runState的值 由於CAPACITY值爲:00011111111111111111111111111111
 * ~爲按位取反操做,則~CAPACITY值爲:11100000000000000000000000000000
 * 再同參數作&操做,就將低29位置0了,而高3位仍是保持原先的值,也就是runState的值
 * 
 * @param c
 *            該參數爲存儲runState和workerCount的int值
 * @return runState的值
 */
private static int runStateOf(int c) {
    return c & ~CAPACITY;
}


/**
 * 這個方法用於取出workerCount的值
 * 由於CAPACITY值爲:00011111111111111111111111111111,因此&操做將參數的高3位置0了
 * 保留參數的低29位,也就是workerCount的值
 * 
 * @param c
 *            ctl, 存儲runState和workerCount的int值
 * @return workerCount的值
 */
private static int workerCountOf(int c) {
    return c & CAPACITY;
}

/**
 * 將runState和workerCount存到同一個int中
 * 「|」運算的意思是,假設rs的值是101000,wc的值是000111,則他們位或運算的值爲101111
 * 
 * @param rs
 *            runState移位事後的值,負責填充返回值的高3位
 * @param wc
 *            workerCount移位事後的值,負責填充返回值的低29位
 * @return 二者或運算事後的值
 */
private static int ctlOf(int rs, int wc) {
    return rs | wc;
}

// 只有RUNNING狀態會小於0
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

corePoolSize

核心線程池大小,活動線程小於corePoolSize則直接建立,大於等於則先加到workQueue中,隊列滿了才建立新的線程。

keepAliveTime

線程從隊列中獲取任務的超時時間,也就是說若是線程空閒超過這個時間就會終止。

Worker

private final class Worker extends AbstractQueuedSynchronizer implements Runnable ...

內部類Worker是對任務的封裝,全部submit的Runnable都被封裝成了Worker,它自己也是一個Runnable, 而後利用AQS框架(關於AQS能夠看我這篇文章)實現了一個簡單的非重入的互斥鎖, 實現互斥鎖主要目的是爲了中斷的時候判斷線程是在空閒仍是運行,能夠看後面shutdown和shutdownNow方法的分析。

// state只有0和1,互斥
protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;// 成功得到鎖
    }
    // 線程進入等待隊列
    return false;
}

protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
}

之因此不用ReentrantLock是爲了不任務執行的代碼中修改線程池的變量,如setCorePoolSize,由於ReentrantLock是可重入的。

execute

execute方法主要三個步驟:

  • 活動線程小於corePoolSize的時候建立新的線程;

  • 活動線程大於corePoolSize時都是先加入到任務隊列當中;

  • 任務隊列滿了再去啓動新的線程,若是線程數達到最大值就拒絕任務。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    // 活動線程數 < corePoolSize
    if (workerCountOf(c) < corePoolSize) {
        // 直接啓動新的線程。第二個參數true:addWorker中會從新檢查workerCount是否小於corePoolSize
        if (addWorker(command, true))
            // 添加成功返回
            return;
        c = ctl.get();
    }
    // 活動線程數 >= corePoolSize
    // runState爲RUNNING && 隊列未滿
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // double check
        // 非RUNNING狀態 則從workQueue中移除任務並拒絕
        if (!isRunning(recheck) && remove(command))
            reject(command);// 採用線程池指定的策略拒絕任務
        // 線程池處於RUNNING狀態 || 線程池處於非RUNNING狀態可是任務移除失敗
        else if (workerCountOf(recheck) == 0)
            // 這行代碼是爲了SHUTDOWN狀態下沒有活動線程了,可是隊列裏還有任務沒執行這種特殊狀況。
            // 添加一個null任務是由於SHUTDOWN狀態下,線程池再也不接受新任務
            addWorker(null, false);

        // 兩種狀況:
        // 1.非RUNNING狀態拒絕新的任務
        // 2.隊列滿了啓動新的線程失敗(workCount > maximumPoolSize)
    } else if (!addWorker(command, false))
        reject(command);
}

其中比較難理解的應該是addWorker(null, false);這一行,這要結合addWorker一塊兒來看。 主要目的是防止HUTDOWN狀態下沒有活動線程了,可是隊列裏還有任務沒執行這種特殊狀況。

addWorker

/**
* @param firstTask:新增一個線程並執行這個任務,可空,增長的線程從隊列獲取任務;
* 
* @param core:是否使用corePoolSize做爲上限,不然使用maxmunPoolSize
**/
private boolean addWorker(Runnable firstTask, boolean core) {
        retry: for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);// 當前線程池狀態

            // Check if queue empty only if necessary.
            // 這條語句等價:rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null ||
            // workQueue.isEmpty())
            // 知足下列調價則直接返回false,線程建立失敗:
            // rs > SHUTDOWN:STOP || TIDYING || TERMINATED 此時再也不接受新的任務,且全部任務執行結束
            // rs = SHUTDOWN:firtTask != null 此時再也不接受任務,可是仍然會執行隊列中的任務
            // rs = SHUTDOWN:firtTask == null見execute方法的addWorker(null,
            // false),任務爲null && 隊列爲空
            // 最後一種狀況也就是說SHUTDONW狀態下,若是隊列不爲空還得接着往下執行,爲何?add一個null任務目的究竟是什麼?
            // 看execute方法只有workCount==0的時候firstTask纔會爲null結合這裏的條件就是線程池SHUTDOWN了再也不接受新任務
            // 可是此時隊列不爲空,那麼還得建立線程把任務給執行完才行。
            if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
                return false;

            // 走到這的情形:
            // 1.線程池狀態爲RUNNING
            // 2.SHUTDOWN狀態,但隊列中還有任務須要執行
            for (;;) {
                int wc = workerCountOf(c);
                //判斷條件有點難理解,實際上是非運行狀態下(>=SHUTDOWN)或者SHUTDOWN狀態下任務非空(新提交任務)、任務隊列爲空,
                //就不能夠再新增線程了(return false),即SHUTDOWN狀態是能夠新增線程去執行隊列中的任務;  
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))// 原子操做遞增workCount
                    break retry;// 操做成功跳出的重試的循環
                c = ctl.get(); // Re-read ctl
                if (runStateOf(c) != rs)// 若是線程池的狀態發生變化則重試
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        // wokerCount遞增成功

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 併發的訪問線程池workers對象必須加鎖
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    // RUNNING狀態 || SHUTDONW狀態下清理隊列中剩餘的任務
                    if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 將新啓動的線程添加到線程池中
                        workers.add(w);
                        // 更新largestPoolSize
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 啓動新添加的線程,這個線程首先執行firstTask,而後不停的從隊列中取任務執行
                // 當等待keepAlieTime尚未任務執行則該線程結束。見runWoker和getTask方法的代碼。
                if (workerAdded) {
                    t.start();// 最終執行的是ThreadPoolExecutor的runWoker方法
                    workerStarted = true;
                }
            }
        } finally {
            // 線程啓動失敗,則從wokers中移除w並遞減wokerCount
            if (!workerStarted)
                // 遞減wokerCount會觸發tryTerminate方法
                addWorkerFailed(w);
        }
        return workerStarted;
    }

runWorker

任務添加成功後實際執行的是runWorker這個方法,這個方法很是重要,簡單來講它作的就是:

  • 第一次啓動會執行初始化傳進來的任務firstTask;

  • 而後會從workQueue中取任務執行,若是隊列爲空則等待keepAliveTime這麼長時間。

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // Worker的構造函數中抑制了線程中斷setState(-1),因此這裏須要unlock從而容許中斷
        w.unlock();
        // 用於標識是否異常終止,finally中processWorkerExit的方法會有不一樣邏輯
        // 爲true的狀況:1.執行任務拋出異常;2.被中斷。
        boolean completedAbruptly = true;
        try {
            // 若是getTask返回null那麼getTask中會將workerCount遞減,若是異常了這個遞減操做會在processWorkerExit中處理
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted. This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                        && !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 任務執行前能夠插入一些處理,子類重載該方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();// 執行用戶任務
                    } catch (RuntimeException x) {
                        thrown = x;
                        throw x;
                    } catch (Error x) {
                        thrown = x;
                        throw x;
                    } catch (Throwable x) {
                        thrown = x;
                        throw new Error(x);
                    } finally {
                        // 和beforeExecute同樣,留給子類去重載
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }

            completedAbruptly = false;
        } finally {
            // 結束線程的一些清理工做
            processWorkerExit(w, completedAbruptly);
        }
    }

getTask

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        retry: for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 1.rs > SHUTDOWN 因此rs至少等於STOP,這時再也不處理隊列中的任務
            // 2.rs = SHUTDOWN 因此rs>=STOP確定不成立,這時還須要處理隊列中的任務除非隊列爲空
            // 這兩種狀況都會返回null讓runWoker退出while循環也就是當前線程結束了,因此必需要decrement
            // wokerCount
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // 遞減workerCount值
                decrementWorkerCount();
                return null;
            }

            // 標記從隊列中取任務時是否設置超時時間
            boolean timed; // Are workers subject to culling?

            // 1.RUNING狀態
            // 2.SHUTDOWN狀態,但隊列中還有任務須要執行
            for (;;) {
                int wc = workerCountOf(c);

                // 1.core thread容許被超時,那麼超過corePoolSize的的線程一定有超時
                // 2.allowCoreThreadTimeOut == false && wc >
                // corePoolSize時,通常都是這種狀況,core thread即便空閒也不會被回收,只要超過的線程纔會
                timed = allowCoreThreadTimeOut || wc > corePoolSize;

                // 從addWorker能夠看到通常wc不會大於maximumPoolSize,因此更關心後面半句的情形:
                // 1. timedOut == false 第一次執行循環, 從隊列中取出任務不爲null方法返回 或者
                // poll出異常了重試
                // 2.timeOut == true && timed ==
                // false:看後面的代碼workerQueue.poll超時時timeOut才爲true,
                // 而且timed要爲false,這兩個條件相悖不可能同時成立(既然有超時那麼timed確定爲true)
                // 因此超時不會繼續執行而是return null結束線程。(重點:線程是如何超時的???)
                if (wc <= maximumPoolSize && !(timedOut && timed))
                    break;

                // workerCount遞減,結束當前thread
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get(); // Re-read ctl
                // 須要從新檢查線程池狀態,由於上述操做過程當中線程池可能被SHUTDOWN
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }

            try {
                // 1.以指定的超時時間從隊列中取任務
                // 2.core thread沒有超時
                Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;// 超時
            } catch (InterruptedException retry) {
                timedOut = false;// 線程被中斷重試
            }
        }
    }

processWorkerExit

線程退出會執行這個方法作一些清理工做。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 正常的話再runWorker的getTask方法workerCount已經被減一了
        if (completedAbruptly)
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 累加線程的completedTasks
            completedTaskCount += w.completedTasks;
            // 從線程池中移除超時或者出現異常的線程
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        // 嘗試中止線程池
        tryTerminate();

        int c = ctl.get();
        // runState爲RUNNING或SHUTDOWN
        if (runStateLessThan(c, STOP)) {
            // 線程不是異常結束
            if (!completedAbruptly) {
                // 線程池最小空閒數,容許core thread超時就是0,不然就是corePoolSize
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // 若是min == 0可是隊列不爲空要保證有1個線程來執行隊列中的任務
                if (min == 0 && !workQueue.isEmpty())
                    min = 1;
                // 線程池還不爲空那就不用擔憂了
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 1.線程異常退出
            // 2.線程池爲空,可是隊列中還有任務沒執行,看addWoker方法對這種狀況的處理
            addWorker(null, false);
        }
    }

tryTerminate

processWorkerExit方法中會嘗試調用tryTerminate來終止線程池。這個方法在任何可能致使線程池終止的動做後執行:好比減小wokerCount或SHUTDOWN狀態下從隊列中移除任務。

final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            // 如下狀態直接返回:
            // 1.線程池還處於RUNNING狀態
            // 2.SHUTDOWN狀態可是任務隊列非空
            // 3.runState >= TIDYING 線程池已經中止了或在中止了
            if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
                return;

            // 只能是如下情形會繼續下面的邏輯:結束線程池。
            // 1.SHUTDOWN狀態,這時再也不接受新任務並且任務隊列也空了
            // 2.STOP狀態,當調用了shutdownNow方法

            // workerCount不爲0則還不能中止線程池,並且這時線程都處於空閒等待的狀態
            // 須要中斷讓線程「醒」過來,醒過來的線程才能繼續處理shutdown的信號。
            if (workerCountOf(c) != 0) { // Eligible to terminate
                // runWoker方法中w.unlock就是爲了能夠被中斷,getTask方法也處理了中斷。
                // ONLY_ONE:這裏只須要中斷1個線程去處理shutdown信號就能夠了。
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 進入TIDYING狀態
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // 子類重載:一些資源清理工做
                        terminated();
                    } finally {
                        // TERMINATED狀態
                        ctl.set(ctlOf(TERMINATED, 0));
                        // 繼續awaitTermination
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

shutdown和shutdownNow

shutdown這個方法會將runState置爲SHUTDOWN,會終止全部空閒的線程。

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 線程池狀態設爲SHUTDOWN,若是已經至少是這個狀態那麼則直接返回
            advanceRunState(SHUTDOWN);
            // 注意這裏是中斷全部空閒的線程:runWorker中等待的線程被中斷 → 進入processWorkerExit →
            // tryTerminate方法中會保證隊列中剩餘的任務獲得執行。
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

shutdownNow方法將runState置爲STOP。和shutdown方法的區別,這個方法會終止全部的線程。

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // STOP狀態:再也不接受新任務且再也不執行隊列中的任務。
        advanceRunState(STOP);
        // 中斷全部線程
        interruptWorkers();
        // 返回隊列中尚未被執行的任務。
        tasks = drainQueue();
    }
    finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

主要區別在於shutdown調用的是interruptIdleWorkers這個方法,而shutdownNow實際調用的是Worker類的interruptIfStarted方法:

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // w.tryLock能獲取到鎖,說明該線程沒有在運行,由於runWorker中執行任務會先lock,
            // 所以保證了中斷的確定是空閒的線程。
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    }
    finally {
        mainLock.unlock();
    }
}
void interruptIfStarted() {
    Thread t;
    // 初始化時state == -1
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

這就是前面提到的Woker類實現AQS的主要做用。

注意:shutdown方法可能會在finalize被隱式的調用。

相關文章
相關標籤/搜索