理解線程池到走進dubbo源碼

引言

合理利用線程池可以帶來三個好處。java

​ 第一:下降資源消耗。經過重複利用已建立的線程下降線程建立和銷燬形成的消耗。git

​ 第二:提升響應速度。當任務到達時,任務能夠不須要等到線程建立就能當即執行。github

​ 第三:提升線程的可管理性。線程是稀缺資源,若是無限制的建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行統一的分配,調優和監控。可是要作到合理的利用線程池,必須對其原理了如執掌。數據庫

 


 

 

 

線程池的使用

線程池的建立

​ 咱們能夠經過ThreadPoolExecutor來建立一個線程池數組

1 new  ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler);

 

建立一個線程池須要輸入幾個參數:緩存

  • corePoolSize(線程池的基本大小):當提交一個任務到線程池時,線程池會建立一個線程來執行任務,即便其餘空閒的基本線程可以執行新任務也會建立線程,等到須要執行的任務數大於線程池基本大小時就再也不建立。若是調用了線程池的prestartAllCoreThreads方法,線程池會提早建立並啓動全部基本線程。
  • runnableTaskQueue(任務隊列):用於保存等待執行的任務的阻塞隊列。 能夠選擇如下幾個阻塞隊列。
  1. ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。
  2. LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按FIFO (先進先出) 排序元素,吞吐量一般要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列。
  3. SynchronousQueue:一個不存儲元素的阻塞隊列。每一個插入操做必須等到另外一個線程調用移除操做,不然插入操做一直處於阻塞狀態,吞吐量一般要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個隊列。
  4. PriorityBlockingQueue:一個具備優先級的無限阻塞隊列。
  • maximumPoolSize(線程池最大大小):線程池容許建立的最大線程數。若是隊列滿了,而且已建立的線程數小於最大線程數,則線程池會再建立新的線程執行任務。值得注意的是若是使用了無界的任務隊列這個參數就沒什麼效果。
  • ThreadFactory:用於設置建立線程的工廠,能夠經過線程工廠給每一個建立出來的線程設置更有意義的名字。
  • RejectedExecutionHandler(飽和策略):當隊列和線程池都滿了,說明線程池處於飽和狀態,那麼必須採起一種策略處理提交的新任務。這個策略默認狀況下是AbortPolicy,表示沒法處理新任務時拋出異常。如下是JDK1.5提供的四種策略。
  1. AbortPolicy:直接拋出異常。
  2. CallerRunsPolicy:只用調用者所在線程來運行任務。
  3. DiscardOldestPolicy:丟棄隊列裏最近的一個任務,並執行當前任務。
  4. DiscardPolicy:不處理,丟棄掉。
  5. 固然也能夠根據應用場景須要來實現RejectedExecutionHandler接口自定義策略。如記錄日誌或持久化不能處理的任務。
  • keepAliveTime(線程活動保持時間):線程池的工做線程空閒後,保持存活的時間。因此若是任務不少,而且每一個任務執行的時間比較短,能夠調大這個時間,提升線程的利用率。
  • TimeUnit(線程活動保持時間的單位):可選的單位有天(DAYS),小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

 

向線程池提交任務

​ 咱們可使用execute提交的任務,可是execute方法沒有返回值,因此沒法判斷任務是否被線程池執行成功。經過如下代碼可知execute方法輸入的任務是一個Runnable類的實例。服務器

1 threadsPool.execute(new Runnable() {
2             @Override
3             public void run() {
4                 // TODO Auto-generated method stub
5             }
6         });

咱們也可使用submit 方法來提交任務,它會返回一個future,那麼咱們能夠經過這個future來判斷任務是否執行成功,經過future的get方法來獲取返回值,get方法會阻塞住直到任務完成,而使用get(long timeout, TimeUnit unit)方法則會阻塞一段時間後當即返回,這時有可能任務沒有執行完。併發

 1 Future<Object> future = executor.submit(harReturnValuetask);
 2 try {
 3      Object s = future.get();
 4 } catch (InterruptedException e) {
 5     // 處理中斷異常
 6 } catch (ExecutionException e) {
 7     // 處理沒法執行任務異常
 8 } finally {
 9     // 關閉線程池
10     executor.shutdown();
11 }

 

線程池的關閉

​ 咱們能夠經過調用線程池的shutdown或shutdownNow方法來關閉線程池,它們的原理是遍歷線程池中的工做線程,而後逐個調用線程的interrupt方法來中斷線程,因此沒法響應中斷的任務可能永遠沒法終止。可是它們存在必定的區別,shutdownNow首先將線程池的狀態設置成STOP,而後嘗試中止全部的正在執行或暫停任務的線程,並返回等待執行任務的列表,而shutdown只是將線程池的狀態設置成SHUTDOWN狀態,而後中斷全部沒有正在執行任務的線程。ide

