Java 中的幾種線程池,你以前用對了嗎

很久不發文章了,難道是由於忙,實際上是由於懶。這是一篇關於線程池使用和基本原理的科普水文,若是你常常用到線程池,不知道你的用法標準不標準,是否有隱藏的 OOM 風險。不常常用線程池的同窗,還有對幾種線程的使用不甚瞭解的同窗能夠讀一下此文。java

爲何要使用線程池

雖然你們應該都已經很清楚了,但仍是說一下。其實歸根結底最主要的一個緣由就是爲了提升性能。web

線程池和數據庫鏈接池是一樣的道理,數據庫鏈接池是爲了減小鏈接創建和釋放帶來的性能開銷。而線程池則是爲了減小線程創建和銷燬帶來的性能消耗。數據庫

以 web 項目爲例,有如下兩種狀況:緩存

一、每次過來一個請求,都要在服務端建立一個新線程來處理請求,請求處理完成銷燬線程;ide

二、每次過來一個請求,服務端在線程池中直接拿過一個空閒的線程來處理這個請求,處理完成後還給線程池;函數

答案是確定的,確定是第二種使用線程池的方式性能更好。性能

除了性能這個最重要的緣由外,線程池的使用能夠幫助咱們更合理的使用系統資源。仍是以 web 項目爲例,若是咱們在服務端不使用線程池,而是無節制的來一個請求建立一個線程,系統資源將會很快被耗盡。而使用線程池的話,則能夠防止這種狀況發生,固然這要創建在正確合理的使用線程池的基礎上,要固定線程的最大數以及等待隊列的大小。ui

幾種線程池的使用和原理

線程池當然好用,可是要創建在正確的使用方式的基礎上,若是使用方式不當,一樣會出現問題。接下來就介紹一下幾種線程池的使用。線程

在大名鼎鼎的 J.U.C 包下已經提供了 Executors 類,它已經封裝實現了四種建立線程池的方式,它暴露出幾個簡單的方法供開發者調用。最終都是經過 new ThreadPoolExecutor() ExecutorService 實例,從而獲得咱們想要的線程池類型。這樣作其實有利有弊,好的是咱們不用關心那麼多參數,只須要簡單的指定一兩個參數就能夠;很差的是,這樣一來又屏蔽了不少細節,若是有些參數使用默認的,而開發者又不瞭解原理的狀況下,可能會形成 OOM 等問題。rest

不少公司都不建議或者強制不容許直接使用 Executors 類提供的方法來建立線程池,例如阿里巴巴Java開發手冊裏就明確不容許這樣建立線程池,必定要經過 ThreadPoolExecutor(xx,xx,xx...) 來明確線程池的運行規則,指定更合理的參數。

先來看一下 ThreadPoolExecutor 的幾個參數和它們的意義,先來看一下它最完整參數的重載。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

一共有 7 個參數。

corePoolSize

核心線程數,當有任務進來的時候,若是當前線程數還未達到 corePoolSize 個數,則建立核心線程,核心線程有幾個特色:

一、當線程數未達到核心線程最大值的時候,新任務進來,即便有空閒線程,也不會複用,仍然新建核心線程;

二、核心線程通常不會被銷燬,即便是空閒的狀態,可是若是經過方法 allowCoreThreadTimeOut(boolean value) 設置爲 true 時,超時也一樣會被銷燬;

三、生產環境首次初始化的時候,能夠調用 prestartCoreThread() 方法來預先建立全部核心線程,避免第一次調用緩慢;

maximumPoolSize

除了有核心線程外,有些策略是當核心線程徹底無空閒的時候,還會建立一些臨時的線程來處理任務,maximumPoolSize 就是核心線程 + 臨時線程的最大上限。臨時線程有一個超時機制,超過了設置的空閒時間沒有事兒幹,就會被銷燬。

keepAliveTime

這個就是上面兩個參數裏所提到的超時時間,也就是線程的最大空閒時間,默認用於非核心線程,經過 allowCoreThreadTimeOut(boolean value) 方法設置後,也會用於核心線程。

unit

這個參數配合上面的 keepAliveTime ,指定超時的時間單位,秒、分、時等。

workQueue

等待執行的任務隊列,若是核心線程沒有空閒的了,新來的任務就會被放到這個等待隊列中。這個參數其實必定程度上決定了線程池的運行策略,爲何這麼說呢,由於隊列分爲有界隊列和無界隊列。

有界隊列:隊列的長度有上限,當核心線程滿載的時候,新任務進來進入隊列,當達到上限,有沒有核心線程去即時取走處理,這個時候,就會建立臨時線程。(警戒臨時線程無限增長的風險)

無界隊列:隊列沒有上限的,當沒有核心線程空閒的時候,新來的任務能夠無止境的向隊列中添加,而永遠也不會建立臨時線程。(警戒任務隊列無限堆積的風險)

threadFactory

