源碼角度分析-newFixedThreadPool線程池致使的內存飆升問題

前言

使用無界隊列的線程池會致使內存飆升嗎?面試官常常會問這個問題,本文將基於源碼,去分析newFixedThreadPool線程池致使的內存飆升問題,但願能加深你們的理解。java

內存飆升問題復現

實例代碼

ExecutorService executor = Executors.newFixedThreadPool(10);
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            executor.execute(() -> {
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    //do nothing
                }
            });
        }
複製代碼

配置Jvm參數

IDE指定JVM參數:-Xmx8m -Xms8m : node

執行結果

run以上代碼,會拋出OOM: 面試

JVM OOM問題通常是 建立太多對象,同時 GC 垃圾來不及回收致使的,那麼什麼緣由 致使線程池的OOM呢?帶着發現新大陸的心情,咱們從源碼角度分析這個問題,去找找實例代碼中哪裏創了太多對象。

線程池源碼分析

以上的實例代碼,就一個newFixedThreadPool和一個execute方法。首先,咱們先來看一下newFixedThreadPool方法的源碼算法

newFixedThreadPool源碼

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
複製代碼

該段源碼以及結合線程池特色,咱們能夠知道newFixedThreadPool編程

  • 核心線程數coreSize和最大線程數maximumPoolSize大小同樣,都是nThreads。
  • 空閒時間爲0,即keepAliveTime爲0
  • 阻塞隊列爲無參構造的LinkedBlockingQueue

線程池特色瞭解不是很清楚的朋友,能夠看我這篇文章,面試必備:Java線程池解析bash

接下來,咱們再來看看線程池執行方法execute的源碼。併發

線程池執行方法execute的源碼

execute的源碼以及相關解釋以下:函數

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {   //步驟一:判斷當前正在工做的線程是否比核心線程數量小
            if (addWorker(command, true))    // 以核心線程的身份,添加到工做集合
                return;
            c = ctl.get();
        }
         //步驟二:不知足步驟一,線程池還在RUNNING狀態,阻塞隊列也沒滿的狀況下,把執行任務添加到阻塞隊列workQueue。
        if (isRunning(c) && workQueue.offer(command)) {  
            int recheck = ctl.get();
            //來個double check ,檢查線程池是否忽然被關閉
            if (! isRunning(recheck) && remove(command))  
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //步驟三:若是阻塞隊列也滿了,執行任務以非核心線程的身份,添加到工做集合
        else if (!addWorker(command, false))
            reject(command);
    }
複製代碼

縱觀以上代碼,咱們能夠發現就addWorker 以及workQueue.offer(command) 可能在建立對象。那咱們先分析addWorker方法。oop

addWorker源碼分析

addWorker源碼以及相關解釋以下源碼分析

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            //獲取當前線程池的狀態
            int rs = runStateOf(c);

            //若是線程池狀態是STOP,TIDYING,TERMINATED狀態的話,則會返回false。
            // 若是如今狀態是SHUTDOWN,可是firstTask不爲空或者workQueue爲空的話,那麼直接返回false
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
           //自旋
            for (;;) {
                //獲取當前工做線程的數量
                int wc = workerCountOf(c);
                //判斷線程數量是否符合要求,若是要建立的是核心工做線程,判斷當前工做線程數量是否已經超過coreSize,
               // 若是要建立的是非核心線程,判斷當前工做線程數量是否超過maximumPoolSize,是的話就返回false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
               //若是線程數量符合要求,就經過CAS算法,將WorkerCount加1,成功就跳出retry自旋
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                  retry inner loop
            }
        }
        //線程啓動標誌
        boolean workerStarted = false;
        //線程添加進集合workers標誌
        boolean workerAdded = false;
        Worker w = null;
        try {
            //由(Runnable 構造Worker對象
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                //獲取線程池的重入鎖
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                   //獲取線程池狀態
                    int rs = runStateOf(ctl.get());
                    //若是狀態知足,將Worker對象添加到workers集合
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) 
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
               //啓動Worker中的線程開始執行任務
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            //線程啓動失敗,執行addWorkerFailed方法
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
複製代碼

addWorker執行流程

大概就是判斷線程池狀態是否OK,若是OK,在判斷當前工做中的線程數量是否知足(小於coreSize/maximumPoolSize),若是不知足,不添加,若是知足,就將執行任務添加到工做集合workers,,並啓動執行該線程。

再看一下workers的類型:

