多線程編程學習十一(ThreadPoolExecutor 詳解).

1、ThreadPoolExecutor 參數說明

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  • corePoolSize:核心線程池的大小。當提交一個任務到線程池時,核心線程池會建立一個核心線程來執行任務,即便其餘核心線程可以執行新任務也會建立線程,等到須要執行的任務數大於核心線程池基本大小時就再也不建立。若是調用了線程池的 prestartAllCoreThreads() 方法,核心線程池會提早建立並啓動全部核心線程。
  • workQueue:任務隊列。當核心線程池中沒有線程時,所提交的任務會被暫存在隊列中。Java 提供了多種阻塞隊列
  • maximumPoolSize:線程池容許建立的最大線程數。若是隊列也滿了,而且已建立的線程數小於最大線程數,則線程池會再建立新的空閒線程執行任務。值得注意的是,若是使用了無界的任務隊列則這個參數不起做用。
  • keepAliveTime:當線程池中的線程數大於 corePoolSize 時,keepAliveTime 爲多餘的空閒線程等待新任務的最長時間,超過這個時間後多餘的線程將被終止。因此,若是任務不少,而且每一個任務執行的時間比較短,能夠調大時間,提升線程的利用率。值得注意的是,若是使用了無界的任務隊列則這個參數不起做用。
  • TimeUnit:線程活動保持時間的單位。
  • threadFactory:建立線程的工廠。能夠經過線程工廠給每一個建立出來的線程設置符合業務的名字。html

    // 依賴 guava
    new ThreadFactoryBuilder().setNameFormat("xx-task-%d").build();
  • handler:飽和策略。當隊列和線程池都滿了,說明線程池處於飽和狀態,那麼必須採起一種策略處理提交的新任務。Java 提供瞭如下4種策略:java

    • AbortPolicy:默認。直接拋出異常。
    • CallerRunsPolicy:只用調用者所在線程來運行任務。
    • DiscardOldestPolicy:丟棄隊列裏最近的一個任務,並執行當前任務。
    • DiscardPolicy:不處理,丟棄掉。

tips: 通常咱們稱核心線程池中的線程爲核心線程,這部分線程不會被回收;超過任務隊列後,建立的線程爲空閒線程,這部分線程會被回收(回收時間即 keepAliveTime)數據庫

2、常見的 ThreadPoolExecutor 介紹

Executors 是建立 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 的工廠類。小程序

Java 提供了多種類型的 ThreadPoolExecutor,比較常見的有 FixedThreadPool、SingleThreadExecutor、CachedThreadPool等。服務器

FixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

FixedThreadPool 被稱爲可重用固定線程數的線程池。能夠看到 corePoolSize 和 maximumPoolSize 都被設置成了 nThreads;keepAliveTime設置爲0L,意味着多餘的空閒線程會被當即終止;使用了阻塞隊列 LinkedBlockingQueue 做爲線程的工做隊列(隊列的容量爲 Integer.MAX_VALUE)。多線程

FixedThreadPool 所存在的問題是,因爲隊列的容量爲 Integer.MAX_VALUE,基本能夠認爲是無界的,因此 maximumPoolSize 和 keepAliveTime 參數都不會生效,飽和拒絕策略也不會執行,會形成任務大量堆積在阻塞隊列中。異步

FixedThreadPool 適用於爲了知足資源管理的需求,而須要限制線程數量的應用場景。
函數


SingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

SingleThreadExecutor 是使用單個線程的線程池。能夠看到 corePoolSize 和 maximumPoolSize 被設置爲1,其餘參數與 FixedThreadPool 相同,因此所帶來的風險也和 FixedThreadPool 一致,就不贅述了。ui

SingleThreadExecutor 適用於須要保證順序的執行各個任務。
線程


CachedThreadPool

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

CachedThreadPool 是一個會根據須要建立新線程的線程池。能夠看到 corePoolSize 被設置爲 0,因此建立的線程都爲空閒線程;maximumPoolSize 被設置爲 Integer.MAX_VALUE(基本可認爲無界),意味着能夠建立無限數量的空閒線程;keepAliveTime 設置爲60L,意味着空閒線程等待新任務的最長時間爲60秒;使用沒有容量的 SynchronousQueue 做爲線程池的工做隊列。

CachedThreadPool 所存在的問題是, 若是主線程提交任務的速度高於maximumPool 中線程處理任務的速度時,CachedThreadPool 會不斷建立新線程。極端狀況下,CachedThreadPool會由於建立過多線程而耗盡CPU和內存資源。

CachedThreadPool 適用於執行不少的短時間異步任務的小程序,或者是負載較輕的服務器。

3、自建 ThreadPoolExecutor 線程池

鑑於上面提到的風險,咱們更提倡使用 ThreadPoolExecutor 去建立線程池,而不用 Executors 工廠去建立。

如下是一個 ThreadPoolExecutor 建立線程池的 Demo 實例:

public class Pool {

