初步瞭解AQS是什麼(三)

前言

  1. 第一篇文章經過ReentranctLock的公平鎖介紹了AQS的核心部分,第二篇文章經過Condition分析了獨佔鎖是的具體的工做機制,還有公平鎖和非公平鎖的區別,也分析了中斷機制在AQS的實現
  2. 本篇文章主要了解的是AQS的共享模式,有些內容須要前面兩篇文章的內容,因此閱讀以前能夠先閱讀以前的文章,這樣效果更佳!
  3. 文章連接:<初步瞭解什麼是AQS(一)><初步瞭解什麼是AQS(二)>
  4. 本文章僅供我的學習用,但願你們能夠互幫互助!

共享鎖和獨佔鎖的區別

獨佔鎖

  1. 獨佔的,排他的
  2. 當某個線程擁有獨佔鎖,其餘線程只能等待這個線程釋放這個獨佔鎖才能去爭取,而且同一時刻只能有一個線程能夠爭取獨佔鎖成功

共享鎖

  1. 鎖是共享的,能夠被多個線程同時擁有
  2. 若是一個線程擁有了這個鎖,那麼其餘等待這個共享鎖的線程能夠去嘗試獲取鎖,而且有很大概率獲取成功

方法區別

獨佔鎖 共享鎖
tryAcquire(int arg) tryAcquireShared(int arg)
tryAcquireNanos(int arg, long nanosTimeout) tryAcquireSharedNanos(int arg, long nanosTimeout)
acquire(int arg) acquireShared(int arg)
acquireQueued(final Node node, int arg) doAcquireShared
acquireInterruptibly(int arg) acquireSharedInterruptibly(int arg)
doAcquireNanos(int arg, long nanosTimeout) doAcquireSharedNanos(int arg, long nanosTimeout)
release(int arg) releaseShared(int arg)
tryRelease(int arg) tryReleaseShared(int arg)
- doReleaseShared()

由表格咱們知道,除了最後一個doReleased()是共享鎖獨有以外,其餘的方法獨佔鎖和共享鎖基本都是一一對應的。html

在獨佔鎖中,release方法中嘗試釋放鎖,若是成功就unparkSuccessor(h)java

在共享鎖中,releaseShared方法中嘗試釋放鎖,若是成功就doReleaseShared()node

因此通常來講unparkSuccessor(h)doReleaseShared()通常是互相對應的,可是doReleaseShare()要執行的邏輯比前者多。這點咱們到後面再看!app

這裏提一下:ide

咱們以前說過函數

  • 在獨佔模式中,當一個線程獲取鎖以後,只有釋放鎖以後纔會去喚醒下一個線程。oop

  • 在共享模式中,若是一個線程成功獲取了共享鎖,他就會立刻喚醒下一個等待的線程,並不須要該擁有鎖的線程釋放鎖才喚醒下一個線程,由於共享鎖的設定就是爲了讓多個線程同時擁有所資源的!這裏有兩種狀況會喚醒後面等待的線程 1. 當前線程成功擁有鎖 , 2. 擁有鎖的線程釋放鎖 . 但願讀者記住這兩點,這對於後面咱們分析源碼的時候有指導性的做用。源碼分析

前面咱們先說了獨佔鎖和共享鎖的區別,是爲了讓讀者更好地去區分它們。下面咱們就經過分析CountDownLatch的源碼,去體會AQS中共享模式是如何工做的。post

CountDownLatch

說明

CountDownLatch類是典型的AQS的共享模式使用,而且是一個高頻使用的類。學習

關於CountDownLatch的用法,咱們就不一一在這裏贅述。若是有不瞭解的讀者,能夠先去百度一下再往下看源碼分析,這樣效果會更佳。

這裏推薦一篇關於怎麼用的博客,我的看的時候以爲比較通俗易懂

www.cnblogs.com/cuglkb/p/85…


源碼分析

構造方法

須要傳入一個不小於0的數,其實也就是設置AQS的共享資源

public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }


//----------Sync是CountDownLatch的內部類
/** 咱們這裏說一下,咱們看到Sync就是繼承了AQS這個抽象類,而後重寫了tryAcquireShared和 tryReleaseShared的方法,也就是本身加了個getCount()方法,因此後面看到Sync類調用的方法,除了重寫那兩個,其餘的基本都是父類的方法,讀者要時刻記住這一點! **/
  private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count); //AQS
        }

        int getCount() {
            return getState();
        }

      	//重寫AQS的方法
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
		//重寫AQS的方法
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
          
複製代碼

咱們能夠看到,在構造方法裏面咱們就設置了AQS的state值。

全部調用了await()方法的線程就會等待掛起,而後另外的線程就會執行state = state -1 的操做。

當state的值爲0的時候,那麼將state減爲0的線程就會立刻喚醒以前調用了await()的線程,而後這些線程就會去獲取共享鎖啦!

在咱們知道CountDownLatch的用法以後,咱們須要注意它的兩個方法,一個是CountDown(),另一個是await()方法

countDown()每次會把state的值減1,直到state的值變爲0,喚醒調用await()的線程

await()顧名思義,調用這個方法的線程會阻塞,直到被喚醒。

上面說的但願讀者能夠認真理解一下,理解到位了看下面的源碼纔不會半知半懂

示例代碼

咱們經過下面的源碼,來分析countDownawait的源碼

public static void main(String[] args) {
       CountDownLatch latch = new CountDownLatch(2);
    //t1和t2負責countDown,也就是將state減1
       Thread t1 = new Thread(new Runnable() {
           @Override
           public void run() {
               try {
                   Thread.sleep(5000);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
               latch.countDown();
           }
       },"t1");

       Thread t2 = new Thread(new Runnable() {
           @Override
           public void run() {
               try {
                   Thread.sleep(7000);

               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
               latch.countDown();
           }
       },"t2");

        t1.start();
        t2.start();

    // t3 和 t4負責await,等待state爲0而後被喚醒
        Thread t3 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    latch.await();
                    System.out.println(Thread.currentThread().getName() + "從await回來了");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }


            }
        },"t3");

        Thread t4 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "從await回來了");
            }
        },"t4");

        t3.start();
        t4.start();
    }
複製代碼

結果(t3和t4不是按照絕對的順序輸出的,後面的分析咱們按照t3先入隊)


下面咱們就按照步驟來

latch先await()阻塞,等待被喚醒(等待state變爲0),而後await()返回去作其餘事情!

await源碼

public void await() throws InterruptedException {
     	//調用sync的方法,關於Sync咱們前面已經說過,這裏就很少贅述了
        sync.acquireSharedInterruptibly(1);
    }


public final void acquireSharedInterruptibly(int arg) //AQS throws InterruptedException {
    	//這個方法是響應中斷的有中斷就拋出異常唄
        if (Thread.interrupted())
            throw new InterruptedException();
    	//t3和t4到這裏的時候,確定是小於0的(此時state = 2)
    	//因此先看看tryAcquireShared方法
    	if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }


protected int tryAcquireShared(int acquires) { //重寫AQS的方法
        //只有state>0的時候纔會返回1
    	return (getState() == 0) ? 1 : -1;
        }

//從這個方法名咱們知道,這個方法是獲取共享鎖而且是響應中斷的
private void doAcquireSharedInterruptibly(int arg) //AQS throws InterruptedException {
    
    	//步驟1. 將當前節點設置爲共享模式,而後加入阻塞隊列!
    	//關於這些知識,在第一篇和第二篇文章就說過啦,不懂的回去看看吧!
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    //這樣也是同樣,只要state > 0,就返回-1,(此時state = 2)
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //步驟2. 由於是剛開始,因此不會進到前面那個if語句,因此會直接來到這
                //這裏咱們都很熟悉了,就是在阻塞隊列找到一個地方讓這個線程能夠掛起!
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
複製代碼

咱們把上面的步驟用圖來描述一下你們就清楚啦!

t3通過步驟1以後,也就是入隊以後,會變成以下圖所示

而後tryAcquiredShared會返回-1,那麼就執行shouldParkAfterFailedAcquire就會將t3的pre指向的節點的WaitStatus(下面統稱爲ws)置爲-1,以下圖所示。(t3的ws爲-1)

