閱讀 JDK 源碼:線程池 ThreadPoolExecutor

上一篇文章介紹了 Thread 類,可知線程隨着任務的執行結束而被銷燬。可是,因爲線程的建立與銷燬操做涉及到系統調用,開銷較大,所以須要將線程的生命週期與任務進行解耦。使用線程池來管理線程,能夠有效地重複利用線程來執行任務。本文將介紹線程池最基礎的實現類 ThreadPoolExecutor。java

本文基於 jdk1.8.0_91

1. 線程池體系

Executor體系

類型      名稱                             描述

接口      Executor                        最上層的接口,提供了任務提交的基礎方法
接口      ExecutorService                 繼承了 Executor 接口,擴展了提交任務、獲取異步任務執行結果、線程池銷燬等方法
接口      ScheduledExecutorService        繼承了 ExecutorService 接口,增長了延遲執行任務、定時執行任務的方法
抽象類    AbstractExecutorService         提供了 ExecutorService 接口的默認實現,提供 newTaskFor 方法將任務轉換爲 RunnableFuture,以便提交給 Executor 執行
實現類    ThreadPoolExecutor              基礎、標準的線程池實現
實現類    ScheduledThreadPoolExecutor     繼承了 ThreadPoolExecutor,實現了 ScheduledExecutorService 中相關延遲任務、定時任務的方法
實現類    ForkJoinPool                    JDK7 加入的線程池,是 Fork/Join 框架的核心實現,只容許執行 ForkJoinTask 任務
普通類    Executors                       建立各類線程池的工具類

線程池能夠解決兩個問題:編程

  • 減小系統由於頻繁建立和銷燬線程所帶來的開銷。
  • 自動管理線程和分配資源,使用方只需提交任務便可。

2. 構造方法

JDK 中建議使用較爲方便的 Executors 工廠方法,它們均爲大多數使用場景預約義了設置:segmentfault

  • Executors.newCachedThreadPool():無界線程池,能夠進行自動線程回收
  • Executors.newFixedThreadPool(int):固定大小的線程池
  • Executors.newSingleThreadExecutor():單個後臺線程的線程池
  • Executors.newScheduledThreadPool():執行定時任務的線程池
  • Executors.newWorkStealingPool(int):支持並行執行的線程池

阿里 Java 開發手冊 對線程池的使用進行了限制,可做參考:安全

【強制】線程資源必須經過線程池提供,不容許在應用中自行顯式建立線程。
說明:使用線程池的好處是減小在建立和銷燬線程上所花的時間以及系統資源的開銷,解決資源不足的問題。若是不使用線程池,有可能形成系統建立大量同類線程而致使消耗完內存或者「過分切換」的問題。多線程

【強制】線程池不容許使用Executors去建立,而是經過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同窗更加明確線程池的運行規則,規避資源耗盡的風險。
說明:Executors返回的線程池對象的弊端以下:
1)FixedThreadPool和SingleThreadPool:容許的請求隊列長度爲Integer.MAX_VALUE,可能會堆積大量的請求,從而致使OOM。
2)CachedThreadPool和ScheduledThreadPool:容許的建立線程數量爲Integer.MAX_VALUE,可能會建立大量的線程,從而致使OOM。併發

2.1 源碼分析

Executors 內部都是調用 ThreadPoolExecutor 的構造方法來實現的:框架

/**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters.
 *
 * @param corePoolSize    核心線程數量
 * @param maximumPoolSize 總的線程數量
 * @param keepAliveTime   空閒線程的存活時間
 * @param unit            keepAliveTime的單位
 * @param workQueue       任務隊列, 保存已經提交但還沒有被執行的線程
 * @param threadFactory   線程工廠(用於指定如何建立一個線程)
 * @param handler         拒絕策略 (當任務太多致使工做隊列滿時的處理策略)
 */
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

2.2 使用說明

Java 官方對 ThreadPoolExecutor 的使用說明:less

2.2.1 Core and maximum pool sizes

ThreadPoolExecutor 將根據 corePoolSize 和 maximumPoolSize 自動調整線程池中的線程數量,以及判斷任務是否進行排隊。異步

當提交新任務時:ide

  • 若是運行的線程少於 corePoolSize,則建立新線程來處理請求,即便存在工做線程是空閒的,也不會進行排隊。
  • 若是運行的線程多於 corePoolSize 而少於 maximumPoolSize:

    • 若是隊列未滿,則會把新提交的任務加入隊列,不建立新的線程。
    • 若是隊列已滿,而且運行線程數小於 maximumPoolSize,也會建立新的線程來執行任務。
    • 若是設置的 corePoolSize 和 maximumPoolSize 相同,則建立了固定大小的線程池。
  • 若是線程數大於 maximumPoolSize,新提交的任務將會根據拒絕策略來處理。

    • 若是將 maximumPoolSize 設置爲基本的無界值(如 Integer.MAX_VALUE),則容許池適應任意數量的併發任務。

在大多數狀況下,corePoolSize 和 maximumPoolSize 是經過構造函數來設置的,不過也可使用 ThreadPoolExecutor 的 setCorePoolSize(int) 和 setMaximumPoolSize(int) 進行動態更改。注意,核心線程和非核心線程只是一種邏輯上的區分,線程池中只有一種類型的線程,稱爲工做線程(Worker)。

2.2.2 On-demand construction(按需構造)

