深度分析:Java併發編程之線程池技術,看完面試這個不再慌了!

線程池的好處

Java中的線程池是運用場景最多的併發框架,幾乎全部須要異步或併發執行任務的程序均可以使用線程池。在開發過程當中,合理地使用線程池,相對於單線程串行處理(Serial Processing)和爲每個任務分配一個新線程(One Task One New Thread)的作法可以帶來3個好處。數據庫

  1. 下降資源消耗。經過重複利用已建立的線程下降線程建立和銷燬形成的消耗。
  2. 提升響應速度。當任務到達時,任務能夠不須要等到線程建立就能當即執行。
  3. 提升線程的可管理性。線程是稀缺資源,若是無限制地建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行統一分配、調優和監控。可是,要作到合理利用線程池,必須對其實現原理了如指掌。

線程池的實現原理

下面全部的介紹都是基於JDK 1.8源碼。數組

架構設計

Java中的線程池核心實現類是ThreadPoolExecutor。這個類的設計是繼承了AbstractExecutorService抽象類和實現了ExecutorService,Executor兩個接口,關係大體以下圖所示:安全

下面將從頂向下逐個介紹這個4個接口與類。服務器

Executor

頂層接口Executor提供了一種將任務提交每一個任務的執行機制(包括線程使用的細節以及線程調度等)解耦分開的方法。使用Executor能夠避免顯式的建立線程。例如,對於一系列的任務,你可能會使用下列這種方式來代替new Thread(new(RunnableTask())).start()的方式:多線程

Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());

Executor接口提供了一個接口方法,用來在將來的某段時間執行指定的任務。指定的任務架構

  1. 可能由一個新建立的線程執行;
  2. 可能由一個線程池中空閒的線程執行;
  3. 也可能由方法的調用線程執行。

這些可能執行方式都取決於Executor接口實現類的設計或實現方式。併發

public interface Executor {
    void execute(Runnable command);
}

Serial Processing

事實上,Executor接口並無嚴格的要求線程的執行須要異步進行。最簡單的接口實現方法是,將全部的任務以調用方法的線程執行。框架

class DirectExecutor implements Executor {
   public void execute(Runnable r) {
     r.run();
   }
}

這種實際上就是上面提到的Serial Processing的方式。假設,咱們如今以這種方式去實現一個響應請求的服務器應用。那麼,這種實現方式雖然在理論上是正確的。異步

  1. 可是其性能卻很是差,由於它每次只能響應處理一個請求。若是有大量請求則只能串行響應。
  2. 同時,若是服務器響應邏輯裏面有文件I/O或者數據庫操做,服務器須要等待這些操做完成才能繼續執行。這個時候若是阻塞的時間過長,服務器資源利用率就很低。這樣,在等待過程當中,服務器CPU將處於空閒狀態。

綜上,這種Serial Processing的方式方式就會有沒法快速響應問題低吞吐率問題。函數

One Task One New Thread

不過,更典型的實現方式是,任務由一些其餘的線程執行而不是方法調用的線程執行。例如,下面的Executor的實現方法是對於每個任務都新建一個線程去執行。

class ThreadPerTaskExecutor implements Executor {
   public void execute(Runnable r) {
     new Thread(r).start();
   }
}

這種方式實際上就是上面提到的One Task One New Thread的方式,這種無限建立線程的方法也有不少問題。

  1. 線程生命週期的開銷很是高。若是有大量任務須要執行,那麼就須要建立大量線程。這樣就會形成線程生命週期的建立和銷燬的開銷很是大。
  2. 資源消耗。活躍的線程會消耗系統資源,尤爲是內存。若是,已經有足夠多的線程使全部的CPU保持忙碌狀態,那麼在建立更多的線程反而會下降性能。最簡單的例子是,一個4核的CPU機器,對於100個任務建立100個線程去執行。
  3. 穩定性。可建立線程的數量上存在一個限制。這個限制受JVM啓動參數,棧大小以及底層操做系統對線程的限制等因素。超過了這個限制,就可能拋出OutOfMemoryError異常。

