【Java多線程】Executor框架的詳解

在Java中,使用線程來異步執行任務。Java線程的建立與銷燬須要必定的開銷,若是咱們爲每個任務建立一個新線程來執行,這些線程的建立與銷燬將消耗大量的計算資源。同時,爲每個任務建立一個新線程來執行,這種策略可能會使處於高負荷狀態的應用最終崩潰。html

Java線程既是工做單元,也是執行單元。從JDK1.5開始,把工做單元與執行機制分離開來。工做單元包括Runnable 和 Callable,而執行機制由Executor框架提供。java

Executor框架簡介

Executor框架的兩級調度模型

在HotSpot VM的線程模型中,Java線程被一對一映射爲本地操做系統線程。Java線程啓動時會建立一個本地操做系統線程;當Java線程終止時,這個操做系統線程也會被回收。操做系統會調用全部線程並將他們分配給可用的CPU。編程

能夠將此種模式分爲兩層,在上層,Java多線程程序一般把應用程序分解爲若干任務,而後使用用戶級的調度器(Executor框架)講這些任務映射爲固定數量的線程;在底層,操做系統內核將這些線程映射到硬件處理器上。多線程

兩級調度模型的示意圖:併發

這裏寫圖片描述

從圖中能夠看出,該框架用來控制應用程序的上層調度(下層調度由操做系統內核控制,不受應用程序的控制)。框架

 

Executor框架的結構和成員

Executor框架的結構

1. 任務異步

包括被執行任務須要實現的接口:Runnable接口和Callable接口高併發

2. 任務的執行工具

包括任務執行機制的核心接口Executor,以及繼承自Executor的ExecutorService接口。spa

Executor框架有兩個關鍵類實現了ExecutorService接口:ThreadPoolExecutor 和 ScheduledThreadPoolExecutor

3. 異步計算的結果

包括Future和實現Future接口的FutureTask類。

Executor框架的類與接口

示意圖

  • Executor是一個接口,他是Executor框架的基礎,它將任務的提交與任務的執行分離。
  • ThreadPoolExecutor是線程池的核心實現類,用來執行被提交的任務。
  • ScheduledThreadPoolExecutor是一個實現類,能夠在給定的延遲後運行命令,或者按期執行命令。ScheduledThreadPoolExecutor 比 Timer 更靈活,功能更強大。
  • Future接口和它的實現FutureTask類,表明異步計算的結果。
  • Runnable和Callable接口的實現類,均可以被ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 執行。

 

Executor框架的使用

 先來看個圖:

  1. 主線程首先要建立實現 Runnable接口或者Callable接口的任務對象。工具類Executors能夠把一個Runnable對象封裝爲一個Callable對象
    Executors.callable(Runnale task); 
    或
    Executors.callable(Runnable task, Object resule); 
  2. 而後能夠把Runnable對象直接交給ExecutorService執行
    ExecutorServicel.execute(Runnable command);
    或者也能夠把Runnable對象或Callable對象提交給ExecutorService執行
    ExecutorService.submit(Runnable task);

    若是執行ExecutorService.submit(...),ExecutorService將返回一個實現Future接口的對象(到目前爲止的JDK中,返回的是FutureTask對象)。因爲FutureTask實現了Runnable接口,咱們也能夠建立FutureTask類,而後直接交給ExecutorService執行。  

  3. 最後,主線程能夠執行FutureTask.get()方法來等待任務執行完成。主線程也能夠執行FutureTask.cancel(boolean mayInterruptIfRunning)來取消此任務的執行。

 

 ThreadPoolExecutor詳解

 Executor框架最核心的類是ThreadPoolExecutor

 ThreadPoolExecutor的組件構成

  • corePool:核心線程池的大小
  • maximumPool:最大線程池的大小
  • BlockingQueue:用來暫時保存任務的工做隊列
  • RejectedExecutionHandler:當ThreadPoolExecutor已經關閉或ThreadPoolExecutor已經飽和時(達到了最大線程池的大小且工做隊列已滿),execute()方法將要調用的Handler。

 

Executor 可 以 創 建 3 種 類 型 的 ThreadPoolExecutor 線 程 池:

 1. FixedThreadPool

建立固定長度的線程池,每次提交任務建立一個線程,直到達到線程池的最大數量,線程池的大小再也不變化。