默認狀況下,核心線程只是在新任務到達時才建立和啓動的。
可使用方法 prestartCoreThread() 或 prestartAllCoreThreads() 對其進行動態重寫。
通常在構造帶有非空隊列的線程池時,會但願先啓動線程。

2.2.3 Creating new threads

使用 ThreadFactory 來建立新線程。
在 Executors 中默認使用 Executors.defaultThreadFactory() 建立線程,而且這些線程具備相同的分組和優先級,且都是非守護線程。
若是 ThreadFactory 建立新線程失敗,此時線程池能夠繼續運行,但不能執行任何任務。

2.2.4 Keep-alive times

默認狀況下,保持活動策略只應用在非核心線程上,即當線程的空閒時間超過 keepAliveTime 時將會終止。
若使用 allowCoreThreadTimeOut(boolean) 方法則會把保活策略也應用於核心線程。
當線程池處於非活躍狀態時,能夠減小資源消耗。若是線程從新變得活躍,則會建立新的線程。
可使用方法 setKeepAliveTime(time, timeUnit) 動態地更改此參數,若入參使用 Long.MAX_VALUE 和 TimeUnit.NANOSECONDS 則不會終止空閒線程,除非線程池關閉。

2.2.5 Queuing

全部 BlockingQueue 均可用於傳輸和保持提交的任務。當線程池中的線程數量大於 corePoolSize 而少於 maximumPoolSize 時,新任務會加入同步隊列。

排隊有三種通用策略:

  • 直接提交(Direct handoffs)

    • 默認使用傳遞隊列 SynchronousQueue,該隊列會將任務直接傳遞給線程,而不會保存任務。
    • 若是當前沒有可用線程,則會建立新的線程。一般須要無界的最大線程數(maximumPoolSize)以免拒絕新提交的任務。
    • 極端狀況下會由於建立過多線程而耗盡 CPU 和內存資源。
  • 無界隊列(Unbounded queues)

    • 使用無界隊列(如 LinkedBlockingQueue)做爲等待隊列,當全部的核心線程都在處理任務時,新提交的任務都會進入隊列等待。
    • 線程數量不會超過 corePoolSize,也就是 maximumPoolSize 的值無效。
    • 當每一個任務徹底獨立於其餘任務,即任務執行互不影響時,適合於使用無界隊列。
    • 極端狀況下會由於存儲過多任務而耗盡內存資源。
  • 有界隊列(Bounded queues)

    • 使用有界隊列(如 ArrayBlockingQueue)做爲等待隊列,有助於防止資源耗盡,可是可能較難調整和控制隊列大小和線程池大小(maximumPoolSize)。
    • 使用大的隊列和小的線程數能夠減小 CPU 使用率、系統資源和上下文切換的開銷,可是會致使吞吐量變低。
    • 使用小的隊列一般須要更多的線程數,這樣能夠最大化 CPU 使用率,但可能會須要更大的調度開銷,從而下降吞吐量。

2.2.6 Rejected tasks(拒絕策略)

當線程池已關閉,或者線程池中的線程數量和隊列容量已飽和時,繼續提交新任務會被拒絕,會觸發 RejectedExecutionHandler#rejectedExecution 方法。

ThreadPoolExecutor 定義了四種拒絕策略:

  • AbortPolicy:默認策略,在須要拒絕任務時拋出 RejectedExecutionException;
  • CallerRunsPolicy:由提交任務的線程自行執行任務,以此減緩新任務的提交速度。若是線程池已經關閉,任務將被丟棄;
  • DiscardPolicy:直接丟棄任務;
  • DiscardOldestPolicy:丟棄在隊列中等待時間最長的任務,並執行當前提交的任務,若是線程池已經關閉,任務將被丟棄。

能夠自定義 RejectedExecutionHandler 類來實現拒絕策略。須要注意的是,拒絕策略的運行須要指定線程池和隊列的容量。

2.2.7 Hook methods(鉤子方法)

ThreadPoolExecutor 中提供 beforeExecute 和 afterExecute 方法,在執行每一個任務以前和以後調用。提供 terminated 方法,在線程池關閉以前作一些收尾工做。
若是鉤子方法拋出異常,則內部工做線程將依次失敗並終止。

2.2.8 Queue maintenance(隊列維護)

方法 getQueue() 容許出於監控和調試目的而訪問工做隊列。強烈反對出於其餘任何目的而使用此方法。
remove() 和 purge() 這兩種方法可用於在取消大量已排隊任務時幫助進行存儲回收。

2.2.9 Finalization(終止)

當線程池的引用變爲不可達,而且線程池中沒有遺留的線程(經過設置 allowCoreThreadTimeOut 把非活動的核心線程銷燬),此時線程池會自動 shutdown。

3. 屬性

3.1 線程池狀態

ThreadPoolExecutor 中使用一個 AtomicInteger 類型的變量 ctl 來管理線程池。
其中,低 29 位保存線程數,高 3 位保存線程池狀態。
線程池中最大的線程數爲 2^29-1。

