FutureTask 源碼解析

站在使用者的角度,future是一個常常在多線程環境下使用的Runnable,使用它的好處有兩個:node

1. 線程執行結果帶有返回值編程

2. 提供了一個線程超時的功能,超過超時時間拋出異常後返回。多線程

那,怎麼實現future這種超時控制呢?來看看代碼:併發

1

FutureTask的實現只是依賴了一個內部類Sync實現的,Sync是AQS (AbstractQueuedSynchronizer)的子類,這個類承擔了全部future的功能,AbstractQueuedSynchronizer的做者是大名鼎鼎的併發編程大師Doug Lea,它的做用遠遠不止實現一個Future這麼簡單,後面在說。函數

下面,咱們從一個future提交到線程池開始,直到future超時或者執行結束來看看future都作了些什麼。怎麼作的。oop

首先,向線程池ThreadPoolExecutor提交一個future:ui

Image(1)

ThreadPoolExecutor將提交的任務用FutureTask包裝一下:this

Image(2)

Image(3)

而後嘗試將包裝後的Future用Thread類包裝下後啓動,spa

紅色標記的地方表示,噹噹前線程池的大小小於corePoolSize時,將任務提交,不然將該任務加入到workQueue中去,若是workQueue裝滿了,則嘗試在線程數小於MaxPoolSize的條件下提交該任務。.net

Image(4)

順便說明下,咱們使用線程池時,經常看到有關有界隊列,無界隊列做爲工做隊列的字眼:使用無界隊列時,線程池的大小永遠不大於corePoolSize,使用有界隊列時的maxPoolSize纔有效,緣由就在這裏,若是是

無界隊列,紅框中的add永遠爲true 下方的addIfUnderMaximumPoolSize怎麼也走不到了,也就不會有線程數量大於MaxPoolSize的狀況

言歸正傳,看看addIfUnderCorePoolSize 中作了什麼事:

new了一個Thread,將咱們提交的任務包裝下後就直接啓動了

Image(5)

咱們知道,線程的start方法會調用咱們runnable接口的run方法,所以不難猜想FutureTask也是實現了Runnable接口的
Image(6)

Image(7)

FutureTask的run()方法中是這麼寫:

Image(8)

innerRun方法先使用原子方式更改了一下本身的一個標誌位state(用於標示任務的執行狀況)

而後紅色框的方法 實現回調函數call的調用,而且將返回值做爲參數傳遞下去,放置在一個叫作result的泛型變量中,

而後future只管等待一段時間後去拿result這個變量的值就能夠了。   至於怎麼實現的「等待一段時間再去拿」 後面立刻說明。

Image(9)

innerSet在通過一系列的狀態判斷後,最終將V這個call方法返回的值賦值給了result

Image(10)

說到這裏,咱們知道,future是經過將call方法的返回值放在一個叫作result的變量中,通過一段時間的等待後再去拿出來返回就能夠了。

怎麼實現這個 「等一段時間」呢?

要從Sync的父類AbstractQueuedSynchronizer這個類提及:

咱們知道AbstractQueuedSynchronizer 後者的中文名字叫作 同步器,顧名思義,是用來控制資源佔用的一種方式。對於FutureTask來講,「資源」就是result,線程執行的結果。思路就是經過控制對result這個資源的訪問來決定是否須要立刻去取得result這個結果,當超時時間未到,或者線程未執行結束時,是不能去取result的。當線程正常執行結束後,一系列的標誌位會被修改,並告訴等待future執行結果的各個線程,能夠來獲取result了。

這裏會涉及到 獨佔鎖和共享鎖的概念。

獨佔鎖:同一時間只有一個線程獲取鎖。再有線程嘗試加鎖,將失敗。 典型例子 reentrantLock

共享鎖:同一時間能夠有多個線程獲取鎖。 典型例子,本例中的FutureTask

爲何說他們?由於Sync本質上就是想完成一個共享鎖的功能,因此Sync繼承了AbstractQueuedSynchronizer 因此Sync的方法使用的是AbstractQueuedSynchronizer的共享鎖的API

首先,咱們明白,future結束有兩種狀態:

     1. 線程正常執行完畢,通知等待結果的主線程對應於future.get()方法。

     2. 線程還未執行完畢,等待結果的主線程已經等不到了(超時),拋出一個TimeOutException後再也不等待。對應於future.get(long timeout, TimeUnit unit)

下面咱們依次看看對於這兩種狀態,咱們是怎麼處理的:

從上圖中能夠得知,線程在執行完畢後會將執行的結果放到result中, 紅色框中同時提到了releaseShared 方法,咱們從這裏進入AbstractQueuedSynchronizer

Image(11)

當result已經被賦值,或者FutureTask爲cancel狀態時,FutureTask會嘗試去釋放共享鎖(能夠同時有多個線程調用future.get() 方法,也就是會有多個線程在等待future執行結果,而furue在執行完畢後會依次喚醒各個線程)

若是嘗試成功,則開始真正的釋放鎖,這裏是AbstractQueuedSynchronizer 比較精妙的地方, 「嘗試」動做都定義爲抽象方法,交個各個子類去定義「嘗試成功的含義」 而真正的釋放則本身實現,這種複雜規則交個子類,流程交給本身的思路很值得借鑑

Image(12)

再看FutureTask的 「嘗試釋放」的規則:

沒啥好說,怎麼嘗試都成功

Image(13)

接着AbstractQueuedSynchronizer 開始了真正的釋放喚醒工做:

  
1 private void doReleaseShared() { 2 /* 3 * Ensure that a release propagates, even if there are other 4 * in-progress acquires/releases. This proceeds in the usual 5 * way of trying to unparkSuccessor of head if it needs 6 * signal. But if it does not, status is set to PROPAGATE to 7 * ensure that upon release, propagation continues. 8 * Additionally, we must loop in case a new node is added 9 * while we are doing this. Also, unlike other uses of 10 * unparkSuccessor, we need to know if CAS to reset status 11 * fails, if so rechecking. 12 */ 13 for (;;) { 14 Node h = head; // 把頭元素取出來,保持頭元素的引用,防止head被更改 15 if (h != null && h != tail) { 16 int ws = h.waitStatus; 17 if (ws == Node.SIGNAL) { // 若是狀態位爲:須要一個信號去喚醒 註釋原話:/** waitStatus value to indicate successor's thread needs unparking */ 18 if ( ! compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) // 修改狀態位 19 continue ; // loop to recheck cases 20 unparkSuccessor(h); // 若是修改爲功,則經過頭元素找到一個線程,而且喚醒它(喚醒動做是經過JNI方法去調用的) 21 } 22 else if (ws == 0 && 23 ! compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) 24 continue ; // loop on failed CAS 25 } 26 if (h == head) // loop if head changed 27 break ; 28 } 29 }

循環遍歷後,知道已經沒有結點須要喚醒則返回,依次return後,future的run方法執行完畢。

以上是針對future線程的,咱們知道,FutureTask已經將執行結果放在了result中,而且按等的前後順序依喚醒了等待隊列上的線程。

那,猜想future.get方法就不難了,對於帶超時的get方法:最大的可能性就是不斷的檢查future的一個狀態位,看它是否執行完畢,執行完則獲取

結果返回,不然,再阻塞本身一段時間。

對於不待超時的,就上來就先嚐試獲取結果,拿不到就阻塞本身,直到上述的innerSet方法喚醒它。

到底是不是這樣呢?一塊兒來看看:

Image(14)

Image(15)

由於innerGet(long nanosTimeout) 和innerGet()流程大體相同,因此咱們重點講解innerGet(long nanosTimeout) ,在惟一一個有區別的地方說明下便可。

以下圖所示,對於innerGet(long nanosTimeout) 方法,FutureTask採用的方法是直接加鎖或者每隔一段時間嘗試加鎖,若是成功,則返回true,則如上圖所示,直接返回result,主線程拿到執行結果。

不然,拋出超時異常。

Image(16)

對於tryAcquireShared 方法,比較簡單,直接看future是否執行完畢

Image(17)

若是沒有結束,則進入doAcquireSharedNanos方法:

  
1 private boolean doAcquireSharedNanos( int arg, long nanosTimeout) 2 throws InterruptedException { 3 4 long lastTime = System.nanoTime(); 5 final Node node = addWaiter(Node.SHARED); // 在隊列尾部增長一個結點,個人理解是,用來標明這個隊列是共享者隊列仍是獨佔隊列 6 try { 7 for (;;) { 8 final Node p = node.predecessor(); // 拿出剛纔新增結點的前一個結點:實際有效的隊尾結點。 9 if (p == head) { 10 int r = tryAcquireShared(arg); // 嘗試獲取鎖。 11 if (r >= 0 ) { // 12 setHeadAndPropagate(node, r); // 返回值大於1 對於FutureTask表明任務已經被cancel了,則更改隊列頭部結點。 13 p.next = null ; // help GC 將p結點脫離隊列,幫助GC 14 return true ; // 返回true後 上述中能夠知道當前線成會拋出超時異常 肯定下會不會喚醒其餘節點? 15 } 16 } 17 if (nanosTimeout <= 0 ) { // 若是設置的超時時間小於等於0 則取消獲取鎖 18 cancelAcquire(node); 19 return false ; 20 } 21 if (nanosTimeout > spinForTimeoutThreshold && // 等待的時間必須大於一個自旋鎖的週期時間 22 shouldParkAfterFailedAcquire(p, node)) // 遍歷隊列,找到須要沉睡的第一個節點 23 LockSupport.parkNanos( this , nanosTimeout); // 調用JNI方法,沉睡當前線程 24 long now = System.nanoTime(); 25 nanosTimeout -= now - lastTime; // 更新等待時間 循環遍歷 26 lastTime = now; 27 if (Thread.interrupted()) 28 break ; 29 } 30 } catch (RuntimeException ex) { 31 cancelAcquire(node); 32 throw ex; 33 } 34 // Arrive here only if interrupted 35 cancelAcquire(node); 36 throw new InterruptedException(); 37 } 38 39

這樣經過AQS的協做,全部調用future.get(long timeout, TimeUnit unit)的線程都會按順序等待,直到線成執行完被喚醒或者超時時間到 主動拋出異常。

總結

至此爲止FutureTask的解析已經基本結束了,能夠看到。它依靠AQS的共享鎖實現了對線程執行結果的訪問控制。和咱們一般意義上的訪問控制(併發訪問某個資源,獲取失敗時,沉睡本身等待喚醒或者超時後返回)基本是一致的,不外乎維護了一個等待資源的列表。將等待資源的線程經過鏈表的方式串了起來。

固然AQS的功能遠不只如此,它還提供了一套獨佔鎖的API,幫助使用者實現獨佔鎖的功能。

最經常使用的Reentrantlock就是使用這套API作的。

有機會的話再和你們分享下它的實現。

相關文章
相關標籤/搜索