一篇搞懂Java線程池

歡迎關注微信公衆號:xiaosen_javasharejava

在上一篇文章《spring boot使用@Async異步任務》中咱們瞭解了使用@Async的異步任務使用,在這篇文章中咱們將學習使用線程池來建立異步任務的線程。spring

在《阿里巴巴Java開發手冊中》對線程使用有以下要求:數組

接下來就讓咱們就好好了解一下線程池。微信

線程池簡單介紹

在Java5中引入Executor框架。框架

ThreadPoolExecutor線程池解析

其類關係圖以下:異步

下圖是ThreadPoolExecutor的構造方法:ide

咱們這裏看構造參數最多的也是最全的方法:學習

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        if (corePoolSize < 0 || maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize || keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ? null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
複製代碼
  1. corePoolSize(線程池的基本大小):當提交一個任務到線程池時,線程池會建立一個線程來執行任務,即便其餘空閒的基本線程可以執行新任務也會建立線程,等到須要執行的任務數大於線程池基本大小時就再也不建立。若是調用了線程池的prestartAllCoreThreads方法,線程池會提早建立並啓動全部基本線程。
  2. maximumPoolSize(線程池最大大小):線程池容許建立的最大線程數。若是隊列滿了,而且已建立的線程數小於最大線程數,則線程池會再建立新的線程執行任務。值得注意的是若是使用了無界的任務隊列這個參數就沒什麼效果。
  3. keepAliveTime(線程活動保持時間):線程池的工做線程空閒後,保持存活的時間。因此若是任務不少,而且每一個任務執行的時間比較短,能夠調大這個時間,提升線程的利用率。
  4. TimeUnit(線程活動保持時間的單位):可選的單位有天(DAYS),小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
  5. workQueue (任務隊列):用於保存等待執行的任務的阻塞隊列。能夠選擇如下幾個阻塞隊列。
    1. ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。
    2. LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按FIFO (先進先出) 排序元素,吞吐量一般要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列
    3. SynchronousQueue:一個不存儲元素的阻塞隊列。每一個插入操做必須等到另外一個線程調用移除操做,不然插入操做一直處於阻塞狀態,吞吐量一般要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個隊列。
    4. PriorityBlockingQueue:一個具備優先級的無限阻塞隊列。
  1. ThreadFactory:用於設置建立線程的工廠,能夠經過線程工廠給每一個建立出來的線程作些更有意義的事情,好比設置daemon和優先級等等this

  2. RejectedExecutionHandler(飽和策略):當隊列和線程池都滿了,說明線程池處於飽和狀態,那麼必須採起一種策略處理提交的新任務。這個策略默認狀況下是AbortPolicy,表示沒法處理新任務時拋出異常。如下是JDK1.5提供的四種策略。spa

    1. AbortPolicy:直接拋出異常。
    2. CallerRunsPolicy:只用調用者所在線程來運行任務。
    3. DiscardOldestPolicy:丟棄隊列裏最近的一個任務,並執行當前任務。
    4. DiscardPolicy:不處理,丟棄掉。
    5. 也能夠根據應用場景須要來實現RejectedExecutionHandler接口自定義策略。如記錄日誌或持久化不能處理的任務。
線程池的工做方式
  1. 若是運行的線程少於 corePoolSize,則 Executor 始終建立新的線程,而不添加到queue中。
  2. 若是運行的線程等於或多於 corePoolSize,則 Executor 始終將請求加入隊列,而不建立新的線程。
  3. 若是沒法將請求加入隊列(隊列已滿),則建立新的線程,除非建立此線程超出 maximumPoolSize,若是超過,在這種狀況下,新的任務將被拒絕。

Executors提供的線程池方法

  1. newFixedThreadPool固定線程數的線程。
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
複製代碼

從上面源代碼能夠看出新建立的newFixedThreadPool的corePoolSize和maximumPoolSize都被設置爲nThreads。

說明:

  1. 若是當前運行的線程數小於corePoolSize,則建立新的線程來執行任務;
  2. 當前運行的線程數等於corePoolSize後,將任務加入LinkedBlockingQueue;
  3. 線程執行完1中的任務後,會在循環中反覆從LinkedBlockingQueue中獲取任務來執行。

FixedThreadPool使用無界隊列 LinkedBlockingQueue(隊列的容量爲Intger.MAX_VALUE)做爲線程池的工做隊列會對線程池帶來以下影響:

  1. 當線程池中的線程數達到corePoolSize後,新任務將在無界隊列中等待,所以線程池中的線程數不會超過corePoolSize;
  2. 使用無界隊列時maximumPoolSize和keepAliveTime將是無效參數;
  3. 運行中的newFixedThreadPool(未執行shutdown()或shutdownNow()方法)不會拒絕任務。
  1. newSingleThreadExecutor單個線程的線程池
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
複製代碼

從上面源代碼能夠看出新建立的SingleThreadExecutor的corePoolSize和maximumPoolSize都被設置爲1.其餘參數和newFixedThreadPool相同。也是用的是無界隊列存放。

說明:

  1. 若是當前運行的線程數少於corePoolSize,則建立一個新的線程執行任務;
  2. 當前線程池中有一個運行的線程後,將任務加入LinkedBlockingQueue
  3. 線程執行完1中的任務後,會在循環中反覆從LinkedBlockingQueue中獲取任務來執行。
  1. newCachedThreadPool, 先看一下源碼:
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }
複製代碼

