Java併發 之 線程池系列 (2) 使用ThreadPoolExecutor構造線程池

Executors的「罪與罰」

在上一篇文章Java併發 之 線程池系列 (1) 讓多線程再也不坑爹的線程池中,咱們介紹了使用JDK concurrent包下的工廠和工具類Executors來建立線程池經常使用的幾種方法:html

//建立固定線程數量的線程池
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);

//建立一個線程池,該線程池會根據須要建立新的線程,但若是以前建立的線程能夠使用,會重用以前建立的線程
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

//建立一個只有一個線程的線程池
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
複製代碼

誠然,這種建立線程池的方法很是簡單和方便。但仔細閱讀源碼,卻把我嚇了一條: 這是要老子的命啊!java

咱們前面講過,若是有新的請求過來,在線程池中會建立新的線程處理這些任務,一直建立到線程池的最大容量(Max Size)爲止;超出線程池的最大容量的Tasks,會被放入阻塞隊列(Blocking Queue)進行等待,知道有線程資源釋放出來爲止;要知道的是,阻塞隊列也是有最大容量的,多餘隊列最大容量的請求不光沒有得到執行的機會,連排隊的資格都沒有!api

那這些連排隊的資格都沒有的Tasks怎麼處理呢?不要急,後面在介紹ThreadPoolExecutor的拒絕處理策略(Handler Policies for Rejected Task)的時候會詳細介紹。安全

說到這裏你也許有寫疑惑了,上面這些東西,我一般使用Executors的時候沒有指定過啊。是的,由於Executors很「聰明」地幫咱們作了這些事情。bash

Executors的源碼

咱們看下ExecutorsnewFixedThreadPoolnewSingleThreadExecutor方法的源碼:多線程

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(
        nThreads, nThreads,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());
}
複製代碼
public static ExecutorService newSingleThreadExecutor() {
     return new FinalizableDelegatedExecutorService
         (new ThreadPoolExecutor(
         1, 1,
         0L, TimeUnit.MILLISECONDS,
         new LinkedBlockingQueue<Runnable>()));
 }
複製代碼

其實它們底層仍是經過ThreadPoolExecutor來建立ExecutorService的,這裏對它的參數先不做介紹,下面會詳細講,咱們只說一下new LinkedBlockingQueue<Runnable>()這個參數。併發

LinkedBlockingQueue就是當任務數大於線程池的線程數的時候的阻塞隊列,這裏使用的是無參構造,咱們再看一下構造函數:oracle

/** * Creates a {@code LinkedBlockingQueue} with a capacity of * {@link Integer#MAX_VALUE}. */
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
複製代碼

咱們看到阻塞隊列的默認大小居然是Integer.MAX_VALUEless

若是不作控制,拼命地往阻塞隊列裏放Task,分分鐘「Out of Memory」啊!jvm

還有更絕的,newCachedThreadPool方法:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(
    0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
}
複製代碼

最大線程數默認也是Integer.MAX_VALUE,也就是說,若是以前的任務沒有執行完就有新的任務進來了,就會繼續建立新的線程,直到建立到Integer.MAX_VALUE爲止。

讓你的JVM OutOfMemoryError

下面提供一個使用newCachedThreadPool建立大量線程處理Tasks,最終OutOfMemoryError的例子。

友情提醒:場面過於血腥,請勿在生產環境使用。

package net.ijiangtao.tech.concurrent.jsd.threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolExample2 {

    private static final ExecutorService executorService = Executors.newCachedThreadPool();

    private static class Task implements Runnable {
        @Override
        public void run() {
            try {
                Thread.sleep(1000 * 600);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private static void newCachedThreadPoolTesterBadly() {
        System.out.println("begin............");
        for (int i = 0; i <= Integer.MAX_VALUE; i++) {
            executorService.execute(new Task());
        }
        System.out.println("end.");
    }

    public static void main(String[] args) {
        newCachedThreadPoolTesterBadly();
    }

}
複製代碼

main方法啓動之後,打開控制面板,看到CPU和內存幾乎已經所有耗盡:

很快控制檯就拋出了java.lang.OutOfMemoryError

begin............
Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
	at java.lang.Thread.start0(Native Method)
	at java.lang.Thread.start(Thread.java:717)
	at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1378)
	at net.ijiangtao.tech.concurrent.jsd.threadpool.ThreadPoolExample2.newCachedThreadPoolTesterBadly(ThreadPoolExample2.java:24)
	at net.ijiangtao.tech.concurrent.jsd.threadpool.ThreadPoolExample2.main(ThreadPoolExample2.java:30)
複製代碼

阿里巴巴Java開發手冊

下面咱們在看Java開發手冊這條規定,應該就明白做者的良苦用心了吧。

【強制】線程池不容許使用Executors去建立,而是經過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同窗更加明確線程池的運行規則,規避資源耗盡的風險。 說明: Executors返回的線程池對象的弊端以下: 1)FixedThreadPool和SingleThreadPool:容許的請求隊列長度爲Integer.MAX_VALUE,可能會堆積大量的請求,從而致使OOM。 2)CachedThreadPool和ScheduledThreadPool:容許的建立線程數量爲Integer.MAX_VALUE,可能會建立大量的線程,從而致使OOM。

主角出場

解鈴還須繫鈴人,其實避免這個OutOfMemoryError風險的鑰匙就藏在Executors的源碼裏,那就是本身直接使用ThreadPoolExecutor

