Java中關於CompletableFuture類的思考

最近遇到了一個OOM的問題,提示的是沒法建立更多的線程,定位問題,發現是相似下面的這段代碼出現了問題,用JConsole監測,發現某一時段線程數量突然飆升,由此引起了下面的思考bash

情景再現

public class DemoController {
    private ExecutorService executorService = Executors.newWorkStealingPool(20);

    @RequestMapping("/test")
    public String test() {
        ExecutorService forkJoinPool = Executors.newWorkStealingPool(10);
        CompletableFuture[] completableFutures = new CompletableFuture[600];
        for (int i = 0; i < 600; i++) {
            int j = i;
            completableFutures[i] = CompletableFuture.runAsync(() -> {
                getAssociatedInfo(forkJoinPool);
            }, forkJoinPool);
        }
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(completableFutures);
        voidCompletableFuture.join();
        return "OK";
    }

    public String getAssociatedInfo(ExecutorService service) {
        CompletableFuture<String> trialAssociatedInfoCompletableFuture
                = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("按理說你已在運行,不是嗎");
                TimeUnit.SECONDS.sleep(100);
                System.out.println("你已經完成了");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "a";
        }, executorService);
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(trialAssociatedInfoCompletableFuture);
        voidCompletableFuture.join();
        return "ok";
    }
}
複製代碼

在這段代碼中,http線程啓600個任務,使用自定義的線程池併發數量爲10個。每個任務啓一個子任務,使用類定義的線程池,併發數量爲20個。在個人理解中,按理說最多多三十多個線程數量纔對,可是短期內竟然飆升好幾百,那麼這幾百個線程究竟是如何產生的呢?在研究了源代碼以後,終於理解了其中的奧祕。

代碼探究

completableFutures[i] = CompletableFuture.runAsync(() -> {
                    getAssociatedInfo(forkJoinPool);
                }, forkJoinPool);
複製代碼

這一句的做用是啓異步任務,交由forkJoinPool線程池管理,當線程池數量不足10個時,啓動一個線程,當即執行,當超過10個時,加入任務隊列。多線程

CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(completableFutures);
複製代碼

allOf的做用是遞歸地構造完成樹,彙總並返回成一個總任務,以下圖所示: 併發

這裏的任務一、任務2等就是咱們前面定義的任務,每一個任務對象存儲着一個結果,當最終任務有結果時,必需要下面的彙總任務都有結果,進而每個定義的任務都要有結果,通俗來講,就是對voidCompletableFuture的管理即爲對全部定義任務的管理。

// 從多線程的角度,若任務未完成,會阻塞
    voidCompletableFuture.join();
    return "OK";
CompletableFuture->join():
    return reportJoin((r = result) == null ? waitingGet(false) : r);
CompletableFuture->waitingGet():
    Signaller q = null;
    boolean queued = false;
    int spins = -1;
    Object r;
    // 當返回任務不爲空,循環結束
    while ((r = result) == null) {
        if (spins < 0)
            spins = (Runtime.getRuntime().availableProcessors() > 1) ?
                1 << 8 : 0; // Use brief spin-wait on multiprocessors
        else if (spins > 0) {
            if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                --spins;
        }
        else if (q == null)
            // 實例化一個信號量 --1
            q = new Signaller(interruptible, 0L, 0L);
        else if (!queued)
            queued = tryPushStack(q);
        else if (interruptible && q.interruptControl < 0) {
            q.thread = null;
            cleanStack();
            return null;
        }
        else if (q.thread != null && result == null) {
            try {
                // 若遲遲沒有返回結果,最終會走到這個方法中,下面是ForkJoinPool對信號量的管理
                ForkJoinPool.managedBlock(q);
            } catch (InterruptedException ie) {
                q.interruptControl = -1;
            }
        }
    }
ForkJoinPool->managedBlock():
    Thread t = Thread.currentThread();
    if ((t instanceof ForkJoinWorkerThread) &&
        (p = (wt = (ForkJoinWorkerThread)t).pool) != null) {
        WorkQueue w = wt.workQueue;
        while (!blocker.isReleasable()) {
            // 
            if (p.tryCompensate(w)) {   // --2
                try {
                    do {} while (!blocker.isReleasable() &&
                                 !blocker.block());
                } finally {
                    U.getAndAddLong(p, CTL, AC_UNIT);
                }
                break;
            }
        }
    }
    else {
        do {} while (!blocker.isReleasable() &&
                     !blocker.block());
    }
ForkJoinPool->tryCompensate():  // --2
     canBlock = add && createWorker(); // throws on exception
複製代碼
  • 獲取當前線程,判斷其類型,若是當前線程是非forkJoin線程的話,那麼走else方法,直到返回結果爲止;
  • 若是是forkJoin線程的話,當執行2處的代碼時,還會進行一系列複雜的判斷,若仍然遲遲得不到返回結果,會新建一個線程,幫助執行線程池裏的任務。多出來的那幾百個線程確實出自於此;
CompletableFuture->Signaller->Signaller():  // --1
    Signaller(boolean interruptible, long nanos, long deadline) {
        // thread變量是當前線程
         this.thread = Thread.currentThread();
         this.interruptControl = interruptible ? 1 : 0;
         this.nanos = nanos;
         this.deadline = deadline;
    }
複製代碼

反思

到第一個voidCompletableFuture.join(),該線程是http線程,由forkJoinPool線程池管理,最多10個線程並行,而後到waitingGet(),因爲其不是forkJoin線程,所以走的是else方法app

到第二個voidCompletableFuture.join(),該線程是forkJoinPool執行的任務,每個任務都會執行一次getAssociatedInfo方法,由executorService線程池管理,最多20個線程並行,而後到waitingGet(),因爲它是forkJoin線程,因此會新建一個線程,幫助執行forkJoinPool線程池裏的任務,然而受到executorService線程池數量的制約,即便線程數多了,也不能加快執行,隨着愈來愈多getAssociatedInfo方法的Join,致使了線程數量的飆升,又不能即時釋放,最終致使了OOM的發生dom

解決方案

猜測:將http線程的任務與forkJoinPool線程池的任務放在同一線程池,這樣每當forkJoinPool線程池新產生一個線程時,都能竊取到任務從而執行,而且隨着線程數量的上升,愈來愈多的任務被執行,這樣就減小了線程建立的數量。最終的結果果真如此異步

相關文章
相關標籤/搜索