多線程(三)、線程池 ThreadPoolExecutor 知識點總結

本篇是多線程系列的第三篇,若是對前兩篇感興趣的也能夠去看看。前端

多線程(一)、基礎概念及notify()和wait()的使用java

多線程(二)、內置鎖 synchronizednginx

Android進階系列文章是我在學習的同時對知識點的整理,一是爲了加深印象,二是方便後續查閱。web

若是文中有錯誤的地方,歡迎批評指出。數據庫

前言

若是在Android裏面,直接 new Thread ,阿里巴巴 Android 開發規範會提示你不要顯示建立線程,請使用線程池,爲啥要用線程池?你對線程池瞭解多少?後端

1、線程池ThreadPoolExecutor 基礎概念

一、什麼是線程池

多線程(一)、基礎概念及notify()和wait()的使用 講了線程的建立,每當有任務來的時候,經過建立一個線程來執行任務,當任務執行結束,對線程進行銷燬,併發操做的時候,大量任務須要執行,每一個任務都要須要重複線程的建立、執行、銷燬,形成了CPU的資源銷燬,並下降了響應速度。數組

        new Thread(new Runnable() {
            @Override
            public void run() {
                // 任務執行
            }
        }).start();
複製代碼

**線程池 **:字面上理解就是將線程經過一個池子進行管理,當任務來的時候,從池子中取出一個已經建立好的線程進行任務的執行,執行結束後再將線程放回池中,待線程池銷燬的時候再統一對線程進行銷燬。緩存

二、使用線程池的好處

經過上面的對比,使用線程池基本有之前好處:網絡

一、下降資源消耗。經過重複使用線程池中的線程,下降了線程建立和銷燬帶來的資源消耗。多線程

二、提升響應速度。重複使用池中線程,減小了重複建立和銷燬線程帶來的時間開銷。

三、提升線程的可管理性。線程是稀缺資源,咱們不可能無節制建立,這樣會大量消耗系統資源,使用線程池能夠統一分配,管理和監控線程。

三、線程池參數說明

要使用線程池,就必需要用到 ThreadPoolExecutor 類,

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
 
