JUC源碼分析-線程池篇(一):ThreadPoolExecutor

JUC源碼分析-線程池篇(一):ThreadPoolExecutor

Java 中的線程池是運用場景最多的併發框架,幾乎全部須要異步或併發執行任務的程序均可以使用線程池。在開發過程當中,合理地使用線程池可以帶來 3 個好處。java

  • 第一:下降資源消耗。經過重複利用已建立的線程下降線程建立和銷燬形成的消耗。
  • 第二:提升響應速度。當任務到達時,任務能夠不須要等到線程建立就能當即執行。
  • 第三:提升線程的可管理性。線程是稀缺資源,若是無限制地建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行統一分配、調優和監控。可是,要作到合理利用線程池,必須對其實現原理了如指掌。

1. ThreadPoolExecutor 執行流程

線程池執行流程

ThreadPoolExecutor 執行 execute 方法分下面 4 種狀況。編程

1)若是當前運行的線程少於 corePoolSize,則建立新線程來執行任務(注意,執行這一步驟須要獲取全局鎖)。
2)若是運行的線程等於或多於 corePoolSize,則將任務加入 BlockingQueue。
3)若是沒法將任務加入 BlockingQueue(隊列已滿),則建立新的線程來處理任務(注意,執行這一步驟須要獲取全局鎖)。
4)若是建立新線程將使當前運行的線程超出 maximumPoolSize,任務將被拒絕,並調用 RejectedExecutionHandler.rejectedExecution() 方法。併發

ThreadPoolExecutor 採起上述步驟的整體設計思路,是爲了在執行 execute() 方法時,儘量地避免獲取全局鎖(那將會是一個嚴重的可伸縮瓶頸)。在 ThreadPoolExecutor 完成預熱以後(當前運行的線程數大於等於 corePoolSize),幾乎全部的 execute() 方法調用都是執行步驟2,而步驟2不須要獲取全局鎖。框架

上面的流程分析讓咱們很直觀地瞭解了線程池的工做原理,讓咱們再經過源代碼來看看是如何實現的,線程池執行任務的方法以下。咱們從 execute 入手分析源碼。異步

2. ThreadPoolExecutor 源碼分析

2.1 主要屬性

2.1.1 線程池生命週期

private static final int RUNNING    = -1 << COUNT_BITS; // 運行狀態
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 拒絕提交新任務,會執行完已提交的任務後關閉線程池
private static final int STOP       =  1 << COUNT_BITS; // 拒絕提交新任務,也拒絕執行已提交的任務
private static final int TIDYING    =  2 << COUNT_BITS; // 關閉線程池後,線程所有關閉後的狀態,以後回調 terminated
private static final int TERMINATED =  3 << COUNT_BITS; // 回調 terminated 方法後狀態變爲 TERMINATED

線程池用 ctl 的低 29 位表示線程池中的線程數,高 3 位表示當前線程狀態。oop

  • RUNNING:運行狀態,高3位爲111;
  • SHUTDOWN:關閉狀態,高3位爲000,在此狀態下,線程池再也不接受新任務,可是仍然處理阻塞隊列中的任務;
  • STOP:中止狀態,高3位爲001,在此狀態下,線程池再也不接受新任務,也不會處理阻塞隊列中的任務,正在運行的任務也會中止;
  • TIDYING:高3位爲010;
  • TERMINATED:終止狀態,高3位爲011。

線程池生命週期

2.1.2 線程狀態標識 ctl

// ctl 高3位表示線程池狀態,低29位表示當前工做線程數
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;         // 低29位表示工做線程數
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;    // 最大線程數 0x1fffffff

// 獲取線程池狀態、線程總數、構造 ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

2.1.3 其它屬性

// 全局鎖,建立工做線程等操做時須要獲取全局鎖
private final ReentrantLock mainLock = new ReentrantLock();
private final Condition termination = mainLock.newCondition();

// 工做線程
private final HashSet<Worker> workers = new HashSet<Worker>();
private int largestPoolSize;
private volatile int corePoolSize;

