高併發系列(九)--線程池詳解和參數設置(絕對幹活)

1、點題html

    這篇將介紹線程池的使用,咱們時常就有一個這樣的疑問,爲何要使用線程池?不是有更簡單的方式建立和開啓線程?還有怎麼使用線程池?線程池怎麼與Springboot整合,意思就是怎麼使用使用springBoot機制管理?帶着這些疑問開啓這篇文章之旅。java

    本篇的文章主要內容:程序員

    1.概念。spring

    2.線程池。緩存

    3.線程池配置。服務器

    4.Springboot管理線程池。多線程

    5.線程池隔離。併發

 

2、概念ide

 

1.爲何要使用線程池技術?性能

    若是不使用線程池技術,通常咱們是使用new Thread技術進行建立線程。可是爲何不推薦使用這個呢?主要是由於new Thread會帶來下面的弊端:

1.每次new Thread新建對象,性能差。

2.線程缺少統一管理,可能無限制的新建線程,互相競爭,有可能佔用過多系統資源致使死機或OOM。 

3.缺少更多功能,如更多執行、按期執行、線程中斷。

    其實在阿里巴巴規範裏也說明了,強制使用線程池建立:

2.使用線程池有什麼好處呢?

    固然上面的問題都是解決了的,具體的詳細,以下:

1.重用存在的線程,減小對象建立、消亡的開銷,性能佳。

2.可有效控制最大併發線程數,提升系統資源利用率,同時能夠避免過多資源競爭,避免阻塞。

3.提供定時執行、按期執行、單線程、併發數控制等功能。

    那麼線程池的使用場景,或者說是目的又是什麼呢?

線程是稀缺資源,不能頻繁的建立。

解耦做用;線程的建立於執行徹底分開,方便維護。 

應當將其放入一個池子中,能夠給其餘任務進行復用。

 

3、線程池

 

1.線程池究竟是什麼?

    java.util.concurrent.Executors提供了一個 java.util.concurrent.Executor接口的實現用於建立線程池多線程技術主要解決處理器單元內多個線程執行的問題,它能夠顯著減小處理器單元的閒置時間,增長處理器單元的吞吐能力。

 

    假設一個服務器完成一項任務所需時間爲:T1 建立線程時間,T2 在線程中執行任務的時間,T3 銷燬線程時間。

    若是:T1 + T3 遠大於 T2,則能夠採用線程池,以提升服務器性能。

    一個線程池包括如下四個基本組成部分:

  一、線程池管理器(ThreadPool):用於建立並管理線程池,包括 建立線程池,銷燬線程池,添加新任務; 

 二、工做線程(PoolWorker):線程池中線程,在沒有任務時處於等待狀態,能夠循環的執行任務;

  三、任務接口(Task):每一個任務必須實現的接口,以供工做線程調度任務的執行,它主要規定了任務的入口,任務執行完後的收尾工做,任務的執行狀態等;

  四、任務隊列(taskQueue):用於存放沒有處理的任務。提供一種緩衝機制。

    線程池技術正是關注如何縮短或調整T1,T3時間的技術,從而提升服務器程序性能的。它把T1,T3分別安排在服務器程序的啓動和結束的時間段或者一些空閒的時間段,這樣在服務器程序處理客戶請求時,不會有T1,T3的開銷了。

   線程池不只調整T1,T3產生的時間段,並且它還顯著減小了建立線程的數目,看一個例子:

    假設一個服務器一天要處理50000個請求,而且每一個請求須要一個單獨的線程完成。在線程池中,線程數通常是固定的,因此產生線程總數不會超過線程池中線程的數目,而若是服務器不利用線程池來處理這些請求則線程總數爲50000。通常線程池大小是遠小於50000。因此利用線程池的服務器程序不會爲了建立50000而在處理請求時浪費時間,從而提升效率。

2.Java中的線程池種類

    2.1 newSingleThreadExecutor

 

ExecutorService pool = Executors.newSingleThreadExecutor();

    一個單線程的線程池。這個線程池只有一個線程在工做,也就是至關於單線程串行執行全部任務。若是這個惟一的線程由於異常結束,那麼會有一個新的線程來替代它。此線程池保證全部任務的執行順序按照任務的提交順序執行。



