等待返回任務的結果的多步驟的處理場景, 批量併發執行任務,總耗時是單個步驟耗時最長的那個,提供總體的執行效率,程序員
最終一致性,異步執行任務,無需等待,快速返回數據庫
通常狀況下咱們是經過ThreadPoolExecutor來構造咱們的線程池對象的。緩存
* 阿里巴巴的開發規範文檔是禁止直接使用Executors靜態工廠類來建立線程池的,緣由是 服務器
【強制】線程池不容許使用 Executors 去建立,而是經過 ThreadPoolExecutor 的方式,這樣
的處理方式讓寫的同窗更加明確線程池的運行規則,規避資源耗盡的風險。
說明: Executors 返回的線程池對象的弊端以下:
(1) FixedThreadPool 和 SingleThreadPool :
容許的請求隊列長度爲 Integer.MAX_VALUE ,可能會堆積大量的請求,從而致使 OOM 。
(2) CachedThreadPool 和 ScheduledThreadPool :
容許的建立線程數量爲 Integer.MAX_VALUE ,可能會建立大量的線程,從而致使 OOM 。併發
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { }
參數說明:異步
- corePoolSize:核心線程數,線程池最低的線程數
- maximumPoolSize:容許的最大的線程數
- keepAliveTime:當前線程數超過corePoolSize的時候,空閒線程保留的時間
- unit: keepAliveTime線程保留的時間的單位
- workQueue: 任務緩衝區
- threadFactory: 線程的構造工廠
- handler: 線程池飽含時候的處理策略
Java經過Executors提供四種線程池,分別爲:async
- newCachedThreadPool建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程,若無可回收,則新建線程。
- newFixedThreadPool 建立一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。
- newScheduledThreadPool 建立一個定長線程池,支持定時及週期性任務執行。
- newSingleThreadExecutor 建立一個單線程化的線程池,它只會用惟一的工做線程來執行任務,保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行。
public static ExecutorService newCachedThreadPool(){ return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.MILLISECONDS,new SynchronousQueue<Runnable>()); }
它是一個能夠無限擴大的線程池;優化
- 它比較適合處理執行時間比較小的任務;
corePoolSize爲0,maximumPoolSize爲無限大,意味着線程數量能夠無限大;ui
keepAliveTime爲60S,意味着線程空閒時間超過60S就會被殺死;線程
採用SynchronousQueue裝等待的任務,這個阻塞隊列沒有存儲空間,這意味着只要有請求到來,就必需要找到一條工做線程處理他,若是當前沒有空閒的線程,那麼就會再建立一條新的線程。
public static ExecutorService newFixedThreadPool(int nThreads){ return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); }
- 它是一種固定大小的線程池;corePoolSize和maximunPoolSize都爲用戶設定的線程數量nThreads;
- keepAliveTime爲0,意味着一旦有多餘的空閒線程,就會被當即中止掉;但這裏keepAliveTime無效;
- 阻塞隊列採用了LinkedBlockingQueue,它是一個無界隊列;因爲阻塞隊列是一個無界隊列,所以永遠不可能拒絕任務;
- 因爲採用了無界隊列,實際線程數量將永遠維持在nThreads,所以maximumPoolSize和keepAliveTime將無效。
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
- 定時任務的使用
public static ExecutorService newSingleThreadExecutor(){ return new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); }
- 它只會建立一條工做線程處理任務;
- 採用的阻塞隊列爲LinkedBlockingQueue;
線程池 | 特色 | 建議使用場景 |
---|---|---|
newCachedThreadPool | 一、線程數無上限 二、空閒線程存活60s 三、阻塞隊列 |
一、任務執行時間短 二、任務要求響應時間短 |
newFixedThreadPool | 一、線程數固定 二、無界隊列 |
一、任務比較平緩 二、控制最大的線程數 |
newScheduledThreadPool | 核心線程數量固定、非核心線程數量無限制(閒置時立刻回收) | 執行定時 / 週期性 任務 |
newSingleThreadExecutor | 只有一個核心線程(保證全部任務按照指定順序在一個線程中執行,不須要處理線程同步的問題) | 不適合併發但可能引發IO阻塞性及影響UI線程響應的操做,如數據庫操做,文件操做等 |
現象 | 緣由 |
---|---|
整個系統影響緩慢,大部分504 | 一、爲設置最大的線程數,任務積壓過多,線程數用盡 |
oom | 一、隊列無界或者size設置過大 |
使用線程池對效率並無明顯的提高 | 一、線程池的參數設置太小,線程數太小或者隊列太小,或者是服務器的cpu核數過低 |
- 線程池中線程數和隊列的類型及長度對線程會形成很大的影響,並且會爭奪系統稀有資源,線程數。設置不當,或是沒有最大的利用系統資源,提升系統的總體運行效率,或是致使整個系統的故障。典型的場景是線程數被佔滿,其餘的請求無響應。或是任務積壓過多,直接oom
- 方便的排查線程中的故障以及優化線程池的使用
另起一個定時單線程數的線程池newSingleThreadScheduledExecutor
調用scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)定時執行監控任務;
定時任務內 經過ThreadPoolExecutor對象獲取監控的對象信息,好比t線程池須要執行的任務數、線程池在運行過程當中已完成的任務數、曾經建立過的最大線程數、線程池裏的線程數量、線程池裏活躍的線程數量、當前排隊線程數
根據預設的日誌或報警策略,進行規則控制
定義線程池並啓動監控
/** * 定義線程池的隊列的長度 */ private final Integer queueSize = 1000; /** * 定義一個定長的線程池 */ private ExecutorService executorService; @PostConstruct private void initExecutorService() { log.info( "executorService init with param: threadcount:{} ,queuesize:{}", systemConfig.getThreadCount(), systemConfig.getThreadQueueSize()); executorService = new ThreadPoolExecutor( systemConfig.getThreadCount(), systemConfig.getThreadCount(), 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(systemConfig.getThreadQueueSize()), new BasicThreadFactory.Builder() .namingPattern("async-sign-thread-%d") .build(), (r, executor) -> log.error("the async executor pool is full!!")); /** 啓動線程池的監控 */ ThreadPoolMonitoring threadPoolMonitoring = new ThreadPoolMonitoring(); threadPoolMonitoring.init(); }
線程池的監控
/** * 功能說明:線程池監控 * * @params * @return <br> * 修改歷史<br> * [2019年06月14日 10:20:10 10:20] 建立方法by fengqingyang */ public class ThreadPoolMonitoring { /** 用於週期性監控線程池的運行狀態 */ private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( new BasicThreadFactory.Builder() .namingPattern("async thread executor monitor") .build()); /** * 功能說明:自動運行監控 * * @return <br> * 修改歷史<br> * [2019年06月14日 10:26:51 10:26] 建立方法by fengqingyang * @params */ public void init() { scheduledExecutorService.scheduleAtFixedRate( () -> { try { ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService; /** 線程池須要執行的任務數 */ long taskCount = threadPoolExecutor.getTaskCount(); /** 線程池在運行過程當中已完成的任務數 */ long completedTaskCount = threadPoolExecutor.getCompletedTaskCount(); /** 曾經建立過的最大線程數 */ long largestPoolSize = threadPoolExecutor.getLargestPoolSize(); /** 線程池裏的線程數量 */ long poolSize = threadPoolExecutor.getPoolSize(); /** 線程池裏活躍的線程數量 */ long activeCount = threadPoolExecutor.getActiveCount(); /** 當前排隊線程數 */ int queueSize = threadPoolExecutor.getQueue().size(); log.info( "async-executor monitor. taskCount:{}, completedTaskCount:{}, largestPoolSize:{}, poolSize:{}, activeCount:{},queueSize:{}", taskCount, completedTaskCount, largestPoolSize, poolSize, activeCount, queueSize); /** 超過閥值的80%報警 */ if (activeCount >= systemConfig.getThreadCount() * 0.8) { log.error( "async-executor monitor. taskCount:{}, completedTaskCount:{}, largestPoolSize:{}, poolSize:{}, activeCount:{},queueSize:{}", taskCount, completedTaskCount, largestPoolSize, poolSize, activeCount, queueSize); ; } } catch (Exception ex) { log.error("ThreadPoolMonitoring service error,{}", ex.getMessage()); } }, 0, 30, TimeUnit.SECONDS); } }
- 線程數要合理設置,通常建議值是核數的2倍。
- 線程池隊列的類型和長度要根據業特性合理設置
- 不一樣的業務須要線程池隔離,避免相互影響
- 未每一個線程池增長特有的命名規範以及關鍵的日誌,方便出問題排查和優化
更多精彩,敬請關注, 程序員導航網 https://chenzhuofan.top