它是一個接口,用於實現生成線程的方式、定義線程名格式、是否後臺執行等等,能夠用 Executors.defaultThreadFactory() 默認的實現便可,也能夠用 Guava 等三方庫提供的方法實現,若是有特殊要求的話能夠本身定義。它最重要的地方應該就是定義線程名稱的格式,便於排查問題了吧。

handler

當沒有空閒的線程處理任務,而且等待隊列已滿(固然這隻對有界隊列有效),再有新任務進來的話,就要作一些取捨了,而這個參數就是指定取捨策略的,有下面四種策略能夠選擇:

ThreadPoolExecutor.AbortPolicy:直接拋出異常,這是默認策略; 
ThreadPoolExecutor.DiscardPolicy:直接丟棄任務,可是不拋出異常。 
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,而後將新來的任務加入等待隊列
ThreadPoolExecutor.CallerRunsPolicy:由線程池所在的線程處理該任務,好比在 main 函數中建立線程池,若是執行此策略,將有 main 線程來執行該任務

雖然並不提倡用 Executors 中的方法來建立線程池,但仍是用他們來說一下幾種線程池的原理。

一、newFixedThreadPool

它有兩個重載方法,代碼以下:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
 }

創建一個線程數量固定的線程池,規定的最大線程數量,超過這個數量以後進來的任務,會放到等待隊列中,若是有空閒線程,則在等待隊列中獲取,遵循先進先出原則。

建立固定線程數量線程池, corePoolSize 和 maximumPoolSize 要一致,即核心線程數和最大線程數(核心+非核心線程)一致,Executors 默認使用的是 LinkedBlockingQueue 做爲等待隊列,這是一個無界隊列,這也是使用它的風險所在,除非你能保證提交的任務不會無節制的增加,不然不要使用無界隊列,這樣有可能形成等待隊列無限增長,形成 OOM。

正確的建立固定線程數線程池的作法是

private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("fengzheng" + "-%d").setDaemon(true).build();

public static ExecutorService createFixedThreadPool() {
        int poolSize = 5;
        int queueSize = 10;
        ExecutorService executorService = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(queueSize), threadFactory, new ThreadPoolExecutor.AbortPolicy());
        return executorService;
    }

上面代碼是建立一個 5 個線程的固定數量線程池,這裏線程存活時間沒有做用,因此設置爲 0,使用了 ArrayBlockingQueue 做爲等待隊列,設置長度爲 10 ,最多容許10個等待任務,超過的任務會執行默認的 AbortPolicy 策略,也就是直接拋異常。ThreadFactory 使用了 Guava 庫提供的方法,定義了線程名稱,方便以後排查問題。

二、newSingleThreadExecutor

創建一個只有一個線程的線程池,若是有超過一個任務進來,只有一個能夠執行,其他的都會放到等待隊列中,若是有空閒線程,則在等待隊列中獲取,遵循先進先出原則。使用 LinkedBlockingQueue 做爲等待隊列。

這個方法一樣存在等待隊列無限長的問題,容易形成 OOM,因此正確的建立方式參考上面固定數量線程池建立的方式,只是把 poolSize 設置爲 1 。

三、newCachedThreadPool

緩存型線程池,在覈心線程達到最大值以前,有任務進來就會建立新的核心線程,並加入核心線程池,即時有空閒的線程,也不會複用。達到最大核心線程數後,新任務進來,若是有空閒線程,則直接拿來使用,若是沒有空閒線程,則新建臨時線程。而且線程的容許空閒時間都很短,若是超過空閒時間沒有活動,則銷燬臨時線程。關鍵點就在於它使用 SynchronousQueue 做爲等待隊列,它不會保留任務,新任務進來後,直接建立臨時線程處理,這樣一來,也就容易形成無限制的建立線程,形成 OOM。

正確的建立緩存型線程池的作法是

private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("fengzheng" + "-%d").setDaemon(true).build();

    public static ExecutorService createCacheThreadPool(){
        int coreSize = 10;
        int maxSize = 20;
        return new ThreadPoolExecutor(coreSize, maxSize, 10L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), threadFactory, new ThreadPoolExecutor.AbortPolicy());
    }

四、newScheduledThreadPool

計劃型線程池,能夠設置固定時間的延時或者按期執行任務,一樣是看線程池中有沒有空閒線程,若是有,直接拿來使用,若是沒有,則新建線程加入池。使用的是 DelayedWorkQueue 做爲等待隊列,這中類型的隊列會保證只有到了指定的延時時間,纔會執行任務。

正確的建立緩存型線程池的作法是

private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("fengzheng" + "-%d").setDaemon(true).build();

    private static CountDownLatch latch = new CountDownLatch(1);

    public static void main(String[] args) throws InterruptedException {
        Task task = new Task();
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2, threadFactory);
        executorService.scheduleAtFixedRate(task,0L,5L, TimeUnit.SECONDS);
        latch.await();
    }

    static class Task implements Runnable{
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "executing");
        }
    }

最後,各位同窗不妨到個人公衆號裏互動一下 : 古時的風箏 ,進入公衆號能夠加入交流羣

相關文章
相關標籤/搜索