轉載:AbstractQueuedSynchronizer的實現分析(下)

http://www.infoq.com/cn/articles/java8-abstractqueuedsynchronizerjava

前言

通過本系列的上半部分JDK1.8 AbstractQueuedSynchronizer的實現分析(上)的解讀,相信不少讀者已經對AbstractQueuedSynchronizer(下文簡稱AQS)的獨佔功能瞭然於胸,那麼此次咱們經過對另外一個工具類:CountDownLatch的分析來解讀AQS的另一個功能:共享功能。node

AQS共享功能的實現

在開始解讀AQS的共享功能前,咱們再重溫一下CountDownLatch,CountDownLatch爲java.util.concurrent包下的計數器工具類,常被用在多線程環境下,它在初始時須要指定一個計數器的大小,而後可被多個線程併發的實現減1操做,並在計數器爲0後調用await方法的線程被喚醒,從而實現多線程間的協做。它在多線程環境下的基本使用方式爲:安全

//main thread
      // 新建一個CountDownLatch,並指制定一個初始大小
      CountDownLatch countDownLatch = new CountDownLatch(3);
      // 調用await方法後,main線程將阻塞在這裏,直到countDownLatch 中的計數爲0 
      countDownLatch.await();
      System.out.println("over");

     //thread1
     // do something 
     //...........
     //調用countDown方法,將計數減1
      countDownLatch.countDown();


     //thread2
     // do something 
     //...........
     //調用countDown方法,將計數減1
      countDownLatch.countDown();

       //thread3
     // do something 
     //...........
     //調用countDown方法,將計數減1
      countDownLatch.countDown();

相關廠商內容數據結構

關於紅包、SSD雲盤等核心技術集錦!

跟技術大牛,侃侃容器那些事兒!

看明略徐安華如何談天然語言處理

如何經過使用 AWS對IT資源實現高級別管控,並大規模實現更高級別的安全性?

【雙11】蘑菇街美麗說融合幕後,PHP VS Java誰勝誰負?

注意,線程thread 1,2,3各自調用 countDown後,countDownLatch 的計數爲0,await方法返回,控制檯輸入「over」,在此以前main thread 會一直沉睡。多線程

能夠看到CountDownLatch的做用相似於一個「欄柵」,在CountDownLatch的計數爲0前,調用await方法的線程將一直阻塞,直到CountDownLatch計數爲0,await方法纔會返回,併發

而CountDownLatch的countDown()方法則通常由各個線程調用,實現CountDownLatch計數的減1。工具

知道了CountDownLatch的基本使用方式,咱們就從上述DEMO的第一行new CountDownLatch(3)開始,看看CountDownLatch是怎麼實現的。oop

首先,看下CountDownLatch的構造方法:ui

和ReentrantLock相似,CountDownLatch內部也有一個叫作Sync的內部類,一樣也是用它繼承了AQS。spa

再看下Sync:

若是你看過本系列的上半部分,你對setState方法必定不會陌生,它是AQS的一個「狀態位」,在不一樣的場景下,表明不一樣的含義,好比在ReentrantLock中,表示加鎖的次數,在CountDownLatch中,則表示CountDownLatch的計數器的初始大小。

設置完計數器大小後CountDownLatch的構造方法返回,下面咱們再看下CountDownLatch的await()方法:

調用了Sync的acquireSharedInterruptibly方法,由於Sync是AQS子類的緣由,這裏實際上是直接調用了AQS的acquireSharedInterruptibly方法:

從方法名上看,這個方法的調用是響應線程的打斷的,因此在前兩行會檢查下線程是否被打斷。接着,嘗試着獲取共享鎖,小於0,表示獲取失敗,經過本系列的上半部分的解讀, 咱們知道AQS在獲取鎖的思路是,先嚐試直接獲取鎖,若是失敗會將當前線程放在隊列中,按照FIFO的原則等待鎖。而對於共享鎖也是這個思路,若是和獨佔鎖一致,這裏的tryAcquireShared應該是個空方法,留給子類去判斷:

再看看CountDownLatch:

若是state變成0了,則返回1,表示獲取成功,不然返回-1則表示獲取失敗。

看到這裏,讀者可能會發現, await方法的獲取方式更像是在獲取一個獨佔鎖,那爲何這裏還會用tryAcquireShared呢?

回想下CountDownLatch的await方法是否是隻能在主線程中調用,是否認的,CountDownLatch的await方法能夠在多個線程中調用,當CountDownLatch的計數器爲0後,調用await的方法都會依次返回。 也就是說能夠多個線程同時在等待await方法返回,因此它被設計成了實現tryAcquireShared方法,獲取的是一個共享鎖,鎖在全部調用await方法的線程間共享,因此叫共享鎖。

回到acquireSharedInterruptibly方法:

若是獲取共享鎖失敗(返回了-1,說明state不爲0,也就是CountDownLatch的計數器還不爲0),進入調用doAcquireSharedInterruptibly方法中,按照咱們上述的猜測,應該是要將當前線程放入到隊列中去。

在這以前,咱們再回顧一下AQS隊列的數據結構:AQS是一個雙向鏈表,經過節點中的next,pre變量分別指向當前節點後一個節點和前一個節點。其中,每一個節點中都包含了一個線程和一個類型變量:表示當前節點是獨佔節點仍是共享節點,頭節點中的線程爲正在佔有鎖的線程,然後的全部節點的線程表示爲正在等待獲取鎖的線程。以下圖所示:

黃色節點爲頭節點,表示正在獲取鎖的節點,剩下的藍色節點(Node一、Node二、Node3)爲正在等待鎖的節點,他們經過各自的next、pre變量分別指向先後節點,造成了AQS中的雙向鏈表。每一個線程被加上類型(共享仍是獨佔)後即是一個Node, 也就是本文中說的節點。

再看看doAcquireSharedInterruptibly方法:

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED); 
//將當前線程包裝爲類型爲Node.SHARED的節點,標示這是一個共享節點。
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
//若是新建節點的前一個節點,就是Head,說明當前節點是AQS隊列中等待獲取鎖的第一個節點,
//按照FIFO的原則,能夠直接嘗試獲取鎖。
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r); 
//獲取成功,須要將當前節點設置爲AQS隊列中的第一個節點,這是AQS的規則//隊列的頭節點表示正在獲取鎖的節點
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) && //檢查下是否須要將當前節點掛起
                    parkAndCheckInterrupt()) 
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

這裏有幾點須要說明的:

1. setHeadAndPropagate方法:

首先,使用了CAS更換了頭節點,而後,將當前節點的下一個節點取出來,若是一樣是「shared」類型的,再作一個"releaseShared"操做。

看下doReleaseShared方法:

for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) { 
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 
//若是當前節點是SIGNAL意味着,它正在等待一個信號,  
//或者說,它在等待被喚醒,所以作兩件事,1是重置waitStatus標誌位,2是重置成功後,喚醒下一個節點。
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))  
//若是自己頭節點的waitStatus是出於重置狀態(waitStatus==0)的,將其設置爲「傳播」狀態。
//意味着須要將狀態向後一個節點傳播。
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }

爲何要這麼作呢?這就是共享功能和獨佔功能最不同的地方,對於獨佔功能來講,有且只有一個線程(一般只對應一個節點,拿ReentantLock舉例,若是當前持有鎖的線程重複調用lock()方法,那根據本系列上半部分咱們的介紹,咱們知道,會被包裝成多個節點在AQS的隊列中,因此用一個線程來描述更準確),可以獲取鎖,可是對於共享功能來講。

共享的狀態是能夠被共享的,也就是意味着其餘AQS隊列中的其餘節點也應能第一時間知道狀態的變化。所以,一個節點獲取到共享狀態流程圖是這樣的:

好比如今有以下隊列:

當Node1調用tryAcquireShared成功後,更換了頭節點:

     Node1變成了頭節點而後調用unparkSuccessor()方法喚醒了Node二、Node2中持有的線程A出於上面流程圖的park node的位置,

線程A被喚醒後,重複黃色線條的流程,從新檢查調用tryAcquireShared方法,看可否成功,若是成功,則又更改頭節點,重複以上步驟,以實現節點自身獲取共享鎖成功後,喚醒下一個共享類型節點的操做,實現共享狀態的向後傳遞。

2.其實對於doAcquireShared方法,AQS還提供了集中相似的實現:

分別對應了:

  1. 帶參數請求共享鎖。 (忽略中斷)
  2. 帶參數請求共享鎖,且響應中斷。(每次循環時,會檢查當前線程的中斷狀態,以實現對線程中斷的響應)
  3. 帶參數請求共享鎖可是限制等待時間。(第二個參數設置超時時間,超出時間後,方法返回。)

比較特別的爲最後一個doAcquireSharedNanos方法,咱們一塊兒看下它怎麼實現超時時間的控制的。

由於該方法和其他獲取共享鎖的方法邏輯是相似的,我用紅色框圈出了它所不同的地方,也就是實現超時時間控制的地方。

能夠看到,其實就是在進入方法時,計算出了一個「deadline」,每次循環的時候用當前時間和「deadline」比較,大於「dealine」說明超時時間已到,直接返回方法。

注意,最後一個紅框中的這行代碼:

nanosTimeout > spinForTimeoutThreshold

從變量的字面意思可知,這是拿超時時間和超時自旋的最小做比較,在這裏Doug Lea把超時自旋的閾值設置成了1000ns,即只有超時時間大於1000ns纔會去掛起線程,不然,再次循環,以實現「自旋」操做。這是「自旋」在AQS中的應用之處。

看完await方法,咱們再來看下countDown()方法:

調用了AQS的releaseShared方法,並傳入了參數1:

一樣先嚐試去釋放鎖,tryReleaseShared一樣爲空方法,留給子類本身去實現,如下是CountDownLatch的內部類Sync的實現:

死循環更新state的值,實現state的減1操做,之因此用死循環是爲了確保state值的更新成功。

從上文的分析中可知,若是state的值爲0,在CountDownLatch中意味:全部的子線程已經執行完畢,這個時候能夠喚醒調用await()方法的線程了,而這些線程正在AQS的隊列中,並被掛起的,

因此下一步應該去喚醒AQS隊列中的頭節點了(AQS的隊列爲FIFO隊列),而後由頭節點去依次喚醒AQS隊列中的其餘共享節點。

若是tryReleaseShared返回true,進入doReleaseShared()方法:

private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) { 
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 
//若是當前節點是SIGNAL意味着,它正在等待一個信號,
 //或者說,它在等待被喚醒,所以作兩件事,1是重置waitStatus標誌位,2是重置成功後,喚醒下一個節點。
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))  
//若是自己頭節點的waitStatus是出於重置狀態(waitStatus==0)的,將其設置爲「傳播」狀態。
//意味着須要將狀態向後一個節點傳播。
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
  }

當線程被喚醒後,會從新嘗試獲取共享鎖,而對於CountDownLatch線程獲取共享鎖判斷依據是state是否爲0,而這個時候顯然state已經變成了0,所以能夠順利獲取共享鎖而且依次喚醒AQS隊裏中後面的節點及對應的線程。

總結

本文從CountDownLatch入手,深刻分析了AQS關於共享鎖方面的實現方式:

若是獲取共享鎖失敗後,將請求共享鎖的線程封裝成Node對象放入AQS的隊列中,並掛起Node對象對應的線程,實現請求鎖線程的等待操做。待共享鎖能夠被獲取後,從頭節點開始,依次喚醒頭節點及其之後的全部共享類型的節點。實現共享狀態的傳播。

這裏有幾點值得注意:

  1. 與AQS的獨佔功能同樣,共享鎖是否能夠被獲取的判斷爲空方法,交由子類去實現。
  2. 與AQS的獨佔功能不一樣,當鎖被頭節點獲取後,獨佔功能是隻有頭節點獲取鎖,其他節點的線程繼續沉睡,等待鎖被釋放後,纔會喚醒下一個節點的線程,而共享功能是隻要頭節點獲取鎖成功,就在喚醒自身節點對應的線程的同時,繼續喚醒AQS隊列中的下一個節點的線程,每一個節點在喚醒自身的同時還會喚醒下一個節點對應的線程,以實現共享狀態的「向後傳播」,從而實現共享功能。

以上的分析都是從AQS子類的角度去看待AQS的部分功能的,而若是直接看待AQS,或許能夠這麼去解讀:

首先,AQS並不關心「是什麼鎖」,對於AQS來講它只是實現了一系列的用於判斷「資源」是否能夠訪問的API,而且封裝了在「訪問資源」受限時將請求訪問的線程的加入隊列、掛起、喚醒等操做, AQS只關心「資源不能夠訪問時,怎麼處理?」、「資源是能夠被同時訪問,仍是在同一時間只能被一個線程訪問?」、「若是有線程等不及資源了,怎麼從AQS的隊列中退出?」等一系列圍繞資源訪問的問題,而至於「資源是否能夠被訪問?」這個問題則交給AQS的子類去實現。

當AQS的子類是實現獨佔功能時,例如ReentrantLock,「資源是否能夠被訪問」被定義爲只要AQS的state變量不爲0,而且持有鎖的線程不是當前線程,則表明資源不能訪問。

當AQS的子類是實現共享功能時,例如:CountDownLatch,「資源是否能夠被訪問」被定義爲只要AQS的state變量不爲0,說明資源不能訪問。

這是典型的將規則和操做分開的設計思路:規則子類定義,操做邏輯由於具備公用性,放在父類中去封裝。

固然,正式由於AQS只是關心「資源在什麼條件下可被訪問」,因此子類還能夠同時使用AQS的共享功能和獨佔功能的API以實現更爲複雜的功能。

好比:ReentrantReadWriteLock,咱們知道ReentrantReadWriteLock的中也有一個叫Sync的內部類繼承了AQS,而AQS的隊列能夠同時存放共享鎖和獨佔鎖,對於ReentrantReadWriteLock來講分別表明讀鎖和寫鎖,當隊列中的頭節點爲讀鎖時,表明讀操做能夠執行,而寫操做不能執行,所以請求寫操做的線程會被掛起,當讀操做依次推出後,寫鎖成爲頭節點,請求寫操做的線程被喚醒,能夠執行寫操做,而此時的讀請求將被封裝成Node放入AQS的隊列中。如此往復,實現讀寫鎖的讀寫交替進行。

而本系列文章上半部分提到的FutureTask,其實思路也是:封裝一個存放線程執行結果的變量A,使用AQS的獨佔API實現線程對變量A的獨佔訪問,判斷規則是,線程沒有執行完畢:call()方法沒有返回前,不能訪問變量A,或者是超時時間沒到前不能訪問變量A(這就是FutureTask的get方法能夠實現獲取線程執行結果時,設置超時時間的緣由)。

綜上所述,本系列文章從AQS獨佔鎖和共享鎖兩個方面深刻分析了AQS的實現方式和獨特的設計思路,但願對讀者有啓發,下一篇文章,咱們將繼續JDK 1.8下 J.U.C (java.util.concurrent)包中的其餘工具類,敬請期待。

感謝郭蕾對本文的策劃和審校。

給InfoQ中文站投稿或者參與內容翻譯工做,請郵件至editors@cn.infoq.com。也歡迎你們經過新浪微博(@InfoQ)或者騰訊微博(@InfoQ)關注咱們,並與咱們的編輯和其餘讀者朋友交流。

相關文章
相關標籤/搜索