ThreadPoolExecutor 原理及源碼詳細分析

總覽線程池工做機制

新任務來到的時候,先去判斷核心線程池數量(corePoolSize)滿了沒有,若是沒有滿那麼建立一個 Worker 去執行這個任務。java

某一時刻發現核心線程池數量滿了,那麼這個任務就暫時放到 workQueue 中去了。後續只要線程池中的 Worker 空閒下來了(任務執行完了)就去 workQueue 中取任務執行。多線程

隨着任務不斷的增長,核心線程池中 Worker 的消費能力跟不上了,即 workQueue 中堆積的任務開始變得愈來愈多,某一時刻 workQueue 滿了。this

這個時候新來的任務就要去檢查線程池的容許建立的最大容量是多少了(maximumPoolSize),好比核心線程池數量限制爲 5 它滿了,隊列長度限制爲 10 也滿了,最大線程池數量限制爲 10,則容許再建立 5 個非核心的 Worker 去執行任務。spa

某一時刻 maximumPoolSize 也滿了,那麼新來的任務就將被採起對應的拒絕策略來執行了線程

這時候線程池逐漸閒下來了(好比到了半夜之類的用戶請求不多了),此時隊列裏面的任務可能已經被消費完了,或者是不多,即須要執行的任務遠遠小於線程池 Worker 數量,那麼最初建立的 10 個 Worker 中大部分的 Worker 都沒法取得任務執行了,這個時候就要將部分閒置的 Worker 釋放了好比說某個 Worker 在 keepAliveTime 的時間尚未獲取到任務執行,那麼就認爲它空閒時間知足釋放要求了,釋放對應的 Workerrest

須要注意的是在底層實現中 Worker,並無一個狀態標識他是核心仍是非核心的,在線程池這裏僅僅是一個數量的概念,即在釋放的時候會看狀況釋放某幾個 Worker 而無論他建立的時候是不是核心 Worker,若是此時已經沒有任務須要執行了,那麼釋放 Worker 後就能保證線程池中剩下的 worker 數量與 corePoolSize 一致。可是有一點須要注意的是若是將 allowCoreThreadTimeOut 設置爲 true 了後,核心線程池中的 Worker 也會釋放即最終 worker 數量可能爲 0,可是 allowCoreThreadTimeOut 通常是默認爲 false 的code

一些關鍵字段含義

// 線程池狀態,高 3 位爲線程池狀態,低 29 位爲線程池容量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
// 線程池最大容量爲 2^29 - 1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// 取出線程池的運行狀態
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 取出目前的線程數量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 得到 ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }    
複製代碼
線程池狀態 程序移位 高 3 位 低 29 位 說明
RUNNING -1 << 29 111 000... 運行狀態
SHUTDOWN 0 << 29 000 000... 關閉狀態
STOP 1 << 29 001 000... 中止狀態
TIDYING 2 << 29 010 000... 全部線程終止了後調用 terminated()
TERMINATED 3 << 29 011 000... terminated() 調用結束

其中高 3 位表示線程池目前處於什麼狀態,低 29 位用來表示線程池目前線程的數量cdn

構造方法

// 核心線程池數量
    private volatile int corePoolSize;
    // 最大線程池數量
    private volatile int maximumPoolSize;
    // 非核心線程空閒時間,超過該時間後會被回收
    private volatile long keepAliveTime;
    // 工做隊列,當核心線程池數量滿了後,新的任務會放入工做隊列中
    private final BlockingQueue<Runnable> workQueue;
    // 拒絕策略,當線程池沒法再執行新的任務的時候調用
    private volatile RejectedExecutionHandler handler;
    // 默認的拒絕策略
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
    
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
複製代碼

Worker

線程池中執行任務的線程,它會將 Runnable 封裝進去,當執行完了某一個任務後,它會繼續從隊列裏面取任務執行,能夠經過 prestartAllCoreThreads() 預先初始化建立好 corePoolSize 個數量的 Worker,後續任務來了就直接用建立好的 Worker 執行就好,或者懶加載的方式每當一個任務過來時候建立一個 Worker 去執行。blog

當池中 Worker 數量達到 corePoolSize 的時候,新來任務的那麼就直接添加到 workQueue 當中(注意這時候只是添加的 Runnable 沒有新的 Worker 產生,即 Worker 是用來執行任務的線程),這時線程池裏面全部的 Worker 只要誰閒下來了那麼就去工做隊列中取出任務來執行便可。隊列

