ThreadPoolExecutor 核心源碼解析

本文只介紹 ThreadPoolExecutor 源碼的關鍵部分,開篇會先介紹 ThreadPoolExecutor 中的一些核心常量定義,而後選取線程池工做週期中的幾個關鍵方法分析其源碼實現。其實,看 JDK 源碼的最好途徑就是看類文件註釋,做者把想說的全都寫在裏面了。java

一些重要的常量

ThreadPoolExecutor 內部做者採用了一個 32bitint 值來表示線程池的運行狀態(runState)和當前線程池中的線程數目(workerCount),這個變量取名叫 ctlcontrol 的縮寫),其中高 3bit 表示容許狀態,低 29bit表示線程數目(最多容許 2^29 - 1 個線程)。程序員

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3; // 29 位
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 線程池最大容量

    // runState is stored in the high-order bits
	// 定義的線程池狀態常量
	// 111+29個0,值爲 -4 + 2 + 1 = -1(不懂的面壁)
    private static final int RUNNING    = -1 << COUNT_BITS; 
	// 000+29個0
    private static final int SHUTDOWN   =  0 << COUNT_BITS; 
	// 001+29個0
    private static final int STOP       =  1 << COUNT_BITS; 
	// 010+29個0
    private static final int TIDYING    =  2 << COUNT_BITS; 
	// 011+29個0
    private static final int TERMINATED =  3 << COUNT_BITS; 

    // Packing and unpacking ctl
    private static int runStateOf(int c) { return c & ~CAPACITY; } // 獲得線程池狀態
    private static int workerCountOf(int c) { return c & CAPACITY; } // 獲得線程池線程數
    private static int ctlOf(int rs, int wc) { return rs | wc; } // 反向構造 ctl 的值
複製代碼

由於表明線程池狀態的常量能夠經過值的大小來表示前後關係(order),所以後續源碼中會有:併發

rs >= SHUTDOWN // 那就表示SHUTDOWN、 STOP or TIDYING or TERMINATED,反正不是 RUNNING
複製代碼

理解上述的常量意義有助於後面理解源碼。異步

討論線程池的狀態轉換

從第一節咱們已經知道了線程池分爲五個狀態,下面咱們聊聊這五個狀態分別限制了線程池能執行怎樣的行爲:函數

  1. RUNNING:能夠接受新任務,且執行 Queue 中的任務
  2. SHUTDOWN:再也不接受新的任務,但能繼續執行 Queue 中已有的任務
  3. STOP:再也不接受新的任務,且也再也不執行 Queue 中已有的任務
  4. TIDYING:全部任務完成,workCount=0,線程池狀態轉爲 TIDYING 且會執行 hook method,即 terminated()
  5. TERMINATED:``hook method terminated() 執行完畢以後進入的狀態

線程池的關鍵邏輯

上圖總結了 ThreadPoolExecutor 源碼中的關鍵性步驟,正好對應咱們這次解析的核心源碼(上圖出處見水印)。oop

  1. execute 方法用來向線程池提交 task,這是用戶使用線程池的第一步。若是線程池內未達到 corePoolSize 則新建一個線程,將該 task 設置爲這個線程的 firstTask,而後加入 workerSet 等待調度,這步須要獲取全局鎖 mainLock
  2. 已達到 corePoolSize 後,將 task 放入阻塞隊列
  3. 若阻塞隊列放不下,則新建新的線程來處理,這一步也須要獲取全局鎖 mainLock
  4. 當前線程池 workerCount 超出 maxPoolSize 後用 rejectHandler 來處理

咱們能夠看到,線程池的設計使得在 2 步驟時避免了使用全局鎖,只須要塞進隊列返回等待異步調度就能夠,僅剩下 13 建立線程時須要獲取全局鎖,這有利於線程池的效率提高,由於一個線程池老是大部分時間在步驟 2 上,不然這線程池也沒什麼存在的意義。源碼分析

源碼分析

本文只分析 executeaddWorkerrunWorker,三個核心方法和一個 Worker 類,看懂了這幾個,其實其餘的代碼都能看懂。ui

Worker 類

// 繼承自 AQS 實現簡單的鎖控制
 	private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        // worker 運行所在的線程
        final Thread thread;
        // 賦予該線程的第一個 task,多是 null,若是不是 null 就運行這個,
		// 若是是 null 就經過 getTask 方法去 Queue 裏取任務
        Runnable firstTask;
        // 線程完成的任務數量
        volatile long completedTasks;

        Worker(Runnable firstTask) {
		// 限制線程直到 runWorker 方法前都不容許被打斷
            setState(-1); 
            this.firstTask = firstTask;
			// 線程工廠建立線程
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker */
        public void run() {
			// 線程內部的 run 方法調用了 runWorker 方法
            runWorker(this);
        }
	}
複製代碼

