AbstractQueuedSynchronizer (抽象隊列同步器,如下簡稱 AQS)出如今 JDK 1.5 中,由大師 Doug Lea 所創做。AQS 是不少同步器的基礎框架,好比 ReentrantLock、CountDownLatch 和 Semaphore 等都是基於 AQS 實現的。除此以外,咱們還能夠基於 AQS,定製出咱們所須要的同步器。html
AQS 的使用方式一般都是經過內部類繼承 AQS 實現同步功能,經過繼承 AQS,能夠簡化同步器的實現。如前面所說,AQS 是不少同步器實現的基礎框架。弄懂 AQS 對理解 Java 併發包裏的組件大有裨益,這也是我學習 AQS 並寫出這篇文章的原因。另外,須要說明的是,AQS 自己並非很好理解,細節不少。在看的過程當中藥有必定的耐心,作好看多遍的準備。好了,其餘的就很少說了,開始進入正題吧。java
在 AQS 內部,經過維護一個FIFO 隊列
來管理多線程的排隊工做。在公平競爭的狀況下,沒法獲取同步狀態的線程將會被封裝成一個節點,置於隊列尾部。入隊的線程將會經過自旋的方式獲取同步狀態,若在有限次的嘗試後,仍未獲取成功,線程則會被阻塞住。大體示意圖以下:node
當頭結點釋放同步狀態後,且後繼節點對應的線程被阻塞,此時頭結點編程
線程將會去喚醒後繼節點線程。後繼節點線程恢復運行並獲取同步狀態後,會將舊的頭結點從隊列中移除,並將本身設爲頭結點。大體示意圖以下:多線程
本節將介紹三組重要的方法,經過使用這三組方法便可實現一個同步組件。併發
第一組方法是用於訪問/設置同步狀態的,以下:oracle
方法 | 說明 |
---|---|
int getState() | 獲取同步狀態 |
void setState() | 設置同步狀態 |
boolean compareAndSetState(int expect, int update) | 經過 CAS 設置同步狀態 |
第二組方須要由同步組件覆寫。以下:框架
方法 | 說明 |
---|---|
boolean tryAcquire(int arg) | 獨佔式獲取同步狀態 |
boolean tryRelease(int arg) | 獨佔式釋放同步狀態 |
int tryAcquireShared(int arg) | 共享式獲取同步狀態 |
boolean tryReleaseShared(int arg) | 共享式私房同步狀態 |
boolean isHeldExclusively() | 檢測當前線程是否獲取獨佔鎖 |
第三組方法是一組模板方法,同步組件可直接調用。以下:ide
方法 | 說明 |
---|---|
void acquire(int arg) | 獨佔式獲取同步狀態,該方法將會調用 tryAcquire 嘗試獲取同步狀態。獲取成功則返回,獲取失敗,線程進入同步隊列等待。 |
void acquireInterruptibly(int arg) | 響應中斷版的 acquire |
boolean tryAcquireNanos(int arg,long nanos) | 超時+響應中斷版的 acquire |
void acquireShared(int arg) | 共享式獲取同步狀態,同一時刻可能會有多個線程得到同步狀態。好比讀寫鎖的讀鎖就是就是調用這個方法獲取同步狀態的。 |
void acquireSharedInterruptibly(int arg) | 響應中斷版的 acquireShared |
boolean tryAcquireSharedNanos(int arg,long nanos) | 超時+響應中斷版的 acquireShared |
boolean release(int arg) | 獨佔式釋放同步狀態 |
boolean releaseShared(int arg) | 共享式釋放同步狀態 |
上面列舉了一堆方法,看似繁雜。但稍微理一下,就會發現上面諸多方法無非就兩大類:一類是獨佔式獲取和釋放共享狀態,另外一類是共享式獲取和釋放同步狀態。至於這兩類方法的實現細節,我會在接下來的章節中講到,繼續往下看吧。oop
在併發的狀況下,AQS 會將未獲取同步狀態的線程將會封裝成節點,並將其放入同步隊列尾部。同步隊列中的節點除了要保存線程,還要保存等待狀態。無論是獨佔式仍是共享式,在獲取狀態失敗時都會用到節點類。因此這裏咱們要先看一下節點類的實現,爲後面的源碼分析進行簡單鋪墊。源碼以下:
static final class Node { /** 共享類型節點,標記節點在共享模式下等待 */ static final Node SHARED = new Node(); /** 獨佔類型節點,標記節點在獨佔模式下等待 */ static final Node EXCLUSIVE = null; /** 等待狀態 - 取消 */ static final int CANCELLED = 1; /** * 等待狀態 - 通知。某個節點是處於該狀態,當該節點釋放同步狀態後, * 會通知後繼節點線程,使之能夠恢復運行 */ static final int SIGNAL = -1; /** 等待狀態 - 條件等待。代表節點等待在 Condition 上 */ static final int CONDITION = -2; /** * 等待狀態 - 傳播。表示無條件向後傳播喚醒動做,詳細分析請看第五章 */ static final int PROPAGATE = -3; /** * 等待狀態,取值以下: * SIGNAL, * CANCELLED, * CONDITION, * PROPAGATE, * 0 * * 初始狀況下,waitStatus = 0 */ volatile int waitStatus; /** * 前驅節點 */ volatile Node prev; /** * 後繼節點 */ volatile Node next; /** * 對應的線程 */ volatile Thread thread; /** * 下一個等待節點,用在 ConditionObject 中 */ Node nextWaiter; /** * 判斷節點是不是共享節點 */ final boolean isShared() { return nextWaiter == SHARED; } /** * 獲取前驅節點 */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } /** addWaiter 方法會調用該構造方法 */ Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } /** Condition 中會用到此構造方法 */ Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
獨佔式獲取同步狀態時經過 acquire 進行的,下面來分析一下該方法的源碼。以下:
/** * 該方法將會調用子類複寫的 tryAcquire 方法獲取同步狀態, * - 獲取成功:直接返回 * - 獲取失敗:將線程封裝在節點中,並將節點置於同步隊列尾部, * 經過自旋嘗試獲取同步狀態。若是在有限次內仍沒法獲取同步狀態, * 該線程將會被 LockSupport.park 方法阻塞住,直到被前驅節點喚醒 */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } /** 向同步隊列尾部添加一個節點 */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 嘗試以快速方式將節點添加到隊列尾部 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 快速插入節點失敗,調用 enq 方法,不停的嘗試插入節點 enq(node); return node; } /** * 經過 CAS + 自旋的方式插入節點到隊尾 */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize // 設置頭結點,初始狀況下,頭結點是一個空節點 if (compareAndSetHead(new Node())) tail = head; } else { /* * 將節點插入隊列尾部。這裏是先將新節點的前驅設爲尾節點,以後在嘗試將新節點設爲尾節 * 點,最後再將原尾節點的後繼節點指向新的尾節點。除了這種方式,咱們還先設置尾節點, * 以後再設置前驅和後繼,即: * * if (compareAndSetTail(t, node)) { * node.prev = t; * t.next = node; * } * * 但但若是是這樣作,會致使一個問題,即短時內,隊列結構會遭到破壞。考慮這種狀況, * 某個線程在調用 compareAndSetTail(t, node)成功後,該線程被 CPU 切換了。此時 * 設置前驅和後繼的代碼還沒帶的及執行,但尾節點指針卻設置成功,致使隊列結構短時內會 * 出現以下狀況: * * +------+ prev +-----+ +-----+ * head | | <---- | | | | tail * | | ----> | | | | * +------+ next +-----+ +-----+ * * tail 節點徹底脫離了隊列,這樣致使一些隊列遍歷代碼出錯。若是先設置 * 前驅,在設置尾節點。及時線程被切換,隊列結構短時可能以下: * * +------+ prev +-----+ prev +-----+ * head | | <---- | | <---- | | tail * | | ----> | | | | * +------+ next +-----+ +-----+ * * 這樣並不會影響從後向前遍歷,不會致使遍歷邏輯出錯。 * * 參考: * https://www.cnblogs.com/micrari/p/6937995.html */ node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } /** * 同步隊列中的線程在此方法中以循環嘗試獲取同步狀態,在有限次的嘗試後, * 若仍未獲取鎖,線程將會被阻塞,直至被前驅節點的線程喚醒。 */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 循環獲取同步狀態 for (;;) { final Node p = node.predecessor(); /* * 前驅節點若是是頭結點,代表前驅節點已經獲取了同步狀態。前驅節點釋放同步狀態後, * 在不出異常的狀況下, tryAcquire(arg) 應返回 true。此時節點就成功獲取了同 * 步狀態,並將本身設爲頭節點,原頭節點出隊。 */ if (p == head && tryAcquire(arg)) { // 成功獲取同步狀態,設置本身爲頭節點 setHead(node); p.next = null; // help GC failed = false; return interrupted; } /* * 若是獲取同步狀態失敗,則根據條件判斷是否應該阻塞本身。 * 若是不阻塞,CPU 就會處於忙等狀態,這樣會浪費 CPU 資源 */ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { /* * 若是在獲取同步狀態中出現異常,failed = true,cancelAcquire 方法會被執行。 * tryAcquire 需同步組件開發者覆寫,不免不了會出現異常。 */ if (failed) cancelAcquire(node); } } /** 設置頭節點 */ private void setHead(Node node) { // 僅有一個線程能夠成功獲取同步狀態,因此這裏不須要進行同步控制 head = node; node.thread = null; node.prev = null; } /** * 該方法主要用途是,當線程在獲取同步狀態失敗時,根據前驅節點的等待狀態,決定後續的動做。好比前驅 * 節點等待狀態爲 SIGNAL,代表當前節點線程應該被阻塞住了。不能總是嘗試,避免 CPU 忙等。 * ————————————————————————————————————————————————————————————————— * | 前驅節點等待狀態 | 相應動做 | * ————————————————————————————————————————————————————————————————— * | SIGNAL | 阻塞 | * | CANCELLED | 向前遍歷, 移除前面全部爲該狀態的節點 | * | waitStatus < 0 | 將前驅節點狀態設爲 SIGNAL, 並再次嘗試獲取同步狀態 | * ————————————————————————————————————————————————————————————————— */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; /* * 前驅節點等待狀態爲 SIGNAL,表示當前線程應該被阻塞。 * 線程阻塞後,會在前驅節點釋放同步狀態後被前驅節點線程喚醒 */ if (ws == Node.SIGNAL) return true; /* * 前驅節點等待狀態爲 CANCELLED,則之前驅節點爲起點向前遍歷, * 移除其餘等待狀態爲 CANCELLED 的節點。 */ if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * 等待狀態爲 0 或 PROPAGATE,設置前驅節點等待狀態爲 SIGNAL, * 並再次嘗試獲取同步狀態。 */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } private final boolean parkAndCheckInterrupt() { // 調用 LockSupport.park 阻塞本身 LockSupport.park(this); return Thread.interrupted(); } /** * 取消獲取同步狀態 */ private void cancelAcquire(Node node) { if (node == null) return; node.thread = null; // 前驅節點等待狀態爲 CANCELLED,則向前遍歷並移除其餘爲該狀態的節點 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 記錄 pred 的後繼節點,後面會用到 Node predNext = pred.next; // 將當前節點等待狀態設爲 CANCELLED node.waitStatus = Node.CANCELLED; /* * 若是當前節點是尾節點,則經過 CAS 設置前驅節點 prev 爲尾節點。設置成功後,再利用 CAS 將 * prev 的 next 引用置空,斷開與後繼節點的聯繫,完成清理工做。 */ if (node == tail && compareAndSetTail(node, pred)) { /* * 執行到這裏,代表 pred 節點被成功設爲了尾節點,這裏經過 CAS 將 pred 節點的後繼節點 * 設爲 null。注意這裏的 CAS 即便失敗了,也不要緊。失敗了,代表 pred 的後繼節點更新 * 了。pred 此時已是尾節點了,若後繼節點被更新,則是有新節點入隊了。這種狀況下,CAS * 會失敗,但失敗不會影響同步隊列的結構。 */ compareAndSetNext(pred, predNext, null); } else { int ws; // 根據條件判斷是喚醒後繼節點,仍是將前驅節點和後繼節點鏈接到一塊兒 if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) /* * 這裏使用 CAS 設置 pred 的 next,代表多個線程同時在取消,這裏存在競爭。 * 不過此處沒針對 compareAndSetNext 方法失敗後作一些處理,代表即便失敗了也 * 不要緊。實際上,多個線程同時設置 pred 的 next 引用時,只要有一個能設置成 * 功便可。 */ compareAndSetNext(pred, predNext, next); } else { /* * 喚醒後繼節點對應的線程。這裏簡單講一下爲何要喚醒後繼線程,考慮下面一種狀況: * head node1 node2 tail * ws=0 ws=1 ws=-1 ws=0 * +------+ prev +-----+ prev +-----+ prev +-----+ * | | <---- | | <---- | | <---- | | * | | ----> | | ----> | | ----> | | * +------+ next +-----+ next +-----+ next +-----+ * * 頭結點初始狀態爲 0,node一、node2 和 tail 節點依次入隊。node1 自旋過程當中調用 * tryAcquire 出現異常,進入 cancelAcquire。head 節點此時等待狀態仍然是 0,它 * 會認爲後繼節點還在運行中,所它在釋放同步狀態後,不會去喚醒後繼等待狀態爲非取消的 * 節點 node2。若是 node1 再不喚醒 node2 的線程,該線程面臨沒法被喚醒的狀況。此 * 時,整個同步隊列就回所有阻塞住。 */ unparkSuccessor(node); } node.next = node; // help GC } } private void unparkSuccessor(Node node) { int ws = node.waitStatus; /* * 經過 CAS 將等待狀態設爲 0,讓後繼節點線程多一次 * 嘗試獲取同步狀態的機會 */ if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; /* * 這裏若是 s == null 處理,是否是代表 node 是尾節點?答案是不必定。緣由以前在分析 * enq 方法時說過。這裏再囉嗦一遍,新節點入隊時,隊列瞬時結構可能以下: * node1 node2 * +------+ prev +-----+ prev +-----+ * head | | <---- | | <---- | | tail * | | ----> | | | | * +------+ next +-----+ +-----+ * * node2 節點爲新入隊節點,此時 tail 已經指向了它,但 node1 後繼引用還未設置。 * 這裏 node1 就是 node 參數,s = node1.next = null,但此時 node1 並非尾 * 節點。因此這裏不能從前向後遍歷同步隊列,應該從後向前。 */ for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 喚醒 node 的後繼節點線程 LockSupport.unpark(s.thread); }
到這裏,獨佔式獲取同步狀態的分析就講完了。若是僅分析獲取同步狀態的大體流程,那麼這個流程並不難。但若深刻到細節之中,仍是須要思考思考。這裏對獨佔式獲取同步狀態的大體流程作個總結,以下:
上面的步驟對應下面的流程圖:
上面流程圖參考自《Java併發編程》第128頁圖 5-5,這裏進行了從新繪製,並作了必定的修改。
相對於獲取同步狀態,釋放同步狀態的過程則要簡單的多,這裏簡單羅列一下步驟:
就兩個步驟,下面看一下源碼分析。
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; /* * 這裏簡單列舉條件分支的可能性,以下: * 1. head = null * head 還未初始化。初始狀況下,head = null,當第一個節點入隊後,head 會被初始 * 爲一個虛擬(dummy)節點。這裏,若是還沒節點入隊就調用 release 釋放同步狀態, * 就會出現 h = null 的狀況。 * * 2. head != null && waitStatus = 0 * 代表後繼節點對應的線程仍在運行中,不須要喚醒 * * 3. head != null && waitStatus < 0 * 後繼節點對應的線程可能被阻塞了,須要喚醒 */ if (h != null && h.waitStatus != 0) // 喚醒後繼節點,上面分析過了,這裏再也不贅述 unparkSuccessor(h); return true; } return false; }
與獨佔模式不一樣,共享模式下,同一時刻會有多個線程獲取共享同步狀態。共享模式是實現讀寫鎖中的讀鎖、CountDownLatch 和 Semaphore 等同步組件的基礎,搞懂了,再去理解一些共享同步組件就不難了。
共享類型的節點獲取共享同步狀態後,若是後繼節點也是共享類型節點,當前節點則會喚醒後繼節點。這樣,多個節點線程便可同時獲取共享同步狀態。
public final void acquireShared(int arg) { // 嘗試獲取共享同步狀態,tryAcquireShared 返回的是整型 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; // 這裏和前面同樣,也是經過有限次自旋的方式獲取同步狀態 for (;;) { final Node p = node.predecessor(); /* * 前驅是頭結點,其類型多是 EXCLUSIVE,也多是 SHARED. * 若是是 EXCLUSIVE,線程沒法獲取共享同步狀態。 * 若是是 SHARED,線程則可獲取共享同步狀態。 * 能不能獲取共享同步狀態要看 tryAcquireShared 具體的實現。好比多個線程競爭讀寫 * 鎖的中的讀鎖時,均能成功獲取讀鎖。但多個線程同時競爭信號量時,可能就會有一部分線 * 程因沒法競爭到信號量資源而阻塞。 */ if (p == head) { // 嘗試獲取共享同步狀態 int r = tryAcquireShared(arg); if (r >= 0) { // 設置頭結點,若是後繼節點是共享類型,喚醒後繼節點 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } /** * 這個方法作了兩件事情: * 1. 設置自身爲頭結點 * 2. 根據條件判斷是否要喚醒後繼節點 */ private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // 設置頭結點 setHead(node); /* * 這個條件分支由 propagate > 0 和 h.waitStatus < 0 兩部分組成。 * h.waitStatus < 0 時,waitStatus = SIGNAL 或 PROPAGATE。這裏僅依賴 * 條件 propagate > 0 判斷是否喚醒後繼節點是不充分的,至於緣由請參考第五章 */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; /* * 節點 s 若是是共享類型節點,則應該喚醒該節點 * 至於 s == null 的狀況前面分析過,這裏不在贅述。 */ if (s == null || s.isShared()) doReleaseShared(); } } /** * 該方法用於在 acquires/releases 存在競爭的狀況下,確保喚醒動做向後傳播。 */ private void doReleaseShared() { /* * 下面的循環在 head 節點存在後繼節點的狀況下,作了兩件事情: * 1. 若是 head 節點等待狀態爲 SIGNAL,則將 head 節點狀態設爲 0,並喚醒後繼節點 * 2. 若是 head 節點等待狀態爲 0,則將 head 節點狀態設爲 PROPAGATE,保證喚醒可以正 * 常傳播下去。關於 PROPAGATE 狀態的細節分析,後面會講到。 */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } /* * ws = 0 的狀況下,這裏要嘗試將狀態從 0 設爲 PROPAGATE,保證喚醒向後 * 傳播。setHeadAndPropagate 在讀到 h.waitStatus < 0 時,能夠繼續喚醒 * 後面的節點。 */ else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
到這裏,共享模式下獲取同步狀態的邏輯就分析完了,不過我這裏只作了簡單分析。相對於獨佔式獲取同步狀態,共享式的狀況更爲複雜。獨佔模式下,只有一個節點線程能夠成功獲取同步狀態,也只有獲取已同步狀態節點線程才能夠釋放同步狀態。但在共享模式下,多個共享節點線程能夠同時得到同步狀態,在一些線程獲取同步狀態的同時,可能還會有另一些線程正在釋放同步狀態。因此,共享模式更爲複雜。這裏個人腦力跟不上了,無法面面俱到的分析,見諒。
最後說一下共享模式下獲取同步狀態的大體流程,以下:
釋放共享狀態主要邏輯在 doReleaseShared 中,doReleaseShared 上節已經分析過,這裏就不贅述了。共享節點線程在獲取同步狀態和釋放同步狀態時都會調用 doReleaseShared,因此 doReleaseShared 是多線程競爭集中的地方。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
AQS 的節點有幾種不一樣的狀態,這個在 4.1 節介紹過。在這幾個狀態中,PROPAGATE 的用途多是最很差理解的。網上包括一些書籍關於該狀態的敘述基本都是一句帶過,也就是 PROPAGATE 字面意義,即向後傳播喚醒動做。至於怎麼傳播,鮮有資料說明過。不過,好在最終我仍是找到了一篇詳細敘述了 PROPAGATE 狀態的文章。在博客園上,博友 活在夢裡 在他的文章 AbstractQueuedSynchronizer源碼解讀 對 PROPAGATE,以及其餘的一些細節進行了說明,頗有深度。在欽佩之餘,不禁得感嘆做者思考的很深刻。在徵得他的贊成後,我將在本節中引用他文章中對 PROPAGATE 狀態說明的部分,並進行必定的補充說明。這裏感謝做者 活在夢裡 的精彩分享,若不參考他的文章,個人這篇文章內容會比較空洞。好了,其餘的很少說了,繼續往下分析。
在本節中,將會說明兩個個問題,以下:
這兩個問題將會在下面兩節中分別進行說明。
PROPAGATE 狀態是用來傳播喚醒動做的,那麼它是在哪裏進行傳播的呢?答案是在setHeadAndPropagate
方法中,這裏再來看看 setHeadAndPropagate 方法的實現:
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
你們注意看 setHeadAndPropagate 方法中那個長長的判斷語句,其中有一個條件是h.waitStatus < 0
,當 h.waitStatus = SIGNAL(-1) 或 PROPAGATE(-3) 是,這個條件就會成立。那麼 PROPAGATE 狀態是在什麼時候被設置的呢?答案是在doReleaseShared
方法中,以下:
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) {...} // 若是 ws = 0,則將 h 狀態設爲 PROPAGATE else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } ... } }
再回到 setHeadAndPropagate 的實現,該方法既然引入了h.waitStatus < 0
這個條件,就意味着僅靠條件propagate > 0
判斷是否喚醒後繼節點線程的機制是不充分的。至於爲啥不充分,請繼續往看下看。
PROPAGATE 的引入是爲了解決一個 BUG -- JDK-6801020,復現這個 BUG 的代碼以下:
import java.util.concurrent.Semaphore; public class TestSemaphore { private static Semaphore sem = new Semaphore(0); private static class Thread1 extends Thread { @Override public void run() { sem.acquireUninterruptibly(); } } private static class Thread2 extends Thread { @Override public void run() { sem.release(); } } public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 10000000; i++) { Thread t1 = new Thread1(); Thread t2 = new Thread1(); Thread t3 = new Thread2(); Thread t4 = new Thread2(); t1.start(); t2.start(); t3.start(); t4.start(); t1.join(); t2.join(); t3.join(); t4.join(); System.out.println(i); } } }
根據 BUG 的描述消息可知 JDK 6u11,6u17 兩個版本受到影響。那麼,接下來再來看看引發這個 BUG 的代碼 -- JDK 6u17 中 setHeadAndPropagate 和 releaseShared 兩個方法源碼,以下:
private void setHeadAndPropagate(Node node, int propagate) { setHead(node); if (propagate > 0 && node.waitStatus != 0) { /* * Don't bother fully figuring out successor. If it * looks null, call unparkSuccessor anyway to be safe. */ Node s = node.next; if (s == null || s.isShared()) unparkSuccessor(node); } } // 和 release 方法的源碼基本同樣 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
下面來簡單說明 TestSemaphore 這個類的邏輯。這個類持有一個數值爲 0 的信號量對象,並建立了4個線程,線程 t1 和 t2 用於獲取信號量,t3 和 t4 則是調用 release() 方法釋放信號量。在通常狀況下,TestSemaphore 這個類的代碼均可以正常執行。但當有極端狀況出現時,可能會致使同步隊列掛掉。這裏演繹一下這個極端狀況,考慮某次循環時,隊列結構以下:
0
,並喚醒線程 t1。此時信號量數值爲1。0
。而後線程 t1 被切換,暫停運行。0
,因此 t4 不會調用 unparkSuccessor 方法。下面再來看一下修復 BUG 後的代碼,根據 BUG 詳情頁顯示,該 BUG 在 JDK 1.7 中被修復。這裏找一個 JDK 7 較早版本(JDK 7u10)的代碼看一下,以下:
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
在按照上面的代碼演繹一下邏輯,以下:
0
,並喚醒線程t1。此時信號量數值爲1。0
。而後線程 t1 被切換,暫停運行。h.waitStatus = 0
,t4 將頭節點等待狀態由0
設爲PROPAGATE(-3)
。propagate > 0
條件不知足。而 h.waitStatus = PROPAGATE(-3)
,因此條件h.waitStatus < 0
成立。進而,線程 t1 能夠喚醒線程 t2,完成喚醒動做的傳播。到這裏關於狀態 PROPAGATE 的內容就講完了。最後,簡單總結一下本章開頭提的兩個問題。
問題一:PROPAGATE 狀態用在哪裏,以及怎樣向後傳播喚醒動做的?
答:PROPAGATE 狀態用在 setHeadAndPropagate。當頭節點狀態被設爲 PROPAGATE 後,後繼節點成爲新的頭結點後。若 propagate > 0
條件不成立,則根據條件h.waitStatus < 0
成立與否,來決定是否喚醒後繼節點,即向後傳播喚醒動做。
問題二:引入 PROPAGATE 狀態是爲了解決什麼問題?
答:引入 PROPAGATE 狀態是爲了解決併發釋放信號量所致使部分請求信號量的線程沒法被喚醒的問題。
聲明:
本章內容是在博友 活在夢裡 的文章 AbstractQueuedSynchronizer源碼解讀 基礎上,進行了必定的補充說明。本章所參考的觀點已通過原做者贊成,爲避免抄襲嫌疑,特此聲明。
到這裏,本文就差很少結束了。若是你們從頭看到尾,到這裏就能夠放鬆一下了。寫到這裏,我也能夠放鬆一下了。這篇文章總共花費了我12天的空閒時間,確實不容易。原本我只打算講一下基本原理,但知道後來看到本文屢次推薦的那篇文章。那篇文章給個人第一感受是,做者很厲害。第二感受是,我也要寫出一篇較爲深刻的 AQS 分析文章。雖然寫出來也不能怎麼樣,水平也不會所以提升多少,也不會去造個相似的輪子。可是寫完後,確實感受頗有成就感。本文的最後,來講一下如何學習 AQS 原理。AQS 的大體原理不是很難理解,因此一開始不建議糾結細節,應該先弄懂它的大體原理。在此基礎上,再去分析一些細節,分析細節時,要從多線程的角度去考慮。好比,有點地方 CAS 失敗後要重試,有的不用重試。整體來講 AQS 的大體原理容易理解,細節部分比較複雜。不少細節要在腦子裏演繹一遍,好好思考才能想通,有點燒腦。另外由於文章篇幅的問題,關於 AQS ConditionObject 部分的分析將會放在下一篇文章中進行。
最後,再向 AQS 的做者 Doug Lea 致以崇高的敬意。僅盡力弄懂 AQS 的原理都很難了,可想而知,實現 AQS 的難度有多大。
限於本人的能力,加之深刻分析 AQS 自己就比較有難度。因此文中不免會有錯誤出現,若是不慎翻車,請見諒。也歡迎在評論區指明這些錯誤,感謝。
本文在知識共享許可協議 4.0 下發布,轉載需在明顯位置處註明出處
做者:coolblog
本文同步發佈在個人我的博客: http://www.coolblog.xyz
本做品採用知識共享署名-非商業性使用-禁止演繹 4.0 國際許可協議進行許可。