線程池 execute() 的工做邏輯

原文地址:https://www.xilidou.com/2018/02/09/thread-corepoolsize/java

最近在看《Java併發編程的藝術》回顧線程池的原理和參數的時候發現一個問題,若是 corePoolSize = 0 且 阻塞隊列是無界的。線程池將如何工做?編程

咱們先回顧一下書裏面描述線程池execute()工做的邏輯:微信

  1. 若是當前運行的線程,少於corePoolSize,則建立一個新的線程來執行任務。
  2. 若是運行的線程等於或多於 corePoolSize,將任務加入 BlockingQueue。
  3. 若是 BlockingQueue 內的任務超過上限,則建立新的線程來處理任務。
  4. 若是建立的線程數是單錢運行的線程超出 maximumPoolSize,任務將被拒絕策略拒絕。

看了這四個步驟,其實描述上是有一個漏洞的。若是核心線程數是0,阻塞隊列也是無界的,會怎樣?若是按照上文的邏輯,應該沒有線程會被運行,而後線程無限的增長到隊列裏面。而後呢?併發

因而我作了一下試驗看看到底會怎樣?oop

public class threadTest {
    private final static ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1,0, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
    public static void main(String[] args) {
        AtomicInteger atomicInteger = new AtomicInteger();
        while (true) {
            executor.execute(() -> {
                System.out.println(atomicInteger.getAndAdd(1));
            });
        }
    }
}

結果裏面的System.out.println(atomicInteger.getAndAdd(1));語句執行了,與上面的描述矛盾了。到底發生了什麼?線程池建立線程的邏輯是什麼?咱們仍是從源碼來看看到底線程池的邏輯是什麼?學習

ctl

要了解線程池,咱們首先要了解的線程池裏面的狀態控制的參數 ctl。ui

  • 線程池的ctl是一個原子的 AtomicInteger。
  • 這個ctl包含兩個參數 :this

    • workerCount 激活的線程數
    • runState 當前線程池的狀態
  • 它的低29位用於存放當前的線程數, 所以一個線程池在理論上最大的線程數是 536870911; 高 3 位是用於表示當前線程池的狀態, 其中高三位的值和狀態對應以下:atom

    • 111: RUNNING
    • 000: SHUTDOWN
    • 001: STOP
    • 010: TIDYING
    • 110: TERMINATED

爲了可以使用 ctl 線程池提供了三個方法:spa

// Packing and unpacking ctl
    // 獲取線程池的狀態
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 獲取線程池的工做線程數
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // 根據工做線程數和線程池狀態獲取 ctl
    private static int ctlOf(int rs, int wc) { return rs | wc; }

execute

外界經過 execute 這個方法來向線程池提交任務。

先看代碼:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();

        //若是工做線程數小於核心線程數,
        if (workerCountOf(c) < corePoolSize) {
            //執行addWork,提交爲核心線程,提交成功return。提交失敗從新獲取ctl
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

        //若是工做線程數大於核心線程數,則檢查線程池狀態是不是正在運行,且將新線程向阻塞隊列提交。
        if (isRunning(c) && workQueue.offer(command)) {

            //recheck 須要再次檢查,主要目的是判斷加入到阻塞隊裏中的線程是否能夠被執行
            int recheck = ctl.get();

            //若是線程池狀態不爲running,將任務從阻塞隊列裏面移除,啓用拒絕策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 若是線程池的工做線程爲零,則調用addWoker提交任務
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

        //添加非核心線程失敗,拒絕
        else if (!addWorker(command, false))
            reject(command);
    }

addWorker

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            //獲取線程池狀態
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 判斷是否能夠添加任務。
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                //獲取工做線程數量
                int wc = workerCountOf(c);
                //是否大於線程池上限,是否大於核心線程數,或者最大線程數
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //CAS 增長工做線程數
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                //若是線程池狀態改變,回到開始從新來
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;

        //上面的邏輯是考慮是否可以添加線程,若是能夠就cas的增長工做線程數量
        //下面正式啓動線程
        try {
            //新建worker
            w = new Worker(firstTask);

            //獲取當前線程
            final Thread t = w.thread;
            if (t != null) {
                //獲取可重入鎖
                final ReentrantLock mainLock = this.mainLock;
                //鎖住
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    // rs < SHUTDOWN ==> 線程處於RUNNING狀態
                    // 或者線程處於SHUTDOWN狀態,且firstTask == null(多是workQueue中仍有未執行完成的任務,建立沒有初始任務的worker線程執行)
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 當前線程已經啓動,拋出異常
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //workers 是一個 HashSet 必須在 lock的狀況下操做。
                        workers.add(w);
                        int s = workers.size();
                        //設置 largeestPoolSize 標記workAdded
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //若是添加成功,啓動線程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            //啓動線程失敗,回滾。
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

先看看 addWork() 的兩個參數,第一個是須要提交的線程 Runnable firstTask,第二個參數是 boolean 類型,表示是否爲核心線程。

execute() 中有三處調用了 addWork() 咱們逐一分析。

  • 第一次,條件 if (workerCountOf(c) < corePoolSize) 這個很好理解,工做線程數少於核心線程數,提交任務。因此 addWorker(command, true)
  • 第二次,若是 workerCountOf(recheck) == 0 若是worker的數量爲0,那就 addWorker(null,false)。爲何這裏是 null ?以前已經把 command 提交到阻塞隊列了 workQueue.offer(command) 。因此提交一個空線程,直接從阻塞隊列裏面取就能夠了。
  • 第三次,若是線程池沒有 RUNNING 或者 offer 阻塞隊列失敗,addWorker(command,false),很好理解,對應的就是,阻塞隊列滿了,將任務提交到,非核心線程池。與最大線程池比較。

至此,從新概括execute()的邏輯應該是:

  1. 若是當前運行的線程,少於corePoolSize,則建立一個新的線程來執行任務。
  2. 若是運行的線程等於或多於 corePoolSize,將任務加入 BlockingQueue。
  3. 若是加入 BlockingQueue 成功,須要二次檢查線程池的狀態若是線程池沒有處於 Running,則從 BlockingQueue 移除任務,啓動拒絕策略。
  4. 若是線程池處於 Running狀態,則檢查工做線程(worker)是否爲0。若是爲0,則建立新的線程來處理任務。若是啓動線程數大於maximumPoolSize,任務將被拒絕策略拒絕。
  5. 若是加入 BlockingQueue 。失敗,則建立新的線程來處理任務。
  6. 若是啓動線程數大於maximumPoolSize,任務將被拒絕策略拒絕。

總結

回顧我開始提出的問題:

若是 corePoolSize = 0 且 阻塞隊列是無界的。線程池將如何工做?

這個問題應該就不難回答了。

最後

《Java併發編程的藝術》是一本學習 java 併發編程的好書,在這裏推薦給你們。

同時,但願你們在閱讀技術數據的時候要仔細思考,結合源碼,發現,提出問題,解決問題。這樣的學習才能高效且透徹。

歡迎關注個人微信公衆號

二維碼

相關文章
相關標籤/搜索