線程池源碼分析

概述

在 java 中,線程池 ThreadPoolExecutor 是一個繞不過去的類,它是享元模式思想的體現,經過在容器中建立必定數量的線程加以重複利用,從而避免頻繁建立線程帶來的額外開銷。一個設置合理的線程池能夠提升任務響應的速度,而且避免線程數超過硬件能力帶來的意外狀況。java

在本文,將深刻線程池源碼,瞭解線程池的底層實現與運行機制。函數

1、構造方法

ThreadPoolExecutor 類一共提供了四個構造方法,咱們基於參數最完整構造方法瞭解一下線程池建立所須要的變量:this

public ThreadPoolExecutor(int corePoolSize, // 核心線程數
                          int maximumPoolSize, // 最大線程數
                          long keepAliveTime, // 非核心線程閒置存活時間
                          TimeUnit unit, // 時間單位
                          BlockingQueue<Runnable> workQueue, // 工做隊列
                          ThreadFactory threadFactory, // 建立線程使用的線程工廠
                          RejectedExecutionHandler handler // 拒絕策略) {
}
  • 核心線程數:即長期存在的線程數,當線程池中運行線程未達到核心線程數時會優先建立新線程;
  • 最大線程數:當核心線程已滿,工做隊列已滿,同時線程池中線程總數未超過最大線程數,會建立非核心線程;
  • 非核心線程閒置存活時間:當非核心線程閒置的時的最大存活時間;
  • 時間單位:非核心線程閒置存活時間的時間單位;
  • 任務隊列:當核心線程滿後,任務會優先加入工做隊列,等等待覈心線程消費;
  • 線程工廠:線程池建立新線程時使用的線程工廠;
  • 拒絕策略:當工做隊列與線程池都滿時,用於執行的策略;

2、線程池狀態

1.線程池狀態

線程池擁有一個 AtomicInteger 類型的成員變量 ctl ,經過位運算分別使用 ctl 的高位低位以便在一個值中存儲線程數量以及線程池狀態。線程

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 29(32-3)
private static final int COUNT_BITS = Integer.SIZE - 3;
// 容許的最大工做線程(2^29-1 約5億)
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 運行狀態。線程池接受並處理新任務
private static final int RUNNING    = -1 << COUNT_BITS;
// 關閉狀態。線程池不能接受新任務,處理完剩餘任務後關閉。調用shutdown()方法會進入該狀態。
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 中止狀態。線程池不能接受新任務,而且嘗試中斷舊任務。調用shutdownNow()方法會進入該狀態。
private static final int STOP       =  1 << COUNT_BITS;
// 整理狀態。由關閉狀態轉變,線程池任務隊列爲空時進入該狀態,會調用terminated()方法。
private static final int TIDYING    =  2 << COUNT_BITS;
// 終止狀態。terminated()方法執行完畢後進入該狀態,線程池完全中止。
private static final int TERMINATED =  3 << COUNT_BITS;

2.線程狀態的計算

這裏比較很差理解的是上述-1的位運算,下面咱們來分析一下:日誌

在計算機中,二進制負數通常用補碼錶示,即源碼取反再加一。但又有這種說法,即將最高位做爲符號位,0爲正數,1爲負數。實際上二者是能夠結合在一塊兒看的。假如數字是單字節數,1 字節對應8 bit,即八位,如今,咱們要計算 - 1。code

按照第二種說法,最高位爲符號位,則有 1/000 0001,而後按第一種說法取反後+1,而且符號位不變,則有 1/111 1110 + 1,即 1/111 1111。對象

如今回到 -1 << COUNT_BITS這行代碼:繼承

一個 int 是 4 個字節,對應 32 bit,按上述過程 -1 轉爲二進制即爲 1/111......1111(32個1), COUNT_BITS是 29,-1 左移 29 位,最終獲得 111.0...0000。隊列

同理,計算其餘的幾種狀態,可知分別是:rem

狀態 二進制
RUNNING 111...0....00
SHUTDOWN 000...0....00
STOP 001...0....00
TIDYING 010...0....00
TERMINATED 011...0....00

其中,咱們能夠知道 SHUTDOWN 狀態轉爲十進制也是 0 ,而 RUNNING 做爲有符號數,它的最高位是 1,說明轉爲十進制之後是個負數,其餘的狀態最高位都是 0,轉爲十進制以後都是正數,也就是說,咱們能夠這麼認爲:

小於 SHUTDOWN 的就是 RUNNING,大於 SHUTDOWN 就是中止或者中止中。

這也是後面狀態計算的一些寫法的基礎。好比 isRunning()方法:

private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

3.線程狀態與工做線程數的獲取

