Executor 框架是 juc 裏提供的線程池的實現。前兩天看了下 Executor 框架的一些源碼,作個簡單的總結。java
線程池大概的思路是維護一個的線程池用於執行提交的任務。我理解池的技術的主要意義有兩個:數據庫
1. 資源的控制,如併發量限制。像鏈接池這種是對數據庫資源的保護。編程
2. 資源的有效利用,如線程複用,避免頻繁建立線程和線程上下文切換。併發
那麼想象中設計一個線程池就須要有線程池大小、線程生命週期管理、等待隊列等等功能,下面結合代碼看看原理。框架
Excutor 總體結構以下: this
Executor 接口定義了最基本的 execute 方法,用於接收用戶提交任務。 ExecutorService 定義了線程池終止和建立及提交 futureTask 任務支持的方法。spa
AbstractExecutorService 是抽象類,主要實現了 ExecutorService 和 futureTask 相關的一些任務建立和提交的方法。線程
ThreadPoolExecutor 是最核心的一個類,是線程池的內部實現。線程池的功能都在這裏實現了,平時用的最多的基本就是這個了。其源碼很精練,遠沒當時想象的多。設計
ScheduledThreadPoolExecutor 在 ThreadPoolExecutor 的基礎上提供了支持定時調度的功能。線程任務能夠在必定延時時間後才被觸發執行。代理
1.1 ThreadPoolExecutor內部的幾個重要屬性
1.線程池自己的狀態
volatile int runState;
static final int RUNNING = 0;
static final int SHUTDOWN = 1;
static final int STOP = 2;
static final int TERMINATED = 3;
2.等待任務隊列和工做集
private final BlockingQueue<Runnable> workQueue; //等待被執行的Runnable任務
private final HashSet<Worker> workers = new HashSet<Worker>(); //正在被執行的Worker任務集
3.線程池的主要狀態鎖。線程池內部的狀態變化 ( 如線程大小 ) 都須要基於此鎖。
private final ReentrantLock mainLock = new ReentrantLock();
4.線程的存活時間和大小
private volatile long keepAliveTime;// 線程存活時間
private volatile boolean allowCoreThreadTimeOut;// 是否容許核心線程存活
private volatile int corePoolSize;// 核心池大小
private volatile int maximumPoolSize; // 最大池大小
private volatile int poolSize; //當前池大小
private int largestPoolSize; //最大池大小,區別於maximumPoolSize,是用於記錄線程池曾經達到過的最大併發,理論上小於等於maximumPoolSize。
5.線程工廠和拒絕策略
private volatile RejectedExecutionHandler handler;// 拒絕策略,用於當線程池沒法承載新線程是的處理策略。
private volatile ThreadFactory threadFactory;// 線程工廠,用於在線程池須要新建立線程的時候建立線程
6.線程池完成任務數
private long completedTaskCount;//線程池運行到當前完成的任務數總和
1.2 ThreadPoolExecutor 的內部工做原理
有了以上定義好的數據,下面來看看內部是如何實現的 。 Doug Lea 的整個思路總結起來就是 5 句話:
1. 若是當前池大小 poolSize 小於 corePoolSize ,則建立新線程執行任務。
2. 若是當前池大小 poolSize 大於 corePoolSize ,且等待隊列未滿,則進入等待隊列
3. 若是當前池大小 poolSize 大於 corePoolSize 且小於 maximumPoolSize ,且等待隊列已滿,則建立新線程執行任務。
4. 若是當前池大小 poolSize 大於 corePoolSize 且大於 maximumPoolSize ,且等待隊列已滿,則調用拒絕策略來處理該任務。
5. 線程池裏的每一個線程執行完任務後不會馬上退出,而是會去檢查下等待隊列裏是否還有線程任務須要執行,若是在 keepAliveTime 裏等不到新的任務了,那麼線程就會退出。
下面看看代碼實現 :
線程池最重要的方法是由 Executor 接口定義的 execute 方法 , 是任務提交的入口。
咱們看看 ThreadPoolExecutor.execute(Runnable cmd) 的實現:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
解釋以下:
當提交一個新的 Runnable 任務:
分支1 : 若是當前池大小小於 corePoolSize, 執行 addIfUnderCorePoolSize(command) , 若是線程池處於運行狀態且 poolSize < corePoolSize addIfUnderCorePoolSize(command) 會作以下事情,將 Runnable 任務封裝成 Worker 任務 , 建立新的 Thread ,執行 Worker 任務。若是不知足條件,則返回 false 。代碼以下:
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}
分支2 : 若是大於 corePoolSize 或 1 失敗失敗,則:
若是等待隊列未滿,把 Runnable 任務加入到 workQueue 等待隊列
workQueue .offer(command)
如多等待隊列已經滿了,調用 addIfUnderMaximumPoolSize(command) ,和 addIfUnderCorePoolSize 基本相似,只不過判斷條件是 poolSize < maximumPoolSize 。若是大於 maximumPoolSize ,則把 Runnable 任務交由 RejectedExecutionHandler 來處理。
問題:如何實現線程的複用 ?
Doug Lea 的實現思路是 線程池裏的每一個線程執行完任務後不馬上退出,而是去檢查下等待隊列裏是否還有線程任務須要執行,若是在 keepAliveTime 裏等不到新的任務了,那麼線程就會退出。這個功能的實現 關鍵在於 Worker 。線程池在執行 Runnable 任務的時候,並不單純把 Runnable 任務交給建立一個 Thread 。而是會把 Runnable 任務封裝成 Worker 任務。
下面看看 Worker 的實現:
代碼很簡單,能夠看出, worker 裏面包裝了 firstTask 屬性,在構造worker 的時候傳進來的那個 Runnable 任務就是 firstTask 。 同時也實現了Runnable接口,因此是個代理模式,看看代理增長了哪些功能。 關鍵看 woker 的 run 方法:
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}
能夠看出 worker 的 run 方法是一個循環,第一次循環運行的必然是 firstTask ,在運行完 firstTask 以後,並不會馬上結束,而是會調用 getTask 獲取新的任務( getTask 會從等待隊列裏獲取等待中的任務),若是 keepAliveTime 時間內獲得新任務則繼續執行,得不到新任務則那麼線程纔會退出。這樣就保證了多個任務能夠複用一個線程,而不是每次都建立新任務。 keepAliveTime 的邏輯在哪裏實現的呢?主要是利用了 BlockingQueue 的 poll 方法支持等待。可看 getTask 的代碼段:
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
r = workQueue.take();
ThreadFactory 和 RejectedExecutionHandler是ThreadPoolExecutor的兩個屬性,也 能夠認爲是兩個簡單的擴展點 . ThreadFactory 是建立線程的工廠。
默認的線程工廠會建立一個帶有「 pool-poolNumber-thread-threadNumber 」爲名字的線程,若是咱們有特別的須要,如線程組命名、優先級等,能夠定製本身的 ThreadFactory 。
RejectedExecutionHandler 是拒絕的策略。常見有如下幾種:
AbortPolicy :不執行,會拋出 RejectedExecutionException 異常。
CallerRunsPolicy :由調用者(調用線程池的主線程)執行。
DiscardOldestPolicy :拋棄等待隊列中最老的。
DiscardPolicy: 不作任何處理,即拋棄當前任務。
ScheduleThreadPoolExecutor 是對ThreadPoolExecutor的集成。增長了定時觸發線程任務的功能。須要注意
從內部實現看, ScheduleThreadPoolExecutor 使用的是 corePoolSize 線程和一個無界隊列的固定大小的池,因此調整 maximumPoolSize 沒有效果。無界隊列是一個內部自定義的 DelayedWorkQueue 。
ScheduleThreadPoolExecutor 線程池接收定時任務的方法是 schedule ,看看內部實現:
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
以上代碼會初始化一個 RunnableScheduledFuture 類型的任務 t, 並交給 delayedExecute 方法。 delayedExecute(t) 方法實現以下:
private void delayedExecute(Runnable command) {
if (isShutdown()) {
reject(command);
return;
}
if (getPoolSize() < getCorePoolSize())
prestartCoreThread();
super.getQueue().add(command);
}
若是當前線程池大小 poolSize 小於 CorePoolSize ,則建立一個新的線程,注意這裏建立的線程是空的,不會把任務直接交給線程來作,而是把線程任務放到隊列裏。由於任務是要定時觸發的,因此不能直接交給線程去執行。
問題: 那如何作到定時觸發呢?
關鍵在於DelayedWorkQueue,它代理了 DelayQueue 。能夠認爲 DelayQueue 是這樣一個隊列(具體能夠去看下源碼,不詳細分析):
1. 隊列裏的元素按照任務的 delay 時間長短升序排序, delay 時間短的在隊頭, delay 時間長的在隊尾。
2. 從 DelayQueue 裏 FIFO 的獲取一個元素的時候,不會直接返回 head 。可能會阻塞,等到 head 節點到達 delay 時間後才能被獲取。能夠看下 DelayQueue 的 take 方法實現:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
available.await();
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
long tl = available.awaitNanos(delay);//等待delay時間
} else {
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll(); // wake up other takers
return x;
}
}
}
} finally {
lock.unlock();
}
}
經過以上的詳解基本上可以定製出本身須要的策略了,下面簡單介紹下Executors裏面提供的一些常見線程池策略:
1.FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
實際上就是個不支持keepalivetime,且corePoolSize和maximumPoolSize相等的線程池。
2.SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
實際上就是個不支持keepalivetime,且corePoolSize和maximumPoolSize都等1的線程池。
3.CachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
實際上就是個支持keepalivetime時間是60秒(線程空閒存活時間),且corePoolSize爲0,maximumPoolSize無窮大的線程池。
4.SingleThreadScheduledExecutor
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
其實是個corePoolSize爲1的ScheduledExecutor。上文說過ScheduledExecutor採用無界等待隊列,因此maximumPoolSize沒有做用。
5.ScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
其實是corePoolSize課設定的ScheduledExecutor。上文說過ScheduledExecutor採用無界等待隊列,因此maximumPoolSize沒有做用。
以上還不必定知足你的須要,徹底能夠根據本身須要去定製。
<線程相關 先暫時不整理 等慢慢理解 由於這些比較難因此慢點 參考數據 java併發編程實戰>