ThreadPoolExecutor源碼共讀

一塊兒共讀,共同閱步。java

零:序言

源碼閱讀本就是要貫注全神、戒驕戒躁的沉浸式過程。我本着浮躁至極的心態,單刀直入,從入口方法先殺入「敵軍」內部,讓你們短期內享受到最大的學習成就感,而後再橫向鋪開,帶你們一窺源碼的究竟。有不對之處,輕噴、指出。安全

java1.5引入的線程池的標準類,ThreadPoolExecutor。bash

咱們一般是經過Executors這個工廠類來建立具體的實例,如:多線程

Executors.newCachedThreadPool(...)
Executors.newScheduledThreadPool(...)
複製代碼

前者建立的就是咱們要講的ThreadPoolExecutor實例。後者是有延遲功能的線程池,ScheduledThreadPoolExecutor,有機會再講吧。ThreadPoolExecutor這個線程池實例,內部維護了一個線程的集合,用來存放線程;有一個存放待執行任務的隊列,在池內線程數達到最大值時,任務就暫時入隊,等待線程取走運行。因此,目前來看,ThreadPoolExecutor的結構以下: 函數

ThreadPoolExecutor結構圖.png

零點一:ThreadPoolExecutor源碼閱讀思惟導圖

我會先列一下該源碼涉及到的重要的邏輯方法,而後按使用時一般的調用順序,挨個講解,最後合併總結。oop

  • execute:執行任務方法,內部封裝了新建線程、任務入隊等重要邏輯
  • addWorker:新建線程方法
  • getTask:從任務隊列內獲取一個任務
  • runWorker:池內線程的主循環邏輯。提醒一下,多線程都會調用這同一個方法,因此尤爲注意同步問題。

零點五:ThreadPoolExecutor內的關鍵變量解釋

  • workQueue[BlockingQueue],任務隊列,是一個BlockingQueue對象,線程安全
  • ctl[Integer],記錄了線程池的運行狀態值跟池內的線程數
  • workers[HashSet<Worker>],具體存放線程的set對象
  • corePoolSize[volatile int],線程池核心線程數配置,低於這個數值時,新進來的任務一概以新啓動線程處理

一:線程池狀態ctl基礎知識準備

ThreadPoolExecutor實例表明一個線程池,相應地,這個池子就有一些運行狀態,以及池子內線程數量的配置。這二者的實現以下:學習

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
複製代碼

線程池狀態跟池內線程數的統計都記錄在ctl這個AtomicInteger(它是誰,先本身查吧。後面估計會專門寫下,這裏只要記住它的加減是線程安全的就好)中。具體實現是: java內一個整型量是32位,這裏AtomicInteger也是。源碼做者將ctl的32位中高3位用來記錄線程池狀態,低29位用來記錄線程數量。 驗證來看,COUNT_BITS的值是29,方法 runStateOf(int c) {return c & ~CAPACITY;} 這裏的c就是ctl變量,而CAPACITY就是一個mask面紗,用來從 ctl中提取上面兩個變量的,它是這樣的: ui

CAPACITY的位圖.png

因此,runState就是ctl取反後CAPACITY相與,也就是隻有高4位有效,正好對應線程池狀態的記錄位。 因此,各類狀態下,ctl的值以下: RUNNING:1001 x(28個),-1,最高位是符號位,這個<<位移操做是忽略符號位的位移 SHUTDOWN:0000 x(28), 0 STOP: 0001 x(28), 1 TIDYING: 0010 x(28), 2 TERMINATED: 0011 x(28), 3this

二:入口函數execute(Runnable command)

