使用無界隊列的線程池會致使內存飆升嗎?面試官常常會問這個問題,本文將基於源碼,去分析newFixedThreadPool線程池致使的內存飆升問題,但願能加深你們的理解。java
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < Integer.MAX_VALUE; i++) {
executor.execute(() -> {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
//do nothing
}
});
}
複製代碼
IDE指定JVM參數:-Xmx8m -Xms8m : node
run以上代碼,會拋出OOM: 面試
JVM OOM問題通常是 建立太多對象,同時 GC 垃圾來不及回收致使的,那麼什麼緣由 致使線程池的OOM呢?帶着發現新大陸的心情,咱們從源碼角度分析這個問題,去找找實例代碼中哪裏創了太多對象。以上的實例代碼,就一個newFixedThreadPool和一個execute方法。首先,咱們先來看一下newFixedThreadPool方法的源碼算法
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
複製代碼
該段源碼以及結合線程池特色,咱們能夠知道newFixedThreadPool:編程
線程池特色瞭解不是很清楚的朋友,能夠看我這篇文章,面試必備:Java線程池解析bash
接下來,咱們再來看看線程池執行方法execute的源碼。併發
execute的源碼以及相關解釋以下:函數
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { //步驟一:判斷當前正在工做的線程是否比核心線程數量小
if (addWorker(command, true)) // 以核心線程的身份,添加到工做集合
return;
c = ctl.get();
}
//步驟二:不知足步驟一,線程池還在RUNNING狀態,阻塞隊列也沒滿的狀況下,把執行任務添加到阻塞隊列workQueue。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//來個double check ,檢查線程池是否忽然被關閉
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//步驟三:若是阻塞隊列也滿了,執行任務以非核心線程的身份,添加到工做集合
else if (!addWorker(command, false))
reject(command);
}
複製代碼
縱觀以上代碼,咱們能夠發現就addWorker 以及workQueue.offer(command) 可能在建立對象。那咱們先分析addWorker方法。oop
addWorker源碼以及相關解釋以下源碼分析
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
//獲取當前線程池的狀態
int rs = runStateOf(c);
//若是線程池狀態是STOP,TIDYING,TERMINATED狀態的話,則會返回false。
// 若是如今狀態是SHUTDOWN,可是firstTask不爲空或者workQueue爲空的話,那麼直接返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//自旋
for (;;) {
//獲取當前工做線程的數量
int wc = workerCountOf(c);
//判斷線程數量是否符合要求,若是要建立的是核心工做線程,判斷當前工做線程數量是否已經超過coreSize,
// 若是要建立的是非核心線程,判斷當前工做線程數量是否超過maximumPoolSize,是的話就返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//若是線程數量符合要求,就經過CAS算法,將WorkerCount加1,成功就跳出retry自旋
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
retry inner loop
}
}
//線程啓動標誌
boolean workerStarted = false;
//線程添加進集合workers標誌
boolean workerAdded = false;
Worker w = null;
try {
//由(Runnable 構造Worker對象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//獲取線程池的重入鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//獲取線程池狀態
int rs = runStateOf(ctl.get());
//若是狀態知足,將Worker對象添加到workers集合
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//啓動Worker中的線程開始執行任務
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//線程啓動失敗,執行addWorkerFailed方法
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
複製代碼
addWorker執行流程
大概就是判斷線程池狀態是否OK,若是OK,在判斷當前工做中的線程數量是否知足(小於coreSize/maximumPoolSize),若是不知足,不添加,若是知足,就將執行任務添加到工做集合workers,,並啓動執行該線程。
再看一下workers的類型:
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
複製代碼
workers是一個HashSet集合,它由coreSize/maximumPoolSize控制着,那麼addWorker方法會致使OOM?結合實例代碼demo,coreSize=maximumPoolSize=10,若是超過10,不會再添加到workers了,因此它不是致使newFixedThreadPool內存飆升的緣由。那麼,問題應該就在於workQueue.offer(command) 方法了。爲了讓整個流程清晰,咱們畫一下execute執行的流程圖。
根據以上execute以及addWork源碼分析,咱們把流程圖畫出來:
看完execute的執行流程,我猜想,內存飆升問題就是workQueue塞滿了。接下來,進行阻塞隊列源碼分析,揭開內存飆升問題的神祕面紗。
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
複製代碼
LinkedBlockingQueue無參構造函數,默認構造Integer.MAX_VALUE(那麼大) 的鏈表,看到這裏,你回想一下execute流程,是否是阻塞隊列一直不會滿了,這隊列來者不拒,把全部阻塞任務收於麾下。。。是否是內存飆升問題水落石出啦。
線程池中,插入隊列用了offer方法,咱們來看一下阻塞隊列LinkedBlockingQueue的offer騷操做吧
public boolean offer(E e) {
//爲空元素則拋出空指針異常
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
//如採當前隊列滿則丟棄將要放入的元素, 而後返回false
if (count.get() == capacity)
return false;
int c = -1;
//構造新節點,獲取putLock獨佔鎖
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//如採隊列不滿則進隊列,並遞增元素計數
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
//新元素入隊後隊列還有空閒空間,則
喚醒 notFull 的條件隊列中一條阻塞線程
if (c + 1 < capacity)
notFull.signal();
}
} finally {
//釋放鎖
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
複製代碼
offer操做向隊列尾部插入一個元素,若是隊列中有空閒則插入成功後返回 true,若是隊列己滿 則丟棄當前元素而後返回 false。 若是 e 元素爲 null 則拋出 Nul!PointerException 異常。另外, 該方法是非阻塞的。
newFixedThreadPool線程池的核心線程數是固定的,它使用了近乎於無界的LinkedBlockingQueue阻塞隊列。當核心線程用完後,任務會入隊到阻塞隊列,若是任務執行的時間比較長,沒有釋放,會致使愈來愈多的任務堆積到阻塞隊列,最後致使機器的內存使用不停的飆升,形成JVM OOM。