2.2 任務提交 execute

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // ctl 高3位表示線程池狀態,低29位表示當前工做線程數
    int c = ctl.get();
    // 1. 小於核心線程數,建立新的線程執行任務。須要獲取全局鎖
    if (workerCountOf(c) < corePoolSize) {
        // addWorker 建立新的工做線程,true 表示核心線程數,false 表示最大線程數
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 2. 核心線程已滿,將任務提交到隊列中。不須要獲取全局鎖
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 2.1 恰好此時線程池關閉了,則須要將任務從隊列中踢除
        if (!isRunning(recheck) && remove(command))
            reject(command);    // 任務被踢除後回滾,執行拒絕任務
        // 2.2 線程池工做線程爲0,建立一個新的工做線程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. 隊列滿後且線程數小於最大線程數,則建立新的線程執行任務。須要獲取全局鎖
    // 4. 超出最大線程拒絕任務
    else if (!addWorker(command, false))
        reject(command);
}

2.3 工做線程 Worker

工做線程:線程池建立線程時,會將線程封裝成工做線程 Worker,Worker 在執行完任務後,還會循環獲取工做隊列裏的任務來執行。咱們能夠從 Worker 類的 run() 方法裏看到這點。源碼分析

// Worker 是對線程 Thread 的包裝,實現了 AbstractQueuedSynchronizer
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread;    // 包裝的線程
    Runnable firstTask;     // 線程初始化時的任務,能夠爲 null

    Worker(Runnable firstTask) {
        setState(-1);   // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this);
    }
}

思考:Worker 爲何要繼承 AbstractQueuedSynchronizer 實現本身的鎖,而不使用 ReentrantLock 呢?ui

實際上 ReentrantLock 是可重入鎖,而 Worker 實現的是獨佔鎖,只有三種狀 -1(初始化)、0(釋放鎖)、1(佔有鎖)。Worker 之因此實現獨佔鎖是爲了不在線程執行的時候被 interrupted 中斷(下面會講到)。this

2.4 建立工做線程 addWorker

