10問10答:你真的瞭解線程池嗎?

簡介: 《Java開發手冊》中強調,線程資源必須經過線程池提供,而建立線程池必須使用ThreadPoolExecutor。手冊主要強調利用線程池避免兩個問題,一是線程過渡切換,二是避免請求過多時形成OOM。可是若是參數配置錯誤,仍是會引起上面的兩個問題。因此本節咱們主要是討論ThreadPoolExecutor的一些技術細節,而且給出幾個經常使用的最佳實踐建議。緩存

image.png

做者 | 風樓
來源 | 阿里技術公衆號服務器

《Java開發手冊》中強調,線程資源必須經過線程池提供,而建立線程池必須使用ThreadPoolExecutor。手冊主要強調利用線程池避免兩個問題,一是線程過渡切換,二是避免請求過多時形成OOM。可是若是參數配置錯誤,仍是會引起上面的兩個問題。因此本節咱們主要是討論ThreadPoolExecutor的一些技術細節,而且給出幾個經常使用的最佳實踐建議。多線程

我在查找資料的過程當中,發現有些問題存在爭議。後面發現,一部分緣由是由於不一樣JDK版本的現實是有差別的。所以,下面的分析是基於當下最經常使用的版本JDK1.8,而且對於存在爭議的問題,咱們分析源碼,源碼纔是最準確的。併發

1 corePoolSize=0會怎麼樣

這是一個爭議點。我發現大部分博文,不管是國內的仍是國外的,都是這樣回答這個問題的:框架

  • 提交任務後,先判斷當前池中線程數是否小於corePoolSize,若是小於,則建立新線程執行這個任務。
  • 否者,判斷等待隊列是否已滿,若是沒有滿,則添加到等待隊列。
  • 否者,判斷當前池中線程數是否大於maximumPoolSize,若是大於則拒絕。
  • 否者,建立一個新的線程執行這個任務。

按照上面的描述,若是corePoolSize=0,則會判斷等待隊列的容量,若是還有容量,則排隊,而且不會建立新的線程。異步

—— 但其實,這是老版本的實現方式,從1.6以後,實現方式就變了。咱們直接看execute的源碼(submit也依賴它),我備註出了關鍵一行:ide

int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 注意這一行代碼,添加到等待隊列成功後,判斷當前池內線程數是否爲0,若是是則建立一個firstTask爲null的worker,這個worker會從等待隊列中獲取任務並執行。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
  • 線程池提交任務後,首先判斷當前池中線程數是否小於corePoolSize。
  • 若是小於則嘗試建立新的線程執行該任務;不然嘗試添加到等待隊列。
  • 若是添加隊列成功,判斷當前池內線程數是否爲0,若是是則建立一個firstTask爲null的worker,這個worker會從等待隊列中獲取任務並執行。
  • 若是添加到等待隊列失敗,通常是隊列已滿,纔會再嘗試建立新的線程。
  • 但在建立以前須要與maximumPoolSize比較,若是小於則建立成功。
  • 不然執行拒絕策略。

函數

上述問題需區分JDK版本。在1.6版本以後,若是corePoolSize=0,提交任務時若是線程池爲空,則會當即建立一個線程來執行任務(先排隊再獲取);若是提交任務的時候,線程池不爲空,則先在等待隊列中排隊,只有隊列滿了纔會建立新線程。工具

因此,優化在於,在隊列沒有滿的這段時間內,會有一個線程在消費提交的任務;1.6以前的實現是,必須等隊列滿了以後,纔開始消費。優化

2 線程池建立以後,會當即建立核心線程麼

以前有人問過我這個問題,由於他發現應用中有些Bean建立了線程池,可是這個Bean通常狀況下用不到,因此諮詢我是否須要把這個線程池註釋掉,以減小應用運行時的線程數(該應用運行時線程過多。)

不會。從上面的源碼能夠看出,在剛剛建立ThreadPoolExecutor的時候,線程並不會當即啓動,而是要等到有任務提交時纔會啓動,除非調用了prestartCoreThread/prestartAllCoreThreads事先啓動核心線程。

  • prestartCoreThread:Starts a core thread, causing it to idly wait for work. This overrides the default policy of starting core threads only when new tasks are executed.
  • prestartAllCoreThreads:Starts all core threads.

3 核心線程永遠不會銷燬麼