public class ThreadPoolExample3 {public static void main(String[] args) {ExecutorService executorService = Executors.newSingleThreadExecutor();for (int i = 0; i < 10; i++) {final int index = i;executorService.execute(new Runnable() {@Overridepublic void run() {System.out.println("t;ask:{}"+ index);}});}executorService.shutdown();}}

    2.2 newFixedThreadPool

 

ExecutorService pool = Executors.newFixedThreadPool(10);

    建立固定大小的線程池。每次提交一個任務就建立一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,若是某個線程由於執行異常而結束,那麼線程池會補充一個新線程。



public class ThreadPoolExample2 {public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(3);for (int i = 0; i < 10; i++) {final int index = i;executorService.execute(new Runnable() {@Overridepublic void run() {System.out.println("t;ask:{}"+ index);}});}executorService.shutdown();}}

    2.3 newCachedThreadPool

 

ExecutorService pool = Executors.newCachedThreadPool();

    建立一個可緩存的線程池。若是線程池的大小超過了處理任務所須要的線程,那麼就會回收部分空閒的線程,當任務數增長時,此線程池又添加新線程來處理任務。

 



public class ThreadPoolExample1 {public static void main(String[] args) {ExecutorService executorService = Executors.newCachedThreadPool();for (int i = 0; i < 10; i++) {final int index = i;executorService.execute(new Runnable() {@Overridepublic void run() {System.out.println("t;ask:{}"+ index);}});}executorService.shutdown();}}

    2.4 newScheduledThreadPool

    此線程池支持定時以及週期性執行任務的需求。

 

ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
  •  
import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;public class ThreadPool {public static void main(String[] args) {ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);for (int i = 0; i < 10; i++) {pool.schedule(() -> {System.out.println(Thread.currentThread().getName() + "\t開始發車啦....");}, 10, TimeUnit.SECONDS);}}}

    上面演示的是延遲10秒執行任務,若是想要執行週期性的任務能夠用下面的方式,每秒執行一次。

//pool.scheduleWithFixedDelay也能夠pool.scheduleAtFixedRate(() -> {System.out.println(Thread.currentThread().getName() + "\t開始發車啦....");}, 1, 1, TimeUnit.SECONDS);

    2.5 newWorkStealingPool

    newWorkStealingPool是jdk1.8纔有的,會根據所需的並行層次來動態建立和關閉線程,經過使用多個隊列減小競爭,底層用的ForkJoinPool來實現的。ForkJoinPool的優點在於,能夠充分利用多cpu,多核cpu的優點,把一個任務拆分紅多個「小任務」,把多個「小任務」放到多個處理器核心上並行執行;當多個「小任務」執行完成以後,再將這些執行結果合併起來便可。

 

    上面的5個線程池,Java已經提供了相對固定的模式,其實還有不足就是,很差寫拒絕策略,調整核心數等大小問題。因此,這也是阿里巴巴不推薦的:

4、線程池配置

 

    竟然阿里規範都不推薦,那麼咱們應該怎麼寫呢?,以下,所示:

​​​​​​​

public ExecutorService buildConsumerQueueThreadPool(){ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("consumer-queue-thread-%d").build();ExecutorService pool = new ThreadPoolExecutor(10, Integer.MAX_VALUE, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),namedThreadFactory,new ThreadPoolExecutor.AbortPolicy());return pool ;}

    併發線程池到底怎麼設置呢?

 

    在具體分析以前先了解下線程池中所定義的狀態,這些狀態都和線程的執行密切相關:

  • RUNNING 天然是運行狀態,指能夠接受任務執行隊列裏的任務

  • SHUTDOWN 指調用了 shutdown() 方法,再也不接受新任務了,可是隊列裏的任務得執行完畢。

  • STOP 指調用了 shutdownNow() 方法,再也不接受新任務,同時拋棄阻塞隊列裏的全部任務並中斷全部正在執行任務。

  • TIDYING 全部任務都執行完畢,在調用 shutdown()/shutdownNow() 中都會嘗試更新爲這個狀態。

  • TERMINATED 終止狀態,當執行 terminated() 後會更新爲這個狀態。

 

    在咱們平常業務開發過程當中,或多或少都會用到併發的功能。那麼在用到併發功能的過程當中,就確定會碰到這個問題。

    一般有點年紀的程序員或許都據說這樣一個說法 (其中 N 表明 CPU 的個數):