當 workQueue 滿了後(意味着核心線程池裏面的 Worker 們忙不過來了,都在不停的執行任務都執行不完),那麼就會添加非核心的 Worker 去執行任務。

而後任務繼續過來達到 maximumPoolSize 限制的線程池最大容量後(核心 Worker 和非核心 Worker 所有都忙不過來了沒法處理新的任務了)這個時候就選擇對應的策略處理,拒絕新的任務,仍是將其忽略等等。

非核心 Worker 若是空閒時間超過了 keepAliveTime 那麼就會釋放這個 Worker,這裏須要注意的一點是,釋放的時候只是保證釋放一個 Worker 並不必定是釋放以前 addWorker(task, false) 的這個,即若是目前比較空閒的話,會釋放幾個空閒的 Worker 最終保證核心 Worker 數量與 corePoolSize 一致便可

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        // 用以執行任務的線程
        final Thread thread;
        // 須要執行的任務
        Runnable firstTask;
        // 統計每一個 Worker 完成了多少任務
        volatile long completedTasks;
        
        Worker(Runnable firstTask) {
            // 新建立的 Worker 沒有執行是不容許中斷的
            // 後面會提到爲何 setState(-1); 就沒法中斷
            setState(-1);
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        
        public void run() {
            // 執行任務
            runWorker(this);
        }
}
複製代碼

這裏先大概介紹下 Worker 由於任務執行的時候須要用它,便於理解後續的分析,runWorker(this) 後面會詳細分析是如何運行,如何釋放非核心 Worker 等

執行線程池的任務

public void execute(Runnable command) {
        if (command == null)
                throw new NullPointerException();
        int c = ctl.get();
        // 檢測正在運行的 Worker 數量是否小於核心 Worker 數
        if (workerCountOf(c) < corePoolSize) {
            // 若是核心 worker 數沒有到 corePoolSize 那麼建立一個新的 Worker 來執行任務
            if (addWorker(command, true))
                return;
            // 若是核心 worker 添加失敗則再次獲取一下線程池狀態值
            // 爲何會失敗?addWorker() 中會說明
            c = ctl.get();
        }
        // 若是線程池處於運行狀態,而且任務可以放入 workQueue 中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 再次檢查線程池的狀態若是已經不處於活動狀態
            // 刪除剛剛放入隊列裏面的任務
            // 而且使用對應的拒絕策略拒絕任務
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 若是說在任務添加隊列成功後,沒有 Worker 了
            // 那麼就添加一個非核心的 Worker 用來取隊列裏面的任務來執行
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 雖然隊列已經滿了可是若是尚未達到最大線程數 maximum 那麼繼續添加 Worker
        // 若是達到 maximum 沒法添加了則採起對應的拒絕策略
        else if (!addWorker(command, false))
            reject(command);
    }
複製代碼