​ 只要調用了這兩個關閉方法的其中一個,isShutdown方法就會返回true。當全部的任務都已關閉後,才表示線程池關閉成功,這時調用isTerminaed方法會返回true。至於咱們應該調用哪種方法來關閉線程池,應該由提交到線程池的任務特性決定,一般調用shutdown來關閉線程池,若是任務不必定要執行完,則能夠調用shutdownNow。源碼分析

 

線程池的分析

​ 流程分析: 線程池的主要工做流程以下圖:

從上圖咱們能夠看出,當提交一個新任務到線程池時,線程池的處理流程以下:

  1. 首先線程池判斷基本線程池是否已滿?沒滿,建立一個工做線程來執行任務。滿了,則進入下個流程。
  2. 其次線程池判斷工做隊列是否已滿?沒滿,則將新提交的任務存儲在工做隊列裏。滿了,則進入下個流程。
  3. 最後線程池判斷整個線程池是否已滿?沒滿,則建立一個新的工做線程來執行任務,滿了,則交給飽和策略來處理這個任務。

 

源碼分析。上面的流程分析讓咱們很直觀的瞭解了線程池的工做原理,讓咱們再經過源代碼來看看是如何實現的。線程池執行任務的方法以下:

 1 public void execute(Runnable command) {
 2     if (command == null)
 3        throw new NullPointerException();
 4     //若是線程數小於基本線程數,則建立線程並執行當前任務 
 5     if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
 6     //如線程數大於等於基本線程數或線程建立失敗,則將當前任務放到工做隊列中。
 7         if (runState == RUNNING && workQueue.offer(command)) {
 8             if (runState != RUNNING || poolSize == 0)
 9                       ensureQueuedTaskHandled(command);
10         }
11     //若是線程池不處於運行中或任務沒法放入隊列,而且當前線程數量小於最大容許的線程數量,
12 則建立一個線程執行任務。
13         else if (!addIfUnderMaximumPoolSize(command))
14         //拋出RejectedExecutionException異常
15             reject(command); // is shutdown or saturated
16     }
17 }

 

工做線程。線程池建立線程時,會將線程封裝成工做線程Worker,Worker在執行完任務後,還會無限循環獲取工做隊列裏的任務來執行。咱們能夠從Worker的run方法裏看到這點:

 1 public void run() {
 2      try {
 3            Runnable task = firstTask;
 4            firstTask = null;
 5             while (task != null || (task = getTask()) != null) {
 6                     runTask(task);
 7                     task = null;
 8             }
 9       } finally {
10              workerDone(this);
11       }
12 }

 

合理的配置線程池

​ 要想合理的配置線程池,就必須首先分析任務特性,能夠從如下幾個角度來進行分析:

  1. 任務的性質:CPU密集型任務,IO密集型任務和混合型任務。

  2. 任務的優先級:高,中和低。

  3. 任務的執行時間:長,中和短。

  4. 任務的依賴性:是否依賴其餘系統資源,如數據庫鏈接。

    ​ 任務性質不一樣的任務能夠用不一樣規模的線程池分開處理。CPU密集型任務配置儘量小的線程,如配置Ncpu+1個線程的線程池。IO密集型任務則因爲線程並非一直在執行任務,則配置儘量多的線程,如2*Ncpu。混合型的任務,若是能夠拆分,則將其拆分紅一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐率要高於串行執行的吞吐率,若是這兩個任務執行時間相差太大,則不必進行分解。咱們能夠經過Runtime.getRuntime().availableProcessors()方法得到當前設備的CPU個數。

    ​ 優先級不一樣的任務可使用優先級隊列PriorityBlockingQueue來處理。它可讓優先級高的任務先獲得執行,須要注意的是若是一直有優先級高的任務提交到隊列裏,那麼優先級低的任務可能永遠不能執行。

    ​ 執行時間不一樣的任務能夠交給不一樣規模的線程池來處理,或者也可使用優先級隊列,讓執行時間短的任務先執行。

    ​ 依賴數據庫鏈接池的任務,由於線程提交SQL後須要等待數據庫返回結果,若是等待的時間越長CPU空閒時間就越長,那麼線程數應該設置越大,這樣才能更好的利用CPU。

    ​ 建議使用有界隊列,有界隊列能增長系統的穩定性和預警能力,能夠根據須要設大一點,好比幾千。有一次咱們組使用的後臺任務線程池的隊列和線程池全滿了,不斷的拋出拋棄任務的異常,經過排查發現是數據庫出現了問題,致使執行SQL變得很是緩慢,由於後臺任務線程池裏的任務全是須要向數據庫查詢和插入數據的,因此致使線程池裏的工做線程所有阻塞住,任務積壓在線程池裏。若是當時咱們設置成無界隊列,線程池的隊列就會愈來愈多,有可能會撐滿內存,致使整個系統不可用,而不僅是後臺任務出現問題。固然咱們的系統全部的任務是用的單獨的服務器部署的,而咱們使用不一樣規模的線程池跑不一樣類型的任務,可是出現這樣問題時也會影響到其餘任務。

 

