線程池ThreadPoolExecutor實現原理

1. 爲何要使用線程池

在實際使用中,線程是很佔用系統資源的,若是對線程管理不善很容易致使系統問題。所以,在大多數併發框架中都會使用線程池來管理線程,使用線程池管理線程主要有以下好處:數據庫

  1. 下降資源消耗。經過複用已存在的線程和下降線程關閉的次數來儘量下降系統性能損耗;
  2. 提高系統響應速度。經過複用線程,省去建立線程的過程,所以總體上提高了系統的響應速度;
  3. 提升線程的可管理性。線程是稀缺資源,若是無限制的建立,不只會消耗系統資源,還會下降系統的穩定性,所以,須要使用線程池來管理線程。

2. 線程池的工做原理

當一個併發任務提交給線程池,線程池分配線程去執行任務的過程以下圖所示:編程

線程池執行流程圖.jpg

從圖能夠看出,線程池執行所提交的任務過程主要有這樣幾個階段:緩存

  1. 先判斷線程池中核心線程池全部的線程是否都在執行任務。若是不是,則新建立一個線程執行剛提交的任務,不然,核心線程池中全部的線程都在執行任務,則進入第2步;
  2. 判斷當前阻塞隊列是否已滿,若是未滿,則將提交的任務放置在阻塞隊列中;不然,則進入第3步;
  3. 判斷線程池中全部的線程是否都在執行任務,若是沒有,則建立一個新的線程來執行任務,不然,則交給飽和策略進行處理

3. 線程池的建立

建立線程池主要是ThreadPoolExecutor類來完成,ThreadPoolExecutor的有許多重載的構造方法,經過參數最多的構造方法來理解建立線程池有哪些須要配置的參數。ThreadPoolExecutor的構造方法爲:併發

ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
複製代碼

下面對參數進行說明:框架

  1. corePoolSize:表示核心線程池的大小。當提交一個任務時,若是當前核心線程池的線程個數沒有達到corePoolSize,則會建立新的線程來執行所提交的任務,即便當前核心線程池有空閒的線程。若是當前核心線程池的線程個數已經達到了corePoolSize,則再也不從新建立線程。若是調用了prestartCoreThread()或者 prestartAllCoreThreads(),線程池建立的時候全部的核心線程都會被建立而且啓動。
  2. maximumPoolSize:表示線程池能建立線程的最大個數。若是當阻塞隊列已滿時,而且當前線程池線程個數沒有超過maximumPoolSize的話,就會建立新的線程來執行任務。
  3. keepAliveTime:空閒線程存活時間。若是當前線程池的線程個數已經超過了corePoolSize,而且線程空閒時間超過了keepAliveTime的話,就會將這些空閒線程銷燬,這樣能夠儘量下降系統資源消耗。
  4. unit:時間單位。爲keepAliveTime指定時間單位。
  5. workQueue:阻塞隊列。用於保存任務的阻塞隊列,關於阻塞隊列能夠看這篇文章。可使用ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue, PriorityBlockingQueue
  6. threadFactory:建立線程的工程類。能夠經過指定線程工廠爲每一個建立出來的線程設置更有意義的名字,若是出現併發問題,也方便查找問題緣由。
  7. handler:飽和策略。當線程池的阻塞隊列已滿和指定的線程都已經開啓,說明當前線程池已經處於飽和狀態了,那麼就須要採用一種策略來處理這種狀況。採用的策略有這幾種:
    1. AbortPolicy: 直接拒絕所提交的任務,並拋出RejectedExecutionException異常;
    2. CallerRunsPolicy:只用調用者所在的線程來執行任務;
    3. DiscardPolicy:不處理直接丟棄掉任務;
    4. DiscardOldestPolicy:丟棄掉阻塞隊列中存放時間最久的任務,執行當前任務

線程池執行邏輯ide