/**
 * The main pool control state, ctl, is an atomic integer packing
 * two conceptual fields
 *   workerCount, indicating the effective number of threads      // 工做線程數量
 *   runState,    indicating whether running, shutting down etc   // 線程池運行狀態
 */
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static final int COUNT_BITS = Integer.SIZE - 3;      // 32 - 3 = 29
// 最大線程數: 2^29-1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 0001 1111 1111 1111 1111 1111 1111 1111

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS; // 高三位 111
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 高三位 000
private static final int STOP       =  1 << COUNT_BITS; // 高三位 001
private static final int TIDYING    =  2 << COUNT_BITS; // 高三位 010
private static final int TERMINATED =  3 << COUNT_BITS; // 高三位 011

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; } // 運行狀態
private static int workerCountOf(int c)  { return c & CAPACITY; }  // 運行的工做線程數
private static int ctlOf(int rs, int wc) { return rs | wc; }       // 封裝運行狀態和任務線程

ThreadPoolExecutor 一共定義了 5 種線程池狀態:

  • RUNNING:能夠接收新的任務和隊列任務
  • SHUTDOWN:不接收新的任務,可是會處理隊列裏的任務
  • STOP:不接收新任務,也不會處理隊列裏的任務,而且中斷正在運行的任務
  • TIDYING:全部任務都已經終止,workerCount 爲 0,當池狀態爲 TIDYING 時將會調用 terminated() 方法處理收尾工做。
  • TERMINATED:說明 terminated() 方法完成執行。

線程池狀態的轉移:

  • RUNNING -> SHUTDOWN
    On invocation of shutdown(), perhaps implicitly in finalize()
  • (RUNNING or SHUTDOWN) -> STOP
    On invocation of shutdownNow()
  • SHUTDOWN -> TIDYING
    When both queue and pool are empty
  • STOP -> TIDYING
    When pool is empty
  • TIDYING -> TERMINATED
    When the terminated() hook method has completed

線程池狀態轉移

3.2 工做線程 Worker

// 工做線程集合
private final HashSet<Worker> workers = new HashSet<Worker>();

// 操做 workers 集合使用到的鎖
private final ReentrantLock mainLock = new ReentrantLock();

在線程池中具備一個 Worker 集合,一個 Worker 對應一個工做線程。當線程池啓動時,對應的 Worker 會執行池中的任務,執行完畢後從阻塞隊列裏獲取一個新的任務繼續執行。

持有 mainLock 時才能操做 Worker 集合,Java 官方對使用 mainLock 而不是併發集合的說明:

  • 在線程池關閉期間,可以串行地檢查並中斷 Worker 集合中的線程,避免中斷風暴(interrupt storms)。
  • 可以簡化一些統計操做,如 largestPoolSize。

Worker 是 ThreadPoolExecutor 的內部類,其繼承體系以下:

  • 繼承了 AQS 抽象類,實現了不可重入的互斥鎖。工做線程在執行任務時需持有 Worker 鎖,每一個工做線程之間持有的鎖對象不一樣。
  • 實現了 Runnable 接口,也就是說 Worker 自己也做爲一個線程任務執行。

繼承體系Worker 使用 AQS 中的 state 屬性表示是否持有鎖:

  • -1: 初始狀態
  • 0: 無鎖狀態
  • 1: 加鎖狀態

Worker 開始工做時,會先執行 unlock() 方法設置 state 爲 0,後續再使用 CAS 對 state 來加鎖。具體見 ThreadPoolExecutor#runWorker。

注意,線程池中的工做線程在邏輯上分爲核心線程和非核心線程,可是在 Worker 類中並無相關屬性標記當前線程是不是核心線程!

而是在運行期間動態指定的:

  1. ThreadPoolExecutor#execute 提交任務時,調用 addWorker 新增工做線程,若入參 core 傳入 true 只是用於校驗覈心線程數量 corePoolSize 是否合法,並不表明該線程一直都是核心線程。
  2. ThreadPoolExecutor#getTask 獲取任務時,若是 workerCount > corePoolSize 成立,則說明當前線程要以非核心線程的規則來從隊列中拉取任務(keepAliveTime 時間內拉取不到任務,線程會被銷燬),無論該線程在 addWorker 建立時是否指定 core 爲 true.

這樣設計的目的,只是爲了動態維持線程池中的核心線程數量不超過 corePoolSize,是一種鬆散的控制。

java.util.concurrent.ThreadPoolExecutor.Worker

private final class Worker // 不可重入的互斥鎖
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread; // 工做線程
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;  // 初始運行任務
    /** Per-thread task counter */
    volatile long completedTasks; // 任務完成計數

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

3.3 線程工廠 ThreadFactory

ThreadPoolExecutor 具備屬性 threadFactory 表示線程工廠。

/**
 * Factory for new threads. All threads are created using this
 * factory (via method addWorker).  All callers must be prepared
 * for addWorker to fail, which may reflect a system or user's
 * policy limiting the number of threads.  Even though it is not
 * treated as an error, failure to create threads may result in
 * new tasks being rejected or existing ones remaining stuck in
 * the queue.
 *
 * We go further and preserve pool invariants even in the face of
 * errors such as OutOfMemoryError, that might be thrown while
 * trying to create threads.  Such errors are rather common due to
 * the need to allocate a native stack in Thread.start, and users
 * will want to perform clean pool shutdown to clean up.  There
 * will likely be enough memory available for the cleanup code to
 * complete without encountering yet another OutOfMemoryError.
 */
private volatile ThreadFactory threadFactory;

