最近一直在研究AQS的源碼,但願能夠更深入的理解AQS的實現原理。雖然網上有不少關於AQS的源碼分析,可是看完之後感受仍是隻知其一;不知其二。因而,我將本身的整個理解過程記錄下來了,但願對你們有所幫助。java
AQS是Java中鎖的基礎,主要由兩個隊列組成。一個隊列是同步隊列,另外一個是條件隊列。node
head
,隊列尾部是tail
節點,head
節點是一個空節點,同步隊列是一個雙向鏈表,經過next
和prev
鏈接全部節點Node
節點,線程與節點綁定在一塊兒,(若是是同步鎖和排他鎖不一樣之處是經過nextWaiter
來區分的)而且添加到同步隊列的尾部head
的第一個節點獲取鎖,其他節點都須要等待被喚醒null
的狀況(如:線程超時中斷、線程更新節點的中間態),被取消和null
的節點不能被喚醒,將會被視爲無效節點null
的節點除外)喚醒nextWaiter
來鏈接起來,條件隊列的頭節點是firstWaiter
,尾節點是lastWaiter
signal
或signalAll
方法喚醒的節點)纔會被轉移到同步隊列首先,瞭解如下同步隊列中隊列的節點Node
的數據結構安全
static final class Node { /** 共享鎖的標識 */ static final Node SHARED = new Node(); /** 排他鎖的標識 */ static final Node EXCLUSIVE = null; /** 線程取消 */ static final int CANCELLED = 1; /** 持有鎖的線程的後繼線程被掛起 */ static final int SIGNAL = -1; /** 條件隊列標識 */ static final int CONDITION = -2; /** * 共享鎖狀況下,通知全部其餘節點 */ static final int PROPAGATE = -3; /** * waitStatus的取值以下: * SIGNAL(-1): 當前節點的後繼節點應該被掛起 * CANCELLED(1): 當前節點被取消 * CONDITION(-2): 當前節點在條件隊列 * PROPAGATE(-3): 釋放共享鎖時須要通知全部節點 * 0: 初始值 * */ volatile int waitStatus; /** * 前驅節點 */ volatile Node prev; /** * 後繼節點 */ volatile Node next; /** * 節點對應的線程 */ volatile Thread thread; /** * 在共享鎖的狀況下,該節點的值爲SHARED * 在排他鎖的狀況下,該節點的值爲EXCLUSIVE * 在條件隊列的狀況下,連接的是下一個等待條件的線程 */ Node nextWaiter; }
其次,咱們來看一下同步隊列的鏈表結構
數據結構
接着,咱們根據同步隊列的原理來分析如下acquire
和release
須要作哪些事情:源碼分析
node
(該節點多是排他鎖,也能夠能是共享鎖)node
添加到同步隊列尾部,若是同步隊列爲空(初始狀況下),須要先建立一個空的頭節點,而後再添加到隊列的尾部node
的前驅節點是head
,說明node
是第一個節點,可以獲取鎖,須要將head
修改爲node
,釋放前驅節點的資源node
的前驅節點不是head
,說明獲取鎖失敗,須要檢測是否須要將node
綁定的線程掛起,分如下幾種狀況:
node
的waitStatus
已經被設置爲SIGNAL
表示須要被掛起node
的waitStatus
設置爲CANCEL
表示該節點已經被取消,須要被去掉,並修改 node
的prev
,直到連接上一個有效的節點爲止node
的waitStatus
設置爲SIGNAL
,表示即將要被掛起node
綁定的線程掛起,則讓出CPU,直到當前驅節點來喚起node
纔會開始繼續從步驟3
開始執行public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire(arg)
:對外提供的一個擴展方法,經常使用的鎖都要實現這個方法,具體實現與鎖相關ui
addWaiter(Node.EXCLUSIVE)
: 建立一個排他鎖節點,並將該節點添加到同步隊列尾部,代碼以下:this
private Node addWaiter(Node mode) { // 建立一個node,EXCLUSIVE類型 Node node = new Node(mode); for (;;) { // 獲取尾節點 Node oldTail = tail; if (oldTail != null) { // 設置即將成爲尾節點的前驅 node.setPrevRelaxed(oldTail); // CAS操做設置尾節點 if (compareAndSetTail(oldTail, node)) { // 將新尾節點的前驅節點與新的尾節點關聯起來 oldTail.next = node; // 返回添加的節點 // 這個節點如今不必定是尾節點,由於若是有多個線程調用這個方法時, // 可能還有節點添加在這個節點後面 return node; } } else { // 若是隊列爲空,初始化頭節點 initializeSyncQueue(); } } }
acquireQueued
:同步隊列中的節點獲取排他鎖final boolean acquireQueued(final Node node, int arg) { try { // 線程是否中斷 boolean interrupted = false; for (;;) { // 獲取前驅節點 final Node p = node.predecessor(); // 若是前驅節點是頭節點,獲取鎖 if (p == head && tryAcquire(arg)) { // 修改頭節點 setHead(node); // 釋放頭節點的資源 p.next = null; // help GC // 返回線程中斷的狀態 // 這也是該方法惟一的返回值 // 沒有獲取鎖的線程會一直執行該方法直到獲取鎖之後再返回 return interrupted; } // 獲取鎖失敗後是否須要將線程掛起 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 線程掛起並返回是否被中斷 interrupted = true; } } catch (Throwable t) { // 取消該節點 cancelAcquire(node); throw t; } }
shouldParkAfterFailedAcquire
:檢測線程獲取鎖失敗之後是否須要被掛起private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 前驅節點的狀態 int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * 狀態已經設置成SIGNAL,能夠直接掛起該節點 */ return true; // 節點被取消 if (ws > 0) { /* * 找到pred第一個有效的前驅節點 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); // pred多是一個新的節點,須要將pred的next重寫設置爲node pred.next = node; } else { /* * CAS操做將pred節點的狀態設置爲SIGNAL */ pred.compareAndSetWaitStatus(ws, Node.SIGNAL); } // 只有當pred節點的waitStatus已是SIGNAL狀態時,才能夠安全的掛起線程 // 不然須要不能被掛起 return false; }
parkAndCheckInterrupt
:將當前線程掛起,並檢測當前線程是否中斷private final boolean parkAndCheckInterrupt() { // 線程掛起 LockSupport.park(this); // 檢測線程是否中斷 return Thread.interrupted(); }
cancelAcquire
:取消節點private void cancelAcquire(Node node) { // 若是節點爲空,什麼都不作 if (node == null) return; // 釋放線程 node.thread = null; // 從後往前過濾掉全部的被取消的節點 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 有效前驅節點的nex節點 Node predNext = pred.next; // 將node設置爲CANCELLED node.waitStatus = Node.CANCELLED; // 若是是尾節點,設置新的尾節點 if (node == tail && compareAndSetTail(node, pred)) { // 將新的尾節點的後續設置爲null pred.compareAndSetNext(predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; // 若是前驅節點的線程不爲null而且waitStatus爲SIGNAL if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; // 將node設置成pred的後繼節點 if (next != null && next.waitStatus <= 0) pred.compareAndSetNext(predNext, next); } else { // 喚起node節點的後繼節點 // 由於node節點已經釋放鎖了 unparkSuccessor(node); } node.next = node; // help GC } }
unparkSuccessor
:喚醒後繼節點private void unparkSuccessor(Node node) { /* * 獲取node節點的waitStatus */ int ws = node.waitStatus; // 用CSA操做將waitStatus設置成初始狀態 // 無論設置是否成功,都無所謂,由於該節點即將被銷燬 if (ws < 0) node.compareAndSetWaitStatus(ws, 0); /* * 獲取node的後繼節點 */ Node s = node.next; // 若是後繼節點爲null或者被取消, // 經過從同步隊列的尾節點開始一直往前找到一個有效的後繼節點 if (s == null || s.waitStatus > 0) { s = null; for (Node p = tail; p != node && p != null; p = p.prev) if (p.waitStatus <= 0) s = p; } // 若是後繼節點不爲空 if (s != null) LockSupport.unpark(s.thread);// 喚醒後繼節點的線程 }
與acquire
方法相似的還有acquireInterruptibly
、tryAcquireNanos
、acquireShared
、acquireSharedInterruptibly
和tryAcquireSharedNanos
,咱們都一一分析如下線程
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) // 若是線程中斷,直接返回 throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); // 中斷式的獲取鎖 }
doAcquireInterruptibly
:可中斷式的獲取鎖private void doAcquireInterruptibly(int arg) throws InterruptedException { // 建立一個排他節點加入同步隊列 final Node node = addWaiter(Node.EXCLUSIVE); try { for (;;) { // 獲取前驅節點 final Node p = node.predecessor(); // 若是前驅節點是頭節點,說明已經獲取的鎖 if (p == head && tryAcquire(arg)) { // 修改頭節點 setHead(node); p.next = null; // help GC return; } // 若是沒有獲取鎖,檢測是否須要掛起 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); // 若是發現線程已經被中斷,須要拋出異常 } } catch (Throwable t) { // 發生異常取消節點 cancelAcquire(node); throw t; } }
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 線程中斷直接返回 return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); // 超時獲取排他鎖 }
doAcquireNanos
:超時獲取排他鎖private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { // 若是超時直接返回 if (nanosTimeout <= 0L) return false; // 獲取超時時長 final long deadline = System.nanoTime() + nanosTimeout; // 添加一個排他節點到同步隊列尾部 final Node node = addWaiter(Node.EXCLUSIVE); try { for (;;) { // 獲取前驅節點 final Node p = node.predecessor(); // 已經獲取鎖 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return true; } nanosTimeout = deadline - System.nanoTime(); // 若是超時了就取消 if (nanosTimeout <= 0L) { cancelAcquire(node); return false; } // 檢測節點是否須要被掛起 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) // 若是須要掛起,且超時時長大於SPIN_FOR_TIMEOUT_THRESHOLD // 線程掛起nanosTimeout時間 LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } catch (Throwable t) { // 發生異常取消節點 cancelAcquire(node); throw t; } }
public final void acquireShared(int arg) { // 對外提供的一個擴展方法,經常使用的鎖都要實現這個方法, // 該方法的實現與鎖的用途有關 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); // 獲取共享鎖 }
doAcquireShared
:獲取共享鎖private void doAcquireShared(int arg) { // 添加一個共享節點到同步隊列尾部 final Node node = addWaiter(Node.SHARED); try { boolean interrupted = false; for (;;) { // 獲取前驅節點 final Node p = node.predecessor(); if (p == head) { // 返回結果大於等於0表示獲取共享鎖 int r = tryAcquireShared(arg); if (r >= 0) { // 設置頭節點並廣播通知其餘獲取共享鎖的節點 setHeadAndPropagate(node, r); p.next = null; // help GC // 若是線程被中斷,將該線程中斷 // 共享鎖會被多個線程獲取,若是須要中斷 // 全部獲取共享鎖的線程都要被中斷 if (interrupted) selfInterrupt(); return; } } // 檢測是否須要掛起 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 掛起並中斷 interrupted = true; } } catch (Throwable t) { // 發生異常取消節點 cancelAcquire(node); throw t; } }
setHeadAndPropagate
:設置頭節點並廣播其餘節點來獲取鎖private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // 記錄舊的頭節點 setHead(node);// 設置新的頭節點 /* * 若是頭節點爲null或者是否是取消狀態,嘗試喚醒後繼節點 */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; // node節點的next是SHARED,即共享鎖 if (s == null || s.isShared()) // 喚起獲取共享鎖的線程 doReleaseShared(); } }
doReleaseShared
:喚醒等待共享鎖的節點private void doReleaseShared() { /* * 喚醒時是從頭節點開始先喚醒第一個共享節點, * 第一個共享節點被喚醒後會在doAcquireShared方法裏繼續執行(以前就是在這個方法裏被掛起的) * 第一個共享節點若是獲取鎖會調用setHeadAndPropagate方法修改頭節點,而後再調用doReleaseShared方法 * 喚醒第二個共享節點,以此類推,最後把全部的共享節點都喚醒 */ for (;;) { Node h = head; if (h != null && h != tail) { // 獲取頭節點的狀態 int ws = h.waitStatus; // 若是頭節點是SIGNAL,須要將狀態設置爲0,表示已經即將被喚醒 if (ws == Node.SIGNAL) { if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) continue; // 若是失敗了說明有其餘線程在修改頭節點,須要繼續重試 unparkSuccessor(h); // 喚醒頭節點的後繼節點 } else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) continue; // 將頭節點狀態從0設置成PROPAGATE,若是失敗了繼續,由於也有其餘獲取共享鎖的線程在更改頭節點 } // 若是頭節點未改變(由於沒有後繼節點須要等待共享鎖),跳出循環 if (h == head) break; } }
selfInterrupt
:中斷當前線程static void selfInterrupt() { Thread.currentThread().interrupt(); }
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 若是線程被中斷拋出異常 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); // 可中斷的方式獲取共享鎖 }
doAcquireSharedInterruptibly
:可中斷的方式後去共享鎖private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 添加共享鎖節點到同步隊列尾部 final Node node = addWaiter(Node.SHARED); try { for (;;) { // 獲取前驅節點 final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { // 獲取共享鎖之後修改頭節點,通知其餘等待共享鎖的節點 setHeadAndPropagate(node, r); p.next = null; // help GC return; } } // 線程獲取共享鎖失敗後須要掛起,而且發現線程被中斷,因此拋出異常 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } catch (Throwable t) { // 發生異常取消節點 cancelAcquire(node); throw t; } }
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) // 線程若是中斷了,直接拋出異常 throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); // 超時獲取共享鎖 }
doAcquireSharedNanos
:超時的方式獲取中斷鎖private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { // 超時直接返回 if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; // 添加共享節點到同步隊列尾部 final Node node = addWaiter(Node.SHARED); try { for (;;) { // 獲取前驅節點 final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { // 獲取鎖,修改頭節點,通知全部其餘等待共享鎖的節點 setHeadAndPropagate(node, r); p.next = null; // help GC return true; } } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) { // 超時取消節點 cancelAcquire(node); return false; } if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) // 若是須要掛起,且超時時長大於SPIN_FOR_TIMEOUT_THRESHOLD // 線程掛起nanosTimeout時間 LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); // 中斷了拋出異常 } } catch (Throwable t) { // 發生異常取消節點 cancelAcquire(node); throw t; } }
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; // 頭節點不能是一箇中間態 if (h != null && h.waitStatus != 0) // 喚醒後繼節點 unparkSuccessor(h); return true; } return false; }
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { // 釋放共享鎖,從頭節點開始一個一個的釋放 // 若是存在多個共享節點在同步隊列時,doReleaseShared方式實際上是遞歸調用 doReleaseShared(); return true; } return false; }
至此,將全部獲取鎖和釋放鎖的方法相關的源碼所有分析完3d
咱們來看一下條件隊列的鏈表結構
code
CONDITION
類型的節點,將該節點添加到條件隊列Condition.await()
方法)public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 若是線程中斷,直接拋出異常 // 建立一個CONDITION類型的節點,將該節點添加到條件隊列尾部 Node node = addConditionWaiter(); // 釋放鎖 // 在調用await方法以前都會調用lock方法,這個時候已經獲取鎖了 // 有時候鎖仍是可重入的,因此須要將全部的資源都釋放掉 int savedState = fullyRelease(node); int interruptMode = 0; // 若是節點再也不同步隊列,所有都要掛起 while (!isOnSyncQueue(node)) { LockSupport.park(this); // 若是在等待期間發生過中斷(不論是調用signal以前仍是以後),直接退出 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 讓線程嘗試去獲取鎖,若是沒法獲取鎖就掛起 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 清除全部在條件隊列中是取消狀態的節點 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); // 發生中斷,上報中斷狀況 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
addConditionWaiter
:在條件隊列中添加一個節點private Node addConditionWaiter() { Node t = lastWaiter; // 清除條件隊列中無效的節點 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } // 建立一個節點 Node node = new Node(Node.CONDITION); // 添加到條件隊列尾部 if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
unlinkCancelledWaiters
:清除在條件隊列中被取消的節點private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; // 遍歷條件隊列將全部不是CONDITION狀態的節點所有清除掉 // 這些節點都是取消狀態的節點 while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
fullyRelease
:釋放線程持有的全部鎖資源final int fullyRelease(Node node) { try { int savedState = getState(); // 釋放全部的資源 // 若是是可重入鎖,savedState就是重入的次數 if (release(savedState)) return savedState; throw new IllegalMonitorStateException(); } catch (Throwable t) { // 發生異常就取消該節點 node.waitStatus = Node.CANCELLED; throw t; } }
isOnSyncQueue
:判斷節點是否在同步隊列final boolean isOnSyncQueue(Node node) { // waitStatus是CONDITION或者node沒有前驅節點,說明node不在同步隊列 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) // 有後繼節點必定在同步隊列 return true; /* * 在同步隊列中查找node,看是否在同步隊列中 */ return findNodeFromTail(node); }
findNodeFromTail
:在同步隊列中查找節點private boolean findNodeFromTail(Node node) { // 從尾節點開始查找 for (Node p = tail;;) { if (p == node) // 找到了 return true; if (p == null) // 找到頭了還沒找到 return false; p = p.prev; } }
checkInterruptWhileWaiting
:檢測中斷的狀況private int checkInterruptWhileWaiting(Node node) { // 沒有發生中斷返回0 // 調用signal以前發生中斷返回THROW_IE // 調用signal以後發生中斷返回REINTERRUPT return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }
transferAfterCancelledWait
:清除在條件隊列中被取消的節點// 只有線程處於中斷狀態,纔會調用此方法 // 若是須要的話,將這個已經取消等待的節點轉移到阻塞隊列 // 返回 true,若是此線程在 signal 以前被取消,不然返回false final boolean transferAfterCancelledWait(Node node) { // 用 CAS 將節點狀態設置爲 0 // 若是這步 CAS 成功,說明是 signal 方法以前發生的中斷, // 由於若是 signal 先發生的話,signal 中會將 waitStatus 設置爲 0 if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) { enq(node); // 將節點放入阻塞隊列 return true; } // 到這裏是由於 CAS 失敗,確定是由於 signal 方法已經將 waitStatus 設置爲了 0 // signal 方法會將節點轉移到阻塞隊列,可是可能還沒完成,這邊自旋等待其完成 // 固然,這種事情仍是比較少的吧:signal 調用以後,沒完成轉移以前,發生了中斷 while (!isOnSyncQueue(node)) Thread.yield(); return false; }
enq
:把節點添加到同步隊列private Node enq(Node node) { // 無限循環,將節點添加到同步隊列尾部 for (;;) { Node oldTail = tail; if (oldTail != null) { node.setPrevRelaxed(oldTail); if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return oldTail; } } else { // 若是同步隊列爲空,初始化 initializeSyncQueue(); } } }
reportInterruptAfterWait
:中斷處理private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { // 若是是THROW_IE狀態,拋異常 if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) // 再次中斷,由於中斷狀態被使用過一次 selfInterrupt(); }
awaitNanos
、awaitUntil
和await(long time, TimeUnit unit)
這幾個方法的總體邏輯是同樣的,就再也不分析了
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 獲取條件隊列中的第一個節點 Node first = firstWaiter; if (first != null) // 喚醒等待條件的節點 doSignal(first); }
doSignal
:喚醒等待條件的節點private void doSignal(Node first) { do { // 去掉無效的節點 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && // 將節點轉移到同步隊列 (first = firstWaiter) != null); }
transferForSignal
:將節點轉移到同步隊列final boolean transferForSignal(Node node) { /* * 取消的節點不須要轉移 */ if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) return false; /* * 將節點加入同步隊列尾部 */ Node p = enq(node); int ws = p.waitStatus; // ws > 0 說明 node 在阻塞隊列中的前驅節點取消了等待鎖,直接喚醒 node 對應的線程 // 若是 ws <= 0, 那麼 compareAndSetWaitStatus 將會被調用 // 節點入隊後,須要把前驅節點的狀態設爲SIGNAL if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) // 若是前驅節點取消或者 CAS 失敗,會進到這裏喚醒線程 LockSupport.unpark(node.thread); return true; }
public final void signalAll() { // 若是是當前線程 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) // 喚醒全部等待條件的節點 doSignalAll(first); }
doSignalAll
:喚醒全部等待條件的節點// 將全部的節點都轉移到同步隊列 private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }
如今將與AQS相關的核心代碼都整理了一遍,裏面若是有描述不清晰或者不許確的地方但願你們能夠幫忙指出!