/**
     * Set containing all worker threads in pool. Accessed only when
     * holding mainLock.
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();
複製代碼

workers是一個HashSet集合,它由coreSize/maximumPoolSize控制着,那麼addWorker方法會致使OOM?結合實例代碼demo,coreSize=maximumPoolSize=10,若是超過10,不會再添加到workers了,因此它不是致使newFixedThreadPool內存飆升的緣由。那麼,問題應該就在於workQueue.offer(command) 方法了。爲了讓整個流程清晰,咱們畫一下execute執行的流程圖。

線程池執行方法execute的流程

根據以上execute以及addWork源碼分析,咱們把流程圖畫出來:

  • 提交一個任務command,線程池裏存活的核心線程數小於線程數corePoolSize時,調用addWorker方法,線程池會建立一個核心線程去處理提交的任務。
  • 若是線程池核心線程數已滿,即線程數已經等於corePoolSize,一個新提交的任務,會被放進任務隊列workQueue排隊等待執行。
  • 當線程池裏面存活的線程數已經等於corePoolSize了,而且任務隊列workQueue也滿,判斷線程數是否達到maximumPoolSize,即最大線程數是否已滿,若是沒到達,建立一個非核心線程執行提交的任務。
  • 若是當前的線程數達到了maximumPoolSize,還有新的任務過來的話,直接採用拒絕策略處理 。

看完execute的執行流程,我猜想,內存飆升問題就是workQueue塞滿了。接下來,進行阻塞隊列源碼分析,揭開內存飆升問題的神祕面紗。

阻塞隊列源碼分析

回到newFixedThreadPool構造函數,發現阻塞隊列就是LinkedBlockingQueue,並且是個 無參的LinkedBlockingQueue隊列。OK,那咱們直接分析LinkedBlockingQueue源碼。

LinkedBlockingQueue類圖

由類圖能夠看到:

  • LinkedBlockingQueue 是使用單向鏈表實現的,其有兩個 Node,分別用來存放首、尾節點, 而且還有一個初始值爲 0 的原子變量 count,用來記錄 隊列元素個數。
  • 另外還有兩個 ReentrantLock 的實例,分別用來控制元素入隊和出隊的原 子性,其中 takeLock 用來控制同時只有一個線程能夠從隊列頭獲取元素,其餘線程必須 等待, putLock 控制同時只能有一個線程能夠獲取鎖,在隊列尾部添加元素,其餘線程必 須等待。
  • 另外, notEmpty 和 notFull 是條件變量,它們內部都有一個條件隊列用來存放進 隊和出隊時被阻塞的線程,其實這是生產者一消費者模型。

LinkedBlockingQueue無參構造函數

public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
複製代碼

LinkedBlockingQueue無參構造函數,默認構造Integer.MAX_VALUE(那麼大) 的鏈表,看到這裏,你回想一下execute流程,是否是阻塞隊列一直不會滿了,這隊列來者不拒,把全部阻塞任務收於麾下。。。是否是內存飆升問題水落石出啦。

LinkedBlockingQueue的offer函數

線程池中,插入隊列用了offer方法,咱們來看一下阻塞隊列LinkedBlockingQueue的offer騷操做吧

public boolean offer(E e) {
        //爲空元素則拋出空指針異常
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        //如採當前隊列滿則丟棄將要放入的元素, 而後返回false 
        if (count.get() == capacity)
            return false;
        int c = -1;
        //構造新節點,獲取putLock獨佔鎖
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            //如採隊列不滿則進隊列,並遞增元素計數 
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                //新元素入隊後隊列還有空閒空間,則
                喚醒 notFull 的條件隊列中一條阻塞線程
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            //釋放鎖 
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }
複製代碼

offer操做向隊列尾部插入一個元素,若是隊列中有空閒則插入成功後返回 true,若是隊列己滿 則丟棄當前元素而後返回 false。 若是 e 元素爲 null 則拋出 Nul!PointerException 異常。另外, 該方法是非阻塞的。

內存飆升問題結果揭曉

newFixedThreadPool線程池的核心線程數是固定的,它使用了近乎於無界的LinkedBlockingQueue阻塞隊列。當核心線程用完後,任務會入隊到阻塞隊列,若是任務執行的時間比較長,沒有釋放,會致使愈來愈多的任務堆積到阻塞隊列,最後致使機器的內存使用不停的飆升,形成JVM OOM。

參考與感謝

我的公衆號

  • 若是你是個愛學習的好孩子,能夠關注我公衆號,一塊兒學習討論。
  • 若是你以爲本文有哪些不正確的地方,能夠評論,也能夠關注我公衆號,私聊我,你們一塊兒學習進步哈。
相關文章
相關標籤/搜索