// addWorker 建立一個新的工做線程
// firstTask 線程初始化任務,能夠爲 null;core 表示是核心線程仍是最大線程
private boolean addWorker(Runnable firstTask, boolean core) {
    // 1. 經過自旋線程數+1 compareAndIncrementWorkerCount
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 1.1 1、STOP 不能建立新線程
        //     2、SHUTDOWN 時 workQueue 爲空,也不能建立新線程
        //        firstTask 表示線程初始化任務,是新提交的任務,SHUTDOWN 時拒絕新提交的任務
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
            return false;
        // 1.2 自旋使線程數+1
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)    // 不斷檢查線程池狀態變化
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // 2. 建立線程 Worker
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 2.1 初始化工做線程 Worker,使用全局鎖添加到 workers 隊列中
        w = new Worker(firstTask);
        final Thread t = w.thread;  // threadFactory 可能建立線程失敗,返回 null
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                // 2.2 1、RUNNING能夠建立新線程
                //     2、SHUTDOWN不接收新任務,但會執行完 workQueue 的任務 ,所以能夠建立空任務的線程
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)    // largestPoolSize 表示線程池運行過程當中達到的最大線程數
                        largestPoolSize = s;
                    workerAdded = true;         // 工做線程添加到 workers 成功
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;           // 啓動線程成功
            }
        }
    } finally {
        // 2.3 建立工做線程失敗,回滾
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

總結:spa

  1. addWorker 前半部分主要是判斷可否新建工做線程,若是容許則執行 compareAndIncrementWorkerCount(c),利用 CAS 原則,將線程數量+1。
  2. addWorker 後半部分則是真正建立工做線程並啓動,這個過程須要獲取全局鎖。建立失敗則須要回滾 addWorkerFailed。

addWorker 的 4 種調用方式:

  1. addWorker(command, true) 線程數 < coreSize 時,則建立新線程
  2. addWorker(command, false) 當①阻塞隊列已滿,②線程數 < maximumPoolSize 時,則建立新線程
  3. addWorker(null, true) 同 1。只是線程初始化任務爲 null,至關於建立一個新的線程。實際的使用是在 prestartCoreThread() 等方法,有興趣的讀者能夠自行閱讀,在此不作詳細贅述。
  4. addWorker(null, false) 同 2。只是線程初始化任務爲 null,至關於建立一個新的線程,沒立馬分配任務;

2.4 線程執行 runWorker

在 addWorker 建立線程後調用 t.start() 啓動線程,run 方法主要乾了一件事,調用 runWorker(this),接下來咱們來看看 runWorker 的具體實現。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;    // 線程初始化任務 task
    w.firstTask = null;
    // 1. Worker 是獨佔鎖,此時狀態由 -1 -> 0,也就是其它線程才能獲取w的鎖,進而interrupt
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 2. 循環經過 getTask 獲取任務,若是不能獲取任務了,退出循環,關閉線程池
        //    也就是說 getTask 返回 null 時線程就關閉了
        while (task != null || (task = getTask()) != null) {
            w.lock();   // 獲取鎖,這樣在線程執行過程當中不能中斷線程(interrupt)
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // 
            // 3.1 線程池已經STOP,若是線程尚未被中斷(wt.isInterrupted=false),則調用wt.interrupt中斷線程
            // 3.2 若是runStateAtLeast(ctl.get(), STOP)=false,則說明線程池處於RUNNING或SHUTDOWN狀態
            //     調用 Thread.interrupted() 後會清空線程的 interrupted 狀態
            //     Thread.interrupted()&& false 結果始終爲 false,這裏僅僅是爲了調用Thread.interrupted()
            //  實際上就是:一若是線程已經STOP,則必定要將線程 interrupt
            //             二若是線程處於運行狀態(包括SHUTDOWN),則必定不能 interrupt(也就是要清除 interrupt 標記)
            if ((runStateAtLeast(ctl.get(), STOP) || 
                    (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                && !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);        // 執行前
                Throwable thrown = null;
                try {
                    task.run();                 // 執行任務
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown); // 執行後
                }
            } finally {
                task = null;
                w.completedTasks++;             // 統計執行的任務數
                w.unlock();                     // 釋放鎖,能夠被中斷了
            }
        }
        completedAbruptly = false;              // true時表示正常退出,false表示異常退出
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

總結,runWoker 具體實現:

  1. 線程啓動後,釋放鎖,設 AQS 狀態爲 0,釋放鎖。此時其它線程才能夠獲取鎖,中斷線程 interrupt;
  2. 獲取 firstTask 任務並執行,執行任務先後可定製 beforeExecute 和 afterExecute;
  3. 若是 getTask 從阻塞隊列獲取等待任務執行,若是獲取的任務爲 null,while 則退出循環,線程關閉。
  4. 若是線程已經STOP,則必定要將線程 interrupt。若是線程處於運行狀態(包括SHUTDOWN),則必定不能 interrupt。但實際上 interrupt() 方法並不必定能中斷正在運行的線程,它只能喚醒 wait 阻塞的線程或給線程設置一個標記位。業務線程必須對 interrupt 作出響應才能中斷線程,不然會一直等線程執行結束纔會銷燬。

2.5 獲取任務 getTask

// 注意 getTask 前 worker 釋放了鎖,也就是可能被 interrupt 喚醒
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {      // 自旋獲取任務
        int c = ctl.get();
        int rs = runStateOf(c);

        // 1. ①STOP直接銷燬線程,②SHUTDOWN時任務隊列爲空時也直接銷燬線程
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();      // 原子性更新,工做線程數-1
            return null;
        }

        int wc = workerCountOf(c);      // 當前工做線程數
        // 2.1 timed表示是否能夠銷燬線程。timed=true表示超時獲取任務,則可能返回null
        //     當線程數大於核心線程數或容許銷燬核心線程時 timed=true
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // 2.2 一是超過了最大線程數,當線程池啓動後手動修改最大線程數可能會出現這種狀況
        //     二是當容許銷燬線程時,獲取任務超時
        // 2.3 三是線程池中至少有一個工做線程或任務隊列爲空,則能夠銷燬線程
        if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))      // 失敗重試,此時線程數已經-1
                return null;
            continue;
        }

        try {
            // 3. 獲取任務,無限等待則不會返回 null,也就不會銷燬線程。而限時等待則可能返回 null
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;   // 其它線程喚醒等待的線程
        }
    }
}