用過線程池的人應該都用過這個入口函數,它就是用來將一個Runnable任務體放入線程池,下面讓咱們來具體看看它的邏輯(代碼塊無法高亮了,你們看下代碼段中註釋的翻譯部分):atom

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         * 主要的邏輯註釋,在這
         * 1. If fewer than corePoolSize threads are running, try to
         * 若是當前池內線程數小於corePoolSize,那就嘗試去新建一
         * start a new thread with the given command as its first
         * 條線程,傳進來的command參數做爲該線程的第一個任務 
         * task.  The call to addWorker atomically checks runState and
         * 體。調用addWorker函數會自動檢查線程池的狀態和池內活躍的線程數
         * workerCount, and so prevents false alarms that would add
         * 若是在不應或不能新建線程時新建了,那不會拋出異常,會返回false
         * threads when it shouldn't, by returning false. * * 即便任務體成功入隊,咱們仍須要再去檢查一遍,咱們是否應該是新建線程而不是入隊, * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (由於可能自第一次檢查後,原池內活躍的線程 * (because existing ones died since last checking) or that * 又死掉啦)又或者線程池的狀態發生了變化 * the pool shut down since entry into this method. So we * 因此咱們去二次檢查池內狀態,若線程池狀態變爲中止,就去回滾,或者線程池此時一個活躍線程都沒有的話, * recheck state and if necessary roll back the enqueuing if * 新建一個線程去執行任務體。(PS:由於ThreadPoolExecutor * stopped, or start a new thread if there are none. * 主要是經過addWorker來建立線程,因此,若是池內一個活躍線程都沒有, * 這時咱們任務體入隊了,也沒有線程去跑...固然爲何只檢查一遍?我是想,可 * 能就只是做者單純地在這裏想檢查一遍,稍微確保下。由於即便這個二次檢查 * 沒問題,後續的,到池內線程確切地去跑這個任務體以前的代碼,每一行 * 代碼,都仍有發生這種狀況的可能。這,就是多線程...) * 3. If we cannot queue task, then we try to add a new * 若是咱們任務體入隊失敗,那我嘗試新建線程,若是還失敗 * 那就說明線程池已經被shutdown了,或者整個池子已經滿了,那咱們 * 就去拒絕這個任務體。這個拒絕,就會用到所謂的RejectPolicy對象 * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ // 獲取ctl對象 int c = ctl.get(); // 若是池內活躍線程數小於corePoolSize if (workerCountOf(c) < corePoolSize) { // 新建線程,第二個參數true能夠先忽略 if (addWorker(command, true)) return; // 新建線程失敗,那咱們獲取最新的線程池狀態變量ctl c = ctl.get(); } // 若是當前線程池仍在運行並且任務體入隊成功。 // (workQueue就是ThreadPoolExecutor具體的任務隊列。 // 而這裏就是咱們上面註釋提到的那段二次檢查的邏輯) if (isRunning(c) && workQueue.offer(command)) { // 二次檢查。獲取最新的線程池狀態字段 int recheck = ctl.get(); // 若是線程不在運行狀態 而且也成功把入隊的任務體刪除了 // 那就菜哦用拒絕策略來拒絕 if (! isRunning(recheck) && remove(command)) reject(command); // 或者,在線程池內活躍的線程數爲0時,新建一個線程 // 這裏傳參跟上面不同,先忽略。記錄這個新啓動一個線程就夠了 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 若是上面的If失敗了,就嘗試新啓動線程,啓動失敗了,那說明 // 上面的失敗,是isRunning形成的,因此拒絕任務體。啓動成功了,那就是成功了。 else if (!addWorker(command, false)) reject(command); } 複製代碼

這裏涉及到ThreadPoolExecutor線程池增長線程的一個判斷邏輯: 每當ThreadPoolExecutor.execute執行一個任務時,先判斷corePoolSize,當池內線程數小於這個時,直接新增線程,若大於這個,則向workQueue任務隊列入隊,隊列滿了時,則以maximumPoolSize爲界開始繼續新建線程,當超過maximumPoolSize時,就採用最後的RejectPolicy進行拒絕處理。

三: 函數addWorker(Runnable firstTask, boolean core)