Worker 中包含屬性 thread 表示工做線程,在 Worker 構造函數中經過線程工廠來建立線程(即 Thread#new,注意不能啓動線程!)。

/**
 * Creates with given first task and thread from ThreadFactory.
 * @param firstTask the first task (null if none)
 */
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

在 Executors 工具類中定義了內部類 DefaultThreadFactory 做爲默認的線程工廠,用於統一設置線程信息:

  • 若存在 SecurityManager,則線程分組爲 System#getSecurityManager,不然與調用 defaultThreadFactory 方法的線程所屬分組相同;
  • 線程優先級均爲 Thread.NORM_PRIORITY(處於最小和最大之間);
  • 線程命名統一爲 pool-N-thread-M 格式,其中 N 是此工廠的序列號,M 是此工廠所建立線程的序列號。

java.util.concurrent.Executors#defaultThreadFactory

/**
 * Returns a default thread factory used to create new threads.
 * This factory creates all new threads used by an Executor in the
 * same {@link ThreadGroup}. If there is a {@link
 * java.lang.SecurityManager}, it uses the group of {@link
 * System#getSecurityManager}, else the group of the thread
 * invoking this {@code defaultThreadFactory} method. Each new
 * thread is created as a non-daemon thread with priority set to
 * the smaller of {@code Thread.NORM_PRIORITY} and the maximum
 * priority permitted in the thread group.  New threads have names
 * accessible via {@link Thread#getName} of
 * <em>pool-N-thread-M</em>, where <em>N</em> is the sequence
 * number of this factory, and <em>M</em> is the sequence number
 * of the thread created by this factory.
 * @return a thread factory
 */
public static ThreadFactory defaultThreadFactory() {
    return new DefaultThreadFactory();
}

java.util.concurrent.Executors.DefaultThreadFactory

/**
 * The default thread factory
 */
static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

4. 線程池方法

4.1 提交任務 execute

線程池的頂層接口 Executor 中只有一個 execute 方法,用於把任務提交到線程池,ThreadPoolExecutor 對它進行了實現。

方法說明:

  • 提交一個任務到線程池,任務不必定會當即執行。
  • 提交的任務可能在一個新的線程中執行,也可能在已經存在的空閒線程中執行。
  • 若是因爲池關閉或者池容量已經飽和致使任務沒法提交,那麼就根據拒絕策略 RejectedExecutionHandler 處理任務。

代碼流程:

  1. 若是任務爲空,拋出 NullPointerException。
  2. 若是 worderCount < corePoolSize,則經過 addWorker 添加新的核心線程,並把當前任務做爲它的 firstTask 去執行。
  3. 若是 worderCount >= corePoolSize,說明沒法添加新的核心線程,則須要把任務加入同步隊列。分爲兩種狀況:
  4. 若是入隊成功,須要作雙重檢查:
    4.1 入隊過程當中線程池已關閉,則要回退入隊操做,並執行拒絕策略。
    4.2 入隊過程當中工做線程已消亡,則當工做線程數量爲 0 時,須要初始化非核心線程,用於拉取隊列中的任務去處理。
  5. 若是入隊失敗,則嘗試建立非核心線程用於處理該任務。若非核心線程建立失敗,則執行拒絕策略。

java.util.concurrent.ThreadPoolExecutor#execute

/**
 * Executes the given task sometime in the future.  The task
 * may execute in a new thread or in an existing pooled thread.
 *
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 *
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 *         {@code RejectedExecutionHandler}, if the task
 *         cannot be accepted for execution
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true)) // 添加新的核心線程,並把當前任務交給它執行
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {  // 線程池未關閉,且非阻塞入隊成功,則進入下一步
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command)) // 線程池已關閉,不接收新任務,須要出隊並拒絕任務
            reject(command);
        else if (workerCountOf(recheck) == 0)        // 入隊成功但工做線程爲空,則添加非核心線程且不指定任務
            addWorker(null, false);                  // 沒有任務的工做線程會從同步隊列中拉取任務去執行
    }
    else if (!addWorker(command, false)) // 隊列已滿,且建立非核心線程失敗,則拒絕任務
        reject(command);
}

4.2 添加工做線程 addWorker

在任務提交(execute方法)、更新核心線程數(setCorePoolSize方法)、預啓動線程(prestartCoreThread方法)中都會調用 addWorker 方法添加新的工做線程。

addWorker 入參指定該工做線程須要執行的任務,以及該工做線程是否核心線程。

代碼主要流程:

  1. 經過檢查線程池狀態、線程數量限制,判斷可否添加工做線程。
  2. 建立工做線程(Worker#new),啓動工做線程(Thread#start)。

檢查線程池狀態(注:SHUTDOWN 不接收新的任務,可是會處理隊列裏的任務):

  1. 線程池狀態爲 STOP 或 TIDYING 或 TERMINATED: 都不會執行任何任務,沒法建立新線程;
  2. 線程池狀態爲 SHUTDOWN 且 firstTask != null: 由於再也不接受新任務的提交,沒法建立新線程;
  3. 線程池狀態爲 SHUTDOWN 且 隊列爲空: 由於隊列中已經沒有任務了, 因此也就不須要執行任何任務了,沒法建立新線程。

檢查線程數量限制(注:工做線程在邏輯上分爲核心線程、非核心線程):

  1. 若是工做線程數量超過 CAPACITY(即 2^29-1),則沒法建立新【工做線程】;
  2. 若是工做線程數量超過 corePoolSize,則沒法建立新的【核心線程】;
  3. 若是工做線程數量超過 maximumPoolSize,則沒法建立新的【非核心線程】;

java.util.concurrent.ThreadPoolExecutor#addWorker

/**
 * Checks if a new worker can be added with respect to current
 * pool state and the given bound (either core or maximum). If so,
 * the worker count is adjusted accordingly, and, if possible, a
 * new worker is created and started, running firstTask as its
 * first task. This method returns false if the pool is stopped or
 * eligible to shut down. It also returns false if the thread
 * factory fails to create a thread when asked.  If the thread
 * creation fails, either due to the thread factory returning
 * null, or due to an exception (typically OutOfMemoryError in
 * Thread.start()), we roll back cleanly.
 *
 * @param firstTask the task the new thread should run first (or
 * null if none). Workers are created with an initial first task
 * (in method execute()) to bypass queuing when there are fewer
 * than corePoolSize threads (in which case we always start one),
 * or when the queue is full (in which case we must bypass queue).
 * Initially idle threads are usually created via
 * prestartCoreThread or to replace other dying workers.
 *
 * @param core if true use corePoolSize as bound, else
 * maximumPoolSize. (A boolean indicator is used here rather than a
 * value to ensure reads of fresh values after checking other pool
 * state).
 * @return true if successful
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary. // 檢查線程池狀態
        if (rs >= SHUTDOWN && 
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c); // 檢查線程數量限制
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c)) // workerCount 自增,結束自旋
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs) // 線程池狀態發生變化,重試
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask); // 建立工做線程,指定初始任務(Executors.DefaultThreadFactory 中會執行 Thread#new,可是不會調用 Thread#start)
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock(); // 加鎖用於操做 workers 集合
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException(); // 若是線程工廠已經提早啓動線程了(Thread#start),則報錯
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s; // 更新最大池容量
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start(); // 啓動線程,由 JVM 在操做系統層面建立線程並執行 Thread#run
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w); // 線程添加失敗,回滾操做
    }
    return workerStarted;
}

注意,建立新線程的過程當中,須要區分 new Thread()new Thread().start 的不一樣:

  • Thread#new:建立 Thread 對象,並不會映射到操做系統上的線程,此時 Thread#isAlive 爲 false。注意線程池中的線程工廠只能建立 Thread 對象,不可啓動線程。
  • Thread#start:啓動線程,由 JVM 在操做系統層面建立線程,並綁定到 Thread 對象中,此時 Thread#isAlive 爲 true。

另外,在 ThreadPoolExecutor#addWorker 方法中執行 Thread t = w.thread; t.start() 會觸發執行 ThreadPoolExecutor#runWorker,該過程簡化以下:

private final class Worker implements Runnable {

    final Thread thread; // 工做線程

    public Worker() {
        thread = new Thread(this);
        System.out.println("addWorker!");
    }

    @Override
    public void run() {
        System.out.println("runWorker!");
    }
}

/**
 * 測試在 addWorker 中觸發 runWorker
 */
@Test
public void test() throws InterruptedException {
    Worker worker = new Worker();
    worker.thread.start();
    worker.thread.join();
}

4.3 執行任務 runWorker

在 ThreadPoolExecutor#addWorker 中添加工做線程以後,會啓動工做線程(Thread#start),觸發工做線程執行任務(Thread#run)。

runWorker 代碼流程:

  1. 獲取任務,該任務多是 firstTask,也多是從隊列中拉取的任務。
  2. 獲取 worker 上的互斥鎖,確保除非線程池關閉,不然沒有其餘線程可以中斷當前任務。
  3. 檢查線程池狀態,若是是 STOP 或 TIDYING 或 TERMINATED,說明再也不須要執行任務了,中斷當前線程。
  4. 執行前置工做 beforeExecute,這是一個鉤子方法。
  5. 執行任務 Runnable#run。
  6. 執行後置工做 afterExecute,這也是一個鉤子方法。

對於 Worker#lock,官方的說明:

Before running any task, the lock is acquired to prevent other pool interrupts while the task is executing, and then we ensure that unless pool is stopping, this thread does not have its interrupt set.

java.util.concurrent.ThreadPoolExecutor#runWorker

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts // 初始化 state 爲 0
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) { // firstTask 不爲空,或者從隊列拉取到任務不爲空
            w.lock(); // 加鎖,確保除非線程池關閉,不然沒有其餘線程可以中斷當前任務
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) && // 若是線程池狀態 >= STOP,則中斷當前線程,不須要執行新任務
                !wt.isInterrupted())                    // 這裏可能會兩次執行 isInterrupted,是爲了不 shutdownNow 過程當中清除了線程中斷狀態
                wt.interrupt();
            try {
                beforeExecute(wt, task); // 前置工做,預留
                Throwable thrown = null;
                try {
                    task.run(); // 執行任務
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown); // 後置工做,預留
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false; // 走到這裏說明線程沒有任務可執行
    } finally {
        processWorkerExit(w, completedAbruptly); // 工做線程退出
    }
}