執行完shouldParkAfterFailedAcquire以後執行parkAndCheckInterrupt就會把t3掛起啦!

而後後面t4進來的時候和t3是同樣的步驟,到後面就是t4加入阻塞隊列,而後把t3的ws置爲-1,結果以下圖所示。(t4的ws爲-1)

接着,t4就就被掛起啦,如今t3和t4就已經全掛起了,就等待着被喚醒啦!


這裏說明一下,由於能夠是由多個線程調用countDown的,那麼有些線程調用的時候state還不是0,因此這些線程不會喚醒await的線程,只有當某個線程調用了countDown方法,而後state從1變爲0了,那麼這個線程就喚醒其餘await的線程!明白這點很重要。

接下來咱們繼續看countDown方法

countDown源碼

public void countDown() {
        sync.releaseShared(1);
    }

public final boolean releaseShared(int arg) { // AQS
    	//只有將state設置爲0的時候纔會返回true,不然只是每次將state-1而已
        if (tryReleaseShared(arg)) {
            //到這裏說明已經成功把state置爲0了,那麼就開始喚醒如今還在等待的線程啦!
            doReleaseShared();
            return true;
        }
    	//state原本就是0的話,確定釋放不了,就返回false啦
        return false;
    }

protected boolean tryReleaseShared(int releases) {// 重寫AQS
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0) 
                    return false;
                int nextc = c-1;
                //每次都是CAS將state-1
                if (compareAndSetState(c, nextc))
                    //若是state減1成功變爲0的話,確定就返回true啦!
                    return nextc == 0;
            }
        }


複製代碼

下面咱們來着重分析下doReleaseShared()這個方法

//此時state爲0,先把流程走一遍先,先看註釋便可,其餘先不看
private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //t3入隊的時候,已經把head節點設置爲-1啦,因此會到if語句裏面
                if (ws == Node.SIGNAL) {
                    //將head設置爲0,也就是恢復到初始狀態
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    //到這裏就是喚醒head節點的下一個節點,這時候也就是喚醒t3
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
複製代碼

在t3被喚醒以後,咱們回到t3被喚醒的地方和接下來要作的事情

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {//此時head就是t3的前綴啦,因此能夠進來這裏
                    int r = tryAcquireShared(arg);
                    //這裏r > 0啦,由於以前說過state == 0的時候纔會返回1
                    if (r >= 0) {
                        //那麼此時t3就走到這一步了,下面咱們看下這個方法
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //步驟1:t3被喚醒以後就會從這裏繼續執行啦,假設沒有中斷的狀況,那麼是不會進到if語句裏面的啦
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
複製代碼

此時t3會進入setHeadAndPropagate這個方法

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node); //步驟1:t3先把本身設置爲頭結點先
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                //基本都會執行這個函數,那麼這裏也就說明在共享模式中,當節點被喚醒
                //的時候都會執行這個函數,也就是會喚醒下一個節點
                //那麼這裏就是說t3把本身設置爲頭部以後,就會立刻去喚醒t4去搶鎖啦!
                doReleaseShared(); 
        }
    }
複製代碼

那麼在這裏,咱們就好好分析一下doReleaseShared()這個方法了,因此在這裏咱們知道,t3已是head節點了

//調用這個方法的時候咱們已經知道此事state爲0了
private void doReleaseShared() {
        for (;;) {
            Node h = head;
            /** h == null 說明阻塞隊列已經爲空 h == tail 說明阻塞隊列剛被初始化的頭結點 還有一種狀況就是 以前是普通的節點,可是此節點已是頭結點,那麼就是說該節點是已經被喚醒了, 後面阻塞隊列已經沒有節點了 因此上面兩種狀況都不須要被喚醒 */
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //ws == -1,由於以前t4已經把t3設置爲-1(SIGNAL)啦
                if (ws == Node.SIGNAL) {
                    //問題1:這裏若是將節點設置爲初始化狀態失敗的緣由待會解釋!
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    //到達這裏,就說明t3的ws已經設置爲0,而後t3就去喚醒t4啦!
                    //那麼t4接下來被喚醒,就像t3被喚醒後作的事情差很少是如出一轍,
                    //因此想分析t4接下來要幹什麼,看上面t3被喚醒以後作了什麼就行了
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                   	//問題2:
                    //在咱們舉的例子中,t3是確定不會來到這裏的,這裏主要判斷的是
                    //若是有節點加入進來,可是這個節點準備到這裏的時候把它的前驅節點
                    //就設置爲-1了,關於這裏,咱們下面說明解釋。
                    continue;                // loop on failed CAS
            }
            //此時h指向的仍是t3那個節點,可是head節點可能已經變了
            //若是是同一個節點,退出循環,讓被喚醒的線程去喚醒後面的線程。
            if (h == head)                   // loop if head changed
                break;
        }
    }
