看阿里大牛深刻淺出Java線程池原理分析與使用

在咱們的開發中「池」的概念並不罕見,有數據庫鏈接池、線程池、對象池、常量池等等。下面咱們主要針對線程池來一步一步揭開線程池的面紗。html

使用線程池的好處

一、下降資源消耗java

能夠重複利用已建立的線程下降線程建立和銷燬形成的消耗。程序員

二、提升響應速度面試

當任務到達時,任務能夠不須要等到線程建立就能當即執行。數據庫

三、提升線程的可管理性編程

線程是稀缺資源,若是無限制地建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行統一分配、調優和監控安全

線程池的工做原理

首先咱們看下當一個新的任務提交到線程池以後,線程池是如何處理的性能優化

一、線程池判斷核心線程池裏的線程是否都在執行任務。若是不是,則建立一個新的工做線程來執行任務。若是核心線程池裏的線程都在執行任務,則執行第二步。服務器

二、線程池判斷工做隊列是否已經滿。若是工做隊列沒有滿,則將新提交的任務存儲在這個工做隊列裏進行等待。若是工做隊列滿了,則執行第三步架構

三、線程池判斷線程池的線程是否都處於工做狀態。若是沒有,則建立一個新的工做線程來執行任務。若是已經滿了,則交給飽和策略來處理這個任務

線程池飽和策略

這裏提到了線程池的飽和策略,那咱們就簡單介紹下有哪些飽和策略:

AbortPolicy

爲Java線程池默認的阻塞策略,不執行此任務,並且直接拋出一個運行時異常,切記ThreadPoolExecutor.execute須要try catch,不然程序會直接退出。

DiscardPolicy

直接拋棄,任務不執行,空方法

DiscardOldestPolicy

從隊列裏面拋棄head的一個任務,並再次execute 此task。

CallerRunsPolicy

在調用execute的線程裏面執行此command,會阻塞入口

用戶自定義拒絕策略(最經常使用)

實現RejectedExecutionHandler,並本身定義策略模式

下咱們以ThreadPoolExecutor爲例展現下線程池的工做流程圖

這裏寫圖片描述

這裏寫圖片描述

一、若是當前運行的線程少於corePoolSize,則建立新線程來執行任務(注意,執行這一步驟須要獲取全局鎖)。

二、若是運行的線程等於或多於corePoolSize,則將任務加入BlockingQueue。

三、若是沒法將任務加入BlockingQueue(隊列已滿),則在非corePool中建立新的線程來處理任務(注意,執行這一步驟須要獲取全局鎖)。

四、若是建立新線程將使當前運行的線程超出maximumPoolSize,任務將被拒絕,並調用RejectedExecutionHandler.rejectedExecution()方法。

ThreadPoolExecutor採起上述步驟的整體設計思路,是爲了在執行execute()方法時,儘量地避免獲取全局鎖(那將會是一個嚴重的可伸縮瓶頸)。在ThreadPoolExecutor完成預熱以後(當前運行的線程數大於等於corePoolSize),幾乎全部的execute()方法調用都是執行步驟2,而步驟2不須要獲取全局鎖。

線程池只是併發編程中的一小部分,下圖是史上最全面的Java的併發編程學習技術總彙,在互聯網公司面試中,併發編程必定是面試官會問到的問題,針對面試官通常都會問的問題,我錄製了一些併發編程的底層原理錄像視頻,加羣619881427能夠免費獲取這些錄像,裏面還有分佈式,微服務,性能優化,Spring,MyBatis的等源碼分析知識點的錄像視頻。這些視頻都是我找的一些資深架構師朋友一塊兒錄製的,這些視頻但願可以幫助如下幾類程序員:
1.對如今的薪資不滿,想要跳槽,卻對本身的技術沒有信心,不知道如何面對面試官。

2.想從傳統行業轉行到互聯網行業,但沒有接觸過互聯網技術。

3.工做1 - 5年須要提高本身的核心競爭力,但學習沒有系統化,不知道本身接下來要學什麼纔是正確的,沒有人引導,踩坑後又不知道找誰,百度後依然不知因此然。