這個函數主要邏輯是新啓動一個線程,firstTask是新啓動線程的第一個任務,能夠爲null,爲null時,就是單純地啓動一個線程,記得咱們以前在execute(Runnable command)方法中,在線程池內沒有有效線程時,調用firstTasknull的方法來啓動一條線程。 第二個參數core是用來辨別,啓動一個新線程時,是以corePoolSize這個線程數配置量來做爲限制,仍是以maximumPoolSize這個線程數配置量做爲限制。 看下源碼(邏輯主要放註釋裏了):

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            // 獲取到線程池的運行狀態
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 檢查任務隊列在它必要的時候才能爲空。這裏
            // 只能直接翻譯下,具體的,if裏的判斷邏輯也能推斷出來
            // 可是目前我也不能肯定說出在這種狀況下要false退出的
            // 緣由。若是想搞清它,可能只能完全把線程池的運行狀態、
            // 線程池內的線程數、任務隊列內的任務數三者全部可能的狀況的
            // 前提下才能肯定。這裏待大神指出來了。
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                // 第二個參數的做用就在這裏產生了!
                // 這裏在確保池內線程數不超過ctl極限CAPACITY
                // 以及不超過相應的xxxPoolSize的狀況下,經過
                // CAI操做去給線程數加1,成功了,則跳出retry標記後
                // 的循環。至於CAI是什麼?先記住它是線程安全的給數值+1的操做就好
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        // 到此位置,線程池內的線程數標記字段已經加1了
        // 接下來的,就是具體添加一個線程的操做了
        // 
        // 這裏就不可避免的涉及到了ThreadPoolSize中的
        // Worker這個內部類了,這個類就是具體的ThreadPoolSize
        // 內部用來表明一個線程的封裝對象,他封裝了一個線程實例
        // ,用來跑具體的任務;封裝了一個Runnable 實例,表明具體的任務;
        // 同時,它繼承、實現了AQS(AbstractQueuedSynchronizer)跟Runnable,因此,這個
        // Worker實例能夠理解成一個小勞工,有本身的運行線程,有
        // 本身的具體的執行任務體,同時,本身也有同步機制AQS。
        // 這裏涉及到AQS,你們夥就暫且理解成AQS賦予Worker同步的性質便可(調用AQS的方法就能實現)
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 初始化一個Worker勞工,同時指定給他的任務。這個任務
            // 能夠爲null,空。表示什麼也不作。同時,初始化的時候,也會
           // 初始化Worker體內的線程對象,這條線程的對象的啓動,是
           // 在worker對象的Runnable.run實現方法裏
            w = new Worker(firstTask);
            final Thread t = w.thread;
                // 這個mainLock是ThreadPoolExecutor用來同步對
                // workers線程隊列的CRUD操做的
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    // 當線程池處於RUNNING狀態,則能夠繼續操做;
                    // 或者當線程池處於SHUTDOWN,可是firstTask 爲null
                    // 也就是說,這裏是爲了增長一個線程的,因此,也能夠放行
                    // 由於SHUTDOWN狀態,是容許啓動線程將任務隊列內的任務跑完的
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 將新生成的線程勞工加入到線程集合中
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 若是線程勞工添加成功,則啓動它
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 若是線程啓動失敗,則運行回滾操做
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
複製代碼

四: Worker對象的線程start()

這裏講述的方法是接上面addWorker時,成功調用的t.start(),這裏啓動了Worker封裝的線程。這個線程是Worker構造函數裏生成的,以下:

/**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            // 這裏設置了AQS的state,用來抑制interrupt直到
            // runWorker方法
            setState(-1); // inhibit interrupts until runWorker
            // 這裏傳遞了線程的任務體,能夠爲null
            this.firstTask = firstTask;
            // 初始化線程時,給線程指定了worker實例自身這個Runnable,所以,線程在start後,
            // 就是在運行worker當前實例自身的run方法
            this.thread = getThreadFactory().newThread(this);
        }
複製代碼

看完上面代碼的註釋,接着看worker實例自身的run方法

/** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
複製代碼

能夠看到這裏調用了runWorker方法,傳參是worker自身。runWorker是同一個ThreadPoolExecutor實例的方法,因此,線程池實例下的全部Worker線程都是在跑這同一個runWorker方法。

五: 線程池的循環體方法runWorker(Worker worker)

/**
     * Main worker run loop.  Repeatedly gets tasks from queue and
     * executes them...
     */
