深刻理解 Java 線程池

1、簡介

什麼是線程池

線程池是一種多線程處理形式,處理過程當中將任務添加到隊列,而後在建立線程後自動啓動這些任務。html

爲何要用線程池

若是併發請求數量不少,但每一個線程執行的時間很短,就會出現頻繁的建立和銷燬線程。如此一來,會大大下降系統的效率,可能頻繁建立和銷燬線程的時間、資源開銷要大於實際工做的所需。java

正是因爲這個問題,因此有必要引入線程池。使用 線程池的好處 有如下幾點:編程

  • 下降資源消耗 - 經過重複利用已建立的線程下降線程建立和銷燬形成的消耗。
  • 提升響應速度 - 當任務到達時,任務能夠不須要等到線程建立就能當即執行。
  • 提升線程的可管理性 - 線程是稀缺資源,若是無限制的建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行統一的分配,調優和監控。可是要作到合理的利用線程池,必須對其原理了如指掌。

2、Executor 框架

Executor 框架是一個根據一組執行策略調用,調度,執行和控制的異步任務的框架,目的是提供一種將」任務提交」與」任務如何運行」分離開來的機制。數組

核心 API 概述

Executor 框架核心 API 以下:緩存

  • Executor - 運行任務的簡單接口。
  • ExecutorService - 擴展了 Executor 接口。擴展能力:
    • 支持有返回值的線程;
    • 支持管理線程的生命週期。
  • ScheduledExecutorService - 擴展了 ExecutorService 接口。擴展能力:支持按期執行任務。
  • AbstractExecutorService - ExecutorService 接口的默認實現。
  • ThreadPoolExecutor - Executor 框架最核心的類,它繼承了 AbstractExecutorService 類。
  • ScheduledThreadPoolExecutor - ScheduledExecutorService 接口的實現,一個可定時調度任務的線程池。
  • Executors - 能夠經過調用 Executors 的靜態工廠方法來建立線程池並返回一個 ExecutorService 對象。

Executor

Executor 接口中只定義了一個 execute 方法,用於接收一個 Runnable 對象。markdown

public interface Executor {
    void execute(Runnable command);
}
複製代碼

ExecutorService

ExecutorService 接口繼承了 Executor 接口,它還提供了 invokeAllinvokeAnyshutdownsubmit 等方法。多線程

public interface ExecutorService extends Executor {

    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
複製代碼

從其支持的方法定義,不難看出:相比於 Executor 接口,ExecutorService 接口主要的擴展是:併發

  • 支持有返回值的線程 - sumbitinvokeAllinvokeAny 方法中都支持傳入Callable 對象。
  • 支持管理線程生命週期 - shutdownshutdownNowisShutdown 等方法。

ScheduledExecutorService

ScheduledExecutorService 接口擴展了 ExecutorService 接口。框架

它除了支持前面兩個接口的全部能力之外,還支持定時調度線程。異步

public interface ScheduledExecutorService extends ExecutorService {

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

}
複製代碼

其擴展的接口提供如下能力:

  • schedule 方法能夠在指定的延時後執行一個 Runnable 或者 Callable 任務。
  • scheduleAtFixedRate 方法和 scheduleWithFixedDelay 方法能夠按照指定時間間隔,按期執行任務。

3、ThreadPoolExecutor

java.uitl.concurrent.ThreadPoolExecutor 類是 Executor 框架中最核心的類。因此,本文將着重講述一下這個類。

重要字段

ThreadPoolExecutor 有如下重要字段:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
複製代碼

參數說明:

  • ctl - 用於控制線程池的運行狀態和線程池中的有效線程數量。它包含兩部分的信息:
    • 線程池的運行狀態 (runState)
    • 線程池內有效線程的數量 (workerCount)
    • 能夠看到,ctl 使用了 Integer 類型來保存,高 3 位保存 runState,低 29 位保存 workerCountCOUNT_BITS 就是 29,CAPACITY 就是 1 左移 29 位減 1(29 個 1),這個常量表示 workerCount 的上限值,大約是 5 億。
  • 運行狀態 - 線程池一共有五種運行狀態:
    • RUNNING - 運行狀態。接受新任務,而且也能處理阻塞隊列中的任務。
    • SHUTDOWN - 關閉狀態。不接受新任務,但能夠處理阻塞隊列中的任務。
      • 在線程池處於 RUNNING 狀態時,調用 shutdown 方法會使線程池進入到該狀態。
      • finalize 方法在執行過程當中也會調用 shutdown 方法進入該狀態。
    • STOP - 中止狀態。不接受新任務,也不處理隊列中的任務。會中斷正在處理任務的線程。在線程池處於 RUNNINGSHUTDOWN 狀態時,調用 shutdownNow 方法會使線程池進入到該狀態。
    • TIDYING - 整理狀態。若是全部的任務都已終止了,workerCount (有效線程數) 爲 0,線程池進入該狀態後會調用 terminated 方法進入 TERMINATED 狀態。
    • TERMINATED - 已終止狀態。在 terminated 方法執行完後進入該狀態。默認 terminated 方法中什麼也沒有作。進入 TERMINATED 的條件以下:
      • 線程池不是 RUNNING 狀態;
      • 線程池狀態不是 TIDYING 狀態或 TERMINATED 狀態;
      • 若是線程池狀態是 SHUTDOWN 而且 workerQueue 爲空;
      • workerCount 爲 0;
      • 設置 TIDYING 狀態成功。

構造方法

ThreadPoolExecutor 有四個構造方法,前三個都是基於第四個實現。第四個構造方法定義以下:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
複製代碼

參數說明:

  • corePoolSize - 核心線程數量。當有新任務經過 execute 方法提交時 ,線程池會執行如下判斷:
    • 若是運行的線程數少於 corePoolSize,則建立新線程來處理任務,即便線程池中的其餘線程是空閒的。
    • 若是線程池中的線程數量大於等於 corePoolSize 且小於 maximumPoolSize,則只有當 workQueue 滿時才建立新的線程去處理任務;
    • 若是設置的 corePoolSizemaximumPoolSize 相同,則建立的線程池的大小是固定的。這時若是有新任務提交,若 workQueue 未滿,則將請求放入 workQueue 中,等待有空閒的線程去從 workQueue 中取任務並處理;
    • 若是運行的線程數量大於等於 maximumPoolSize,這時若是 workQueue 已經滿了,則使用 handler 所指定的策略來處理任務;
    • 因此,任務提交時,判斷的順序爲 corePoolSize => workQueue => maximumPoolSize
  • maximumPoolSize - 最大線程數量
    • 若是隊列滿了,而且已建立的線程數小於最大線程數,則線程池會再建立新的線程執行任務。
    • 值得注意的是:若是使用了無界的任務隊列這個參數就沒什麼效果。
  • keepAliveTime線程保持活動的時間
    • 當線程池中的線程數量大於 corePoolSize 的時候,若是這時沒有新的任務提交,核心線程外的線程不會當即銷燬,而是會等待,直到等待的時間超過了 keepAliveTime
    • 因此,若是任務不少,而且每一個任務執行的時間比較短,能夠調大這個時間,提升線程的利用率。
  • unit - keepAliveTime 的時間單位。有 7 種取值。可選的單位有天(DAYS),小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
  • workQueue - 等待執行的任務隊列。用於保存等待執行的任務的阻塞隊列。 能夠選擇如下幾個阻塞隊列。
    • ArrayBlockingQueue - 有界阻塞隊列
      • 此隊列是基於數組的先進先出隊列(FIFO)
      • 此隊列建立時必須指定大小。
    • LinkedBlockingQueue - 無界阻塞隊列
      • 此隊列是基於鏈表的先進先出隊列(FIFO)
      • 若是建立時沒有指定此隊列大小,則默認爲 Integer.MAX_VALUE
      • 吞吐量一般要高於 ArrayBlockingQueue
      • 使用 LinkedBlockingQueue 意味着: maximumPoolSize 將不起做用,線程池能建立的最大線程數爲 corePoolSize,由於任務等待隊列是無界隊列。
      • Executors.newFixedThreadPool 使用了這個隊列。
    • SynchronousQueue - 不會保存提交的任務,而是將直接新建一個線程來執行新來的任務
      • 每一個插入操做必須等到另外一個線程調用移除操做,不然插入操做一直處於阻塞狀態。
      • 吞吐量一般要高於 LinkedBlockingQueue
      • Executors.newCachedThreadPool 使用了這個隊列。
    • PriorityBlockingQueue - 具備優先級的無界阻塞隊列
  • threadFactory - 線程工廠。能夠經過線程工廠給每一個建立出來的線程設置更有意義的名字。
  • handler - 飽和策略。它是 RejectedExecutionHandler 類型的變量。當隊列和線程池都滿了,說明線程池處於飽和狀態,那麼必須採起一種策略處理提交的新任務。線程池支持如下策略:
    • AbortPolicy - 丟棄任務並拋出異常。這也是默認策略。
    • DiscardPolicy - 丟棄任務,但不拋出異常。
    • DiscardOldestPolicy - 丟棄隊列最前面的任務,而後從新嘗試執行任務(重複此過程)。
    • CallerRunsPolicy - 只用調用者所在的線程來運行任務。
    • 若是以上策略都不能知足須要,也能夠經過實現 RejectedExecutionHandler 接口來定製處理策略。如記錄日誌或持久化不能處理的任務。

execute 方法

默認狀況下,建立線程池以後,線程池中是沒有線程的,須要提交任務以後纔會建立線程。

提交任務可使用 execute 方法,它是 ThreadPoolExecutor 的核心方法,經過這個方法能夠向線程池提交一個任務,交由線程池去執行

execute 方法工做流程以下:

  1. 若是 workerCount < corePoolSize,則建立並啓動一個線程來執行新提交的任務;
  2. 若是 workerCount >= corePoolSize,且線程池內的阻塞隊列未滿,則將任務添加到該阻塞隊列中;
  3. 若是 workerCount >= corePoolSize && workerCount < maximumPoolSize,且線程池內的阻塞隊列已滿,則建立並啓動一個線程來執行新提交的任務;
  4. 若是workerCount >= maximumPoolSize,而且線程池內的阻塞隊列已滿,則根據拒絕策略來處理該任務, 默認的處理方式是直接拋異常。

其餘重要方法

ThreadPoolExecutor 類中還有一些重要的方法:

  • submit - 相似於 execute,可是針對的是有返回值的線程。submit 方法是在 ExecutorService 中聲明的方法,在 AbstractExecutorService 就已經有了具體的實現。ThreadPoolExecutor 直接複用 AbstractExecutorServicesubmit 方法。
  • shutdown - 不會當即終止線程池,而是要等全部任務緩存隊列中的任務都執行完後才終止,但不再會接受新的任務。
    • 將線程池切換到 SHUTDOWN 狀態;
    • 並調用 interruptIdleWorkers 方法請求中斷全部空閒的 worker;
    • 最後調用 tryTerminate 嘗試結束線程池。
  • shutdownNow - 當即終止線程池,並嘗試打斷正在執行的任務,而且清空任務緩存隊列,返回還沒有執行的任務。與 shutdown 方法相似,不一樣的地方在於:
    • 設置狀態爲 STOP
    • 中斷全部工做線程,不管是不是空閒的;
    • 取出阻塞隊列中沒有被執行的任務並返回。
  • isShutdown - 調用了 shutdownshutdownNow 方法後,isShutdown 方法就會返回 true。
  • isTerminaed - 當全部的任務都已關閉後,才表示線程池關閉成功,這時調用 isTerminaed 方法會返回 true。
  • setCorePoolSize - 設置核心線程數大小。
  • setMaximumPoolSize - 設置最大線程數大小。
  • getTaskCount - 線程池已經執行的和未執行的任務總數;
  • getCompletedTaskCount - 線程池已完成的任務數量,該值小於等於 taskCount
  • getLargestPoolSize - 線程池曾經建立過的最大線程數量。經過這個數據能夠知道線程池是否滿過,也就是達到了 maximumPoolSize
  • getPoolSize - 線程池當前的線程數量;
  • getActiveCount - 當前線程池中正在執行任務的線程數量。

使用示例

public class ThreadPoolExecutorDemo {

    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 500, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());