線程池的監控

經過線程池提供的參數進行監控。線程池裏有一些屬性在監控線程池的時候可使用

  • taskCount:線程池須要執行的任務數量。
  • completedTaskCount:線程池在運行過程當中已完成的任務數量。小於或等於taskCount。
  • largestPoolSize:線程池曾經建立過的最大線程數量。經過這個數據能夠知道線程池是否滿過。如等於線程池的最大大小,則表示線程池曾經滿了。
  • getPoolSize:線程池的線程數量。若是線程池不銷燬的話,池裏的線程不會自動銷燬,因此這個大小隻增不+ getActiveCount:獲取活動的線程數。

經過擴展線程池進行監控。經過繼承線程池並重寫線程池的beforeExecute,afterExecute和terminated方法,咱們能夠在任務執行前,執行後和線程池關閉前幹一些事情。如監控任務的平均執行時間,最大執行時間和最小執行時間等。這幾個方法在線程池裏是空方法。以下:

 1 protected void beforeExecute(Thread t, Runnable r) { } 

 

dubbo對線程池的使用

dubbo-common 模塊的threadpool包下體現,以下圖所示:

 

ThreadPool

com.alibaba.dubbo.common.threadpool.ThreadPool ,線程池接口。代碼以下:

 1 //@SPI("fixed")註解,Dubbo SPI擴展點,默認爲"fixed"。
 2 @SPI("fixed")
 3 public interface ThreadPool {
 4     /**
 5      * @Adaptive({Constants.THREADPOOL_KEY}) 註解,基於Dubbo SPI Adaptive機制,加載對應的線程池實現,使用URL.threadpool屬性。
 6      * getExecutor(url)方法,得到對應的線程池的執行器
 7      *
 8      */
 9     @Adaptive({Constants.THREADPOOL_KEY})
10     Executor getExecutor(URL url);
11 
12 }

 

FixedThreadPool

com.alibaba.dubbo.common.threadpool.support.fixed.FixedThreadPool ,實現ThreadPool接口,固定大小線程池,啓動時創建線程,不關閉,一直持有。代碼以下:

 1 public class FixedThreadPool implements ThreadPool {
 2 
 3     @Override
 4     public Executor getExecutor(URL url) {
 5         //線程名
 6         String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
 7         //線程數
 8         int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
 9         //隊列數
10         int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
11         //建立執行器
12         return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
13                 /**
14                  * 根據不一樣的隊列數,使用不一樣的隊列實現:
15                  * queues == 0,SynchronousQueue對象。
16                  * queues < 0,LinkedBlockingQueue對象。
17                  * queues > 0,帶隊列數的LinkedBlockingQueue對象。
18                  */
19                 queues == 0 ? new SynchronousQueue<Runnable>() :
20                         (queues < 0 ? new LinkedBlockingQueue<Runnable>()
21                                 : new LinkedBlockingQueue<Runnable>(queues)),
22                 /**
23                  * 建立NamedThreadFactory對象,用於生成線程名
24                  * 建立AbortPolicyWithReport對象,用於當任務添加到線程池中被拒絕時。
25                  */
26                 new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
27     }
28 }

 

推薦閱讀:

 

CachedThreadPool

com.alibaba.dubbo.common.threadpool.support.cached.CachedThreadPool ,實現ThreadPool接口,緩存線程池,空閒必定時長,自動刪除,須要時重建。代碼以下:

 1 public class CachedThreadPool implements ThreadPool {
 2 
 3     @Override
 4     public Executor getExecutor(URL url) {
 5         //線程池名
 6         String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
 7         //核心線程數
 8         int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
 9         //最大線程數
10         int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
11         //隊列數
12         int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
13         //線程存活時長
14         int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
15         //建立執行器
16         return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
17                 queues == 0 ? new SynchronousQueue<Runnable>() :
18                         (queues < 0 ? new LinkedBlockingQueue<Runnable>()
19                                 : new LinkedBlockingQueue<Runnable>(queues)),
20                 new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
21     }
22 }

 

