往期回顧:html
上一篇文章介紹了AQS內置隊列節點的出隊入隊操做,以及獨佔式獲取共享資源與釋放資源的詳細流程,爲告終構完整,本篇繼續以AQS的角度介紹另一種:共享模式獲取與釋放資源的細節,本篇暫不分析具體子類如ReentrantLock、ReentrantReadWriteLock的實現,以後會陸續補充。java
友情提示:本篇文章着重介紹共享模式獲取和釋放資源的特色,許多代碼實現上面和共享式和獨佔式其實邏輯差很少,爲了清晰對比,這邊會將獨佔式的部分核心代碼粘貼過來,注意理解共享式和獨佔式存在差別的地方。詳細解析可戳:Java併發包源碼學習系列:CLH同步隊列及同步資源獲取與釋放node
public final void acquire(int arg) { if (!tryAcquire(arg) && // tryAcquire由子類實現,表示獲取鎖,若是成功,這個方法直接返回了 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 若是獲取失敗,執行 selfInterrupt(); }
// 這個方法若是返回true,代碼將進入selfInterrupt() final boolean acquireQueued(final Node node, int arg) { // 注意默認爲true boolean failed = true; try { // 是否中斷 boolean interrupted = false; // 自旋,即死循環 for (;;) { // 獲得node的前驅節點 final Node p = node.predecessor(); // 咱們知道head是虛擬的頭節點,p==head表示若是node爲阻塞隊列的第一個真實節點 // 就執行tryAcquire邏輯,這裏tryAcquire也須要由子類實現 if (p == head && tryAcquire(arg)) { // tryAcquire獲取成功走到這,執行setHead出隊操做 setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 走到這有兩種狀況 1.node不是第一個節點 2.tryAcquire爭奪鎖失敗了 // 這裏就判斷 若是當前線程爭鎖失敗,是否須要掛起當前這個線程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 死循環退出,只有tryAcquire獲取鎖失敗的時候failed才爲true if (failed) cancelAcquire(node); } }
public final boolean release(int arg) { if (tryRelease(arg)) { // 子類實現tryRelease方法 // 得到當前head Node h = head; // head不爲null而且head的等待狀態不爲0 if (h != null && h.waitStatus != 0) // 喚醒下一個能夠被喚醒的線程,不必定是next哦 unparkSuccessor(h); return true; } return false; }
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; // 若是node的waitStatus<0爲signal,CAS修改成0 // 將 head 節點的 ws 改爲 0,清除信號。表示,他已經釋放過了。不能重複釋放。 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 喚醒後繼節點,可是有可能後繼節點取消了等待 即 waitStatus == 1 Node s = node.next; // 若是後繼節點爲空或者它已經放棄鎖了 if (s == null || s.waitStatus > 0) { s = null; // 從隊尾往前找,找到沒有沒取消的全部節點排在最前面的【直到t爲null或t==node才退出循環嘛】 for (Node t = tail; t != null && t != node; t = t.prev) // 若是>0表示節點被取消了,就一直向前找唄,找到以後不會return,還會一直向前 if (t.waitStatus <= 0) s = t; } // 若是後繼節點存在且沒有被取消,會走到這,直接喚醒後繼節點便可 if (s != null) LockSupport.unpark(s.thread); }
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) //子類實現 doAcquireShared(arg); }
tryAcquireShared(int)
是AQS提供給子類實現的鉤子方法,子類能夠自定義實現共享式獲取資源的方式,獲取狀態失敗返回小於0,返回零值表示被獨佔方式獲取,返回正值表示共享方式獲取。doAcquireShared(arg);
的邏輯。注意這裏和獨佔式獲取資源acquireQueued
的區別。編程
private void doAcquireShared(int arg) { // 包裝成共享模式的節點,入隊 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; // 自旋 for (;;) { final Node p = node.predecessor(); if (p == head) { // 嘗試獲取同步狀態,子類實現 int r = tryAcquireShared(arg); if (r >= 0) { // 設置新的首節點,並根據條件,喚醒下一個節點 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
咱們能夠看到有幾個存在差別的地方:併發
setHeadAndPropagate(node, r)
,該方法首先會設置新的首節點,將第一個節點出隊,接着會不斷喚醒下一個共享模式節點,實現同步狀態被多個線程共享獲取。接下來咱們着重看下setHeadAndPropagate方法。app
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below // 節點出隊,設置node爲新的head setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ // 這個方法進來的時候propagate>=0 // propagate>0表示同步狀態還能夠被後面的節點獲取 // h指向原先的head節點,以後h = head,h表示新的head節點 // h.waitStatus<0表示該節點後面還有節點須要被喚醒 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { // 獲取下一個節點 Node s = node.next; // 沒有下一個節點或下一個節點爲共享式獲取狀態 if (s == null || s.isShared()) // 喚醒後續的共享式獲取同步狀態的節點 doReleaseShared(); } }
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { // 子類實現 doReleaseShared(); return true; } return false; }
能夠發現共享模式下,不管是獲取資源仍是釋放資源都調用了doReleaseShared方法,可見該方法是共享模式釋放資源喚醒節點的核心方法,主要功能是喚醒下一個線程或者設置傳播狀態。oop
後繼線程被喚醒後,會嘗試獲取共享鎖,若是成功以後,則又會調用setHeadAndPropagate,將喚醒傳播下去。這個方法的做用是保障在acquire和release存在競爭的狀況下,保證隊列中處於等待狀態的節點可以有辦法被喚醒。學習
private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ // 自旋 for (;;) { Node h = head; // 隊列已經初始化且至少有一個節點 if (h != null && h != tail) { int ws = h.waitStatus; // 不管是獨佔仍是共享,只有節點的ws爲signal的時候,纔會在釋放的時候,喚醒後面的節點 if (ws == Node.SIGNAL) { // cas將ws設置爲0,設置失敗,將會繼續從循環開始 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 喚醒後繼節點,unparkSuccessor這個方法上面已經解析過 unparkSuccessor(h); } // 若是ws爲0,則更新狀態爲propagate, // 以後setHeadAndPropagate讀到ws<0的時候,會繼續喚醒後面節點 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 若是head在這個過程當中被更改了,會繼續自旋 if (h == head) // loop if head changed break; } }
該方法在 head 節點存在後繼節點的狀況下,作了兩件事情:ui
若是 head 節點等待狀態爲 SIGNAL,則將 head 節點狀態設爲 0,並喚醒後繼未取消節點。this
若是 head 節點等待狀態爲 0,則將 head 節點狀態設爲 PROPAGATE,保證喚醒可以正常傳播下去。
設置PROPAGATE的做用:PROPAGATE狀態用在[setHeadAndPropagate](#void setHeadAndPropagate(Node node, int propagate)) ,當頭節點狀態被設爲 PROPAGATE 後,後繼節點成爲新的頭結點後。若 propagate > 0
條件不成立,則根據條件h.waitStatus < 0
成立與否,來決定是否喚醒後繼節點,即向後傳播喚醒動做。
引入PROPAGATE是爲了解決什麼問題?
AbstractQueuedSynchronizer源碼解讀,強烈建議閱讀這篇博客。
共享式獲取與獨佔式獲取最大的區別就是同一時刻可否有多個線程同時獲取到同步狀態。
- 共享式訪問資源時,同一時刻其餘共享式的訪問會被容許。
- 獨佔式訪問資源時,同一時刻其餘訪問均被阻塞。
AQS都提供了子類實現的鉤子方法,獨佔式的表明方法有:tryAcquire和tryRelease以及isHeldExclusively方法,共享式的表明方法有:tryAcquireShared和tryReleaseShared方法。
AQS中獲取操做和釋放操做的標準形式:
boolean acquire() throws InterruptedException{ while( 當前狀態不容許獲取操做 ){ if( 須要阻塞獲取請求){ 若是當前線程不在隊列中,則將其插入隊列 阻塞當前線程 }else{ 返回失敗 } } 可能更新同步器的狀態 若是線程位於隊列中,則將其移除隊列 返回成功 } void release(){ 更新同步器的狀態 if( 新的狀態容許某個被阻塞的線程獲取成功 ){ 解除隊列中一個或多個線程的阻塞狀態 } }
圖源:《併發編程的藝術》下圖是獨佔式同步狀態獲取的流程
當某個線程爭奪同步資源失敗以後,他們都會將線程包裝爲節點,並加入CLH同步隊列的隊尾,並保持自旋,一個是addWaiter(Node.EXCLUSIVE)
,一個是addWaiter(Node.EXCLUSIVE)
。
同步隊列中的線程在自旋時會判斷其前驅節點是否爲首節點,若是是首節點node.predecessor() == head
,他們都會嘗試獲取同步狀態,只不過:
線程執行完邏輯以後,他們都會釋放同步狀態,釋放以後將會unparkSuccessor(h)
喚醒其後可被喚醒的某個後繼節點。