Java併發篇-全面解析Executor框架

Executor你們族成員

成員分爲四個部分:任務、任務執行、任務執行結果以及任務執行工具類java

任務:實現Callable接口或Runnable接口面試

任務執行部分:ThreadPoolExecutor以及ScheduledThreadPoolExecutor多線程

任務執行結果:Future接口以及FutureTask實現類框架

任務執行工廠類:Executors異步

任務執行框架

ThreadPoolExecutor(核心)ide

ThreadPoolExecutor一般使用Executors進行建立,其內部含有三種線程池:函數

  1. FixedThreadPool:含有固定線程數的線程池。工具

  2. SingleThreadExecutor單線程的線程池,須要保證任務順序執行時採用。post

  3. CachedThreadPool大小無界的線程池,只要須要線程就能夠一直建立線程。this

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor一般使用Executors進行建立,其內部含有兩種線程池:

  1. ScheduledThreadPoolExecutor:含有固定線程數的定時任務線程池
  2. SingleThreadScheduledExecutor:只包含一個線程數的定時任務線程池,須要保證任務順序執行時採用。

任務執行結果

提交任務給任務執行框架執行後,任務執行框架會返回一個Future接口類型的對象,該對象內部包含了任務執行結果信息。

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<> submit(Runnable task)
複製代碼

在JDK1.8返回的是FutureTask對象,但不必定返回的就是FutureTask對象,只保證返回Future接口。

任務

實現RunnableCallable都可被任務執行框架執行,它們之間的區別是實現Runnable的任務執行完成後不會返回結果,而實現Callable接口的任務能夠返回結果。

返回結果爲null的封裝(沒有什麼意義)

public static Callable<Object> callable(Runnable task) 複製代碼

擁有返回結果的封裝

public static <T> Callable<T> callable(Runnable task, T result) 複製代碼

ThreadPoolExecutor

上一篇講過ThreadPoolExecutor構造函數中的參數含義,這裏再也不贅述,若是忘記了或者還沒閱讀過,能夠先去了解一下,兩篇文章是銜接的:juejin.im/post/5e9c45…

主要參數有4個:

  1. corePolSize:核心線程池大小
  2. maximumPoolSize:最大線程池大小
  3. BlockingQueue:存儲未執行任務的阻塞隊列
  4. RejectedExecutionHandler:任務沒法被執行時的拒絕策略

FixedThreadPool詳解

fixedThreadPool稱爲可重用固定線程數的線程池,下面是構造該線程池的方法源碼:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
複製代碼

參數解讀

  1. corePoolSizemaximumPoolSize都設置爲用戶傳入的nThreads
  2. keepAliveTime設置爲0L,表明只要空閒線程閒下來就立刻被終止,若是設置爲SECONDS > 0,則表明線程空閒下來後等待新任務的最長時間SECONDS
  3. TimeUnit表明keepAliveTime的單位
  4. workQueue設置阻塞隊列

execute()執行流程

curThread表明核心線程池的當前線程數

  1. 若是 curThread < corPoolSize,那麼建立新線程執行任務

  2. 若是 curThread = corPoolSize,那麼將任務加入阻塞隊列當中

  3. 線程池中的線程執行完任務後會空閒下來,而後會一直從阻塞隊列中獲取任務執行

使用無界阻塞隊列LinkedBlockingQueue的後果:

  1. curThread = corPoolSize後,任務會一直加入到阻塞隊列,嚴重時阻塞隊列的任務數過多會形成GC內存溢出
  2. 使用無界隊列,那麼參數maximumPoolSize是一個無效參數。只有當任務沒法加入到阻塞隊列時知足curThread < maximumPoolSiize纔會在線程池中建立新的線程執行該任務。
  3. 因爲第1點和第2點致使keepAliveTime參數無效
  4. 使用無界隊列後,任務均可以加入到阻塞隊列中,因此不會出現curThread > maximumPoolSize,也就不會出現任務拒絕執行的狀況,不會調用任務拒絕方法。

GC內存溢出示例(我的理解,若是有錯,歡迎指出!):

public class Task implements Runnable {    
    private String name;   
    public Task(String name) {
        this.name = name;
    }
    @Override
    public void run() {
        System.out.println(name);
    }
}
public class FixedThreadPoolTest {   
    private static final ExecutorService fixThreadPool = Executors.newFixedThreadPool(10);
    private static int i = 0;
    
    public static void main(String[] args) {
        // 不停地往線程池中提交任務
        for (;i < 100000;i++) {
            fixThreadPool.execute(new Task("任務" + i));
        }
        fixThreadPool.shutdown();
    }
}
複製代碼

設置堆內存參數:-Xms2m -Xmx2m

拋出OOM異常,緣由在於阻塞隊列中的任務數過多。

SingleThreadPool詳解

使用單個Worker線程的Executor。本質上是fixedThreadPoolcorePoolSizemaximumPoolSize都設置爲1,此外無其它區別。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
複製代碼

參數解釋

  1. corePoolSizeMaximumPoolSize的參數都設置爲1,其它參數含義和默認值都和fixedThreadPool相同。
  2. 無界阻塞隊列對線程池的影響和fixedThreadPool相同。

execute()執行流程

  1. 若是線程池尚未預熱,第一個任務來的時候就建立Worker線程來執行任務
  2. 沒法建立新線程時放入阻塞隊列中
  3. 惟一的工做者線程不斷地執行阻塞隊列中的任務

CachedThreadPool詳解

根據須要建立新線程的線程池

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
複製代碼

參數解釋

  1. corePoolSize設置爲0,即corePool爲空
  2. maximumPoolSize設置爲Integer.MAX_VALUE,因此maximumPool是幾乎無界的,可容納足夠多的線程
  3. keepAliveTime設置爲60L,每一個空閒線程能夠等待新任務的時間是60秒
  4. TimeUnit單位設置爲SECONDS表明以秒爲單位計算
  5. 阻塞隊列採用無界的SynchronousQueue,每一次的offer操做都要對應一個poll操做

execute()執行流程

  1. 先把任務offer進阻塞隊列中
  2. 若是線程池有空閒的線程則從阻塞隊列中poll出任務執行,不然執行第3步
  3. 建立新的線程,並從隊列中poll出任務執行。

ScheduledThreadPoolExecutor

因爲JDK1.6與JDK1.8中延時隊列的實現不一樣,因此在閱讀本小節時不要死記硬背其實現,掌握其思想。圖的組件均會用中文表示,對思想的掌握有幫助,實現的思想是一致的。

該類用於指定特定的延遲時間後運行任務,或者按期執行任務Timer是單線程的,該類是多線程的,並且該類功能更增強大。

運行機制

ScheduledThreadPoolExecutor將任務提交到延時隊列當中存儲,當任務到達執行時間時,Worker工做線程會拉取任務進行執行,空閒的線程會一直阻塞,不斷嘗試獲取延時隊列中的任務。

ScheduledThreadPoolExecutor本質上是ThreadPoolExecutor的改造:

  1. 使用延時隊列做爲任務隊列
  2. 獲取任務的方式不一樣,空閒線程將一直阻塞獲取任務
  3. 執行週期任務後,增長了額外的處理操做

實現方法

調用線程把待調度的任務(ScheduledFutureTask)放入一個延時隊列當中,ScheduledFutureTask的主要成員變量有三個:

  1. time:該任務要被執行的具體時間
  2. sequenceNumber:該任務被添加到ScheduledThreadPoolExecutor的序號,標記任務添加到隊列的時刻
  3. period:任務執行的間隔週期

延時隊列中採用優先級隊列進行排序。排序時,time越小排在越前面,time相同時,sequenceNumber越小排在越前面。

ScheduledThreadPoolExecutor執行任務的流程以下圖:

  1. 獲取已到期的任務,到期的條件是time >= 當前時間
  2. 線程1執行該任務
  3. 線程1修改該任務的time變量爲下次要被執行的時間
  4. 把該任務放回到延時隊列當中

FutureTask

表示異步計算的結果,實現了Runnable、Future接口。

FutureTask的幾種狀態

  1. 未啓動。FutureTask沒有執行run()方法以前處於未啓動狀態
  2. 已啓動。FutureTask.run()方法被執行的過程當中
  3. 已完成。FutureTask.run()方法執行完後正常結束、或被取消、或執行方法時拋出異常而異常結束

FutureTask的狀態轉換圖以下所示

FutureTask的核心方法

FutureTask.run()

當任務處於未啓動狀態時,調用FutureTask.run()會使任務執行。

FutureTask.get()

FutureTask處於未啓動或已啓動狀態時,執行FutureTask.get()會致使線程阻塞

當FutureTask處於已完成狀態時,調用FutureTask.get()會當即返回結果或者拋出異常

FutureTask.cancel()

當任務處於未啓動狀態時,調用FutureTask.cancel()會致使此任務永遠不會被執行

當任務處於已啓動狀態時,執行FutureTask.cancel(true)會中斷任務,執行FutureTask.cancel(false)方法將不會對正在執行此任務的線程產生任何影響;

當FutureTask處於完成狀態時,執行FutureTask.cancel(...)方法將返回false

get()和cancel()的狀態轉換圖:

FutureTask的使用

當一個線程須要等待另外一個線程把某個任務執行完後才能繼續執行,可使用FutureTask。這種用法並非很常見,只要知道有這種用法就能夠了。

總結

這一章主要和上一章的線程池有很大聯繫,閱讀到這裏很是不容易,收穫也應該不少,留下幾道面試題來檢驗本身是否真的掌握了這些內容吧。

  1. Executor框架主要有哪些成員?
  2. 任務執行框架主要有哪幾種線程池?
  3. 線程池有哪些參數?它們的含義是什麼?
  4. 每種線程池的工做流程是如何的?簡單介紹一下(越詳細越好,使用了什麼隊列、線程池大小······)
  5. 任務執行完成後返回的結果如何接收?
  6. FutureTask有哪幾種狀態?有哪些核心方法?調用方法會致使狀態如何變化?(靈魂三問)

若是這篇文章能給你帶來一點點的小收穫,你的一個小小的點讚我會開心一成天!感謝你的閱讀!

相關文章
相關標籤/搜索