execute 方法

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
		// 若是當前線程數小於 corePoolSize
        if (workerCountOf(c) < corePoolSize) {
		// 調用 addWorker 方法新建線程,若是新建成功返回 true,那麼 execute 方法結束
            if (addWorker(command, true))
                return;
			// 這裏意味着 addWorker 失敗,向下執行,由於 addWorker 可能改變 ctl 的值,
			// 因此這裏從新獲取下 ctl
            c = ctl.get();
        }
		
		// 到這步要麼是 corePoolSize 滿了,要麼是 addWorker 失敗了
		// 前者很好理解,後者爲何會失敗呢?addWorker 中會講
		
		// 若是線程池狀態爲 RUNNING 且 task 插入 Queue 成功
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
			// 若是已不處於 RUNNING 狀態,那麼刪除已經入隊的 task,而後執行拒絕策略
			// 這裏主要是擔憂併發場景下有別的線程改變了線程池狀態,因此 double-check 下
            if (! isRunning(recheck) && remove(command))
                reject(command);
			// 這個分支有點難以理解,意爲若是當前 workerCount=0 的話就建立一個線程
			// 那爲何方法開頭的那個 addWorker(command, true) 會返回 false 呢,其實
			// 這裏有個場景就是 newCachedThreadPool,corePoolSize=0,maxPoolSize=MAX 的場景,
			// 就會進到這個分支,以 maxPoolSize 爲界建立臨時線程,firstTask=null
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
		// 這個分支很好理解,workQueue 滿了那麼要根據 maxPoolSize 建立線程了
		// 若是無法建立說明 maxPoolSize 滿了,執行拒絕策略
        else if (!addWorker(command, false))
            reject(command);
    }
複製代碼

addWorker 方法

// core 表示以 corePoolSize 仍是 maxPoolSize 爲界
	private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 看看 addWorker 何時返回 false
			// 這裏的 if 邏輯有點難懂,用下數學上的分配率,將第一個邏輯表達式放進括號裏就好懂了
			// 一、rs >= SHUTDOWN && rs != SHUTDOWN 其實就表示當線程池狀態是 STOP、TIDYING, 或 TERMINATED 的時候,固然不能添加 worker 了,任務都不執行了還想加 worker?
			// 二、rs >= SHUTDOWN && firstTask != null 表示當提交一個非空任務,但線程池狀態已經不是 RUNNING 的時候,固然也不能 addWorker,由於你最多隻能執行完 Queue 中已有的任務
			// 三、rs >= SHUTDOWN && workQueue.isEmpty() 若是 Queue 已經空了,那麼不容許新增
			// 須要注意的是,若是 rs=SHUTDOWN && firstTask=null 或者 rs=SHUTDOWN && workQueue 非空的狀況下,仍是能夠新增 worker 的,須要建立臨時線程處理 Queue 裏的任務
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
				// 這裏也是一個返回 false 的狀況,但很簡單,就是數目溢出了
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
				// CAS 成功了,就跳出 loop
                if (compareAndIncrementWorkerCount(c))
                    break retry;
				// CAS 失敗的話,check 下目前線程池狀態,若是發生改變就回到外層 loop 再來一遍,這個也好理解,不然單純 CAS 失敗可是線程池狀態不變的話,就只要繼續內層 loop 就好了
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
				// 這是全局鎖,必須持有才能進行 addWorker 操做
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
					// 啓動線程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
複製代碼

runWorker 方法

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
			// 循環直至 task = null,多是因爲線程池關閉、等待超時等
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // 下面這個 if 邏輯沒怎麼讀懂。。。翻譯了下注釋
				// 若是線程池中止,確保線程中斷;
				// 若是沒有,確保線程不中斷。這須要在第二種狀況下進行從新獲取ctl,以便在清除中斷時處理shutdownNow競爭
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
					// 前置鉤子函數,能夠自定義
                    beforeExecute(wt, task); 
                    Throwable thrown = null;
                    try {
						// 運行 run 方法
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
						// 線程的 run 不容許拋出 Throwable,因此轉換爲 Error 
                        thrown = x; throw new Error(x);
                    } finally {
					// 後置鉤子函數,也能夠自定義
                        afterExecute(task, thrown);
                    }
                } finally {
					// 獲取下一個任務
                    task = null;
					// 增長完成的任務數目
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
複製代碼

總結

看完 ThreadPoolExecutor 的源碼,不得不驚歎於代碼寫得真優雅,可是正由於寫的太簡潔優雅甚至找不到一句囉嗦的代碼,因此讓人有點難懂。看源碼的建議是先仔細閱讀一遍類註釋,而後再配合 debug,理清關鍵性的步驟在作什麼,有些 corner case 夾雜在主邏輯裏面,若是一開始看不懂能夠直接略過,過後再來反思。this

寫在最後

這是一個不定時更新的、披着程序員外衣的文青小號。既分享極客技術,也記錄人間煙火。spa

相關文章
相關標籤/搜索