複製代碼

下面咱們來看一張圖,就能夠解釋註釋的內容了(但願你們能夠好好看看這張圖,方便理解後面的解釋)

接着咱們來解釋一些內容

問題1

就是第一個CAS爲何會失敗的,咱們看到'喚醒cur的next節點'的流程,好比說t3喚醒t4以後,t3和t4此時就同時進行執行各自的代碼了(這裏咱們把t3和cur對應,t4和next對應),因此在next被喚醒以後,就會把next節點設置爲head節點,咱們以前也說過,在setHeadAndPropagate裏面有doReleased方法,因此會有下面的狀況

  • cur節點喚醒next節點以後,cur因爲某種緣由阻塞了,next節點成功把本身設置爲head節點,執行後面的操做,可是還沒到第一個CAS.此時cur又繼續往下走,判斷此時的h已經不是執行head節點了,那麼就會指向新的head,也就是此時的next節點.而後cur也繼續往下走.因此如今cur和next這兩個節點都是同時執行doReleased方法的,因此也會有可能同時到達第一個CAS,因此此時確定只有一個線程CAS操做成功啦,另一個不成功的就根據狀況往下走了!這裏舉幾個例子。

    好比說有t3->t4->t5->t6>......tn

    (1)把t3和t4分別對應前面說的那個cur和next,那麼此時假設是t3喚醒t5,但t5尚未把本身設置爲head節點,那麼head節點仍然t4。又由於t4以前CAS是失敗的,因此會自旋一次,而後執行最後一個if語句的時候,會直接退出啦!t3執行最後一個if的時候,因爲t5尚未把本身設置爲head,因此此時t3也退出啦。那麼就說明t5之後的節點沒人喚醒了嗎?非也,由於t5成功設置本身爲頭結點的話,也會執行doReleased()方法啦!就由t5和它的後繼去喚醒吧!

    在(1)的中,若是t3喚醒t5以後,t5也成功設置本身爲頭結點了,那麼t5此時會可能會喚醒後面的,這裏先不考慮。而後t3和t4又指向新的head節點啦。這種狀況又開始循環啦。

    這裏說明一下,t3和t4之中只能有一個CAS成功,另一個就continue再次自旋啦,因此是t4喚醒t5,那麼t3就像(1)說的t4同樣啦。

咱們看到在,在setHeadAndPropagate裏面有doReleased方法,讓後面的節點能夠更快地喚醒,增大喚醒的吞吐量,不得不說設計者的厲害之處啊!

問題2:

在解釋問題2以前,咱們經過下面的步驟慢慢分析

假設這是某一狀況阻塞隊列的狀況

  1. 而後t3被喚醒以後,此時阻塞隊列可能沒有節點了(阻塞隊列不包含head節點)

  1. 咱們在流程圖裏面說過,可能以前await的節點可能在addWaiter以前阻塞了,這時候可能又不阻塞了,而後執行了addWaiter方法,可是沒有執行shouldParkAfterFailedAcquire,也就是t3沒有把t2的ws設置爲SIGNAL,因此就可能在執行if (h != null && h != tail)以前會有以下圖所示

  1. doReleased函數中,咱們已經把ws賦值爲t3的ws,也就是ws爲0,那麼在2的前提下,咱們來到這段代碼else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)),咱們很容易知道,ws爲0,那麼此次CAS有可能失敗,爲何呢?在2 咱們說過shouldParkAfterFailedAcquire可能還沒執行,那麼也有可能CAS以前,就已經執行了這個函數,也就是此時t3的ws爲-1了,那麼此時確定CAS失敗啦!若是失敗的話,就讓t3再回去檢查下狀態,看看是否能夠喚醒後繼節點啦,因此在else if以後就是讓它continue啦!

  2. 若是CAS成功,那麼就是說shouldParkAfterFailedAcquire這個函數尚未執行,因此在執行最後一個if語句的時候t3可能退出啦,那麼t4繼續執行shouldParkAfterFailedAcquire,CAS把t3設置爲-1以後,就去執行到setHeadAndPropagate這個方法,那麼也會執行doRelease,t4的使命就結束了!