ExecutorService

ExecutorService接口是繼承自Executor接口,並增長了一些接口方法。接口也能夠繼承?之前沒注意,如今學習到了。這裏介紹下接口繼承的語義

  1. 接口Executor有execute(Runnable)方法,接口ExecutorService繼承Executor,不用複寫Executor的方法。只須要,寫本身的方法(業務)便可。
  2. 當一個類ThreadPoolExecutor要實現ExecutorService接口的時候,須要實現ExecutorService和Executor兩個接口的方法。

ExecutorService大體新增了2類接口方法:

  1. ExecutorService的關閉方法。對於線程池實現,這些方法的具體實如今ThreadPoolExecutor裏面。
  2. 擴充異步執行任務的方法。對於線程池實現,用的這類方法都是AbstractExecutorService抽象類裏面實現的模板方法。

AbstractExecutorService

抽象類AbstractExecutorService提供了ExecutorService接口類中各類submit異步執行方法的實現,這些方法與Executor.execute(Runnable)相比,它們都是有返回值的。同時,這些方法的實現的最終都是調用ThreadPoolExecutor類中實現的execute(Runnable)方法。

儘管說submit方法能提供線程執行的返回值,但只有實現了Callable纔會有返回值,而實現Runnable的返回值是null。

public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

除此以外,這個抽象類中還有ExecutorService接口類中invokeAny和invokeAll方法的實現。這裏就只是簡單介紹下這2個種方法的語義。

invokeAny

  1. invokeAny() 接收一個包含 Callable 對象的集合做爲參數。調用該方法不會返回 Future 對象,而是返回集合中某一個Callable對象的運行結果
  2. 這個方法無法保證調用以後返回的結果是哪個Callable,只知道它是這些 Callable 中一個執行結束的Callable 對象。

invokeAll

  1. invokeAll接受一個包含 Callable 對象的集合做爲參數。調用該方法會返回一個Future 對象的列表,對應輸入的Callable 對象的集合的運行結果。
  2. 這裏提交的任務容器列表和返回的Future列表存在順序對應的關係

ThreadPoolExecutor

execute(Runnable)方法

線程池是如何執行輸入的任務,這個整個線程池實現的核心邏輯,咱們從這個方法開始學習。其代碼以下所示:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

能夠發現,當提交一個新任務到線程池時,線程池的處理流程以下:

  1. 判斷線程池中工做的線程數是否小於核心線程數(corePoolSize)。若是是,則新建一個新的工做線程來執行任務(須要獲取全局鎖)。不然,進入下個流程。
  2. 判斷線程池的工做隊列(BlockingQeue)是否已滿。若是未滿,將新加的任務存儲在工做隊列中。不然,進入下個流程。
  3. 判斷線程池中工做的線程數是否小於最大線程數(maximumPoolSize)。若是小於,則新建一個工做線程來執行任務(須要獲取全局鎖)。
  4. 若是大於或者等於,則交給飽和策略處理這個任務。

新提交任務處理流程圖

以流程圖來講明的話,線程池處理一個新提交的任務的流程以下圖所示:

ThreadPoolExecutor執行示意圖

從上面的內容,咱們能夠發現線程池對於一個新任務有4種處理的可能,分別對應於上面處理流程的4個步驟。

ThreadPoolExecutor採起上述步驟的整體設計思路,是爲了在執行execute()方法時,儘量地避免獲取全局鎖(那將會是一個嚴重的可伸縮瓶頸)。在ThreadPoolExecutor完成預熱以後(當前運行的線程數大於等於corePoolSize),幾乎全部的execute()方法調用都是執行步驟2,而步驟2不須要獲取全局鎖。

工做線程

從上面execute(Runnable)的代碼咱們能夠發現,線程池建立線程時,會將線程封裝成工做線程Worker,Worker在執行完任務後,還會循環獲取工做隊列裏的任務來執行。

