Java併發編程之線程池的使用

1. 爲何要使用多線程?

隨着科技的進步,如今的電腦及服務器的處理器數量都比較多,之後可能會愈來愈多,好比個人工做電腦的處理器有8個,怎麼查看呢?html

計算機右鍵--屬性--設備管理器,打開屬性窗口,而後點擊「設備管理器」,在「處理器」下可看到全部的處理器:java

也能夠經過如下Java代碼獲取處處理器的個數:面試

System.out.println("CPU個數:" + Runtime.getRuntime().availableProcessors());
複製代碼

運行結果以下所示:編程

CPU個數:8緩存

既然處理器的個數增長了,若是還使用傳統的串行編程,就有點浪費資源了,所以,爲了提升資源利用率,讓各個處理器都忙碌起來,就須要引入併發編程,要引入併發編程,就引入了多線程。服務器

能夠說,使用多線程的最直接目的就是爲了提升資源利用率,資源的利用率提升了,系統的吞吐率也就相應提升了。微信

2. 爲何要使用線程池?

在必定的範圍內,增長線程能夠提升應用程序的吞吐率,但線程並非越多越好(由於線程的建立與銷燬都須要很大的開銷),若是超過了某個範圍,不只會下降應用程序的執行速度,嚴重的話,應用程序甚至會崩潰,以致於不得不重啓應用程序。多線程

爲了不這種問題,就須要對應用程序能夠建立的線程數量進行限制,確保在線程數量達到限制時,程序也不會耗盡資源,線程池就是爲了解決這種問題而出現的。架構

線程池:管理一組工做線程的資源池。併發

線程池與工做隊列密切相關,工做隊列中保存了全部等待執行的任務。

工做者線程的任務就是從工做隊列中獲取一個任務,執行任務,而後返回線程池並等待下一個任務。

使用線程池能夠帶來如下好處:

  1. 經過重用現有的線程而不是建立新線程,能夠在處理多個任務時減小在線程建立與銷燬過程當中產生的巨大開銷。
  2. 當任務到達時,工做線程一般已經存在,所以不會因爲等待建立線程而延遲任務的執行,從而提升了響應性。
  3. 能夠經過調整線程池的大小,建立足夠多的線程使處理器保持忙碌狀態,同時還能夠防止過多線程相互競爭資源而使應用程序耗盡內存或崩潰。

3. 建立線程池

3.1 使用Executors靜態方法建立(不推薦)

Executors類提供瞭如下4個靜態方法來快速的建立線程池:

  1. newFixedThreadPool
  2. newCachedThreadPool
  3. newSingleThreadExecutor
  4. newScheduledThreadPool

首先看下newFixedThreadPool()方法的使用方式:

ExecutorService threadPool = Executors.newFixedThreadPool(10);
複製代碼

它的源碼爲:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
複製代碼

說明:newFixedThreadPool將建立一個固定長度的線程池,每當提交一個任務時就建立一個線程,直到達到線程池的最大數量,這時線程池的規模將再也不變化(若是某個線程因爲發生了未預期的Exception而結束,那麼線程池會補充一個新的線程)。

而後看下newCachedThreadPool()方法的使用方式:

ExecutorService threadPool = Executors.newCachedThreadPool();
複製代碼

它的源碼爲:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
複製代碼

說明:newCachedThreadPool將建立一個可緩存的線程池,若是線程池的規模超過了處理需求時,那麼將回收空閒的線程,而當需求增長時,則添加新的線程,線程池的最大規模爲Integer.MAX_VALUE。

而後看下newSingleThreadExecutor()方法的使用方式:

ExecutorService threadPool = Executors.newSingleThreadExecutor();
複製代碼

它的源碼爲:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
複製代碼

說明:newSingleThreadExecutor是一個單線程的Executor,它建立單個工做者線程來執行任務,若是這個線程異常結束,就建立一個新的線程來替代。

newSingleThreadExecutor能夠確保依照任務在隊列中的順序來串行執行。

