獲取任務的執行結果

以前的兩篇文章中,咱們介紹了異步編程,也介紹了線程池的基本概念。也說了,線程池的實現天生也實現了異步任務,容許直接向線程池中進行任務的提交與結果獲取。java

可是,咱們始終沒有去深刻的瞭解下,異步任務框架對於任務執行的進度是如何監控的,任務執行的結果該如何獲取。git

那麼,本篇文章就來詳細地探討下異步框架中,關於任務執行過程當中的一些狀態以及執行結果反饋的相關細節。程序員

傳統的 Future 模式

咱們說過,異步編程的一個好處是:github

我只須要定義好任務,向 ExecutorService 中提交便可,而不用關心何時,什麼線程在執行咱們的任務。它會返回一個 Future 對象,咱們經過他了解當前任務的執行細節。編程

Future 接口中定義瞭如下一些方法:bash

public interface Future<V> {
    //取消執行當前任務
    boolean cancel(boolean mayInterruptIfRunning);
    //當前任務是否被取消了
    boolean isCancelled();
    //當前任務是否已經完成
    boolean isDone();
    //返回任務執行的返回結果,若是任務未完成
    //將阻塞在 Future 內部隊列上等待
    V get()
    //新增超時限制
    V get(long timeout, TimeUnit unit)
}
複製代碼

這五個方法,每個都很重要,爲咱們監控任務的執行提供有力的支持。而咱們的 ThreadPoolExecutor 使用的是 FutureTask 做爲 Future 的實現類。微信

而咱們也不妨看看這個 FutureTask 內部都有些哪些成員:多線程

任務執行狀態

state 和它可取的這些值共同描述了當前任務的執行狀態,是剛開始執行,仍是正在執行中,仍是正常結束,仍是異常結束,仍是被取消了,都由這個 state 來體現。框架

image

callable 表明當前正在執行的工做內容,這裏說一下爲何只有 Callable 類型的任務,由於全部的 Runnable 類型任務都會被事先轉換成 Callable 類型,我以爲主要是統一和抽象實現吧。異步

outcome 是任務執行結束的返回值,runner 是正在執行當前任務的線程,waiters 是一個簡單的單鏈表,維護的是全部在任務執行結束以前嘗試調用 get 方法獲取執行結果的線程集合。當任務執行結束自當喚醒隊列中全部的線程。

除此以外,還有一個稍顯重要的方法,就是 run 方法,這個方法會在任務開始時由 ExecutorService 調用,這是一個很核心的方法,雖然方法體有點長,可是邏輯簡單,咱們大致上歸納下。

image

  1. 若是任務已經開始將退出方法邏輯的執行
  2. 調度任務執行,調用 call 方法
  3. 調用成功將保存結果,異常則將保存異常信息
  4. 處理中斷

這裏須要額外去說一下,第三步中的 set 方法除了會將任務執行的返回結果設置到 FutureTask 的 outcome 字段上,還會調用 finishCompletion 方法完成任務的調用,嘗試喚醒全部在等待任務執行結果的線程。

其餘的方法就不去看了,也比較多,還算是簡單的,若是有所想法,也歡迎你和我探討交流。

那麼,咱們也來看一個最簡單的應用示例:

image

咱們向線程池提交了一個任務,這個任務的工做量不大,就是睡覺而後返回執行結果。而咱們能夠直接調用 get 方法去獲取任務執行的結果,不過 get 方法是阻塞式的,一旦任務還未執行結束,當前線程將丟失 CPU 進而被阻塞到 Future 的內部隊列上。

因此,推薦你們在 get 返回結果以前,先判斷下目標任務是否已經執行結束,進而避免當前線程的阻塞喚醒所帶來的代價。

到這裏,相信你也必定看出來了,FutureTask 實現的 Future 的弊端在 get 方法,這個方法非異步,若是沒有成功獲取到任務的執行結果就將直接阻塞當前線程,以等待任務的執行完成。

可是,有一種情境,當咱們向線程池中提交了不少任務,可是不清楚各個任務的執行效率,也就是不知道誰先執行結束,若是直接 get 某個未完成的任務,將致使當前線程阻塞等待。

那麼咱們能不能阻塞,直接獲取已經執行結束的任務 Future,而未完成的任務不容許獲取它的 Future?

使用 CompletionService

分析 CompletionService 以前,咱們搬出以前分析過的一張類圖:

image

左半邊的類咱們已經在前面的文章中都涉獵了,惟獨落下了 CompletionService 這個接口,咱們當時說之後會分析它的,如今咱們來看看這個接口會給咱們帶來哪些能力。

首先,從類的繼承體系上來看,CompletionService 並不與咱們的 Executor 產生任何直接關係,線程池的實現也沒有繼承該接口。

實際上來講,CompletionService 只是利用了 Executor 乃至線程池爲本身提供任務的提交與執行能力,而本身不過額外的維護一個隊列,保存着全部已經完成的任務的 Future,以致於咱們能夠直接在外部調用 take 方法直接獲取已完成的任務返回結果,無需阻塞。

廢話很少說,咱們寫個小 demo,或許你會有更直接的體驗:

==要求:使用多線程計算 1-10000 之間的總和==

==思路:分段計算,最後總和相加==

實現:

image

image

相信你運行後必定和我是一樣的答案:50005000

可能不少人會有疑問,這段代碼其實也沒什麼特別的地方啊,我使用基本的線程池不同也能實現嗎?

可是,實際上並無那麼簡單,由於你不能肯定哪一個任務完成了,哪一個尚未,因此你至少須要寫五個循環自旋等待。

而若是你的運氣很差,第一個任務特別慢,即使後續的任務已經結束了,主線程也依然因爲第一個任務的結果拿不到而阻塞,耽誤了對其餘已完成任務的返回結果處理。

乍一看,你可能以爲差異不大,但仔細分析了纔會發現,一旦任務量增大、增多,真的是「差之毫釐,謬以千里」。

其實,原理我也能夠帶你們一塊兒來看看,並不難:

先從你們最關心的 CompletionService 實現子類內部結構開始:

image

這裏,至少能夠看出來兩點,字段 executor 是一個任務調度器,completionQueue 是一個阻塞隊列。

也就是說,Completion 是徹底依賴外部傳入的 Executor 來實現任務的提交與執行的。而這個阻塞隊列 completionQueue 就是保存的全部已經完成的任務 Future 對象。

除此以外,ExecutorCompletionService 還自定義了一個內部類 QueueingFuture,重寫了 FutureTask 的 done 方法。

可能你們對這個 done 沒什麼印象,可是還記得咱們說過的 finishCompletion 方法嗎?

FutureTask 抽象的描述了一個任務,當線程啓動後將調用 FutureTask 內部的 run 方法執行任務的核心邏輯,並在執行的最後調用 finishCompletion 喚醒全部阻塞在本身隊列上等待返回結果的線程。

而其中 finishCompletion 方法在結束前,會調用一個 done 方法,這個 done 方法在 FutureTask 中是空實現,沒有任何的代碼實現,表示並無什麼用。

可是咱們的 QueueingFuture 充分利用這一點,重寫了 done 方法,而邏輯就是將已結束的任務添加到咱們在外部維護的一個新隊列 completionQueue 中,供外部獲取調用。

這些就是 CompletionService 的祕密。


關注公衆不迷路,一個愛分享的程序員。
公衆號回覆「1024」加做者微信一塊兒探討學習!
每篇文章用到的全部案例代碼素材都會上傳我我的 github
github.com/SingleYam/o…
歡迎來踩!

YangAM 公衆號
相關文章
相關標籤/搜索