這個問題有點tricky。首先咱們要明確一下概念,雖然在JavaDoc中也使用了「core/non-core threads」這樣的描述,但其實這是一個動態的概念,JDK並無給一部分線程打上「core」的標記,作什麼特殊化的處理。這個問題我認爲想要探討的是閒置線程終結策略的問題。

在JDK1.6以前,線程池會盡可能保持corePoolSize個核心線程,即便這些線程閒置了很長時間。這一點曾被開發者詬病,因此從JDK1.6開始,提供了方法allowsCoreThreadTimeOut,若是傳參爲true,則容許閒置的核心線程被終止。

請注意這種策略和corePoolSize=0的區別。我總結的區別是:

  • corePoolSize=0:在通常狀況下只使用一個線程消費任務,只有當併發請求特別多、等待隊列都滿了以後,纔開始用多線程。
  • allowsCoreThreadTimeOut=true && corePoolSize>1:在通常狀況下就開始使用多線程(corePoolSize個),當併發請求特別多,等待隊列都滿了以後,繼續加大線程數。可是當請求沒有的時候,容許核心線程也終止。

因此corePoolSize=0的效果,基本等同於allowsCoreThreadTimeOut=true && corePoolSize=1,但實現細節其實不一樣。

在JDK1.6以後,若是allowsCoreThreadTimeOut=true,核心線程也能夠被終止。

4 如何保證線程不被銷燬

首先咱們要明確一下線程池模型。線程池有個內部類Worker,它實現了Runnable接口,首先,它本身要run起來。而後它會在合適的時候獲取咱們提交的Runnable任務,而後調用任務的run()接口。一個Worker不終止的話能夠不斷執行任務。

咱們前面說的「線程池中的線程」,其實就是Worker;等待隊列中的元素,是咱們提交的Runnable任務。

每個Worker在建立出來的時候,會調用它自己的run()方法,實現是runWorker(this),這個實現的核心是一個while循環,這個循環不結束,Worker線程就不會終止,就是這個基本邏輯。

  • 在這個while條件中,有個getTask()方法是核心中的核心,它所作的事情就是從等待隊列中取出任務來執行:
  • 若是沒有達到corePoolSize,則建立的Worker在執行完它承接的任務後,會用workQueue.take()取任務、注意,這個接口是阻塞接口,若是取不到任務,Worker線程一直阻塞。
  • 若是超過了corePoolSize,或者allowCoreThreadTimeOut,一個Worker在空閒了以後,會用workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)取任務。注意,這個接口只阻塞等待keepAliveTime時間,超過這個時間返回null,則Worker的while循環執行結束,則被終止了。
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 看這裏,核心邏輯在這裏
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // 注意,核心中的核心在這裏
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

實現方式很是巧妙,核心線程(Worker)即便一直空閒也不終止,是經過workQueue.take()實現的,它會一直阻塞到從等待隊列中取到新的任務。非核心線程空閒指定時間後終止是經過workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)實現的,一個空閒的Worker只等待keepAliveTime,若是尚未取到任務則循環終止,線程也就運行結束了。

引伸思考

Worker自己就是個線程,它再調用咱們傳入的Runnable.run(),會啓動一個子線程麼?若是你尚未答案,再回想一下Runnable和Thread的關係。

5 空閒線程過多會有什麼問題

籠統地回答是會佔用內存,咱們分析一下佔用了哪些內存。首先,比較普通的一部分,一個線程的內存模型:

  • 虛擬機棧
  • 本地方法棧
  • 程序計數器

我想額外強調是下面這幾個內存佔用,須要當心:

  • ThreadLocal:業務代碼是否使用了ThreadLocal?就算沒有,Spring框架中也大量使用了ThreadLocal,你所在公司的框架可能也是同樣。
  • 局部變量:線程處於阻塞狀態,確定還有棧幀沒有出棧,棧幀中有局部變量表,凡是被局部變量表引用的內存都不能回收。因此若是這個線程建立了比較大的局部變量,那麼這一部份內存沒法GC。
  • TLAB機制:若是你的應用線程數處於高位,那麼新的線程初始化可能由於Eden沒有足夠的空間分配TLAB而觸發YoungGC。

線程池保持空閒的核心線程是它的默認配置,通常來說是沒有問題的,由於它佔用的內存通常不大。怕的就是業務代碼中使用ThreadLocal緩存的數據過大又不清理。