addWorker()

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            // 獲取線程池狀態
            int c = ctl.get();
            // 獲取線程池運行狀態
            int rs = runStateOf(c);
            // @1
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                // 獲取線程池 worker 數量
                int wc = workerCountOf(c);
                // 若是 worker 數量達到了最大線程池容許的數量則拒絕添加 worker
                // 不然如果添加核心線程 worker 的話不能超過 corePoolSize 核心線程數
                // 若不是核心線程的話則不能超過限制的 maximumPoolSize 最大線程池數
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 線程池中 worker 數量 + 1
                // 若是添加成功的話則跳出準備執行 worker 線程
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                // 檢查運行狀態若是和以前的狀態不一致那麼重試添加
                if (runStateOf(c) != rs)
                    continue retry;
                // 若是狀態一直而且添加失敗則說明同時有有不少線程進來那麼 CAS 循環重試直到成功
            }
        }
        
        // worker 啓動成功標識
        boolean workerStarted = false;
        // worker 添加成功標識
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 建立 Worker
            w = new Worker(firstTask);
            // 獲取對應的線程
            final Thread t = w.thread;
            // 線程不爲 Null 的話
            if (t != null) {
                // 重入鎖鎖住一個一個的添加而且啓動
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    // 再次檢查狀態
                    // 線程池處於運行狀態或者
                    // 線程池處於 shutdown 可是 firstTask 爲 Null
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 預檢查線程是否能夠啓動不能的話拋出異常
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        // 添加 worker
                        workers.add(w);
                        // 記錄 worker 數量而且標識爲添加成功
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // worker 添加成功那麼則將 worker 標識爲啓動成功
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 若是 worker 啓動失敗
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
複製代碼

這個方法主要的功能是添加核心、非核心 Worker 同時校驗其合法性,而且增長 ctl 中 worker 計數,若是添加成功的話就執行對應的 Worker 來處理任務,若是添加失敗的話則須要保證從 worker 池中移除而且 ctl 中 worker 統計數量 -1.

咱們來看下 @1:

  • 線程池此時的狀態爲 > SHUTDOWN 的狀態即 STOP、TIDYING、TERMINATED 不容許添加新的 Worker
  • 線程池此時的狀態爲 SHUTDOWN 可是卻添加了一個新的任務要來執行,即 firstTask != null 這種狀況是不容許的
  • 線程池此時的狀態爲 SHUTDOWN 添加了一個 Worker (firstTask == null) 可能須要用來執行隊列中尚未終止的任務,可是卻發現隊列裏面沒有任務了!(隊列裏面沒有任務了還添加新的 Worker 來作什麼都 SHUTDOWN 了,我都準備關閉了,因此添加失敗)

addWorkerFailed()

private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 從線程池 workers 中刪除對應的 worker
            if (w != null)
                workers.remove(w);
            // 線程池 worker 數量 -1
            decrementWorkerCount();
            // 嘗試去終止線程池後面再分析
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }
複製代碼

這個方法比較簡單,在 addWorker() 中分別對線程池 worker 數量+1 而後將 worker 放入池中這兩部,若是後者添加失敗的話就從 workers 中移除,而後保證 worker 數量 -1,而後去嘗試調用 tryTerminate() 終止線程池,這個方法後面再分析

runWorker()

Worker 添加成功而且成功啓動後就會調用 runWorker 了就是調用上面 Worker 的 run() 方法而後調用 runWorker()

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        // 取出 worker 的第一個任務準備執行
        Runnable task = w.firstTask;
        w.firstTask = null;
        // @1 unlock 容許中斷
        w.unlock(); // allow interrupts
        // 標識任務是否是被異常終止的默認是
        boolean completedAbruptly = true;
        try {
            // 執行添加的任務或者從隊列裏面取出任務不斷的執行
            while (task != null || (task = getTask()) != null) { // @2
                // lock 鎖住一個個的執行任務
                w.lock();
                // @3
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 執行以前能夠作一些事情留給子類調用
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 運行任務
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 執行任務以後能夠作一些事情留給子類調用
                        afterExecute(task, thrown);
                    }
                } finally {
                    // task 置位 null
                    task = null;
                    // 完成任務 + 1
                    w.completedTasks++;
                    // 解鎖
                    w.unlock();
                    // 開啓又一輪循環取任務執行等過程
                }
            }
            // 任務是正常結束的,即處理完了隊列裏面的全部任務
            completedAbruptly = false;
        } finally {
            // @4 處理退出邏輯
            processWorkerExit(w, completedAbruptly);
        }
    }

複製代碼

這個方法的主要功能是,用 Worker 去不斷的取 workQueue 裏面的任務來執行。

@1 處,爲何 unlock 就能夠容許中斷了呢,首先 unlock 保證這個 Worker 後續能夠成功的 lock 而後執行任務,其次就是在 shutdown() 的時候,會去中斷全部的 Worker 的時候會去嘗試獲取 Worker 的鎖,若是這裏釋放了那麼就能成功中斷,在後面 shutdown() 會詳細分析。

@2 處,關鍵就是全部的 Worker 都會經過 getTask() 去獲取任務執行,若是獲取到了那麼就執行後續操做,若是任務沒有呢?是否是就說明目前工做隊列裏面沒有任務了,Worker 們好像有點閒啊,若是非核心的 Worker 達到了 keepAliveTime 這個時間尚未任務作,是否是該被釋放了呢?這個操做就是 getTask() 和 processWorkerExit() 共同完成的,後面會詳細分析。

@3 處,若是線程池狀態 >= STOP 確保線程被正確中斷,不然的話清除中斷標記

@4 處 processWorkerExit() 的時候分析

getTask()