// 根據當前運行狀態和工做線程數獲取當前的 ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
// 獲取運行狀態
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 獲取工做線程數
private static int workerCountOf(int c)  { return c & CAPACITY; }

前面獲取狀態的時候調用了 ctlOf()方法,根據前面,咱們能夠知道,CAPACITY其實是 29 位,而線程狀態用的是 32 - 30 共 3 位,也就是說,ctl 共 32 位,高3 位用於表示線程池狀態,而低 29 位表示工做線程的數量

這樣上述三個方法就很好理解了:

  • ctlOf():獲取 ctl。

    將工做線程數量與運行狀態進行於運算,假如咱們處於 RUNNING,而且有 1 個工做線程,那麼 ctl = 111....000 | 000.... 001,最終獲得 111 ..... 001;

  • runStateOf():獲取運行狀態。

    繼續根據上文的數據,~CAPACITY 取反即爲 111....000,與運行狀態 111...0000 與運算,最終獲得 111....000,至關於低位掩碼,消去低 29 位;

  • workerCountOf():獲取工做線程數。

    同理,c & CAPACITY裏的 CAPACITY 至關於高位掩碼,用於消去高 3 位,最終獲得 00...001,即工做線程數。

同理,若是要增長工做線程數,就直接經過 CAS 去遞增 ctl,好比新建線程中使用的公共方法:

private boolean compareAndIncrementWorkerCount(int expect) {
    // 經過 CAS 遞增 ctl
    return ctl.compareAndSet(expect, expect + 1);
}

要改變線程池狀態,就根據當前工做線程和要改變的狀態去合成新的 ctl,而後 CAS 改變 ctl,好比 shutdown()中涉及的相關代碼:

private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||       
		        // 經過 CAS 改變 ctl
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }

3、任務的建立與執行

線程池任務提交方法是 execute(),根據代碼可知,當一個任務進來時,分四種狀況:

  • 當前工做線程數小於核心線程數,啓動新線程;
  • 當前工做線程數大於核心線程數,可是未大於最大線程數,嘗試添加到工做隊列;
  • 當前線程池核心線程和隊列都滿了,嘗試建立新非核心線程。
  • 非核心線程建立失敗,說明線程池完全滿了,執行拒絕策略。
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    
    // 1.當前工做線程數小於核心線程數,啓動新線程
    if (workerCountOf(c) < corePoolSize) {
        // 添加任務
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    
    // 2. 當前工做線程數大於核心線程數,可是未大於最大線程數,嘗試添加到工做隊列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 若是當前線程處於非運行態,而且移除當前任務成功,則拒絕任務(防止添加到一半就shutdown)
        if (! isRunning(recheck) && remove(command)) 
            reject(command);
        // 若是當前沒有工做線程了,就啓動新線程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    
    // 3.當前線程池核心線程和隊列都滿了,嘗試建立新非核心線程
    else if (!addWorker(command, false))
        // 4.線程池完全滿了,執行拒絕策略
        reject(command);
}

1.添加任務

添加任務依靠 addWorker()方法,這個方法很長,可是主要就幹了兩件事:

  • CAS 讓 ctl 的工做線程數 +1;
  • 啓動新的線程;
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // 1.改變 ctl 使工做線程+1
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 若是當前不處於運行狀態,傳入任務爲空,而且任務隊列爲空的時候拒絕添加新任務
        // 即線程池 shutdown 時不讓添加新任務,可是運行繼續跑完任務隊列裏的任務。
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // 線程不容許超過最大線程數,核心線程不容許超過最大核心線程數
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // CAS 遞增工做線程數
            if (compareAndIncrementWorkerCount(c))
                // 失敗了就從新回到上面的retry處繼續往下執行
                break retry;
            // 更新 ctl
            c = ctl.get();
            // 若是運行狀態改變了就所有歷來
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    // 2.啓動新線程
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 建立新線程
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 加鎖
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
			   
                // 若是線程池處於運行狀態,或者沒有新任務的SHUTDOWN狀態(即SHUTDOW之後還在消費工做隊列裏的任務) 
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // 線程是否在未啓動前就已經啓動了
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                     // 若是集合中的工做線程數大於最大線程數,則將池中最大線程數改成當前工做線程數
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // 線程建立完成
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 若是線程成功建立,就啓動線程,而且更改啓動狀態爲成功
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 若是線程啓動不成功,就執行失敗策略
        if (! workerStarted)
            // 啓動失敗策略,從當前工做線程隊列移除當前啓動失敗的線程,遞減工做線程數,而後嘗試關閉線程池(若是當前任務就是線程池最後一個任務)
            addWorkerFailed(w);
    }
    return workerStarted;
}

2. 任務對象Worker