若是你的應用線程數處於高位,那麼須要觀察一下YoungGC的狀況,估算一下Eden大小是否足夠。若是不夠的話,可能要謹慎地建立新線程,而且讓空閒的線程終止;必要的時候,可能須要對JVM進行調參。

6 keepAliveTime=0會怎麼樣

這也是個爭議點。有的博文說等於0表示空閒線程永遠不會終止,有的說表示執行完馬上終止。還有的說等於-1表示空閒線程永遠不會終止。其實稍微看一下源碼知道了,這裏我直接拋出答案。

在JDK1.8中,keepAliveTime=0表示非核心線程執行完馬上終止。

默認狀況下,keepAliveTime小於0,初始化的時候纔會報錯;但若是allowsCoreThreadTimeOut,keepAliveTime必須大於0,否則初始化報錯。

7 怎麼進行異常處理

不少代碼的寫法,咱們都習慣按照常見範式去編寫,而沒有去思考爲何。好比:

  • 若是咱們使用execute()提交任務,咱們通常要在Runable任務的代碼加上try-catch進行異常處理。
  • 若是咱們使用submit()提交任務,咱們通常要在主線程中,對Future.get()進行try-catch進行異常處理。

—— 可是在上面,我提到過,submit()底層實現依賴execute(),二者應該統一呀,爲何有差別呢?下面再扒一扒submit()的源碼,它的實現蠻有意思。

首先,ThreadPoolExecutor中沒有submit的代碼,而是在它的父類AbstractExecutorService中,有三個submit的重載方法,代碼很是簡單,關鍵代碼就兩行:

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

正是由於這三個重載方法,都調用了execute,因此我才說submit底層依賴execute。經過查看這裏execute的實現,咱們不難發現,它就是ThreadPoolExecutor中的實現,因此,形成submit和execute的差別化的代碼,不在這。那麼形成差別的必定在newTaskFor方法中。這個方法也就new了一個FutureTask而已,FutureTask實現RunnableFuture接口,RunnableFuture接口繼承Runnable接口和Future接口。而Callable只是FutureTask的一個成員變量。

因此講到這裏,就有另外一個Java基礎知識點:Callable和Future的關係。咱們通常用Callable編寫任務代碼,Future是異步返回對象,經過它的get方法,阻塞式地獲取結果。FutureTask的核心代碼就是實現了Future接口,也就是get方法的實現:

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        // 核心代碼
        s = awaitDone(false, 0L);
    return report(s);
}

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    // 死循環
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        // 只有任務的狀態是’已完成‘,纔會跳出死循環
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

get的核心實現是有個awaitDone方法,這是一個死循環,只有任務的狀態是「已完成」,纔會跳出死循環;不然會依賴UNSAFE包下的LockSupport.park原語進行阻塞,等待LockSupport.unpark信號量。而這個信號量只有當運行結束得到結果、或者出現異常的狀況下,纔會發出來。分別對應方法set和setException。這就是異步執行、阻塞獲取的原理,扯得有點遠了。

回到最初咱們的疑問,爲何submit以後,經過get方法能夠獲取到異常?緣由是FutureTask有一個Object類型的outcome成員變量,用來記錄執行結果。這個結果能夠是傳入的泛型,也能夠是Throwable異常:

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

// get方法中依賴的,報告執行結果

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

FutureTask的另外一個巧妙的地方就是借用RunnableAdapter內部類,將submit的Runnable封裝成Callable。因此就算你submit的是Runnable,同樣能夠用get獲取到異常。

  • 不管是用execute仍是submit,均可以本身在業務代碼上加try-catch進行異常處理。我通常喜歡使用這種方式,由於我喜歡對不一樣業務場景的異常進行差別化處理,至少打不同的日誌吧。
  • 若是是execute,還能夠自定義線程池,繼承ThreadPoolExecutor並複寫其afterExecute(Runnable r, Throwable t)方法。
  • 或者實現Thread.UncaughtExceptionHandler接口,實現void uncaughtException(Thread t, Throwable e);方法,並將該handler傳遞給線程池的ThreadFactory。
  • 可是注意,afterExecute和UncaughtExceptionHandler都不適用submit。由於經過上面的FutureTask.run()不難發現,它本身對Throwable進行了try-catch,封裝到了outcome屬性,因此底層方法execute的Worker是拿不到異常信息的。

8 線程池需不須要關閉