LimitedThreadPool

com.alibaba.dubbo.common.threadpool.support.limited.LimitedThreadPool ,實現ThreadPool接口,可伸縮線程池,但池中的線程池只會增加不會收縮。只增加不收縮的目的是爲了不收縮時忽然來了大流量引發的性能問題。代碼以下:

 1 public class LimitedThreadPool implements ThreadPool {
 2 
 3     @Override
 4     public Executor getExecutor(URL url) {
 5         //線程池名
 6         String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
 7         //核心線程數
 8         int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
 9         //最大線程數
10         int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
11         //隊列數
12         int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
13         /**
14          * 和CachedThreadPool實現是基本一致的,差別點在alive == Integer.MAX_VALUE,空閒時間無限大,即不會刪除。
15          */
16         return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
17                 queues == 0 ? new SynchronousQueue<Runnable>() :
18                         (queues < 0 ? new LinkedBlockingQueue<Runnable>()
19                                 : new LinkedBlockingQueue<Runnable>(queues)),
20                 new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
21     }
22 
23 }

 

AbortPolicyWithReport

com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport ,實現 java.util.concurrent.ThreadPoolExecutor.AbortPolicy,拒絕策略實現類。打印JStack,分析線程狀態 代碼以下:

  1 /**
  2  * AbortPolicyWithReport實現自ThreadPoolExecutor.AbortPolicy,拒絕策略實現類,
  3  * 打印JStack,分析線程狀態。
  4  */
  5 public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
  6 
  7 
  8     protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);
  9     /**
 10      * 線程名
 11      */
 12     private final String threadName;
 13 
 14     /**
 15      * URL 對象
 16      */
 17     private final URL url;
 18 
 19     /**
 20      * 最後打印時間
 21      */
 22     private static volatile long lastPrintTime = 0;
 23 
 24     /**
 25      * 信號量,大小爲1。
 26      */
 27     private static Semaphore guard = new Semaphore(1);
 28 
 29     public AbortPolicyWithReport(String threadName, URL url) {
 30         this.threadName = threadName;
 31         this.url = url;
 32     }
 33 
 34     @Override
 35     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
 36         /**
 37          * 打印告警日誌
 38          */
 39         String msg = String.format("Thread pool is EXHAUSTED!" +
 40                         " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
 41                         " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
 42                 threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
 43                 e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
 44                 url.getProtocol(), url.getIp(), url.getPort());
 45         logger.warn(msg);
 46         // 打印 JStack,分析線程狀態。
 47         dumpJStack();
 48         //拋出 RejectedExecutionException 異常
 49         throw new RejectedExecutionException(msg);
 50     }
 51 
 52     private void dumpJStack() {
 53         long now = System.currentTimeMillis();
 54         //每 10 分鐘,打印一次。
 55         //dump every 10 minutes
 56         if (now - lastPrintTime < 10 * 60 * 1000) {
 57             return;
 58         }
 59         //得到信號量
 60         if (!guard.tryAcquire()) {
 61             return;
 62         }
 63         //建立線程池,後臺執行打印JStack
 64         Executors.newSingleThreadExecutor().execute(new Runnable() {
 65             @Override
 66             public void run() {
 67                 //得到路徑
 68                 String dumpPath = url.getParameter(Constants.DUMP_DIRECTORY, System.getProperty("user.home"));
 69 
 70                 SimpleDateFormat sdf;
 71                 //得到系統
 72                 String OS = System.getProperty("os.name").toLowerCase();
 73 
 74                 // window system don't support ":" in file name
 75                 if(OS.contains("win")){
 76                     sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
 77                 }else {
 78                     sdf = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss");
 79                 }
 80 
 81                 String dateStr = sdf.format(new Date());
 82                 //得到輸出流
 83                 FileOutputStream jstackStream = null;
 84                 try {
 85                     jstackStream = new FileOutputStream(new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr));
 86                     //打印JStack
 87                     JVMUtil.jstack(jstackStream);
 88                 } catch (Throwable t) {
 89                     logger.error("dump jstack error", t);
 90                 } finally {
 91                     //釋放信號量
 92                     guard.release();
 93                     //釋放輸出流
 94                     if (jstackStream != null) {
 95                         try {
 96                             jstackStream.flush();
 97                             jstackStream.close();
 98                         } catch (IOException e) {
 99                         }
100                     }
101                 }
102                 //記錄最後打印時間
103                 lastPrintTime = System.currentTimeMillis();
104             }
105         });
106 
107     }
108 
109 }

推薦閱讀:

相關文章
相關標籤/搜索