在上一篇文章Java併發 之 線程池系列 (1) 讓多線程再也不坑爹的線程池中,咱們介紹了使用JDK concurrent包下的工廠和工具類Executors
來建立線程池經常使用的幾種方法:html
//建立固定線程數量的線程池
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
//建立一個線程池,該線程池會根據須要建立新的線程,但若是以前建立的線程能夠使用,會重用以前建立的線程
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
//建立一個只有一個線程的線程池
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
複製代碼
誠然,這種建立線程池的方法很是簡單和方便。但仔細閱讀源碼,卻把我嚇了一條: 這是要老子的命啊!java
咱們前面講過,若是有新的請求過來,在線程池中會建立新的線程處理這些任務,一直建立到線程池的最大容量(Max Size)爲止;超出線程池的最大容量的Tasks,會被放入阻塞隊列(Blocking Queue)進行等待,知道有線程資源釋放出來爲止;要知道的是,阻塞隊列也是有最大容量的,多餘隊列最大容量的請求不光沒有得到執行的機會,連排隊的資格都沒有!api
那這些連排隊的資格都沒有的Tasks怎麼處理呢?不要急,後面在介紹ThreadPoolExecutor
的拒絕處理策略(Handler Policies for Rejected Task)的時候會詳細介紹。安全
說到這裏你也許有寫疑惑了,上面這些東西,我一般使用Executors
的時候沒有指定過啊。是的,由於Executors
很「聰明」地幫咱們作了這些事情。bash
咱們看下Executors
的newFixedThreadPool
和newSingleThreadExecutor
方法的源碼:多線程
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(
nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
複製代碼
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(
1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
複製代碼
其實它們底層仍是經過ThreadPoolExecutor
來建立ExecutorService
的,這裏對它的參數先不做介紹,下面會詳細講,咱們只說一下new LinkedBlockingQueue<Runnable>()
這個參數。併發
LinkedBlockingQueue
就是當任務數大於線程池的線程數的時候的阻塞隊列,這裏使用的是無參構造,咱們再看一下構造函數:oracle
/** * Creates a {@code LinkedBlockingQueue} with a capacity of * {@link Integer#MAX_VALUE}. */
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
複製代碼
咱們看到阻塞隊列的默認大小居然是Integer.MAX_VALUE
!less
若是不作控制,拼命地往阻塞隊列裏放Task,分分鐘「Out of Memory」啊!jvm
還有更絕的,newCachedThreadPool
方法:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(
0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
複製代碼
最大線程數默認也是Integer.MAX_VALUE
,也就是說,若是以前的任務沒有執行完就有新的任務進來了,就會繼續建立新的線程,直到建立到Integer.MAX_VALUE
爲止。
下面提供一個使用newCachedThreadPool
建立大量線程處理Tasks,最終OutOfMemoryError
的例子。
友情提醒:場面過於血腥,請勿在生產環境使用。
package net.ijiangtao.tech.concurrent.jsd.threadpool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExample2 {
private static final ExecutorService executorService = Executors.newCachedThreadPool();
private static class Task implements Runnable {
@Override
public void run() {
try {
Thread.sleep(1000 * 600);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private static void newCachedThreadPoolTesterBadly() {
System.out.println("begin............");
for (int i = 0; i <= Integer.MAX_VALUE; i++) {
executorService.execute(new Task());
}
System.out.println("end.");
}
public static void main(String[] args) {
newCachedThreadPoolTesterBadly();
}
}
複製代碼
當main
方法啓動之後,打開控制面板,看到CPU和內存幾乎已經所有耗盡:
很快控制檯就拋出了java.lang.OutOfMemoryError
:
begin............
Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:717)
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1378)
at net.ijiangtao.tech.concurrent.jsd.threadpool.ThreadPoolExample2.newCachedThreadPoolTesterBadly(ThreadPoolExample2.java:24)
at net.ijiangtao.tech.concurrent.jsd.threadpool.ThreadPoolExample2.main(ThreadPoolExample2.java:30)
複製代碼
下面咱們在看Java開發手冊這條規定,應該就明白做者的良苦用心了吧。
【強制】線程池不容許使用Executors去建立,而是經過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同窗更加明確線程池的運行規則,規避資源耗盡的風險。 說明: Executors返回的線程池對象的弊端以下: 1)FixedThreadPool和SingleThreadPool:容許的請求隊列長度爲Integer.MAX_VALUE,可能會堆積大量的請求,從而致使OOM。 2)CachedThreadPool和ScheduledThreadPool:容許的建立線程數量爲Integer.MAX_VALUE,可能會建立大量的線程,從而致使OOM。
解鈴還須繫鈴人,其實避免這個OutOfMemoryError
風險的鑰匙就藏在Executors
的源碼裏,那就是本身直接使用ThreadPoolExecutor
。
構造一個ThreadPoolExecutor
須要蠻多參數的。下面是ThreadPoolExecutor
的構造函數。
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
複製代碼
下面就一一介紹一下這些參數的具體含義。
其實從源碼中的JavaDoc已經能夠很清晰地明白這些參數的含義了,下面照顧懶得看英文的同窗,再解釋一下:
線程池核心線程數。
默認狀況下核心線程會一直存活,即便處於閒置狀態也不會受存keepAliveTime
限制,除非將allowCoreThreadTimeOut
設置爲true
。
線程池所能容納的最大線程數。超過maximumPoolSize
的線程將被阻塞。
最大線程數maximumPoolSize
不能小於corePoolSize
非核心線程的閒置超時時間。
超過這個時間非核心線程就會被回收。
keepAliveTime
的時間單位,如TimeUnit.SECONDS。
當將allowCoreThreadTimeOut
爲true時,對corePoolSize生效。
線程池中的任務隊列。
沒有得到線程資源的任務將會被放入workQueue
,等待線程資源被釋放。若是放入workQueue
的任務數大於workQueue
的容量,將由RejectedExecutionHandler
的拒絕策略進行處理。
經常使用的有三種隊列: SynchronousQueue
,LinkedBlockingDeque
,ArrayBlockingQueue
。
提供建立新線程功能的線程工廠。
ThreadFactory
是一個接口,只有一個newThread
方法:
Thread newThread(Runnable r);
複製代碼
沒法被線程池處理的任務的處理器。
通常是由於任務數超出了workQueue
的容量。
總結一下,當一個任務經過execute(Runnable)
方法添加到線程池時:
若是此時線程池中線程的數量小於corePoolSize
,即便線程池中的線程都處於空閒狀態,也要建立新的線程來處理被添加的任務。
若是此時線程池中的數量等於corePoolSize
,可是緩衝隊列workQueue
未滿,那麼任務被放入緩衝隊列。
若是此時線程池中的數量大於corePoolSize
,緩衝隊列workQueue
滿,而且線程池中的數量小於maximumPoolSize
,建新的線程來處理被添加的任務。
若是此時線程池中的數量大於corePoolSize
,緩衝隊列workQueue
滿,而且線程池中的數量等於maximumPoolSize
,那麼經過 handler所指定的拒絕策略來處理此任務。
處理任務的優先級爲:核心線程數(corePoolSize) > 任務隊列容量(workQueue) > 最大線程數(maximumPoolSize);若是三者都滿了,使用rejectedExecutionHandler處理被拒絕的任務。
下面就經過一個簡單的例子,使用ThreadPoolExecutor
構造的線程池執行任務。
package net.ijiangtao.tech.concurrent.jsd.threadpool;
import java.time.LocalTime;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/** * @author ijiangtao.net */
public class ThreadPoolExample3 {
private static final AtomicInteger threadNumber = new AtomicInteger(1);
private static class Task implements Runnable {
@Override
public void run() {
try {
Thread.currentThread().sleep(2000);
System.out.println(Thread.currentThread().getName() + "-" + LocalTime.now());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private static class MyThreadFactory implements ThreadFactory {
private final String namePrefix;
public MyThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
}
public Thread newThread(Runnable runnable) {
return new Thread(runnable, namePrefix + "-" + threadNumber.getAndIncrement());
}
}
private static final ExecutorService executorService = new ThreadPoolExecutor(
10,
20, 30, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(50),
new MyThreadFactory("MyThreadFromPool"),
new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
// creates five tasks
Task r1 = new Task();
Task r2 = new Task();
Task r3 = new Task();
Task r4 = new Task();
Task r5 = new Task();
// submit方法有返回值
Future future = executorService.submit(r1);
System.out.println("r1 isDone ? " + future.isDone());
// execute方法沒有返回值
executorService.execute(r2);
executorService.execute(r3);
executorService.execute(r4);
executorService.execute(r5);
//關閉線程池
executorService.shutdown();
}
}
複製代碼
r1 isDone ? false
MyThreadFromPool-2-21:04:03.215
MyThreadFromPool-5-21:04:03.215
MyThreadFromPool-4-21:04:03.215
MyThreadFromPool-3-21:04:03.215
MyThreadFromPool-1-21:04:03.215
複製代碼
從結果看,從線程池取出了5個線程,併發執行了5個任務。
這一章咱們介紹了一種更安全、更定製化的線程池構建方式:ThreadPoolExecutor
。相信你之後不敢輕易使用Executors
來構造線程池了。
後面咱們會介紹線程池的更多實現方式(例如使用Google核心庫Guava),以及關於線程池的更多知識和實戰。