newCachedThreadPool的corePoolSize是0,maximumPoolSize被設置爲Integer.MAX.VALUE。

說明:

  1. SynchronousQueue是無界的,在某次添加元素後必須等待其餘線程取走後才能繼續添加。
  2. 若是沒法將請求加入隊列,則建立新的線程,除非建立此線程超出maximumPoolSize,在這種狀況下,任務將被拒絕。
  1. ScheduledExecutorService,此線程池支持定時以及週期性執行任務的需求。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }
複製代碼

咱們再看一下ScheduledThreadPoolExecutor中的源碼:

public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }
複製代碼

調用schedule方法,delay是延遲多少時間執行。

以上就是Executors提供的幾種常見的線程池的解析。

ThreadPoolTaskExecutor

下面咱們來看一下spring爲咱們提供的線程池ThreadPoolTaskExecutor。下圖是其類關係圖:

private final Object poolSizeMonitor = new Object();

	private int corePoolSize = 1;

	private int maxPoolSize = Integer.MAX_VALUE;

	private int keepAliveSeconds = 60;

	private int queueCapacity = Integer.MAX_VALUE;

	private boolean allowCoreThreadTimeOut = false;

	@Nullable
	private TaskDecorator taskDecorator;

	@Nullable
	private ThreadPoolExecutor threadPoolExecutor;
複製代碼

這是ThreadPoolTaskExecutor類中的屬性,能夠看出依然須要ThreadPoolExecutor類來支持。默認使用無界隊列。

在初始化方法中調用了ThreadPoolExecutor的構造器:

@Override
	protected ExecutorService initializeExecutor( ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

		BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);

		ThreadPoolExecutor executor;
		if (this.taskDecorator != null) {
			executor = new ThreadPoolExecutor(
					this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
					queue, threadFactory, rejectedExecutionHandler) {
				@Override
				public void execute(Runnable command) {
					Runnable decorated = taskDecorator.decorate(command);
					if (decorated != command) {
						decoratedTaskMap.put(decorated, command);
					}
					super.execute(decorated);
				}
			};
		}
		else {
			executor = new ThreadPoolExecutor(
					this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
					queue, threadFactory, rejectedExecutionHandler);

		}

		if (this.allowCoreThreadTimeOut) {
			executor.allowCoreThreadTimeOut(true);
		}

		this.threadPoolExecutor = executor;
		return executor;
	}
複製代碼

說明:

  1. spring 配置的線城池(threadPoolTaskExecutor)因爲是spring建立注入的,在首次使用以後,會一直保持corePoolSize個空閒線程,它只會把多餘的空閒線程在keepAliveSeconds 時間以後釋放,並且線城池不能調用shutdown()方法,不然再次調用,因爲線程池已經關閉,會報錯。
  2. threadPoolTaskExecutor也能夠在配置文件配置多個線城池,防止多有任務之間競爭,或者因爲不一樣任務使用的線城池大小不一樣等狀況。

總結

以上簡單的介紹了java自帶的四種線程池和spring提供的線程池,他們各有利弊,實際項目中能夠根據需求選擇。


歡迎關注公衆號:

歡迎關注公衆號
相關文章
相關標籤/搜索