池化技術應該是最經常使用的提升程序性能的手段,包括線程池與數據庫鏈接池,常量池等等java
建立與銷燬線程是比較耗費時間的,不利於處理Java程序的高併發,所以引入線程池,也就是維護一組可用的線程,若是有任務,就當即將線程池的空閒線程分配給任務,提高性能,若是線程池內全部的線程都是忙狀態的話,能夠將任務放到任務隊列,或者建立一個新的線程並放入線程池,用於處理新的任務git
使用線程池的好處程序員
下降資源消耗。經過重複利用已建立的線程下降線程建立和銷燬形成的消耗。github
在《阿里巴巴 Java 開發手冊》「併發處理」這一章節,明確指出線程資源必須經過線程池提供,不容許在應用中自行顯示建立線程。spring
爲何呢?docker
使用線程池的好處是減小在建立和銷燬線程上所消耗的時間以及系統資源開銷,解決資源不足的問題。若是不使用線程池,有可能會形成系統建立大量同類線程而致使消耗完內存或者「過分切換」的問題。數據庫
提升響應速度。當任務到達時,任務能夠不須要等待線程建立就能當即執行。編程
提升線程的可管理性。線程是稀缺資源,若是無限制的建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行統一的分配,調優和監控。(最原始粗放的服務器實現就是請求綁定一個套接字後就新開一個線程去處理,若是請求量巨大的時候,服務器是確定要崩的,由於缺少對線程資源的管理)數組
線程池監控的方法:緩存
SpringBoot 中的 Actuator
組件
經過ThreadPoolExecutor
的自有接口獲取線程池信息
線程池通常用於執行多個不相關聯的耗時任務,沒有多線程的狀況下,任務順序執行,使用了線程池的話可以讓多個不相關聯的任務同時執行。
舉個項目中實際使用的例子:
實際使用時要注意的通常規則
使用線程池,而不是建立單個線程
使用ThreadPoolExecutor
構造函數而不是Executors
工具類,下文有具體的解釋
顯式的定義線程池名字,以業務名字做區分,便於定位問題
可使用自定義的ThreadFactory
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/** * 線程工廠,它設置線程名稱,有利於咱們定位問題。 */
public final class NamingThreadFactory implements ThreadFactory {
private final AtomicInteger threadNum = new AtomicInteger();
private final ThreadFactory delegate;
private final String name;
/** * 建立一個帶名字的線程池生產工廠 */
public NamingThreadFactory(ThreadFactory delegate, String name) {
this.delegate = delegate;
this.name = name; // TODO consider uniquifying this
}
@Override
public Thread newThread(Runnable r) {
Thread t = delegate.newThread(r);
t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
return t;
}
}
複製代碼
使用guava的ThreadFactoryBuilder
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(threadNamePrefix + "-%d")
.setDaemon(true).build();
ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory)
複製代碼
不一樣的業務使用不一樣的線程池
有依賴關係的任務在使用同一個線程池在稍高的併發情況下可能會出現一種邏輯上的死鎖,大概來講就是父任務A中調用了子任務B,父任務與子任務共用一個線程池,當父任務佔據了所有的核心線程資源,而且子任務仍未執行時,沒法退出對核心線程的佔用,而與此同時子任務只能堆積在任務隊列中,沒法得到線程資源,若是又使用了無界隊列的話,則會一直堆積直到OOM,具體的參考線程池運用不當的一次線上事故
Executor 框架是 Java5 以後引進的,在 Java 5 以後,經過 Executor 來啓動線程比使用Thread
的 start
方法更好,除了更易管理,效率更好(用線程池實現,節約開銷)外,還有關鍵的一點:有助於避免this
逃逸問題。
補充:this 逃逸是指在構造函數返回以前其餘線程就持有該對象的引用. 調用還沒有構造徹底的對象的方法可能引起使人疑惑的錯誤,若是用
volatile
修飾的話應該就能解決這個問題了,不知道Executor框架的出現是如何有助於解決此問題的呢?---不是很清楚
Executor 框架不只包括了線程池的管理,還提供了線程工廠、隊列以及拒絕策略等,Executor 框架讓併發編程變得更加簡單
實際上在Executor框架中,還有一個線程池ForkJoinPool
可能用的不太多,此類繼承AbstractExecutorService
,文章末尾會介紹到
除了說Executor框架,還有一種說法就是JUC框架
,也就是java.util.concurrent
這個包下的全部的多線程相關類的總稱
向線程池提交任務有兩種方法:
execute
方法
Runnable
的任務,不提供返回值,源碼分析見下文(做爲線程池的入口必定是要仔細分析的)submit
方法
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
複製代碼
ThreadPoolExecutor
沒有實現本身的submit方法,而是沿用的父類AbstractExecutorService
的實現
接受Runnable
或Callable
的任務,並提供Future
類型返回值
submit
內部將傳入的任務統一封裝爲RunnableFuture
類型,此類型實現了Runnable
與Future
接口,老縫合怪了~
不一樣之處就在於傳入Runnable
的任務獲得的Future
可能沒法獲得有效的返回值,而Callable
的任務可以獲得返回結果
提交Runnable
任務時也能夠指定一個返回結果,做爲Future
的返回結果,可是這個結果顯然並非任務執行完成的返回值,而是程序員事先傳入的值,其做用相似因而一個flag值
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
複製代碼
Runnable
任務會被轉換爲Callable
類型,若是有傳入預期的返回值,call
函數中就會原封不動的返回,可是若是沒有傳入,就是返回null了submit
內部實際上仍然調用了execute
方法
此處補充Callable
與Runnable
的差別:
補充Future接口的做用
isDone
判斷任務是否執行完get方法得到
執行結果
shutdown方法
關閉線程池,線程池的狀態變爲 SHUTDOWN
。線程池再也不接受新任務了,可是隊列裏的任務得執行完畢。
執行shutdown
方法後,能夠執行awaitTermination
方法,則會等待指定的時間讓線程池關閉,若在指定時間內關閉則返回true,不然false
shutdown源碼分析
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
// 上鎖
mainLock.lock();
try {
// 判斷調用者是否有權限shutdown線程池
checkShutdownAccess();
// CAS 設置線程池狀態爲SHUTDOWN
advanceRunState(SHUTDOWN);
// 中斷全部空閒線程
interruptIdleWorkers();
// 鉤子函數
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
// 解鎖
mainLock.unlock();
}
// 嘗試終止線程池
tryTerminate();
}
複製代碼
shutdownNow
方法 閉線程池,線程的狀態變爲 STOP
。線程池會終止當前正在運行的任務,並中止處理排隊的任務並返回正在等待執行的任務列表。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
// 上鎖
mainLock.lock();
try {
// 判斷調用者是否有權限shutdown線程池
checkShutdownAccess();
// CAS 設置線程池狀態爲STOP
advanceRunState(STOP);
// 中斷全部線程
interruptWorkers();
// 從隊列中獲取剩餘的未執行的工做列表
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 嘗試終止線程池
tryTerminate();
// 返回未執行的任務列表
return tasks;
}
複製代碼
interruptWorkers
的解析放到了後文中使用以下兩個方法來判斷線程池是否徹底關閉
isTerminated
() 當調用 shutdown()
方法後,而且全部提交的任務完成後返回爲 true,或者是執行shutdownNow
後,線程池內的線程所有被中斷,工做線程數量爲0後返回trueisShutdown()
當調用 shutdown()
方法後返回爲 true。Runnable
或者 Callable
接口的任務對象。Runnable
/Callable
接口的 對象直接交給 execute
執行: ExecutorService.execute(Runnable command)
)或者也能夠把 Runnable
對象或Callable
對象提交給 submit
執行(ExecutorService.submit(Runnable task)
或 ExecutorService.submit(Callable <T> task)
)。ExecutorService.submit(…)
,ExecutorService
將返回一個實現Future
接口的對象(剛剛也提到過了執行 execute()
方法和 submit()
方法的區別,submit()
會返回一個 FutureTask
對象FutureTask.get()
方法來等待任務執行完成。主線程也能夠執行 FutureTask.cancel(boolean mayInterruptIfRunning)
來取消此任務的執行。Executors
工具類,能夠建立普通的線程池與能夠執行定時任務的線程池,可是簡單的建立方法意味着封裝的程度高,就會致使自由度低,甚至有一些風險固定線程數量的線程池
該線程池中的線程數量始終不變。當有一個新的任務提交時,線程池中如有空閒線程,則當即執行。若沒有,則新的任務會被暫存在一個任務隊列中,待有線程空閒時,便處理在任務隊列中的任務。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
// 默認任務隊列的長度是Integer.MAX_VALUE
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
複製代碼
maximumPoolSize
將是事實上的無效參數,由於不可能存在任務隊列滿的狀況(能夠將任務隊列視做系統內最大,因此不用設置最大線程數,由於再多的任務也徹底能夠緩存在隊列中)。因此,經過建立 FixedThreadPool
的源碼能夠看出建立的 FixedThreadPool
的 corePoolSize
和 maximumPoolSize
被設置爲同一個值。
keepAliveTime
將是一個無效參數(由於不會有核心線程以外的其他線程)(固然,若是空閒核心線程被容許超時回收的話,就是有用的了,便是,若是空閒就會當即展開回收)corePoolSize
後,新任務將在無界隊列中等待,所以線程池中的線程數不會超過 corePoolSize;因此一旦corePoolSize設置不對的話,將會有大量任務乾等着,而且性能也沒有徹底發揮僅有一個線程的線程池
能夠視爲是固定線程數量線程池的特值狀況,即nThreads爲1的狀況
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
// 使用包裝類包裝過的,用來保證:
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
複製代碼
動態分配線程數量的線程池
該方法返回一個可根據實際狀況調整線程數量的線程池。線程池的線程數量不肯定,但如有空閒線程能夠複用,則會優先使用可複用的線程。若全部線程均在工做,又有新的任務提交,則會建立新的線程處理任務。全部線程在當前任務執行完畢後,將返回線程池進行復用。
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
複製代碼
SynchronousQueue.offer(Runnable task)
提交任務到任務隊列。若是當前線程池中有閒線程正在執行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)
,那麼主線程執行 offer 操做與空閒線程執行的 poll
操做配對成功,主線程把任務交給空閒線程執行,execute()
方法執行完成。SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)
。這種狀況下,offer方法將返回false,此時 CachedThreadPool
會建立新線程執行任務,execute 方法執行完成;Integer.MAX_VALUE
,這意味着線程池能夠不受控的一直接受任務,直到棧空間OOMpublic static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
複製代碼
雖然隊列使用的是有界隊列,可是最大線程數量是Integer.MAX_VALUE
,這意味着線程池能夠不受控的一直接受任務,直到棧空間OOM
須要注意的是,儘管ScheduledExecutorService是內部調用了父類ThreadPoolExecutord的構造方法,可是其內部實現的核心入口方法再也不是ThreadPoolExecutor的execute方法,而是ScheduledThreadPoolExecutor中的delayExecute方法
定時任務的實現依賴於延遲隊列DelayedWorkQueue
能夠發現執行定時任務可使用springboot中的@Scheduled註解,也可使用底層的定時任務線程池,實際上本線程池基本不會用,由於實現定時任務有其餘的方案,好比springboot的註解與quartz等等
備註: Quartz 是一個由 java 編寫的任務調度庫,由 OpenSymphony 組織開源出來。在實際項目開發中使用 Quartz 的仍是居多,比較推薦使用 Quartz。由於 Quartz 理論上可以同時對上萬個任務進行調度,擁有豐富的功能特性,包括任務調度、任務持久化、可集羣化、插件等等
一次性的延遲任務 schedule
方法
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
複製代碼
ScheduledThreadPoolExecutor
重寫了execute
與submit
方法,兩個方法內部實際上都是簡單地調用schedule
方法來實現的以上一次任務開始爲基準固定間隔循環執行任務 scheduleAtFixedRate
方法
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
複製代碼
以上一次任務結束爲基準固定間隔循環執行任務 scheduleWithFixedDelay
方法
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
複製代碼
後二者的區別見圖
引出兩個問題:
上邊說過了,定時任務線程池的核心入口就是上邊三種類型的任務方法中都有的一個方法--就是delayedExecute
,可是在說這個關鍵的入口方法以前,不得說下,調用方法前對於提交的任務的包裝,包裝這一塊設計到的類比較多,先用一張類圖大體把握
首先包裝爲ScheduledFutureTask
// 用於包裝schedule(Runnable)提交的任務
// result爲null,ns是納秒爲單位的,要觸發執行任務的系統時間
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
// 包裝scheduleWithFixedDelay和scheduleAtFixedRate提交的任務
// result 爲null
// ns是納秒爲單位的,下一次要觸發執行任務的系統時間
// period是以納秒爲單位的任務循環週期
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
// 包裝schedule(Callable)提交的任務
// ns是納秒爲單位的,要觸發執行任務的系統時間
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
// 關鍵的run方法
public void run() {
// 首先判斷是否是週期性執行的任務
boolean periodic = isPeriodic();
// 判斷當前的線程池可否執行定時任務,若是不能則取消任務
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
// 若是不是週期性任務,也就是一次性的定時任務的話,直接執行提交的任務
ScheduledFutureTask.super.run();
// 若是是週期性執行的任務,首先執行提交的任務,並將任務的狀態重置爲初始化狀態,以備下一次執行
else if (ScheduledFutureTask.super.runAndReset()) {
// 執行完畢後計算下一次執行的時間
setNextRunTime();
// 從新提交當前的任務到延時隊列中,用於下一個週期的執行
reExecutePeriodic(outerTask);
}
}
// 計算下一次要執行任務的時間
// time表示下一次執行任務的時間,period是用來計算time的週期時間
private void setNextRunTime() {
long p = period;
if (p > 0)
// scheduleAtFixedRate
// 在第一次執行完任務後,下一次要執行的時間就是徹底按照週期來執行,無論到底何時執行完的(也就是now),以後的每次執行都是如此
time += p;
else
// scheduleWithFixedDelay
// 第一次執行完任務後,下一次要執行的時間是以當前時間爲基準計算的,也就是上一次完成任務的時間爲基準計算的,以後的每次執行都是如此
time = triggerTime(-p);
}
// 用於在延遲隊列中按照下一次觸發的順序進行排序
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
// 觸發時間一致的,按照提交的順序來
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
// 計算從當前時刻到下次執行任務還有多長時間
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
複製代碼
scheduleWithFixedDelay
與scheduleAtFixedRate
在實現時的區別就在於這次包裝過程當中,前者傳入的週期是unit.toNanos(-delay)
然後者是unit.toNanos(perioid)
setNextRunTime
方法中,詳見方法註釋period
以外還有一個區別就是outerTask
reExecutePeriodic
方法getDelay
方法最主要的應用就是在延時隊列的take
poll
這兩個獲取任務的方法中,起到了控制獲取任務的時間的做用
getDelay
方法來控制獲取任務的時延--這兩個特性是直觀上的延遲任務線程池起做用的關鍵其次包裝爲RunnableScheduleFuture
protected <V> RunnableScheduledFuture<V> decorateTask( Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}
protected <V> RunnableScheduledFuture<V> decorateTask( Callable<V> callable, RunnableScheduledFuture<V> task) {
return task;
}
複製代碼
RunnableScheduledFuture
,可是沒有看懂爲何要用這樣的一個方法類型提高定時任務線程池的入口方法delayedExecute
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 1. 判斷線程池是否是shutdown狀態,若是是執行拒絕策略
if (isShutdown())
reject(task);
else {
// 2. 首先就是向DelayedWorkQueue中添加任務
super.getQueue().add(task);
// 3. 無論是通常的線程池仍是執行定時任務的線程池,都會在向隊列中添加完任務後執行re-check
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 4. 若是經過了recheck,執行此方法
// 確保線程池內有線程運行
ensurePrestart();
}
}
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
// 對於Executors建立的線程池來講,核心線程數量爲0,因此會保證有非核心線程執行
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
複製代碼
SHUTDOWN
的話,直接向隊列中添加任務,而沒有直接讓線程去執行任務的場景從addWorker
開始,後續的就是標準的線程池的線程管理與任務獲取的流程了,也就是說定時任務線程池與通常線程池的主要區別在於任務調度部分,而鏈接任務管理與線程管理的通道--延時隊列也須要大體瞭解下
static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
// 任務調度時提交任務的方法就是add方法
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
// 按照排序規則,選擇合適的位置插入到隊列中
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
// 按照排序規則,選擇合適的位置插入到隊列中
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
// 按照RunnableScheduledFuture的time屬性進行排序
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
// getTask中,核心線程取任務(無超時時間)
// 若是當前不能獲取,就阻塞等待
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
// 調用getDelay方法獲得須要延時等待的時間
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
// getTask中,非核心線程取任務或則核心線程獲取任務(容許超時回收)
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null) {
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
if (nanos <= 0)
return null;
first = null; // don't retain ref while waiting
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
}
複製代碼
DelayedWorkQueue
的內部存儲是RunnableScheduledFuture
類型的數組