private Runnable getTask() {
        // 判斷是否 poll() 超時
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // @1
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // 若是線程池中的 worker 數量大於 corePoolSize 
            // 則說明可能須要淘汰一些空閒的線程
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            
            // @2
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // 在 keepAliveTime 時間內是否獲取到了任務
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                // 獲取到了成功返回
                if (r != null)
                    return r;
                // 若是沒有獲取到設置 timedOut 爲 true
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
複製代碼

這個方法的主要功能是不斷的從 workQueue 中取出任務,若是在指定的 keepAliveTime 時間尚未取到任務的話則返回 null 須要考慮是否將這個外層沒有取到任務的 Worker 釋放了,由於他暫時沒有事作了。

@1 處,若是線程池狀態爲 SHUTDOWN 而且 workQueue 爲空,則返回爲 Null。若是線程池狀態 > SHUTDOWN 則返回 Null。同時執行 decrementWorkerCount 減小 ctl 中低 29 位表示的 workers 數量

@2 處,若是線程池 worker 數量大於 maximumPoolSize 或者當前線程池 worker 數量大於了 corePoolSize 而且其它 worker 在 keepAliveTime 都沒有獲取到任務,那麼返回 null 而且減小 workCount 數量

當 getTask() 返回爲 null 的時候 completedAbruptly = false; 表示任務是正常結束的,最終執行 processWorkerExit() 方法

processWorkerExit()

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 若是 runWorker() 上述操做是正常完成的即 completedAbruptly = false
        // 那麼在 getTask() 邏輯裏面已經進行了線程池 worker 數量 -1 的操做了
        // 這裏就不須要再次執行了
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 完成的任務數量 +1
            completedTaskCount += w.completedTasks;
            // 將空閒的 worker 從線程池中移除
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        
        // 嘗試終止線程池
        tryTerminate();

        int c = ctl.get();
        // 若是線程池狀態處於 RUNNING 和 SHUTDOWN
        if (runStateLessThan(c, STOP)) {
            // 不是異常結束的 runWorker()
            if (!completedAbruptly) {
                // 默認返回 corePoolSize
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                // 若是線程池 worker 數量已經大於了 corePoolSize
                // 那麼就直接返回
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 若是是異常結束的或者線程池 worker 數量沒有大於 corePoolSize
            // 那麼能夠再添加一個 worker 去執行任務
            addWorker(null, false);
        }
    }
複製代碼

這個方法的主要功能是若是一個 Worker 沒有事作了,那麼就能夠將其釋放了(主要是在線程池數量 > 核心線程池數量,而且隊列任務比較少執行 Worker 大都比較閒),在釋放的同時看下是否須要終止線程池。

shutdown()

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 將線程池狀態設置爲 SHUTDOWN
            advanceRunState(SHUTDOWN);
            // 而後去中斷全部的 worker
            interruptIdleWorkers();
            // 留給子類使用的 hook 關閉後作一些事情
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        // 嘗試終止
        tryTerminate();
    }
複製代碼

這個方法的主要功能是將線程池的狀態標記爲 SHUTDOWN,此時拒絕接收新的任務,可是若是發現工做隊列中還有任務,是能夠添加一個不攜帶任務的非核心的 Wroker 去將隊列裏面的任務執行完成的,而後去中斷全部的 worker

shutdownNow()

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 將線程池狀態設置爲 STOP
            advanceRunState(STOP);
            // 中斷全部的 worker
            interruptWorkers();
            // 拋棄 workQueue 裏面全部的任務
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        // 嘗試終止
        tryTerminate();
        return tasks;
    }
複製代碼

這個方法的主要功能是將線程池的狀態標記爲 STOP,此時拒絕接收新的任務,同時拋棄 workQueue 裏面的全部任務,而後去中斷全部的 worker

tryTerminate()

for (;;) {
            int c = ctl.get();
            // 只有在線程池狀態爲 SHUTDOWN 而且隊列不爲空的時候纔會繼續執行
            // 或者線程池狀態位 STOP 的時候纔會繼續執行
            // 不然的話就直接返回
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            // 若是線程池 worker 數量不爲 0 依次中斷對應的 worker
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
            
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 將線程池的狀態改成 TIDYING
                // 從這裏也能看出爲何上面要判斷只能線程池狀態位 SHUTDOWN 和 STOP
                // 才能執行該方法 SHUTDOWN < STOP < TIDYING
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // 而後調用 terminated 
                        terminated();
                    } finally {
                        // 最終將線程池狀態改成 TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // 不然的話 CAS 不斷循環處理
            // else retry on failed CAS
        }
