異步線程池的使用總結

一.使用場景異步

       當前主流程中的業務處理愈來愈複雜,須要進行概括總結是否能夠異步去作來下降關鍵接口的耗時和風險,目前咱們在分流調度降級,接單,拒單主流程中將一些鏡像信息異步保留下來,固然咱們能夠將接單過程的打標等一系列不影響主流程的業務處理通通異步去作。ide

二.使用風險模塊化

     使用過程當中,咱們初步會遇到線程池的參數設置(會影響到資源浪費,任務堆積時是否會引發oom(生產者和消費者的問題),如何根據生產設置最優參數),異常監控和處理(如何監控運行異常和藹後 業務是否能接受拋棄仍是重試 莫名其妙丟失任務等),模塊化設計(統一入口,統一監控)等問題this

三.使用過程問題總結spa

    1.最初的參數設置  .net

       咱們都知道線程池的參數設置有幾個重要的參數,分別是核心線程數 最大線程數 任務隊列 線程工廠  線程空閒時間 任務拒絕策略線程

       First:設計

                咱們先來看看幾個參數的意義:code

       (1) 核心線程數:默認狀況下核心線程會一直存活,即便處於閒置狀態也不會受存keepAliveTime限制即沒有任務須要執行。除非將allowCoreThreadTimeOut設置爲true,默認是false,活動線程數小於核心線程數時,即便有線程空閒,線程池也會優先建立新線程處理。接口

       (2)最大線程數:

        當線程數>=corePoolSize  小於最大線程數時,且任務隊列已滿時,線程池會建立新線程來處理任務 

當線程數=maxPoolSize,且任務隊列已滿時,線程池會拒絕處理任務而拋出異常

       (3)任務隊列:

                最大線程數的大小設置和任務隊列的選擇以及大小限制有關,經常使用隊列 好比SynchronousQueue,LinkedBlockingQueue咱們來看看線程池的執行規則的比較

                       假設任務隊列沒有大小限制

                                        1.若是線程數量<=核心線程數量,那麼直接啓動一個核心線程來執行任務,不會放入隊列中。

          2.若是線程數量>核心線程數,但<=最大線程數,而且任務隊列是LinkedBlockingQueue的時候,超過核心線程數量的任務會放在任務隊列中排隊。因此沒有大小限制的話會一直添加到任務隊列中,因此不會存在啓動新線程。也就是說最大線程數的大小設置無心義,可是這種模式 若是生產速度過快 可能會消耗內存太多從而引oom。

                 3.若是線程數量>核心線程數,但<=最大線程數,而且任務隊列是SynchronousQueue的時候,線程池會建立新線程執行任務,這些任務也不會被放在任務列中。這些線程屬於非核心線程,在任務完成後,閒置時間達到了超時時間就會被清除。

                                        4.若是線程數量>核心線程數,而且>最大線程數,當任務隊列是SynchronousQueue的時候,會由於線程池拒絕添加任務而拋出異常。

                               假設任務隊列存在大小限制

                                        1.當LinkedBlockingQueue塞滿時,新增的任務會直接建立新線程來執行,當建立的線程數量超過最大線程數量時會拋異常。

 2.SynchronousQueue沒有數量限制。由於他根本不保持這些任務,而是直接交給線程池去執行。當任務數量超過最大線程數時會直接拋異常。                SynchronousQueue 大小限制無心義。

       (4)線程工廠:線程工廠,提供建立新線程的功能。通常默認。

       (5)線程空閒時間設置:   keepAliveTime,默認狀況下核心線程會一直存活,即便處於閒置狀態也不會受存keepAliveTime限制即沒有任務須要執行。

除非將allowCoreThreadTimeOut設置爲true,默認是false。可是爲true的時候 線程會退出直到爲0,false時 線程會退出到活動線程等於核心線程數。

       (6)任務拒絕策略:

                        首先咱們看下什麼狀況 新的任務會被拒絕 

                                    1.當線程池線程數量達到最大線程數而且隊列已滿的時候  

                                    2.當調用shutdown和shutdownnow方法的時候,新進的任務都會被拒絕,前者會執行完剩餘任務,後者會試圖中止剩餘任務 並返回任務列表。

那麼這種狀況發生的時候,新的任務會被怎樣處理呢?線程池提供了哪些策略呢?

                                   1.AbortPolicy 默認 會拋出異常 丟棄任務

                                   2.CallerRunsPolicy 沒有shutdown的話 主線程繼續執行任務

                                   3.DiscardPolicy 什麼都不作 丟棄任務

                                   4.DiscardOldestPolicy 判讀shutdown 並移除隊列的最後一個任務 同時線程池執行新任務而不是主線程執行新任務。

                      這幾種策略都是內部類  實現了同一個接口 RejectedExecutionHandler 若是你願意的話 也能夠本身實現  注意判斷是否shutdown的狀態就行了。

