Java高併發之線程池詳解

線程池優點

在業務場景中, 若是一個對象建立銷燬開銷比較大, 那麼此時建議池化對象進行管理.緩存

例如線程, jdbc鏈接等等, 在高併發場景中, 若是能夠複用以前銷燬的對象, 那麼系統效率將大大提高.併發

另一個好處是能夠設定池化對象的上限, 例如預防建立線程數量過多致使系統崩潰的場景.ide

jdk中的線程池

下文主要從如下幾個角度講解:函數

  • 建立線程池
  • 提交任務
  • 潛在宕機風險
  • 線程池大小配置
  • 自定義阻塞隊列BlockingQueue
  • 回調接口
  • 自定義拒絕策略
  • 自定義ThreadFactory
  • 關閉線程池

建立線程池高併發

咱們能夠經過自定義ThreadPoolExecutor或者jdk內置的Executors來建立一系列的線程池this

  • newFixedThreadPool: 建立固定線程數量的線程池
  • newSingleThreadExecutor: 建立單一線程的池
  • newCachedThreadPool: 建立線程數量自動擴容, 自動銷燬的線程池
  • newScheduledThreadPool: 建立支持計劃任務的線程池

 上述幾種都是經過new ThreadPoolExecutor()來實現的, 構造函數源碼以下: spa

複製代碼
 1     /**
 2      * @param corePoolSize 池內核心線程數量, 超出數量的線程會進入阻塞隊列
 3      * @param maximumPoolSize 最大可建立線程數量
 4      * @param keepAliveTime 線程存活時間
 5      * @param unit 存活時間的單位
 6      * @param workQueue 線程溢出後的阻塞隊列
 7      */
 8     public ThreadPoolExecutor(int corePoolSize,
 9                               int maximumPoolSize,
10                               long keepAliveTime,
11                               TimeUnit unit,
12                               BlockingQueue<Runnable> workQueue) {
13         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
14     }
15 
16     public static ExecutorService newFixedThreadPool(int nThreads) {
17         return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
18     }
19 
20     public static ExecutorService newSingleThreadExecutor() {
21         return new Executors.FinalizableDelegatedExecutorService
22                 (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
23     }
24 
25     public static ExecutorService newCachedThreadPool() {
26         return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
27     }
28 
29     public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
30         return new ScheduledThreadPoolExecutor(corePoolSize);
31     }
32 
33     public ScheduledThreadPoolExecutor(int corePoolSize) {
34         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue());
35     }
複製代碼

提交任務線程

直接調用executorService.execute(runnable)或者submit(runnable)便可,code

execute和submit的區別在於submit會返回Future來獲取任何執行的結果.對象

咱們看下newScheduledThreadPool的使用示例.

複製代碼
 1 public class SchedulePoolDemo {
 2 
 3     public static void main(String[] args){
 4         ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
 5         // 若是前面的任務沒有完成, 調度也不會啓動
 6         service.scheduleAtFixedRate(new Runnable() {
 7             @Override
 8             public void run() {
 9                 try {
10                     Thread.sleep(2000);
11                     // 每兩秒打印一次.
12                     System.out.println(System.currentTimeMillis()/1000);
13                 } catch (InterruptedException e) {
14                     e.printStackTrace();
15                 }
16             }
17         }, 0, 2, TimeUnit.SECONDS);
18     }
19 }
複製代碼

潛在宕機風險

使用Executors來建立要注意潛在宕機風險.其返回的線程池對象的弊端以下:

  • FixedThreadPool和SingleThreadPoolPool : 容許的請求隊列長度爲 Integer.MAX_VALUE,可能會堆積大量的請求,從而致使 OOM.
  • CachedThreadPool和ScheduledThreadPool : 容許的建立線程數量爲 Integer.MAX_VALUE,可能會建立大量的線程,從而致使 OOM.

綜上所述, 在可能有大量請求的線程池場景中, 更推薦自定義ThreadPoolExecutor來建立線程池, 具體構造函數配置見下文.

線程池大小配置

通常根據任務類型進行區分, 假設CPU爲N核

  • CPU密集型任務須要減小線程數量, 下降線程之間切換形成的開銷, 可配置線程池大小爲N + 1.
  • IO密集型任務則能夠加大線程數量, 可配置線程池大小爲 N * 2.
  • 混合型任務則能夠拆分爲CPU密集型與IO密集型, 獨立配置.

