[toc]java
線程池就是有N個子線程共同在運行的線程組合。面試
舉個容易理解的例子:有個線程組合(即線程池,咱能夠比喻爲一個公司),裏面有3個子線程(看成3個員工吧),待命幹活。
只要客戶告訴他一個任務(好比搬磚),公司就會挑一個員工來作;數據庫
若是不少客戶都找,3個忙不過來,那公司能夠再僱2我的,但本公司運營能力有限,辦公室也不大,最多就僱傭5我的,若是還忙不過來,那這些送來的任務就排隊了。一件一件作完。編程
java.uitl.concurrent.ThreadPoolExecutor
類是線程池中最核心的一個類,所以若是要透徹地瞭解Java中的線程池,必須先了解這個類。下面咱們來看一下ThreadPoolExecutor類的具體實現源碼:緩存
在ThreadPoolExecutor類中提供了四個構造方法:服務器
public class ThreadPoolExecutor extends AbstractExecutorService { ..... public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler); public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler); }
從上面的代碼能夠得知,ThreadPoolExecutor繼承了AbstractExecutorService類,並提供了四個構造器,事實上,經過觀察每一個構造器的源碼具體實現,發現前面三個構造器都是調用的第四個構造器進行的初始化工做。多線程
corePoolSize
線程池維護線程的最少數量。併發
須要注意的是在初建立線程池時線程不會當即啓動,直到有任務提交纔開始啓動線程並逐漸時線程數目達到corePoolSize
。若想一開始就建立全部核心線程需調用prestartAllCoreThreads
方法。
maximumPoolSize
-池中容許的最大線程數。框架
須要注意的是當核心線程滿且阻塞隊列也滿時纔會判斷當前線程數是否小於最大線程數,並決定是否建立新線程。
keepAliveTime
- 線程池維護線程所容許的空閒時間異步
當線程數大於核心時,多於的空閒線程最多存活時間
默認狀況下,只有 當線程池中的線程數大於corePoolSize時,keepAliveTime纔會起做用,直到線程池中的線程數不大於corePoolSize,即當線程池中的線程數大於corePoolSize時,若是一個線程空閒的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。可是若是調用了allowCoreThreadTimeOut(boolean)
方法,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起做用,直到線程池中的線程數爲0
unit
- keepAliveTime 參數的時間單位,有7種取值。
TimeUnit.DAYS; //天 TimeUnit.HOURS; //小時 TimeUnit.MINUTES; //分鐘 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //納秒
workQueue
- 當線程數目超過核心線程數時用於保存任務的隊列。主要有3種類型的BlockingQueue
可供選擇:有界隊列,無界隊列和同步移交。
ArrayBlockingQueue; //有界隊列 LinkedBlockingQueue; //無界隊列 SynchronousQueue; //同步移交 PriorityBlockingQueue; //一個具備優先級得無限阻塞隊列。
threadFactory
- 執行程序建立新線程時使用的工廠。
handler
- 阻塞隊列已滿且線程數達到最大值時所採起的飽和策略。java默認提供了4種飽和策略的實現方式:停止、拋棄、拋棄最舊的、調用者運行。
ThreadPoolExecutor.AbortPolicy(); 拋出java.util.concurrent.RejectedExecutionException異常 ThreadPoolExecutor.CallerRunsPolicy(); 重試添加當前的任務,他會自動重複調用execute()方法 ThreadPoolExecutor.DiscardOldestPolicy(); 拋棄舊的任務 ThreadPoolExecutor.DiscardPolicy(); 拋棄當前的任務 固然也能夠根據應用場景須要來實現`RejectedExecutionHandler`接口自定義策略。如記錄日誌或持久化不能處理的任務。
從上面給出的ThreadPoolExecutor
類的代碼能夠知道,ThreadPoolExecutor
繼承了AbstractExecutorService
,咱們來看一下AbstractExecutorService
的實現:
public abstract class AbstractExecutorService implements ExecutorService { protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { }; protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { }; public Future<?> submit(Runnable task) {}; public <T> Future<T> submit(Runnable task, T result) { }; public <T> Future<T> submit(Callable<T> task) { }; private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { }; public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { }; public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { }; public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { }; public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { }; }
AbstractExecutorService
是一個抽象類,它實現了ExecutorService
接口。
咱們接着看ExecutorService
接口的實現:
public interface ExecutorService extends Executor { void shutdown(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException; <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException; }
而ExecutorService
又是繼承了Executor
接口,咱們看一下Executor
接口的實現:
public interface Executor { void execute(Runnable command); }
到這裏,你們應該明白了ThreadPoolExecutor
、AbstractExecutorService
、ExecutorService
和Executor
幾個之間的關係了。
Executor
是一個頂層接口,在它裏面只聲明瞭一個方法execute(Runnable)
,返回值爲void,參數爲Runnable類型,從字面意思能夠理解,就是用來執行傳進去的任務的;
而後ExecutorService
接口繼承了Executor
接口,並聲明瞭一些方法:submit、invokeAll、invokeAny以及shutDown等;
抽象類AbstractExecutorService
實現了ExecutorService
接口,基本實現了ExecutorService中聲明的全部方法;
在ThreadPoolExecutor類中有幾個很是重要的方法:
execute() submit() shutdown() shutdownNow()
execute()方法其實是Executor中聲明的方法,在ThreadPoolExecutor進行了具體的實現,這個方法是ThreadPoolExecutor的核心方法,經過這個方法能夠向線程池提交一個任務,交由線程池去執行。
submit()方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經有了具體的實現,在ThreadPoolExecutor中並無對其進行重寫,這個方法也是用來向線程池提交任務的,可是它和execute()方法不一樣,它可以返回任務執行的結果,去看submit()方法的實現,會發現它實際上仍是調用的execute()方法,只不過它利用了Future來獲取任務執行結果。
shutdown()和shutdownNow()是用來關閉線程池的。
還有不少其餘的方法好比:getQueue() 、getPoolSize()、getActiveCount()、getCompletedTaskCount()等獲取與線程池相關屬性的方法,自行查閱API。
線程池的主要工做流程以下圖:
從上圖咱們能夠看出,當提交一個新任務到線程池時,線程池的處理流程以下:
線程池執行任務的方法以下:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //若是線程數小於基本線程數,則建立線程並執行當前任務 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { //如線程數大於等於基本線程數或線程建立失敗,則將當前任務放到工做隊列中。 if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } //若是線程池不處於運行中或任務沒法放入隊列,而且當前線程數量小於最大容許的線程數量,則建立一個線程執行任務。 else if (!addIfUnderMaximumPoolSize(command)) //拋出RejectedExecutionException異常 reject(command); // is shutdown or saturated } }
工做線程。線程池建立線程時,會將線程封裝成工做線程Worker,Worker在執行完任務後,還會無限循環獲取工做隊列裏的任務來執行。咱們能夠從Worker的run方法裏看到這點:
public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); } }
要想合理的配置線程池,就必須首先分析任務特性,能夠從如下幾個角度來進行分析:
CPU密集型任務 配置儘量少的線程數量,如配置Ncpu+1
個線程的線程池。
IO密集型任務 則因爲須要等待IO操做,線程並非一直在執行任務,則配置儘量多的線程,如2*Ncpu
。
混合型的任務 若是能夠拆分,則將其拆分紅一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐率要高於串行執行的吞吐率,若是這兩個任務執行時間相差太大,則不必進行分解。
咱們能夠經過Runtime.getRuntime().availableProcessors()
方法得到當前設備的CPU個數。
優先級不一樣的任務可使用優先級隊列PriorityBlockingQueue
來處理。它可讓優先級高的任務先獲得執行,須要注意的是若是一直有優先級高的任務提交到隊列裏,那麼優先級低的任務可能永遠不能執行。
執行時間不一樣的任務能夠交給不一樣規模的線程池來處理,或者也可使用優先級隊列,讓執行時間短的任務先執行。
依賴數據庫鏈接池的任務,由於線程提交SQL後須要等待數據庫返回結果,若是等待的時間越長CPU空閒時間就越長,那麼線程數應該設置越大,這樣才能更好的利用CPU。
建議使用有界隊列,有界隊列能增長系統的穩定性和預警能力,能夠根據須要設大一點,好比幾千。
別人的例子:
有一次咱們組使用的後臺任務線程池的隊列和線程池全滿了,不斷的拋出拋棄任務的異常,經過排查發現是數據庫出現了問題,致使執行SQL變得很是緩慢,由於後臺任務線程池裏的任務全是須要向數據庫查詢和插入數據的,因此致使線程池裏的工做線程所有阻塞住,任務積壓在線程池裏。若是當時咱們設置成無界隊列,線程池的隊列就會愈來愈多,有可能會撐滿內存,致使整個系統不可用,而不僅是後臺任務出現問題。固然咱們的系統全部的任務是用的單獨的服務器部署的,而咱們使用不一樣規模的線程池跑不一樣類型的任務,可是出現這樣問題時也會影響到其餘任務。
經過線程池提供的參數進行監控。線程池裏有一些屬性在監控線程池的時候可使用
--
taskCount
:線程池須要執行的任務數量。
completedTaskCount
:線程池在運行過程當中已完成的任務數量。小於或等於taskCount。
largestPoolSize
:線程池曾經建立過的最大線程數量。經過這個數據能夠知道線程池是否滿過。如等於線程池的最大大小,則表示線程池曾經滿了。
getPoolSize
:線程池的線程數量。若是線程池不銷燬的話,池裏的線程不會自動銷燬,因此這個大小隻增不減。
getActiveCount
:獲取活動的線程數。
經過擴展線程池進行監控。經過繼承線程池並重寫線程池的beforeExecute,afterExecute,terminated
方法,咱們能夠在任務執行前,執行後和線程池關閉前幹一些事情。
如監控任務的平均執行時間,最大執行時間和最小執行時間等。這幾個方法在線程池裏是空方法。如:
protected void beforeExecute(Thread t, Runnable r) { }
什麼是 Executor 框架 ? (面試題)
Executor框架在Java 5中被引入,Executor 框架是一個根據一組執行策略調用、調度、執行和控制的異步任務的框架。
Executor 框架包括:線程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable
等。
不過在java doc中,並不提倡咱們直接使用ThreadPoolExecutor
,而是使用Executors
類中提供的幾個靜態方法來建立四種線程池:
注意在全新的阿里編程規約裏面不推薦使用
Executors
提供的靜態方法建立線程。
newCachedThreadPool
是一個可根據須要建立新線程的線程池,建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程,若無可回收,則新建線程。。ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); cachedThreadPool.execute(new Runnable() { public void run() { System.out.println("runing....."); } }); }
--
newSingleThreadExecutor
建立是一個單線程池,也就是該線程池只有一個線程在工做,全部的任務是串行執行的。若是這個惟一的線程由於異常結束,那麼會有一個新的線程來替代它,此線程池保證全部任務的執行順序按照任務的提交順序執行。ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();); singleThreadExecutor.execute(new Runnable() { public void run() { System.out.println("runing....."); } }); }
--
newFixedThreadPool
建立固定大小的線程池,每次提交一個任務就建立一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,若是某個線程由於執行異常而結束,那麼線程池會補充一個新線程。定長線程池的大小最好根據系統資源進行設置。如Runtime.getRuntime().availableProcessors()
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3); fixedThreadPool .execute(new Runnable() { public void run() { System.out.println("runing....."); } }); }
--
newScheduledThreadPool
建立一個定長線程池,支持定時及週期性任務執行。ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); scheduledThreadPool.schedule(new Runnable() { public void run() { System.out.println("runing....."); } }, 3, TimeUnit.SECONDS); // 表示延遲3秒執行。 }
雖然線程池是構建多線程應用程序的強大機制,但使用它並非沒有風險的。
用線程池構建的應用程序容易遭受任何其它多線程應用程序容易遭受的全部併發風險,諸如同步錯誤和死鎖,它還容易遭受特定於線程池的少數其它風險,諸如與池有關的死鎖、資源不足和線程泄漏。
任何多線程應用程序都有死鎖風險。當一組進程或線程中的每個都在等待一個只有該組中另外一個進程才能引發的事件時,咱們就說這組進程或線程 死鎖了。
死鎖的最簡單情形是:線程 A 持有對象 X 的獨佔鎖,而且在等待對象 Y 的鎖,而線程 B 持有對象 Y 的獨佔鎖,卻在等待對象 X 的鎖。除非有某種方法來打破對鎖的等待(Java 鎖定不支持這種方法),不然死鎖的線程將永遠等下去。
雖然任何多線程程序中都有死鎖的風險,但線程池卻引入了另外一種死鎖可能,在那種狀況下,全部池線程都在執行已阻塞的等待隊列中另外一任務的執行結果的任務,但這一任務卻由於沒有未被佔用的線程而不能運行。
當線程池被用來實現涉及許多交互對象的模擬,被模擬的對象能夠相互發送查詢,這些查詢接下來做爲排隊的任務執行,查詢對象又同步等待着響應時,會發生這種狀況。
線程池的一個優勢在於:相對於其它替代調度機制言,它們一般執行得很好。但只有恰當地調整了線程池大小時纔是這樣的。線程消耗包括內存和其它系統資源在內的大量資源。除了 Thread 對象所需的內存以外,每一個線程都須要兩個可能很大的執行調用堆棧。除此之外,JVM 可能會爲每一個 Java 線程建立一個本機線程,這些本機線程將消耗額外的系統資源。最後,雖然線程之間切換的調度開銷很小,但若是有不少線程,環境切換也可能嚴重地影響程序的性能。
若是線程池太大,那麼被那些線程消耗的資源可能嚴重地影響系統性能。在線程之間進行切換將會浪費時間,並且使用超出比您實際須要的線程可能會引發資源匱乏問題,由於池線程正在消耗一些資源,而這些資源可能會被其它任務更有效地利用。除了線程自身所使用的資源之外,服務請求時所作的工做可能須要其它資源,例如 JDBC 鏈接、套接字或文件。
這些也都是有限資源,有太多的併發請求也可能引發失效,例如不能分配 JDBC 鏈接。
線程池和其它排隊機制依靠使用 wait()
和 notify()
方法,這兩個方法都難於使用。若是編碼不正確,那麼可能丟失通知,致使線程保持空閒狀態,儘管隊列中有工做要處理。使用這些方法時,必須格外當心。而最好使用現有的、已經知道能工做的實現,例如 util.concurrent 包。
各類類型的線程池中一個嚴重的風險是線程泄漏,當從池中除去一個線程以執行一項任務,而在任務完成後該線程卻沒有返回池時,會發生這種狀況。發生線程泄漏的一種情形出如今任務拋出一個 RuntimeException
或一個 Error
時。若是池類沒有捕捉到它們,那麼線程只會退出而線程池的大小將會永久減小一個。當這種狀況發生的次數足夠多時,線程池最終就爲空,並且系統將中止,由於沒有可用的線程來處理任務。
有些任務可能會永遠等待某些資源或來自用戶的輸入,而這些資源又不能保證變得可用,用戶可能也已經回家了,諸如此類的任務會永久中止,而這些中止的任務也會引發和線程泄漏一樣的問題。若是某個線程被這樣一個任務永久地消耗着,那麼它實際上就被從池除去了。對於這樣的任務,應該要麼只給予它們本身的線程,要麼只讓它們等待有限的時間。
僅僅是請求就壓垮了服務器,這種狀況是可能的。在這種情形下,咱們可能不想將每一個到來的請求都排隊到咱們的工做隊列,由於排在隊列中等待執行的任務可能會消耗太多的系統資源並引發資源缺少。在這種情形下決定如何作取決於您本身;在某些狀況下,您能夠簡單地拋棄請求,依靠更高級別的協議稍後重試請求,您也能夠用一個指出服務器暫時很忙的響應來拒絕請求。
重複看一下新任務進入時線程池的執行策略:
corePoolSize
,則 Executor
始終首選添加新的線程,而不進行排隊。 corePoolSize
,則 Executor
始終首選將請求加入隊列,而不添加新的線程。maximumPoolSize
,在這種狀況下,任務將被拒絕。主要有3種類型的BlockingQueue
:
隊列大小無限制,經常使用的爲無界的LinkedBlockingQueue
,將致使在全部 corePoolSize
線程都忙時新任務在隊列中等待。這樣,建立的線程就不會超過 corePoolSize
。
應用場景:當每一個任務徹底獨立於其餘任務,即任務執行互不影響時,適合於使用無界隊列。
例如,在 Web 頁服務器中。這種排隊可用於處理瞬態突發請求,當命令以超過隊列所能處理的平均數連續到達時,此策略容許無界線程具備增加的可能性。
經常使用的有兩類,一類是遵循FIFO原則的隊列如ArrayBlockingQueue與有界的LinkedBlockingQueue
,另外一類是優先級隊列如PriorityBlockingQueue
。PriorityBlockingQueue中的優先級由任務的Comparator決定。
使用有界隊列時隊列大小需和線程池大小互相配合,線程池較小有界隊列較大時可減小內存消耗,下降cpu使用率和上下文切換,可是可能會限制系統吞吐量。
當使用有限的 maximumPoolSizes
時,有界隊列(如 ArrayBlockingQueue
)有助於防止資源耗盡,可是可能較難調整和控制。隊列大小和最大池大小可能須要相互折衷,使用大型隊列和小型池能夠最大限度地下降 CPU 使用率、操做系統資源和上下文切換開銷,可是可能致使人工下降吞吐量。
若是任務頻繁阻塞(例如,若是它們是 I/O 邊界),則系統可能爲超過您許可的更多線程安排時間。使用小型隊列一般要求較大的池大小,CPU 使用率較高,可是可能遇到不可接受的調度開銷,這樣也會下降吞吐量。
(直接提交) 若是不但願任務在隊列中等待而是但願將任務直接移交給工做線程,可以使用SynchronousQueue
做爲等待隊列。SynchronousQueue
不是一個真正的隊列,而是一種線程之間移交的機制。要將一個元素放入SynchronousQueue
中,必須有另外一個線程正在等待接收這個元素。只有在使用無界線程池或者有飽和策略時才建議使用該隊列。
工做隊列的默認選項是 SynchronousQueue
,此策略能夠 避免在處理可能具備內部依賴性的請求集時出現鎖。
該Queue自己的特性,在某次添加元素後必須等待其餘線程取走後才能繼續添加。
JDK主要提供了4種飽和策略供選擇。4種策略都作爲靜態內部類在ThreadPoolExcutor
中進行實現。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }
使用該策略時在飽和時會拋出RejectedExecutionException(繼承自RuntimeException)
,調用者可捕獲該異常自行處理。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }
如代碼所示,不作任何處理直接拋棄任務
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }
如代碼,先將阻塞隊列中的頭元素出隊拋棄,再嘗試提交任務。若是此時阻塞隊列使用PriorityBlockingQueue
優先級隊列,將會致使優先級最高的任務被拋棄,所以不建議將該種策略配合優先級隊列使用。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }
既不拋棄任務也不拋出異常,直接運行任務的run方法,換言之將任務回退給調用者來直接運行。使用該策略時線程池飽和後將由調用線程池的主線程本身來執行任務,所以在執行任務的這段時間裏主線程沒法再提交新任務,從而使線程池中工做線程有時間將正在處理的任務處理完成。