{
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
複製代碼

這裏貼了 ThreadPoolExecutor 最複雜的一個構造方法,咱們把參數單獨拎出來說

一、int corePoolSize

核心線程數量:每當接收到一個任務的時候,線程池會建立一個新的線程來執行任務,直到當前線程池中的線程數目等於 corePoolSize ,當任務大於corePoolSize 時候,會放入阻塞隊列

二、int maximumPoolSize

非核心線程數量:線程池中容許的最大線程數,若是當前阻塞隊列滿了,當接收到新的任務就會再次建立線程進行執行,直到線程池中的數目等於maximumPoolSize

三、long keepAliveTime

線程空閒時存活時間:當線程數大於沒有任務執行的時候,繼續存活的時間,默認該參數只有線程數大於corePoolSize時纔有用

四、TimeUnit unit

keepAliveTime的時間單位

五、BlockingQueue workQueue

阻塞隊列:當線程池中線程數目超過 corePoolSize 的時候,線程會進入阻塞隊列進行阻塞等待,當阻塞隊列滿了的時候,會根據 maximumPoolSize 數量新開線程執行。

隊列:

是一種特殊的線性表,特殊之處在於它只容許在表的前端(front)進行刪除操做,而在表的後端(rear)進行插入操做,和棧同樣,隊列是一種操做受限制的線性表。

進行插入操做的端稱爲隊尾,進行刪除操做的端稱爲隊頭。

隊列中沒有元素時,稱爲空隊列。隊列的數據元素又稱爲隊列元素。

在隊列中插入一個隊列元素稱爲入隊,從隊列中刪除一個隊列元素稱爲出隊。

由於隊列只容許在一端插入,在另外一端刪除,因此只有最先進入隊列的元素才能最早從隊列中刪除,故隊列又稱爲先進先出(FIFO—first in first out)線性表。

阻塞隊列經常使用於生產者和消費者的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是生產者存放元素的緩存容器,而消費者也只從容器裏拿元素。

先看看 BlockingQueue ,它是一個接口,繼承 Queue

public interface BlockingQueue<Eextends Queue<E>
複製代碼

再看看它裏面的方法

針對這幾個方法,簡單的進行介紹:

拋出異常 返回特殊值 阻塞 超時
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
檢查 element() peek()

拋出異常:是指當阻塞隊列滿時候,再往隊列裏插入元素,會拋出IllegalStateException("Queue full")異常。當隊列爲空時,從隊列裏獲取元素時會拋出NoSuchElementException異常 。

返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從隊列裏拿出一個元素,若是沒有則返回null

阻塞:當阻塞隊列滿時,若是生產者線程往隊列裏put元素,隊列會一直阻塞生產者線程,直到拿到數據,或者響應中斷退出。當隊列空時,消費者線程試圖從隊列裏take元素,隊列也會阻塞消費者線程,直到隊列可用。

超時:當阻塞隊列滿時,隊列會阻塞生產者線程一段時間,若是超過必定的時間,生產者線程就會退出。

咱們再看JDK爲咱們提供的一些阻塞隊列,以下圖:

簡單說明:

阻塞隊列 用法
ArrayBlockingQueue 一個由數組結構組成的有界阻塞隊列。
LinkedBlockingQueue 一個由鏈表結構組成的有界阻塞隊列。
PriorityBlockingQueue 一個支持優先級排序的無界阻塞隊列。
DelayQueue 一個使用優先級隊列實現的無界阻塞隊列。
SynchronousQueue 一個不存儲元素的阻塞隊列。
LinkedTransferQueue 一個由鏈表結構組成的無界阻塞隊列。
LinkedBlockingDeque 一個由鏈表結構組成的雙向阻塞隊列。

六、ThreadFactory threadFactory

建立線程的工廠,經過自定義的線程工廠能夠給每一個新建的線程設置一個具備識別度的線程名Executors靜態工廠裏默認的threadFactory,線程的命名規則是「pool-數字-thread-數字」。

七、RejectedExecutionHandler handler (飽和策略)

線程池的飽和策略,若是任務特別多,隊列也滿了,且沒有空閒線程進行處理,線程池將必須對新的任務採起飽和策略,即提供一種方式來處理這部分任務。

jdk 給咱們提供了四種策略,如圖:

策略 做用
AbortPolicy 直接拋出異常,該策略也爲默認策略
CallerRunsPolicy 在調用者線程中執行該任務
DiscardOldestPolicy 丟棄阻塞隊列最前面的任務,並執行當前任務
DiscardPolicy 直接丟棄任務

咱們能夠看到 RejectedExecutionHandler 實際是一個接口,且只有一個 rejectedExecution 因此咱們能夠根據本身的需求定義本身的飽和策略。

/**
 * A handler for tasks that cannot be executed by a {@link ThreadPoolExecutor}.
 *
 * @since 1.5
 * @author Doug Lea
 */

public interface RejectedExecutionHandler {

    /**
     * Method that may be invoked by a {@link ThreadPoolExecutor} when
     * {@link ThreadPoolExecutor#execute execute} cannot accept a
     * task.  This may occur when no more threads or queue slots are
     * available because their bounds would be exceeded, or upon
     * shutdown of the Executor.
     *
     * <p>In the absence of other alternatives, the method may throw
     * an unchecked {@link RejectedExecutionException}, which will be
     * propagated to the caller of {@code execute}.
     *
     * @param r the runnable task requested to be executed
     * @param executor the executor attempting to execute this task
     * @throws RejectedExecutionException if there is no remedy
     */

    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
複製代碼

四、線程池工做機制

熟悉了上面線程池的各個參數含義,對線程池的工做原理,咱們也能夠大體總結以下:

一、線程池剛建立的時候,裏面沒有線程在運行,當有任務進來,而且線程池開始執行的時候,會根據實際狀況處理。

二、當前線程池線程數量少於 corePoolSize 時候,每當有新的任務來時,都會建立一個新的線程進行執行。

三、當線程池中運行的線程數大於等於 corePoolSize ,每當有新的任務來的時候,都會加入阻塞隊列中。

四、當阻塞隊列加滿,沒法再加入新的任務的時候,則會再根據 maximumPoolSize數 來建立新的非核心線程執行任務。

四、當線程池中線程數目大於等於 maximumPoolSize 時候,當有新的任務來的時候,拒絕執行該任務,採起飽和策略。

五、當一個線程無事可作,超過必定的時間(keepAliveTime)時,線程池會判斷,若是當前運行的線程數大於 corePoolSize,那麼這個線程就被停掉。因此線程池的全部任務完成後,它最終會收縮到 corePoolSize 的大小。

五、建立線程池

5.一、ThreadPoolExecutor

直接經過 ThreadPoolExecutor 建立:

        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                210
                , 1, TimeUnit.SECONDS
                , new LinkedBlockingQueue<Runnable>(50)
                , Executors.defaultThreadFactory()
                , new ThreadPoolExecutor.AbortPolicy());
複製代碼
5.二、Executors 靜態方法

經過工具類java.util.concurrent.Executors 建立的線程池,其實質也是調用 ThreadPoolExecutor 進行建立,只是針對不一樣的需求,對參數進行了設置。

一、FixedThreadPool

可重用固定線程數

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
複製代碼

參數說明:

int corePoolSize: nThreads

int maximumPoolSize: nThreads

long keepAliveTime:0L

TimeUnit unit:TimeUnit.MILLISECONDS

BlockingQueue workQueue:new LinkedBlockingQueue()

能夠看到核心線程和非核心線程一致,及不會建立非核心線程,超時時間爲0,即就算線程處於空閒狀態,也不會對其進行回收,阻塞隊列爲LinkedBlockingQueue無界阻塞隊列。

當有任務來的時候,先建立核心線程,線程數超過 corePoolSize 就進入阻塞隊列,當有空閒線程的時候,再在阻塞隊列中去任務執行。

使用場景:線程池線程數固定,且不會回收,線程生命週期與線程池生命週期同步,適用任務量比較固定且耗時的長的任務。

二、newSingleThreadExecutor

單線程執行

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(11,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
複製代碼

參數說明

int corePoolSize: 1

int maximumPoolSize: 1

long keepAliveTime:0L

TimeUnit unit:TimeUnit.MILLISECONDS

BlockingQueue workQueue:new LinkedBlockingQueue()

基本和 FixedThreadPool 一致,最明顯的區別就是線程池中只存在一個核心線程來執行任務。

使用場景:只有一個線程,確保因此任務都在一個線程中順序執行,不須要處理線程同步問題,適用多個任務順序執行。

三、newCachedThreadPool
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
複製代碼

參數說明

int corePoolSize: 0

int maximumPoolSize: Integer.MAX_VALUE

long keepAliveTime:60L

TimeUnit unit:TimeUnit.SECONDS

BlockingQueue workQueue:new SynchronousQueue()

無核心線程,非核心線程數量 Integer.MAX_VALUE,能夠無限建立,空閒線程60秒會被回收,任務隊列採用的是SynchronousQueue,這個隊列是沒法插入任務的,一有任務當即執行。

使用場景:因爲非核心線程無限制,且使用沒法插入的SynchronousQueue隊列,因此適合任務量大但耗時少的任務。

四、newScheduledThreadPool

定時延時執行

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory)
 
{
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }
複製代碼

參數說明

int corePoolSize: corePoolSize (設定)

int maximumPoolSize: Integer.MAX_VALUE

long keepAliveTime:0

TimeUnit unit:NANOSECONDS

BlockingQueue workQueue:new DelayedWorkQueue()

核心線程數固定(設置),非核心線程數建立無限制,可是空閒時間爲0,即非核心線程一旦空閒就回收, DelayedWorkQueue() 無界隊列會將任務進行排序,延時執行隊列任務。

使用場景:newScheduledThreadPool是惟一一個具備定時按期執行任務功能的線程池。它適合執行一些週期性任務或者延時任務,能夠經過schedule(Runnable command, long delay, TimeUnit unit) 方法實現。

六、線程池的執行

線程池提供了 executesubmit 兩個方法來執行

execute:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // 得到當前線程的生命週期對應的二進制狀態碼
        int c = ctl.get();
        //判斷當前線程數量是否小於核心線程數量,若是小於就直接建立核心線程執行任務,建立成功直接跳出,失敗則接着往下走.
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //判斷線程池是否爲RUNNING狀態,而且將任務添加至隊列中.
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //審覈下線程池的狀態,若是不是RUNNING狀態,直接移除隊列中
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //若是當前線程數量爲0,則單首創建線程,而不指定任務.
            else if (workerCountOf(recheck) == 0)
                addWorker(nullfalse);
        }
        //若是不知足上述條件,嘗試建立一個非核心線程來執行任務,若是建立失敗,調用reject()方法.
        else if (!addWorker(command, false))
            reject(command);
    }
