線程池ThreadPoolExecutor類的使用

1.使用線程池的好處?

第一:下降資源消耗。經過重複利用已建立的線程下降線程建立和銷燬形成的消耗。html

第二:提升響應速度。當任務到達時,任務能夠不須要等到線程建立就能當即執行。java

第三:提升線程的可管理性。線程是稀缺資源,若是無限制的建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行統一的分配,調優和監控。數據庫

能夠先看下線程池的類圖:數組

 

2.ThreadPoolExecutor的使用

線程池的狀態:服務器

A.線程池的建立dom

咱們能夠經過java.util.concurrent.ThreadPoolExecutor來建立一個線程池。ide

new  ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler);

建立線程池須要的參數介紹:源碼分析

  • corePoolSize(線程池的基本大小):當提交一個任務到線程池時,線程池會建立一個線程來執行任務,即便其餘空閒的基本線程可以執行新任務也會建立線程,等到須要執行的任務數大於線程池基本大小時就再也不建立。若是調用了線程池的prestartAllCoreThreads方法,線程池會提早建立並啓動全部基本線程。

  • runnableTaskQueue(任務隊列):用於保存等待執行的任務的阻塞隊列。 能夠選擇如下幾個阻塞隊列。

    • ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。
    • LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按FIFO (先進先出) 排序元素,吞吐量一般要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列。
    • SynchronousQueue:一個不存儲元素的阻塞隊列。每一個插入操做必須等到另外一個線程調用移除操做,不然插入操做一直處於阻塞狀態,吞吐量一般要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個隊列。
    • PriorityBlockingQueue:一個具備優先級的無限阻塞隊列。
  • maximumPoolSize(線程池最大大小):線程池容許建立的最大線程數。若是隊列滿了,而且已建立的線程數小於最大線程數,則線程池會再建立新的線程執行任務。值得注意的是若是使用了無界的任務隊列這個參數就沒什麼效果。

  • ThreadFactory:用於設置建立線程的工廠,能夠經過線程工廠給每一個建立出來的線程設置更有意義的名字。

  • RejectedExecutionHandler(飽和策略):當隊列和線程池都滿了,說明線程池處於飽和狀態,那麼必須採起一種策略處理提交的新任務。這個策略默認狀況下是AbortPolicy,表示沒法處理新任務時拋出異常。如下是JDK1.5提供的四種策略。

    • AbortPolicy:直接拋出異常。
    • CallerRunsPolicy:只用調用者所在線程來運行任務。
    • DiscardOldestPolicy:丟棄隊列裏最近的一個任務,並執行當前任務。
    • DiscardPolicy:不處理,丟棄掉。
    • 固然也能夠根據應用場景須要來實現RejectedExecutionHandler接口自定義策略。如記錄日誌或持久化不能處理的任務。
  • keepAliveTime(線程活動保持時間):線程池的工做線程空閒後,保持存活的時間。因此若是任務不少,而且每一個任務執行的時間比較短,能夠調大這個時間,提升線程的利用率。

  • TimeUnit(線程活動保持時間的單位):可選的單位有天(DAYS),小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