4.4 獲取任務 getTask

在 ThreadPoolExecutor#runWorker 中,工做線程執行任務以前,若是 firstTask 爲空,則調用 getTask() 從隊列中獲取任務。

工做線程從隊列中拉取任務以前,須要進行校驗,若是出現如下任意一種狀況會直接退出:

  1. 工做線程數量大於 maximumPoolSize;
  2. 線程池已中止(STOP);
  3. 線程池已關閉(SHUTDOWN)且隊列爲空;
  4. 工做線程等待任務超時(keepAliveTime)。

java.util.concurrent.ThreadPoolExecutor#getTask

/**
 * Performs blocking or timed wait for a task, depending on
 * current configuration settings, or returns null if this worker
 * must exit because of any of:
 * 1. There are more than maximumPoolSize workers (due to
 *    a call to setMaximumPoolSize).
 * 2. The pool is stopped.
 * 3. The pool is shutdown and the queue is empty.
 * 4. This worker timed out waiting for a task, and timed-out
 *    workers are subject to termination (that is,
 *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
 *    both before and after the timed wait, and if the queue is
 *    non-empty, this worker is not the last thread in the pool.
 *
 * @return task, or null if the worker must exit, in which case
 *         workerCount is decremented
 */
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 校驗線程池狀態:1.線程池狀態爲 SHUTDOWN 且隊列爲空;2.線程池狀態 >= STOP
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 當前線程是否容許超時,true 表示具備超時時間(keepAliveTime)

        if ((wc > maximumPoolSize || (timed && timedOut)) // 校驗工做線程狀態:1.工做線程數超過 maximumPoolSize;2.當前工做線程已超時;3.隊列爲空
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 阻塞直到拉取成功或超時
                workQueue.take(); // 阻塞直到拉取成功
            if (r != null)
                return r;
            timedOut = true; // 表示直到超時,都沒有獲取任務
        } catch (InterruptedException retry) { // 拉取時被中斷喚醒,繼續自旋
            timedOut = false;
        }
    }
}