自定義阻塞隊列BlockingQueue

主要存放等待執行的線程, ThreadPoolExecutor中支持自定義該隊列來實現不一樣的排隊隊列.

  • ArrayBlockingQueue:先進先出隊列,建立時指定大小, 有界;
  • LinkedBlockingQueue:使用鏈表實現的先進先出隊列,默認大小爲Integer.MAX_VALUE;
  • SynchronousQueue:不保存提交的任務, 數據也不會緩存到隊列中, 用於生產者和消費者互等對方, 一塊兒離開.
  • PriorityBlockingQueue: 支持優先級的隊列

回調接口

線程池提供了一些回調方法, 具體使用以下所示.

複製代碼
 1         ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>()) {
 2 
 3             @Override
 4             protected void beforeExecute(Thread t, Runnable r) {
 5                 System.out.println("準備執行任務: " + r.toString());
 6             }
 7 
 8             @Override
 9             protected void afterExecute(Runnable r, Throwable t) {
10                 System.out.println("結束任務: " + r.toString());
11             }
12 
13             @Override
14             protected void terminated() {
15                 System.out.println("線程池退出");
16             }
17         };
複製代碼

能夠在回調接口中, 對線程池的狀態進行監控, 例如任務執行的最長時間, 平均時間, 最短期等等, 還有一些其餘的屬性以下:

  • taskCount:線程池須要執行的任務數量.
  • completedTaskCount:線程池在運行過程當中已完成的任務數量.小於或等於taskCount.
  • largestPoolSize:線程池曾經建立過的最大線程數量.經過這個數據能夠知道線程池是否滿過.如等於線程池的最大大小,則表示線程池曾經滿了.
  • getPoolSize:線程池的線程數量.若是線程池不銷燬的話,池裏的線程不會自動銷燬,因此這個大小隻增不減.
  • getActiveCount:獲取活動的線程數.

自定義拒絕策略

線程池滿負荷運轉後, 由於時間空間的問題, 可能須要拒絕掉部分任務的執行.

jdk提供了RejectedExecutionHandler接口, 並內置了幾種線程拒絕策略

  • AbortPolicy: 直接拒絕策略, 拋出異常.
  • CallerRunsPolicy: 調用者本身執行任務策略.
  • DiscardOldestPolicy: 捨棄最老的未執行任務策略.

使用方式也很簡單, 直接傳參給ThreadPool

複製代碼
1         ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, 
2                 new SynchronousQueue<Runnable>(),
3                 Executors.defaultThreadFactory(),
4                 new RejectedExecutionHandler() {
5                     @Override
6                     public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
7                         System.out.println("reject task: " + r.toString());
8                     }
9                 });
複製代碼

自定義ThreadFactory

線程工廠用於建立池裏的線程. 例如在工廠中都給線程setDaemon(true), 這樣程序退出的時候, 線程自動退出.

或者統一指定線程優先級, 設置名稱等等.

複製代碼
 1 class NamedThreadFactory implements ThreadFactory {
 2     private static final AtomicInteger threadIndex = new AtomicInteger(0);
 3     private final String baseName;
 4     private final boolean daemon;
 5 
 6     public NamedThreadFactory(String baseName) {
 7         this(baseName, true);
 8     }
 9 
10     public NamedThreadFactory(String baseName, boolean daemon) {
11         this.baseName = baseName;
12         this.daemon = daemon;
13     }
14 
15     public Thread newThread(Runnable runnable) {
16         Thread thread = new Thread(runnable, this.baseName + "-" + threadIndex.getAndIncrement());
17         thread.setDaemon(this.daemon);
18         return thread;
19     }
20 }
複製代碼

關閉線程池

跟直接new Thread不同, 局部變量的線程池, 須要手動關閉, 否則會致使線程泄漏問題.

默認提供兩種方式關閉線程池.

  • shutdown: 等全部任務, 包括阻塞隊列中的執行完, 纔會終止, 可是不會接受新任務.
  • shutdownNow: 當即終止線程池, 打斷正在執行的任務, 清空隊列. 

 

 

相關文章
相關標籤/搜索