B.向線程池提交任務this

  提交任務有execute()和submit()兩個方法,下面看看他倆的區別:spa

  ①接收參數不一樣

  execute()的參數是Runnable,submit()參數能夠是Runnable,也能夠是Cable。

  ②返回值不一樣

  execute()沒有返回值,submit()有返回值Future。經過Future能夠獲取各個線程的完成狀況,是否有異常,還能試圖取消任務的執行。詳見》》》》》》》》

  execute()很好理解,下面看個使用submit()獲取返回值的例子,假設我有不少更新各類數據的task,我但願若是其中一個task失敗,其它的task就不須要執行了。那我就須要catch Future.get拋出的異常,而後終止其它task的執行,代碼以下:

 1 public class SubmitTest {
 2     
 3     public static void main(String[] args) {
 4         ExecutorService executorService = Executors.newCachedThreadPool();
 5         List<Future<String>> futureList = new ArrayList<>();
 6         // 建立10個任務並執行  
 7         for (int i = 0; i < 10; i++) {
 8             // 使用ExecutorService執行Callable類型的任務,並將結果保存在future變量中
 9             Future<String> future = executorService.submit(new TaskRunn(i));
10             // 將任務執行結果存儲到List中
11             futureList.add(future);
12         }
13         // 正常關閉線程池
14         executorService.shutdown();
15         // 遍歷任務的結果  
16         for (Future<String> future : futureList) {
17             try {
18                 System.out.println(future.get());
19             } catch (InterruptedException e) {
20                 e.printStackTrace();
21             } catch (ExecutionException e) {
22                 // 出錯了中止全部的線程
23                 executorService.shutdownNow();
24                 e.printStackTrace();
25                 return;
26             }
27         }
28     }
29 }
30 
31 class TaskRunn implements Callable<String>{
32     
33     private int id;
34     public TaskRunn(int id) {
35         this.id = id;
36     }
37 
38     /**
39      * 任務的具體過程,一旦任務傳給ExecutorService的submit方法,則該方法自動在一個線程上執行
40      */
41     @Override
42     public String call() throws Exception {
43         System.out.println("call() begin..."+id+"//"+Thread.currentThread().getName());
44         if (new Random().nextInt(10) > 5) {
45             throw new TaskException("task err:"+id+"//"+Thread.currentThread().getName());
46         }
47         // 模擬業務耗時
48         for (int i = 0; i < 10; i++) {
49             Thread.sleep(1000);
50         }
51         return "result:"+id+"//" +Thread.currentThread().getName();
52     }
53 }
54 
55 // 定義本身的異常
56 class TaskException extends Exception{
57     public TaskException(String mess) {
58         super(mess);
59     }
60 }

 

c.線程池的關閉

咱們能夠經過調用線程池的shutdown或shutdownNow方法來關閉線程池,它們的區別詳見 http://www.cnblogs.com/shamo89/p/6703563.html

能夠簡單的總結爲shutdown()是正常結束線程池,已經添加進去正在執行的線程正常執行,沒添加的線程不會再添加。shutdownNow()則是強制中斷線程池裏的線程,可是由於是經過interuppt()來執行的,因此會有侷限性,另外該方法會返回未執行的任務。

因此一般調shutdown來正常關閉線程池,若是任務不必定要執行完,則能夠調用shutdownNow。

3. 線程池的分析

A.流程分析:線程池的主要工做流程以下圖:

 

從上圖咱們能夠看出,當提交一個新任務到線程池時,線程池的處理流程以下:

  1. 首先線程池判斷基本線程池是否已滿?沒滿,建立一個工做線程來執行任務。滿了,則進入下個流程。
  2. 其次線程池判斷工做隊列是否已滿?沒滿,則將新提交的任務存儲在工做隊列裏。滿了,則進入下個流程。
  3. 最後線程池判斷整個線程池是否已滿?沒滿,則建立一個新的工做線程來執行任務,滿了,則交給飽和策略來處理這個任務。

B.源碼分析

上面的流程分析讓咱們很直觀的瞭解了線程池的工做原理,讓咱們再經過源代碼來看看是如何實現的。線程池執行任務的方法以下:

 1 public void execute(Runnable command) {
 2     if (command == null)
 3        throw new NullPointerException();
 4     //若是線程數小於基本線程數,則建立線程並執行當前任務
 5     if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
 6     //如線程數大於等於基本線程數或線程建立失敗,則將當前任務放到工做隊列中。
 7         if (runState == RUNNING && workQueue.offer(command)) {
 8             if (runState != RUNNING || poolSize == 0)
 9                       ensureQueuedTaskHandled(command);
10         }
11     //若是線程池不處於運行中或任務沒法放入隊列,而且當前線程數量小於最大容許的線程數量,
12 // 則建立一個線程執行任務。
13         else if (!addIfUnderMaximumPoolSize(command))
14         //拋出RejectedExecutionException異常
15             reject(command); // is shutdown or saturated
16     }
17 }