4.工做5 - 10年沒法突破技術瓶頸(運用過不少技術,在公司一直寫着業務代碼,卻依然不懂底層實現原理)成爲架構師。
若是你如今正處於我上述所說的幾個階段能夠加個人羣來學習。並且我也可以提供一些面試指導,職業規劃等建議。

 

關鍵方法源碼分析

咱們看看核心方法添加到線程池方法execute的源碼以下:

//
     //Executes the given task sometime in the future.  The task
     //may execute in a new thread or in an existing pooled thread.
     //
     // If the task cannot be submitted for execution, either because this
     // executor has been shutdown or because its capacity has been reached,
     // the task is handled by the current {@code RejectedExecutionHandler}.
     //
     // @param command the task to execute
     // @throws RejectedExecutionException at discretion of
     //         {@code RejectedExecutionHandler}, if the task
     //         cannot be accepted for execution
     // @throws NullPointerException if {@code command} is null
     //
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //
         // Proceed in 3 steps:
         //
         // 1. If fewer than corePoolSize threads are running, try to
         // start a new thread with the given command as its first
         // task.  The call to addWorker atomically checks runState and
         // workerCount, and so prevents false alarms that would add
         // threads when it shouldn't, by returning false.
         // 翻譯以下:
         // 判斷當前的線程數是否小於corePoolSize若是是,使用入參任務經過addWord方法建立一個新的線程,
         // 若是能完成新線程建立exexute方法結束,成功提交任務
         // 2. If a task can be successfully queued, then we still need
         // to double-check whether we should have added a thread
         // (because existing ones died since last checking) or that
         // the pool shut down since entry into this method. So we
         // recheck state and if necessary roll back the enqueuing if
         // stopped, or start a new thread if there are none.
         // 翻譯以下:
         // 在第一步沒有完成任務提交;狀態爲運行而且可否成功加入任務到工做隊列後,再進行一次check,若是狀態
         // 在任務加入隊列後變爲了非運行(有多是在執行到這裏線程池shutdown了),非運行狀態下固然是須要
         // reject;而後再判斷當前線程數是否爲0(有可能這個時候線程數變爲了0),如是,新增一個線程;
         // 3. If we cannot queue task, then we try to add a new
         // thread.  If it fails, we know we are shut down or saturated
         // and so reject the task.
         // 翻譯以下:
         // 若是不能加入任務到工做隊列,將嘗試使用任務新增一個線程,若是失敗,則是線程池已經shutdown或者線程池
         // 已經達到飽和狀態,因此reject這個他任務
         //
        int c = ctl.get();
        // 工做線程數小於核心線程數
        if (workerCountOf(c) < corePoolSize) {
            // 直接啓動新線程,true表示會再次檢查workerCount是否小於corePoolSize
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 若是工做線程數大於等於核心線程數
        // 線程的的狀態未RUNNING而且隊列notfull
        if (isRunning(c) && workQueue.offer(command)) {
            // 再次檢查線程的運行狀態,若是不是RUNNING直接從隊列中移除
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                // 移除成功,拒絕該非運行的任務
                reject(command);
            else if (workerCountOf(recheck) == 0)
                // 防止了SHUTDOWN狀態下沒有活動線程了,可是隊列裏還有任務沒執行這種特殊狀況。
                // 添加一個null任務是由於SHUTDOWN狀態下,線程池再也不接受新任務
                addWorker(null, false);
        }
        // 若是隊列滿了或者是非運行的任務都拒絕執行
        else if (!addWorker(command, false))
            reject(command);
    }

下面咱們繼續看看addWorker是如何實現的:

private boolean addWorker(Runnable firstTask, boolean core) {
        // java標籤
        retry:
        // 死循環
        for (;;) {
            int c = ctl.get();
            // 獲取當前線程狀態
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            // 這個邏輯判斷有點繞能夠改爲 
            // rs >= shutdown && (rs != shutdown || firstTask != null || workQueue.isEmpty())
            // 邏輯判斷成立能夠分爲如下幾種狀況均不接受新任務
            // 一、rs > shutdown:--不接受新任務
            // 二、rs >= shutdown && firstTask != null:--不接受新任務
            // 三、rs >= shutdown && workQueue.isEmppty:--不接受新任務
            // 邏輯判斷不成立
            // 一、rs==shutdown&&firstTask != null:此時不接受新任務,可是仍會執行隊列中的任務
            // 二、rs==shotdown&&firstTask == null:會執行addWork(null,false)
            //  防止了SHUTDOWN狀態下沒有活動線程了,可是隊列裏還有任務沒執行這種特殊狀況。
            //  添加一個null任務是由於SHUTDOWN狀態下,線程池再也不接受新任務
            if (rs >= SHUTDOWN &&! (rs == SHUTDOWN && firstTask == null &&! workQueue.isEmpty()))
                return false;
            // 死循環
            // 若是線程池狀態爲RUNNING而且隊列中還有須要執行的任務
            for (;;) {
                // 獲取線程池中線程數量
                int wc = workerCountOf(c);
                // 若是超出容量或者最大線程池容量不在接受新任務
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 線程安全增長工做線程數
                if (compareAndIncrementWorkerCount(c))
                    // 跳出retry
                    break retry;
                c = ctl.get();  // Re-read ctl
                // 若是線程池狀態發生變化,從新循環
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        // 走到這裏說明工做線程數增長成功
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 加鎖
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);
                    // RUNNING狀態 || SHUTDONW狀態下清理隊列中剩餘的任務
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 檢查線程狀態
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 將新啓動的線程添加到線程池中
                        workers.add(w);
                        // 更新線程池線程數且不超過最大值
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 啓動新添加的線程,這個線程首先執行firstTask,而後不停的從隊列中取任務執行
                if (workerAdded) {
                    //執行ThreadPoolExecutor的runWoker方法
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 線程啓動失敗,則從wokers中移除w並遞減wokerCount
            if (! workerStarted)
                // 遞減wokerCount會觸發tryTerminate方法
                addWorkerFailed(w);
        }
        return workerStarted;
    }

addWorker以後是runWorker,第一次啓動會執行初始化傳進來的任務firstTask;而後會從workQueue中取任務執行,若是隊列爲空則等待keepAliveTime這麼長時間

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 容許中斷
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 若是getTask返回null那麼getTask中會將workerCount遞減,若是異常了這個遞減操做會在processWorkerExit中處理
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

咱們看下getTask是如何執行的

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        // 死循環
        retry: for (;;) {
            // 獲取線程池狀態
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            // 1.rs > SHUTDOWN 因此rs至少等於STOP,這時再也不處理隊列中的任務
            // 2.rs = SHUTDOWN 因此rs>=STOP確定不成立,這時還須要處理隊列中的任務除非隊列爲空
            // 這兩種狀況都會返回null讓runWoker退出while循環也就是當前線程結束了,因此必需要decrement
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // 遞減workerCount值
                decrementWorkerCount();
                return null;
            }
            // 標記從隊列中取任務時是否設置超時時間
            boolean timed; // Are workers subject to culling?
            // 1.RUNING狀態
            // 2.SHUTDOWN狀態,但隊列中還有任務須要執行
            for (;;) {
                int wc = workerCountOf(c);
                // 1.core thread容許被超時,那麼超過corePoolSize的的線程一定有超時
                // 2.allowCoreThreadTimeOut == false && wc >
                // corePoolSize時,通常都是這種狀況,core thread即便空閒也不會被回收,只要超過的線程纔會
                timed = allowCoreThreadTimeOut || wc > corePoolSize;
                // 從addWorker能夠看到通常wc不會大於maximumPoolSize,因此更關心後面半句的情形:
                // 1. timedOut == false 第一次執行循環, 從隊列中取出任務不爲null方法返回 或者
                // poll出異常了重試
                // 2.timeOut == true && timed ==
                // false:看後面的代碼workerQueue.poll超時時timeOut才爲true,
                // 而且timed要爲false,這兩個條件相悖不可能同時成立(既然有超時那麼timed確定爲true)
                // 因此超時不會繼續執行而是return null結束線程。
                if (wc <= maximumPoolSize && !(timedOut && timed))
                    break;
                // workerCount遞減,結束當前thread
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get(); // Re-read ctl
                // 須要從新檢查線程池狀態,由於上述操做過程當中線程池可能被SHUTDOWN
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
            try {
                // 1.以指定的超時時間從隊列中取任務
                // 2.core thread沒有超時
                Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;// 超時
            } catch (InterruptedException retry) {
                timedOut = false;// 線程被中斷重試
            }
        }
    }