ThreadPoolExecutor的構造

構造一個ThreadPoolExecutor須要蠻多參數的。下面是ThreadPoolExecutor的構造函數。

/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
複製代碼

下面就一一介紹一下這些參數的具體含義。

ThreadPoolExecutor構造參數說明

其實從源碼中的JavaDoc已經能夠很清晰地明白這些參數的含義了,下面照顧懶得看英文的同窗,再解釋一下:

  • corePoolSize

線程池核心線程數。

默認狀況下核心線程會一直存活,即便處於閒置狀態也不會受存keepAliveTime限制,除非將allowCoreThreadTimeOut設置爲true

  • maximumPoolSize

線程池所能容納的最大線程數。超過maximumPoolSize的線程將被阻塞。

最大線程數maximumPoolSize不能小於corePoolSize

  • keepAliveTime

非核心線程的閒置超時時間。

超過這個時間非核心線程就會被回收。

  • TimeUnit

keepAliveTime的時間單位,如TimeUnit.SECONDS。

當將allowCoreThreadTimeOut爲true時,對corePoolSize生效。

  • workQueue

線程池中的任務隊列。

沒有得到線程資源的任務將會被放入workQueue,等待線程資源被釋放。若是放入workQueue的任務數大於workQueue的容量,將由RejectedExecutionHandler的拒絕策略進行處理。

經常使用的有三種隊列: SynchronousQueue,LinkedBlockingDeque,ArrayBlockingQueue

  • threadFactory

提供建立新線程功能的線程工廠。

ThreadFactory是一個接口,只有一個newThread方法:

Thread newThread(Runnable r);
複製代碼
  • rejectedExecutionHandler

沒法被線程池處理的任務的處理器。

通常是由於任務數超出了workQueue的容量。

當一個任務被加入線程池時

總結一下,當一個任務經過execute(Runnable)方法添加到線程池時:

  1. 若是此時線程池中線程的數量小於corePoolSize,即便線程池中的線程都處於空閒狀態,也要建立新的線程來處理被添加的任務。

  2. 若是此時線程池中的數量等於corePoolSize,可是緩衝隊列workQueue未滿,那麼任務被放入緩衝隊列。

  3. 若是此時線程池中的數量大於corePoolSize,緩衝隊列workQueue滿,而且線程池中的數量小於maximumPoolSize,建新的線程來處理被添加的任務。

  4. 若是此時線程池中的數量大於corePoolSize,緩衝隊列workQueue滿,而且線程池中的數量等於maximumPoolSize,那麼經過 handler所指定的拒絕策略來處理此任務。

處理任務的優先級爲:核心線程數(corePoolSize) > 任務隊列容量(workQueue) > 最大線程數(maximumPoolSize);若是三者都滿了,使用rejectedExecutionHandler處理被拒絕的任務。

ThreadPoolExecutor的使用

下面就經過一個簡單的例子,使用ThreadPoolExecutor構造的線程池執行任務。

ThreadPoolExample3

package net.ijiangtao.tech.concurrent.jsd.threadpool;

import java.time.LocalTime;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/** * @author ijiangtao.net */
public class ThreadPoolExample3 {

    private static final AtomicInteger threadNumber = new AtomicInteger(1);

    private static class Task implements Runnable {
        @Override
        public void run() {
            try {
                Thread.currentThread().sleep(2000);
                System.out.println(Thread.currentThread().getName() + "-" + LocalTime.now());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


    private static class MyThreadFactory implements ThreadFactory {

        private final String namePrefix;

        public MyThreadFactory(String namePrefix) {
            this.namePrefix = namePrefix;
        }

        public Thread newThread(Runnable runnable) {
            return new Thread(runnable, namePrefix + "-" + threadNumber.getAndIncrement());
        }

    }

    private static final ExecutorService executorService = new ThreadPoolExecutor(
            10,
            20, 30, TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(50),
            new MyThreadFactory("MyThreadFromPool"),
            new ThreadPoolExecutor.AbortPolicy());

    public static void main(String[] args) {

        // creates five tasks
        Task r1 = new Task();
        Task r2 = new Task();
        Task r3 = new Task();
        Task r4 = new Task();
        Task r5 = new Task();

        // submit方法有返回值
        Future future = executorService.submit(r1);
        System.out.println("r1 isDone ? " + future.isDone());

        // execute方法沒有返回值
        executorService.execute(r2);
        executorService.execute(r3);
        executorService.execute(r4);
        executorService.execute(r5);

        //關閉線程池
        executorService.shutdown();

    }

}
複製代碼

執行結果

r1 isDone ? false
MyThreadFromPool-2-21:04:03.215
MyThreadFromPool-5-21:04:03.215
MyThreadFromPool-4-21:04:03.215
MyThreadFromPool-3-21:04:03.215
MyThreadFromPool-1-21:04:03.215
複製代碼

從結果看,從線程池取出了5個線程,併發執行了5個任務。

總結

這一章咱們介紹了一種更安全、更定製化的線程池構建方式:ThreadPoolExecutor。相信你之後不敢輕易使用Executors來構造線程池了。

後面咱們會介紹線程池的更多實現方式(例如使用Google核心庫Guava),以及關於線程池的更多知識和實戰。

Links

做者資源

相關資源

相關文章
相關標籤/搜索