AbstractQueuedSynchronizer,簡稱 AQS,是一個用於構建鎖和同步器的框架。
JUC 包下常見的鎖工具如 ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch 都是基於 AQS 實現的。
本文將介紹 AQS 的數據結構及獨佔模式的實現原理。java
本文基於 jdk1.8.0_91
AQS 全部操做都圍繞着同步資源(synchronization state)來展開,解決了資源訪問的互斥和同步問題。node
AQS框架將剩下的一個問題留給用戶:獲取、釋放資源的具體方式和結果。
這實際上是一種典型的模板方法設計模式:父類(AQS框架)定義好骨架和內部操做細節,具體規則由子類去實現。segmentfault
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable
AbstractQueuedSynchronizer 繼承 AbstractOwnableSynchronizer,後者具備屬性 exclusiveOwnerThread,用於記錄獨佔模式下得到鎖的線程。設計模式
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { /** * The current owner of exclusive mode synchronization. */ private transient Thread exclusiveOwnerThread; }
AbstractQueuedSynchronizer 具備 ConditionObject 和 Node 兩個內部類。數據結構
AQS 定義了一系列模板方法以下:併發
// 獨佔獲取(資源數) protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } // 獨佔釋放(資源數) protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } // 共享獲取(資源數) protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } // 共享獲取(資源數) protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } // 是否排它狀態 protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); }
Java 中經常使用的鎖工具都是基於 AQS 來實現的。app
鎖和資源是同一個概念,是多個線程爭奪的對象。
AQS 使用 state 來表示資源/鎖,經過內置的等待隊列來完成獲取資源/鎖的排隊工做。
等待隊列(wait queue)是嚴格的 FIFO 隊列,是 CLH 鎖隊列的變種。框架
因爲 state 是共享的,使用 volatile 來保證其可見性,並提供了getState/setState/compareAndSetState
三個方法來操做 state。ide
/** * The synchronization state. */ private volatile int state;// 資源/鎖 /** * Returns the current value of synchronization state. * This operation has memory semantics of a {@code volatile} read. * @return current state value */ protected final int getState() { return state; } /** * Sets the value of synchronization state. * This operation has memory semantics of a {@code volatile} write. * @param newState the new state value */ protected final void setState(int newState) { state = newState; } /** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a {@code volatile} read * and write. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that the actual * value was not equal to the expected value. */ protected final boolean compareAndSetState(int expect, int update) { // 原子操做 // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
AQS 的內部實現了兩個隊列:同步隊列和條件隊列。這兩種隊列都使用了 Node 做爲節點。工具
節點的定義主要包含三部份內容:
節點的狀態
java.util.concurrent.locks.AbstractQueuedSynchronizer.Node
static final class Node { /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** waitStatus value to indicate the next acquireShared should unconditionally propagate */ static final int PROPAGATE = -3; // 等待狀態:SIGNAL、CANCELLED、CONDITION、PROPAGATE、0 volatile int waitStatus; // 指向同步隊列中的上一個節點 volatile Node prev; // 指向同步隊列中的下一個節點 volatile Node next; volatile Thread thread; // 在同步隊列中,nextWaiter用於標記節點的模式:獨佔、共享 // 在條件隊列中,nextWaiter指向條件隊列中的下一個節點 Node nextWaiter; /** * Returns true if node is waiting in shared mode. */ // 節點模式是否爲共享 final boolean isShared() { return nextWaiter == SHARED; } /** * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could * be elided, but is present to help the VM. * * @return the predecessor of this node */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
同步隊列是等待獲取鎖的隊列,是一個雙向鏈表(prev/next),使用 head/tail 執行隊列的首尾節點。
java.util.concurrent.locks.AbstractQueuedSynchronizer
/** * Head of the wait queue, lazily initialized. Except for * initialization, it is modified only via method setHead. Note: * If head exists, its waitStatus is guaranteed not to be * CANCELLED. */ // 等待隊列的頭節點,懶初始化。 // 注意,若是頭節點存在,那麼它的 waitStatus 必定不是 CANCELLED private transient volatile Node head; /** * Tail of the wait queue, lazily initialized. Modified only via * method enq to add new wait node. */ // 等待隊列的尾節點,懶初始化。 // 只能經過 enq 方法給等待隊列添加新的節點。 private transient volatile Node tail; /** * The synchronization state. */ private volatile int state;
在線程嘗試獲取資源失敗後,會進入同步隊列隊尾,給前繼節點設置一個喚醒信號後,經過 LockSupport.park(this)
讓自身進入等待狀態,直到被前繼節點喚醒。
當線程在同步隊列中等待,獲取資源成功後,經過執行 setHead(node)
將自身設爲頭節點。
同步隊列的頭節點是一個 dummy node,它的 thread 爲空(某些狀況下能夠看作是表明了當前持有鎖的線程)。
/** * Sets head of queue to be node, thus dequeuing. Called only by * acquire methods. Also nulls out unused fields for sake of GC * and to suppress unnecessary signals and traversals. * * @param node the node */ private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
AQS 不會在初始化隊列的時候構建空的頭節點(dummy node),而是在第一次發生爭用時構造:
第一個線程獲取鎖,第二個線程獲取鎖失敗入隊,此時纔會初始化隊列,構造空節點並將 head/tail 指向該空節點。
具體見 AbstractQueuedSynchronizer#enq。
條件隊列是等待條件成立的隊列,是一個單向鏈表(nextWaiter),使用 firstWaiter/lastWaiter 指向隊列的首尾節點。
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject
/** First node of condition queue. */ private transient Node firstWaiter; // 條件隊列的頭節點 /** Last node of condition queue. */ private transient Node lastWaiter; // 條件隊列的尾節點
當線程獲取鎖成功以後,執行 Conition.await(),釋放鎖並進入條件隊列中等待,直到其餘線程執行 Conition.signal 喚醒當前線程。
當前線程被喚醒後,從條件隊列轉移到同步隊列,從新等待獲取鎖。
獨佔模式下,只要有一個線程佔有鎖,其餘線程試圖獲取該鎖將沒法取得成功。
獨佔模式下獲取鎖/資源,無視中斷,Lock#lock的內部實現
java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
嘗試獲取資源,成功返回true。具體資源獲取方式交由自定義同步器實現。
java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquire
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
獲取資源/鎖失敗後,將當前線程封裝爲新的節點,設置節點的模式(獨佔、共享),加入同步隊列的尾部,返回該新節點。
java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared // 獨佔模式、共享模式 * @return the new node */ // 從隊列尾部入隊 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { // 設置新的尾節點 pred.next = node; return node; } } enq(node); // tail爲空,入隊 return node; // 返回當前的新節點 }
從同步隊列的尾部入隊,若是隊列不存在則進行初始化。
java.util.concurrent.locks.AbstractQueuedSynchronizer#enq
/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { // 從同步隊列的尾部入隊 for (;;) { Node t = tail; if (t == null) { // Must initialize // 隊列爲空,則建立一個空節點,做頭節點 if (compareAndSetHead(new Node())) tail = head; // 初始化完成後並無返回,而是進行下一次循環 } else { node.prev = t; if (compareAndSetTail(t, node)) { // 隊列不爲空,則當前節點做爲新的tail // CAS失敗,可能會出現尾分叉的現象,由下一次循環消除分叉 t.next = node; // 因爲不是原子操做,入隊操做先設置prev指針,再設置next指針,會致使併發狀況下沒法經過next遍歷到尾節點 return t; // 返回當前節點的上一個節點(舊的尾節點) } } } }
注意:
在同步隊列自旋、等待獲取資源直到成功,返回等待期間的中斷狀態。
java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { // 上一個節點若是是頭結點,說明當前節點的線程能夠嘗試獲取鎖資源 // 獲取鎖成功,當前節點做爲新的頭節點,而且清理掉當前節點中的線程信息(也就是說頭節點是個dummy node) // 這裏不會發生爭用,不須要CAS setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 上一個節點不是頭節點,或者當前節點的線程獲取鎖失敗,須要判斷是否進入阻塞: // 1. 不能進入阻塞,則重試獲取鎖。2. 進入阻塞 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 阻塞當前線程,當從阻塞中被喚醒時,檢測當前線程是否已中斷,並清除中斷狀態。接着繼續重試獲取鎖。 interrupted = true; // 標記當前線程已中斷(若是線程在阻塞時被中斷喚醒,會重試獲取鎖直到成功以後,再響應中斷) } } finally { if (failed) // 自旋獲取鎖和阻塞過程當中發生異常 cancelAcquire(node); // 取消獲取鎖 } }
在 acquireQueued 方法中,線程在自旋中主要進行兩個判斷:
具體代碼流程:
當前節點獲取鎖失敗以後,經過校驗上一個節點的等待狀態,判斷當前節點可否進入阻塞。
返回 true,可進入阻塞;返回 false,不可進入阻塞,需重試獲取鎖。
java.util.concurrent.locks.AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire
/** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release // 當前節點已經給它的上一個節點設置了喚醒信號 * to signal it, so it can safely park. // 當前節點能夠進入阻塞 */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and // 上一個節點狀態大於 0,說明是已取消狀態 CANCELLED,不會通知當前節點 * indicate retry. // 則一直往前找到一個等待狀態的節點,並排在它的後邊 */ // 當前節點不能進入阻塞,需重試獲取鎖 do { node.prev = pred = pred.prev; // pred = pred.prev; node.prev = pred; // 跳過上一個節點,直到找到 waitStatus > 0 的節點 } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we // 上一個節點狀態等於 0 或 PROPAGATE,說明正在等待獲取鎖/資源 * need a signal, but don't park yet. Caller will need to // 此時須要給上一個節點設置喚醒信號SIGNAL,但不直接阻塞 * retry to make sure it cannot acquire before parking. // 由於在阻塞前調用者須要重試來確認它確實不能獲取資源 */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 經過 CAS 將上一個節點的狀態改成 SIGNAL } return false; }
當前節點可以進入阻塞的條件是:具備其餘線程來喚醒它。
經過設置上一個節點狀態爲 SIGNAL,以確保上一個節點在釋放鎖以後,可以喚醒當前節點。
分爲三種狀況:
進入阻塞,阻塞結束後,檢查中斷狀態。
java.util.concurrent.locks.AbstractQueuedSynchronizer#parkAndCheckInterrupt
/** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */ private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
線程在 acquireQueued 中自旋嘗試獲取鎖的過程當中,若是發生異常,會在 finally 代碼塊中執行 cancelAcquire,終止獲取鎖。
/** * Cancels an ongoing attempt to acquire. * * @param node the node */ private void cancelAcquire(Node node) { // 取消獲取鎖 // Ignore if node doesn't exist if (node == null) return; node.thread = null; // Skip cancelled predecessors // 跳過已取消的前繼節點,爲當前節點找出一個有效的前繼節點 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. Node predNext = pred.next; // Can use unconditional write instead of CAS here. // 寫操做具備可見性(volatile),所以這裏無需使用 CAS // After this atomic step, other Nodes can skip past us. // 把當前節點設爲已取消以後,其餘節點尋找有效前繼節點時會跳過當前節點 // Before, we are free of interference from other threads. node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) { // 若是是尾節點,則出隊 compareAndSetNext(pred, predNext, null); } else { // 進入這裏,說明不是尾節點,或者是尾節點但出隊失敗,須要處理後繼節點 // If successor needs signal, try to set pred's next-link // 若是後繼節點須要獲得通知,則嘗試給它找一個新的前繼節點 // so it will get one. Otherwise wake it up to propagate. // 不然把後繼節點喚醒 int ws; if (pred != head && // 前繼節點不是頭節點 ((ws = pred.waitStatus) == Node.SIGNAL || // 前繼節點的狀態爲SIGNAL 或者 前繼節點的狀態爲未取消且嘗試設置爲SIGNAL成功 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) // 後繼節點存在且未取消 compareAndSetNext(pred, predNext, next); // 給後繼節點設置一個新的前繼節點(即前面找的有效節點),當前節點出隊 } else { unparkSuccessor(node); // 若是存在後繼節點,這裏說明沒法給後繼節點找到新的前繼節點(可能前繼節點是head,或者前繼節點失效了),直接喚醒該後繼節點 } node.next = node; // help GC } }
節點 node 取消獲取鎖,說明當前節點 node 狀態變爲已取消,成爲一個無效節點。
須要考慮如何處理節點 node 的後繼節點:
喚醒當前節點的後繼節點。
java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor
/** * Wakes up node's successor, if one exists. * * @param node the node */ // 喚醒當前節點的後繼節點 private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; // 若是當前節點的狀態爲已取消,則不變;若是小於0(有可能後繼節點須要當前節點來喚醒),則清零。 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // CAS失敗也無所謂(說明後繼節點的線程先一步修改了當前節點的狀態),由於接下來會手動喚醒後繼節點 Node s = node.next; if (s == null || s.waitStatus > 0) { // 後繼節點爲空,或已取消,則從tail開始向前遍歷有效節點 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; // // 注意! 這裏找到了以後並無return, 而是繼續向前找 } if (s != null) LockSupport.unpark(s.thread); // 喚醒後繼節點(或者是隊列中距離head節點最近的有效節點)的線程 }
一般狀況下, 要喚醒的節點就是本身的後繼節點。若是後繼節點存在且也在等待鎖, 那就直接喚醒它。
可是有可能存在 後繼節點取消等待鎖 的狀況,此時從尾節點開始向前找起, 直到找到距離 head 節點最近的未取消的節點,對它進行喚醒。
爲何不從當前節點向後遍歷有效節點呢?
對比 acquire,二者對獲取鎖過程當中發生中斷的處理不一樣。
java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireInterruptibly
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireInterruptibly
/** * Acquires in exclusive interruptible mode. * @param arg the acquire argument */ private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); // 線程在阻塞等待鎖的過程當中,被中斷喚醒,則放棄等待鎖,直接拋出異常 } } finally { if (failed) cancelAcquire(node); } }
獨佔模式下釋放鎖/資源,是 Lock#unlock 的內部實現。
java.util.concurrent.locks.AbstractQueuedSynchronizer#release
public final boolean release(int arg) { if (tryRelease(arg)) { // 釋放鎖資源 Node h = head; if (h != null && h.waitStatus != 0) // head.waitStatus == 0,說明head節點後沒有須要喚醒的節點 unparkSuccessor(h); // 喚醒head的後繼節點 return true; } return false; }
java.util.concurrent.locks.AbstractQueuedSynchronizer#tryRelease
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
頭節點 h 的狀態:
相關閱讀:
閱讀 JDK 源碼:AQS 中的獨佔模式
閱讀 JDK 源碼:AQS 中的共享模式
閱讀 JDK 源碼:AQS 對 Condition 的實現
做者:Sumkor