java中的所說的線程池,通常都是圍繞着 ThreadPoolExecutor 來展開的。其餘的實現基本都是基於它,或者模仿它的。因此只要理解 ThreadPoolExecutor, 就至關於徹底理解了線程池的精髓。html
其實要理解一個東西,通常地,咱們最好是要抱着本身的疑問或者理解去的。不然,每每收穫甚微。java
理解 ThreadPoolExecutor, 咱們能夠先理解一個線程池的意義: 本質上是提供預先定義好的n個線程,供調用方直接運行任務的一個工具。安全
1. 提升任務執行的響應速度,下降資源消耗。任務執行時,直接當即使用線程池提供的線程運行,避免了臨時建立線程的CPU/內存開銷,達到快速響應的效果。數據結構
2. 提升線程的可管理性。線程總數可預知,避免用戶主動建立無限多線程致使死機風險,還能夠進行線程統一的分配、調優和監控。多線程
3. 避免對資源的過分使用。在超出預期的請求任務狀況,響應策略可控。併發
要想使用線程池,天然是要理解其接口的。通常咱們使用 ExecotorService 進行線程池的調用。然而,咱們並不針對初學者。app
總體的接口以下:框架
咱們就挑幾個經常使用接口探討下:less
submit(Runnable task): 提交一個無需返回結果的任務。
submit(Callable<T> task): 提交一個有返回結果的任務。
invokeAll(Collection<? extends Callable<T>> tasks, long, TimeUnit): 同時執行n個任務並返回結果列表。
shutdown(): 關閉線程程池。
awaitTermination(long timeout, TimeUnit unit): 等待關閉結果,最長不超過timeout時間。異步
以上是ThreadPoolExector 提供的特性,針對以上特性。
1. 線程池如何接受任務?
2. 線程如何運行任務?
3. 線程池如何關閉?
接下來,就讓咱們帶着疑問去看實現吧。
咱們首先重點要看的是,如何執行提交的任務。我能夠經過下圖來看看。
總結描述下就是:
1. 判斷核心線程池是否已滿,若是不是,則建立線程執行任務
2. 若是核心線程池滿了,判斷隊列是否滿了,若是隊列沒滿,將任務放在隊列中
3. 若是隊列滿了,則判斷線程池是否已滿,若是沒滿,建立線程執行任務
4. 若是線程池也滿了,則按照拒絕策略對任務進行處理
另外,咱們來看一下 ThreadPoolExecutor 的構造方法,由於這裏會體現出每一個屬性的含義。
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
從構造方法能夠看出 ThreadPoolExecutor 的主要參數 7 個,在其註釋上也有說明功能,我們翻譯下每一個參數的功能:
corePoolSize: 線程池核心線程數(平時保留的線程數),使用時機: 在初始時刻,每次請求進來都會建立一個線程直到達到該size maximumPoolSize: 線程池最大線程數,使用時機: 當workQueue都放不下時,啓動新線程,直到最大線程數,此時到達線程池的極限 keepAliveTime/unit: 超出corePoolSize數量的線程的保留時間,unit爲時間單位 workQueue: 阻塞隊列,當核心線程數達到或者超出後,會先嚐試將任務放入該隊列由各線程自行消費; ArrayBlockingQueue: 構造函數必定要傳大小 LinkedBlockingQueue: 構造函數不傳大小會默認爲65536(Integer.MAX_VALUE ),當大量請求任務時,容易形成 內存耗盡。 SynchronousQueue: 同步隊列,一個沒有存儲空間的阻塞隊列 ,將任務同步交付給工做線程。 PriorityBlockingQueue: 優先隊列 threadFactory:線程工廠,用於線程須要建立時,調用其newThread()生產新線程使用 handler: 飽和策略,當隊列已放不下任務,且建立的線程已達到 maximum 後,則不能再處理任務,直接將任務交給飽和策略 AbortPolicy: 直接拋棄(默認) CallerRunsPolicy: 用調用者的線程執行任務 DiscardOldestPolicy: 拋棄隊列中最久的任務 DiscardPolicy: 拋棄當前任務
當調用 submit 方法,就是向線程池中提交一個任務,處理流程如步驟1所示。可是咱們須要更深刻理解。
submit 方法是定義在 AbstractExecutorService 中,最終調用 ThreadPoolExecutor 的 execute 方法,便是模板方法模式的應用。
// java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable, T) /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); // 封裝任務和返回結果爲 RunnableFuture, 統一交由具體的子類執行 RunnableFuture<T> ftask = newTaskFor(task, result); // execute 將會調用 ThreadPoolExecutor 的實現,是咱們討論的重要核心 execute(ftask); return ftask; } // FutureTask 是個重要的線程池組件,它承載了具體的任務執行流 /** * Returns a {@code RunnableFuture} for the given runnable and default * value. * * @param runnable the runnable task being wrapped * @param value the default value for the returned future * @param <T> the type of the given value * @return a {@code RunnableFuture} which, when run, will run the * underlying runnable and which, as a {@code Future}, will yield * the given value as its result and provide for cancellation of * the underlying task * @since 1.6 */ protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } // ThreadPoolExecutor 的任務提交過程 // java.util.concurrent.ThreadPoolExecutor#execute /** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ // ctl 是一個重要的控制全局狀態的數據結構,定義爲一個線程安全的 AtomicInteger // ctl = new AtomicInteger(ctlOf(RUNNING, 0)); int c = ctl.get(); // 當尚未達到核心線程池的數量時,直接添加1個新線程,而後讓其執行任務便可 if (workerCountOf(c) < corePoolSize) { // 2.1. 添加新線程,且執行command任務 // 添加成功,即不須要後續操做了,添加失敗,則說明外部環境變化了 if (addWorker(command, true)) return; c = ctl.get(); } // 當核心線程達到後,則嘗試添加到阻塞隊列中,具體添加方法由阻塞隊列實現 // isRunning => c < SHUTDOWN; if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 2.2. 添加隊列成功後,還要再次檢測線程池的運行狀態,決定啓動線程或者狀態過時 // 2.2.1. 當線程池已關閉,則將剛剛添加的任務移除,走reject策略 if (! isRunning(recheck) && remove(command)) reject(command); // 2.2.2. 當一個worker都沒有時,則添加worker else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 當隊列滿後,則直接再建立新的線程運行,若是不能再建立線程了,則 reject else if (!addWorker(command, false)) // 2.3. 拒絕策略處理 reject(command); }
經過上面這一小段代碼,咱們就已經完整地看到了。經過一個 ctl 變量進行全局狀態控制,從而保證了線程安全性。整個框架並無使用鎖,可是倒是線程安全的。
整段代碼恰好完整描述了線程池的執行流程:
1. 判斷核心線程池是否已滿,若是不是,則建立線程執行任務;
2. 若是核心線程池滿了,判斷隊列是否滿了,若是隊列沒滿,將任務放在隊列中;
3. 若是隊列滿了,則判斷線程池是否已滿,若是沒滿,建立線程執行任務;
4. 若是線程池也滿了,則按照拒絕策略對任務進行處理;
2.1. 添加新的worker
一個worker,便是一個工做線程。
/** * Checks if a new worker can be added with respect to current * pool state and the given bound (either core or maximum). If so, * the worker count is adjusted accordingly, and, if possible, a * new worker is created and started, running firstTask as its * first task. This method returns false if the pool is stopped or * eligible to shut down. It also returns false if the thread * factory fails to create a thread when asked. If the thread * creation fails, either due to the thread factory returning * null, or due to an exception (typically OutOfMemoryError in * Thread.start()), we roll back cleanly. * * @param firstTask the task the new thread should run first (or * null if none). Workers are created with an initial first task * (in method execute()) to bypass queuing when there are fewer * than corePoolSize threads (in which case we always start one), * or when the queue is full (in which case we must bypass queue). * Initially idle threads are usually created via * prestartCoreThread or to replace other dying workers. * * @param core if true use corePoolSize as bound, else * maximumPoolSize. (A boolean indicator is used here rather than a * value to ensure reads of fresh values after checking other pool * state). * @return true if successful */ private boolean addWorker(Runnable firstTask, boolean core) { // 爲確保線程安全,進行CAS反覆重試 retry: for (;;) { int c = ctl.get(); // 獲取runState , c 的高位存儲 // c & ~CAPACITY; int rs = runStateOf(c); // Check if queue empty only if necessary. // 已經shutdown, firstTask 爲空的添加並不會成功 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); // 若是超出最大容許建立的線程數,則直接失敗 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // CAS 更新worker+1數,成功則說明佔位成功退出retry,後續的添加操做將是安全的,失敗則說明已有其餘線程變動該值 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl // runState 變動,則退出到 retry 從新循環 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 如下爲添加 worker 過程 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 使用 Worker 封閉 firstTask 任務,後續運行將由 Worker 接管 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; // 添加 worker 的過程,須要保證線程安全 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); // SHUTDOWN 狀況下仍是會建立 Worker, 可是後續檢測將會失敗 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 既然是新添加的線程,就不該該是 alive 狀態 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // workers 只是一個工做線程的容器,使用 HashSet 承載 // private final HashSet<Worker> workers = new HashSet<Worker>(); workers.add(w); int s = workers.size(); // 維護一個全局達到過的最大線程數計數器 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // worker 添加成功後,進行將worker啓起來,裏面應該是有一個 死循環,一直在獲取任務 // 否則怎麼運行添加到隊列裏的任務呢? if (workerAdded) { t.start(); workerStarted = true; } } } finally { // 若是任務啓動失敗,則必須進行清理,返回失敗 if (! workerStarted) addWorkerFailed(w); } return workerStarted; } // 大概添加 worker 的框架明白了,重點對象是 Worker, 咱們稍後再講 // 如今先來看看,添加失敗的狀況,如何進行 /** * Rolls back the worker thread creation. * - removes worker from workers, if present * - decrements worker count * - rechecks for termination, in case the existence of this * worker was holding up termination */ private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); // ctl 中的 workerCount - 1 , CAS 實現 decrementWorkerCount(); // 嘗試處理空閒線程 tryTerminate(); } finally { mainLock.unlock(); } } /** * Decrements the workerCount field of ctl. This is called only on * abrupt termination of a thread (see processWorkerExit). Other * decrements are performed within getTask. */ private void decrementWorkerCount() { do {} while (! compareAndDecrementWorkerCount(ctl.get())); } // 中止可能啓動的 worker /** * Transitions to TERMINATED state if either (SHUTDOWN and pool * and queue empty) or (STOP and pool empty). If otherwise * eligible to terminate but workerCount is nonzero, interrupts an * idle worker to ensure that shutdown signals propagate. This * method must be called following any action that might make * termination possible -- reducing worker count or removing tasks * from the queue during shutdown. The method is non-private to * allow access from ScheduledThreadPoolExecutor. */ final void tryTerminate() { for (;;) { int c = ctl.get(); // 線程池正在運行、正在清理、已關閉但隊列還未處理完,都不會進行 terminate 操做 if (isRunning(c) || // c >= TIDYING runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { // Eligible to terminate // 中止線程的兩個方式之一,只中斷一個 worker interruptIdleWorkers(ONLY_ONE); return; } // 如下爲整個線程池的後置操做 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 設置正在清理標識 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 線程池已終止的鉤子方法,默認實現爲空 terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); // 此處 termination 爲喚醒等待關閉的線程 termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } } /** * Interrupts threads that might be waiting for tasks (as * indicated by not being locked) so they can check for * termination or configuration changes. Ignores * SecurityExceptions (in which case some threads may remain * uninterrupted). * * @param onlyOne If true, interrupt at most one worker. This is * called only from tryTerminate when termination is otherwise * enabled but there are still other workers. In this case, at * most one waiting worker is interrupted to propagate shutdown * signals in case all threads are currently waiting. * Interrupting any arbitrary thread ensures that newly arriving * workers since shutdown began will also eventually exit. * To guarantee eventual termination, it suffices to always * interrupt only one idle worker, but shutdown() interrupts all * idle workers so that redundant workers exit promptly, not * waiting for a straggler task to finish. */ private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 迭代全部 worker for (Worker w : workers) { Thread t = w.thread; // 獲取到 worker 的鎖以後,再進行 interrupt if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } // 只中斷一個 worker, 當即返回, 不保證 interrupt 成功 if (onlyOne) break; } } finally { mainLock.unlock(); } }
2.2. 當添加隊列成功後,發現線程池狀態變動,須要進行移除隊列操做
/** * Removes this task from the executor's internal queue if it is * present, thus causing it not to be run if it has not already * started. * * <p>This method may be useful as one part of a cancellation * scheme. It may fail to remove tasks that have been converted * into other forms before being placed on the internal queue. For * example, a task entered using {@code submit} might be * converted into a form that maintains {@code Future} status. * However, in such cases, method {@link #purge} may be used to * remove those Futures that have been cancelled. * * @param task the task to remove * @return {@code true} if the task was removed */ public boolean remove(Runnable task) { // 此移除不必定能成功 boolean removed = workQueue.remove(task); // 上面已經看過,它會嘗試中止一個 worker 線程 tryTerminate(); // In case SHUTDOWN and now empty return removed; }
/** * Invokes the rejected execution handler for the given command. * Package-protected for use by ScheduledThreadPoolExecutor. */ final void reject(Runnable command) { // 拒絕策略是在構造方法時傳入的,默認爲 RejectedExecutionHandler // 即用戶只需實現 rejectedExecution 方法,便可以自定義拒絕策略了 handler.rejectedExecution(command, this); }
從上面的實現中,咱們能夠看到,主要是對 Worker 的添加和 workQueue 的添加,因此具體的工做是由誰完成呢?天然就是 Worker 了。
// Worker 的構造方法,主要是接受一個 task, 能夠爲 null, 若是非null, 將在不久的未來被執行 // private final class Worker extends AbstractQueuedSynchronizer implements Runnable /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 將 Worker 自身看成一個 任務,綁定到 worker.thread 中 // thread 啓動時,worker 就啓動了 this.thread = getThreadFactory().newThread(this); } // Worker 的主要工做實現,經過一個循環掃描實現 /** Delegates main run loop to outer runWorker */ public void run() { // 調用 ThreadPoolExecutor 外部實現的 runWorker 方法 runWorker(this); } /** * Main worker run loop. Repeatedly gets tasks from queue and * executes them, while coping with a number of issues: * * 1. We may start out with an initial task, in which case we * don't need to get the first one. Otherwise, as long as pool is * running, we get tasks from getTask. If it returns null then the * worker exits due to changed pool state or configuration * parameters. Other exits result from exception throws in * external code, in which case completedAbruptly holds, which * usually leads processWorkerExit to replace this thread. * * 2. Before running any task, the lock is acquired to prevent * other pool interrupts while the task is executing, and then we * ensure that unless pool is stopping, this thread does not have * its interrupt set. * * 3. Each task run is preceded by a call to beforeExecute, which * might throw an exception, in which case we cause thread to die * (breaking loop with completedAbruptly true) without processing * the task. * * 4. Assuming beforeExecute completes normally, we run the task, * gathering any of its thrown exceptions to send to afterExecute. * We separately handle RuntimeException, Error (both of which the * specs guarantee that we trap) and arbitrary Throwables. * Because we cannot rethrow Throwables within Runnable.run, we * wrap them within Errors on the way out (to the thread's * UncaughtExceptionHandler). Any thrown exception also * conservatively causes thread to die. * * 5. After task.run completes, we call afterExecute, which may * also throw an exception, which will also cause thread to * die. According to JLS Sec 14.20, this exception is the one that * will be in effect even if task.run throws. * * The net effect of the exception mechanics is that afterExecute * and the thread's UncaughtExceptionHandler have as accurate * information as we can provide about any problems encountered by * user code. * * @param w the worker */ final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 不停地從 workQueue 中獲取任務,而後執行,就是這麼個邏輯 // getTask() 會阻塞式獲取,因此 Worker 每每不會當即退出 while (task != null || (task = getTask()) != null) { // 執行過程當中是不容許併發的,即同時只能一個 task 在運行,此時也不容許進行 interrupt w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt // 檢測是否已被線程池是否中止 或者當前 worker 被中斷 // STOP = 1 << COUNT_BITS; if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) // 中斷信息傳遞 wt.interrupt(); try { // 任務開始前 切點,默認爲空執行 beforeExecute(wt, task); Throwable thrown = null; try { // 直接調用任務的run方法, 具體的返回結果,會被 FutureTask 封裝到 某個變量中 // 能夠參考之前的文章 (FutureTask是怎樣獲取到異步執行結果的? https://www.cnblogs.com/yougewe/p/11666284.html) task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 任務開始後 切點,默認爲空執行 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } // 正常退出,有必要的話,可能從新將 Worker 添加進來 completedAbruptly = false; } finally { // 處理退出後下一步操做,可能從新添加 Worker processWorkerExit(w, completedAbruptly); } } /** * Performs cleanup and bookkeeping for a dying worker. Called * only from worker threads. Unless completedAbruptly is set, * assumes that workerCount has already been adjusted to account * for exit. This method removes thread from worker set, and * possibly terminates the pool or replaces the worker if either * it exited due to user task exception or if fewer than * corePoolSize workers are running or queue is non-empty but * there are no workers. * * @param w the worker * @param completedAbruptly if the worker died due to user exception */ private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { // 在 Worker 正常退出的狀況下,檢查是否超時致使,維持最小線程數 if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; // 若是知足最小線程要求,則直接返回 if (workerCountOf(c) >= min) return; // replacement not needed } // 不然再添加一個Worker到線程池中備用 // 非正常退出,會直接再添加一個Worker addWorker(null, false); } } /** * Performs blocking or timed wait for a task, depending on * current configuration settings, or returns null if this worker * must exit because of any of: * 1. There are more than maximumPoolSize workers (due to * a call to setMaximumPoolSize). * 2. The pool is stopped. * 3. The pool is shutdown and the queue is empty. * 4. This worker timed out waiting for a task, and timed-out * workers are subject to termination (that is, * {@code allowCoreThreadTimeOut || workerCount > corePoolSize}) * both before and after the timed wait, and if the queue is * non-empty, this worker is not the last thread in the pool. * * @return task, or null if the worker must exit, in which case * workerCount is decremented */ private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // 若是進行了 shutdown, 且隊列爲空, 則須要將 worker 退出 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // do {} while (! compareAndDecrementWorkerCount(ctl.get())); decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 線程數據大於最大容許線程,須要刪除多餘的 Worker if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 若是開戶了超時刪除功能,則使用 poll, 不然使用 take() 進行阻塞獲取 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); // 獲取到任務,則能夠進行執行了 if (r != null) return r; // 若是有超時設置,則會在下一循環時退出 timedOut = true; } // 忽略中斷異常 // 在這種狀況下,Worker如何響應外部的中斷請求呢??? 思考 catch (InterruptedException retry) { timedOut = false; } } }
因此,Worker的做用就體現出來了,一個循環取任務執行任務過程:
1. 有一個主循環一直進行任務的獲取;
2. 針對有超時的設置,會使用poll進行獲取任務,若是超時,則 Worker 將會退出循環結束線程;
3. 無超時的設置,則會使用 take 進行阻塞式獲取,直到有值;
4. 獲取任務執行前置+業務+後置任務;
5. 當獲取到null的任務以後,當前Worker將會結束;
6. 當前Worker結束後,將會判斷是否有必要維護最低Worker數,從而決定是否再添加Worker進來。
仍是借用一個網上同窗比較通用的一個圖來表述下 Worker/ThreadPoolExecutor 的工做流程吧(已經很完美,不須要再造這輪子了)
ThreadPoolExecutor 是經過 ctl 這個變量進行全局狀態維護的,shutdown 在線程池中也是表現爲一個狀態,因此應該是比較簡單的。
/** * Initiates an orderly shutdown in which previously submitted * tasks are executed, but no new tasks will be accepted. * Invocation has no additional effect if already shut down. * * <p>This method does not wait for previously submitted tasks to * complete execution. Use {@link #awaitTermination awaitTermination} * to do that. * * @throws SecurityException {@inheritDoc} */ public void shutdown() { // 爲保證線程安全,使用 mainLock final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // SecurityManager 檢查 checkShutdownAccess(); // 設置狀態爲 SHUTDOWN advanceRunState(SHUTDOWN); // 中斷空閒的 Worker, 即至關於依次關閉每一個空閒線程 interruptIdleWorkers(); // 關閉鉤子,默認實現爲空操做,爲方便子類實現自定義清理功能 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 再 tryTerminate(); } /** * Transitions runState to given target, or leaves it alone if * already at least the given target. * * @param targetState the desired state, either SHUTDOWN or STOP * (but not TIDYING or TERMINATED -- use tryTerminate for that) */ private void advanceRunState(int targetState) { for (;;) { int c = ctl.get(); // 自身CAS更新成功或者被其餘線程更新成功 if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } } // 關閉空閒線程(非 running 狀態) /** * Common form of interruptIdleWorkers, to avoid having to * remember what the boolean argument means. */ private void interruptIdleWorkers() { // 上文已介紹, 此處 ONLY_ONE 爲 false, 便是最大可能地中斷全部 Worker interruptIdleWorkers(false); } 與 shutdown 對應的,有一個 shutdownNow, 其語義是 當即中止全部任務。 /** * Attempts to stop all actively executing tasks, halts the * processing of waiting tasks, and returns a list of the tasks * that were awaiting execution. These tasks are drained (removed) * from the task queue upon return from this method. * * <p>This method does not wait for actively executing tasks to * terminate. Use {@link #awaitTermination awaitTermination} to * do that. * * <p>There are no guarantees beyond best-effort attempts to stop * processing actively executing tasks. This implementation * cancels tasks via {@link Thread#interrupt}, so any task that * fails to respond to interrupts may never terminate. * * @throws SecurityException {@inheritDoc} */ public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 與 shutdown 的差異,設置的狀態不同 advanceRunState(STOP); // 強行中斷線程 interruptWorkers(); // 將未完成的任務返回 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; } /** * Interrupts all threads, even if active. Ignores SecurityExceptions * (in which case some threads may remain uninterrupted). */ private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) // 調用 worker 的提供的中斷方法 w.interruptIfStarted(); } finally { mainLock.unlock(); } } // ThreadPoolExecutor.Worker#interruptIfStarted void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { // 直接調用任務的 interrupt t.interrupt(); } catch (SecurityException ignore) { } } }
invokeAll, 望文生義,便是調用全部給定的任務。想來應該是一個個地添加任務到線程池隊列吧。
// invokeAll 的方法直接在抽象方便中就實現了,它的語義是同時執行n個任務,並同步等待結果返回 // java.util.concurrent.AbstractExecutorService#invokeAll(java.util.Collection<? extends java.util.concurrent.Callable<T>>) public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { if (tasks == null) throw new NullPointerException(); ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); boolean done = false; try { for (Callable<T> t : tasks) { RunnableFuture<T> f = newTaskFor(t); futures.add(f); // 依次調用各子類的實現,添加任務 execute(f); } for (int i = 0, size = futures.size(); i < size; i++) { Future<T> f = futures.get(i); if (!f.isDone()) { try { // 依次等待執行結果 f.get(); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } } } done = true; return futures; } finally { if (!done) for (int i = 0, size = futures.size(); i < size; i++) futures.get(i).cancel(true); } }
實現很簡單,都是些外圍調用。
經過上面的過程,能夠看到,整個ThreadPoolExecutor 非狀態的依賴是很是強的。因此一個好的狀態值的設計就顯得很重要了,runState 表明線程池或者 Worker 的運行狀態。以下:
// runState is stored in the high-order bits // 整個狀態使值使用 ctl 的高三位值進行控制, COUNT_BITS=29 // 1110 0000 0000 0000 private static final int RUNNING = -1 << COUNT_BITS; // 0000 0000 0000 0000 private static final int SHUTDOWN = 0 << COUNT_BITS; // 0010 0000 0000 0000 private static final int STOP = 1 << COUNT_BITS; // 0100 0000 0000 0000 private static final int TIDYING = 2 << COUNT_BITS; // 0110 0000 0000 0000 private static final int TERMINATED = 3 << COUNT_BITS; // 整個狀態值的大小順序主: RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED // 而低 29位,則用來保存 worker 的數量,當worker增長時,只要將整個 ctl 增長便可。 // 0001 1111 1111 1111, 便是最大的 worker 數量 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 整個 ctl 描述爲一個 AtomicInteger, 功能以下: /** * The main pool control state, ctl, is an atomic integer packing * two conceptual fields * workerCount, indicating the effective number of threads * runState, indicating whether running, shutting down etc * * In order to pack them into one int, we limit workerCount to * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2 * billion) otherwise representable. If this is ever an issue in * the future, the variable can be changed to be an AtomicLong, * and the shift/mask constants below adjusted. But until the need * arises, this code is a bit faster and simpler using an int. * * The workerCount is the number of workers that have been * permitted to start and not permitted to stop. The value may be * transiently different from the actual number of live threads, * for example when a ThreadFactory fails to create a thread when * asked, and when exiting threads are still performing * bookkeeping before terminating. The user-visible pool size is * reported as the current size of the workers set. * * The runState provides the main lifecycle control, taking on values: * * RUNNING: Accept new tasks and process queued tasks * SHUTDOWN: Don't accept new tasks, but process queued tasks * STOP: Don't accept new tasks, don't process queued tasks, * and interrupt in-progress tasks * TIDYING: All tasks have terminated, workerCount is zero, * the thread transitioning to state TIDYING * will run the terminated() hook method * TERMINATED: terminated() has completed * * The numerical order among these values matters, to allow * ordered comparisons. The runState monotonically increases over * time, but need not hit each state. The transitions are: * * RUNNING -> SHUTDOWN * On invocation of shutdown(), perhaps implicitly in finalize() * (RUNNING or SHUTDOWN) -> STOP * On invocation of shutdownNow() * SHUTDOWN -> TIDYING * When both queue and pool are empty * STOP -> TIDYING * When pool is empty * TIDYING -> TERMINATED * When the terminated() hook method has completed * * Threads waiting in awaitTermination() will return when the * state reaches TERMINATED. * * Detecting the transition from SHUTDOWN to TIDYING is less * straightforward than you'd like because the queue may become * empty after non-empty and vice versa during SHUTDOWN state, but * we can only terminate if, after seeing that it is empty, we see * that workerCount is 0 (which sometimes entails a recheck -- see * below). */ private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
從上面的 shutdown, 能夠看到,只是寫了 SHUTDOWN 標識後,嘗試儘量地中斷中止Worker線程,但並不保證中斷成功。要想保證中止完成,須要有另外的機制來保證。從 awaitTermination 的語義來講,它是能保證任務中止完成的,那麼它是如何保證的呢?
// ThreadPoolExecutor.awaitTermination() public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { // 只是循環 ctl 狀態, 只要 狀態爲 TERMINATED 狀態,則說明已經關閉成功 // 此處 termination 的狀態觸發是在 tryTerminate 中觸發的 if (runStateAtLeast(ctl.get(), TERMINATED)) return true; if (nanos <= 0) return false; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } }
看起來, awaitTermination 並無什麼特殊操做,而是一直在等待。因此 TERMINATED 是 Worker 自行發生的動做。
那是在哪裏作的操做呢?實際上是在獲取任務的時候,會檢測當前狀態是不是 SHUTDOWN, 若是是SHUTDOWN且 隊列爲空,則會觸發獲取任務的返回null.從而結束當前 Worker.
Worker 在結束前會調用 processWorkerExit() 方法,裏面會再次調用 tryTerminate(), 當全部 Worker 都運行到這個點後, awaitTermination() 就會收到通知了。(注意: processWorkerExit() 會在每次運行後進行 addWorker() 嘗試,可是在 SHUTDOWN 狀態的添加操做老是失敗的,因此不用考慮)
到此,你是否能夠解答前面的幾個問題了呢?