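Recently I ran into an OOM problem whose error message said no more threads could be created. Tracking it down, I found the culprit was code similar to the snippet below. JConsole showed the thread count suddenly spiking during a certain period, and that led to the analysis that follows.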
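```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class DemoController {
    // Shared pool for the inner (nested) tasks: at most 20 run in parallel.
    private final ExecutorService executorService = Executors.newWorkStealingPool(20);

    @RequestMapping("/test")
    public String test() {
        // A new 10-way work-stealing pool (a ForkJoinPool) is created on every request.
        ExecutorService forkJoinPool = Executors.newWorkStealingPool(10);
        CompletableFuture<?>[] completableFutures = new CompletableFuture<?>[600];
        for (int i = 0; i < 600; i++) {
            // Each outer task runs on forkJoinPool and blocks inside getAssociatedInfo.
            completableFutures[i] = CompletableFuture.runAsync(() -> {
                getAssociatedInfo(forkJoinPool);
            }, forkJoinPool);
        }
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(completableFutures);
        // The HTTP request thread blocks here until all 600 outer tasks finish.
        voidCompletableFuture.join();
        return "OK";
    }

    public String getAssociatedInfo(ExecutorService service) {
        // The inner task goes to executorService, not to the pool passed in as `service`.
        CompletableFuture<String> trialAssociatedInfoCompletableFuture =
                CompletableFuture.supplyAsync(() -> {
                    try {
                        System.out.println("By rights you should be running already, right?");
                        TimeUnit.SECONDS.sleep(100);
                        System.out.println("You're done.");
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                    }
                    return "a";
                }, executorService);
        CompletableFuture<Void> voidCompletableFuture =
                CompletableFuture.allOf(trialAssociatedInfoCompletableFuture);
        // A forkJoinPool worker thread blocks here waiting for the inner task.
        voidCompletableFuture.join();
        return "ok";
    }
}
```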
```java
completableFutures[i] = CompletableFuture.runAsync(() -> {
    getAssociatedInfo(forkJoinPool);
}, forkJoinPool);
```
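This statement starts an asynchronous task managed by the forkJoinPool pool. While the pool has fewer than 10 threads, a new thread is started and the task runs immediately. Once there are 10, further tasks are put into the task queue.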
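To make the limit described above visible, here is a small sketch that counts how many tasks run at the same time on a work-stealing pool. The names `ParallelismBound` and `peakConcurrency` are illustrative and do not come from the original code. The tasks use plain `Thread.sleep`, which does not go through `ForkJoinPool.managedBlock`, so the pool has no reason to add compensating threads. The observed peak should therefore never exceed the parallelism.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class ParallelismBound {
    // Returns the highest number of tasks observed running at the same time.
    static int peakConcurrency(int parallelism, int tasks) {
        ExecutorService pool = Executors.newWorkStealingPool(parallelism);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            CompletableFuture<?>[] futures = new CompletableFuture<?>[tasks];
            for (int i = 0; i < tasks; i++) {
                futures[i] = CompletableFuture.runAsync(() -> {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        // Plain sleep: not a managed block, so no compensating thread.
                        Thread.sleep(20);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                    }
                }, pool);
            }
            // The caller (not a ForkJoin worker) just waits for all tasks.
            CompletableFuture.allOf(futures).join();
            return peak.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + peakConcurrency(4, 40));
    }
}
```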
```java
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(completableFutures);
```
`allOf` recursively builds a completion tree, combining all the futures into a single overall task, as shown in the figure below:
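Below is a simplified sketch of that tree construction that uses only the public `thenCombine` API. In the JDK 8 sources the real work is done by a private helper, `andTree`, which wires the tree together with internal completion objects instead. Treat this as an illustration of the shape rather than a copy. `AllOfTreeSketch` is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;

class AllOfTreeSketch {
    // Recursively combines cfs[lo..hi] into a balanced binary tree of futures.
    // The returned future completes only after every leaf has completed.
    static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs, int lo, int hi) {
        if (lo > hi) {
            return CompletableFuture.completedFuture(null); // empty range
        }
        if (lo == hi) {
            return cfs[lo].thenApply(ignored -> null);     // single leaf
        }
        int mid = (lo + hi) >>> 1;
        CompletableFuture<Void> left = andTree(cfs, lo, mid);
        CompletableFuture<Void> right = andTree(cfs, mid + 1, hi);
        return left.thenCombine(right, (a, b) -> null);    // inner node waits for both halves
    }

    public static void main(String[] args) {
        CompletableFuture<?>[] leaves = new CompletableFuture<?>[5];
        for (int i = 0; i < leaves.length; i++) {
            leaves[i] = new CompletableFuture<>();
        }
        CompletableFuture<Void> root = andTree(leaves, 0, leaves.length - 1);
        System.out.println(root.isDone()); // false: no leaf has completed yet
        for (CompletableFuture<?> leaf : leaves) {
            leaf.complete(null);
        }
        System.out.println(root.isDone()); // true: dependents ran in the completing thread
    }
}
```

Because the tree is balanced, its depth grows only logarithmically with the number of futures. That is the reason for splitting at `mid` rather than chaining the futures one by one.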
```java
// From a threading point of view: if the tasks have not finished yet, this call blocks
voidCompletableFuture.join();
return "OK";
```
```java
// CompletableFuture.join():
return reportJoin((r = result) == null ? waitingGet(false) : r);
```
```java
// CompletableFuture.waitingGet() (JDK 8 source, excerpt):
Signaller q = null;
boolean queued = false;
int spins = -1;
Object r;
// The loop ends once a result is available
while ((r = result) == null) {
    if (spins < 0)
        spins = (Runtime.getRuntime().availableProcessors() > 1) ?
            1 << 8 : 0; // Use brief spin-wait on multiprocessors
    else if (spins > 0) {
        if (ThreadLocalRandom.nextSecondarySeed() >= 0)
            --spins;
    }
    else if (q == null)
        // Create a Signaller that records the waiting thread --1
        q = new Signaller(interruptible, 0L, 0L);
    else if (!queued)
        queued = tryPushStack(q);
    else if (interruptible && q.interruptControl < 0) {
        q.thread = null;
        cleanStack();
        return null;
    }
    else if (q.thread != null && result == null) {
        try {
            // If the result still has not arrived, execution ends up here;
            // ForkJoinPool then manages the blocking of the Signaller
            ForkJoinPool.managedBlock(q);
        } catch (InterruptedException ie) {
            q.interruptControl = -1;
        }
    }
}
```
```java
// ForkJoinPool.managedBlock() (excerpt):
Thread t = Thread.currentThread();
if ((t instanceof ForkJoinWorkerThread) &&
    (p = (wt = (ForkJoinWorkerThread)t).pool) != null) {
    // The caller is a ForkJoinPool worker: try to compensate before blocking
    WorkQueue w = wt.workQueue;
    while (!blocker.isReleasable()) {
        if (p.tryCompensate(w)) { // --2
            try {
                do {} while (!blocker.isReleasable() &&
                             !blocker.block());
            } finally {
                U.getAndAddLong(p, CTL, AC_UNIT);
            }
            break;
        }
    }
}
else {
    // Any other thread simply blocks
    do {} while (!blocker.isReleasable() &&
                 !blocker.block());
}
```
```java
// ForkJoinPool.tryCompensate() (excerpt): --2
// In the branch that expands the pool, a new worker thread is created:
canBlock = add && createWorker(); // throws on exception
```
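The same cooperation is available to application code through the public `ForkJoinPool.ManagedBlocker` interface. The sketch below illustrates that interface under my own assumptions and is not the JDK's `Signaller`. `FutureBlocker` and `ManagedBlockDemo` are hypothetical names. `isReleasable()` reports whether waiting is still necessary, and `block()` does the actual waiting. When the caller is a `ForkJoinWorkerThread`, `managedBlock()` may first call `tryCompensate()` as in the excerpt above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

// Hypothetical ManagedBlocker that waits for a CompletableFuture.
class FutureBlocker implements ForkJoinPool.ManagedBlocker {
    private final CompletableFuture<?> future;

    FutureBlocker(CompletableFuture<?> future) {
        this.future = future;
    }

    @Override
    public boolean isReleasable() {
        return future.isDone(); // no blocking needed once the future is done
    }

    @Override
    public boolean block() throws InterruptedException {
        try {
            future.get(); // blocks the current thread until completion
        } catch (ExecutionException e) {
            // exceptional completion also ends the wait
        }
        return true; // no further blocking is necessary
    }
}

class ManagedBlockDemo {
    static String awaitInPool() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "done";
            });
            // Runs on a worker of `pool`, so managedBlock may try to compensate first.
            return pool.submit(() -> {
                ForkJoinPool.managedBlock(new FutureBlocker(slow));
                return slow.join();
            }).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitInPool());
    }
}
```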
```java
// CompletableFuture.Signaller constructor: --1
Signaller(boolean interruptible, long nanos, long deadline) {
    // The thread field is the current (waiting) thread
    this.thread = Thread.currentThread();
    this.interruptControl = interruptible ? 1 : 0;
    this.nanos = nanos;
    this.deadline = deadline;
}
```
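Storing `Thread.currentThread()` lets whichever thread later completes the future know which thread to wake. Here is a minimal park/unpark sketch of that idea. `OneShotSignal` and `SignalDemo` are hypothetical classes and are much simpler than the real `Signaller`: there are no timeouts, no interrupt handling, and only a single waiter.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical one-shot signal: the waiter records its own thread so the
// releasing thread knows whom to unpark.
class OneShotSignal {
    private volatile Thread waiter;
    private volatile boolean released;

    void await() {
        waiter = Thread.currentThread(); // like Signaller's this.thread = Thread.currentThread()
        while (!released) {
            LockSupport.park(this);      // may return spuriously, hence the loop
        }
        waiter = null;
    }

    void release() {
        released = true;
        Thread t = waiter;
        if (t != null) {
            LockSupport.unpark(t);       // wake the recorded thread
        }
    }
}

class SignalDemo {
    static boolean waiterWakesUp() {
        OneShotSignal signal = new OneShotSignal();
        Thread waiter = new Thread(signal::await, "waiter");
        waiter.start();
        try {
            Thread.sleep(50);   // give the waiter time to park (not required for correctness)
            signal.release();
            waiter.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("waiter finished: " + waiterWakesUp());
    }
}
```

Both fields are volatile. If `release()` runs before the waiter records itself, the waiter still sees `released == true` and never parks. If it runs afterwards, `unpark` hands the waiter a permit.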
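At the first voidCompletableFuture.join(), the calling thread is the HTTP request thread. The tasks it waits for run on forkJoinPool, with at most 10 running in parallel. The call then reaches waitingGet(). Because the HTTP thread is not a ForkJoinWorkerThread, managedBlock() takes the else branch and simply blocks.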
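The branch taken in `managedBlock()` depends only on whether the current thread is a `ForkJoinWorkerThread`. The small sketch below (hypothetical class `WorkerCheck`) performs the same `instanceof` test from an ordinary thread and from inside a work-stealing pool. Here the ordinary thread is `main`, standing in for the HTTP thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinWorkerThread;

class WorkerCheck {
    // Same test that ForkJoinPool.managedBlock() performs on the current thread.
    static boolean isForkJoinWorker() {
        return Thread.currentThread() instanceof ForkJoinWorkerThread;
    }

    // Returns {caller is a worker, pool task is a worker}.
    static boolean[] check() {
        ExecutorService pool = Executors.newWorkStealingPool(2);
        try {
            boolean caller = isForkJoinWorker();
            boolean inPool = CompletableFuture
                    .supplyAsync(WorkerCheck::isForkJoinWorker, pool)
                    .join();
            return new boolean[] {caller, inPool};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] r = check();
        System.out.println("caller: " + r[0] + ", pool task: " + r[1]);
    }
}
```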
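At the second voidCompletableFuture.join(), the calling thread is a forkJoinPool worker running one of the outer tasks. Every outer task calls getAssociatedInfo once, and the inner task it submits is run by executorService, with at most 20 running in parallel. When this join reaches waitingGet(), the caller is a ForkJoinWorkerThread, so managedBlock() calls tryCompensate(), which can create a new worker to keep executing the remaining tasks in forkJoinPool. The new workers, however, only pick up more outer tasks. Each of those submits yet another inner task to executorService and blocks on it. Because executorService has only 20 threads (and each inner task sleeps for 100 seconds), the extra threads cannot make anything finish sooner. As more and more getAssociatedInfo joins block, the thread count soars, and the threads cannot be released in time. This eventually causes the OOM.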
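Below is a Spring-free sketch of this chain, built under my own assumptions. A small outer `ForkJoinPool` stands in for forkJoinPool, a 2-way work-stealing pool stands in for executorService, and the sleeps are shortened. `ThreadGrowthRepro` and its methods are hypothetical names. The sketch samples `ForkJoinPool.getPoolSize()` while the nested joins are blocked. How far the peak rises above the parallelism depends on the JDK version and on timing, so the printed number is something to observe, not a guaranteed value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

class ThreadGrowthRepro {
    static void sleepMs(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns the largest outer-pool size seen while the nested joins are blocked.
    static int peakOuterPoolSize(int parallelism, int outerTasks, long innerSleepMs) {
        // Same arguments Executors.newWorkStealingPool(parallelism) passes to ForkJoinPool.
        ForkJoinPool outer = new ForkJoinPool(parallelism,
                ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
        ExecutorService inner = Executors.newWorkStealingPool(2); // stands in for executorService
        try {
            CompletableFuture<?>[] futures = new CompletableFuture<?>[outerTasks];
            for (int i = 0; i < outerTasks; i++) {
                futures[i] = CompletableFuture.runAsync(() ->
                        // Nested blocking join on a worker thread of `outer`.
                        CompletableFuture.supplyAsync(() -> {
                            sleepMs(innerSleepMs);
                            return "a";
                        }, inner).join(), outer);
            }
            CompletableFuture<Void> all = CompletableFuture.allOf(futures);
            int peak = 0;
            while (!all.isDone()) {
                peak = Math.max(peak, outer.getPoolSize());
                sleepMs(5);
            }
            all.join();
            return peak;
        } finally {
            outer.shutdown();
            inner.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("peak outer pool size: " + peakOuterPoolSize(2, 8, 100));
    }
}
```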
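Hypothesis: submit the tasks started from the HTTP thread and the tasks started from forkJoinPool's workers to the same pool. Then every time forkJoinPool creates a new thread, that thread can steal one of the pending tasks and run it, including the inner tasks the blocked workers are waiting for. As the thread count grows, more and more tasks get executed, which reduces the number of threads that need to be created. The final result confirmed this.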
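Below is a sketch of that hypothesis, again with hypothetical names and shortened sleeps. Both the outer tasks and the inner `supplyAsync` tasks go to one pool. A compensating worker created during a blocked join can therefore pick up a pending inner task, not just more outer tasks. The sketch returns how many outer tasks completed, plus the peak pool size so the effect can be measured on a given JDK. I make no claim about the exact numbers.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;

class SamePoolFix {
    static void sleepMs(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs nested tasks on ONE pool; returns {completed outer tasks, peak pool size}.
    static int[] run(int parallelism, int outerTasks, long innerSleepMs) {
        ForkJoinPool pool = new ForkJoinPool(parallelism,
                ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
        AtomicInteger completed = new AtomicInteger();
        try {
            CompletableFuture<?>[] futures = new CompletableFuture<?>[outerTasks];
            for (int i = 0; i < outerTasks; i++) {
                futures[i] = CompletableFuture.runAsync(() -> {
                    // The inner task is submitted to the SAME pool as the outer task.
                    String r = CompletableFuture.supplyAsync(() -> {
                        sleepMs(innerSleepMs);
                        return "a";
                    }, pool).join();
                    if ("a".equals(r)) {
                        completed.incrementAndGet();
                    }
                }, pool);
            }
            CompletableFuture<Void> all = CompletableFuture.allOf(futures);
            int peak = 0;
            while (!all.isDone()) {
                peak = Math.max(peak, pool.getPoolSize());
                sleepMs(5);
            }
            all.join();
            return new int[] {completed.get(), peak};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = run(2, 8, 50);
        System.out.println("completed: " + r[0] + ", peak pool size: " + r[1]);
    }
}
```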