實現多線程的三種方式,繼承Thread,實現Runnable 和 實現 Executor接口 ,具體參考:Java 多線程 三種實現方式html
去美團,問到了什麼是線程池,如何使用,爲何要用,如下作個總結java
一、什麼是線程池: java.util.concurrent.Executors提供了一個 java.util.concurrent.Executor接口的實現用於建立線程池mysql
多線程技術主要解決處理器單元內多個線程執行的問題,它能夠顯著減小處理器單元的閒置時間,增長處理器單元的吞吐能力。
假設一個服務器完成一項任務所需時間爲:T1 建立線程時間,T2 在線程中執行任務的時間,T3 銷燬線程時間。程序員
若是:T1 + T3 遠大於 T2,則能夠採用線程池,以提升服務器性能。sql
一個線程池包括如下四個基本組成部分:
一、線程池管理器(ThreadPool):用於建立並管理線程池,包括 建立線程池,銷燬線程池,添加新任務;
二、工做線程(PoolWorker):線程池中線程,在沒有任務時處於等待狀態,能夠循環的執行任務;
三、任務接口(Task):每一個任務必須實現的接口,以供工做線程調度任務的執行,它主要規定了任務的入口,任務執行完後的收尾工做,任務的執行狀態等;
四、任務隊列(taskQueue):用於存放沒有處理的任務。提供一種緩衝機制。數據庫
線程池技術正是關注如何縮短或調整T1,T3時間的技術,從而提升服務器程序性能的。它把T1,T3分別安排在服務器程序的啓動和結束的時間段或者一些空閒的時間段,這樣在服務器程序處理客戶請求時,不會有T1,T3的開銷了。
線程池不只調整T1,T3產生的時間段,並且它還顯著減小了建立線程的數目,看一個例子:
假設一個服務器一天要處理50000個請求,而且每一個請求須要一個單獨的線程完成。在線程池中,線程數通常是固定的,因此產生線程總數不會超過線程池中線程的數目,而若是服務器不利用線程池來處理這些請求則線程總數爲50000。通常線程池大小是遠小於50000。因此利用線程池的服務器程序不會爲了建立50000而在處理請求時浪費時間,從而提升效率。緩存
Java線程池使用說明服務器
線程的使用在java中佔有極其重要的地位,在jdk1.4極其以前的jdk版本中,關於線程池的使用是極其簡陋的。在jdk1.5以後這一狀況有了很大的改觀。Jdk1.5以後加入了java.util.concurrent包,這個包中主要介紹java中線程以及線程池的使用。爲咱們在開發中處理線程的問題提供了很是大的幫助。多線程
線程池的做用:ide
線程池做用就是限制系統中執行線程的數量。
根據系統的環境狀況,能夠自動或手動設置線程數量,達到運行的最佳效果;少了浪費了系統資源,多了形成系統擁擠效率不高。用線程池控制線程數量,其餘線程排隊等候。一個任務執行完畢,再從隊列的中取最前面的任務開始執行。若隊列中沒有等待進程,線程池的這一資源處於等待。當一個新任務須要運行時,若是線程池中有等待的工做線程,就能夠開始運行了;不然進入等待隊列。
爲何要用線程池:
1.減小了建立和銷燬線程的次數,每一個工做線程均可以被重複利用,可執行多個任務。
2.能夠根據系統的承受能力,調整線程池中工做線線程的數目,防止由於消耗過多的內存,而把服務器累趴下(每一個線程須要大約1MB內存,線程開的越多,消耗的內存也就越大,最後死機)。
Java裏面線程池的頂級接口是Executor,可是嚴格意義上講Executor並非一個線程池,而只是一個執行線程的工具。真正的線程池接口是ExecutorService。
比較重要的幾個類:
ExecutorService |
真正的線程池接口。 |
ScheduledExecutorService |
能和Timer/TimerTask相似,解決那些須要任務重複執行的問題。 |
ThreadPoolExecutor |
ExecutorService的默認實現。 |
ScheduledThreadPoolExecutor |
繼承ThreadPoolExecutor的ScheduledExecutorService接口實現,週期性任務調度的類實現。
|
要配置一個線程池是比較複雜的,尤爲是對於線程池的原理不是很清楚的狀況下,頗有可能配置的線程池不是較優的,所以在Executors類裏面提供了一些靜態工廠,生成一些經常使用的線程池。
1. newSingleThreadExecutor
建立一個單線程的線程池。這個線程池只有一個線程在工做,也就是至關於單線程串行執行全部任務。若是這個惟一的線程由於異常結束,那麼會有一個新的線程來替代它。此線程池保證全部任務的執行順序按照任務的提交順序執行。
2.newFixedThreadPool
建立固定大小的線程池。每次提交一個任務就建立一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,若是某個線程由於執行異常而結束,那麼線程池會補充一個新線程。
3. newCachedThreadPool
建立一個可緩存的線程池。若是線程池的大小超過了處理任務所須要的線程,
那麼就會回收部分空閒(60秒不執行任務)的線程,當任務數增長時,此線程池又能夠智能的添加新線程來處理任務。此線程池不會對線程池大小作限制,線程池大小徹底依賴於操做系統(或者說JVM)可以建立的最大線程大小。
4.newScheduledThreadPool
建立一個大小無限的線程池。此線程池支持定時以及週期性執行任務的需求。
實例
1:newSingleThreadExecutor
package com.thread; /* * 經過實現Runnable接口,實現多線程 * Runnable類是有run()方法的; * 可是沒有start方法 * 參考: http://blog.csdn.net/qq_31753145/article/details/50899119 * */ public class MyThread extends Thread { @Override public void run() { // TODO Auto-generated method stub // super.run(); System.out.println(Thread.currentThread().getName()+"正在執行...."); } }
package com.thread; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /* * 經過實現Runnable接口,實現多線程 * Runnable類是有run()方法的; * 可是沒有start方法 * 參考: * http://blog.csdn.net/qq_31753145/article/details/50899119 * */ public class singleThreadExecutorTest{ public static void main(String[] args) { // TODO Auto-generated method stub //建立一個可重用固定線程數的線程池 ExecutorService pool=Executors.newSingleThreadExecutor(); //建立實現了Runnable接口對象,Thread對象固然也實現了Runnable接口; Thread t1=new MyThread(); Thread t2=new MyThread(); Thread t3=new MyThread(); Thread t4=new MyThread(); Thread t5=new MyThread(); //將線程放到池中執行; pool.execute(t1); pool.execute(t2); pool.execute(t3); pool.execute(t4); pool.execute(t5); //關閉線程池 pool.shutdown(); } }
結果:
pool-1-thread-1正在執行.... pool-1-thread-1正在執行.... pool-1-thread-1正在執行.... pool-1-thread-1正在執行.... pool-1-thread-1正在執行....
2newFixedThreadPool
package com.thread; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /* * 經過實現Runnable接口,實現多線程 * Runnable類是有run()方法的; * 可是沒有start方法 * 參考: * http://blog.csdn.net/qq_31753145/article/details/50899119 * */ public class fixedThreadExecutorTest{ public static void main(String[] args) { // TODO Auto-generated method stub //建立一個可重用固定線程數的線程池 ExecutorService pool=Executors.newFixedThreadPool(2); //建立實現了Runnable接口對象,Thread對象固然也實現了Runnable接口; Thread t1=new MyThread(); Thread t2=new MyThread(); Thread t3=new MyThread(); Thread t4=new MyThread(); Thread t5=new MyThread(); //將線程放到池中執行; pool.execute(t1); pool.execute(t2); pool.execute(t3); pool.execute(t4); pool.execute(t5); //關閉線程池 pool.shutdown(); } }
結果:
pool-1-thread-1正在執行.... pool-1-thread-1正在執行.... pool-1-thread-1正在執行.... pool-1-thread-1正在執行.... pool-1-thread-2正在執行....
三、newCachedThreadPool
package com.thread; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /* * 經過實現Runnable接口,實現多線程 * Runnable類是有run()方法的; * 可是沒有start方法 * 參考: * http://blog.csdn.net/qq_31753145/article/details/50899119 * */ public class cachedThreadExecutorTest{ public static void main(String[] args) { // TODO Auto-generated method stub //建立一個可重用固定線程數的線程池 ExecutorService pool=Executors.newCachedThreadPool(); //建立實現了Runnable接口對象,Thread對象固然也實現了Runnable接口; Thread t1=new MyThread(); Thread t2=new MyThread(); Thread t3=new MyThread(); Thread t4=new MyThread(); Thread t5=new MyThread(); //將線程放到池中執行; pool.execute(t1); pool.execute(t2); pool.execute(t3); pool.execute(t4); pool.execute(t5); //關閉線程池 pool.shutdown(); } }
結果:
pool-1-thread-2正在執行.... pool-1-thread-1正在執行.... pool-1-thread-3正在執行.... pool-1-thread-4正在執行.... pool-1-thread-5正在執行....
四、newScheduledThreadPool
package com.thread; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; /* * 經過實現Runnable接口,實現多線程 * Runnable類是有run()方法的; * 可是沒有start方法 * 參考: * http://blog.csdn.net/qq_31753145/article/details/50899119 * */ public class scheduledThreadExecutorTest{ public static void main(String[] args) { // TODO Auto-generated method stub ScheduledThreadPoolExecutor exec =new ScheduledThreadPoolExecutor(1); exec.scheduleAtFixedRate(new Runnable(){//每隔一段時間就觸發異常 @Override public void run() { // TODO Auto-generated method stub //throw new RuntimeException(); System.out.println("==================="); }}, 1000, 5000, TimeUnit.MILLISECONDS); exec.scheduleAtFixedRate(new Runnable(){//每隔一段時間打印系統時間,證實二者是互不影響的 @Override public void run() { // TODO Auto-generated method stub System.out.println(System.nanoTime()); }}, 1000, 2000, TimeUnit.MILLISECONDS); } }
結果:
=================== 23119318857491 23121319071841 23123319007891 =================== 23125318176937 23127318190359 =================== 23129318176148 23131318344312 23133318465896 =================== 23135319645812
ThreadPoolExecutor的完整構造方法的簽名是:ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
.
corePoolSize - 池中所保存的線程數,包括空閒線程。
maximumPoolSize-池中容許的最大線程數。
keepAliveTime - 當線程數大於核心時,此爲終止前多餘的空閒線程等待新任務的最長時間。
unit - keepAliveTime 參數的時間單位。
workQueue - 執行前用於保持任務的隊列。此隊列僅保持由 execute方法提交的 Runnable任務。
threadFactory - 執行程序建立新線程時使用的工廠。
handler - 因爲超出線程範圍和隊列容量而使執行被阻塞時所使用的處理程序。
ThreadPoolExecutor是Executors類的底層實現。
在JDK幫助文檔中,有如此一段話:
「強烈建議程序員使用較爲方便的Executors
工廠方法Executors.newCachedThreadPool()
(無界線程池,能夠進行自動線程回收)、Executors.newFixedThreadPool(int)
(固定大小線程池)Executors.newSingleThreadExecutor()
(單個後臺線程)
它們均爲大多數使用場景預約義了設置。」
下面介紹一下幾個類的源碼:
ExecutorService newFixedThreadPool (int nThreads):固定大小線程池。
能夠看到,corePoolSize和maximumPoolSize的大小是同樣的(實際上,後面會介紹,若是使用無界queue的話maximumPoolSize參數是沒有意義的),keepAliveTime和unit的設值表名什麼?-就是該實現不想keep alive!最後的BlockingQueue選擇了LinkedBlockingQueue,該queue有一個特色,他是無界的。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
ExecutorService newSingleThreadExecutor():單線程
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
ExecutorService newCachedThreadPool():無界線程池,能夠進行自動線程回收
這個實現就有意思了。首先是無界的線程池,因此咱們能夠發現maximumPoolSize爲big big。其次BlockingQueue的選擇上使用SynchronousQueue。可能對於該BlockingQueue有些陌生,簡單說:該QUEUE中,每一個插入操做必須等待另外一個線程的對應移除操做。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
先從BlockingQueue<Runnable> workQueue這個入參開始提及。在JDK中,其實已經說得很清楚了,一共有三種類型的queue。
全部BlockingQueue 均可用於傳輸和保持提交的任務。可使用此隊列與池大小進行交互:
若是運行的線程少於 corePoolSize,則 Executor始終首選添加新的線程,而不進行排隊。(若是當前運行的線程小於corePoolSize,則任務根本不會存放,添加到queue中,而是直接抄傢伙(thread)開始運行)
若是運行的線程等於或多於 corePoolSize,則 Executor始終首選將請求加入隊列,而不添加新的線程。
若是沒法將請求加入隊列,則建立新的線程,除非建立此線程超出 maximumPoolSize,在這種狀況下,任務將被拒絕。
queue上的三種類型。
排隊有三種通用策略:
直接提交。工做隊列的默認選項是 SynchronousQueue,它將任務直接提交給線程而不保持它們。在此,若是不存在可用於當即運行任務的線程,則試圖把任務加入隊列將失敗,所以會構造一個新的線程。此策略能夠避免在處理可能具備內部依賴性的請求集時出現鎖。直接提交一般要求無界 maximumPoolSizes 以免拒絕新提交的任務。當命令以超過隊列所能處理的平均數連續到達時,此策略容許無界線程具備增加的可能性。
無界隊列。使用無界隊列(例如,不具備預約義容量的 LinkedBlockingQueue)將致使在全部corePoolSize 線程都忙時新任務在隊列中等待。這樣,建立的線程就不會超過 corePoolSize。(所以,maximumPoolSize的值也就無效了。)當每一個任務徹底獨立於其餘任務,即任務執行互不影響時,適合於使用無界隊列;例如,在 Web頁服務器中。這種排隊可用於處理瞬態突發請求,當命令以超過隊列所能處理的平均數連續到達時,此策略容許無界線程具備增加的可能性。
有界隊列。當使用有限的 maximumPoolSizes時,有界隊列(如 ArrayBlockingQueue)有助於防止資源耗盡,可是可能較難調整和控制。隊列大小和最大池大小可能須要相互折衷:使用大型隊列和小型池能夠最大限度地下降 CPU 使用率、操做系統資源和上下文切換開銷,可是可能致使人工下降吞吐量。若是任務頻繁阻塞(例如,若是它們是 I/O邊界),則系統可能爲超過您許可的更多線程安排時間。使用小型隊列一般要求較大的池大小,CPU使用率較高,可是可能遇到不可接受的調度開銷,這樣也會下降吞吐量。
BlockingQueue的選擇。
例子一:使用直接提交策略,也即SynchronousQueue。 好比 Executors.newCachedThreadPool()默認就是 SynchronousQueue
首先SynchronousQueue是無界的,也就是說他存數任務的能力是沒有限制的,可是因爲該Queue自己的特性,在某次添加元素後必須等待其餘線程取走後才能繼續添加。在這裏不是核心線程即是新建立的線程,可是咱們試想同樣下,下面的場景。
咱們使用一下參數構造ThreadPoolExecutor:
new ThreadPoolExecutor( 2, 3, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new RecorderThreadFactory("CookieRecorderPool"), new ThreadPoolExecutor.CallerRunsPolicy());
當核心線程已經有2個正在運行.
因此在使用SynchronousQueue一般要求maximumPoolSize是無界的,這樣就能夠避免上述狀況發生(若是但願限制就直接使用有界隊列)。對於使用SynchronousQueue的做用jdk中寫的很清楚:此策略能夠避免在處理可能具備內部依賴性的請求集時出現鎖。
什麼意思?若是你的任務A1,A2有內部關聯,A1須要先運行,那麼先提交A1,再提交A2,當使用SynchronousQueue咱們能夠保證,A1一定先被執行,在A1麼有被執行前,A2不可能添加入queue中。
例子二:使用無界隊列策略,即LinkedBlockingQueue
這個就拿newFixedThreadPool來講,根據前文提到的規則:
若是運行的線程少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。那麼當任務繼續增長,會發生什麼呢?
若是運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。OK,此時任務變加入隊列之中了,那何時纔會添加新線程呢?
若是沒法將請求加入隊列,則建立新的線程,除非建立此線程超出 maximumPoolSize,在這種狀況下,任務將被拒絕。這裏就頗有意思了,可能會出現沒法加入隊列嗎?不像SynchronousQueue那樣有其自身的特色,對於無界隊列來講,老是能夠加入的(資源耗盡,固然另當別論)。換句說,永遠也不會觸發產生新的線程!corePoolSize大小的線程數會一直運行,忙完當前的,就從隊列中拿任務開始運行。因此要防止任務瘋長,好比任務運行的實行比較長,而添加任務的速度遠遠超過處理任務的時間,並且還不斷增長,不一下子就爆了。
例子三:有界隊列,使用ArrayBlockingQueue。
這個是最爲複雜的使用,因此JDK不推薦使用也有些道理。與上面的相比,最大的特色即是能夠防止資源耗盡的狀況發生。
舉例來講,請看以下構造方法:
new ThreadPoolExecutor( 2, 4, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), new RecorderThreadFactory("CookieRecorderPool"), new ThreadPoolExecutor.CallerRunsPolicy());
假設,全部的任務都永遠沒法執行完。
對於首先來的A,B來講直接運行,接下來,若是來了C,D,他們會被放到queue中,若是接下來再來E,F,則增長線程運行E,F。可是若是再來任務,隊列沒法再接受了,線程數也到達最大的限制了,因此就會使用拒絕策略來處理。
keepAliveTime
jdk中的解釋是:當線程數大於核心時,此爲終止前多餘的空閒線程等待新任務的最長時間。
有點拗口,其實這個不難理解,在使用了「池」的應用中,大多都有相似的參數須要配置。好比數據庫鏈接池,DBCP中的maxIdle,minIdle參數。
什麼意思?接着上面的解釋,後來向老闆派來的工人始終是「借來的」,俗話說「有借就有還」,但這裏的問題就是何時還了,若是借來的工人剛完成一個任務就還回去,後來發現任務還有,那豈不是又要去借?這一來一往,老闆確定頭也大死了。
合理的策略:既然借了,那就多借一下子。直到「某一段」時間後,發現再也用不到這些工人時,即可以還回去了。這裏的某一段時間即是keepAliveTime的含義,TimeUnit爲keepAliveTime值的度量。
RejectedExecutionHandler
另外一種狀況即是,即便向老闆借了工人,可是任務仍是繼續過來,仍是忙不過來,這時整個隊伍只好拒絕接受了。
RejectedExecutionHandler接口提供了對於拒絕任務的處理的自定方法的機會。在ThreadPoolExecutor中已經默認包含了4中策略,由於源碼很是簡單,這裏直接貼出來。
CallerRunsPolicy:線程調用運行該任務的 execute 自己。此策略提供簡單的反饋控制機制,可以減緩新任務的提交速度。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }
這個策略顯然不想放棄執行任務。可是因爲池中已經沒有任何資源了,那麼就直接使用調用該execute的線程自己來執行。
AbortPolicy:處理程序遭到拒絕將拋出運行時RejectedExecutionException
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException(); }
這種策略直接拋出異常,丟棄任務。
DiscardPolicy:不能執行的任務將被刪除
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }
這種策略和AbortPolicy幾乎同樣,也是丟棄任務,只不過他不拋出異常。
DiscardOldestPolicy:若是執行程序還沒有關閉,則位於工做隊列頭部的任務將被刪除,而後重試執行程序(若是再次失敗,則重複此過程)
這是咱們系統的作法:
private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(16, 32, 5L, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1000), new ThreadFactory() { private final ThreadGroup threadGroup = new ThreadGroup("fileTemplateMethodThreadGroup"); private final AtomicInteger threadNumber = new AtomicInteger(1); @Override public Thread newThread(Runnable r) { return new Thread(threadGroup, r, "fileTemplateMethod-thread-pool-" + threadNumber.getAndIncrement()); } }, (r, executor) -> { if (!executor.isShutdown()) { /* 丟棄隊列最老的數據 */ if (executor.getQueue().poll() != null) { Cat.logMetricForCount(CatConstant.METRIC_DISCARD_FILE_TASK_COUNT); } executor.execute(r); } });
其實就是下面的代碼:注意: e.getQueue().poll();
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }
該策略就稍微複雜一些,在pool沒有關閉的前提下首先丟掉緩存在隊列中的最先的任務,而後從新嘗試運行該任務。這個策略須要適當當心。
設想:若是其餘線程都還在運行,那麼新來任務踢掉舊任務,緩存在queue中,再來一個任務又會踢掉queue中最老任務。
總結:
keepAliveTime和maximumPoolSize及BlockingQueue的類型均有關係。若是BlockingQueue是無界的,那麼永遠不會觸發maximumPoolSize,天然keepAliveTime也就沒有了意義。
反之,若是核心數較小,有界BlockingQueue數值又較小,同時keepAliveTime又設的很小,若是任務頻繁,那麼系統就會頻繁的申請回收線程。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); }
參考:線程池的原理及實現