下面咱們看下processWorkerExit是如何工做的

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 正常的話再runWorker的getTask方法workerCount已經被減一了
        if (completedAbruptly)
            decrementWorkerCount();
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 累加線程的completedTasks
            completedTaskCount += w.completedTasks;
            // 從線程池中移除超時或者出現異常的線程
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        // 嘗試中止線程池
        tryTerminate();
        int c = ctl.get();
        // runState爲RUNNING或SHUTDOWN
        if (runStateLessThan(c, STOP)) {
            // 線程不是異常結束
            if (!completedAbruptly) {
                // 線程池最小空閒數,容許core thread超時就是0,不然就是corePoolSize
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // 若是min == 0可是隊列不爲空要保證有1個線程來執行隊列中的任務
                if (min == 0 && !workQueue.isEmpty())
                    min = 1;
                // 線程池還不爲空那就不用擔憂了
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 1.線程異常退出
            // 2.線程池爲空,可是隊列中還有任務沒執行,看addWoker方法對這種狀況的處理
            addWorker(null, false);
        }
    }

tryTerminate

processWorkerExit方法中會嘗試調用tryTerminate來終止線程池。這個方法在任何可能致使線程池終止的動做後執行:好比減小wokerCount或SHUTDOWN狀態下從隊列中移除任務。