    static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("pool-task-%d").build();
    static ExecutorService executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2,
            200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024),
            threadFactory, new ThreadPoolExecutor.AbortPolicy());

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. 無返回值的任務執行 -> Runnable
        executor.execute(() -> System.out.println("Hello World"));
        // 2. 有返回值的任務執行 -> Callable
        Future<String> future = executor.submit(() -> "Hello World");
        // get 方法會阻塞線程執行等待返回結果
        String result = future.get();
        System.out.println(result);

        // 3. 監控線程池
        monitor();

        // 4. 關閉線程池
        shutdownAndAwaitTermination();

        monitor();
    }

    private static void monitor() {
        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Pool.executor;
        System.out.println("【線程池任務】線程池中曾經建立過的最大線程數:" + threadPoolExecutor.getLargestPoolSize());
        System.out.println("【線程池任務】線程池中線程數:" + threadPoolExecutor.getPoolSize());
        System.out.println("【線程池任務】線程池中活動的線程數:" + threadPoolExecutor.getActiveCount());
        System.out.println("【線程池任務】隊列中等待執行的任務數:" + threadPoolExecutor.getQueue().size());
        System.out.println("【線程池任務】線程池已執行完任務數:" + threadPoolExecutor.getCompletedTaskCount());
    }

    /**
     * 關閉線程池
     * 1. shutdown、shutdownNow 的原理都是遍歷線程池中的工做線程,而後逐個調用線程的 interrupt 方法來中斷線程。
     * 2. shutdownNow:將線程池的狀態設置成 STOP,而後嘗試中止全部的正在執行或暫停任務的線程,並返回等待執行任務的列表。
     * 3. shutdown:將線程池的狀態設置成 SHUTDOWN 狀態,而後中斷全部沒有正在執行任務的線程。
     */
    private static void shutdownAndAwaitTermination() {
        // 禁止提交新任務
        executor.shutdown();
        try {
            // 等待現有任務終止
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                // 取消當前正在執行的任務
                executor.shutdownNow();
                // 等待一段時間讓任務響應被取消
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Pool did not terminate");
                }
            }
        } catch (InterruptedException ie) {
            // 若是當前線程也中斷,則取消
            executor.shutdownNow();
            // 保留中斷狀態
            Thread.currentThread().interrupt();
        }
    }
}

建立線程池須要注意如下幾點:

  1. CPU 密集型任務應配置儘量小的線程,如配置 Ncpu+1 個線程。
  2. IO 密集型任務(數據庫讀寫等)應配置儘量多的線程,如配置 Ncpu*2 個線程。
  3. 優先級不一樣的任務可使用優先級隊列 PriorityBlockingQueue 來處理。
  4. 建議使用有界隊列。能夠避免建立數量很是多的線程,甚至拖垮系統。有界隊列能增長系統的穩定性和預警能力,能夠根據須要設大一點兒,好比幾千。

4、ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 繼承自 ThreadPoolExecutor。它主要用來在給定的延遲以後運行任務,或者按期執行任務。

public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory);
    }

ScheduledThreadPoolExecutor 的功能與 Timer 相似,但功能更強大、更靈活。Timer 對應的是單個後臺線程,而ScheduledThreadPoolExecutor 能夠在構造函數中指定多個對應的後臺線程數。

Java 提供了多種類型的 ScheduledThreadPoolExecutor ,能夠經過 Executors 建立,比較常見的有 ScheduledThreadPool、SingleThreadScheduledExecutor 等。適用於須要多個後臺線程執行週期任務,同時爲了知足資源管理的需求而須要限制後臺線程數量的應用場景。

public class ScheduleTaskTest {

    static ThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").build();
    static ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5, threadFactory);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. 延遲 3 秒後執行 Runnable 方法
        scheduledExecutorService.schedule(() -> System.out.println("Hello World"), 3000, TimeUnit.MILLISECONDS);

        // 2. 延遲 3 秒後執行 Callable 方法
        ScheduledFuture<String> scheduledFuture = scheduledExecutorService.schedule(() -> "Hello ScheduledFuture", 3000, TimeUnit.MILLISECONDS);
        System.out.println(scheduledFuture.get());

        // 3. 延遲 1 秒後開始每隔 3 秒週期執行。
        //    若是中間任務遇到異常,則禁止後續執行。
        //    固定的頻率來執行某項任務,它不受任務執行時間的影響。到時間,就執行。
        scheduledExecutorService.scheduleAtFixedRate(() -> System.out.println("Hello ScheduleAtFixedRate"), 1, 3000, TimeUnit.MILLISECONDS);

        // 4. 延遲 1 秒後,每一個任務結束延遲 3 秒後再執行下個任務。
        //    若是中間任務遇到異常,則禁止後續執行。
        //    受任務執行時間的影響,等待任務執行結束後纔開始計算延遲。
        scheduledExecutorService.scheduleWithFixedDelay(() -> System.out.println("Hello ScheduleWithFixedDelay"), 1, 3000, TimeUnit.MILLISECONDS);
    }
}

ScheduledThreadPoolExecutor 的執行步驟大抵以下:

  1. 當調用 ScheduledThreadPoolExecutor 的 scheduleAtFixedRate() 方法或者 scheduleWithFixedDelay()方 法時,會向 DelayedWorkQueue 隊列添加 ScheduledFutureTask 任務。
  2. 線程池中的線程從 DelayedWorkQueue隊列中獲取執行時間已到達的 ScheduledFutureTask,而後執行任務。
  3. 線程修改 ScheduledFutureTask 任務的執行時間爲下次將要被執行的時間。
  4. 線程把修改後的 ScheduledFutureTask 從新放回隊列。
相關文章
相關標籤/搜索