C.工做線程

線程池建立線程時,會將線程封裝成工做線程Worker,Worker在執行完任務後,還會無限循環獲取工做隊列裏的任務來執行。咱們能夠從Worker的run方法裏看到這點:

 1 public void run() {
 2      try {
 3            Runnable task = firstTask;
 4            firstTask = null;
 5             while (task != null || (task = getTask()) != null) {
 6                     runTask(task);
 7                     task = null;
 8             }
 9       } finally {
10              workerDone(this);
11       }
12 }

4. 合理的配置線程池

要想合理的配置線程池,就必須首先分析任務特性,能夠從如下幾個角度來進行分析:

  1. 任務的性質:CPU密集型任務,IO密集型任務和混合型任務。
  2. 任務的優先級:高,中和低。
  3. 任務的執行時間:長,中和短。
  4. 任務的依賴性:是否依賴其餘系統資源,如數據庫鏈接。

任務性質不一樣的任務能夠用不一樣規模的線程池分開處理。

CPU密集型任務配置儘量小的線程,如配置Ncpu+1個線程的線程池。

IO密集型任務則因爲線程並非一直在執行任務,則配置儘量多的線程,如2*Ncpu。

混合型的任務,若是能夠拆分,則將其拆分紅一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐率要高於串行執行的吞吐率,若是這兩個任務執行時間相差太大,則不必進行分解。

咱們能夠經過Runtime.getRuntime().availableProcessors()方法得到當前設備的CPU個數。

優先級不一樣的任務可使用優先級隊列PriorityBlockingQueue來處理。它可讓優先級高的任務先獲得執行,須要注意的是若是一直有優先級高的任務提交到隊列裏,那麼優先級低的任務可能永遠不能執行。

執行時間不一樣的任務能夠交給不一樣規模的線程池來處理,或者也可使用優先級隊列,讓執行時間短的任務先執行。

依賴數據庫鏈接池的任務,由於線程提交SQL後須要等待數據庫返回結果,若是等待的時間越長CPU空閒時間就越長,那麼線程數應該設置越大,這樣才能更好的利用CPU。

建議使用有界隊列,有界隊列能增長系統的穩定性和預警能力,能夠根據須要設大一點,好比幾千。

有一次咱們組使用的後臺任務線程池的隊列和線程池全滿了,不斷的拋出拋棄任務的異常,經過排查發現是數據庫出現了問題,致使執行SQL變得很是緩慢,由於後臺任務線程池裏的任務全是須要向數據庫查詢和插入數據的,因此致使線程池裏的工做線程所有阻塞住,

任務積壓在線程池裏。

若是當時咱們設置成無界隊列,線程池的隊列就會愈來愈多,有可能會撐滿內存,致使整個系統不可用,而不僅是後臺任務出現問題。

固然咱們的系統全部的任務是用的單獨的服務器部署的,而咱們使用不一樣規模的線程池跑不一樣類型的任務,可是出現這樣問題時也會影響到其餘任務。

5. 線程池的監控

經過線程池提供的參數進行監控。線程池裏有一些屬性在監控線程池的時候可使用

  • taskCount:線程池須要執行的任務數量。
  • completedTaskCount:線程池在運行過程當中已完成的任務數量。小於或等於taskCount。
  • largestPoolSize:線程池曾經建立過的最大線程數量。經過這個數據能夠知道線程池是否滿過。如等於線程池的最大大小,則表示線程池曾經滿了。
  • getPoolSize:線程池的線程數量。若是線程池不銷燬的話,池裏的線程不會自動銷燬,因此這個大小隻增不+ getActiveCount:獲取活動的線程數。

經過擴展線程池進行監控。經過繼承線程池並重寫線程池的beforeExecute,afterExecute和terminated方法,咱們能夠在任務執行前,執行後和線程池關閉前幹一些事情。如監控任務的平均執行時間,最大執行時間和最小執行時間等。這幾個方法在線程池裏是空方法。如:

protected void beforeExecute(Thread t, Runnable r) { }
相關文章
相關標籤/搜索