CPU 密集型應用,線程池大小設置爲 N + 1。

IO 密集型應用,線程池大小設置爲 2N。

    其實這是極不正確的。那爲何呢?

    1.首先咱們從反面來看,假設這個說法是成立的,那咱們在一臺服務器上部署多少個服務都無所謂了。由於線程池的大小隻能服務器的核數有關,因此這個說法是不正確的。那具體應該怎麼設置大小呢? 

    2.假設這個應用是二者混合型的,其中任務即有 CPU 密集,也有 IO 密集型的,那麼咱們改怎麼設置呢?是否是隻能拋硬盤來決定呢?

    那麼咱們到底該怎麼設置線程池大小呢?

    咱們可使用利特爾法則(Little’s law)來斷定線程池大小。咱們只需計算請求到達率和請求處理的平均時間。而後,將上述值放到利特爾法則(Little’s law)就能夠算出系統平均請求數。估算公式以下:

*線程池大小 = ((線程 IO time + 線程 CPU time )/線程 CPU time ) CPU數目**

    經過公式,咱們瞭解到須要 3 個具體數值:

   1.一個請求所消耗的時間 (線程 IO time + 線程 CPU time)。 

   2.該請求計算時間 (線程 CPU time)。

   3.CPU 數目。

    1.請求消耗時間:Web 服務容器中,能夠經過 Filter 來攔截獲取該請求先後消耗的時間。

    2.CPU 計算時間 = 請求總耗時 - CPU IO time。假設該請求有一個查詢 DB 的操做,只要知道這個查詢 DB 的耗時(CPU IO time)。 

    3.邏輯 CPU 個數 ,設置線程池大小的時候參考的 CPU 個數:

    cat /proc/cpuinfo| grep "processor"| wc -l

    按照上訴原則,咱們再來看線程池的7個參數:

corePoolSize: 線程池中核心線程數量 。

maximumPoolSize: 最大線程數量 。

keepAliveTime :空閒時間(當線程池梳理超過核心數量時,多餘的空閒時間的存活時間,即超過核心線程數量的空閒線程,在多長時間內,會被銷燬) 。

unit: 時間單位 。

workQueue: 當核心線程工做已滿,須要存儲任務的隊列。 

threadFactory: 建立線程的工廠 。

handler: 當隊列滿了以後的拒絕策略。

    線程池按如下行爲執行任務:

    詳細配置參考文章:

    https://www.cnblogs.com/syp172654682/p/9383335.html

 

    前面幾個參數咱們就不講了,很簡單,主要是後面幾個參數,隊列,線程工廠,拒絕策略。

    咱們先看看隊列,線程池默認提供了 4 個隊列:

無界隊列(LinkedBlockingQueue): 默認大小 int 最大值,所以可能會耗盡系統內存,引發OOM,很是危險。 

直接提交的隊列 : 沒有容量,不會保存,直接建立新的線程,所以須要設置很大的線程池數。不然容易執行拒絕策略,也很危險。 

有界隊列(ArrayBlockingQueue):若是core滿了,則存儲在隊列中,若是core滿了且隊列滿了,則建立線程,直到maximumPoolSize 到了,若是隊列滿了且最大線程數已經到了,則執行拒絕策略。 

優先級隊列:按照優先級執行任務。也能夠設置大小。

    再看看拒絕策略,什麼是拒絕策略呢?當隊列滿了,如何處理那些仍然提交的任務。JDK 默認有4種策略:

AbortPolicy :直接拋出異常,阻止系統正常工做.

CallerRunsPolicy : 只要線程池未關閉,該策略直接在調用者線程中,運行當前被丟棄的任務。顯然這樣作不會真的丟棄任務,可是,任務提交線程的性能極有可能會急劇降低。 

DiscardOldestPolicy: 該策略將丟棄最老的一個請求,也就是即將被執行的一個任務,並嘗試再次提交當前任務. 

DiscardPolicy: 該策略默默地丟棄沒法處理的任務,不予任何處理,若是容許任務丟失,我以爲這是最好的方案.

    如何優雅的關閉線程池呢?   

 

    有運行任務天然也有關閉任務,從上文提到的 5 個狀態就能看出如何來關閉線程池。

    其實無非就是兩個方法 shutdown()/shutdownNow()

    但他們有着重要的區別:

  • shutdown() 執行後中止接受新任務,會把隊列的任務執行完畢。

  • shutdownNow() 也是中止接受新任務,但會中斷全部的任務,將線程池狀態變爲 stop。