源碼中 提供了兩個參數 一個主線程 一個當前線程池 進行處理

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 

   

         Second:

             瞭解了這些參數的含義後  咱們如何根據業務定義這些值呢 

              - tasks :每秒的任務數,按照午高峯假設爲500~1000
              - taskcost:每一個任務花費時間,假設爲0.01s 
              - responsetime:系統容許容忍的最大響應時間,假設爲1s 
              - corePoolSize = 每秒須要多少個線程處理? 
                 * threadcount = tasks/(1/taskcost) =tasks*taskcout = (500~1000)*0.01 = 5~10 個線程。corePoolSize差很少這個範圍 假設初步設爲8
              - queueCapacity = (coreSizePool/taskcost)*responsetime 
                * 計算可得 queueCapacity = 8/0.01*1 = 800。意思是隊列裏的任務能夠等待1s,超過了的須要新開線程來執行,否則來不及處理 超過系統容忍的最大響應時間
                * 切記不能設置爲Integer.MAX_VALUE,這樣隊列會很大,線程數只會保持在corePoolSize大小,當任務陡增時,不能新開線程來執行,響應時間會隨之陡增。 
              - maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)+corePoolSize
               * 計算可得 maxPoolSize = (1000-800)/100 + 8= 10
               * (最大任務數-隊列容量)/每一個線程每秒處理能力 + 核心線程數= 最大線程數 
              - rejectedExecutionHandler:根據具體狀況來決定,任務不重要可丟棄,任務重要則要利用一些緩衝機制來處理 
              - keepAliveTime和allowCoreThreadTimeout採用默認一般能知足

               總結下來 初步上線的核心線程數8 最大線程數10 隊列容量800

 2.監控後的參數設置

              那麼實際線上的狀況 可能不是最優  因此咱們須要監控相關指標 在午高峯 晚高峯的時候看看線程池中活躍線程到底有多少 超不超過核心線程數 

              超過以後  任務隊列的大小又是多少?因此咱們統一一個入口  關心繫統中不一樣線程池的執行狀況 以下

/**

 * Created by huhaosumail on 17/8/25.

 */

public class GalaxyThreadPool extends ThreadPoolExecutor{

 

    private String poolName;

 

    public GalaxyThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,String poolName) {

        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);

        this.poolName=poolName;

    }

 

    protected void beforeExecute(Thread paramThread, Runnable paramRunnable) {

        //獲取工做線程

 

  }

 

    protected void afterExecute(Runnable r, Throwable t) {

 

        super.afterExecute(r, t);

        //獲取工做線程以及運行時異常進行處理 若是線上的話調用etrace 打點會不會訪問太多 能夠進行隨機數來記錄

        if(System.currentTimeMillis()%100==0){

            //獲取活動的線程數

            MetricUtil.recordIndex("threadPoolMonitor",this.getActiveCount(),poolName+".corePoolSize");

            //程池曾經建立過的最大線程數量

            MetricUtil.recordIndex("threadPoolMonitor",this.getLargestPoolSize(),poolName+".largestPoolSize");

            //線程池的線程數量

            MetricUtil.recordIndex("threadPoolMonitor",this.getPoolSize(),poolName+".poolSize");

            //任務隊列的長度

            MetricUtil.recordIndex("threadPoolMonitor",this.getQueue().size(),poolName+".queueSize");

        }

    }

 

    protected void terminated() {

        //線程池關閉以前能夠幹一些事情;

 

    }

    //對於執行結果的獲取

    @Override

    public Future<?> submit(Runnable task) {

        return super.submit(task);

    }

}


咱們封裝了本身的線程池  並實現了執行完任務的方法  在方法內 咱們能獲取到相關線程池的指標 如 活躍線程數  任務隊列大小等  結合線程池的執行策略 進行調優參數 

好比 活躍線程數始終不超過5

       

隊列長度基本沒有

就是說消費速度很快  核心線程數能夠適當調小  基本夠用

地址:

https://d.elenet.me/dashboard/db/galaxy-thread-pool-monitor?panelId=1&fullscreen&edit&editorTab=General

3.接下來咱們對線程池的執行結果進行處理 咱們知道提交任務後 咱們能夠觀察執行任務的結果 以下

List<Future> taskList =  new ArrayList<Future>();

for () {

    taskList.add(ruleMirrorImageThreadPool.submit(new RuleMirrorImageRunnable(order.getTrackingId(),currentRule.getChainContext().getScheduleUnitList(), order, currentSeqNum, currentRule.getId()));

}

 

if (Assert.isEmptyList(taskList)) {

    continue;

}

 

for (Future task : taskList) {

    try {

        task.get();

    } catch (Exception e) {

       //是否從新處理  或者持久化失敗信息 或者拋出異常等

    }

}


綜上 咱們對於線程池的最初參數設置  參數調優  統一監控指標  線程池異常處理等有了初步的瞭解。

遺留問題:1.如何在代碼層面監測到當前是否引發了oom或者線程池異常過多  而後當即關閉線程池 減少系統的風險性?

                  2.如何防止丟失任務?

相關文章
相關標籤/搜索