ThreadPoolExecutor中線程執行任務的示意圖以下所示:

線程池中的線程執行任務分兩種狀況:

  1. 在execute()方法中建立一個線程時,會讓這個線程執行當前任務。
  2. 這個線程執行完上圖中1的任務後,會反覆從BlockingQueue獲取任務來執行。

ThreadPoolExecutor的ctl變量

ctl 是一個 AtomicInteger 的類,保存的 int 變量的更新都是原子操做,保證線程安全。它的前面3位用來表示線程池狀態,後面29位用來表示工程線程數量

ThreadPoolExecutor的狀態

線程池的狀態有5種:

  1. Running:線程池處在Running的狀態時,可以接收新任務,以及對已添加的任務進行處理。線程池的初始化狀態是RUNNING。換句話說,線程池被一旦被建立,就處於Running狀態,而且線程池中的任務數爲0。

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

  2. Shutdown: 線程池處在SHUTDOWN狀態時,不接收新任務,但能處理已添加(正在運行的以及在BlockingQueue)的任務。調用線程池的shutdown()接口時,線程池由RUNNING -> SHUTDOWN。
  3. Stop: 線程池處在STOP狀態時,不接收新任務,不處理已添加的任務,而且會中斷正在運行的任務。 調用線程池的shutdownNow()接口時,線程池由(RUNNING or SHUTDOWN ) -> STOP。
  4. Tidying: 當全部的任務已終止,ctl記錄的」任務數量」爲0,線程池會變爲Tidying狀態。當線程池變爲Tidying狀態時,會執行鉤子函數terminated()。terminated()在ThreadPoolExecutor類中是空的,若用戶想在線程池變爲Tidying時,進行相應的處理;能夠經過重載terminated()函數來實現。
  5. Terminated: 線程池完全終止,就變成Terminated狀態。 線程池處在Tidying狀態時,執行完terminated()以後,就會由 Tidying -> Terminated。

線程池的使用

線程池的建立