通常來說,線程池的生命週期跟隨服務的生命週期。若是一個服務(Service)中止服務了,那麼須要調用shutdown方法進行關閉。因此ExecutorService.shutdown在Java以及一些中間件的源碼中,是封裝在Service的shutdown方法內的。

若是是Server端不重啓就不中止提供服務,我認爲是不須要特殊處理的。

9 shutdown和shutdownNow的區別

shutdown => 平緩關閉,等待全部已添加到線程池中的任務執行完再關閉。
shutdownNow => 馬上關閉,中止正在執行的任務,並返回隊列中未執行的任務。
原本想分析一下二者的源碼的,可是發現本文的篇幅已通過長了,源碼也貼了很多。感興趣的朋友本身看一下便可。

10 Spring中有哪些和ThreadPoolExecutor相似的工具

image.png

這裏我想着重強調的就是SimpleAsyncTaskExecutor,Spring中使用的@Async註解,底層就是基於SimpleAsyncTaskExecutor去執行任務,只不過它不是線程池,而是每次都新開一個線程。

另外想要強調的是Executor接口。Java初學者容易想固然的覺得Executor結尾的類就是一個線程池,而上面的都是反例。咱們能夠在JDK的execute方法上看到這個註釋:

/**

  • Executes the given command at some time in the future. The command
  • may execute in a new thread, in a pooled thread, or in the calling
  • thread, at the discretion of the {@code Executor} implementation.
    */
    因此,它的職責並非提供一個線程池的接口,而是提供一個「未來執行命令」的接口。真正能表明線程池意義的,是ThreadPoolExecutor類,而不是Executor接口。

最佳實踐總結
【強制】使用ThreadPoolExecutor的構造函數聲明線程池,避免使用Executors類的 newFixedThreadPool和newCachedThreadPool。
【強制】 建立線程或線程池時請指定有意義的線程名稱,方便出錯時回溯。即threadFactory參數要構造好。
【建議】建議不一樣類別的業務用不一樣的線程池。
【建議】CPU密集型任務(N+1):這種任務消耗的主要是CPU資源,能夠將線程數設置爲N(CPU核心數)+1,比CPU核心數多出來的一個線程是爲了防止線程偶發的缺頁中斷,或者其它緣由致使的任務暫停而帶來的影響。一旦任務暫停,CPU就會處於空閒狀態,而在這種狀況下多出來的一個線程就能夠充分利用CPU的空閒時間。
【建議】I/O密集型任務(2N):這種任務應用起來,系統會用大部分的時間來處理I/O交互,而線程在處理I/O的時間段內不會佔用CPU來處理,這時就能夠將CPU交出給其它線程使用。所以在I/O密集型任務的應用中,咱們能夠多配置一些線程,具體的計算方法是2N。
【建議】workQueue不要使用無界隊列,儘可能使用有界隊列。避免大量任務等待,形成OOM。
【建議】若是是資源緊張的應用,使用allowsCoreThreadTimeOut能夠提升資源利用率。
【建議】雖然使用線程池有多種異常處理的方式,但在任務代碼中,使用try-catch最通用,也能給不一樣任務的異常處理作精細化。
【建議】對於資源緊張的應用,若是擔憂線程池資源使用不當,能夠利用ThreadPoolExecutor的API實現簡單的監控,而後進行分析和優化。
image.png

線程池初始化示例:

private static final ThreadPoolExecutor pool;

static {
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("po-detail-pool-%d").build();
    pool = new ThreadPoolExecutor(4, 8, 60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(512),
        threadFactory, new ThreadPoolExecutor.AbortPolicy());
    pool.allowCoreThreadTimeOut(true);
}
  • threadFactory:給出帶業務語義的線程命名。
  • corePoolSize:快速啓動4個線程處理該業務,是足夠的。
  • maximumPoolSize:IO密集型業務,個人服務器是4C8G的,因此4*2=8。
  • keepAliveTime:服務器資源緊張,讓空閒的線程快速釋放。
  • pool.allowCoreThreadTimeOut(true):也是爲了在能夠的時候,讓線程釋放,釋放資源。
  • workQueue:一個任務的執行時長在100~300ms,業務高峯期8個線程,按照10s超時(已經很高了)。10s鍾,8個線程,能夠處理10 1000ms / 200ms 8 = 400個任務左右,往上再取一點,512已經不少了。
  • handler:極端狀況下,一些任務只能丟棄,保護服務端。

原文連接

本文爲阿里雲原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索