概述:html
ThreadPoolExecutor -> AbstractExecutorService =>ExecutorService =>Executor.java
ThreadPoolExecutor 解決了兩個不一樣的問題:1,經過減小任務調用開銷來改善執行大量異步任務時的性能問題;2,提供資源約束與管理,包括線程。另外還有一些統計功能,好比已完成任務個數等。android
爲了使用於不一樣的上下文環境,ThreadPoolExecutor提供了不少可調整的參數和鉤子方法, 可是官方仍然推薦編程者使用更爲便捷的 Executors
的工廠方法,這些方法根據不一樣場景進行了配置,能夠知足大部分需求:web
newCachedThreadPool()
(無大小限制線程池,線程自動回收), 編程
newFixedThreadPool(int)
(線程池大小固定) 緩存
以及 newSingleThreadExecutor()
(單個後臺線程). 服務器
當你須要直接使用ThreadPoolExecutor 時, 可參考一下指南:併發
Core and maximum pool sizes異步
ThreadPoolExecutor
能夠根據corePoolSize和maximumPoolSize自動調整線程池的大小(分別經過 getPoolSize() 和
getCorePoolSize()進行查看
) . 當有新的任務提交時會調用execute(Runnable)
, 若是當前正在運行的線程數小於corePoolSize,新的線程會被建立來執行新提交的任務,即便當前有空閒線程。若是當前運行的線程數大於corePoolSize可是小於maximumPoolSize, 同時任務隊列(queue)已經滿了的話,一個新的線程會被建立來執行任務. 設置corePoolSize 和 maximumPoolSize 相等時, 至關於建立了一個固定大小的線程池. 若是設置 maximumPoolSize 爲一個無限值好比Integer.MAX_VALUE
, 那麼該池中將容許有任意數量的併發任務. 通常狀況下 corePoolSize和 maximumPoolSize 在構造時傳入, 不過也能夠經過 setCorePoolSize(int)
和 setMaximumPoolSize(int)來動態設定
.oop
On-demand construction
默認狀況下核心線程(core threads)只在有新任務到達時纔會初始化和啓動, 不過能夠經過覆寫 prestartCoreThread()
或者 prestartAllCoreThreads()方法來改變這種行爲
.
Creating new threads
默認經過 ThreadFactory建立新的線程
. 它們將有相同的 ThreadGroup
,具備 NORM_PRIORITY優先級而且是非守護線程
. 你能夠提供不一樣的ThreadFactory 來改變線程的名字、組別、優先級、守護狀態等. 若是ThreadFactory
建立線程失敗,該executor 將繼續運行,可是不能再執行任何任務.
Keep-alive times
若是當前線程池中線程的數量已經多於corePoolSize, 那麼額外的線程若是處於idle狀態超過期間 keepAlive(查看 getKeepAliveTime(TimeUnit)
)將會被強行終止. 這樣當線程池的使用比較緊張時能夠有效地減小資源消耗.當線程池再次緊張時,新的(額外)線程還會被建立. keepAlive時間能夠經過 setKeepAliveTime(long, TimeUnit)
方法來設置 .使用Long.MAX_VALUE
NANOSECONDS
可使處於idle狀態的線程不被終止,直到ThreadPoolExecutor被關閉 (shut down). 默認狀況下, keep-alive 策略只在當前線程數量多於 corePoolSize 時起做用.可是能夠經過方法 allowCoreThreadTimeOut(boolean)
設置是否將這樣的超時策略同時應用於core Threads, 前提是 keepAliveTime 是非零的.
Queuing
任何 BlockingQueue
均可以用來調度和存儲提交的任務. 任務隊列的使用和線程池大小的改變有依賴關係:
若是當前線程數量小於coreThreadSize, Executor 會優先選擇建立新線程來執行任務,而不是將任務存入隊列.
若是有多於或等於corePoolSize個線程在運行, Executor 會將新的任務請求入隊,而不是建立新的線程.
若是任務請求不能入隊,新線程會被建立,可是要求線程數量不超過 maximumPoolSize, 不然任務請求會被拒絕(rejected).
隊列有三種普遍使用的策略:
直接傳遞(Direct handoffs). 一個不錯的默認選擇是使用同步隊列(SynchronousQueue
), 將任務直接交給線程處理而不是存儲起來. 若是沒有線程及時處理,任務也不能被成功存儲,因此新的線程會被建立. 這種方法在處理一組有互相依賴的任務時避免了繁複的查找. 直接傳遞策略通常要求一個無界的線程池(unbounded maximumPoolSizes ),這樣才能避免提交的任務被拒絕. 這種作法存在一個缺點,當任務到達速度超過線程對任務的處理速度是,線程數量會無限制的增加.
無界隊列(Unbounded queues). 若是當前的corePoolSize個線程都處於忙狀態時,使用無界隊列 (好比 LinkedBlockingQueue
,不預先設置容量) 會把新的任務緩存在隊列裏. 這樣executor裏不會有多於corePoolSize 個線程. ( 這樣maximumPoolSize 就再也不起做用了.) 無界隊列策略適用於任務相互獨立的狀況,各個任務不會影響彼此的執行, 好比 web page 服務器處理頁面請求. 雖然這種方法可使爆發式請求獲得平滑的處理,可是當任務處理速度較慢時,可能形成任務隊列的無限制增加.
有界對類(Bounded queues). 有界隊列(好比 ArrayBlockingQueue
) 是直接傳遞和無界隊列的一個平衡,經過設置一個有限的maximumPoolSizes值,能夠防止資源的過分消耗, 可是這種方式更難調整和控制. 任務隊列大小和最大線程池大小須要互相折中: 使用較大的隊列和較小的線程池能夠減小CPU、系統資源 以及上下文切換開銷, 可是會影響吞吐量. 當任務頻繁阻塞時 (好比等待I/O),這種作法會對系統的調度形成負面影響 . 若是使用較小的任務隊列就要求有較大的線程池, 這樣可使CPU充分利用,可是會引起過分的調度開銷,從而也會映像吞吐量.
Rejected tasks
若是executor已經關閉,或者executor使用了有界的線程池和有界的任務隊列而且都已飽和,那麼經過方法 execute(Runnable)
提交的新任務會被拒絕。 不管哪一種狀況, execute
方法都會調用 RejectedExecutionHandler的
rejectedExecution(Runnable, ThreadPoolExecutor)
方法. RejectedExecutionHandler有四中預設的處理:
默認使用 ThreadPoolExecutor.AbortPolicy
, 當有任務被拒絕時會拋出運行時異常: RejectedExecutionException
.
ThreadPoolExecutor.CallerRunsPolicy
, 這種策略在調用 execute
方法的線程(即提交任務的線程)中運行任務 . 這種策略提供了一個簡單的返回機制來使下降新任務的提交頻率。
ThreadPoolExecutor.DiscardPolicy
, 這種策略只是簡單的丟棄新任務,不作任何反饋處理.
ThreadPoolExecutor.DiscardOldestPolicy
, 若是 executor 還沒有關閉, 任務隊列首部的任務會被丟棄, 而後重試提交任務 (若是再次失敗,還要重複這一過程.)
定義並使用其餘的處理策略也是能夠的. 可是要格外注意特定的工做場景下的隊列容量和隊列管理策略。
Hook methods
ThreadPoolExecutor提供了可覆寫的beforeExecute(Thread, Runnable)
和 afterExecute(Runnable, Throwable)
方法, 這些方法在每次執行任務前、後分別調用,可用來操做執行環境,好比從新初始化線程本地變量(ThreadLocals),收集統計信息或者添加日誌項 . 另外,還能夠覆寫 terminated()
方法在Executor徹底終止前作一些特殊的處理工做.
若是鉤子方法或回調方法拋出異常,內部的工做線程也可能會失敗並異常終止。
Queue maintenance
方法 getQueue()
提供了訪問queue的途徑,從而對queue進行監視和調試,不推薦用於其餘目的. 當有大量入隊的任務被取消時,另外兩個方法, remove(Runnable)
and purge()
可用於存儲資源的回收.
Finalization
線程池再也不被程序引用 而且 沒有任何剩餘的線程在運行時,線程池會被自動關閉 (shutdown
). 若是你想確保未被引用的線程池被回收, 即便用戶忘記調用 shutdown()
, 那麼你必須使未被使用的線程最可以終止, 能夠經過設置適當的 keep-alive 時間, 使用下限爲0的核心線程數而且/或者 設置allowCoreThreadTimeOut(boolean)來實現
.
下面看一下提交一個任務到執行的具體過程:
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); //scene 1 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // scene 2 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // scene 3 else if (!addWorker(command, false)) reject(command); }
從註釋裏能夠看到任務被添加時有三種狀況,scene 1 和 scene 3 都容易理解, 咱們看第二種, addWorker(null, false):
addworker, 這裏的firstTask 是null, 可是worker裏的thread仍是啓動了, 爲何?
private boolean addWorker(Runnable firstTask, boolean core) { ... boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { ... workers.add(w); ... workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
Worker也是一個runnable, 這裏的線程啓動運行的是Worker的run方法,裏面又調用了runWorker 方法:
咱們看下 worker的構造:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { ... /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } ... }
下面的runWorker方法中注意while loop, 當前的worker若是有task就執行,不然調用getTask 從workQueue中取一個來執行,執行完會繼續去取, 這裏就體現了「線程池」的概念, 並非一個任務一個線程,而是在線程數量達到限定的數量,同時任務數比較多被放入緩存隊列的時候, 一個線程有可能執行多個任務。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); ... try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { ... } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
看一下getTask方法:
這裏有一個無限的for loop, 直到從workQueue取到一個任務,或者等待超時,或者當前的executor被關閉
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
從上面的過程能夠看出, 經過addWorker(null, false /true)提交的任務雖然是null, 可是仍然會啓動一個線程去workQueue中等待或取得一個任務。