上一篇文章介紹了 Thread 類,可知線程隨着任務的執行結束而被銷燬。可是,因爲線程的建立與銷燬操做涉及到系統調用,開銷較大,所以須要將線程的生命週期與任務進行解耦。使用線程池來管理線程,能夠有效地重複利用線程來執行任務。本文將介紹線程池最基礎的實現類 ThreadPoolExecutor。java
本文基於 jdk1.8.0_91
類型 名稱 描述 接口 Executor 最上層的接口,提供了任務提交的基礎方法 接口 ExecutorService 繼承了 Executor 接口,擴展了提交任務、獲取異步任務執行結果、線程池銷燬等方法 接口 ScheduledExecutorService 繼承了 ExecutorService 接口,增長了延遲執行任務、定時執行任務的方法 抽象類 AbstractExecutorService 提供了 ExecutorService 接口的默認實現,提供 newTaskFor 方法將任務轉換爲 RunnableFuture,以便提交給 Executor 執行 實現類 ThreadPoolExecutor 基礎、標準的線程池實現 實現類 ScheduledThreadPoolExecutor 繼承了 ThreadPoolExecutor,實現了 ScheduledExecutorService 中相關延遲任務、定時任務的方法 實現類 ForkJoinPool JDK7 加入的線程池,是 Fork/Join 框架的核心實現,只容許執行 ForkJoinTask 任務 普通類 Executors 建立各類線程池的工具類
線程池能夠解決兩個問題:編程
JDK 中建議使用較爲方便的 Executors 工廠方法,它們均爲大多數使用場景預約義了設置:segmentfault
阿里 Java 開發手冊 對線程池的使用進行了限制,可做參考:安全
【強制】線程資源必須經過線程池提供,不容許在應用中自行顯式建立線程。
說明:使用線程池的好處是減小在建立和銷燬線程上所花的時間以及系統資源的開銷,解決資源不足的問題。若是不使用線程池,有可能形成系統建立大量同類線程而致使消耗完內存或者「過分切換」的問題。多線程【強制】線程池不容許使用Executors去建立,而是經過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同窗更加明確線程池的運行規則,規避資源耗盡的風險。
說明:Executors返回的線程池對象的弊端以下:
1)FixedThreadPool和SingleThreadPool:容許的請求隊列長度爲Integer.MAX_VALUE,可能會堆積大量的請求,從而致使OOM。
2)CachedThreadPool和ScheduledThreadPool:容許的建立線程數量爲Integer.MAX_VALUE,可能會建立大量的線程,從而致使OOM。併發
Executors 內部都是調用 ThreadPoolExecutor 的構造方法來實現的:框架
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize 核心線程數量 * @param maximumPoolSize 總的線程數量 * @param keepAliveTime 空閒線程的存活時間 * @param unit keepAliveTime的單位 * @param workQueue 任務隊列, 保存已經提交但還沒有被執行的線程 * @param threadFactory 線程工廠(用於指定如何建立一個線程) * @param handler 拒絕策略 (當任務太多致使工做隊列滿時的處理策略) */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
Java 官方對 ThreadPoolExecutor 的使用說明:less
ThreadPoolExecutor 將根據 corePoolSize 和 maximumPoolSize 自動調整線程池中的線程數量,以及判斷任務是否進行排隊。異步
當提交新任務時:ide
若是運行的線程多於 corePoolSize 而少於 maximumPoolSize:
若是線程數大於 maximumPoolSize,新提交的任務將會根據拒絕策略來處理。
在大多數狀況下,corePoolSize 和 maximumPoolSize 是經過構造函數來設置的,不過也可使用 ThreadPoolExecutor 的 setCorePoolSize(int) 和 setMaximumPoolSize(int) 進行動態更改。注意,核心線程和非核心線程只是一種邏輯上的區分,線程池中只有一種類型的線程,稱爲工做線程(Worker)。
默認狀況下,核心線程只是在新任務到達時才建立和啓動的。
可使用方法 prestartCoreThread() 或 prestartAllCoreThreads() 對其進行動態重寫。
通常在構造帶有非空隊列的線程池時,會但願先啓動線程。
使用 ThreadFactory 來建立新線程。
在 Executors 中默認使用 Executors.defaultThreadFactory() 建立線程,而且這些線程具備相同的分組和優先級,且都是非守護線程。
若是 ThreadFactory 建立新線程失敗,此時線程池能夠繼續運行,但不能執行任何任務。
默認狀況下,保持活動策略只應用在非核心線程上,即當線程的空閒時間超過 keepAliveTime 時將會終止。
若使用 allowCoreThreadTimeOut(boolean) 方法則會把保活策略也應用於核心線程。
當線程池處於非活躍狀態時,能夠減小資源消耗。若是線程從新變得活躍,則會建立新的線程。
可使用方法 setKeepAliveTime(time, timeUnit) 動態地更改此參數,若入參使用 Long.MAX_VALUE 和 TimeUnit.NANOSECONDS 則不會終止空閒線程,除非線程池關閉。
全部 BlockingQueue 均可用於傳輸和保持提交的任務。當線程池中的線程數量大於 corePoolSize 而少於 maximumPoolSize 時,新任務會加入同步隊列。
排隊有三種通用策略:
直接提交(Direct handoffs)
無界隊列(Unbounded queues)
有界隊列(Bounded queues)
當線程池已關閉,或者線程池中的線程數量和隊列容量已飽和時,繼續提交新任務會被拒絕,會觸發 RejectedExecutionHandler#rejectedExecution 方法。
ThreadPoolExecutor 定義了四種拒絕策略:
能夠自定義 RejectedExecutionHandler 類來實現拒絕策略。須要注意的是,拒絕策略的運行須要指定線程池和隊列的容量。
ThreadPoolExecutor 中提供 beforeExecute 和 afterExecute 方法,在執行每一個任務以前和以後調用。提供 terminated 方法,在線程池關閉以前作一些收尾工做。
若是鉤子方法拋出異常,則內部工做線程將依次失敗並終止。
方法 getQueue() 容許出於監控和調試目的而訪問工做隊列。強烈反對出於其餘任何目的而使用此方法。
remove() 和 purge() 這兩種方法可用於在取消大量已排隊任務時幫助進行存儲回收。
當線程池的引用變爲不可達,而且線程池中沒有遺留的線程(經過設置 allowCoreThreadTimeOut 把非活動的核心線程銷燬),此時線程池會自動 shutdown。
ThreadPoolExecutor 中使用一個 AtomicInteger 類型的變量 ctl 來管理線程池。
其中,低 29 位保存線程數,高 3 位保存線程池狀態。
線程池中最大的線程數爲 2^29-1。
/** * The main pool control state, ctl, is an atomic integer packing * two conceptual fields * workerCount, indicating the effective number of threads // 工做線程數量 * runState, indicating whether running, shutting down etc // 線程池運行狀態 */ private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; // 32 - 3 = 29 // 最大線程數: 2^29-1 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 0001 1111 1111 1111 1111 1111 1111 1111 // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; // 高三位 111 private static final int SHUTDOWN = 0 << COUNT_BITS; // 高三位 000 private static final int STOP = 1 << COUNT_BITS; // 高三位 001 private static final int TIDYING = 2 << COUNT_BITS; // 高三位 010 private static final int TERMINATED = 3 << COUNT_BITS; // 高三位 011 // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } // 運行狀態 private static int workerCountOf(int c) { return c & CAPACITY; } // 運行的工做線程數 private static int ctlOf(int rs, int wc) { return rs | wc; } // 封裝運行狀態和任務線程
ThreadPoolExecutor 一共定義了 5 種線程池狀態:
線程池狀態的轉移:
// 工做線程集合 private final HashSet<Worker> workers = new HashSet<Worker>(); // 操做 workers 集合使用到的鎖 private final ReentrantLock mainLock = new ReentrantLock();
在線程池中具備一個 Worker 集合,一個 Worker 對應一個工做線程。當線程池啓動時,對應的 Worker 會執行池中的任務,執行完畢後從阻塞隊列裏獲取一個新的任務繼續執行。
持有 mainLock 時才能操做 Worker 集合,Java 官方對使用 mainLock 而不是併發集合的說明:
Worker 是 ThreadPoolExecutor 的內部類,其繼承體系以下:
Worker 使用 AQS 中的 state 屬性表示是否持有鎖:
Worker 開始工做時,會先執行 unlock() 方法設置 state 爲 0,後續再使用 CAS 對 state 來加鎖。具體見 ThreadPoolExecutor#runWorker。
注意,線程池中的工做線程在邏輯上分爲核心線程和非核心線程,可是在 Worker 類中並無相關屬性標記當前線程是不是核心線程!
而是在運行期間動態指定的:
這樣設計的目的,只是爲了動態維持線程池中的核心線程數量不超過 corePoolSize,是一種鬆散的控制。
java.util.concurrent.ThreadPoolExecutor.Worker
private final class Worker // 不可重入的互斥鎖 extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; // 工做線程 /** Initial task to run. Possibly null. */ Runnable firstTask; // 初始運行任務 /** Per-thread task counter */ volatile long completedTasks; // 任務完成計數 /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
ThreadPoolExecutor 具備屬性 threadFactory 表示線程工廠。
/** * Factory for new threads. All threads are created using this * factory (via method addWorker). All callers must be prepared * for addWorker to fail, which may reflect a system or user's * policy limiting the number of threads. Even though it is not * treated as an error, failure to create threads may result in * new tasks being rejected or existing ones remaining stuck in * the queue. * * We go further and preserve pool invariants even in the face of * errors such as OutOfMemoryError, that might be thrown while * trying to create threads. Such errors are rather common due to * the need to allocate a native stack in Thread.start, and users * will want to perform clean pool shutdown to clean up. There * will likely be enough memory available for the cleanup code to * complete without encountering yet another OutOfMemoryError. */ private volatile ThreadFactory threadFactory;
Worker 中包含屬性 thread 表示工做線程,在 Worker 構造函數中經過線程工廠來建立線程(即 Thread#new,注意不能啓動線程!)。
/** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
在 Executors 工具類中定義了內部類 DefaultThreadFactory 做爲默認的線程工廠,用於統一設置線程信息:
java.util.concurrent.Executors#defaultThreadFactory
/** * Returns a default thread factory used to create new threads. * This factory creates all new threads used by an Executor in the * same {@link ThreadGroup}. If there is a {@link * java.lang.SecurityManager}, it uses the group of {@link * System#getSecurityManager}, else the group of the thread * invoking this {@code defaultThreadFactory} method. Each new * thread is created as a non-daemon thread with priority set to * the smaller of {@code Thread.NORM_PRIORITY} and the maximum * priority permitted in the thread group. New threads have names * accessible via {@link Thread#getName} of * <em>pool-N-thread-M</em>, where <em>N</em> is the sequence * number of this factory, and <em>M</em> is the sequence number * of the thread created by this factory. * @return a thread factory */ public static ThreadFactory defaultThreadFactory() { return new DefaultThreadFactory(); }
java.util.concurrent.Executors.DefaultThreadFactory
/** * The default thread factory */ static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
線程池的頂層接口 Executor 中只有一個 execute 方法,用於把任務提交到線程池,ThreadPoolExecutor 對它進行了實現。
方法說明:
代碼流程:
java.util.concurrent.ThreadPoolExecutor#execute
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) // 添加新的核心線程,並把當前任務交給它執行 return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { // 線程池未關閉,且非阻塞入隊成功,則進入下一步 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) // 線程池已關閉,不接收新任務,須要出隊並拒絕任務 reject(command); else if (workerCountOf(recheck) == 0) // 入隊成功但工做線程爲空,則添加非核心線程且不指定任務 addWorker(null, false); // 沒有任務的工做線程會從同步隊列中拉取任務去執行 } else if (!addWorker(command, false)) // 隊列已滿,且建立非核心線程失敗,則拒絕任務 reject(command); }
在任務提交(execute方法)、更新核心線程數(setCorePoolSize方法)、預啓動線程(prestartCoreThread方法)中都會調用 addWorker 方法添加新的工做線程。
addWorker 入參指定該工做線程須要執行的任務,以及該工做線程是否核心線程。
代碼主要流程:
檢查線程池狀態(注:SHUTDOWN 不接收新的任務,可是會處理隊列裏的任務):
檢查線程數量限制(注:工做線程在邏輯上分爲核心線程、非核心線程):
java.util.concurrent.ThreadPoolExecutor#addWorker
/** * Checks if a new worker can be added with respect to current * pool state and the given bound (either core or maximum). If so, * the worker count is adjusted accordingly, and, if possible, a * new worker is created and started, running firstTask as its * first task. This method returns false if the pool is stopped or * eligible to shut down. It also returns false if the thread * factory fails to create a thread when asked. If the thread * creation fails, either due to the thread factory returning * null, or due to an exception (typically OutOfMemoryError in * Thread.start()), we roll back cleanly. * * @param firstTask the task the new thread should run first (or * null if none). Workers are created with an initial first task * (in method execute()) to bypass queuing when there are fewer * than corePoolSize threads (in which case we always start one), * or when the queue is full (in which case we must bypass queue). * Initially idle threads are usually created via * prestartCoreThread or to replace other dying workers. * * @param core if true use corePoolSize as bound, else * maximumPoolSize. (A boolean indicator is used here rather than a * value to ensure reads of fresh values after checking other pool * state). * @return true if successful */ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // 檢查線程池狀態 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); // 檢查線程數量限制 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) // workerCount 自增,結束自旋 break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) // 線程池狀態發生變化,重試 continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); // 建立工做線程,指定初始任務(Executors.DefaultThreadFactory 中會執行 Thread#new,可是不會調用 Thread#start) final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); // 加鎖用於操做 workers 集合 try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 若是線程工廠已經提早啓動線程了(Thread#start),則報錯 workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; // 更新最大池容量 workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); // 啓動線程,由 JVM 在操做系統層面建立線程並執行 Thread#run workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); // 線程添加失敗,回滾操做 } return workerStarted; }
注意,建立新線程的過程當中,須要區分 new Thread()
和 new Thread().start
的不一樣:
另外,在 ThreadPoolExecutor#addWorker 方法中執行 Thread t = w.thread; t.start()
會觸發執行 ThreadPoolExecutor#runWorker,該過程簡化以下:
private final class Worker implements Runnable { final Thread thread; // 工做線程 public Worker() { thread = new Thread(this); System.out.println("addWorker!"); } @Override public void run() { System.out.println("runWorker!"); } } /** * 測試在 addWorker 中觸發 runWorker */ @Test public void test() throws InterruptedException { Worker worker = new Worker(); worker.thread.start(); worker.thread.join(); }
在 ThreadPoolExecutor#addWorker 中添加工做線程以後,會啓動工做線程(Thread#start),觸發工做線程執行任務(Thread#run)。
runWorker 代碼流程:
對於 Worker#lock,官方的說明:
Before running any task, the lock is acquired to prevent other pool interrupts while the task is executing, and then we ensure that unless pool is stopping, this thread does not have its interrupt set.
java.util.concurrent.ThreadPoolExecutor#runWorker
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts // 初始化 state 爲 0 boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { // firstTask 不爲空,或者從隊列拉取到任務不爲空 w.lock(); // 加鎖,確保除非線程池關閉,不然沒有其餘線程可以中斷當前任務 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && // 若是線程池狀態 >= STOP,則中斷當前線程,不須要執行新任務 !wt.isInterrupted()) // 這裏可能會兩次執行 isInterrupted,是爲了不 shutdownNow 過程當中清除了線程中斷狀態 wt.interrupt(); try { beforeExecute(wt, task); // 前置工做,預留 Throwable thrown = null; try { task.run(); // 執行任務 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); // 後置工做,預留 } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; // 走到這裏說明線程沒有任務可執行 } finally { processWorkerExit(w, completedAbruptly); // 工做線程退出 } }
在 ThreadPoolExecutor#runWorker 中,工做線程執行任務以前,若是 firstTask 爲空,則調用 getTask() 從隊列中獲取任務。
工做線程從隊列中拉取任務以前,須要進行校驗,若是出現如下任意一種狀況會直接退出:
java.util.concurrent.ThreadPoolExecutor#getTask
/** * Performs blocking or timed wait for a task, depending on * current configuration settings, or returns null if this worker * must exit because of any of: * 1. There are more than maximumPoolSize workers (due to * a call to setMaximumPoolSize). * 2. The pool is stopped. * 3. The pool is shutdown and the queue is empty. * 4. This worker timed out waiting for a task, and timed-out * workers are subject to termination (that is, * {@code allowCoreThreadTimeOut || workerCount > corePoolSize}) * both before and after the timed wait, and if the queue is * non-empty, this worker is not the last thread in the pool. * * @return task, or null if the worker must exit, in which case * workerCount is decremented */ private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 校驗線程池狀態:1.線程池狀態爲 SHUTDOWN 且隊列爲空;2.線程池狀態 >= STOP decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 當前線程是否容許超時,true 表示具備超時時間(keepAliveTime) if ((wc > maximumPoolSize || (timed && timedOut)) // 校驗工做線程狀態:1.工做線程數超過 maximumPoolSize;2.當前工做線程已超時;3.隊列爲空 && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 阻塞直到拉取成功或超時 workQueue.take(); // 阻塞直到拉取成功 if (r != null) return r; timedOut = true; // 表示直到超時,都沒有獲取任務 } catch (InterruptedException retry) { // 拉取時被中斷喚醒,繼續自旋 timedOut = false; } } }
工做線程從線程池中拉取任務,具備兩種方式:
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
:阻塞直到拉取任務成功或超時。workQueue.take()
:阻塞直到拉取任務成功。代碼中經過 allowCoreThreadTimeOut || wc > corePoolSize
這個表達式來控制使用哪一種拉取任務方式。
該表達式爲 true 時,若是線程在 keepAliveTime 時間內沒有拉取到任務,則會被銷燬,表現爲「非核心線程」。
可是,因爲工做線程數量 wc 是會實時發生變化的,所以同一個線程在運行期間可能會前後使用不一樣的方式拉取任務。
也就是說,工做線程在運行期間可能會在 「核心線程」 和 「非核心線程」 兩種形態之間切換。
而實際上 ThreadPoolExecutor 區分 「核心線程」 和 「非核心線程」 只是爲了利用 corePoolSize 來控制活躍線程數量以及任務是否進入隊列中排隊等待,並不關心 Worker 究竟是不是「核心線程」。
在 runWorker() 中,若是經過 getTask() 識別到空閒線程(timedOut = true),或者工做線程在執行任務過程當中出現異常,會調用 processWorkerExit() 退出工做線程。
代碼流程:
注意,當前線程在執行完 processWorkerExit 方法以後會自動結束運行,Thread#isAlive 返回 false。
所以在當前線程終止以前,若是知足如下條件之一,則會建立新的非核心線程來替換當前線程:
Java 官方的說明:
replaces the worker if either it exited due to user task exception or if fewer than corePoolSize workers are running or queue is non-empty but there are no workers.
java.util.concurrent.ThreadPoolExecutor#processWorkerExit
/** * @param w the worker * @param completedAbruptly if the worker died due to user exception */ private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted // 當前線程執行任務時出現異常,須要扣減 workerCount decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; // 統計全部線程完成的任務數 workers.remove(w); // 移除當前線程的 worker } finally { mainLock.unlock(); } tryTerminate(); // 嘗試終止線程池 int c = ctl.get(); if (runStateLessThan(c, STOP)) { // RUNNING、SHUTDOWN,即線程池還沒有中止 if (!completedAbruptly) { // 沒有出現異常,說明當前線程是非活躍線程: // 1. allowCoreThreadTimeOut 爲 false,則 min 爲 corePoolSize。若 workerCountOf(c) >= min 說明當前終止的是非核心線程,無需補充新線程 // 2. allowCoreThreadTimeOut 爲 true,且隊列爲空,則 min 爲 0。 若 workerCountOf(c) >= min 說明當前沒有任務須要處理,無需補充新線程 // 3. allowCoreThreadTimeOut 爲 true,且隊列非空,則 min 爲 1。 若 workerCountOf(c) >= min 說明具備活躍的線程處理任務,無需補充新線程 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); // 建立新的線程替換當前線程 } }
tryTerminate 用於嘗試終止線程池,在 shutdow()、shutdownNow()、remove() 中均是經過此方法來終止線程池。
此方法必須在任何可能致使線程終止的行爲以後被調用,例如減小工做線程數,移除隊列中的任務,或者是在工做線程運行完畢後處理工做線程退出邏輯的方法(processWorkerExit)。
代碼流程:
java.util.concurrent.ThreadPoolExecutor#tryTerminate
final void tryTerminate() { for (;;) { int c = ctl.get(); // 校驗線程池狀態,只有狀態爲 STOP,或者(狀態爲 SHUTDOWN 且隊列爲空)的狀況下,才能夠往下執行,不然直接返回 if (isRunning(c) || runStateAtLeast(c, TIDYING) || // TIDYING、TERMINATED (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); // 僅中斷一個工做線程,由它來傳遞線程池關閉消息 return; } final ReentrantLock mainLock = this.mainLock; // 來到這裏,說明線程池中沒有工做線程了 mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // (STOP or SHUTDOWN) -> TIDYING try { terminated(); // 鉤子方法 } finally { ctl.set(ctlOf(TERMINATED, 0)); // TIDYING -> TERMINATED termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
理解了 tryTerminate() 如未嘗試關閉線程池後,再來看一下發起線程池關閉的方法:shutdown()、shutdownNow()。
關閉線程池,不接收新的任務,可是會處理隊列裏的任務。
java.util.concurrent.ThreadPoolExecutor#shutdown
/** * Initiates an orderly shutdown in which previously submitted * tasks are executed, but no new tasks will be accepted. * Invocation has no additional effect if already shut down. * * <p>This method does not wait for previously submitted tasks to * complete execution. Use {@link #awaitTermination awaitTermination} * to do that. * * @throws SecurityException {@inheritDoc} */ public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 檢查關閉權限 advanceRunState(SHUTDOWN); // 修改線程池狀態 interruptIdleWorkers(); // 依次中斷全部空閒線程 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); // 嘗試關閉線程池 }
理解 shutdown() 如何作到在關閉線程池以前,不接受新任務,而且繼續處理已有任務,關鍵在於兩個操做:
設置線程池狀態爲 SHUTDOWN 以後:
java.util.concurrent.ThreadPoolExecutor#interruptIdleWorkers()
/** * Common form of interruptIdleWorkers, to avoid having to * remember what the boolean argument means. */ private void interruptIdleWorkers() { interruptIdleWorkers(false); } /** * Interrupts threads that might be waiting for tasks (as * indicated by not being locked) so they can check for * termination or configuration changes. Ignores * SecurityExceptions (in which case some threads may remain * uninterrupted). * * @param onlyOne If true, interrupt at most one worker. This is * called only from tryTerminate when termination is otherwise * enabled but there are still other workers. In this case, at * most one waiting worker is interrupted to propagate shutdown * signals in case all threads are currently waiting. * Interrupting any arbitrary thread ensures that newly arriving * workers since shutdown began will also eventually exit. * To guarantee eventual termination, it suffices to always * interrupt only one idle worker, but shutdown() interrupts all * idle workers so that redundant workers exit promptly, not * waiting for a straggler task to finish. */ private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { // 可以獲取鎖,說明當前線程沒有在執行任務,是「空閒」的 try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
interruptIdleWorkers() 在中斷線程以前,使用 tryLock() 嘗試一次性獲取鎖,再中斷任務。
關閉線程池,不接收新任務,也不會處理隊列裏的任務,而且中斷正在運行的任務。
java.util.concurrent.ThreadPoolExecutor#shutdownNow
/** * Attempts to stop all actively executing tasks, halts the * processing of waiting tasks, and returns a list of the tasks * that were awaiting execution. These tasks are drained (removed) * from the task queue upon return from this method. * * <p>This method does not wait for actively executing tasks to * terminate. Use {@link #awaitTermination awaitTermination} to * do that. * * <p>There are no guarantees beyond best-effort attempts to stop * processing actively executing tasks. This implementation * cancels tasks via {@link Thread#interrupt}, so any task that * fails to respond to interrupts may never terminate. * * @throws SecurityException {@inheritDoc} */ public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); // 中斷全部線程 tasks = drainQueue(); // 移除等待隊列中的全部任務 } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
理解 shutdownNow() 如何作到在關閉線程池時,不接受新任務,也不會處理隊列裏的任務,而且中斷正在運行的任務。關鍵在於三個操做:
設置線程池狀態爲 STOP 以後:
java.util.concurrent.ThreadPoolExecutor#interruptWorkers
/** * Interrupts all threads, even if active. Ignores SecurityExceptions * (in which case some threads may remain uninterrupted). */ private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } }
java.util.concurrent.ThreadPoolExecutor.Worker#interruptIfStarted
void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { // 注意這裏沒有獲取鎖! try { t.interrupt(); } catch (SecurityException ignore) { } } }
與 interruptIdleWorkers() 相比,interruptWorkers() 在中斷線程以前,只需校驗 getState() >= 0,無需獲取鎖便可強行中斷運行中的線程。
java.util.concurrent.ThreadPoolExecutor#drainQueue
/** * Drains the task queue into a new list, normally using * drainTo. But if the queue is a DelayQueue or any other kind of * queue for which poll or drainTo may fail to remove some * elements, it deletes them one by one. */ private List<Runnable> drainQueue() { BlockingQueue<Runnable> q = workQueue; ArrayList<Runnable> taskList = new ArrayList<Runnable>(); q.drainTo(taskList); // 批量將隊列中的任務轉移到 taskList if (!q.isEmpty()) { for (Runnable r : q.toArray(new Runnable[0])) { if (q.remove(r)) taskList.add(r); } } return taskList; }
將隊列 workQueue 中未處理的任務所有拉取到 taskList 中,再也不處理任務。
合理地配置線程池:
《Java 併發編程實戰》提出了一個線程數計算公式。
定義:
$$ N_{cpu} = CPU 核心數 $$
$$ U_{cpu} = 目標 CPU 利用率, 0 \leqslant U_{cpu} \leqslant 1 $$
$$ \frac{ W }{ C } = 等待時間和計算時間的比例 $$
要使得處理器達到指望的使用率,線程數的最優大小等於:
$$ N_{threads} = N_{cpu} * U_{cpu} * ( 1 + \frac{ W }{ C } ) $$
能夠經過 Runtime 來獲取 CPU 核心數:
int N_CPU = Runtime.getRuntime().availableProcessors();
回顧一下ThreadPoolExecutor 的內部結構: