線程池沒你想的那麼簡單(續)

前言

前段時間寫過一篇《線程池沒你想的那麼簡單》,和你們一塊兒擼了一個基本的線程池,具有:java

  • 線程池基本調度功能。
  • 線程池自動擴容縮容。
  • 隊列緩存線程。
  • 關閉線程池。

這些功能,最後也留下了三個待實現的 featuresgit

  • 執行帶有返回值的線程。
  • 異常處理怎麼辦?
  • 全部任務執行完怎麼通知我?

此次就實現這三個特性來看看 j.u.c 中的線程池是如何實現這些需求的。github

再看本文以前,強烈建議先查看上文 《線程池沒你想的那麼簡單》

任務完成後的通知

你們在用線程池的時候或多或少都會有這樣的需求:api

線程池中的任務執行完畢後再通知主線程作其餘事情,好比一批任務都執行完畢後再執行下一波任務等等。緩存

以咱們以前的代碼爲例:安全

總共往線程池中提交了 13 個任務,直到他們都執行完畢後再打印 「任務執行完畢」 這個日誌。

執行結果以下:多線程

爲了簡單的達到這個效果,咱們能夠在初始化線程池的時候傳入一個接口的實現,這個接口就是用於任務完成以後的回調。併發

public interface Notify {

    /**
     * 回調
     */
    void notifyListen() ;
}

以上就是線程池的構造函數以及接口的定義。異步

因此想要實現這個功能的關鍵是在什麼時候回調這個接口?函數

仔細想一想其實也簡單:只要咱們記錄提交到線程池中的任務及完成的數量,他們二者的差爲 0 時就認爲線程池中的任務已執行完畢;這時即可回調這個接口。

因此在往線程池中寫入任務時咱們須要記錄任務數量:

爲了併發安全的考慮,這裏的計數器採用了原子的 AtomicInteger


而在任務執行完畢後就將計數器 -1 ,一旦爲 0 時則任務任務所有執行完畢;這時即可回調咱們自定義的接口完成通知。


JDK 的實現

這樣的需求在 jdk 中的 ThreadPoolExecutor 中也有相關的 API ,只是用法不太同樣,但本質原理都大同小異。

咱們使用 ThreadPoolExecutor 的常規關閉流程以下:

executorService.shutdown();
    while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) {
        logger.info("thread running");
    }

線程提交完畢後執行 shutdown() 關閉線程池,接着循環調用 awaitTermination() 方法,一旦任務所有執行完畢後則會返回 true 從而退出循環。

這兩個方法的目的和原理以下:

  • 執行 shutdown() 後會將線程池的狀態置爲關閉狀態,這時將會中止接收新的任務同時會等待隊列中的任務所有執行完畢後才真正關閉線程池。
  • awaitTermination 會阻塞直到線程池全部任務執行完畢或者超時時間已到。

爲何要兩個 api 結合一塊兒使用呢?

主要還在最終的目的是:全部線程執行完畢後再作某件事情,也就是在線程執行完畢以前其實主線程是須要被阻塞的。

shutdown() 執行後並不會阻塞,會當即返回,全部才須要後續用循環不停的調用 awaitTermination(),由於這個 api 纔會阻塞線程。

其實咱們查看源碼會發現,ThreadPoolExecutor 中的阻塞依然也是等待通知機制的運用,只不過用的是 LockSupportAPI 而已。

帶有返回值的線程

接下來是帶有返回值的線程,這個需求也很是常見;好比須要線程異步計算某些數據而後獲得結果最終彙總使用。

先來看看如何使用(和 jdk 的相似):

首先任務是不能實現 Runnable 接口了,畢竟他的 run() 函數是沒有返回值的;因此咱們改實現一個 Callable 的接口:

這個接口有一個返回值。

同時在提交任務時也稍做改動:

首先是執行任務的函數由 execute() 換爲了 submit(),同時他會返回一個返回值 Future,經過它即可拿到線程執行的結果。

最後經過第二步將全部執行結果打印出來:

實現原理

再看具體實現以前先來思考下這樣的功能如何實現?

  • 首先受限於 jdk 的線程 api 的規範,要執行一個線程不論是實現接口仍是繼承類,最終都是執行的 run() 函數。
  • 因此咱們想要一個線程有返回值無非只能是在執行 run() 函數時去調用一個有返回值的方法,再將這個返回值存放起來用於後續使用。

好比咱們這裏新建了一個 Callable<T> 的接口:

public interface Callable<T> {

    /**
     * 執行任務
     * @return 執行結果
     */
    T call() ;
}

它的 call 函數就是剛纔提到的有返回值的方法,因此咱們應當在線程的 run() 函數中去調用它。

接着還會有一個 Future 的接口,他的主要做用是獲取線程的返回值,也就是 再將這個返回值存放起來用於後續使用 這裏提到的後續使用

既然有了接口那天然就得有它的實現 FutureTask,它實現了 Future 接口用於後續獲取返回值。

同時實現了 Runnable 接口會把本身變爲一個線程。

因此在它的 run() 函數中會調用剛纔提到的具備返回值的 call() 函數。


再次結合 submit() 提交任務和 get() 獲取返回值的源碼來看會更加理解這其中的門道。

/**
     * 有返回值
     *
     * @param callable
     * @param <T>
     * @return
     */
    public <T> Future<T> submit(Callable<T> callable) {
        FutureTask<T> future = new FutureTask(callable);
        execute(future);
        return future;
    }

submit() 很是簡單,將咱們丟進來的 Callable 對象轉換爲一個 FutureTask 對象,而後再調用以前的 execute() 來丟進線程池(後續的流程就和一個普通的線程進入線程池的流程同樣)。

FutureTask 自己也是線程,因此能夠直接使用 execute() 函數。

future.get() 函數中 future 對象因爲在 submit() 中返回的真正對象是 FutureTask,因此咱們直接看其中的源碼就好。

因爲 get() 在線程沒有返回以前是一個阻塞函數,最終也是經過 notify.wait() 使線程進入阻塞狀態來實現的。

而使其從 wait() 中返回的條件必然是在線程執行完畢拿到返回值的時候才進行喚醒。

也就是圖中的第二部分;一旦線程執行完畢(callable.call())就會喚醒 notify 對象,這樣 get 方法也就能返回了。


一樣的道理,ThreadPoolExecutor 中的原理也是相似,只不過它考慮的細節更多因此看起來很複雜,但精簡代碼後核心也就是這些。

甚至最終使用的 api 看起來都是相似的:

異常處理

最後一個是一些新手使用線程池很容易踩坑的一個地方:那就是異常處理。

好比相似於這樣的場景:

建立了只有一個線程的線程池,這個線程只作一件事,就是一直不停的 while 循環。

可是循環的過程當中不當心拋出了一個異常,巧的是這個異常又沒有被捕獲。你以爲後續會發生什麼事情呢?

是線程繼續運行?仍是線程池會退出?

經過現象來看其實哪一種都不是,線程既沒有繼續運行同時線程池也沒有退出,會一直卡在這裏。

當咱們 dump 線程快照會發現:

這時線程池中還有一個線程在運行,經過線程名稱會發現這是新建立的一個線程(以前是Thread-0,如今是 Thread-1)。

它的線程狀態爲 WAITING ,經過堆棧發現是卡在了 CustomThreadPool.java:272 處。

就是卡在了從隊列裏獲取任務的地方,因爲此時的任務隊列是空的,因此他會一直阻塞在這裏。

看到這裏,以前關注的朋友有沒有似曾相識的感受。

沒錯,我以前寫過兩篇:

線程池相關的問題,當時的討論也很是「激烈」,其實最終的緣由和這裏是如出一轍的。

因此就此次簡版的代碼來看看其中的問題:

如今又簡化了一版代碼我以爲以前還有疑問的朋友此次應該會更加明白。

其實在線程池內部會對線程的運行捕獲異常,但它並不會處理,只是用於標記是否執行成功;

一旦執行失敗則會回收掉當前異常的線程,而後從新建立一個新的 Worker 線程繼續從隊列裏取任務而後執行

因此最終纔會卡在從隊列中取任務處。

其實 ThreadPoolExecutor 的異常處理也是相似的,具體的源碼就很少分析了,在上面兩篇文章中已經說過幾回。

因此咱們在使用線程池時,其中的任務必定要作好異常處理。

總結

這一波下來我以爲線程池搞清楚沒啥問題了,總的來看它內部運用了很是多的多線程解決方案,好比:

  • ReentrantLock 重入鎖來保證線程寫入的併發安全。
  • 利用等待通知機制來實現線程間通訊(線程執行結果、等待線程池執行完畢等)。

最後也學會了:

  • 標準的線程池關閉流程。
  • 如何使用有返回值的線程。
  • 線程異常捕獲的重要性。

最後本文全部源碼(結合其中的測試代碼使用):

https://github.com/crossoverJie/JCSprout/blob/master/src/main/java/com/crossoverjie/concurrent/CustomThreadPool.java

你的點贊與分享是對我最大的支持

相關文章
相關標籤/搜索