工做線程從線程池中拉取任務,具備兩種方式:

  1. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS):阻塞直到拉取任務成功或超時。
  2. workQueue.take():阻塞直到拉取任務成功。

代碼中經過 allowCoreThreadTimeOut || wc > corePoolSize 這個表達式來控制使用哪一種拉取任務方式。
該表達式爲 true 時,若是線程在 keepAliveTime 時間內沒有拉取到任務,則會被銷燬,表現爲「非核心線程」。
可是,因爲工做線程數量 wc 是會實時發生變化的,所以同一個線程在運行期間可能會前後使用不一樣的方式拉取任務。
也就是說,工做線程在運行期間可能會在 「核心線程」 和 「非核心線程」 兩種形態之間切換。

  • 核心 -> 非核心:初始 addWorker() 時當前線程爲核心線程。當隊列滿了後,池中新增了非核心線程,此時當前線程執行 getTask() 知足 wc > corePoolSize,變爲非核心線程。
  • 非核心 -> 核心:初始 addWorker() 時當前線程爲非核心線程。當部分核心線程因執行任務發生異常而終結,此時當前線程執行 getTask() 不知足 wc > corePoolSize,變爲核心線程。

而實際上 ThreadPoolExecutor 區分 「核心線程」 和 「非核心線程」 只是爲了利用 corePoolSize 來控制活躍線程數量以及任務是否進入隊列中排隊等待,並不關心 Worker 究竟是不是「核心線程」。

4.5 工做線程退出 processWorkerExit

在 runWorker() 中,若是經過 getTask() 識別到空閒線程(timedOut = true),或者工做線程在執行任務過程當中出現異常,會調用 processWorkerExit() 退出工做線程。

代碼流程:

  1. 若是當前線程是因爲任務執行異常而終止的,須要扣減 workerCount。
  2. 獲取 mainLock,統計任務數,從 workers set 中移除當前 worker。
  3. 嘗試終止線程池。
  4. 若是線程池未終止,須要判斷是否補上新的非核心線程。

注意,當前線程在執行完 processWorkerExit 方法以後會自動結束運行,Thread#isAlive 返回 false。
所以在當前線程終止以前,若是知足如下條件之一,則會建立新的非核心線程來替換當前線程:

  1. 用戶任務執行異常致使線程退出。
  2. 工做線程數少於 corePoolSize。
  3. 等待隊列不爲空但沒有工做線程。

Java 官方的說明:

replaces the worker if either it exited due to user task exception or if fewer than corePoolSize workers are running or queue is non-empty but there are no workers.

java.util.concurrent.ThreadPoolExecutor#processWorkerExit