通過上面4點的說明,咱們對問題2的分析就已經很清楚啦!

擴展

在解決問題2的時候,咱們若是把else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))代碼去掉的話,會有什麼影響麼?分析一下!

假如沒有這段代碼,假設此時阻塞隊列以下所示

假設這是t1喚醒t2,但t2還不是head節點,那麼在t4入隊的時候,由於咱們假設沒有那個else if語句嘛,因此會執行shouldParkAfterFailedAcquire,那麼將t2的wsCAS爲-1,此時head可能已經指向t2了,前面咱們也說過,在共享模式中,只要一個節點成爲頭結點,就會執行doReleased方法,因此在t2設置爲head節點的時候,可能在t4休眠以前就被t2park了,可是這個操做時能夠容許的,當咱們unpark一個並無被park的線程時,該線程在下一次調用park方法時就不會被掛起,而這一行爲是符合咱們的場景的——由於當前的共享鎖處於可獲取的狀態,後繼的線程應該直接來獲取鎖,不該該被掛起。

CyclicBarrier

說明

下面咱們來講下CyclicBarrier,和CountDownLatch差很少,CyclicBarrier 基於 Condition 來實現。因此若是沒了解過Condition的話,建議你們看下前一篇文章初步瞭解AQS是什麼(二)

啥都不說,開局一張圖,(引用別人大佬畫的圖)

源碼分析

下面咱們開始分析咯,依然await是最重要的方法

咱們先看下一些細節

public class CyclicBarrier {
    //CyclicBarrier 是能夠重複使用的,每次從開始使用到穿過柵欄當作"一代",或者"一個週期"
    private static class Generation {
        boolean broken = false;
    }

    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
   
    //這裏看出是基於Condition的,因此等待線程的條件就是全部線程都在柵欄上await啦
    private final Condition trip = lock.newCondition();
    
    //參與跨過線程的線程數
    private final int parties;
    
    /* 若是設置這個,那麼跨過柵欄以前須要執行的操做*/
    private final Runnable barrierCommand;
    
    /** 當前所處的代 */
    private Generation generation = new Generation();
複製代碼

下面咱們看看如可開啓新的一代吧

private void nextGeneration() {
        // 須要喚醒全部等待在柵欄上的線程
        trip.signalAll();
        
        count = parties;
       	//開啓新的一代!
        generation = new Generation();
    }
複製代碼

基本能夠看出,咱們開啓新的一點,差很少和重寫生成一個CyclicBarrier差很少

咱們接下來看下如何打破一個柵欄

private void breakBarrier() {
    	//設置標誌位
        generation.broken = true;
     	//從新設置count的值
        count = parties;
     	//喚醒全部在等待的線程
        trip.signalAll();
    }
複製代碼

如今已經有了鋪墊了,那麼咱們來看下await方法吧

public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
        final ReentrantLock lock = this.lock;
        //這裏咱們先得到鎖唄,在finally後面再釋放鎖
        //由於在Condition中,await的線程須要先得到鎖的啦!
        lock.lock();
        try {
            final Generation g = generation;
			//先檢查柵欄是否有被打破,被打破就拋出異常唄
            if (g.broken)
                throw new BrokenBarrierException();
			//檢查中斷,有則拋出
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
			//index 就是這個返回值
            //index 存儲當前還有多少count
            int index = --count;
            //若是index 爲 0,那麼就說明全部線程在柵欄啦,那麼就準備打破而後下一代啦
            if (index == 0) {  // tripped
                //用來標記barrierCommand在執行的時候有沒有發生異常
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        //執行barrierCommand的操做
                        command.run();
                    ranAction = true;
                    //開啓下一代啦,這裏建議看下源碼!
                    nextGeneration();
                    return 0;
                } finally {
                    //若是barrierCommand執行的時候發生過異常,那就打破柵欄唄!
                    //這裏也建議看下源碼!
                    if (!ranAction)
                        breakBarrier();
                }
            }

