線程池-線程池源碼詳解

在ThreadPoolExecutor的屬性定義中頻繁地用位移運算來表示線程池狀態,位移運算是改變當前值的一種高效手段,包括左移和右移。下面從屬性定義開始閱讀ThreadPoolExecutor的源碼。併發

 1 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 2     //Integer 共有32位,最右邊29位表示工做線程數,最左邊3位表示線程池狀態。
 3     //注:簡單的說,3個二進制位能夠表示從0-7的8個不一樣的數值(第1處)
 4     private static final int COUNT_BITS = Integer.SIZE - 3;
 5     //000-11111111111111111111111111111,相似於子網掩碼,用於位的與運算
 6     //獲得最左邊的3位,仍是右邊的29位
 7     private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
 8 
 9     // runState is stored in the high-order bits
10     //用左邊3位,實現5種線程池狀態(在左邊3位以後加入中畫線有助於理解)
11     //111-00000000000000000000000000000,十進制值:-563,870,912
12     //此狀態即是線程池可以接受的新任務
13     private static final int RUNNING    = -1 << COUNT_BITS;
14 
15     //000-00000000000000000000000000000,十進制值:0
16     //此狀態再也不接受新任務,但能夠繼續執行隊列中的任務
17     private static final int SHUTDOWN   =  0 << COUNT_BITS;
18 
19     //001-00000000000000000000000000000,十進制值:563,870,912
20     //此狀態全面拒絕,並終止正在處理的任務
21     private static final int STOP       =  1 << COUNT_BITS;
22 
23     //010-00000000000000000000000000000,十進制值:1073,741,824
24     //此狀態表示全部的任務已經被終止
25     private static final int TIDYING    =  2 << COUNT_BITS;
26 
27     //011-00000000000000000000000000000
28     //此狀態表示已清理完現場
29     private static final int TERMINATED =  3 << COUNT_BITS;
30 
31     // Packing and unpacking ctl
32     // 與運算,好比 001-00000000000000000000000100011,表示67個工做線程
33     // 掩碼取反    111-00000000000000000000000000000,即獲得最左邊3位 001
34     // 表示線程處於stop 狀態
35     private static int runStateOf(int c)     { return c & ~CAPACITY; }
36     //同理掩碼 000-11111111111111111111111111111,獲得右邊29位,即工做線程數
37     private static int workerCountOf(int c)  { return c & CAPACITY; }
38     //把左邊3位與右邊29位按或運算,合併成一個值
39     private static int ctlOf(int rs, int wc) { return rs | wc; }

第一處說明,線程池狀態用高3位表示,其中包括了符號位。五種狀態的十進制值按小道大依次排序爲:oop

RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED源碼分析

這樣設計的好處是能夠經過比較值的大小來肯定線程池的狀態,例如程序中常常出現isRuning的判斷。ui

   private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

咱們都知道Exexutor 接口有且只有一個方法execute,經過參數傳入待執行線程對象。下面分析ThreadPoolExecutor關於execute的實現。this

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        //返回包含線程數及線程池狀態的Integer類型數值
        int c = ctl.get();
        //若是工做線程數小於核心線程數,則建立線程任務並執行
        if (workerCountOf(c) < corePoolSize) {
            //addWorker 是另外一個極爲重要的方法,見下一段源碼分析(第1處)
            if (addWorker(command, true))
                return;
            //若是建立失敗,防止外部已經在線程池中加入新任務,從新獲取一下
            c = ctl.get();
        }
        // 只有線程池處於RUNNING 狀態,才執行後半句:置入隊列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 若是線程池不是 RUNNING 狀態 則將剛加入隊列的任務移除
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //若是以前的線程已經被消費完,新建一個線程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //核心池和隊列都已經滿,嘗試建立一個新線程
        else if (!addWorker(command, false))
            //若是addWorker返回false,即建立失敗,則喚醒拒絕策略(第2處)
            reject(command);
    }

第2處 發生拒絕的緣由有兩個(1)線程池狀態非Runing (2)等待隊列已滿。spa

下面繼續分析addWorker 源碼。線程

/**
     * 根據當前線程池狀態,檢查是否能夠添加新的任務線程,若是能夠則建立並啓動任務
     * 若是一切正常則返回true。返回false 的可能以下:
     * 1.線程池沒有處於RUNNING狀態
     * 2.線程工程建立新的任務線程失敗
     * @param firstTask 外部啓動線程池時須要構造的第一個線程,它是線程的母體
     * @param core 新增工做線程時的判斷指標,解釋以下
     *             true:表示新增線程時,須要判斷當前RUNNING 狀態的線程是否少於corePoolsize
     *             false:表示新增線程時,須要判斷當前RUNNING 狀態的線程是否少於maxmemPoolsize
     * @return
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        //不須要任務預約義的語法標籤,響應下文的continue retry.快速退出多層嵌套循環(第1處)
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c); // 當前線程池狀態

            // Check if queue empty only if necessary.
            // 這條語句等價:rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null ||workQueue.isEmpty())
            // 知足下列條件則直接返回false,線程建立失敗:
            // rs > SHUTDOWN:STOP || TIDYING || TERMINATED 此時再也不接受新的任務,且全部任務執行結束
            // rs = SHUTDOWN:firtTask != null 此時再也不接受任務,可是仍然會執行隊列中的任務
            // rs = SHUTDOWN:firtTask == null見execute方法的addWorker(null, false),任務爲null && 隊列爲空
            // 最後一種狀況也就是說SHUTDONW狀態下,若是隊列不爲空還得接着往下執行,爲何?add一個null任務目的究竟是什麼?
            // 看execute方法只有workCount==0的時候firstTask纔會爲null結合這裏的條件就是線程池SHUTDOWN了再也不接受新任務
            // 可是此時隊列不爲空,那麼還得建立線程把任務給執行完才行。

            //第2處
            if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                            firstTask == null &&
                            ! workQueue.isEmpty()))
                return false;

            // 走到這的情形:
            // 1.線程池狀態爲RUNNING
            // 2.SHUTDOWN狀態,但隊列中還有任務須要執行
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize)) // 最大線程數不能超過2^29,不然影響左邊3位的線程池狀態值
                    return false;
                //(第3處)
                if (compareAndIncrementWorkerCount(c)) //當前活動線程數+1(第3處) 原子操做遞增workCount
                    break retry; //操做成功跳出重試循環
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs) //若是線程池的狀態發生變化則重試(第5處)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        //開始建立工做線程
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //利用worker構造方法中的線程池工廠建立線程,並封裝成工做線程worker對象
            w = new Worker(firstTask);
            //注意這是worker 中屬性對象thread (第6處)
            final Thread t = w.thread;
            if (t != null) {
                //對ThreadExecutor 的敏感操做時,都須要持有主鎖,避免在添加和啓動線程時被幹擾。
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        //整個線程池在運行期間的最大併發線程數
                        if (s > largestPoolSize)
                            largestPoolSize = s;// 更新largestPoolSize
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 啓動新添加的線程,這個線程首先執行firstTask,而後不停的從隊列中取任務執行
                // 當等待keepAlieTime尚未任務執行則該線程結束。見runWoker和getTask方法的代碼。
                if (workerAdded) {
                    t.start(); //最終執行的是ThreadPoolExecutor的runWoker方法 並不是線程池execute 的command參數指向的線程
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
相關文章
相關標籤/搜索