這個線程池能夠建立固定線程數的線程池。特色就是能夠重用固定數量線程的線程池。它的構造源碼以下:

public static ExecutorService newFixedThreadPool(int nThreads) {  
        return new ThreadPoolExecutor(nThreads, nThreads, 0L, 
                                      TimeUnit.MILLISECONDS,  
                                      new LinkedBlockingQueue<Runnable>());  
}  
  • FixedThreadPool的corePoolSize和maxiumPoolSize都被設置爲建立FixedThreadPool時指定的參數nThreads。
  • 0L則表示當線程池中的線程數量操做核心線程的數量時,多餘的線程將被當即中止
  • 最後一個參數表示FixedThreadPool使用了無界隊列LinkedBlockingQueue做爲線程池的作工隊列,因爲是無界的,當線程池的線程數達到corePoolSize後,新任務將在無界隊列中等待,所以線程池的線程數量不會超過corePoolSize,同時maxiumPoolSize也就變成了一個無效的參數,而且運行中的線程池並不會拒絕任務。

FixedThreadPool運行圖以下

執行過程以下:

1.若是當前工做中的線程數量少於corePool的數量,就建立新的線程來執行任務。

2.當線程池的工做中的線程數量達到了corePool,則將任務加入LinkedBlockingQueue。

3.線程執行完1中的任務後會從隊列中去任務。

注意LinkedBlockingQueue是無界隊列,因此能夠一直添加新任務到線程池。

 

2. SingleThreadExecutor  

SingleThreadExecutor是使用單個worker線程的Executor。特色是使用單個工做線程執行任務。它的構造源碼以下:

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
}

  

SingleThreadExecutor的corePoolSize和maxiumPoolSize都被設置1。
其餘參數均與FixedThreadPool相同,其運行圖以下:

 

執行過程以下:

1.若是當前工做中的線程數量少於corePool的數量,就建立一個新的線程來執行任務。

2.當線程池的工做中的線程數量達到了corePool,則將任務加入LinkedBlockingQueue。

3.線程執行完1中的任務後會從隊列中去任務。

注意:因爲在線程池中只有一個工做線程,因此任務能夠按照添加順序執行。

 

 3. CachedThreadPool

 CachedThreadPool是一個」無限「容量的線程池,它會根據須要建立新線程。特色是能夠根據須要來建立新的線程執行任務,沒有特定的corePool。下面是它的構造方法:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

  

CachedThreadPool的corePoolSize被設置爲0,即corePool爲空;maximumPoolSize被設置爲Integer.MAX_VALUE,即maximum是無界的。這裏keepAliveTime設置爲60秒,意味着空閒的線程最多能夠等待任務60秒,不然將被回收。
 
CachedThreadPool使用沒有容量的SynchronousQueue做爲主線程池的工做隊列,它是一個沒有容量的阻塞隊列。每一個插入操做必須等待另外一個線程的對應移除操做。這意味着,若是主線程提交任務的速度高於線程池中處理任務的速度時,CachedThreadPool會不斷建立新線程。極端狀況下,CachedThreadPool會由於建立過多線程而耗盡CPU資源。其運行圖以下:

 

執行過程以下:

1.首先執行SynchronousQueue.offer(Runnable task)。若是在當前的線程池中有空閒的線程正在執行SynchronousQueue.poll(),那麼主線程執行的offer操做與空閒線程執行的poll操做配對成功,主線程把任務交給空閒線程執行。,execute()方法執行成功,不然執行步驟2

2.當線程池爲空(初始maximumPool爲空)或沒有空閒線程時,配對失敗,將沒有線程執行SynchronousQueue.poll操做。這種狀況下,線程池會建立一個新的線程執行任務。

3.在建立完新的線程之後,將會執行poll操做。當步驟2的線程執行完成後,將等待60秒,若是此時主線程提交了一個新任務,那麼這個空閒線程將執行新任務,不然被回收。所以長時間不提交任務的CachedThreadPool不會佔用系統資源。

SynchronousQueue是一個不存儲元素阻塞隊列,每次要進行offer操做時必須等待poll操做,不然不能繼續添加元素。

 

參考書籍:《Java併發編程的藝術》,《Java併發編程實戰》,《Java高併發程序設計》

更多內容:http://www.cnblogs.com/study-everyday/

相關文章
相關標籤/搜索