final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            // 如下狀態直接返回:
            // 1.線程池還處於RUNNING狀態
            // 2.SHUTDOWN狀態可是任務隊列非空
            // 3.runState >= TIDYING 線程池已經中止了或在中止了
            if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
                return;
            // 只能是如下情形會繼續下面的邏輯:結束線程池。
            // 1.SHUTDOWN狀態,這時再也不接受新任務並且任務隊列也空了
            // 2.STOP狀態,當調用了shutdownNow方法
            // workerCount不爲0則還不能中止線程池,並且這時線程都處於空閒等待的狀態
            // 須要中斷讓線程「醒」過來,醒過來的線程才能繼續處理shutdown的信號。
            if (workerCountOf(c) != 0) { // Eligible to terminate
                // runWoker方法中w.unlock就是爲了能夠被中斷,getTask方法也處理了中斷。
                // ONLY_ONE:這裏只須要中斷1個線程去處理shutdown信號就能夠了。
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 進入TIDYING狀態
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // 子類重載:一些資源清理工做
                        terminated();
                    } finally {
                        // TERMINATED狀態
                        ctl.set(ctlOf(TERMINATED, 0));
                        // 繼續awaitTermination
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

shutdown這個方法會將runState置爲SHUTDOWN,會終止全部空閒的線程。shutdownNow方法將runState置爲STOP。和shutdown方法的區別,這個方法會終止全部的線程。主要區別在於shutdown調用的是interruptIdleWorkers這個方法,而shutdownNow實際調用的是Worker類的interruptIfStarted方法:

他們的實現以下:

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 線程池狀態設爲SHUTDOWN,若是已經至少是這個狀態那麼則直接返回
            advanceRunState(SHUTDOWN);
            // 注意這裏是中斷全部空閒的線程:runWorker中等待的線程被中斷 → 進入processWorkerExit →
            // tryTerminate方法中會保證隊列中剩餘的任務獲得執行。
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // STOP狀態:再也不接受新任務且再也不執行隊列中的任務。
        advanceRunState(STOP);
        // 中斷全部線程
        interruptWorkers();
        // 返回隊列中尚未被執行的任務。
        tasks = drainQueue();
    }
    finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // w.tryLock能獲取到鎖,說明該線程沒有在運行,由於runWorker中執行任務會先lock,
            // 所以保證了中斷的確定是空閒的線程。
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    }
    finally {
        mainLock.unlock();
    }
}
void interruptIfStarted() {
    Thread t;
    // 初始化時state == -1
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

線程池的使用

線程池的建立

咱們能夠經過ThreadPoolExecutor來建立一個線程池

/**
     * @param corePoolSize 線程池基本大小,核心線程池大小,活動線程小於corePoolSize則直接建立,大於等於則先加到workQueue中,
     * 隊列滿了才建立新的線程。當提交一個任務到線程池時,線程池會建立一個線程來執行任務,即便其餘空閒的基本線程可以執行新任務也會建立線程,
     * 等到須要執行的任務數大於線程池基本大小時就再也不建立。若是調用了線程池的prestartAllCoreThreads()方法,
     * 線程池會提早建立並啓動全部基本線程。
     * @param maximumPoolSize 最大線程數,超過就reject;線程池容許建立的最大線程數。若是隊列滿了,
     * 而且已建立的線程數小於最大線程數,則線程池會再建立新的線程執行任務
     * @param keepAliveTime
     * 線程池的工做線程空閒後,保持存活的時間。因此,若是任務不少,而且每一個任務執行的時間比較短,能夠調大時間,提升線程的利用率
     * @param unit  線程活動保持時間的單位):可選的單位有天(DAYS)、小時(HOURS)、分鐘(MINUTES)、
     * 毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和納秒(NANOSECONDS,千分之一微秒)
     * @param workQueue 工做隊列,線程池中的工做線程都是從這個工做隊列源源不斷的獲取任務進行執行
     */
    public ThreadPoolExecutor(int corePoolSize,
               int maximumPoolSize,
               long keepAliveTime,
               TimeUnit unit,
               BlockingQueue<Runnable> workQueue) {
        // threadFactory用於設置建立線程的工廠,能夠經過線程工廠給每一個建立出來的線程設置更有意義的名字
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                Executors.defaultThreadFactory(), defaultHandler);
    }

向線程池提交任務

可使用兩個方法向線程池提交任務,分別爲execute()和submit()方法。execute()方法用於提交不須要返回值的任務,因此沒法判斷任務是否被線程池執行成功。經過如下代碼可知execute()方法輸入的任務是一個Runnable類的實例。

threadsPool.execute(new Runnable() {
        @Override
        public void run() {
        }
    });

submit()方法用於提交須要返回值的任務。線程池會返回一個future類型的對象,經過這個future對象能夠判斷任務是否執行成功,而且能夠經過future的get()方法來獲取返回值,get()方法會阻塞當前線程直到任務完成,而使用get(long timeout,TimeUnit unit)方法則會阻塞當前線程一段時間後當即返回,這時候有可能任務沒有執行完。

Future<Object> future = executor.submit(harReturnValuetask);
  try
    {
        Object s = future.get();
    }catch(
    InterruptedException e)
    {
        // 處理中斷異常
    }catch(
    ExecutionException e)
    {
        // 處理沒法執行任務異常
    }finally
    {
        // 關閉線程池
        executor.shutdown();
    }

關閉線程池

能夠經過調用線程池的shutdown或shutdownNow方法來關閉線程池。它們的原理是遍歷線程池中的工做線程,而後逐個調用線程的interrupt方法來中斷線程,因此沒法響應中斷的任務可能永遠沒法終止。可是它們存在必定的區別,shutdownNow首先將線程池的狀態設置成STOP,而後嘗試中止全部的正在執行或暫停任務的線程,並返回等待執行任務的列表,而shutdown只是將線程池的狀態設置成SHUTDOWN狀態,而後中斷全部沒有正在執行任務的線程。

