諸如web服務器、數據庫服務器、文件服務器和郵件服務器等許多服務器應用都面向處理來自某些遠程來源的大量短小的任務。構建服務器應用程序的一個過於簡單的模型是:每當一個請求到達就建立一個新的服務對象,而後在新的服務對象中爲請求服務。但當有大量請求併發訪問時,服務器不斷的建立和銷燬對象的開銷很大。因此提升服務器效率的一個手段就是儘量減小建立和銷燬對象的次數,特別是一些很耗資源的對象建立和銷燬,這樣就引入了「池」的概念,「池」的概念使得人們能夠定製必定量的資源,而後對這些資源進行復用,而不是頻繁的建立和銷燬。java
線程池是預先建立線程的一種技術。線程池在尚未任務到來以前,建立必定數量的線程,放入空閒隊列中。這些線程都是處於睡眠狀態,即均爲啓動,不消耗CPU,而只是佔用較小的內存空間。當請求到來以後,緩衝池給此次請求分配一個空閒線程,把請求傳入此線程中運行,進行處理。當預先建立的線程都處於運行狀態,即預製線程不夠,線程池能夠自由建立必定數量的新線程,用於處理更多的請求。當系統比較閒的時候,也能夠經過移除一部分一直處於停用狀態的線程。web
在面向對象編程中,建立和銷燬對象是很費時間的,由於建立一個對象要獲取內存資源或者其它更多資源。在Java中更是如此,虛擬機將試圖跟蹤每個對象,以便可以在對象銷燬後進行垃圾回收。redis
因此提升服務程序效率的一個手段就是儘量減小建立和銷燬對象的次數,特別是一些很耗資源的對象建立和銷燬。如何利用已有對象來服務就是一個須要解決的關鍵問題,其實這就是一些」池化資源」技術產生的緣由。算法
在開發程序的過程當中,不少時候咱們會遇到遇到批量執行任務的場景,當各個具體任務之間互相獨立並不依賴其餘任務的時候,咱們會考慮使用併發的方式,將各個任務分散到不一樣的線程中進行執行來提升任務的執行效率。數據庫
咱們會想到爲每一個任務都分配一個線程,可是這樣的作法存在很大的問題:編程
一、資源消耗:首先當任務數量龐大的時候,大量線程會佔據大量的系統資源,特別是內存,當線程數量大於CPU可用數量時,空閒線程會浪費形成內存的浪費,並加大GC的壓力,大量的線程甚至會直接致使程序的內存溢出,並且大量線程在競爭CPU的時候會帶來額外的性能開銷。若是CPU已經足夠忙碌,再多的線程不只不會提升性能,反而會下降性能。緩存
二、線程生命週期的開銷:線程的建立和銷燬都是有代價的,線程的建立須要時間、延遲處理的請求、須要JVM和操做系統提供一些輔助操做。若是請求特別龐大,而且任務的執行特別輕量級(好比只是計算1+1),那麼對比下來建立和銷燬線程代價就太昂貴了。服務器
三、穩定性:如資源消耗中所說,若是程序由於大量的線程拋出OutOfMemoryEorror,會致使程序極大的不穩定。網絡
既然爲每一個任務分配一個線程的作法已經不可行,咱們考慮的代替方法中就必須考慮到,一、線程不能不能無限制建立,數量必須有一個合適的上限。二、線程的建立開銷昂貴,那咱們能夠考慮重用這些線程。理所固然,池化技術是一項比較容易想到的替代方案(馬後炮),線程的池化管理就叫線程池。多線程
多線程技術主要解決處理器單元內多個線程執行的問題,它能夠顯著減小處理器單元的閒置時間,增長處理器單元的吞吐能力。
假設一個服務器完成一項任務所需時間爲:T1 建立線程時間,T2 在線程中執行任務的時間,T3 銷燬線程時間。
若是:T1 + T3 遠大於 T2,則能夠採用線程池,以提升服務器性能。
一個線程池包括如下四個基本組成部分:
一、線程池管理器(ThreadPool):用於建立並管理線程池,包括建立線程池,銷燬線程池,添加新任務;
二、工做線程(PoolWorker):線程池中線程,在沒有任務時處於等待狀態,能夠循環的執行任務;
三、任務接口(Task):每一個任務必須實現的接口,以供工做線程調度任務的執行,它主要規定了任務的入口,任務執行完後的收尾工做,任務的執行狀態等;
四、任務隊列(taskQueue):用於存放沒有處理的任務。提供一種緩衝機制。
線程池技術正是關注如何縮短或調整T1,T3時間的技術,從而提升服務器程序性能的。它把T1,T3分別安排在服務器程序的啓動和結束的時間段或者一些空閒的時間段,這樣在服務器程序處理客戶請求時,不會有T1,T3的開銷了。
線程池不只調整T1,T3產生的時間段,並且它還顯著減小了建立線程的數目,看一個例子:
假設一個服務器一天要處理50000個請求,而且每一個請求須要一個單獨的線程完成。在線程池中,線程數通常是固定的,因此產生線程總數不會超過線程池中線程的數目,而若是服務器不利用線程池來處理這些請求則線程總數爲50000。通常線程池大小是遠小於50000。因此利用線程池的服務器程序不會爲了建立50000而在處理請求時浪費時間,從而提升效率。
代碼實現中並無實現任務接口,而是把Runnable對象加入到線程池管理器(ThreadPool),而後剩下的事情就由線程池管理器(ThreadPool)來完成了。
一、減小了建立和銷燬線程的次數,每一個工做線程均可以被重複利用,可執行多個任務。
二、能夠根據系統的承受能力,調整線程池中工做線程的數目,防止由於消耗過多的內存而把服務器累趴下(每一個線程須要大於1MB內存,線程開的越多,消耗的內存也就越大,最後死機)。
Java裏面線程池的頂級接口是Executor,可是嚴格意義上講Executor並非一個線程池,而只是一個執行線程的工具。真正的線程池接口是ExecutorService。
線程池做用就是限制系統中執行線程的數量。
根據系統的環境狀況,能夠自動或手動設置線程數量,達到運行的最佳效果;少了浪費了系統資源,多了形成系統擁擠效率不高。用線程池控制線程數量,其餘線程排隊等候。一個任務執行完畢,再從隊列的中取最前面的任務開始執行。若隊列中沒有等待進程,線程池的這一資源處於等待。當一個新任務須要運行時,若是線程池中有等待的工做線程,就能夠開始運行了;不然進入等待隊列。
提交一個任務到線程池中,線程池的處理流程以下:
一、判斷線程池裏的核心線程是否都在執行任務,若是不是(核心線程空閒或者還有核心線程沒有被建立)則建立一個新的工做線程來執行任務。若是核心線程都在執行任務,則進入下個流程。
二、線程池判斷工做隊列是否已滿,若是工做隊列沒有滿,則將新提交的任務存儲在這個工做隊列裏。若是工做隊列滿了,則進入下個流程。
三、判斷線程池裏的線程是否都處於工做狀態,若是沒有,則建立一個新的工做線程來執行任務。若是已經滿了,則交給飽和策略來處理這個任務。
一、異步處理日誌,這個是比較場景的採用線程池來解決的
二、定時任務,定時對數據庫備份、定時更新redis配置等,定時發送郵件
三、數據遷移
這些常見的一些場景咱們就應該優先來考慮線程池來解決
線程池本質的概念就是一堆線程一塊兒完成一件事情。
從內部實現上看,線程池技術可主要劃分爲以下6個要點實現:
工做者線程worker:即線程池中能夠重複利用起來執行任務的線程,一個worker的生命週期內會不停的處理多個業務job。線程池「複用」的本質就是複用一個worker去處理多個job,「流控「的本質就是經過對worker數量的控制實現併發數的控制。經過設置不一樣的參數來控制 worker的數量能夠實現線程池的容量伸縮從而實現複雜的業務需求。
待處理工做job的存儲隊列:工做者線程workers的數量是有限的,同一時間最多隻能處理最多workers數量個job。對於來不及處理的job須要保存到等待隊列裏,空閒的工做者work會不停的讀取空閒隊列裏的job進行處理。基於不一樣的隊列實現,能夠擴展出多種功能的線程池,如定製隊列出隊順序實現帶處理優先級的線程池、定製隊列爲阻塞有界隊列實現可阻塞能力的線程池等。流控一方面經過控制worker數控制併發數和處理能力,一方面可基於隊列控制線程池處理能力的上限。
線程池初始化:即線程池參數的設定和多個工做者workers的初始化。一般有一開始就初始化指定數量的workers或者有請求時逐步初始化工做者兩種方式。前者線程池啓動初期響應會比較快但形成了空載時的少許性能浪費,後者是基於請求量靈活擴容但犧牲了線程池啓動初期性能達不到最優。
處理業務job算法:業務給線程池添加任務job時線程池的處理算法。有的線程池基於算法識別直接處理job仍是增長工做者數處理job或者放入待處理隊列,也有的線程池會直接將job放入待處理隊列,等待工做者worker去取出執行。
workers的增減算法:業務線程數不是持久不變的,有高低峯期。線程池要有本身的算法根據業務請求頻率高低調節自身工做者workers的 數量來調節線程池大小,從而實現業務高峯期增長工做者數量提升響應速度,而業務低峯期減小工做者數來節省服務器資源。增長算法一般基於幾個維度進行:待處 理工做job數、線程池定義的最大最小工做者數、工做者閒置時間。
線程池終止邏輯:應用中止時線程池要有自身的中止邏輯,保證全部job都獲得執行或者拋棄。
1.重用線程池中的線程,減小因對象建立,銷燬所帶來的性能開銷;
2.能有效的控制線程的最大併發數,提升系統資源利用率,同時避免過多的資源競爭,避免堵塞;
3.可以多線程進行簡單的管理,使線程的使用簡單、高效。
4.減小頻繁的建立和銷燬線程(因爲線程建立和銷燬都會耗用必定的內存)
5.線程池也是多線程,充分利用CPU,提升系統的效率
6.線程是稀缺資源,使用線程池能夠減小建立和銷燬線程的次數,每一個工做線程均可以重複使用。
7.能夠根據系統的承受能力,調整線程池中工做線程的數量,防止由於消耗過多內存致使服務器崩潰。
8.線程複用
9.控制最大併發數
10.管理線程
線程池剛建立時,裏面沒有一個線程。任務隊列是做爲參數傳進來的。不過,就算隊列裏面有任務,線程池也不會立刻執行它們。
當調用 execute() 方法添加一個任務時,線程池會作以下判斷:
若是正在運行的線程數量小於 corePoolSize,那麼立刻建立線程運行這個任務;
若是正在運行的線程數量大於或等於 corePoolSize,那麼將這個任務放入隊列;
若是這時候隊列滿了,並且正在運行的線程數量小於 maximumPoolSize,那麼仍是要建立非核心線程馬上運行這個任務;
若是隊列滿了,並且正在運行的線程數量大於或等於 maximumPoolSize,那麼線程池會拋出異常RejectExecutionException。
當一個線程完成任務時,它會從隊列中取下一個任務來執行。
當一個線程無事可作,超過必定的時間(keepAliveTime)時,線程池會判斷,若是當前運行的線程數大於 corePoolSize,那麼這個線程就被停掉。因此線程池的全部任務完成後,它最終會收縮到 corePoolSize 的大小。
調用ThreadPoolExecutor的execute提交線程,首先檢查CorePool,若是CorePool內的線程小於CorePoolSize,新建立線程執行任務。
若是當前CorePool內的線程大於等於CorePoolSize,那麼將線程加入到BlockingQueue。
若是不能加入BlockingQueue,在小於MaxPoolSize的狀況下建立線程執行任務。
若是線程數大於等於MaxPoolSize,那麼執行拒絕策略。
通常須要根據任務的類型來配置線程池大小:
若是是CPU密集型任務,就須要儘可能壓榨CPU,參考值能夠設爲 NCPU+1;
若是是IO密集型任務,參考值能夠設置爲2*NCPU。
固然,這只是一個參考值,具體的設置還須要根據實際狀況進行調整,好比能夠先將線程池大小設置爲參考值,再觀察任務運行狀況和系統負載、資源利用率來進行適當調整。
雖然線程池是構建多線程應用程序的強大機制,但使用它並非沒有風險的。在使用線程池時需注意線程池大小與性能的關係,注意併發風險、死鎖、資源不足和線程泄漏等問題。
(1)線程池大小。多線程應用並不是線程越多越好,須要根據系統運行的軟硬件環境以及應用自己的特色決定線程池的大小。通常來講,若是代碼結構合理的話,線程數目與CPU 數量相適合便可。若是線程運行時可能出現阻塞現象,可相應增長池的大小;若有必要可採用自適應算法來動態調整線程池的大小,以提升CPU 的有效利用率和系統的總體性能。
(2)併發錯誤。多線程應用要特別注意併發錯誤,要從邏輯上保證程序的正確性,注意避免死鎖現象的發生。
(3)線程泄漏。這是線程池應用中一個嚴重的問題,當任務執行完畢而線程沒能返回池中就會發生線程泄漏現象。
答案是否認的,好比Redis就是單線程的,但它卻很是高效,基本操做都能達到十萬量級/s。從線程這個角度來看,部分緣由在於:
多線程帶來線程上下文切換開銷,單線程就沒有這種開銷
鎖
固然「Redis很快」更本質的緣由在於:Redis基本都是內存操做,這種狀況下單線程能夠很高效地利用CPU。而多線程適用場景通常是:存在至關比例的IO和網絡操做。
因此即便有上面的簡單估算方法,也許看似合理,但實際上也未必合理,都須要結合系統真實狀況(好比是IO密集型或者是CPU密集型或者是純內存操做)和硬件環境(CPU、內存、硬盤讀寫速度、網絡情況等)來不斷嘗試達到一個符合實際的合理估算值。
線程數量要點:
若是運行線程的數量少於核心線程數量,則建立新的線程處理請求
若是運行線程的數量大於核心線程數量,小於最大線程數量,則當隊列滿的時候才建立新的線程
若是核心線程數量等於最大線程數量,那麼將建立固定大小的鏈接池
若是設置了最大線程數量爲無窮,那麼容許線程池適合任意的併發數量
線程空閒時間要點:
當前線程數大於核心線程數,若是空閒時間已經超過了,那該線程會銷燬。
排隊策略要點:
同步移交:不會放到隊列中,而是等待線程執行它。若是當前線程沒有執行,極可能會新開一個線程執行。
無界限策略:若是核心線程都在工做,該線程會放到隊列中。因此線程數不會超過核心線程數
有界限策略:能夠避免資源耗盡,可是必定程度上減低了吞吐量
當線程關閉或者線程數量滿了和隊列飽和了,就有拒絕任務的狀況了:
拒絕任務策略:
直接拋出異常
使用調用者的線程來處理
直接丟掉這個任務
丟掉最老的任務
在線程的生命週期中,它要通過新建(New)、就緒(Runnable)、運行(Running)、阻塞(Blocked)和死亡(Dead)5種狀態。
Thread經過new來新建一個線程,這個過程是是初始化一些線程信息,如線程名,id,線程所屬group等,能夠認爲只是個普通的對象。調用Thread的start()後Java虛擬機會爲其建立方法調用棧和程序計數器,同時將hasBeenStarted爲true,以後調用start方法就會有異常。
處於這個狀態中的線程並無開始運行,只是表示該線程能夠運行了。至於該線程什麼時候開始運行,取決於JVM裏線程調度器的調度。當線程獲取cpu後,run()方法會被調用。不要本身去調用Thread的run()方法。以後根據CPU的調度在就緒——運行——阻塞間切換,直到run()方法結束或其餘方式中止線程,進入dead狀態。
Java中線程池主要是併發包java.util.concurrent 中 ThreadPoolExecutor這個類實現的。
java中的線程池是經過Executor框架實現的,Executor 框架包括類
Executor
Executors
ExecutorService
ThreadPoolExecutor
Callable
Future
FutureTask
(1) Executor: 全部線程池的接口,只有一個方法。
public interface Executor {
void execute(Runnable command);
}
execute執行方法
execute執行方法分了三步,以註釋的方式寫在代碼上了~
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); //若是線程池中運行的線程數量<corePoolSize,則建立新線程來處理請求,即便其餘輔助線程是空閒的。 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //若是線程池中運行的線程數量>=corePoolSize,且線程池處於RUNNING狀態,且把提交的任務成功放入阻塞隊列中,就再次檢查線程池的狀態, // 1.若是線程池不是RUNNING狀態,且成功從阻塞隊列中刪除任務,則該任務由當前 RejectedExecutionHandler 處理。 // 2.不然若是線程池中運行的線程數量爲0,則經過addWorker(null, false)嘗試新建一個線程,新建線程對應的任務爲null。 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 若是以上兩種case不成立,即沒能將任務成功放入阻塞隊列中,且addWoker新建線程失敗,則該任務由當前 RejectedExecutionHandler 處理。 else if (!addWorker(command, false)) reject(command); }
(2) ExecutorService: 增長Executor的行爲,是Executor實現類的最直接接口。
(3) Executors: 提供了一系列工廠方法用於創先線程池,返回的線程池都實現了ExecutorService 接口。
建立線程池使用類:java.util.concurrent.Executors
Executors幾個重要方法:
callable(Runnable task): 將 Runnable 的任務轉化成 Callable 的任務
newSingleThreadExecutor(): 產生一個ExecutorService對象,這個對象只有一個線程可用來執行任務,若任務多於一個,任務將按前後順序執行。
newCachedThreadPool(): 產生一個ExecutorService對象,這個對象帶有一個線程池,線程池的大小會根據須要調整,線程執行完任務後返回線程池,供執行下一次任務使用。
//建立無大小限制的線程池 public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
newFixedThreadPool(int poolSize): 產生一個ExecutorService對象,這個對象帶有一個大小爲 poolSize 的線程池,若任務數量大於 poolSize ,任務會被放在一個 queue 裏順序執行。
//建立固定大小的線程池 public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
newSingleThreadScheduledExecutor(): 產生一個ScheduledExecutorService對象,這個對象的線程池大小爲 1 ,若任務多於一個,任務將按前後順序執行。
//建立單線程池 public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
newScheduledThreadPool(int poolSize): 產生一個ScheduledExecutorService對象,這個對象的線程池大小爲 poolSize ,若任務數量大於 poolSize ,任務會在一個 queue 裏等待執行。
//建立定時調度池 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
(4) ThreadPoolExecutor:線程池的具體實現類,通常用的各類線程池都是基於這個類實現的。
線程池能夠解決兩個不一樣問題:因爲減小了每一個任務的調用開銷,在執行大量的異步任務時,它一般可以提供更好的性能,而且還能夠提供綁定和管理資源(包括執行集合任務時使用的線程)的方法。每一個 ThreadPoolExecutor還維護着一些基本的統計數據,如完成的任務數。
線程池作的其實能夠看得很簡單,其實就是把你提交的任務(task)進行調度管理運行,但這個調度的過程以及其中的狀態控制是比較複雜的。
構造方法以下:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException();
this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
[1]corePoolSize:線程池的核心線程數,線程池中運行的線程數也永遠不會超過 corePoolSize 個,默認狀況下能夠一直存活。能夠經過設置allowCoreThreadTimeOut爲True,此時 核心線程數就是0,此時keepAliveTime控制全部線程的超時時間。
核心線程會一直存活,及時沒有任務須要執行,當線程數小於核心線程數時
即便有線程空閒,線程池也會優先建立新線程處理
設置allowCoreThreadTimeout=true(默認false)時,核心線程會超時關閉
[2]maximumPoolSize:線程池容許的最大線程數;
線程池最大線程數(當workQueue都放不下時,啓動新線程)
當線程數>=corePoolSize,且任務隊列已滿時。線程池會建立新線程來處理任務
當線程數=maxPoolSize,且任務隊列已滿時,線程池會拒絕處理任務而拋出異常(飽和策略怎麼處理)
[3]keepAliveTime: 指的是空閒線程結束的超時時間;
超出corePoolSize數量的線程的保留時間,若是allowCoreThreadTimeout=true,則會直到線程數量=0
jdk中的解釋是:當線程數大於核心時,此爲終止前多餘的空閒線程等待新任務的最長時間。
有點拗口,其實這個不難理解,在使用了「池」的應用中,大多都有相似的參數須要配置。好比數據庫鏈接池,DBCP中的maxIdle,minIdle參數。
什麼意思?接着上面的解釋,後來向老闆派來的工人始終是「借來的」,俗話說「有借就有還」,但這裏的問題就是何時還了,若是借來的工人剛完成一個任務就還回去,後來發現任務還有,那豈不是又要去借?這一來一往,老闆確定頭也大死了。
合理的策略:既然借了,那就多借一下子。直到「某一段」時間後,發現再也用不到這些工人時,即可以還回去了。這裏的某一段時間即是keepAliveTime的含義,TimeUnit爲keepAliveTime值的度量。
[4]unit :是一個枚舉,表示 keepAliveTime 的單位;
TimeUnit.DAYS; //天 TimeUnit.HOURS; //小時 TimeUnit.MINUTES; //分鐘 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //納秒
[5]workQueue:表示存聽任務的BlockingQueue<Runnable隊列。阻塞隊列(任務隊列容量),當線程數達到核心線程數時,新任務會放在隊列中排隊等待執行
ArrayBlockingQueue:構造函數必定要傳大小
LinkedBlockingQueue:構造函數不傳大小會默認爲Integer.MAX_VALUE ,當大量請求任務時,容易形成 內存耗盡
SynchronousQueue:同步隊列,一個沒有存儲空間的阻塞隊列 ,將任務同步交付給工做線程
PriorityBlockingQueue : 優先隊列
BlockingQueue:阻塞隊列(BlockingQueue)是java.util.concurrent下的主要用來控制線程同步的工具。若是BlockQueue是空的,從BlockingQueue取東西的操做將會被阻斷進入等待狀態,直到BlockingQueue進了東西纔會被喚醒。一樣,若是BlockingQueue是滿的,任何試圖往裏存東西的操做也會被阻斷進入等待狀態,直到BlockingQueue裏有空間纔會被喚醒繼續操做。
阻塞隊列經常使用於生產者和消費者的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器裏拿元素。具體的實現類有LinkedBlockingQueue,ArrayBlockingQueued等。通常其內部的都是經過Lock和Condition(顯示鎖(Lock)及Condition的學習與使用)來實現阻塞和喚醒。
queue上的三種類型。
排隊有三種通用策略:
直接提交:工做隊列默認選項是SynchronousQueue,它將任務直接提交給線程而不保存它們。在此,若是不存在可用於當即運行任務的線程,則駛入把任務加入到隊列將失敗,所以將會構造一個新的線程。此策略能夠避免在處理可能具備內部依賴性請求集時出現鎖。直接提交一般要求無界maximumPoolSize以免拒絕新提交的任務。當命令以超過隊列所能處理的平均數連續到達時,此策略容許無界線程有增加的可能性。
無界隊列:使用無界隊列(例如,不具備預約義容量的 LinkedBlockingQueue)將致使在全部 corePoolSize 線程都忙時新任務在隊列中等待。這樣,建立的線程就不會超過 corePoolSize。(所以,maximumPoolSize的值也就無效了。)當每一個任務徹底獨立於其餘任務,即任務執行互不影響時,適合於使用無界隊列;例如,在 Web頁服務器中。這種排隊可用於處理瞬態突發請求,當命令以超過隊列所能處理的平均數連續到達時,此策略容許無界線程具備增加的可能性。
有界隊列。當使用有限的 maximumPoolSizes時,有界隊列(如 ArrayBlockingQueue)有助於防止資源耗盡,可是可能較難調整和控制。隊列大小和最大池大小可能須要相互折衷:使用大型隊列和小型池能夠最大限度地下降 CPU 使用率、操做系統資源和上下文切換開銷,可是可能致使人工下降吞吐量。若是任務頻繁阻塞(例如,若是它們是 I/O邊界),則系統可能爲超過您許可的更多線程安排時間。使用小型隊列一般要求較大的池大小,CPU使用率較高,可是可能遇到不可接受的調度開銷,這樣也會下降吞吐量。
[6]threadFactory:線程工廠,主要用來建立線程
[7]rejectedExecutionHandler :任務拒絕處理器
兩種狀況會拒絕處理任務
線程池會調用rejectedExecutionHandler來處理這個任務。若是沒有設置默認是AbortPolicy,會拋出異常,ThreadPoolExecutor類有幾個內部實現類來處理這類狀況
實現RejectedExecutionHandler接口,可自定義處理器
另外一種狀況即是,即便向老闆借了工人,可是任務仍是繼續過來,仍是忙不過來,這時整個隊伍只好拒絕接受了。
RejectedExecutionHandler接口提供了對於拒絕任務的處理的自定方法的機會。在ThreadPoolExecutor中已經默認包含了4中策略,由於源碼很是簡單,這裏直接貼出來。
CallerRunsPolicy:線程調用運行該任務的 execute 自己。此策略提供簡單的反饋控制機制,可以減緩新任務的提交速度。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
AbortPolicy:處理程序遭到拒絕將拋出運行時RejectedExecutionException
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException();
}
DiscardPolicy:不能執行的任務將被刪除
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
這種策略和AbortPolicy幾乎同樣,也是丟棄任務,只不過他不拋出異常。
DiscardOldestPolicy:若是執行程序還沒有關閉,則位於工做隊列頭部的任務將被刪除,而後重試執行程序(若是再次失敗,則重複此過程)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
該策略就稍微複雜一些,在pool沒有關閉的前提下首先丟掉緩存在隊列中的最先的任務,而後從新嘗試運行該任務。這個策略須要適當當心。
設想:若是其餘線程都還在運行,那麼新來任務踢掉舊任務,緩存在queue中,再來一個任務又會踢掉queue中最老任務。
池子有對象池如commons pool的GenericObjectPool(通用對象池技術)也有java裏面的線程池ThreadPoolExecutor,但java裏面的線程池引入了一個叫拒絕執行的策略模式,感受比GenericObjectPool好一點,意思也就是說當池子滿的時候該如何執行還在不斷往裏面添加的一些任務。
像GenericObjectPool只提供了,繼續等待和直接返回空的策略。而ThreadPoolExecutor則提供了一個接口,並內置了4中實現策略供用戶分場景使用。
ThreadPoolExecutor.execute(Runnable command)提供了提交任務的入口,此方法會自動判斷若是池子滿了的話,則會調用拒絕策略來執行此任務,接口爲RejectedExecutionHandler,內置的4中策略分別爲AbortPolicy、DiscardPolicy、DiscardOldestPolicy、CallerRunsPolicy。
圖5 拒絕策略關係圖
AbortPolicy
爲java線程池默認的阻塞策略,不執行此任務,並且直接拋出一個運行時異常,切記ThreadPoolExecutor.execute須要try catch,不然程序會直接退出。
DiscardPolicy
直接拋棄,任務不執行,空方法
DiscardOldestPolicy
從隊列裏面拋棄head的一個任務,並再次execute 此task。
CallerRunsPolicy
在調用execute的線程裏面執行此command,會阻塞入口。
用戶自定義拒絕策略
實現RejectedExecutionHandler,並本身定義策略模式。
再次須要注意的是,ThreadPoolExecutor.submit() 函數,此方法內部調用的execute方法,並把execute執行完後的結果給返回,但若是任務並無執行的話(被拒絕了),則submit返回的future.get()會一直等到。
future 內部其實仍是一個runnable,並把command給封裝了下,當command執行完後,future會返回一個值。
當線程池由於工做池已經飽和,準備拒絕任務時候。會調用RejectedExecutionHandler來拒絕該任務。Jdk提供了幾種不一樣的RejectedExecutionHandler實現,每種實現都包含不一樣的飽和策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy。
Abort是默認的飽和策略,該策略會拋出未檢查的RejectedExecutionException。
CallerRuns實現一種調節機制,將任務回退到調用者,讓調用者執行,從而下降了新任務的流量。webServer經過使用該策略使得在請求負載太高的狀況下實現了性能的平緩下降。
Discard實現了會悄悄拋棄該任務,DiscardOldestPolicy會拋棄隊列中拋棄下一個即將被執行的任務。若是是在優先隊列裏,DiscardOldestPolicy會拋棄優先級最高的任務。
ThreadLocalPool的池的大小設置,《Java併發編程實戰》書中給了一個推薦的設置值。
Ncpu爲CPU的數量,Ucpu爲CPU的利用率,W/C爲任務的等待時間 / 任務的計算時間。在這種狀況下,通常線程池的最優大小:
N=Ncpu*Ucpu*(1+W/C)
ThreadPoolExecutor中有一個ctl變量。ctl是一個32位的二級制數,其中高3位用於表示線程池的狀態,低29位表示線程池中的活動線程。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
如上代碼所示,線程池有五種狀態。RUNNING、SHUTDOWN、STOP、TIDYING、TERMINNATED。幸虧ThreadPoolExecutor的代碼上有對應註釋,看着這些註釋能對ThreadPoolExecutor的狀態做用和狀態流轉能有一個大體的瞭解。
RUNNING:在線程池建立的時候,線程池默認處於RUNNING狀態。當線程池處於RUNNING狀態的時候,任務隊列能夠接受任務,而且能夠執行QUEUE中任務。
SHUTDOWN:不接受新任務,可是會繼續執行QUEUE中的任務。
STOP:不接受新任務,也不執行QUEUE中的任務。
TIDYING:全部的任務都停止了,沒有活動中的線程。當線程池進行該狀態時候,會執行鉤子方法terminated() 。
當Executors建立完成了線程池以後能夠返回「ExecutorService」接口對象,而這個對象裏面有兩個方法來接收線程的執行:
//接收Callable: public <T> Future<T> submit(Callable<T> task); //接收Runnable: public Future<?> submit(Runnable task);
範例:建立無限量線程池
package so.strong.mall.concurrent; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ExecutorDemo { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); //建立一個線程池 for (int i = 0; i < 5; i++) { service.submit(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"執行操做"); } }); } service.shutdown(); //線程池執行完畢後須要關閉 } } //無限量大小的線程池會根據內部線程的執行情況來進行線程對象個數的控制。
submit()方法是能夠接收Callable接口對象的
package so.strong.mall.concurrent; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class ExecutorDemo { public static void main(String[] args) throws Exception { ExecutorService service = Executors.newCachedThreadPool(); for (int i = 0; i < 5; i++) { Future<?> future = service.submit(new Callable<Object>() { @Override public Object call() throws Exception { return Thread.currentThread().getName() + "執行操做"; } }); System.out.println(future.get()); } service.shutdown(); } }
Future線程模型設計的優點在於:能夠進行線程數據的異步控制,可是在以前編寫的過程嚴格來說並很差,至關於啓動了一個線程就得到了一個返回值,因而爲了方便這些線程池中線程對象的管理,可使用以下方法進行統一返回:
public interface ExecutorService extends Executor { public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; }
範例:使用invokeAny()方法
package so.strong.mall.concurrent; import java.util.HashSet; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ExecutorDemo { public static void main(String[] args) throws Exception { Set<Callable<String>> tasks = new HashSet<>(); //全部任務 for (int i = 0; i < 10; i++) { final int temp = i; tasks.add(new Callable<String>() { @Override public String call() throws Exception { return Thread.currentThread().getName() + "執行任務,i=" + temp; } }); } ExecutorService service = Executors.newCachedThreadPool(); //建立一個線程池 String invokeAny = service.invokeAny(tasks); //執行任務 System.out.println("返回結果:" + invokeAny); service.shutdown(); } } //返回結果:pool-1-thread-2執行任務,i=4 //使用invokeAny()方法只會返回一個任務的執行操做
package java.util.concurrent; public interface CompletionService<V> { Future<V> submit(Callable<V> task); Future<V> submit(Runnable task, V result); Future<V> take() throws InterruptedException; Future<V> poll(); Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException; }
線程池異步交互:CompletionService
將生產新的異步任務與使用已完成任務的結果分離開來的服務。生產者submit()執行的任務,使用者take()已完成的任務,並按照完成任務的順序處理它們的結果。
CompletionService依賴於一個單獨的Executor來實際執行任務,在這種狀況下,CompletionService只管理一個內部完成隊列,在CompletionService接口裏面提供有以下兩個方法:
設置Callable:
public Future<V> submit(Callable<V> task);
設置Runnable:
public Future<V> submit(Runnable task, V result);
CompletionService是一個接口,若是要想使用這個接口能夠採用ExecutorCompletionService這個子類
public class ExecutorCompletionService<V> implements CompletionService<V>
ExecutorCompletionService的構造方法:
public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); }
CompletionService來控制全部線程池的操做以及數據返回,則應該使用這個類來進行線程池的提交處理。
提交線程
Future<V> submit(Callable<V> task);
獲取返回內容
Future<V> take() throws InterruptedException;
範例:使用CompletionService工具類
package so.strong.mall.concurrent; import java.util.concurrent.*; public class ExecutorDemo { public static void main(String[] args) throws Exception { ExecutorService service = Executors.newCachedThreadPool(); CompletionService<String> completions = new ExecutorCompletionService<>(service); for (int i = 0; i < 5; i++) { final int temp = i; completions.submit(new Callable<String>() { @Override public String call() throws Exception { return Thread.currentThread().getName() + "- i =" + temp; } }); } for (int i = 0; i < 5; i++) { System.out.println(completions.take().get()); } service.shutdown(); } }
CompletionService操做接口的主要目的是能夠去隱藏ExecutorService接口執行線程池的處理,不在須要關心novkeAny(), invokeAll()的執行方法了。
建立一個定時調度池,這個調度池主要是以時間間隔調度爲主。若是要建立調度池則使用ScheduledExecutorService接口完成,該接口之中包含有以下的兩個方法:
延遲啓動
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
間隔調度
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
範例:建立調度池
package so.strong.mall.concurrent; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class ExecutorDemo { public static void main(String[] args) { ScheduledExecutorService service = Executors.newScheduledThreadPool(1); for (int i = 0; i < 5; i++) { service.schedule(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"執行操做"); } },2, TimeUnit.SECONDS); } service.shutdown(); } }
在ExecutorService接口裏面的確提供有接收Runnable接口對象的方法,可是這個方法爲了統一使用的是submit()。submit()重載了許屢次,能夠接收Runnable:
public Future<?> submit(Runnale task)
生成線程池採用了工具類Executors的靜態方法,如下是幾種常見的線程池。
SingleThreadExecutor:單個後臺線程 (其緩衝隊列是無界的)
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService ( new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
建立一個單線程的線程池。這個線程池只有一個核心線程在工做,也就是至關於單線程串行執行全部任務。若是這個惟一的線程由於異常結束,那麼會有一個新的線程來替代它。此線程池保證全部任務的執行順序按照任務的提交順序執行。
FixedThreadPool:只有核心線程的線程池,大小固定 (其緩衝隊列是無界的) 。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
建立固定大小的線程池。每次提交一個任務就建立一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,若是某個線程由於執行異常而結束,那麼線程池會補充一個新線程。
CachedThreadPool:無界線程池,能夠進行自動線程回收。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0,Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
若是線程池的大小超過了處理任務所須要的線程,那麼就會回收部分空閒(60秒不執行任務)的線程,當任務數增長時,此線程池又能夠智能的添加新線程來處理任務。此線程池不會對線程池大小作限制,線程池大小徹底依賴於操做系統(或者說JVM)可以建立的最大線程大小。SynchronousQueue是一個是緩衝區爲1的阻塞隊列。
ScheduledThreadPool:核心線程池固定,大小無限的線程池。此線程池支持定時以及週期性執行任務的需求。
public static ExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPool(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); }
建立一個週期性執行任務的線程池。若是閒置,非核心線程池會在DEFAULT_KEEPALIVEMILLIS時間內回收。
execute:
ExecutorService.execute(Runnable runable);
submit:
FutureTask task = ExecutorService.submit(Runnable runnable);
FutureTask<T> task = ExecutorService.submit(Runnable runnable,T Result); FutureTask<T> task = ExecutorService.submit(Callable<T> callable);
submit(Callable callable)的實現,submit(Runnable runnable)同理。
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); FutureTask<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
能夠看出submit開啓的是有返回結果的任務,會返回一個FutureTask對象,這樣就能經過get()方法獲得結果。submit最終調用的也是execute(Runnable runable),submit只是將Callable對象或Runnable封裝成一個FutureTask對象,由於FutureTask是個Runnable,因此能夠在execute中執行。關於Callable對象和Runnable怎麼封裝成FutureTask對象,見Callable和Future、FutureTask的使用。
ThreadPoolExecutor提供瞭如下3個生命週期的鉤子方法讓子類擴展:
(1).beforeExecute:
任務執行前,線程會調用該方法,能夠用來添加日誌、監控或者信息收集統計。
若beforeExcute方法拋出了RuntimeException,線程的任務將不被執行,afterExecute方法也不會被調用。
(2).afterExecute:
任務執行結束後,線程會調用該方法,能夠用來添加日誌、監控或者信息收集統計。
不管任務正常返回或者拋出異常(拋出Error不能被調用),該方法都會被調用。
(3).terminate:
線程池完成關閉動做後調用,能夠用來釋放資源、發出通知、記錄日誌或者完成統計信息等。
一個擴展ThreadPoolExecutor的例子代碼以下:
public class TimingThreadPool extends ThreadPoolExecutor{ private final ThreadLocal<Long> startTime = new ThreadLocal<Long>(); private final Logger log = Logger.getLogger(TimingThreadPool.class.getClassName()); private final AtomicLong numTasks = new AtomicLong(); private final AtomicLong totalTime = new AtomicLong(); protected void beforeExecute(Thread t, Runnable r){ super.beforeExecute(t, r); log.fine(String.format(「Thread %s: start %s」, t, r)); startTime.set(System.nanoTime()); } protected void afterExecute(Runnable r, Throwable t){ try{ long endTime = System.nanoTime(); long taskTime = endTime - startTime.get(); numTasks.incrementAndGet(); totalTime.addAndGet(taskTime); log.fine(String.format("Thread %s: end %s, time=%dns", t, r, taskTime)); }finally{ super.afterExecute(r, t); } } protected void terminated(){ try{ log.info(String.format("Terminated: avg time=%dns", totalTime.get() / numTasks.get())); }finally{ super.terminated(); } } }
下面給出一個線程池使用示例,及教你獲取線程池狀態。
private static ExecutorService es = new ThreadPoolExecutor(50, 100, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100000)); public static void main(String[] args) throws Exception { for (int i = 0; i < 100000; i++) { es.execute(() -> { System.out.print(1); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); } ThreadPoolExecutor tpe = ((ThreadPoolExecutor) es); while (true) { System.out.println(); int queueSize = tpe.getQueue().size(); System.out.println("當前排隊線程數:" + queueSize); int activeCount = tpe.getActiveCount(); System.out.println("當前活動線程數:" + activeCount); long completedTaskCount = tpe.getCompletedTaskCount(); System.out.println("執行完成線程數:" + completedTaskCount); long taskCount = tpe.getTaskCount(); System.out.println("總線程數:" + taskCount); Thread.sleep(3000); } }
線程池提交了 100000 個任務,但同時只有 50 個線程在執行工做,咱們每陋 3 秒來獲取當前線程池的運行狀態。
第一次程序輸出:
當前排隊線程數:99950 當前活動線程數:50 執行完成線程數:0 總線程數(排隊線程數 + 活動線程數 + 執行完成線程數):100000
第二次程序輸出:
當前排隊線程數:99800 當前活動線程數:50 執行完成線程數:150 總線程數(排隊線程數 + 活動線程數 + 執行完成線程數):100000
活動線程數和總線程數是不變的,排隊中的線程數和執行完成的線程數不斷在變化,直到全部任務執行完畢,最後輸出:
當前排隊線程數:0 當前活動線程數:0 執行完成線程數:100000 總線程數(排隊線程數 + 活動線程數 + 執行完成線程數):100000
這樣,你瞭解了這些 API 的使用方法,你想監控線程池的狀態就很是方便了。
爲了充分利用多CPU的優點、多核CPU的性能優點。能夠考多個小任務,把小任務放到多個處理器核心上並行執行;當多個小任務執行完成以後,再將這些執行結果合併起來便可。Java 7提供了ForkJoinPool來支持這個功能。
ForkJoinPool是ExecutorService的實現類,所以是一種特殊的線程池。提供了以下兩個經常使用的構造器
Java 8進一步拓展了ForkJoinPool的功能,Java 8增長了通用池功能。ForkJoinPool經過以下兩個方法提供通用池功能。
建立了通用池ForkJoinPool實例以後,就可調用ForkJoinPool的submit(ForkJoinTask task)或invoke(ForkJoinTask task)方法來執行指定任務了。其中,ForkJoinTask表明一個並行,合併的任務。
ForkJoinTask是一個抽象類,它還有兩個抽象子類:RecursiveAction和recursiveTask。其中RecursiveAction表明沒有返回值的任務,RecursiveTask表明有返回值的任務。
下面程序將一個大任務(打印0~500)的數值分紅多個小任務,並將任務交給ForkJoinPool來執行。
package com.gdut.thread.threadPool; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.TimeUnit; class PrintTask extends RecursiveAction{ private static final int THRESHOLD = 50; private int start; private int end; public PrintTask(int start,int end) { this.start = start; this.end = end; } @Override protected void compute() { if(end-start<THRESHOLD){ for (int i = start; i <end ; i++) { System.out.println(Thread.currentThread().getName()+"的i值"+i); } }else{ //當end與start的差大於THRESHOLD時,即要打印的數超過50時,將大任務分紅兩個小任務 int middle = (end+start)/2; PrintTask left = new PrintTask(start,middle); PrintTask right = new PrintTask(middle,end); left.fork(); right.fork(); } } } public class ForkJoinPoolTest{ public static void main(String[] args) throws InterruptedException{ ForkJoinPool pool = new ForkJoinPool(); pool.submit(new PrintTask(0,500)); pool.awaitTermination(2, TimeUnit.SECONDS); pool.shutdown(); } }