總結,整個 getTask 循環實現:

  1. getTask 時,worker 已經釋放了鎖,也就是說其它線程能夠調用 wt.interrupt() 喚醒等待的線程。
  2. 若是當前線程數大於最大線程數,或容許核心線程銷燬時,若是獲取任務超時則返回 null,即銷燬線程。

2.6 線程關閉

2.6.1 shutdown 和 shutdownNow

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();      // 權限檢查
        advanceRunState(SHUTDOWN);  // 更新線程池狀態爲 SHUTDOWN
        interruptIdleWorkers();     // 關閉全部的空閒線程
        onShutdown();               // 子類實現,如 ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();     // 嘗試中止線程池
}

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();  // 權限檢查
        advanceRunState(STOP);  // 更新線程池狀態爲 SHUTDOWN
        interruptWorkers();     // 關閉全部的線程
        tasks = drainQueue();   // 返回還未執行的任務
    } finally {
        mainLock.unlock();
    }
    tryTerminate();     // 嘗試中止線程池
    return tasks;
}

總結,shutdown 和 shutdownNow 區別:

  1. shutdown 會執行完成已提交的任務後關閉線程池,而 shutdownNow 則會踢除已提交的任務。
  2. shutdown 調用 interruptIdleWorkers 關閉空閒的線程,而 shutdownNow 調用 interruptWorkers 強行中斷全部的線程。

2.6.2 interruptIdleWorkers 和 interruptWorkers

// 關閉全部的空閒線程
private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

// 中斷線程其實是調用 t.interrupt(),須要獲取線程鎖 w.tryLock
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // 只有空閒線程才能獲取鎖,正在執行的線程沒法獲取鎖,也就沒法中斷
            // 這也就是爲何 Worker 要實現獨佔鎖的緣由。
            if (!t.isInterrupted() && w.tryLock()) {    // 須要獲取w的獨佔鎖
                try {
                    t.interrupt();  // 其實是調用 t.interrupt() 中斷線程
                                    // 其實是給能線程設置一個標記位
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

interruptIdleWorkers 只會嘗試獲取鎖,所以只會中斷空閒線程。而 interruptWorkers 不須要獲取鎖,強行中斷線程。實際上業務線程必須對 interrupt 作出響應才能中斷線程,不然會一直等線程執行結束纔會銷燬。

private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}
// 調用Worker#interruptIfStarted 不須要獲取鎖
void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

而 interruptIdleWorkers 和 interruptWorkers 都是 interrupt 全部線程, 所以大部分線程將馬上被中斷。之因此是大部分,而不是所有,是由於 interrupt() 方法能力有限。 若是線程中沒有 sleep 、wait、Condition、定時鎖等應用, interrupt() 方法是沒法中斷當前的線程的。因此,ShutdownNow() 並不表明線程池就必定當即就能退出,它可能必需要等待全部正在執行的任務都執行完成了才能退出。 以下面這個線程永遠不會中斷,由於該線程沒有響應 Thread.interrupted() 或者是直接將 InterruptedException 異常 catch 了。

// 沒法響應 interrupted,線程永遠沒法停止。
executorService.submit(() -> { while (true) System.out.println("go go go"); });
executorService.shutdownNow();

2.6.3 tryTerminate

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // 1. RUNNING或SHUTDOWN還有任務執行時不能關閉,TIDYING則已經關閉
        if (isRunning(c) ||                 // 1.1 正在運行,不能中斷
            runStateAtLeast(c, TIDYING) ||  // 1.2 已經中斷,不須要執行
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))   // 1.3 SHUTDOWN時還有任務執行
            return;
        // 2. 還有線程則關閉空閒線程
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 3. 工做線程數爲0時,能夠關閉線程池了,設置線程狀態爲TIDYING,
            //    並回調terminated後,線程的狀態最終變爲TERMINATED
            // 4. 線程狀態設置失敗,則 CAS 自旋
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

除了 shutdown 和 shutdownNow 外,addWorkerFailed、processWorkerExit、remove 等方法也會調用 tryTerminate 方法。

參考:

  1. 《Java併發編程的藝術》

天天用心記錄一點點。內容也許不重要,但習慣很重要!

相關文章
相關標籤/搜索