逐行分析AQS源碼(3)——共享鎖的獲取與釋放

前言

前面兩篇咱們以ReentrantLock爲例瞭解了AQS獨佔鎖的獲取釋放,本篇咱們來看看共享鎖。因爲AQS對於共享鎖與獨佔鎖的實現框架比較相似,所以若是你搞定了前面的獨佔鎖模式,則共享鎖也就很容易弄懂了。java

系列文章目錄node

共享鎖與獨佔鎖的區別

共享鎖與獨佔鎖最大的區別在於,獨佔鎖是獨佔的,排他的,所以在獨佔鎖中有一個exclusiveOwnerThread屬性,用來記錄當前持有鎖的線程。當獨佔鎖已經被某個線程持有時,其餘線程只能等待它被釋放後,才能去爭鎖,而且同一時刻只有一個線程能爭鎖成功。segmentfault

而對於共享鎖而言,因爲鎖是能夠被共享的,所以它能夠被多個線程同時持有。換句話說,若是一個線程成功獲取了共享鎖,那麼其餘等待在這個共享鎖上的線程就也能夠嘗試去獲取鎖,而且極有可能獲取成功。併發

共享鎖的實現和獨佔鎖是對應的,咱們能夠從下面這張表中看出:框架

獨佔鎖 共享鎖
tryAcquire(int arg) tryAcquireShared(int arg)
tryAcquireNanos(int arg, long nanosTimeout) tryAcquireSharedNanos(int arg, long nanosTimeout)
acquire(int arg) acquireShared(int arg)
acquireQueued(final Node node, int arg) doAcquireShared(int arg)
acquireInterruptibly(int arg) acquireSharedInterruptibly(int arg)
doAcquireInterruptibly(int arg) doAcquireSharedInterruptibly(int arg)
doAcquireNanos(int arg, long nanosTimeout) doAcquireSharedNanos(int arg, long nanosTimeout)
release(int arg) releaseShared(int arg)
tryRelease(int arg) tryReleaseShared(int arg)
- doReleaseShared()

能夠看出,除了最後一個屬於共享鎖的doReleaseShared()方法沒有對應外,其餘的方法,獨佔鎖和共享鎖都是一一對應的。oop

事實上,其實與doReleaseShared()對應的獨佔鎖的方法應當是unparkSuccessor(h),只是doReleaseShared()邏輯不只僅包含了unparkSuccessor(h),還包含了其餘操做,這一點咱們下面分析源碼的時候再看。性能

另外,尤爲須要注意的是,在獨佔鎖模式中,咱們只有在獲取了獨佔鎖的節點釋放鎖時,纔會喚醒後繼節點——這是合理的,由於獨佔鎖只能被一個線程持有,若是它尚未被釋放,就沒有必要去喚醒它的後繼節點。優化

然而,在共享鎖模式下,當一個節點獲取到了共享鎖,咱們在獲取成功後就能夠喚醒後繼節點了,而不須要等到該節點釋放鎖的時候,這是由於共享鎖能夠被多個線程同時持有,一個鎖獲取到了,則後繼的節點均可以直接來獲取。所以,在共享鎖模式下,在獲取鎖和釋放鎖結束時,都會喚醒後繼節點。 這一點也是doReleaseShared()方法與unparkSuccessor(h)方法沒法直接對應的根本緣由所在。ui

共享鎖的獲取

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

咱們拿它和獨佔鎖模式對比一下:this

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

這二者的結構看上去彷佛有點差異,但事實上是同樣的,只不過是共享鎖模式下,將與addWaiter(Node.EXCLUSIVE)對應的addWaiter(Node.SHARED),以及selfInterrupt()操做所有移到了doAcquireShared方法內部,這一點咱們在下面分析doAcquireShared方法時就一目瞭然了。

不過這裏先插一句,相對於獨佔的鎖的tryAcquire(int arg)返回boolean類型的值,共享鎖的tryAcquireShared(int acquires)返回的是一個整型值:

  • 若是該值小於0,則表明當前線程獲取共享鎖失敗
  • 若是該值大於0,則表明當前線程獲取共享鎖成功,而且接下來其餘線程嘗試獲取共享鎖的行爲極可能成功
  • 若是該值等於0,則表明當前線程獲取共享鎖成功,可是接下來其餘線程嘗試獲取共享鎖的行爲會失敗

所以,只要該返回值大於等於0,就表示獲取共享鎖成功。

acquireShared中的tryAcquireShared方法由具體的子類負責實現,這裏咱們暫且不表。

接下來咱們看看doAcquireShared方法,它對應於獨佔鎖的acquireQueued,二者其實很相似,咱們把它們相同的部分註釋掉,只看不一樣的部分:

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    /*boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();*/
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            /*if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }*/
}

關於上面的if部分,獨佔鎖對應的acquireQueued方法爲:

if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
}

所以,綜合來看,這二者的邏輯僅有兩處不一樣:

  1. addWaiter(Node.EXCLUSIVE) -> addWaiter(Node.SHARED)
  2. setHead(node) -> setHeadAndPropagate(node, r)

這裏第一點不一樣就是獨佔鎖的acquireQueued調用的是addWaiter(Node.EXCLUSIVE),而共享鎖調用的是addWaiter(Node.SHARED),代表了該節點處於共享模式,這兩種模式的定義爲:

/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

該模式被賦值給了節點的nextWaiter屬性:

Node(Thread thread, Node mode) {     // Used by addWaiter
    this.nextWaiter = mode;
    this.thread = thread;
}

咱們知道,在條件隊列中,nextWaiter是指向條件隊列中的下一個節點的,它將條件隊列中的節點串起來,構成了單鏈表。可是在sync queue隊列中,咱們只用prev,next屬性來串聯節點,造成雙向鏈表,nextWaiter屬性在這裏只起到一個標記做用,不會串聯節點,這裏不要被Node SHARED = new Node()所指向的空節點迷惑,這個空節點並不屬於sync queue,不表明任何線程,它只起到標記做用,僅僅用做判斷節點是否處於共享模式的依據:

// Node#isShard()
final boolean isShared() {
    return nextWaiter == SHARED;
}

這裏的第二點不一樣就在於獲取鎖成功後的行爲,對於獨佔鎖而言,是直接調用了setHead(node)方法,而共享鎖調用的是setHeadAndPropagate(node, r)

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);

    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

在該方法內部咱們不只調用了setHead(node),還在必定條件下調用了doReleaseShared()來喚醒後繼的節點。這是由於在共享鎖模式下,鎖能夠被多個線程所共同持有,既然當前線程已經拿到共享鎖了,那麼就能夠直接通知後繼節點來拿鎖,而沒必要等待鎖被釋放的時候再通知。

關於這個doReleaseShared方法,咱們到下面分析鎖釋放的時候再看。

共享鎖的釋放

咱們使用releaseShared(int arg)方法來釋放共享鎖:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

該方法對應於獨佔鎖的release(int arg)方法:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

在獨佔鎖模式下,因爲頭節點就是持有獨佔鎖的節點,在它釋放獨佔鎖後,若是發現本身的waitStatus不爲0,則它將負責喚醒它的後繼節點。

在共享鎖模式下,頭節點就是持有共享鎖的節點,在它釋放共享鎖後,它也應該喚醒它的後繼節點,可是值得注意的是,咱們在以前的setHeadAndPropagate方法中可能已經調用過該方法了,也就是說它可能會被同一個頭節點調用兩次,也有可能在咱們從releaseShared方法中調用它時,當前的頭節點已經易主了,下面咱們就來詳細看看這個方法:

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

該方法多是共享鎖模式最難理解的方法了,在看該方法時,咱們須要明確如下幾個問題:

(1) 該方法有幾處調用?

該方法有兩處調用,一處在acquireShared方法的末尾,當線程成功獲取到共享鎖後,在必定條件下調用該方法;一處在releaseShared方法中,當線程釋放共享鎖的時候調用。

(2) 調用該方法的線程是誰?

在獨佔鎖中,只有獲取了鎖的線程才能調用release釋放鎖,所以調用unparkSuccessor(h)喚醒後繼節點的必然是持有鎖的線程,該線程可看作是當前的頭節點(雖然在setHead方法中已經將頭節點的thread屬性設爲了null,可是這個頭節點曾經表明的就是這個線程)

在共享鎖中,持有共享鎖的線程能夠有多個,這些線程均可以調用releaseShared方法釋放鎖;而這些線程想要得到共享鎖,則它們必然曾經成爲過頭節點,或者就是如今的頭節點。所以,若是是在releaseShared方法中調用的doReleaseShared,可能此時調用方法的線程已經不是頭節點所表明的線程了,頭節點可能已經被易主好幾回了。

(3) 調用該方法的目的是什麼?

不管是在acquireShared中調用,仍是在releaseShared方法中調用,該方法的目的都是在當前共享鎖是可獲取的狀態時,喚醒head節點的下一個節點。這一點看上去和獨佔鎖彷佛同樣,可是它們的一個重要的差異是——在共享鎖中,當頭節點發生變化時,是會回到循環中再當即喚醒head節點的下一個節點的。也就是說,在當前節點完成喚醒後繼節點的任務以後將要退出時,若是發現被喚醒後繼節點已經成爲了新的頭節點,則會當即觸發喚醒head節點的下一個節點的操做,如此周而復始。

(4) 退出該方法的條件是什麼

該方法是一個自旋操做(for(;;)),退出該方法的惟一辦法是走最後的break語句:

if (h == head)   // loop if head changed
    break;

即,只有在當前head沒有易主時,纔會退出,不然繼續循環。
這個怎麼理解呢?
爲了說明問題,這裏咱們假設目前sync queue隊列中依次排列有

dummy node -> A -> B -> C -> D

如今假設A已經拿到了共享鎖,則它將成爲新的dummy node,

dummy node (A) -> B -> C -> D

此時,A線程會調用doReleaseShared,咱們寫作doReleaseShared[A],在該方法中將喚醒後繼的節點B,它很快得到了共享鎖,成爲了新的頭節點:

dummy node (B) -> C -> D

此時,B線程也會調用doReleaseShared,咱們寫作doReleaseShared[B],在該方法中將喚醒後繼的節點C,可是別忘了,在doReleaseShared[B]調用的時候,doReleaseShared[A]還沒運行結束呢,當它運行到if(h == head)時,發現頭節點如今已經變了,因此它將繼續回到for循環中,與此同時,doReleaseShared[B]也沒閒着,它在執行過程當中也進入到了for循環中。。。

因而可知,咱們這裏造成了一個doReleaseShared的「調用風暴」,大量的線程在同時執行doReleaseShared,這極大地加速了喚醒後繼節點的速度,提高了效率,同時該方法內部的CAS操做又保證了多個線程同時喚醒一個節點時,只有一個線程能操做成功。

那若是這裏doReleaseShared[A]執行結束時,節點B尚未成爲新的頭節點時,doReleaseShared[A]方法不就退出了嗎?是的,但即便這樣也沒有關係,由於它已經成功喚醒了線程B,即便doReleaseShared[A]退出了,當B線程成爲新的頭節點時,doReleaseShared[B]就開始執行了,它也會負責喚醒後繼節點的,這樣即便變成這種每一個節點只喚醒本身後繼節點的模式,從功能上講,最終也能夠實現喚醒全部等待共享鎖的節點的目的,只是效率上沒有以前的「調用風暴」快。

由此咱們知道,這裏的「調用風暴」事實上是一個優化操做,由於在咱們執行到該方法的末尾的時候,unparkSuccessor基本上已經被調用過了,而因爲如今是共享鎖模式,因此被喚醒的後繼節點極有可能已經獲取到了共享鎖,成爲了新的head節點,當它成爲新的head節點後,它可能仍是要在setHeadAndPropagate方法中調用doReleaseShared喚醒它的後繼節點。

明確了上面幾個問題後,咱們再來詳細分析這個方法,它最重要的部分就是下面這兩個if語句:

if (ws == Node.SIGNAL) {
    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
        continue;            // loop to recheck cases
    unparkSuccessor(h);
}
else if (ws == 0 &&
         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    continue;                // loop on failed CAS

第一個if很好理解,若是當前ws值爲Node.SIGNAL,則說明後繼節點須要喚醒,這裏採用CAS操做先將Node.SIGNAL狀態改成0,這是由於前面講過,可能有大量的doReleaseShared方法在同時執行,咱們只須要其中一個執行unparkSuccessor(h)操做就好了,這裏經過CAS操做保證了unparkSuccessor(h)只被執行一次。

比較難理解的是第二個else if,首先咱們要弄清楚ws啥時候爲0,一種是上面的compareAndSetWaitStatus(h, Node.SIGNAL, 0)會致使ws爲0,可是很明顯,若是是由於這個緣由,則它是不會進入到else if語句塊的。因此這裏的ws爲0是指當前隊列的最後一個節點成爲了頭節點。爲何是最後一個節點呢,由於每次新的節點加進來,在掛起前必定會將本身的前驅節點的waitStatus修改爲Node.SIGNAL的。(對這一點不理解的詳細看這裏)

其次,compareAndSetWaitStatus(h, 0, Node.PROPAGATE)這個操做何時會失敗?既然這個操做失敗,說明就在執行這個操做的瞬間,ws此時已經不爲0了,說明有新的節點入隊了,ws的值被改成了Node.SIGNAL,此時咱們將調用continue,在下次循環中直接將這個剛剛新入隊但準備掛起的線程喚醒。

其實,若是咱們再結合外部的總體條件,就很容易理解這種狀況所針對的場景,不要忘了,進入上面這段還有一個條件是

if (h != null && h != tail)

它處於最外層:

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) { // 注意這裏說明了隊列至少有兩個節點
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;               
        }
        if (h == head)
            break;
    }
}

這個條件意味着,隊列中至少有兩個節點。

結合上面的分析,咱們能夠看出,這個

else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

描述了一個極其嚴苛且短暫的狀態:

  1. 首先,大前提是隊列裏至少有兩個節點
  2. 其次,要執行到else if語句,說明咱們跳過了前面的if條件,說明頭節點是剛剛成爲頭節點的,它的waitStatus值還爲0,尾節點是在這以後剛剛加進來的,它須要執行shouldParkAfterFailedAcquire,將它的前驅節點(即頭節點)的waitStatus值修改成Node.SIGNAL可是目前這個修改操做尚未來的及執行。這種狀況使咱們得以進入else if的前半部分else if (ws == 0 &&
  3. 緊接着,要知足!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)這一條件,說明此時頭節點的waitStatus已經不是0了,這說明以前那個沒有來得及執行的 shouldParkAfterFailedAcquire將前驅節點的的waitStatus值修改成Node.SIGNAL的操做如今執行完了。

因而可知,else if&& 鏈接了兩個不一致的狀態,分別對應了shouldParkAfterFailedAcquirecompareAndSetWaitStatus(pred, ws, Node.SIGNAL)執行成功前和執行成功後,由於doReleaseShared
shouldParkAfterFailedAcquire是能夠併發執行的,因此這一條件是有可能知足的,只是知足的條件很是嚴苛,可能只是一瞬間的事。

這裏不得不說,若是以上的分析沒有錯的話,那做者對於AQS性能的優化已經到了「使人髮指」的地步!!!雖然說這種短暫的瞬間確實存在,也確實有必要從新回到for循環中再次去喚醒後繼節點,可是這種優化也太太太~~~過於精細了吧!

咱們來看看若是不加入這個精細的控制條件有什麼後果呢?

這裏咱們複習一下新節點入隊的過程,前面說過,在發現新節點的前驅不是head節點的時候,它將調用shouldParkAfterFailedAcquire

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

因爲前驅節點的ws值如今還爲0,新節點將會把它改成Node.SIGNAL,

但修改後,該方法返回的是false,也就是說線程不會當即掛起,而是回到上層再嘗試一次搶鎖:

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // shouldParkAfterFailedAcquire的返回處
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

當咱們再次回到for(;;)循環中,因爲此時當前節點的前驅節點已經成爲了新的head,因此它能夠參與搶鎖,因爲它搶的是共享鎖,因此大機率它是搶的到的,因此極有可能它不會被掛起。這有可能致使在上面的doReleaseShared調用unparkSuccessor方法unpark了一個並無被park的線程。然而,這一操做是被容許的,當咱們unpark一個並無被park的線程時,該線程在下一次調用park方法時就不會被掛起,而這一行爲是符合咱們的場景的——由於當前的共享鎖處於可獲取的狀態,後繼的線程應該直接來獲取鎖,不該該被掛起。

事實上,我我的認爲:

else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    continue;  // loop on failed CAS

這一段其實也能夠省略,固然有了這一段確定會加速喚醒後繼節點的過程,做者針對上面那種極其短暫的狀況進行了優化能夠說是和它以前「調用風暴」的設計一脈相承,可能也正是因爲做者對於性能的極致追求才使得AQS如此之優秀吧。

總結

  • 共享鎖的調用框架和獨佔鎖很類似,它們最大的不一樣在於獲取鎖的邏輯——共享鎖能夠被多個線程同時持有,而獨佔鎖同一時刻只能被一個線程持有。
  • 因爲共享鎖同一時刻能夠被多個線程持有,所以當頭節點獲取到共享鎖時,能夠當即喚醒後繼節點來爭鎖,而沒必要等到釋放鎖的時候。所以,共享鎖觸發喚醒後繼節點的行爲可能有兩處,一處在當前節點成功得到共享鎖後,一處在當前節點釋放共享鎖後。

(完)

系列文章目錄

相關文章
相關標籤/搜索