經過ThreadPoolExecutor建立線程池後,提交任務後執行過程是怎樣的,下面來經過源碼來看一看。execute方法源碼以下:源碼分析

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
	//若是線程池的線程個數少於corePoolSize則建立新線程執行當前任務
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
	//若是線程個數大於corePoolSize或者建立線程失敗,則將任務存放在阻塞隊列workQueue中
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
	//若是當前任務沒法放進阻塞隊列中,則建立新的線程來執行任務
    else if (!addWorker(command, false))
        reject(command);
}
複製代碼

ThreadPoolExecutor的execute方法執行邏輯請見註釋。下圖爲ThreadPoolExecutor的execute方法的執行示意圖:post

execute執行過程示意圖.jpg

execute方法執行邏輯有這樣幾種狀況:性能

  1. 若是當前運行的線程少於corePoolSize,則會建立新的線程來執行新的任務;
  2. 若是運行的線程個數等於或者大於corePoolSize,則會將提交的任務存放到阻塞隊列workQueue中;
  3. 若是當前workQueue隊列已滿的話,則會建立新的線程來執行任務;
  4. 若是線程個數已經超過了maximumPoolSize,則會使用飽和策略RejectedExecutionHandler來進行處理。

須要注意的是,線程池的設計思想就是使用了核心線程池corePoolSize,阻塞隊列workQueue和線程池maximumPoolSize,這樣的緩存策略來處理任務,實際上這樣的設計思想在須要框架中都會使用。ui

4. 線程池的關閉

關閉線程池,能夠經過shutdownshutdownNow這兩個方法。它們的原理都是遍歷線程池中全部的線程,而後依次中斷線程。shutdownshutdownNow仍是有不同的地方:

  1. shutdownNow首先將線程池的狀態設置爲STOP,而後嘗試中止全部的正在執行和未執行任務的線程,並返回等待執行任務的列表;
  2. shutdown只是將線程池的狀態設置爲SHUTDOWN狀態,而後中斷全部沒有正在執行任務的線程

能夠看出shutdown方法會將正在執行的任務繼續執行完,而shutdownNow會直接中斷正在執行的任務。調用了這兩個方法的任意一個,isShutdown方法都會返回true,當全部的線程都關閉成功,才表示線程池成功關閉,這時調用isTerminated方法纔會返回true。

5. 如何合理配置線程池參數?

要想合理的配置線程池,就必須首先分析任務特性,能夠從如下幾個角度來進行分析:

  1. 任務的性質:CPU密集型任務,IO密集型任務和混合型任務。
  2. 任務的優先級:高,中和低。
  3. 任務的執行時間:長,中和短。
  4. 任務的依賴性:是否依賴其餘系統資源,如數據庫鏈接。

任務性質不一樣的任務能夠用不一樣規模的線程池分開處理。CPU密集型任務配置儘量少的線程數量,如配置Ncpu+1個線程的線程池。IO密集型任務則因爲須要等待IO操做,線程並非一直在執行任務,則配置儘量多的線程,如2xNcpu。混合型的任務,若是能夠拆分,則將其拆分紅一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐率要高於串行執行的吞吐率,若是這兩個任務執行時間相差太大,則不必進行分解。咱們能夠經過Runtime.getRuntime().availableProcessors()方法得到當前設備的CPU個數。

優先級不一樣的任務可使用優先級隊列PriorityBlockingQueue來處理。它可讓優先級高的任務先獲得執行,須要注意的是若是一直有優先級高的任務提交到隊列裏,那麼優先級低的任務可能永遠不能執行。

執行時間不一樣的任務能夠交給不一樣規模的線程池來處理,或者也可使用優先級隊列,讓執行時間短的任務先執行。

依賴數據庫鏈接池的任務,由於線程提交SQL後須要等待數據庫返回結果,若是等待的時間越長CPU空閒時間就越長,那麼線程數應該設置越大,這樣才能更好的利用CPU。

而且,阻塞隊列最好是使用有界隊列,若是採用無界隊列的話,一旦任務積壓在阻塞隊列中的話就會佔用過多的內存資源,甚至會使得系統崩潰。

參考文獻

《Java併發編程的藝術》

ThreadPoolExecutor源碼分析,很詳細

相關文章
相關標籤/搜索