複製代碼

submit():

源碼:

  public <T> Future<T> submit(Runnable task, T result) {
        if (task == nullthrow new NullPointerException();
          // 將runnable封裝成 Future 對象
        RunnableFuture<T> ftask = newTaskFor(task, result);
          // 執行 execute 方法
        execute(ftask);
          // 返回包裝好的Runable
        return ftask;
    }



  // newTaskFor : 經過 FutureTask 
  protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
複製代碼

其中 newTaskFor 返回的 RunnableFuture<T> 方法繼承了 Runnable 接口,因此能夠直接經過 execute 方法執行。

public interface RunnableFuture<Vextends RunnableFuture<V{
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */

    void run();
}
複製代碼

能夠看到,submit 中實際也是調用了 execute() 方法,只不過在調用方法以前,先將Runnable對象封裝成FutureTask對象,而後再返回 Future<T>,咱們能夠經過Futureget 方法,拿到任務執行結束後的返回值。

咱們在 多線程(一)、基礎概念及notify()和wait()的使用 中也講了 FutureTask 它提供了 cancelisCancelledisDoneget幾個方法,來對任務進行相應的操做。

總結:

一般狀況下,咱們不須要對線程或者獲取執行結果,能夠直接使用 execute 方法。

若是咱們要獲取任務執行的結果,或者想對任務進行取消等操做,就使用 submit 方法。

七、線程池的關閉

關於線程的中斷在 多線程(一)、基礎概念及notify()和wait()的使用 裏面有介紹。

  • shutdown():不會當即終止線程池,而是要等全部任務緩存隊列中的任務都執行完後才終止,但不再會接受新的任務
  • shutdownNow():當即終止線程池,並嘗試打斷正在執行的任務,而且清空任務緩存隊列,返回還沒有執行的任務

八、線程池的合理配置

線程池的參數比較靈活,咱們能夠自由設置,可是具體每一個參數該設置成多少比較合理呢?這個要根據咱們處理的任務來決定,對任務通常從如下幾個點分析:

8.一、任務的性質

CPU 密集型、IO 密集型、混合型

CPU密集型應配置儘量小的線程,如Ncpu+1個線程的線程池

IO密集型,IO操做有關,如磁盤,內存,網絡等等,對CPU的要求不高則應配置儘量多的線程,如2*Ncpu個線程的線程池

混合型須要拆成CPU 密集型和IO 密集型分別分析,根據任務數量和執行時間,來決定線程的數量

8.二、任務的優先級

高中低優先級

8.三、任務執行時間

長中短

8.四、任務的依耐性

是否須要依賴其餘系統資源,如數據庫鏈接

Runtime.getRuntime().availableProcessors() : 當前設備的CPU個數

九、線程池實戰

又巴拉巴拉說了一大推,我以爲惟有代碼運行,經過結果分析最能打動人心,下面就經過代碼運行結果來分析。

先看這麼一段代碼:

    public static void main(String[] args) {
        // 一、經過 ThreadPoolExecutor 建立基本線程池
        final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                3,
                5,
                1,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(50));
        for (int i = 0; i < 30; i++) {
            final int num = i;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        // 睡兩秒後執行
                        Thread.sleep(2000);
                        System.out.println("run : " + num + "  當前線程:" + Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            // 執行
            threadPoolExecutor.execute(runnable);
        }
    }
複製代碼

咱們經過 ThreadPoolExecutor 建立了一個線程池,而後執行30個任務。

參數說明

int corePoolSize: 3

int maximumPoolSize: 5

long keepAliveTime:1

TimeUnit unit:TimeUnit.SECONDS

BlockingQueue workQueue:new LinkedBlockingQueue(50)

線程池核心線程數爲3,非核心線程數爲5,非核心線程空閒1秒被回收,阻塞隊列使用了 new LinkedBlockingQueue 並指定了隊列容量爲50。

結果:

咱們看到每兩秒後,有三個任務被執行。這是由於核心咱們設置的核心線程數爲3,當多餘的任務到來後,會先放入到阻塞隊列中,又因爲咱們設置的阻塞隊列容量爲50,因此,阻塞隊列永遠不會滿,就不會啓動非核心線程。

咱們改一下咱們的線程池以下:

final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                2,
                5,
                1,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(25));