        for (int i = 0; i < 100; i++) {
            threadPoolExecutor.execute(new MyThread());
            String info = String.format("線程池中線程數目:%s,隊列中等待執行的任務數目:%s,已執行玩別的任務數目:%s",
                threadPoolExecutor.getPoolSize(),
                threadPoolExecutor.getQueue().size(),
                threadPoolExecutor.getCompletedTaskCount());
            System.out.println(info);
        }
        threadPoolExecutor.shutdown();
    }

    static class MyThread implements Runnable {

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " 執行");
        }

    }

}
複製代碼

4、Executors

JDK 的 Executors 類中提供了幾種具備表明性的線程池,這些線程池 都是基於 ThreadPoolExecutor 的定製化實現

在實際使用線程池的場景中,咱們每每不是直接使用 ThreadPoolExecutor ,而是使用 JDK 中提供的具備表明性的線程池實例。

newSingleThreadExecutor

建立一個單線程的線程池

只會建立惟一的工做線程來執行任務,保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行。 若是這個惟一的線程由於異常結束,那麼會有一個新的線程來替代它

單工做線程最大的特色是:可保證順序地執行各個任務

示例:

public class SingleThreadExecutorDemo {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 100; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " 執行");
                }
            });
        }
        executorService.shutdown();
    }

}
複製代碼

newFixedThreadPool

建立一個固定大小的線程池

每次提交一個任務就會新建立一個工做線程,若是工做線程數量達到線程池最大線程數,則將提交的任務存入到阻塞隊列中

FixedThreadPool 是一個典型且優秀的線程池,它具備線程池提升程序效率和節省建立線程時所耗的開銷的優勢。可是,在線程池空閒時,即線程池中沒有可運行任務時,它不會釋放工做線程,還會佔用必定的系統資源。

示例:

public class FixedThreadPoolDemo {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 100; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " 執行");
                }
            });
        }
        executorService.shutdown();
    }

}
複製代碼

newCachedThreadPool

建立一個可緩存的線程池

  • 若是線程池長度超過處理任務所須要的線程數,就會回收部分空閒的線程;
  • 若是長時間沒有往線程池中提交任務,即若是工做線程空閒了指定的時間(默認爲 1 分鐘),則該工做線程將自動終止。終止後,若是你又提交了新的任務,則線程池從新建立一個工做線程。
  • 此線程池不會對線程池大小作限制,線程池大小徹底依賴於操做系統(或者說 JVM)可以建立的最大線程大小。 所以,使用 CachedThreadPool 時,必定要注意控制任務的數量,不然,因爲大量線程同時運行,頗有會形成系統癱瘓。

示例:

public class CachedThreadPoolDemo {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 100; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " 執行");
                }
            });
        }
        executorService.shutdown();
    }

}
複製代碼

newScheduleThreadPool

建立一個大小無限的線程池。此線程池支持定時以及週期性執行任務的需求。

public class ScheduledThreadPoolDemo {

    public static void main(String[] args) {
        schedule();
        scheduleAtFixedRate();
    }

    private static void schedule() {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        for (int i = 0; i < 100; i++) {
            executorService.schedule(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " 執行");
                }
            }, 1, TimeUnit.SECONDS);
        }
        executorService.shutdown();
    }

    private static void scheduleAtFixedRate() {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        for (int i = 0; i < 100; i++) {
            executorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " 執行");
                }
            }, 1, 1, TimeUnit.SECONDS);
        }
        executorService.shutdown();
    }

}
複製代碼

參考資料

相關文章
相關標籤/搜索