咱們能夠經過ThreadPoolExecutor的構造函數來建立一個線程池。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  1. corePoolSize(線程池的核心線程數):線程池要保持的線程數目,即便是他們是空閒也不會中止。 當提交一個任務到線程池時,線程池會建立一個線程來執行任務,即便其餘空閒的基本線程可以執行新任務也會建立線程,等到須要執行的任務數大於線程池基本大小時就再也不建立。若是調用了線程池的prestartAllCoreThreads()方法,線程池會提早建立並啓動全部基本線程。
  2. maximumPoolSize(線程池的最大線程數): 線程池容許建立的最大線程數。若是隊列滿了,而且已建立的線程數小於最大線程數,則線程池會再建立新的線程執行任務。值得注意的是,若是使用了無界的任務隊列這個參數就沒什麼效果
  3. keepAliveTime(線程活動保持時間): 當線程池中的線程數大於corePoolSize時,keepAliveTime爲多餘的空閒線程等待新任務的最長保持存活的時間。因此,若是任務不少,而且每一個任務執行的時間比較短,能夠調大時間,提升線程的利用率。
  4. unit(線程活動保持時間的單位) : 可選的單位有天(DAYS)、小時(HOURS)、分鐘(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和納秒(NANOSECONDS,千分之一微秒)。
  5. runnableTaskQueue(任務隊列):用於保存等待執行的任務的阻塞隊列。能夠選擇如下幾個阻塞隊列。
  • ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按FIFO(先進先出)原則對元素進行排序。
  • LinkedBlockingQueue:一個基於鏈表結構的無界阻塞隊列,此隊列按FIFO排序元素,吞吐量一般要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列。
  • SynchronousQueue:一個不存儲元素的阻塞隊列。每一個插入操做必須等到另外一個線程調用移除操做,不然插入操做一直處於阻塞狀態,吞吐量一般要高於Linked-BlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個隊列。
  • PriorityBlockingQueue:一個具備優先級的無限阻塞隊列。
  1. ThreadFactory:用於設置建立線程的工廠,能夠經過線程工廠給每一個建立出來的線程設置更有意義的名字。
  2. RejectedExecutionHandler(飽和策略):當ThreadPoolExecutor已經關閉或ThreadPoolExecutor已經飽和 時(達到了最大線程池大小且工做隊列已滿),execute()方法將要調用的Handler,那麼必須採起一種策略處理提交的新任務。這個策略默認狀況下是AbortPolicy。Java線程池框架提供瞭如下4種策略:

    • AbortPolicy:直接拋出異常
    • CallerRunsPolicy:只用調用者所在線程來運行任務
    • DiscardOldestPolicy:丟棄隊列裏最老的一個任務,並執行當前任務
    • DiscardPolicy:不處理,丟棄掉

經常使用ThreadPoolExecutor

經過Executor框架的工具類Executors,能夠建立如下3種類型的ThreadPoolExecutor。經過源碼能夠發現這3種線程池的本質都是不一樣輸入參數配置的ThreadPoolExecutor。

FixedThreadPool

FixedThreadPool被稱爲可重用固定線程數的線程池。下面是FixedThreadPool的源代碼實現。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

注意到,

  1. FixedThreadPool的corePoolSize和maximumPoolSize都被設置爲建立時的同一個指定的參數nThreads。
  2. 任務阻塞隊列使用的是無界隊列new LinkedBlockingQueue()。
  3. keepAliveTime設置爲0。
  4. ThreadFactory和RejectedExecutionHandler皆使用的默認值。

FixedThreadPool的execute()方法的運行示意圖以下所示:

其運行說明:

  1. 若是當前運行的線程數少於corePoolSize,則建立新線程來執行任務。
  2. 在線程池完成預熱以後(當前運行的線程數等於corePoolSize),將任務加入LinkedBlockingQueue。
  3. 線程執行完1中的任務後,會在循環中反覆從LinkedBlockingQueue獲取任務來執行。

FixedThreadPool使用無界隊列LinkedBlockingQueue做爲線程池的工做隊列(隊列的容量爲Integer.MAX_VALUE)對線程池會帶來以下影響:

  1. 當線程池中的線程數達到corePoolSize後,新任務將在無界隊列中等待。因爲無界隊列永遠不會滿,所以線程池中的線程數不會超過corePoolSize。
  2. 因爲1,使用無界隊列時maximumPoolSize將是一個無效參數。
  3. 因爲1和2,使用無界隊列時keepAliveTime將是一個無效參數。不會有超過corePoolSize的線程數目。
  4. 因爲使用無界隊列。運行中的FixedThreadPool(未執行方法shutdown()或shutdownNow())不會拒絕任務(不會調用RejectedExecutionHandler.rejectedExecution方法)。
SingleThreadExecutor

SingleThreadExecutor是使用單個worker線程的Executor。SingleThreadExecutor與FixedThreadPool相似,只是它的corePoolSize和maximumPoolSize被設置爲1。下面是SingleThreadExecutor的源代碼實現。

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
CachedThreadPool

CachedThreadPool是一個會根據須要建立新線程的線程池。下面是建立CachedThread-Pool的源代碼。

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

注意到:

  1. CachedThreadPool的corePoolSize被設置爲0,即corePool爲空;maximumPoolSize被設置爲 Integer.MAX_VALUE,即maximumPool是無界的。
  2. keepAliveTime設置爲60L,意味着CachedThreadPool中的空閒線程等待新任務的最長時間爲60秒,空閒線程超過60秒後將會被終止。
  3. CachedThreadPool使用沒有容量的SynchronousQueue做爲線程池的工做隊列,但CachedThreadPool的maximumPool是無界的。這意味着,若是主線程提交任務的速度高於maximumPool中線程處理任務的速度時,CachedThreadPool會不斷建立新線程。極端狀況下,CachedThreadPool會由於建立過多線程而耗盡CPU和內存資源。

CacheThreadPool的execute()方法的執行過程以下圖所示:

其執行過程的說明以下:

  1. 首先執行SynchronousQueue.offer(Runnable task)。若是當前maximumPool中有空閒線程正在執行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那麼主線程執行offer操做與空閒線程執行的poll操做配對成功,主線程把任務交給空閒線程執行;不然執行下面的步驟2。
  2. 當初始maximumPool爲空,或者maximumPool中當前沒有空閒線程時,將沒有線程執行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。這種狀況下,CachedThreadPool將會建立一個新線程執行任務。
  3. 步驟2中新建立的線程將任務執行完後,會執行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。這個poll操做會讓空閒線程最多在SynchronousQueue中等待60秒鐘。若是60秒鐘內主線程提交了一個新任務(主線程執行步驟1),那麼這個空閒線程將執行主線程提交的新任務;不然,這個空閒線程將終止。因爲空閒60秒的空閒線程會被終止,所以長時間保持空閒的CachedThreadPool不會使用任何資源。

向線程池提交任務

可使用兩個方法向線程池提交任務,分別爲execute()和submit()方法。

  1. execute()方法用於提交不須要返回值的任務,因此沒法判斷任務是否被線程池執行成功。通常execute()方法輸入的任務是一個Runnable類的實例。
  2. submit()方法用於提交須要返回值的任務。線程池會返回一個future類型的對象,經過這個future對象能夠判斷任務是否執行成功,而且能夠經過future的get()方法來獲取返回值get()方法會阻塞當前線程直到任務完成,而使用get(long timeout,TimeUnit unit)方法則會阻塞當前線程一段時間後當即返回,這時候有可能任務沒有執行完。

關閉線程池

能夠經過調用線程池的shutdown或者shutdownNow方法來關閉線程池。它們的原理是遍歷線程池中的工做線程,而後逐個調用線程的interrupt方法來中斷線程,因此沒法響應中斷的任務可能永遠沒法終止。可是它們存在必定的區別。

  1. shutdown首先將線程池的狀態設置成SHUTDOWN。而後阻止新提交的任務,對於新提交的任務,若是測試到狀態不爲RUNNING,則拋出rejectedExecution 。對於已經提交(正在運行的以及在任務隊列中的)任務不會產生任何影響。同時會將那些閒置的線程(idleWorkers)進行中斷
  2. shutdownNow首先將線程池的狀態設置成STOP。而後阻止新提交的任務,對於新提交的任務,若是測試到狀態不爲RUNNING,則拋出rejectedExecution 同時會中斷當前正在運行的線程。另外它還將BolckingQueue中的任務給移除,並將這些任務添加到列表中進行返回

線程池的監控

能夠經過線程池提供的參數進行監控,在監控線程池的時候可使用如下屬性:

  1. taskCount:線程池須要執行的任務數量。
  2. completedTaskCount:線程池在運行過程當中已完成的任務數量,小於或等於taskCount。
  3. largestPoolSize:線程池裏曾經建立過的最大線程數量。經過這個數據能夠知道線程池是 否曾經滿過。如該數值等於線程池的最大大小,則表示線程池曾經滿過。
  4. getPoolSize:線程池的線程數量。若是線程池不銷燬的話,線程池裏的線程不會自動銷 毀,因此這個大小隻增不減。
  5. getActiveCount:獲取活動的線程數。

另外,經過擴展線程池進行監控。能夠經過繼承線程池來自定義線程池,重寫線程池的beforeExecute、afterExecute和terminated方法,也能夠在任務執行前、執行後和線程池關閉前執行一些代碼來進行監控。例如,監控任務的平均執行時間、最大執行時間和最小執行時間等。這幾個方法在線程池裏是空方法。

相關文章
相關標籤/搜索