final void runWorker(Worker w) {
        // 這裏獲取到線程對象,其實就是參數Worker對象內封裝的Thread對象。
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // allow interrupts,容許Interrupt打斷,記得Worker對象的構造函數嘛?
        // 構造函數一開始就調用了setState(-1)去抑制interrupt。這裏就是去釋放它。
        // 固然,這裏具體的抑制interrupt的含義,要結合AQS來了解了,我後面再加吧。
        w.unlock();
        boolean completedAbruptly = true;
        try {
            // 若是Worker中的firstTask對象不是空的,則
            // 直接跑它;若否則,調用getTask從隊列中獲取一條任務來執行。這裏
            // 會一直while循環,因此worker們在任務隊列中有任務時
            // 會一直在這個runWorker中循環while取任務執行
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                // 這裏有一個很巧妙的判斷邏輯,這段代碼也主要是實現了
                // 上面英文註釋中的邏輯,可是可能比較難理解,稍等
                // 在後面我會專門講一下
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 任務體的前置函數,如今默認是空函數
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 任務體的執行函數
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; 
                        throw new Error(x);
                    } finally {
                        // 後置函數
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            // 這個參數記錄worker運行是不是被打斷的,若是不是,代碼
            // 會安全地走到這裏,而後置字段爲false。
            // 不然,異常狀況下就直接跳到finally中了,值仍爲初始化時的true
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
複製代碼

這段源碼上方我放了一段註釋,翻譯過來就是: worker對象的主要Loop循環體。從隊列(workQueue)中獲取任務體,而後執行。

六: 從任務隊列獲取任務的getTask函數

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        // 這個for(;;)也是個無限循環的實現,它比while(true)的好處是
        // 在字節碼層面上,它比while(true)少兩行字節碼代碼,因此
        // 效率更高
        for (;;) {
            // 獲取線程池最新狀態
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            // allowCoreThreadTimeOut表示coreThread,實際上是判斷
            // 線程數在這個coreThreadPoolSize範圍內時,線程是否能夠超時。
            // 這裏的判斷邏輯也很巧妙,若是allowxxxTimeOut爲true,coreThread
            // 能夠超時,則 || 後面判斷coreThread的邏輯也就無所謂了,是吧。
            // 但若是allowxxxxTimeOut爲false,coreThread不容許超時,
            // 則須要去判斷在判斷的線程是否實在coreThread範圍內,是的話,
            // 則最終結果也爲false,符合coreThread不能超時的邏輯;若是大於,
            // 則說明當前方法的線程不是在coreThread,
            // 注意去理解這個是否是coreThread這個概念
            // 因此,timed爲true,也就是能夠超時,符合邏輯
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            // 這裏我把代碼格式化了下,方便你們去看
            // 這裏的判斷就是,在符合了一些邏輯後,就去直接
            // wokerCount減一,表明當前這個woker就直接幹掉了,
            // 而在方法內返回null這個邏輯,在調用getTask的代碼處
            // 確實也是去幹掉當前的worker實例。可是,woker不能
            // 瞎幹掉,必需要確保線程池能正常產生做用,這個正常做用
            // 的實現,要麼就是幹掉當前的worker還剩下至少一個,
            // 要麼就是任務隊列空了,這個邏輯就在(wc > 1 || workQueue.isEmpty)
            // 實現了。再來看 && 以前,在當前線程數大於
            // maximumPoolSize限制時,或者當前woker能夠超時,
            // 即timed爲true,同時,上一次獲取任務體時也超時了(timedOut)
            // 則,當前的worker就幹掉。這段邏輯有一個timedOut
            // 判斷,即上一次當前worker獲取任務體時就超時了。
            // 我猜想,加這個邏輯,可能就是純粹的統計學上的效率
            // 提升。固然,歡迎更多想法。
            //
            // 在符合上述條件後,CAS操做來減小workerCount數
            // 再返回null,去幹掉當前worker實例。
            if (
                (wc > maximumPoolSize || (timed && timedOut))
                &&
                 (wc > 1 || workQueue.isEmpty())
                ) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // 根據當前的worker是否能夠超時,調用BlockingQueue
                // 的不一樣方法來獲取任務體。
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                // 獲取到任務體,則返回
                if (r != null)
                    return r;
                // 超時了,記錄標記位
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
複製代碼

七: 截止目前,全部的任務入隊、線程循環、取任務執行任務的邏輯就已經都看完了

這裏咱們總結下:

ThreadPoolExecutor的實際邏輯圖

ThreadPoolExecutor結構圖.png

workers線程集合中的Worker對象,在runWorker中循環自workQueue中獲取Runnable任務體進行執行。對workers線程集合的訪問要通過mainLock這個鎖。

shutdown等線程池結束等方法,後續會講。代碼講解中涉及一些跟線程同步相關的細碎的小邏輯,建議在理解了主要邏輯後,去重點理解,這些點的思想是頗有價值的思想。

相關文章
相關標籤/搜索