/**
 * @param w the worker
 * @param completedAbruptly if the worker died due to user exception
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted // 當前線程執行任務時出現異常,須要扣減 workerCount
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks; // 統計全部線程完成的任務數
        workers.remove(w); // 移除當前線程的 worker
    } finally {
        mainLock.unlock();
    }

    tryTerminate(); // 嘗試終止線程池

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) { // RUNNING、SHUTDOWN,即線程池還沒有中止
        if (!completedAbruptly) {    
            // 沒有出現異常,說明當前線程是非活躍線程:
            // 1. allowCoreThreadTimeOut 爲 false,則 min 爲 corePoolSize。若 workerCountOf(c) >= min 說明當前終止的是非核心線程,無需補充新線程
            // 2. allowCoreThreadTimeOut 爲 true,且隊列爲空,則 min 爲 0。 若 workerCountOf(c) >= min 說明當前沒有任務須要處理,無需補充新線程
            // 3. allowCoreThreadTimeOut 爲 true,且隊列非空,則 min 爲 1。 若 workerCountOf(c) >= min 說明具備活躍的線程處理任務,無需補充新線程
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false); // 建立新的線程替換當前線程
    }
}

4.6 嘗試關閉線程池 tryTerminate

tryTerminate 用於嘗試終止線程池,在 shutdow()、shutdownNow()、remove() 中均是經過此方法來終止線程池。
此方法必須在任何可能致使線程終止的行爲以後被調用,例如減小工做線程數,移除隊列中的任務,或者是在工做線程運行完畢後處理工做線程退出邏輯的方法(processWorkerExit)。

代碼流程:

  1. 校驗線程池狀態。當線程池狀態爲 STOP,或者狀態爲 SHUTDOWN 且隊列爲空,說明線程池是可終止的,此時纔可進入下一步。
  2. 校驗線程數量。若是線程池中工做線程數量不爲 0,則中斷其中一個線程(interruptIdleWorkers)並結束 tryTerminate 方法,後續由該線程來傳遞線程池關閉消息(runWorker -> getTask -> processWorkerExit -> tryTerminate)。
  3. 當線程池中沒有工做線程,且隊列中沒有任務後,開始關閉線程池:
    3.1 修改線程池狀態:(STOP or SHUTDOWN) -> TIDYING
    3.2 調用鉤子方法 terminated()
    3.3 修改線程池狀態:TIDYING -> TERMINATED

java.util.concurrent.ThreadPoolExecutor#tryTerminate

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // 校驗線程池狀態,只有狀態爲 STOP,或者(狀態爲 SHUTDOWN 且隊列爲空)的狀況下,才能夠往下執行,不然直接返回
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) || // TIDYING、TERMINATED
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE); // 僅中斷一個工做線程,由它來傳遞線程池關閉消息
            return;
        }

        final ReentrantLock mainLock = this.mainLock; // 來到這裏,說明線程池中沒有工做線程了
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // (STOP or SHUTDOWN) -> TIDYING
                try {
                    terminated(); // 鉤子方法
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0)); // TIDYING -> TERMINATED
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

4.7 關閉線程池

理解了 tryTerminate() 如未嘗試關閉線程池後,再來看一下發起線程池關閉的方法:shutdown()、shutdownNow()。

4.7.1 shutdown

關閉線程池,不接收新的任務,可是會處理隊列裏的任務。

java.util.concurrent.ThreadPoolExecutor#shutdown

/**
 * Initiates an orderly shutdown in which previously submitted
 * tasks are executed, but no new tasks will be accepted.
 * Invocation has no additional effect if already shut down.
 *
 * <p>This method does not wait for previously submitted tasks to
 * complete execution.  Use {@link #awaitTermination awaitTermination}
 * to do that.
 *
 * @throws SecurityException {@inheritDoc}
 */
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();     // 檢查關閉權限
        advanceRunState(SHUTDOWN); // 修改線程池狀態
        interruptIdleWorkers();    // 依次中斷全部空閒線程
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate(); // 嘗試關閉線程池
}

理解 shutdown() 如何作到在關閉線程池以前,不接受新任務,而且繼續處理已有任務,關鍵在於兩個操做:

advanceRunState(SHUTDOWN)

設置線程池狀態爲 SHUTDOWN 以後:

  • 在 ThreadPoolExecutor#execute 中,線程池狀態爲 SHUTDOWN 不會接收新的任務。
  • 在 ThreadPoolExecutor#getTask 中,線程池狀態爲 SHUTDOWN 可是隊列中仍有未處理的任務,會繼續拉取任務來處理。
  • 在 ThreadPoolExecutor#runWorker 中,線程池狀態爲 SHUTDOWN 能夠繼續處理任務。

interruptIdleWorkers

java.util.concurrent.ThreadPoolExecutor#interruptIdleWorkers()

/**
 * Common form of interruptIdleWorkers, to avoid having to
 * remember what the boolean argument means.
 */
private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

