前言
以前研究了一下如何使用ScheduledThreadPoolExecutor動態建立定時任務(Springboot定時任務原理及如何動態建立定時任務),簡單瞭解了ScheduledThreadPoolExecutor相關源碼。今天看了同窗寫的ThreadPoolExecutor 的源碼解讀,甚是NB,必須轉發一下。html
讀了一下 ThreadPoolExecutor 的源碼(JDK 11), 簡單的作個筆記.java
Executor 框架
Executor
Executor
接口只有一個方法:markdown
public interface Executor { void execute(Runnable command); }
Executor
接口提供了一種將任務提交和任務執行機制解耦的方法. Executor
的實現並不需要是異步的.app
ExecutorService
ExecutorService
在 Executor
的基礎上, 提供了一些管理終止的方法和能夠生成 Future
來跟蹤一個或多個異步任務的進度的方法:框架
shutdown()
方法會啓動比較柔和的關閉過程, 而且不會阻塞.ExecutorService
將會繼續執行已經提交的任務, 但不會再接受新的任務. 若是ExecutorService
已經被關閉, 則不會有附加的操做.shutdownNow()
方法會嘗試中止正在執行的任務, 再也不執行等待執行的任務, 而且返回等待執行的任務列表, 不會阻塞. 這個方法只能嘗試中止任務, 典型的取消實現是經過中斷來取消任務, 所以不能響應中斷的任務可能永遠不會終止.invokeAll()
方法執行給定集合中的全部任務, 當全部任務完成時返回Future
的列表, 支持中斷. 若是在此操做正在進行時修改了給定的集合,則此方法的結果未定義.invokeAny()
方法會執行給定集合中的任務, 當有一個任務完成時, 返回這個任務的結果, 並取消其餘未完成的任務, 支持中斷. 若是在此操做正在進行時修改了給定的集合,則此方法的結果未定義.
AbstractExecutorService
AbstractExecutorService
提供了一些 ExecutorService
的執行方法的默認實現. 這個方法使用了 newTaskFor()
方法返回的 RunnableFuture
(默認是 FutureTask
) 來實現 submit()
、invokeAll()
、 invokeAny()
方法.less
RunnableFuture
繼承了 Runnable
和 Future
, 在 run()
方法成功執行後, 將會設置完成狀態, 並容許獲取執行的結果:異步
public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); }
FutureTask
FutureTask
實現了 RunnableFuture
接口, 表示一個可取消的計算任務, 只能在任務完成以後獲取結果, 而且在任務完成後, 就再也不能取消或重啓, 除非使用 runAndReset()
方法.ide
FutureTask
有 7 個狀態:oop
- NEW
- COMPLETING
- NORMAL
- EXCEPTIONAL
- CANCELLED
- INTERRUPTING
- INTERRUPTED
可能的狀態轉換:post
- NEW -> COMPLETING -> NORMAL
- NEW -> COMPLETING -> EXCEPTIONAL
- NEW -> CANCELLED
- NEW -> INTERRUPTING -> INTERRUPTED
FutureTask
在更新 state 、 runner、 waiters 時, 都使用了 VarHandle.compareAndSet()
:
// VarHandle mechanics private static final VarHandle STATE; private static final VarHandle RUNNER; private static final VarHandle WAITERS; static { try { MethodHandles.Lookup l = MethodHandles.lookup(); STATE = l.findVarHandle(FutureTask.class, "state", int.class); RUNNER = l.findVarHandle(FutureTask.class, "runner", Thread.class); WAITERS = l.findVarHandle(FutureTask.class, "waiters", WaitNode.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } // Reduce the risk of rare disastrous classloading in first call to // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 Class<?> ensureLoaded = LockSupport.class; } protected void set(V v) { if (STATE.compareAndSet(this, NEW, COMPLETING)) { outcome = v; STATE.setRelease(this, NORMAL); // final state finishCompletion(); } }
來看一下 get()
方法:
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); } private int awaitDone(boolean timed, long nanos) throws InterruptedException { long startTime = 0L; WaitNode q = null; boolean queued = false; for (;;) { int s = state; if (s > COMPLETING) { // 已經在終結狀態, 返回狀態 if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // 已經完成了, 可是狀態仍是 COMPLETING Thread.yield(); else if (Thread.interrupted()) { // 檢查中斷 removeWaiter(q); throw new InterruptedException(); } else if (q == null) { // 沒有建立 WaitNode 節點, 若是 timed 而且 nanos 大於 0, 建立一個 WaitNode if (timed && nanos <= 0L) return s; q = new WaitNode(); } else if (!queued) // 將新的 WaitNode 放到鏈表頭部, 並嘗試 cas 到 waiters queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q); else if (timed) { final long parkNanos; if (startTime == 0L) { // first time startTime = System.nanoTime(); if (startTime == 0L) startTime = 1L; parkNanos = nanos; } else { long elapsed = System.nanoTime() - startTime; if (elapsed >= nanos) { // 超時了 removeWaiter(q); return state; } // park 的時間 parkNanos = nanos - elapsed; } // nanos 比較慢, 再次檢查, 而後阻塞 if (state < COMPLETING) LockSupport.parkNanos(this, parkNanos); } else // 不須要超時的阻塞 LockSupport.park(this); } }
再來看下 run()
方法:
public void run() { if (state != NEW || !RUNNER.compareAndSet(this, null, Thread.currentThread())) // 不在 NEW 狀態, 或者 runner 不爲 null return; try { // callable 是在構造器中指定的或用 Executors.callable(runnable, result) 建立的 Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; // 設置異常狀態和異常結果 setException(ex); } if (ran) // 正常完成, 設置完成狀態和結果 set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } protected void set(V v) { if (STATE.compareAndSet(this, NEW, COMPLETING)) { outcome = v; STATE.setRelease(this, NORMAL); // final state finishCompletion(); } } private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (WAITERS.weakCompareAndSet(this, q, null)) { // cas 移除 waiters, 對鏈表中的每一個 Node 的線程 unpark for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } // 默認實現什麼都沒作 done(); callable = null; // to reduce footprint }
AbstractExecutorService 的執行方法
來看下 AbstractExecutorService
實現的幾個執行方法, 這裏就只放上以 Callable
爲參數的方法:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { try { return doInvokeAny(tasks, false, 0); } catch (TimeoutException cannotHappen) { assert false; return null; } } private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { if (tasks == null) throw new NullPointerException(); int ntasks = tasks.size(); if (ntasks == 0) throw new IllegalArgumentException(); ArrayList<Future<T>> futures = new ArrayList<>(ntasks); ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this); try { ExecutionException ee = null; final long deadline = timed ? System.nanoTime() + nanos : 0L; Iterator<? extends Callable<T>> it = tasks.iterator(); // 提交一個任務到 ecs futures.add(ecs.submit(it.next())); --ntasks; int active = 1; for (;;) { // 嘗試獲取第一個完成的任務的 Future Future<T> f = ecs.poll(); if (f == null) { // 沒有完成的任務 if (ntasks > 0) { // 還有沒提交的任務, 再提交一個到 ecs --ntasks; futures.add(ecs.submit(it.next())); ++active; } else if (active == 0) // 沒有還沒提交的任務和正在執行的任務了 break; else if (timed) { f = ecs.poll(nanos, NANOSECONDS); if (f == null) throw new TimeoutException(); nanos = deadline - System.nanoTime(); } else f = ecs.take(); } if (f != null) { // 存在已經完成的任務 --active; try { // 獲取結果並返回 return f.get(); } catch (ExecutionException eex) { ee = eex; } catch (RuntimeException rex) { ee = new ExecutionException(rex); } } } // 出錯, 拋出 if (ee == null) ee = new ExecutionException(); throw ee; } finally { // 取消全部已經提交的任務 cancelAll(futures); } } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { if (tasks == null) throw new NullPointerException(); ArrayList<Future<T>> futures = new ArrayList<>(tasks.size()); try { for (Callable<T> t : tasks) { // 提交任務 RunnableFuture<T> f = newTaskFor(t); futures.add(f); execute(f); } for (int i = 0, size = futures.size(); i < size; i++) { Future<T> f = futures.get(i); if (!f.isDone()) { // 任務沒有完成, get() 等待任務完成 try { f.get(); } catch (CancellationException | ExecutionException ignore) {} } } return futures; } catch (Throwable t) { cancelAll(futures); throw t; } }
構造器
ThreadPoolExecutor
一共有4個構造器, 這裏就只放上兩個構造器:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
參數說明:
- corePoolSize: 在線程池中保持的線程的數量, 即便這些線程是空閒的, 除非
allowCoreThreadTimeOut
被設置爲true
; - maximumPoolSize: 線程池中最大線程數量;
- keepAliveTime: 多餘空閒線程在終止以前等待新任務的最長時間;
- unit:
keepAliveTime
的時間單位; - workQueue: 任務的等待隊列, 用於存放等待執行的任務. 僅包含
execute()
方法提交的Runnable
; - threadFactory: executor 用來建立線程的工廠, 默認使用
Executors.defaultThreadFactory()
來建立一個新的工廠; - handler: 任務由於達到了線程邊界和隊列容量而被阻止時的處理程序, 默認使用
AbortPolicy
.
狀態
ThreadPoolExecutor
有5個狀態:
- RUNNING: 接受新任務, 而且處理隊列中的任務;
- SHUTDOWN: 不接受新任務, 可是處理隊列中的任務, 此時仍然可能建立新的線程;
- STOP: 不接受新任務, 處理隊列中的任務, 中斷正在運行的任務;
- TIDYING: 全部的任務都終結了, workCount 的值是0, 將狀態轉換爲 TIDYING 的線程會執行
terminated()
方法; - TERMINATED:
terminated()
方法執行完畢.
狀態轉換:
- RUNNING -> SHUTDOWN , On invocation of shutdown()
- (RUNNING or SHUTDOWN) -> STOP , On invocation of shutdownNow()
- SHUTDOWN -> TIDYING , When both queue and pool are empty
- STOP -> TIDYING , When pool is empty
- TIDYING -> TERMINATED , When the terminated() hook method has completed
workCount 和 state 被打包在一個 AtomicInteger
中, 其中的高三位用於表示線程池狀態( state ), 低 29 位用於表示 workCount:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int COUNT_MASK = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~COUNT_MASK; } private static int workerCountOf(int c) { return c & COUNT_MASK; } private static int ctlOf(int rs, int wc) { return rs | wc; }
workCount 表示有效的線程數量, 是容許啓動且不容許中止的 worker 的數量, 與實際的線程數量瞬時不一樣. 用戶可見的線程池大小是 Worker 集合的大小.
Worker 與任務調度
工做線程被封裝在 Worker
中 , 而且存放在一個 HashSet
(workers) 中由 mainLock 保護:
/** * Set containing all worker threads in pool. Accessed only when * holding mainLock. */ private final HashSet<Worker> workers = new HashSet<>(); private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker. */ public void run() { runWorker(this); } ... }
Worker.run()
方法很簡單, 直接調用了 runWorker()
方法, 來看一下這個方法的源碼:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { // task 不爲 null 或 獲取到了須要執行的任務; getTask() 會阻塞, 並在線程須要退出時返回 null w.lock(); // 檢查線程池狀態和線程的中斷狀態, 若是被中斷, 表明線程池正在 STOP if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) // 從新設置中斷狀態 wt.interrupt(); try { // 執行前的鉤子 beforeExecute(wt, task); try { // 執行任務 task.run(); // 執行後的鉤子 afterExecute(task, null); } catch (Throwable ex) { // 執行後的鉤子 afterExecute(task, ex); throw ex; } } finally { // 更新狀態, 準備處理下一個任務 task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 處理 Worker 的退出 processWorkerExit(w, completedAbruptly); } }
getTask()
方法會在如下4種狀況返回 null :
- workCount 大於 maximumPoolSize;
- 線程池已經處於 STOP 狀態;
- 線程池已經處於 SHUTDOWN 狀態, 而且任務隊列爲空;
- 等待任務時超時, 而且超時的 worker 須要被終止.
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) { // 線程池已經處於 SHUTDOWN 狀態, 而且不在須要線程 (線程池已經處於 STOP 狀態 或 workQueue 爲空) decrementWorkerCount(); return null; } int wc = workerCountOf(c); // 是否須要剔除超時的 worker boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { // 須要剔除當前 worker, 嘗試調整 workerCount if (compareAndDecrementWorkerCount(c)) // 成功 返回 null return null; continue; } try { // 阻塞獲取任務 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; // 設置超時標記, 下一次循環中檢查是否須要返回 null timedOut = true; } catch (InterruptedException retry) { // 被中斷, 設置超時標記, 下一次循環中檢查是否須要返回 null timedOut = false; } } }
processWorkerExit()
方法負責垂死 worker 的清理和簿記, 只會被工做線程調用:
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 更新線程池完成的任務數量 completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } // 嘗試轉換線程池狀態到終止 tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { // 不是因爲用戶代碼異常而忽然退出 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) // 不須要在添加新 worker return; } // 嘗試添加新的 worker addWorker(null, false); } }
提交任務
ThreadPoolExecutor
沒有重寫 submit()
方法, 咱們只要看一下 execute()
就夠了:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // 有效線程數量小於 corePoolSize 嘗試調用 addWorker 來增長一個線程(在 addWorker 方法中使用 corePoolSize 來檢查是否須要增長線程), 使用 corePoolSize 做爲, 並把 command 做爲新線程的第一個任務 if (addWorker(command, true)) return; // 調用失敗, 從新獲取狀態 c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { // 線程池仍然在運行, 將 command 加入 workQueue 成功, 再次檢查狀態, 由於此時線程池狀態可能已經改變, 按照新的狀態拒絕 command 或嘗試添加新的線程 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) // 再也不是運行中狀態, 嘗試從隊列移除 command(還會嘗試將線程池狀態轉換爲 TERMINATED), 拒絕command reject(command); else if (workerCountOf(recheck) == 0) // 有效線程數量爲 0 , 建立新的線程, 在 addWorker 方法中使用 maximumPoolSize 來檢查是否須要增長線程 addWorker(null, false); } else if (!addWorker(command, false)) // 將任務放入隊列失敗或線程池不在運行狀態, 而且嘗試添加線程失敗(此時線程池已經 shutdown 或飽和), 拒絕任務 reject(command); }
addWorker()
方法有兩個參數 Runnable firstTask
和 boolean core
. firstTask
是新建的工做線程的第一個任務; core
若是爲 true , 表示用 corePoolSize 做爲邊界條件, 不然表示用 maximumPoolSize. 這裏的 core 用布爾值是爲了確保檢查最新的狀態.
addWorker()
主要作了這麼兩件事情:
- 是否能夠在當前線程池狀態和給定的邊界條件(core or maximum)下建立一個新的工做線程;
- 若是能夠, 調整 worker counter, 若是可能的話, 建立一個新的 worker 並啓動它, 把 firstTask 做爲這個新 worker 的第一個任務;
來看下 addWorker()
方法的源碼:
private boolean addWorker(Runnable firstTask, boolean core) { // 重試標籤 retry: for (int c = ctl.get();;) { // 獲取最新的狀態, 檢查狀態 if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) // 若是線程池狀態已經進入 SHUDOWN, 而且再也不須要工做線程(已經進入 STOP 狀態 或 firstTask 不爲 null 或 workQueue爲空) 返回 false return false; for (;;) { if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) // 有效線程數量大於邊界條件, 返回 false return false; if (compareAndIncrementWorkerCount(c)) // 調整 workerCount, break retry, 退出外部循環 break retry; c = ctl.get(); // Re-read ctl if (runStateAtLeast(c, SHUTDOWN)) // 由於狀態變化致使 CAS 失敗, continue retry, 重試外部循環 continue retry; // 因爲 workerCount 改變致使 CAS 失敗, 重試內嵌循環 } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 新建 Worker w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // threadFactory 成功建立了線程 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); // 從新檢查狀態 if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) { // 線程池在 RUNNING 狀態 或 須要線程(線程池還不在 STOP 狀態 而且 firstTask 爲 null) // 檢查線程是否可啓動 if (t.isAlive()) throw new IllegalThreadStateException(); // 將 worker 添加到 workers workers.add(w); // 更新 largestPoolSize int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; // 更新 worker 添加的標記 workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 啓動線程, 更新啓動標記 t.start(); workerStarted = true; } } } finally { if (! workerStarted) // 失敗回滾 addWorkerFailed(w); } return workerStarted; } private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 從 workers 中移除 worker if (w != null) workers.remove(w); // 調整 workerCount() decrementWorkerCount(); // 嘗試將線程池狀態改變爲 TERMINATED tryTerminate(); } finally { mainLock.unlock(); } }
線程池關閉
來看一下線程池的關閉方法:
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 若是線程池狀態尚未達到SHUTDOWN, 將線程池狀態改成 SHUTDOWN advanceRunState(SHUTDOWN); // 中斷空閒的工做者線程 interruptIdleWorkers(); // 鉤子 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 嘗試轉換狀態到終止 tryTerminate(); } public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 若是線程池狀態尚未達到 STOP, 將線程池狀態改成 STOP advanceRunState(STOP); // 中斷全部 worker interruptWorkers(); // 獲取任務隊列中的任務, 並將這些任務從任務隊列中刪除 tasks = drainQueue(); } finally { mainLock.unlock(); } // 嘗試轉換狀態到終止 tryTerminate(); return tasks; } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 等待線程池終止或超時 while (runStateLessThan(ctl.get(), TERMINATED)) { if (nanos <= 0L) // 剩餘時間小於 0 , 超時 return false; nanos = termination.awaitNanos(nanos); } return true; } finally { mainLock.unlock(); } }
tryTerminate()
方法中, 若是成功將線程池狀態轉換到了 TERMINATED, 將會termination.signalAll()
來喚醒等待線程池終結的線程:
final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateLessThan(c, STOP) && ! workQueue.isEmpty())) // 狀態不須要改變 (處於 RUNNING 狀態 或 已經處於 TIDYING 狀態 或 (還沒到達 STOP 狀態, 而且 workQueue 不爲空)) return; if (workerCountOf(c) != 0) { // Eligible to terminate // 中斷一個空閒的 worker, 以傳播關閉狀態到工做線程 interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // 將狀態成功更新爲 TIDYING try { // 默認實現沒有作任何事情 terminated(); } finally { // 將線程池狀態更新爲 TERMINATED ctl.set(ctlOf(TERMINATED, 0)); // 喚醒等待終結的線程 termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }