前面兩篇咱們以ReentrantLock爲例瞭解了AQS獨佔鎖的獲取與釋放,本篇咱們來看看共享鎖。因爲AQS對於共享鎖與獨佔鎖的實現框架比較相似,所以若是你搞定了前面的獨佔鎖模式,則共享鎖也就很容易弄懂了。java
系列文章目錄node
共享鎖與獨佔鎖最大的區別在於,獨佔鎖是獨佔的,排他的,所以在獨佔鎖中有一個exclusiveOwnerThread
屬性,用來記錄當前持有鎖的線程。當獨佔鎖已經被某個線程持有時,其餘線程只能等待它被釋放後,才能去爭鎖,而且同一時刻只有一個線程能爭鎖成功。segmentfault
而對於共享鎖而言,因爲鎖是能夠被共享的,所以它能夠被多個線程同時持有。換句話說,若是一個線程成功獲取了共享鎖,那麼其餘等待在這個共享鎖上的線程就也能夠嘗試去獲取鎖,而且極有可能獲取成功。併發
共享鎖的實現和獨佔鎖是對應的,咱們能夠從下面這張表中看出:框架
獨佔鎖 | 共享鎖 |
---|---|
tryAcquire(int arg) | tryAcquireShared(int arg) |
tryAcquireNanos(int arg, long nanosTimeout) | tryAcquireSharedNanos(int arg, long nanosTimeout) |
acquire(int arg) | acquireShared(int arg) |
acquireQueued(final Node node, int arg) | doAcquireShared(int arg) |
acquireInterruptibly(int arg) | acquireSharedInterruptibly(int arg) |
doAcquireInterruptibly(int arg) | doAcquireSharedInterruptibly(int arg) |
doAcquireNanos(int arg, long nanosTimeout) | doAcquireSharedNanos(int arg, long nanosTimeout) |
release(int arg) | releaseShared(int arg) |
tryRelease(int arg) | tryReleaseShared(int arg) |
- | doReleaseShared() |
能夠看出,除了最後一個屬於共享鎖的doReleaseShared()
方法沒有對應外,其餘的方法,獨佔鎖和共享鎖都是一一對應的。oop
事實上,其實與doReleaseShared()
對應的獨佔鎖的方法應當是unparkSuccessor(h)
,只是doReleaseShared()
邏輯不只僅包含了unparkSuccessor(h)
,還包含了其餘操做,這一點咱們下面分析源碼的時候再看。性能
另外,尤爲須要注意的是,在獨佔鎖模式中,咱們只有在獲取了獨佔鎖的節點釋放鎖時,纔會喚醒後繼節點——這是合理的,由於獨佔鎖只能被一個線程持有,若是它尚未被釋放,就沒有必要去喚醒它的後繼節點。優化
然而,在共享鎖模式下,當一個節點獲取到了共享鎖,咱們在獲取成功後就能夠喚醒後繼節點了,而不須要等到該節點釋放鎖的時候,這是由於共享鎖能夠被多個線程同時持有,一個鎖獲取到了,則後繼的節點均可以直接來獲取。所以,在共享鎖模式下,在獲取鎖和釋放鎖結束時,都會喚醒後繼節點。 這一點也是doReleaseShared()
方法與unparkSuccessor(h)
方法沒法直接對應的根本緣由所在。ui
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
咱們拿它和獨佔鎖模式對比一下:this
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
這二者的結構看上去彷佛有點差異,但事實上是同樣的,只不過是共享鎖模式下,將與addWaiter(Node.EXCLUSIVE)
對應的addWaiter(Node.SHARED)
,以及selfInterrupt()
操做所有移到了doAcquireShared
方法內部,這一點咱們在下面分析doAcquireShared
方法時就一目瞭然了。
不過這裏先插一句,相對於獨佔的鎖的tryAcquire(int arg)
返回boolean類型的值,共享鎖的tryAcquireShared(int acquires)
返回的是一個整型值:
所以,只要該返回值大於等於0,就表示獲取共享鎖成功。
acquireShared
中的tryAcquireShared
方法由具體的子類負責實現,這裏咱們暫且不表。
接下來咱們看看doAcquireShared
方法,它對應於獨佔鎖的acquireQueued
,二者其實很相似,咱們把它們相同的部分註釋掉,只看不一樣的部分:
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); /*boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor();*/ if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } /*if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }*/ }
關於上面的if部分,獨佔鎖對應的acquireQueued
方法爲:
if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; }
所以,綜合來看,這二者的邏輯僅有兩處不一樣:
addWaiter(Node.EXCLUSIVE)
-> addWaiter(Node.SHARED)
setHead(node)
-> setHeadAndPropagate(node, r)
這裏第一點不一樣就是獨佔鎖的acquireQueued
調用的是addWaiter(Node.EXCLUSIVE)
,而共享鎖調用的是addWaiter(Node.SHARED)
,代表了該節點處於共享模式,這兩種模式的定義爲:
/** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null;
該模式被賦值給了節點的nextWaiter
屬性:
Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; }
咱們知道,在條件隊列中,nextWaiter
是指向條件隊列中的下一個節點的,它將條件隊列中的節點串起來,構成了單鏈表。可是在sync queue
隊列中,咱們只用prev
,next
屬性來串聯節點,造成雙向鏈表,nextWaiter
屬性在這裏只起到一個標記做用,不會串聯節點,這裏不要被Node SHARED = new Node()
所指向的空節點迷惑,這個空節點並不屬於sync queue
,不表明任何線程,它只起到標記做用,僅僅用做判斷節點是否處於共享模式的依據:
// Node#isShard() final boolean isShared() { return nextWaiter == SHARED; }
這裏的第二點不一樣就在於獲取鎖成功後的行爲,對於獨佔鎖而言,是直接調用了setHead(node)
方法,而共享鎖調用的是setHeadAndPropagate(node, r)
:
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
在該方法內部咱們不只調用了setHead(node)
,還在必定條件下調用了doReleaseShared()
來喚醒後繼的節點。這是由於在共享鎖模式下,鎖能夠被多個線程所共同持有,既然當前線程已經拿到共享鎖了,那麼就能夠直接通知後繼節點來拿鎖,而沒必要等待鎖被釋放的時候再通知。
關於這個doReleaseShared
方法,咱們到下面分析鎖釋放的時候再看。
咱們使用releaseShared(int arg)
方法來釋放共享鎖:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
該方法對應於獨佔鎖的release(int arg)
方法:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
在獨佔鎖模式下,因爲頭節點就是持有獨佔鎖的節點,在它釋放獨佔鎖後,若是發現本身的waitStatus不爲0,則它將負責喚醒它的後繼節點。
在共享鎖模式下,頭節點就是持有共享鎖的節點,在它釋放共享鎖後,它也應該喚醒它的後繼節點,可是值得注意的是,咱們在以前的setHeadAndPropagate
方法中可能已經調用過該方法了,也就是說它可能會被同一個頭節點調用兩次,也有可能在咱們從releaseShared
方法中調用它時,當前的頭節點已經易主了,下面咱們就來詳細看看這個方法:
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
該方法多是共享鎖模式最難理解的方法了,在看該方法時,咱們須要明確如下幾個問題:
(1) 該方法有幾處調用?
該方法有兩處調用,一處在acquireShared
方法的末尾,當線程成功獲取到共享鎖後,在必定條件下調用該方法;一處在releaseShared
方法中,當線程釋放共享鎖的時候調用。
(2) 調用該方法的線程是誰?
在獨佔鎖中,只有獲取了鎖的線程才能調用release釋放鎖,所以調用unparkSuccessor(h)喚醒後繼節點的必然是持有鎖的線程,該線程可看作是當前的頭節點(雖然在setHead方法中已經將頭節點的thread屬性設爲了null,可是這個頭節點曾經表明的就是這個線程)
在共享鎖中,持有共享鎖的線程能夠有多個,這些線程均可以調用releaseShared
方法釋放鎖;而這些線程想要得到共享鎖,則它們必然曾經成爲過頭節點,或者就是如今的頭節點。所以,若是是在releaseShared
方法中調用的doReleaseShared
,可能此時調用方法的線程已經不是頭節點所表明的線程了,頭節點可能已經被易主好幾回了。
(3) 調用該方法的目的是什麼?
不管是在acquireShared
中調用,仍是在releaseShared
方法中調用,該方法的目的都是在當前共享鎖是可獲取的狀態時,喚醒head節點的下一個節點。這一點看上去和獨佔鎖彷佛同樣,可是它們的一個重要的差異是——在共享鎖中,當頭節點發生變化時,是會回到循環中再當即喚醒head節點的下一個節點的。也就是說,在當前節點完成喚醒後繼節點的任務以後將要退出時,若是發現被喚醒後繼節點已經成爲了新的頭節點,則會當即觸發喚醒head節點的下一個節點的操做,如此周而復始。
(4) 退出該方法的條件是什麼
該方法是一個自旋操做(for(;;)
),退出該方法的惟一辦法是走最後的break語句:
if (h == head) // loop if head changed break;
即,只有在當前head沒有易主時,纔會退出,不然繼續循環。
這個怎麼理解呢?
爲了說明問題,這裏咱們假設目前sync queue隊列中依次排列有
dummy node -> A -> B -> C -> D
如今假設A已經拿到了共享鎖,則它將成爲新的dummy node,
dummy node (A) -> B -> C -> D
此時,A線程會調用doReleaseShared,咱們寫作doReleaseShared[A]
,在該方法中將喚醒後繼的節點B,它很快得到了共享鎖,成爲了新的頭節點:
dummy node (B) -> C -> D
此時,B線程也會調用doReleaseShared,咱們寫作doReleaseShared[B]
,在該方法中將喚醒後繼的節點C,可是別忘了,在doReleaseShared[B]
調用的時候,doReleaseShared[A]
還沒運行結束呢,當它運行到if(h == head)
時,發現頭節點如今已經變了,因此它將繼續回到for循環中,與此同時,doReleaseShared[B]
也沒閒着,它在執行過程當中也進入到了for循環中。。。
因而可知,咱們這裏造成了一個doReleaseShared的「調用風暴」,大量的線程在同時執行doReleaseShared,這極大地加速了喚醒後繼節點的速度,提高了效率,同時該方法內部的CAS操做又保證了多個線程同時喚醒一個節點時,只有一個線程能操做成功。
那若是這裏doReleaseShared[A]
執行結束時,節點B尚未成爲新的頭節點時,doReleaseShared[A]
方法不就退出了嗎?是的,但即便這樣也沒有關係,由於它已經成功喚醒了線程B,即便doReleaseShared[A]
退出了,當B線程成爲新的頭節點時,doReleaseShared[B]
就開始執行了,它也會負責喚醒後繼節點的,這樣即便變成這種每一個節點只喚醒本身後繼節點的模式,從功能上講,最終也能夠實現喚醒全部等待共享鎖的節點的目的,只是效率上沒有以前的「調用風暴」快。
由此咱們知道,這裏的「調用風暴」事實上是一個優化操做,由於在咱們執行到該方法的末尾的時候,unparkSuccessor
基本上已經被調用過了,而因爲如今是共享鎖模式,因此被喚醒的後繼節點極有可能已經獲取到了共享鎖,成爲了新的head節點,當它成爲新的head節點後,它可能仍是要在setHeadAndPropagate
方法中調用doReleaseShared
喚醒它的後繼節點。
明確了上面幾個問題後,咱們再來詳細分析這個方法,它最重要的部分就是下面這兩個if語句:
if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS
第一個if很好理解,若是當前ws值爲Node.SIGNAL,則說明後繼節點須要喚醒,這裏採用CAS操做先將Node.SIGNAL狀態改成0,這是由於前面講過,可能有大量的doReleaseShared方法在同時執行,咱們只須要其中一個執行unparkSuccessor(h)
操做就好了,這裏經過CAS操做保證了unparkSuccessor(h)
只被執行一次。
比較難理解的是第二個else if,首先咱們要弄清楚ws啥時候爲0,一種是上面的compareAndSetWaitStatus(h, Node.SIGNAL, 0)
會致使ws爲0,可是很明顯,若是是由於這個緣由,則它是不會進入到else if語句塊的。因此這裏的ws爲0是指當前隊列的最後一個節點成爲了頭節點。爲何是最後一個節點呢,由於每次新的節點加進來,在掛起前必定會將本身的前驅節點的waitStatus修改爲Node.SIGNAL的。(對這一點不理解的詳細看這裏)
其次,compareAndSetWaitStatus(h, 0, Node.PROPAGATE)
這個操做何時會失敗?既然這個操做失敗,說明就在執行這個操做的瞬間,ws此時已經不爲0了,說明有新的節點入隊了,ws的值被改成了Node.SIGNAL,此時咱們將調用continue
,在下次循環中直接將這個剛剛新入隊但準備掛起的線程喚醒。
其實,若是咱們再結合外部的總體條件,就很容易理解這種狀況所針對的場景,不要忘了,進入上面這段還有一個條件是
if (h != null && h != tail)
它處於最外層:
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { // 注意這裏說明了隊列至少有兩個節點 int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } }
這個條件意味着,隊列中至少有兩個節點。
結合上面的分析,咱們能夠看出,這個
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
描述了一個極其嚴苛且短暫的狀態:
else if
語句,說明咱們跳過了前面的if條件,說明頭節點是剛剛成爲頭節點的,它的waitStatus值還爲0,尾節點是在這以後剛剛加進來的,它須要執行shouldParkAfterFailedAcquire
,將它的前驅節點(即頭節點)的waitStatus值修改成Node.SIGNAL
,可是目前這個修改操做尚未來的及執行。這種狀況使咱們得以進入else if的前半部分else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)
這一條件,說明此時頭節點的waitStatus
已經不是0了,這說明以前那個沒有來得及執行的 在shouldParkAfterFailedAcquire
將前驅節點的的waitStatus值修改成Node.SIGNAL
的操做如今執行完了。因而可知,else if
的 &&
鏈接了兩個不一致的狀態,分別對應了shouldParkAfterFailedAcquire
的compareAndSetWaitStatus(pred, ws, Node.SIGNAL)
執行成功前和執行成功後,由於doReleaseShared
和shouldParkAfterFailedAcquire
是能夠併發執行的,因此這一條件是有可能知足的,只是知足的條件很是嚴苛,可能只是一瞬間的事。
這裏不得不說,若是以上的分析沒有錯的話,那做者對於AQS性能的優化已經到了「使人髮指」的地步!!!雖然說這種短暫的瞬間確實存在,也確實有必要從新回到for循環中再次去喚醒後繼節點,可是這種優化也太太太~~~過於精細了吧!
咱們來看看若是不加入這個精細的控制條件有什麼後果呢?
這裏咱們複習一下新節點入隊的過程,前面說過,在發現新節點的前驅不是head節點的時候,它將調用shouldParkAfterFailedAcquire
:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
因爲前驅節點的ws值如今還爲0,新節點將會把它改成Node.SIGNAL,
但修改後,該方法返回的是false,也就是說線程不會當即掛起,而是回到上層再嘗試一次搶鎖:
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // shouldParkAfterFailedAcquire的返回處 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
當咱們再次回到for(;;)
循環中,因爲此時當前節點的前驅節點已經成爲了新的head,因此它能夠參與搶鎖,因爲它搶的是共享鎖,因此大機率它是搶的到的,因此極有可能它不會被掛起。這有可能致使在上面的doReleaseShared
調用unparkSuccessor
方法unpark
了一個並無被park
的線程。然而,這一操做是被容許的,當咱們unpark
一個並無被park
的線程時,該線程在下一次調用park
方法時就不會被掛起,而這一行爲是符合咱們的場景的——由於當前的共享鎖處於可獲取的狀態,後繼的線程應該直接來獲取鎖,不該該被掛起。
事實上,我我的認爲:
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS
這一段其實也能夠省略,固然有了這一段確定會加速喚醒後繼節點的過程,做者針對上面那種極其短暫的狀況進行了優化能夠說是和它以前「調用風暴」的設計一脈相承,可能也正是因爲做者對於性能的極致追求才使得AQS如此之優秀吧。
(完)