複製代碼

從代碼能夠看到調用了 shutdown() 後線程池狀態從 RUNNING 變爲 SHUTDOWN,而後調用 tryTerminate() 若是 workQueue 不爲空的話表示還有任務沒有執行完那麼不能終止須要等待隊列裏面的任務執行完畢後才能終止。

若是調用了 shutdownNow() 後線程池狀態從 RUNNING 變爲 STOP,而且將 workQueue 裏面的任務所有移除了,最終線程池 worker 數量爲 0 了後調用 terminated() 後將線程池狀態設置爲 TERMINATED

總結下狀態的轉換主要爲

調用 shutdown() 後線程池狀態從 RUNNING -> SHUTDOWN,調用以後的隊列和線程池的任務都執行完成後那麼 SHUTDOWN -> TIDYING

調用 shuwdownNow() 後線程池狀態從 (RUNNING or SHUTDOWN) -> STOP,調用以後會拋棄隊列裏面等待執行的任務,而後等待線程池裏面的任務執行完成後 STOP -> TIDYING

terminated() 方法執行完畢後 TIDYING -> TERMINATED

可是若是說線程不能正常結束或者不能響應中斷那麼意味着 shutdown() 和 shutdownNow() 都沒法將線程池正常結束,以下。 調用 shutdownNow() 後

public class Test {

    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor =
            new ThreadPoolExecutor(5,
                10,
                10,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(5));
        fillThreadPoll(threadPoolExecutor, 5);
        System.out.println("此時核心線程數已經滿了");
        fillThreadPoll(threadPoolExecutor, 5);
        System.out.println("此時隊列已經已經滿了");
        fillThreadPoll(threadPoolExecutor, 5);
        System.out.println("此時最大線程數已經滿了");
        threadPoolExecutor.shutdownNow();
    }

    private static void fillThreadPoll(ThreadPoolExecutor threadPoolExecutor, int size) {
        for (int i = 0; i < size; i++) {
            threadPoolExecutor.execute(() -> {
                while (true) {
                }
            });
        }
    }
}
複製代碼

調整下代碼就能正常結束

private static void fillThreadPoll(ThreadPoolExecutor threadPoolExecutor, int size) {
        for (int i = 0; i < size; i++) {
            threadPoolExecutor.execute(() -> {
                while (!Thread.currentThread().isInterrupted()) {

                }
            });
        }
    }
複製代碼

可是調用 shutdown() 發現仍是沒法正常結束,由於他會去調用 interruptIdleWorkers(false);

private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                // 這裏沒法得到鎖
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
複製代碼

由於 w.tryLock() 沒法得到鎖,全部的 worker 都處於忙碌狀態,每個 worker 執行對應的任務,那個任務都沒有結束,鎖都沒有是釋放,因此 shutdown() 沒法結束。

而 shutdownNow() 能夠結束的緣由是它調用的是

private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }
    
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
複製代碼

能夠看到就算 Worker 正在執行中沒有釋放鎖也能夠直接對其進行中斷

拒絕任務

當ThreadPoolExecutor關閉、隊列和線程池飽和時,會拒絕新提交的任務,同時調用RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor)方法。

ThreadPoolExecutor預置瞭如下四種策略:

  • ThreadPoolExecutor.AbortPolicy,默認策略,在拒絕任務時,會拋出RejectedExecutionException。
  • ThreadPoolExecutor.CallerRunsPolicy,由提交的線程本身來執行(execute)當前提交的任務。這種策略提供了簡單的反饋控制機制,可以下降新的任務提交的速率。
  • ThreadPoolExecutor.DiscardPolicy,簡單粗暴的拋棄不能執行的任務。
  • ThreadPoolExecutor.DiscardOldestPolicy,若是ThreadPoolExecutor沒有被關閉,那麼刪除隊列頭部的任務,而且再次嘗試提交任務,若是仍然被拒絕,那麼再刪除隊列頭部任務,如此反覆。 能夠自定義RejectedExecutionHandler拒絕策略,可是要當心處理好策略生效時須要知足的條件,例如隊列和線程池大小等等。
相關文章
相關標籤/搜索