兩個方法都會中斷線程,用戶可自行判斷是否須要響應中斷。

shutdownNow() 要更簡單粗暴,能夠根據實際場景選擇不一樣的方法。

pool.awaitTermination(1, TimeUnit.SECONDS) 會每隔一秒鐘檢查一次是否執行完畢(狀態爲 TERMINATED),當從 while 循環退出時就代表線程池已經徹底終止了。

    

    如何寫個完整的一個線程池?    

 

   瞭解上面的參數信息後咱們就能夠定義本身的線程池了,我這邊用ArrayBlockingQueue替換了LinkedBlockingQueue,指定了隊列的大小,當任務超出隊列大小以後使用CallerRunsPolicy拒絕策略處理。

    這樣作的好處是嚴格控制了隊列的大小,不會出現一直往裏面添加任務的狀況,有的時候任務處理的比較慢,任務數量過多會佔用大量內存,致使內存溢出。

    固然你也能夠在提交到線程池的入口進行控制,好比用CountDownLatch, Semaphore等。

/*** 自定義線程池<br>* 默認的newFixedThreadPool裏的LinkedBlockingQueue是一個無邊界隊列,若是不斷的往裏加任務,最終會致使內存的不可控<br> * 增長了有邊界的隊列,使用了CallerRunsPolicy拒絕策略**/public class FangjiaThreadPoolExecutor {private static ExecutorService executorService = newFixedThreadPool(50);private static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(10000), new DefaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());}public static void execute(Runnable command) {executorService.execute(command);}public static void shutdown() {executorService.shutdown();}static class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;DefaultThreadFactory() {SecurityManager s = System.getSecurityManager();group = (s != null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();namePrefix = "FSH-pool-" +poolNumber.getAndIncrement() +"-thread-";}public Thread newThread(Runnable r) {Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);if (t.isDaemon())t.setDaemon(false);if (t.getPriority() != Thread.NORM_PRIORITY)t.setPriority(Thread.NORM_PRIORITY);return t;}}}

 

5、Spring管理線程池

 

    SpringBoot 盛行;來看看在 SpringBoot 中應當怎麼配置和使用線程池。        既然用了 SpringBoot ,那天然得發揮 Spring 的特性,因此須要 Spring 來幫咱們管理線程池:







@Configurationpublic class TreadPoolConfig {/*** 消費隊列線程* @return*/@Bean(value = "consumerQueueThreadPool")public ExecutorService buildConsumerQueueThreadPool(){ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("consumer-queue-thread-%d").build();ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(5),namedThreadFactory,new ThreadPoolExecutor.AbortPolicy());return pool ;}}

    使用時,很方便:

 




@Resource(name = "consumerQueueThreadPool")private ExecutorService consumerQueueThreadPool;@Overridepublic void execute() {//消費隊列for (int i = 0; i < 5; i++) {consumerQueueThreadPool.execute(new ConsumerQueueThread());}}

6、線程池隔離

 

線程池看似很美好,但也會帶來一些問題。

    若是咱們不少業務都依賴於同一個線程池,當其中一個業務由於各類不可控的緣由消耗了全部的線程,致使線程池所有佔滿。    

    這樣其餘的業務也就不能正常運轉了,這對系統的打擊是巨大的。    

    好比咱們 Tomcat 接受請求的線程池,假設其中一些響應特別慢,線程資源得不到回收釋放;線程池慢慢被佔滿,最壞的狀況就是整個應用都不能提供服務。

    因此咱們須要將線程池進行隔離。

    一般的作法是按照業務進行劃分:

好比下單的任務用一個線程池,獲取數據的任務用另外一個線程池。這樣即便其中一個出現問題把線程池耗盡,那也不會影響其餘的任務運行。

    hystrix 隔離

這樣的需求 Hystrix 已經幫咱們實現了。

Hystrix 是一款開源的容錯插件,具備依賴隔離、系統容錯降級等功能。

下面來看看 Hystrix 簡單的應用:

首先須要定義兩個線程池,分別用於執行訂單、處理用戶。

相關文章
相關標籤/搜索