最後看下newScheduledThreadPool()方法的使用方式:

ExecutorService threadPool = Executors.newScheduledThreadPool(10);
複製代碼

它的源碼爲:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
複製代碼

super指向以下代碼:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
複製代碼

說明:newScheduledThreadPool將建立一個固定長度的線程池,並且以延遲或者定時的方式來執行任務,相似於Timer。

能夠發現,以上4種方式最終都指向了ThreadPoolExecutor的如下構造函數,只是不少參數沒讓你指定,傳遞了默認值而已:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    // 省略具體的代碼
}
複製代碼

雖然使用這4個方法能夠快速的建立線程池,但仍是不推薦使用,第一,不少參數都設置了默認值,不便於你理解各個參數的具體含義,第二,參數的默認值可能會形成必定的問題,最好是由使用者根據本身的需求自行指定。

那麼這7個參數分別表明什麼含義呢?請接着往下看。

3.2 使用ThreadPoolExecutor構造函數建立(推薦)

ThreadPoolExecutor共有如下4個構造函數,推薦使用這種方式來建立線程池:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}
複製代碼

以上3個也都指向參數最全的第4個構造函數:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    // 省略具體的代碼
}
複製代碼

如下爲各個參數的講解:

  • corePoolSize:核心線程數。

  • maximumPoolSize:最大線程數。

    最大線程數=核心線程數+非核心線程數。

  • keepAliveTime:非核心線程閒置超時時間。

    一個非核心線程,若是不幹活(閒置狀態)的時長超過這個參數所設定的時長,就會被銷燬掉,若是設置了allowCoreThreadTimeOut = true,則會做用於核心線程。

  • unit:參數keepAliveTime的時間單位,如秒、分、小時。

  • workQueue:工做隊列,即要執行的任務隊列,裏面存儲等待執行的任務。

    這裏的阻塞隊列可選擇的有:LinkedBlockingQueue、ArrayBlockingQueue、SynchronousQueue、DelayedWorkQueue。

    newFixedThreadPool()方法默認使用的LinkedBlockingQueue,

    newCachedThreadPool()方法默認使用的SynchronousQueue,

    newSingleThreadExecutor()方法默認使用的LinkedBlockingQueue,

    newScheduledThreadPool()方法默認使用的DelayedWorkQueue。

  • threadFactory:線程工廠,用來用來建立線程。

  • handler:飽和策略/拒絕處理任務時的策略。

    當workQueue已滿,而且線程池的線程數已達到maximumPoolSize,此時新提交的任務會交由RejectedExecutionHandler handler處理,主要有如下4種策略:

    AbortPolicy:停止策略,拋棄任務並拋出未檢查的RejectedExecutionException,這也是默認的飽和策略。

    DiscardPolicy:拋棄策略,直接拋棄任務,但不拋出異常。

    DiscardOldestPolicy:拋棄最舊的策略,拋棄下一個將被執行的任務,而後嘗試從新提交新的任務。

    CallerRunsPolicy:調用者運行策略,將任務回退到調用者,在調用者所在的線程執行該任務。

4. 線程池的運行原理

能夠經過下面2張圖來理解線程池的運行原理:

1)若是線程池中的線程小於corePoolSize,則建立新線程來處理任務,這時建立的線程爲核心線程。

2)若是線程中的線程等於或者大於corePoolSize,則將任務放到工做隊列中,即上圖中的BlockingQueue。

3)若是工做隊列已滿,沒法將任務加入到BlockingQueue,則建立新的線程來處理任務,這時建立的線程爲非核心線程,非核心線程在空閒一段時間後會被回收銷燬掉(keepAliveTime和unit就是用來定義這個空閒的時間是多少)。

4)若是建立新線程致使線程池中的線程數超過了maximumPoolSize,任務將被拒絕,並調用RejectedExecutionHandler.rejectedExecution()方法。

5. ThreadPoolExecutor示例

新建以下示例代碼,建立1個corePoolSize爲2,maximumPoolSize爲3的線程池:

import java.util.concurrent.*;

public class ThreadPoolExecutorTest {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 3, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1));

        threadPoolExecutor.execute(() -> {
            try {
                Thread.sleep(3 * 1000);
                System.out.println("任務1執行線程:" + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        threadPoolExecutor.execute(() -> {
            System.out.println("任務2執行線程:" + Thread.currentThread().getName());
        });
    }
}
複製代碼

運行結果爲:

任務2執行線程:pool-1-thread-2

任務1執行線程:pool-1-thread-1

能夠看出,由於線程池中的線程數小於corePoolSize,線程池建立了2個核心線程來分別執行任務1和任務2。

修改代碼爲以下所示,開啓3個任務:

import java.util.concurrent.*;

public class ThreadPoolExecutorTest {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 3, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1));

        threadPoolExecutor.execute(() -> {
            try {
                Thread.sleep(3 * 1000);
                System.out.println("任務1執行線程:" + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        threadPoolExecutor.execute(() -> {
            try {
                Thread.sleep(5 * 1000);
                System.out.println("任務2執行線程:" + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        threadPoolExecutor.execute(() -> System.out.println("任務3執行線程:" + Thread.currentThread().getName()));
    }
}
複製代碼

運行結果爲:

任務1執行線程:pool-1-thread-1

任務3執行線程:pool-1-thread-1

任務2執行線程:pool-1-thread-2

能夠看出,執行任務3時並無新建線程,而是先放入了工做隊列,最後由線程1執行完成。

在上面的代碼中新增個任務4:

threadPoolExecutor.execute(() -> System.out.println("任務4執行線程:" + Thread.currentThread().getName()));
複製代碼

此時運行結果爲:

任務4執行線程:pool-1-thread-3

任務3執行線程:pool-1-thread-3

任務1執行線程:pool-1-thread-1

任務2執行線程:pool-1-thread-2

能夠看出,任務3是先放入了工做隊列,任務4放不到工做隊列(空間已滿),因此建立了第3個線程來執行,執行完畢後從隊列裏獲取到任務3執行,任務1和任務2分別由線程1和線程2執行。

修改下任務4的代碼,並添加任務5:

threadPoolExecutor.execute(() -> {
    try {
        Thread.sleep(2 * 1000);
        System.out.println("任務4執行線程:" + Thread.currentThread().getName());
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});

threadPoolExecutor.execute(() -> System.out.println("任務5執行線程:" + Thread.currentThread().getName()));
複製代碼

此時運行結果爲:

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task ThreadPoolExecutorTest$$Lambda5/935044096@179d3b25 rejected from java.util.concurrent.ThreadPoolExecutor@254989ff[Running, pool size = 3, active threads = 3, queued tasks = 1, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutorAbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) at ThreadPoolExecutorTest.main(ThreadPoolExecutorTest.java:37) 任務4執行線程:pool-1-thread-3

任務3執行線程:pool-1-thread-3

任務1執行線程:pool-1-thread-1

任務2執行線程:pool-1-thread-2

能夠看出,當提交任務5時,因爲工做隊列已滿, 且線程池中的線程數已經爲3,因此該任務被拋棄並拋出了java.util.concurrent.RejectedExecutionException異常。

若是你看到了這裏,是否會好奇參數maximumPoolSize設置爲多少合適呢?

這個問題,咱們下次講解,歡迎持續關注,哈哈!

6. 源碼及參考

Brian Goetz《Java併發編程實戰》

怎麼查看處理器(cpu)的核數

ThreadPoolExecutor使用方法

Java線程池-ThreadPoolExecutor原理分析與實戰

深刻理解 Java 多線程核心知識:跳槽面試必備

互聯網大廠Java面試題:使用無界隊列的線程池會致使內存飆升嗎?【石杉的架構筆記】

最後,歡迎關注個人微信公衆號:「申城異鄉人」,全部博客會同步更新。

相關文章
相關標籤/搜索