複製代碼

參數就不分析了,咱們直接看結果:

咱們看到此次每隔兩秒有五個任務在執行,爲何?這裏要根據咱們前面線程池的工做原理來分析,咱們有三十個任務須要執行,核心線程數爲2,其他的任務放入阻塞隊列中,阻塞隊列容量爲25,剩餘任務不超過非核心線程數,當阻塞隊列滿的時候,就啓動了非核心線程來執行。

咱們再簡單改一下咱們的線程池,代碼以下:

        final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                2,
                5,
                1,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(24));
複製代碼

相比上面的,咱們就將阻塞隊列容量改爲了24,若是上面你對線程池的工做原理清楚了,你應該能知道我這裏改爲 24 的良苦用心了,咱們先看結果。

最直接的就是拋異常了,可是線程池仍然再執行任務,首先爲啥拋異常?首先,咱們須要執行三十個任務,可是咱們的阻塞隊列容量爲 24,隊列滿後啓動了非核心線程,可是非核心線程數量爲5,當剩下的這個任務來的時候,線程池將採起飽和策略,咱們沒有設置,默認爲 AbortPolicy,即直接拋異常,若是咱們手動設置飽和策略以下:

        final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                2,
                5,
                1,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(24),new ThreadPoolExecutor.DiscardPolicy());