根據上文,不難發現,在線程池中線程每每以 Worker 對象的方式存在,那麼這個 Worker 又是何方神聖?

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {

        // 工做線程
        final Thread thread;
    
        // 要執行的任務
        Runnable firstTask;
    
        // 線程執行過的任務數
        volatile long completedTasks;

        // 經過線程工廠建立工做線程
        Worker(Runnable firstTask) {
            setState(-1);
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        // 執行任務
        public void run() {
            runWorker(this);
        }
    
    	... ...
    }

這個 Worker 類繼承了 AQS,也就是說,他自己就至關於一個同步隊列,結合他的成員變量 thread 和 firstTask,能夠知道他實際上就是咱們線程池中所說的「線程」。除了父類 AQS 自己提供的獨佔鎖之外,Worker 還提供了一些檢查任務線程運行狀態以及中斷線程相關的方法。

此外,線程池中還有一個工做隊列 workers,用於保存當前所有的 Worker

private final HashSet<Worker> workers = new HashSet<Worker>();

3.任務的啓動

當調用 Worker.run()的時候,其實調用的是 runWorker()方法。

runWorker()方法實際上就是調用線程執行任務的方法,他的邏輯大題是這樣的:

  • 拿到入參的新 Worker,一直循環獲取 Worker 裏的任務;
  • 加鎖而後執行任務;
  • 若是執行完任務流程,而且沒有發生異常致使 Worker 掛掉,就直接複用 Worker(在獲取任務的方法 getTask()中循環等待任務);
  • 若是執行完任務流程後發現發生異常致使 Worker 掛掉,就從工做隊列中移除當前 Worker,而且補充一個新的;

若是整個流程執行完畢,就刪除當前的 Worker。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // 新建立的Worker默認state爲-1,AQS的unlock方法會將其改成0,此後容許使用interruptIfStarted()方法進行中斷
    
    // 完成任務之後是否須要移除當前Worker,即當前任務是否意外退出
    boolean completedAbruptly = true;
    
    try {
        // 循環獲取任務
        while (task != null || (task = getTask()) != null) {
            // 加鎖,防止 shundown 時中斷正在運行的任務
            w.lock();
            // 若是線程池狀態爲 STOP 或更後面的狀態,中斷線程任務
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 鉤子方法,默認空實現,須要本身提供
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 執行任務
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 鉤子方法
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // 任務執行完畢
                w.completedTasks++;
                w.unlock();
            }
        }
        
        completedAbruptly = false;
    } finally {
        // 根據completedAbruptly決定是否要移除意外退出的Worker,並補充新的Worker
        // 也就是說,若是上述過程順利完成,工做線程沒有掛掉,就不刪除,下次繼續用,不然就幹掉它再補充一個。
        processWorkerExit(w, completedAbruptly);
    }
}

4.任務的獲取與超時處理

runWorker()方法中,經過 getTask()方法去獲取任務。值得注意的是,超時處理也在此處,簡單的來講,整套流程是這樣的:

  • 判斷線程池是否關閉,工做隊列是否爲空,若是是說明沒任務了,直接返回null,不然接着往下判斷;
  • 判斷當前是否存在非核心線程,若是是說明須要進行超時處理;
  • 獲取任務,若是不須要超時處理,則直接從任務隊列獲取任務,不然根據 keepaliveTime 阻塞一段時間後獲取任務,若是獲取不到,說明非核心線程超時,返回 null 交給 runWorker()中的processWorkerExit()方法去刪除;

