工做中遇到了消息隊列的發送,以前都是用數據庫做爲中轉和暫存的。此次考慮用多線程的方式進行消息的發送,因而學習了一下線程池的應用。說實話,實踐中對Java高級特性的應用真的很少,對多線程的理解也就一直停留在理論層面。藉着實踐的機會好好整理一下。java
準備從如下幾個方面總結:數據庫
線程池的使用緩存
消息隊列——生產者消費者模式服務器
定時任務Quartz原理多線程
線程池的大小、隊列大小設置ide
這個部分是有關線程池的使用:函數
1. 爲何要用線程池?學習
在Java中,若是每當一個請求到達就建立一個新線程,開銷是至關大的。在實際使用中,每一個請求建立新線程的服務器在建立和銷燬線程上花費的時間和消耗的系統資源,甚至可能要比花在實際處理實際的用戶請求的時間和資源要多的多。除了建立和銷燬線程的開銷以外,活動的線程也須要消耗系統資源。若是在一個JVM中建立太多的線程,可能會致使系統因爲過分消耗內存或者「切換過分」而致使系統資源不足。爲了防止資源不足,服務器應用程序須要一些辦法來限制任何給定時刻處理的請求數目,儘量減小建立和銷燬線程的次數,特別是一些資源耗費比較大的線程的建立和銷燬,儘可能利用已有對象來進行服務,這就是「池化資源」技術產生的緣由。測試
線程池主要用來解決線程生命週期開銷問題和資源不足問題,經過對多個任務重用線程,線程建立的開銷被分攤到多個任務上了,並且因爲在請求到達時線程已經存在,因此消除了建立所帶來的延遲。這樣,就能夠當即請求服務,使應用程序響應更快。另外,經過適當的調整線程池中的線程數據能夠防止出現資源不足的狀況。this
2. ThreadPoolExecutor類
JDK 1.5之後,Java提供一個線程池ThreadPoolExecutor類。下面從構造函數來分析一下這個線程池的使用方法。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
參數名、說明:
參數名 | 說明 |
---|---|
corePoolSize | 線程池維護線程的最少數量 |
maximumPoolSize | 線程池維護線程的最大數量 |
keepAliveTime | 線程池維護線程所容許的空閒時間 |
workQueue | 任務隊列,用來存放咱們所定義的任務處理線程 |
threadFactory | 線程建立工廠 |
handler | 線程池對拒絕任務的處理策略 |
ThreadPoolExecutor將根據corePoolSize和maximumPoolSize設置的邊界自動調整池大小。當新任務在方法execute(Runnable) 中提交時, 若是運行的線程少於corePoolSize,則建立新線程來處理請求。
若是正在運行的線程等於corePoolSize時,ThreadPoolExecutor優先往隊列中添加任務,直到隊列滿了,而且沒有空閒線程時才建立新的線程。若是設置的corePoolSize 和 maximumPoolSize 相同,則建立了固定大小的線程池。
keepAliveTime:當線程數達到maximumPoolSize時,通過某段時間,發現多出的線程出於空閒狀態,就進行線程的回收。keepAliveTime就是線程池內最大的空閒時間。
workQueue:當核心線程不能都在處理任務時,新進任務被放在Queue裏。
線程池中任務有三種排隊策略:
a. 直接提交。直接提交策略表示線程池不對任務進行緩存。新進任務直接提交給線程池,當線程池中沒有空閒線程時,建立一個新的線程處理此任務。這種策略須要線程池具備無限增加的可能性。實現爲:SynchronousQueue
b. 有界隊列。當線程池中線程達到corePoolSize時,新進任務被放在隊列裏排隊等待處理。有界隊列(如ArrayBlockingQueue)有助於防止資源耗盡,可是可能較難調整和控制。隊列大小和最大池大小可能須要相互折衷:使用大型隊列和小型池能夠最大限度地下降 CPU 使用率、操做系統資源和上下文切換開銷,可是可能致使人工下降吞吐量。若是任務頻繁阻塞(例如,若是它們是 I/O 邊界),則系統可能爲超過您許可的更多線程安排時間。使用小型隊列一般要求較大的池大小,CPU 使用率較高,可是可能遇到不可接受的調度開銷,這樣也會下降吞吐量。
c. 無界隊列。使用無界隊列(例如,不具備預約義容量的 LinkedBlockingQueue)將致使在全部 corePoolSize 線程都忙時新任務在隊列中等待。這樣,建立的線程就不會超過 corePoolSize。(所以,maximumPoolSize 的值也就無效了。)當每一個任務徹底獨立於其餘任務,即任務執行互不影響時,適合於使用無界隊列;例如,在 Web 頁服務器中。這種排隊可用於處理瞬態突發請求,當命令以超過隊列所能處理的平均數連續到達時,此策略容許無界線程具備增加的可能性。
拒絕策略:當任務源源不斷的過來,而咱們的系統又處理不過來的時候,咱們要採起的策略是拒絕服務。RejectedExecutionHandler接口提供了拒絕任務處理的自定義方法的機會。在ThreadPoolExecutor中已經包含四種處理策略。
1)CallerRunsPolicy:線程調用運行該任務的 execute 自己。此策略提供簡單的反饋控制機制,可以減緩新任務的提交速度。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }
這個策略顯然不想放棄執行任務。可是因爲池中已經沒有任何資源了,那麼就直接使用調用該execute的線程自己來執行。(開始我總不想丟棄任務的執行,可是對某些應用場景來說,頗有可能形成當前線程也被阻塞。若是全部線程都是不能執行的,極可能致使程序無法繼續跑了。須要視業務情景而定吧。)
2)AbortPolicy:處理程序遭到拒絕將拋出運行時 RejectedExecutionException
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException(); }
這種策略直接拋出異常,丟棄任務。(jdk默認策略,隊列滿併線程滿時直接拒絕添加新任務,並拋出異常,因此說有時候放棄也是一種勇氣,爲了保證後續任務的正常進行,丟棄一些也是能夠接收的,記得作好記錄)
3)DiscardPolicy:不能執行的任務將被刪除
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
這種策略和AbortPolicy幾乎同樣,也是丟棄任務,只不過他不拋出異常。
4)DiscardOldestPolicy:若是執行程序還沒有關閉,則位於工做隊列頭部的任務將被刪除,而後重試執行程序(若是再次失敗,則重複此過程)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }
該策略就稍微複雜一些,在pool沒有關閉的前提下首先丟掉緩存在隊列中的最先的任務,而後從新嘗試運行該任務。這個策略須要適當當心。
3. Executors 工廠
ThreadPoolExecutor是Executors類的實現,Executors類裏面提供了一些靜態工廠,生成一些經常使用的線程池,主要有如下幾個:
newSingleThreadExecutor:建立一個單線程的線程池。這個線程池只有一個線程在工做,也就是至關於單線程串行執行全部任務。若是這個惟一的線程由於異常結束,那麼會有一個新的線程來替代它。此線程池保證全部任務的執行順序按照任務的提交順序執行。
newFixedThreadPool:建立固定大小的線程池。每次提交一個任務就建立一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,若是某個線程由於執行異常而結束,那麼線程池會補充一個新線程。(我用的就是這個,同上所述,至關於建立了相同corePoolSize、maximumPoolSize的線程池)
newCachedThreadPool:建立一個可緩存的線程池。若是線程池的大小超過了處理任務所須要的線程,那麼就會回收部分空閒(60秒不執行任務)的線程,當任務數增長時,此線程池又能夠智能的添加新線程來處理任務。此線程池不會對線程池大小作限制,線程池大小徹底依賴於操做系統(或者說JVM)可以建立的最大線程大小。
4. 舉個栗子
測試類:ThreadPool,建立了一個 newFixedThreadPool,最大線程數爲3的固定大小線程池。而後模擬10個任務丟進去。主線程結束後會打印一句:主線程結束。
public class ThreadPool { public static void main(String[] args) { //建立固定大小線程池 ExecutorService executor = Executors.newFixedThreadPool(3); for (int i = 0; i < 10; i++) { SendNoticeTask task = new SendNoticeTask(); task.setCount(i); executor.execute(task); } System.out.println("主線程結束"); } }
測試類:SendNoticeTask,執行任務類,就是打印一句當前線程名+第幾個任務。爲了方便觀察,每一個線程執行完之後睡10s。
public class SendNoticeTask implements Runnable { private int count; public void setCount(int count) { this.count = count; } @Override public void run() { System.out.println(Thread.currentThread().getName() + " start to" + " send " + count + " ..."); try { Thread.currentThread().sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("finish " + Thread.currentThread().getName()); } }
執行結果:
主線程結束 pool-1-thread-3 start to send 2 ... pool-1-thread-1 start to send 0 ... pool-1-thread-2 start to send 1 ... finish pool-1-thread-3 finish pool-1-thread-2 pool-1-thread-2 start to send 3 ... finish pool-1-thread-1 pool-1-thread-3 start to send 4 ... pool-1-thread-1 start to send 5 ... finish pool-1-thread-3 finish pool-1-thread-1 pool-1-thread-1 start to send 6 ... pool-1-thread-3 start to send 7 ... finish pool-1-thread-2 pool-1-thread-2 start to send 8 ... finish pool-1-thread-1 pool-1-thread-1 start to send 9 ... finish pool-1-thread-3 finish pool-1-thread-2 finish pool-1-thread-1
由上可見執行順序是這樣的:線程池建立了三個線程,分別執行任務0、一、2,因爲線程建立須要必定時間,所以前三個線程的執行順序具備必定隨機性。此時主線程接着往線程池中塞任務,線程池已達到最大線程數(3),因而開始往隊列裏放。當某個線程執行完任務後,直接從隊列里拉出新的任務執行,隊列具備先進先出的特性,所以後面的任務執行是有序的。
這個看一下 Executors 類的源碼就更明白了。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
其實是建立了一個具備固定線程數、無界隊列的 ThreadPoolExecutor。隊列無界,不會拒絕任務提交,所以使用此方法時,須要注意資源被耗盡的狀況。
測試類:TestThreadPool,採用有界隊列(隊列大小2),和默認拒絕策略的ThreadPoolExecutor。
public class TestThreadPool { private static final int corePoolSize = 2; private static final int maximumPoolSize = 4; private static final int keepAliveTime = 1000; private static BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(2); public static void main(String[] args) { //建立線程池 ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, workQueue); for (int i = 0; i < 10; i++) { SendNoticeTask task = new SendNoticeTask(); task.setCount(i); executor.execute(task); } System.out.println("主線程結束:" + Thread.currentThread().getName()); } }
執行結果:
pool-1-thread-1 start to send 0 ... Exception in thread "main" java.util.concurrent.RejectedExecutionException at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1768) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:767) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:658) at com.dylanviviv.pool.TestThreadPool.main(TestThreadPool.java:24) pool-1-thread-4 start to send 5 ... pool-1-thread-2 start to send 1 ... pool-1-thread-3 start to send 4 ... finish pool-1-thread-1 finish pool-1-thread-3 pool-1-thread-1 start to send 2 ... finish pool-1-thread-2 pool-1-thread-3 start to send 3 ... finish pool-1-thread-4 finish pool-1-thread-3 finish pool-1-thread-1
線程池建立了2個線程,分別執行任務0、1,線程池達到corePoolSize,新進任務二、3被放入隊列中等待處理,此時隊列滿,而線程池中線程沒有執行完任務0、1,線程池建立新的線程,執行新進任務四、5,達到maximumPoolSize。此時全部任務都沒有執行結束,主線程又繼續提交任務,線程池進入默認異常策略(AbortPolicy)拒絕服務。