複製代碼

咱們這裏採用的飽和策略爲 DiscardPolicy ,即丟棄多餘任務。最終能夠看到結果沒有拋異常,最終只執行了29個任務,最後一個任務被拋棄了。

最後再看一下經過 Executors 靜態方法建立的線程池運行上面的任務結果如何,Executors 建立的線程池本質也是經過建立 ThreadPoolExecutor 來執行,可結合上面分析自行總結。

一、FixedThreadPool

ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(3);
複製代碼

結果:

二、newSingleThreadExecutor

ExecutorService threadPoolExecutor = Executors.newSingleThreadExecutor();
複製代碼

結果:

三、newCachedThreadPool

ExecutorService threadPoolExecutor = Executors.newCachedThreadPool();
複製代碼

結果 :

四、newScheduledThreadPool

ScheduledExecutorService threadPoolExecutor = Executors.newScheduledThreadPool(3);
複製代碼

總結

這是多線程的第三篇,這篇文章篇幅有點多, 有點小亂,後續會再整理一下,基本都是跟着本身的思路,在寫的同時,本身也會再操做一遍,源碼分析過程當中,也會盡量的詳細,一步步的深刻,後續查閱的時候也方便,文章中有些不是很詳細的地方,後面可能會再次更新,或者單獨用一篇文章來說。

相關文章
相關標籤/搜索