只要調用了這兩個關閉方法中的任意一個,isShutdown方法就會返回true。當全部的任務都已關閉後,才表示線程池關閉成功,這時調用isTerminaed方法會返回true。至於應該調用哪種方法來關閉線程池,應該由提交到線程池的任務特性決定,一般調用shutdown方法來關閉線程池,若是任務不必定要執行完,則能夠調用shutdownNow方法。

合理的配置線程池

要想合理地配置線程池,就必須首先分析任務特性,能夠從如下幾個角度來分析。

一、任務的性質:CPU密集型任務、IO密集型任務和混合型任務。

二、任務的優先級:高、中和低。

三、任務的執行時間:長、中和短。

四、任務的依賴性:是否依賴其餘系統資源,如數據庫鏈接。

性質不一樣的任務能夠用不一樣規模的線程池分開處理。CPU密集型任務應配置儘量小的線程,如配置Ncpu+1個線程的線程池。因爲IO密集型任務線程並非一直在執行任務,則應配置儘量多的線程,如2*Ncpu。混合型的任務,若是能夠拆分,將其拆分紅一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐量將高於串行執行的吞吐量。若是這兩個任務執行時間相差太大,則不必進行分解。能夠經過Runtime.getRuntime().availableProcessors()方法得到當前設備的CPU個數。優先級不一樣的任務可使用優先級隊列PriorityBlockingQueue來處理。它可讓優先級高的任務先執行

若是一直有優先級高的任務提交到隊列裏,那麼優先級低的任務可能永遠不能執行。執行時間不一樣的任務能夠交給不一樣規模的線程池來處理,或者可使用優先級隊列,讓執行時間短的任務先執行。依賴數據庫鏈接池的任務,由於線程提交SQL後須要等待數據庫返回結果,等待的時間越長,則CPU空閒時間就越長,那麼線程數應該設置得越大,這樣才能更好地利用CPU。

建議使用有界隊列。有界隊列能增長系統的穩定性和預警能力,能夠根據須要設大一點兒,好比幾千。有時候咱們系統裏後臺任務線程池的隊列和線程池全滿了,不斷拋出拋棄任務的異常,經過排查發現是數據庫出現了問題,致使執行SQL變得很是緩慢,由於後臺任務線程池裏的任務全是須要向數據庫查詢和插入數據的,因此致使線程池裏的工做線程所有阻塞,任務積壓在線程池裏。若是當時咱們設置成無界隊列,那麼線程池的隊列就會愈來愈多,有可能會撐滿內存,致使整個系統不可用,而不僅是後臺任務出現問題。固然,咱們的系統全部的任務是用單獨的服務器部署的,咱們使用不一樣規模的線程池完成不一樣類型的任務,可是出現這樣問題時也會影響到其餘任務。

線程池的監控

若是在系統中大量使用線程池,則有必要對線程池進行監控,方便在出現問題時,能夠根據線程池的使用情況快速定位問題。能夠經過線程池提供的參數進行監控,在監控線程池的時候可使用如下屬性

  • taskCount:線程池須要執行的任務數量。
  • completedTaskCount:線程池在運行過程當中已完成的任務數量,小於或等於taskCount。
  • largestPoolSize:線程池裏曾經建立過的最大線程數量。經過這個數據能夠知道線程池是否曾經滿過。如該數值等於線程池的最大大小,則表示線程池曾經滿過。
  • getPoolSize:線程池的線程數量。若是線程池不銷燬的話,線程池裏的線程不會自動銷燬,因此這個大小隻增不減。
  • getActiveCount:獲取活動的線程數。

經過擴展線程池進行監控。能夠經過繼承線程池來自定義線程池,重寫線程池的beforeExecute、afterExecute和terminated方法,也能夠在任務執行前、執行後和線程池關閉前執行一些代碼來進行監控。例如,監控任務的平均執行時間、最大執行時間和最小執行時間等。

相關文章
相關標籤/搜索