            //若是到達這裏,說明上面的index不爲0,救是說不是最後一個線程到達柵欄的
            for (;;) {
                try {
                    //這裏是沒有超時機制的
                    //到這裏也就是說到達柵欄了,那麼就等待唄,因此就調用Condition的
                    //await,等待最後一個線程,而後被喚醒(Condition的signal)
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    //若是到達這裏,就說明線程在await的時候被中斷了
                    if (g == generation && ! g.broken) {
                        //到這裏就打破柵欄唄
                        breakBarrier();
                        //打破柵欄以後,拋出異常信息,讓外層本身去處理
                        throw ie;
                    } else {
                        //這裏就是說g != genneration,就是說前面已經開啓一個新的
                        //時代啦,說明最後一個線程await完成,全部線程就被喚醒啦!
                        //可是這個中斷已經沒有意義啦,因此記錄下就好!
                        Thread.currentThread().interrupt();
                    }
                }
				//若是柵欄被打破,這裏的被打破就是以前執行barrierCommand的時候
                //發生異常,那麼被就拋出異常啦
                if (g.broken)
                    throw new BrokenBarrierException();

                //到這裏基本都要退出啦
                //由於以前barrierCommand在執行完任務以後,就會 nextGeneration
                //開啓一個新的時代,而後釋放鎖,等待的線程都是await到這裏的,因此肯
                //定await以前的時代和被喚醒後的時代不同啦!
                //若是以前的柵欄破了或者await的時候被喚醒了,都是在前面就拋出異常
                //都會直接返回啦!
                if (g != generation)
                    return index;
				//這裏是超時的操做了,就不作分析了
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
複製代碼

上面的await方法咱們已經說得已經很清楚啦,咱們接下來看看如何獲取在柵欄等待的線程數

public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //也就是運算一下就好啦!
            return parties - count;
        } finally {
            lock.unlock();
        }
    }
複製代碼

檢查柵欄有沒有被打破,其實也就是看看那個標誌位有沒有被設置就好啦!

public boolean isBroken() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return generation.broken;
    } finally {
        lock.unlock();
    }
}
複製代碼

下面咱們來總結一下何時柵欄會被打破吧!

  1. 若是在await的線程被中斷,那麼就會打破柵欄,而後拋出InterruptedException的異常啦
  2. 若是在指定的操做被拋出異常的時候,也會打破柵欄!

咱們再來看下重置柵欄的源碼

public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }
複製代碼

跟簡單,就是打破柵欄,而後重建柵欄!也就是不破不立!

咱們來假設一個狀況,加入parties設置爲4,此時有3個線程await了,那麼在第4個await以前重置

那麼首先打破柵欄嘛,那麼就會跑出BrokenBarrierException 異常,而後開啓新的一代,再重置CyclicBarrier 的count和generation,一切從0開始!

關於AQS的核心功能的源碼分析就到此結束啦!繼續征戰下一個知識!

總結

  1. 本片博客重點分析了CountDownLatch的源碼,在分析的過程當中,有兩個問題迷惑了本身好久,可是本身在嘗試跳出局部思惟以後,發現有些問題能夠迎刃而解,也對AQS共享模式的工做機制有了必定的瞭解因此但願之後能夠保持這種思惟。
  2. 在分析CountDownLatch的源碼過程當中,本身起碼能夠耐下心去琢磨,但願之後能夠更加鍛鍊閱讀優秀源碼的能力吧
  3. 在寫本博客的時候參考了別人的想法和內容,但願本身能夠早日獨立寫出優秀的文章!!!!!
相關文章
相關標籤/搜索