/**
 * Interrupts threads that might be waiting for tasks (as
 * indicated by not being locked) so they can check for
 * termination or configuration changes. Ignores
 * SecurityExceptions (in which case some threads may remain
 * uninterrupted).
 *
 * @param onlyOne If true, interrupt at most one worker. This is
 * called only from tryTerminate when termination is otherwise
 * enabled but there are still other workers.  In this case, at
 * most one waiting worker is interrupted to propagate shutdown
 * signals in case all threads are currently waiting.
 * Interrupting any arbitrary thread ensures that newly arriving
 * workers since shutdown began will also eventually exit.
 * To guarantee eventual termination, it suffices to always
 * interrupt only one idle worker, but shutdown() interrupts all
 * idle workers so that redundant workers exit promptly, not
 * waiting for a straggler task to finish.
 */
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) { // 可以獲取鎖,說明當前線程沒有在執行任務,是「空閒」的
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

interruptIdleWorkers() 在中斷線程以前,使用 tryLock() 嘗試一次性獲取鎖,再中斷任務。

  • 若是目標線程在 ThreadPoolExecutor#getTask 中拉取任務,因爲拉取任務不用持有鎖,所以該目標線程在等待任務過程當中會被中斷喚醒,從新自旋校驗再拉取任務。
  • 若是目標線程在 ThreadPoolExecutor#runWorker 中執行任務,因爲執行任務須要持有鎖,所以其餘線程 tryLock() 失敗,當前目標線程能夠安全執行完任務。

4.7.2 shutdownNow

關閉線程池,不接收新任務,也不會處理隊列裏的任務,而且中斷正在運行的任務。

java.util.concurrent.ThreadPoolExecutor#shutdownNow

/**
 * Attempts to stop all actively executing tasks, halts the
 * processing of waiting tasks, and returns a list of the tasks
 * that were awaiting execution. These tasks are drained (removed)
 * from the task queue upon return from this method.
 *
 * <p>This method does not wait for actively executing tasks to
 * terminate.  Use {@link #awaitTermination awaitTermination} to
 * do that.
 *
 * <p>There are no guarantees beyond best-effort attempts to stop
 * processing actively executing tasks.  This implementation
 * cancels tasks via {@link Thread#interrupt}, so any task that
 * fails to respond to interrupts may never terminate.
 *
 * @throws SecurityException {@inheritDoc}
 */
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();   // 中斷全部線程
        tasks = drainQueue(); // 移除等待隊列中的全部任務
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

理解 shutdownNow() 如何作到在關閉線程池時,不接受新任務,也不會處理隊列裏的任務,而且中斷正在運行的任務。關鍵在於三個操做:

advanceRunState(STOP)

設置線程池狀態爲 STOP 以後:

  • 在 ThreadPoolExecutor#execute 中,線程池狀態爲 STOP 不會接收新的任務。
  • 在 ThreadPoolExecutor#getTask 中,線程池狀態爲 STOP,無論隊列中是否有未處理任務,均再也不拉取。
  • 在 ThreadPoolExecutor#runWorker 中,線程池狀態爲 STOP 會執行 interrupt() 設置中斷狀態,任務會不會繼續執行取決於該任務中有沒有檢查中斷狀態。

interruptWorkers

java.util.concurrent.ThreadPoolExecutor#interruptWorkers

/**
 * Interrupts all threads, even if active. Ignores SecurityExceptions
 * (in which case some threads may remain uninterrupted).
 */
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

java.util.concurrent.ThreadPoolExecutor.Worker#interruptIfStarted

void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { // 注意這裏沒有獲取鎖!
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

與 interruptIdleWorkers() 相比,interruptWorkers() 在中斷線程以前,只需校驗 getState() >= 0,無需獲取鎖便可強行中斷運行中的線程。

  • 若是目標線程在 ThreadPoolExecutor#getTask 中拉取任務,因爲拉取任務不用持有鎖,所以該目標線程在等待任務過程當中會被中斷喚醒,從新自旋校驗後再也不拉取任務。
  • 若是目標線程在 ThreadPoolExecutor#runWorker 中執行任務,會被強行設置中斷狀態,可是任務會不會繼續執行取決於該任務中有沒有檢查中斷狀態。

drainQueue

java.util.concurrent.ThreadPoolExecutor#drainQueue

/**
 * Drains the task queue into a new list, normally using
 * drainTo. But if the queue is a DelayQueue or any other kind of
 * queue for which poll or drainTo may fail to remove some
 * elements, it deletes them one by one.
 */
private List<Runnable> drainQueue() {
    BlockingQueue<Runnable> q = workQueue;
    ArrayList<Runnable> taskList = new ArrayList<Runnable>();
    q.drainTo(taskList); // 批量將隊列中的任務轉移到 taskList
    if (!q.isEmpty()) {
        for (Runnable r : q.toArray(new Runnable[0])) {
            if (q.remove(r))
                taskList.add(r);
        }
    }
    return taskList;
}

將隊列 workQueue 中未處理的任務所有拉取到 taskList 中,再也不處理任務。

5. 線程數配置

合理地配置線程池:

  • CPU 密集型任務:配置儘量小的線程,如配置 Ncpu+1 個線程的線程池,下降調度開銷。
  • IO 密集型任務:因爲線程並非一直在執行任務,應配置儘量多的線程,如 2*Ncpu
  • 混合型的任務:只要這兩個任務執行的時間相差不是太大,將其拆分紅一個 CPU 密集型任務 和一個 IO 密集型任務。

《Java 併發編程實戰》提出了一個線程數計算公式。

定義:

$$ N_{cpu} = CPU 核心數 $$

$$ U_{cpu} = 目標 CPU 利用率, 0 \leqslant U_{cpu} \leqslant 1 $$

$$ \frac{ W }{ C } = 等待時間和計算時間的比例 $$

要使得處理器達到指望的使用率,線程數的最優大小等於:

$$ N_{threads} = N_{cpu} * U_{cpu} * ( 1 + \frac{ W }{ C } ) $$

能夠經過 Runtime 來獲取 CPU 核心數:

int N_CPU = Runtime.getRuntime().availableProcessors();

6. 總結

回顧一下ThreadPoolExecutor 的內部結構:
線程池

  1. 若是當前運行的線程少於 corePoolSize,即便有空閒線程也會建立新線程來執行任務。
  2. 若是運行的線程等於或多於 corePoolSize,則將任務加入 BlockingQueue。
  3. 若是沒法將任務加入 BlockingQueue(隊列已滿),則建立新的線程來處理任務。
  4. 若是建立新線程將使當前運行的線程超出 maximumPoolSize,任務將被拒絕。

做者:Sumkor
連接:https://segmentfault.com/a/11...

相關文章
相關標籤/搜索