換句話說,runWorker()方法一旦執行完畢,必然會刪除當前的 Worker,而經過 getTask()拿任務的 Worker,在線程池正常運行的狀態下,核心線程只會一直在 for 循環中等待直到拿到任務,而非核心線程超時之後拿不到任務就會返回一個 null,而後回到 runWorker()中走完processWorkerExit()方法被刪除。

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 若是線程池關閉了,而且工做隊列裏的任務都完成了,或者線程池直接進入了 STOP 或更進一步的狀態,就不返回新任務
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        // 獲取當前工做線程
        int wc = workerCountOf(c);

        // 核心線程是否超時(默認false)或當前是否存在非核心線程,即判斷當前當前是否須要進行超時控制
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 判斷線程是否超過最大線程數或存在非核心線程
        if ((wc > maximumPoolSize || (timed && timedOut))
            // 而且除非任務隊列爲空,不然池中最少有一個線程
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 獲取任務
            Runnable r = timed ?
                // 阻塞 keepaliveTime 以獲取任務,若是在 keepaliveTime 時間內沒有獲取到任務,則返回 null.
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // 若是獲取不到任務,說明非核心線程超時了,下一輪判斷確認是否退出循環。
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

4、線程池的中斷

image-20210211171605477

線程池的中斷方法分爲三種:

  • shutdown():中斷線程池,再也不添加新任務,同時等待當前進行和隊列中的任務完成
  • shutdownNow():當即中斷線程池,再也不添加新任務,同時中斷全部工做中的任務,再也不處理任務隊列中任務

1.shutdown

shutdown 是有序關閉。主要乾了三件事:

  • 改變當前線程池狀態爲 SHUTDOWN;
  • 將當前工做隊列中的所有線程標記爲中斷;
  • 完成上述過程後將線程池狀態改成 TIDYING
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    // 加鎖
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 改變當前線程池狀態
        advanceRunState(SHUTDOWN);
        // 中斷當前線程
        interruptIdleWorkers();
        // 鉤子函數,默認空實現
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

其中,interruptIdleWorkers()方法以下:

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 遍歷工做隊列中的所有 Worker
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    // 標記爲中斷
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

2.shutdownNow

shutdownNow()shutdown()流程相似,可是會直接將狀態轉爲 STOP,在 addWorker() 或者getTask()等處理任務的相關方法裏,會針對 STOP 或更進一步的狀態作區分,將不會再處理任務隊列中的任務,配合drainQueue()方法以刪除任務隊列中的任務。

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 改變當前線程池狀態
        advanceRunState(STOP);
        // 中斷當前線程
        interruptWorkers();
        // 刪除任務隊列中的任務
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

5、拒絕策略

當任務隊列已滿,而且線程池中線程也到達最大線程數的時候,就會調用拒絕策略。也就是reject()方法

final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

拒絕策略共分四種:

  • AbortPolicy:拒絕策略,直接拋出異常,默認策略;
  • CallerRunsPolicy:調用者運行策略,用調用者所在的線程來執行任務;
  • DiscardOldestPolicy:棄老策略,無聲無息的丟棄阻塞隊列中靠最前的任務,並執行當前任務;
  • DiscardPolicy:丟棄策略,直接無聲無息的丟棄任務;

咱們能夠簡單的瞭解一下他們的實現:

AbortPolicy

throw new RejectedExecutionException("Task " + r.toString() +
                                     " rejected from " +
                                     e.toString());

CallerRunsPolicy

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        r.run();
    }
}

DiscardOldestPolicy

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        // 彈出隊頭元素
        e.getQueue().poll();
        e.execute(r);
    }
}

DiscardPolicy

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
	// Does nothing
}

6、線程池的鉤子函數

和 HashMap 與 LinkedHashMap 中的行爲有點相似,在線程池的代碼中,有些方法調用了一些具備空實現的方法,這些方法是提供給用戶去繼承並重寫的鉤子函數,主要包括三個:

  • beforeExecute():在執行任務以前回調
  • afterExecute():在任務執行完後回調
  • terminated():在線程池中的全部任務執行完畢後回調

經過繼承 ThreadPoolExecutor 類,並重寫以上三個方法,咱們能夠進行監控或者輸出日誌,更方便的瞭解線程池的狀態。

值得一提的是,afterExecute()方法的入參類型是(Runnable r, Throwable t),也就是說,若是線程運行中拋出異常,咱們也能夠經過該方法去捕獲異常並做出相應的處理。

7、總結

線程池提供了四個構造方法,參數最全的構造方法參數按順序有:核心線程數,最大線程數,非核心線程閒置存活時間,存活時間單位,任務隊列,線程工廠,拒絕策略。

線程池共有五種狀態,分別是:RUNNING,SHUTDOWN,STOP,TYDYING,TERMINATED,它們與工做線程數量一同記錄在成員變量 ctl 中,其中高 3 位用於記錄狀態,低 29 位用於記錄工做線程數,實際使用中經過位運算去獲取。

線程池中任務線程以繼承了 AQS 的 Worker 類的實例形式存在。當添加任務時,會有四種狀況:核心線程不滿,優先建立核心線程;核心線程滿,優先添加任務隊列;核心線程與隊列都滿,建立非核心線程;線程和隊列都滿,則執行拒絕策略。

其中,拒絕策略分爲四類,默認的拒絕策略 AbortPolicy;調用者運行策略 CallerRunsPolicy;棄老策略 DiscardOldestPolicy;丟棄策略 DiscardPolicy。

線程池的中斷有兩個方法:shutdown()shutdownNow(),二者都會讓線程池再也不接受新任務,可是 shutdown()會等待當前與任務隊列中的任務執行完畢,而 shutdownNow()會直接中斷當前任務,忽略並刪除任務隊列中的任務。

線程池提供了beforeExecute()afterExecute()terminated()三個鉤子函數,其中,afterExecute()的入參含有拋出的異常,所以能夠藉由該方法處理線程池中線程拋出的異常。

相關文章
相關標籤/搜索