AQS、非阻塞數據結構和原子變量類(java.util.concurrent.atomic包中的類),concurrent包中的基礎類都是使用這種模式來實現的。而concurrent包中的高層類又是依賴於這些基礎類來實現的。從總體來看,concurrent包的實現示意圖以下:
java
Java中的許多可阻塞類,例如ReentrantLock、Semaphore、ReentrantReadWriteLock、CountDownLatch等,都是基於AQS構建的。
node
注:在jdk 1.8.0_111源碼中FutureTask不是基於AQS的,而是基於CAS編程
FutureTask源碼註釋: /* * Revision notes: This differs from previous versions of this * class that relied on AbstractQueuedSynchronizer, mainly to * avoid surprising users about retaining interrupt status during * cancellation races. Sync control in the current design relies * on a "state" field updated via CAS to track completion, along * with a simple Treiber stack to hold waiting threads. * * Style note: As usual, we bypass overhead of using * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics. */
AQS(AbstractQueuedSynchronizer),AQS是JDK下提供的一套用於實現基於FIFO等待隊列的阻塞鎖和相關的同步器的一個同步框架。這個抽象類被設計爲做爲一些可用原子int值來表示狀態的同步器的基類。若是你有看過相似 CountDownLatch 類的源碼實現,會發現其內部有一個繼承了 AbstractQueuedSynchronizer 的內部類 Sync 。可見 CountDownLatch 是基於AQS框架來實現的一個同步器.相似的同步器在JUC下還有很多。(eg. Semaphore )數據結構
如上所述,AQS管理一個關於狀態信息的單一整數,該整數能夠表現任何狀態。好比, Semaphore 用它來表現剩餘的許可數,ReentrantLock用它來表現擁有它的線程已經請求了多少次鎖;FutureTask 用它來表現任務的狀態(還沒有開始、運行、完成和取消)。
JDK源碼中描述以下:多線程
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic {@code int} value to represent state. Subclasses must define the protected methods that change this state, and which define what that state means in terms of this object being acquired or released. Given these, the other methods in this class carry out all queuing and blocking mechanics. Subclasses can maintain other state fields, but only the atomically updated {@code int} value manipulated using methods {@link #getState}, {@link #setState} and {@link #compareAndSetState} is tracked with respect to synchronization. Subclasses should be defined as non-public internal helper classes that are used to implement the synchronization properties of their enclosing class. Class {@code AbstractQueuedSynchronizer} does not implement any synchronization interface. Instead it defines methods such as {@link #acquireInterruptibly} that can be invoked as appropriate by concrete locks and related synchronizers to implement their public methods. This class supports either or both a default exclusive mode and a shared mode. When acquired in exclusive mode, attempted acquires by other threads cannot succeed. Shared mode acquires by multiple threads may (but need not) succeed. This class does not "understand" these differences except in the mechanical sense that when a shared mode acquire succeeds, the next waiting thread (if one exists) must also determine whether it can acquire as well. Threads waiting in the different modes share the same FIFO queue. Usually, implementation subclasses support only one of these modes, but both can come into play for example in a {@link ReadWriteLock}. Subclasses that support only exclusive or only shared modes need not define the methods supporting the unused mode. This class defines a nested {@link ConditionObject} class that can be used as a {@link Condition} implementation by subclasses supporting exclusive mode for which method {@link #isHeldExclusively} reports whether synchronization is exclusively held with respect to the current thread, method {@link #release} invoked with the current {@link #getState} value fully releases this object, and {@link #acquire}, given this saved state value, eventually restores this object to its previous acquired state. No {@code AbstractQueuedSynchronizer} method otherwise creates such a condition, so if this constraint cannot be met, do not use it. The behavior of {@link ConditionObject} depends of course on the semantics of its synchronizer implementation. This class provides inspection, instrumentation, and monitoring methods for the internal queue, as well as similar methods for condition objects. These can be exported as desired into classes using an {@code AbstractQueuedSynchronizer} for their synchronization mechanics. Serialization of this class stores only the underlying atomic integer maintaining state, so deserialized objects have empty thread queues. Typical subclasses requiring serializability will define a {@code readObject} method that restores this to a known initial state upon deserialization. Usage To use this class as the basis of a synchronizer, redefine the following methods, as applicable, by inspecting and/or modifying the synchronization state using {@link #getState}, {@link #setState} and/or {@link #compareAndSetState}: {@link #tryAcquire} {@link #tryRelease} {@link #tryAcquireShared} {@link #tryReleaseShared} {@link #isHeldExclusively} Each of these methods by default throws {@link UnsupportedOperationException}. Implementations of these methods must be internally thread-safe, and should in general be short and not block. Defining these methods is the only supported means of using this class. All other methods are declared {@code final} because they cannot be independently varied. You may also find the inherited methods from {@link AbstractOwnableSynchronizer} useful to keep track of the thread owning an exclusive synchronizer. You are encouraged to use them -- this enables monitoring and diagnostic tools to assist users in determining which threads hold locks. Even though this class is based on an internal FIFO queue, it does not automatically enforce FIFO acquisition policies. The core of exclusive synchronization takes the form: Acquire: while (!tryAcquire(arg)) { enqueue thread if it is not already queued; possibly block current thread; } Release: if (tryRelease(arg)) unblock the first queued thread; (Shared mode is similar but may involve cascading signals.) Because checks in acquire are invoked before enqueuing, a newly acquiring thread may barge ahead of others that are blocked and queued. However, you can, if desired, define {@code tryAcquire} and/or {@code tryAcquireShared} to disable barging by internally invoking one or more of the inspection methods, thereby providing a fair FIFO acquisition order. In particular, most fair synchronizers can define {@code tryAcquire} to return {@code false} if {@link #hasQueuedPredecessors} (a method specifically designed to be used by fair synchronizers) returns {@code true}. Other variations are possible. Throughput and scalability are generally highest for the default barging (also known as greedy, renouncement, and convoy-avoidance) strategy. While this is not guaranteed to be fair or starvation-free, earlier queued threads are allowed to recontend before later queued threads, and each recontention has an unbiased chance to succeed against incoming threads. Also, while acquires do not "spin" in the usual sense, they may perform multiple invocations of {@code tryAcquire} interspersed with other computations before blocking. This gives most of the benefits of spins when exclusive synchronization is only briefly held, without most of the liabilities when it is not. If so desired, you can augment this by preceding calls to acquire methods with "fast-path" checks, possibly prechecking {@link #hasContended} and/or {@link #hasQueuedThreads} to only do so if the synchronizer is likely not to be contended. This class provides an efficient and scalable basis for synchronization in part by specializing its range of use to synchronizers that can rely on {@code int} state, acquire, and release parameters, and an internal FIFO wait queue. When this does not suffice, you can build synchronizers from a lower level using {@link java.util.concurrent.atomic atomic} classes, your own custom {@link java.util.Queue} classes, and {@link LockSupport} blocking support. Usage Examples Here is a non-reentrant mutual exclusion lock class that uses the value zero to represent the unlocked state, and one to represent the locked state. While a non-reentrant lock does not strictly require recording of the current owner thread, this class does so anyway to make usage easier to monitor. It also supports conditions and exposes one of the instrumentation methods: {@code class Mutex implements Lock, java.io.Serializable { // Our internal helper class private static class Sync extends AbstractQueuedSynchronizer { // Reports whether in locked state protected boolean isHeldExclusively() { return getState() == 1; } // Acquires the lock if state is zero public boolean tryAcquire(int acquires) { assert acquires == 1; // Otherwise unused if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // Releases the lock by setting state to zero protected boolean tryRelease(int releases) { assert releases == 1; // Otherwise unused if (getState() == 0) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); setState(0); return true; } // Provides a Condition Condition newCondition() { return new ConditionObject(); } // Deserializes properly private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } } // The sync object does all the hard work. We just forward to it. private final Sync sync = new Sync(); public void lock() { sync.acquire(1); } public boolean tryLock() { return sync.tryAcquire(1); } public void unlock() { sync.release(1); } public Condition newCondition() { return sync.newCondition(); } public boolean isLocked() { return sync.isHeldExclusively(); } public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } }} Here is a latch class that is like a {@link java.util.concurrent.CountDownLatch CountDownLatch} except that it only requires a single {@code signal} to fire. Because a latch is non-exclusive, it uses the {@code shared} acquire and release methods. {@code class BooleanLatch { private static class Sync extends AbstractQueuedSynchronizer { boolean isSignalled() { return getState() != 0; } protected int tryAcquireShared(int ignore) { return isSignalled() ? 1 : -1; } protected boolean tryReleaseShared(int ignore) { setState(1); return true; } } private final Sync sync = new Sync(); public boolean isSignalled() { return sync.isSignalled(); } public void signal() { sync.releaseShared(1); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } }} @since 1.5 @author Doug Lea
如JDK的文檔中所說,使用AQS來實現一個同步器須要覆蓋實現以下幾個方法,而且使用getState,setState,compareAndSetState這幾個方法來設置獲取狀態併發
boolean tryAcquire(int arg) boolean tryRelease(int arg) int tryAcquireShared(int arg) boolean tryReleaseShared(int arg) boolean isHeldExclusively()
以上方法不須要所有實現,根據獲取的鎖的種類能夠選擇實現不一樣的方法,支持獨佔(排他)獲取鎖的同步器應該實現tryAcquire、tryRelease、isHeldExclusively而支持共享獲取的同步器應該實現tryAcquireShared、tryReleaseShared、isHeldExclusively。下面以 CountDownLatch 舉例說明基於AQS實現同步器, CountDownLatch 用同步狀態持有當前計數,countDown方法調用 release從而致使計數器遞減;當計數器爲0時,解除全部線程的等待;await調用acquire,若是計數器爲0,acquire 會當即返回,不然阻塞。一般用於某任務須要等待其餘任務都完成後才能繼續執行的情景。源碼以下:app
public class CountDownLatch { /** * 基於AQS的內部Sync * 使用AQS的state來表示計數count. */ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { // 使用AQS的getState()方法設置狀態 setState(count); } int getCount() { // 使用AQS的getState()方法獲取狀態 return getState(); } // 覆蓋在共享模式下嘗試獲取鎖 protected int tryAcquireShared(int acquires) { // 這裏用狀態state是否爲0來表示是否成功,爲0的時候能夠獲取到返回1,不然不能夠返回-1 return (getState() == 0) ? 1 : -1; } // 覆蓋在共享模式下嘗試釋放鎖 protected boolean tryReleaseShared(int releases) { // 在for循環中Decrement count直至成功; // 當狀態值即count爲0的時候,返回false表示 signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync; // 使用給定計數值構造CountDownLatch public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } // 讓當前線程阻塞直到計數count變爲0,或者線程被中斷 public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } // 阻塞當前線程,除非count變爲0或者等待了timeout的時間。當count變爲0時,返回true public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } // count遞減 public void countDown() { sync.releaseShared(1); } // 獲取當前count值 public long getCount() { return sync.getCount(); } public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; } }
Doug Lea曾經介紹過 AQS 的設計初衷。從原理上,一種同步結構每每是能夠利用其餘的結構實現的,例如可使用 Semaphore實現互斥鎖。可是,對某種同步結構的傾向,會致使複雜、晦澀的實現邏輯,因此,他選擇了將基礎的同步相關操做抽象在 AbstractQueuedSynchronizer 中,利用 AQS 爲咱們構建同步結構提供了範本。框架
AQS 內部數據和方法,能夠簡單拆分爲:ide
private volatile int state;
一個先入先出(FIFO)的等待線程隊列,以實現多線程間競爭和等待,這是 AQS 機制的核心之一。工具
各類基於 CAS 的基礎操做方法,以及各類指望具體同步結構去實現的 acquire/release 方法。
利用 AQS 實現一個同步結構,至少要實現兩個基本類型的方法,分別是 acquire操做,獲取資源的獨佔權;還有就是 release 操做,釋放對某個資源的獨佔。
以 ReentrantLock 爲例,它內部經過擴展 AQS 實現了 Sync 類型,以 AQS 的 state來反映鎖的持有狀況。
private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { …}
下面是 ReentrantLock 對應 acquire 和 release 操做,若是是 CountDownLatch 則能夠看做是 await()/countDown(),具體實現也有區別。
public void lock() { sync.acquire(1); } public void unlock() { sync.release(1); }
排除掉一些細節,總體地分析 acquire 方法邏輯,其直接實現是在 AQS 內部,調用了 tryAcquire 和 acquireQueued,這是兩個須要搞清楚的基本部分。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
首先,咱們來看看 tryAcquire。在 ReentrantLock 中,tryAcquire 邏輯實如今 NonfairSync 和 FairSync 中,分別提供了進一步的非公平或公平性方法,而 AQS 內部 tryAcquire 僅僅是個接近未實現的方法(直接拋異常),這是留個實現者本身定義的操做。
咱們能夠看到公平性在 ReentrantLock 構建時如何指定的,具體以下:
public ReentrantLock() { sync = new NonfairSync(); // 默認是非公平的 } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
以非公平的 tryAcquire 爲例,其內部實現瞭如何配合狀態與 CAS獲取鎖,注意,對比公平版本的tryAcquire,它在鎖無人佔有時,並不檢查是否有其餘等待者,這裏體現了非公平的語義。
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState();// 獲取當前 AQS 內部狀態量 if (c == 0) { // 0 表示無人佔有,則直接用 CAS 修改狀態位, if (compareAndSetState(0, acquires)) {// 不檢查排隊狀況,直接爭搶 setExclusiveOwnerThread(current); // 並設置當前線程獨佔鎖 return true; } } else if (current == getExclusiveOwnerThread()) { // 即便狀態不是 0,也可能當前線程是鎖持有者,由於這是再入鎖 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
接下來再來分析 acquireQueued,若是前面的 tryAcquire失敗,表明着鎖爭搶失敗,進入排隊競爭階段。這裏就是咱們所說的,利用 FIFO 隊列,實現線程間對鎖的競爭的部分,算是是 AQS 的核心邏輯。
當前線程會被包裝成爲一個排他模式的節點(EXCLUSIVE),經過 addWaiter 方法添加到隊列中。acquireQueued的邏輯,簡要來講,就是若是當前節點的前面是頭節點,則試圖獲取鎖,一切順利則成爲新的頭節點;不然,有必要則等待,具體處理邏輯請參考我添加的註釋。
final boolean acquireQueued(final Node node, int arg) { boolean interrupted = false; try { for (;;) {// 循環 final Node p = node.predecessor();// 獲取前一個節點 if (p == head && tryAcquire(arg)) { // 若是前一個節點是頭結點,表示當前節點合適去 tryAcquire setHead(node); // acquire 成功,則設置新的頭節點 p.next = null; // 將前面節點對當前節點的引用清空 return interrupted; } if (shouldParkAfterFailedAcquire(p, node)) // 檢查是否失敗後須要 park interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node);// 出現異常,取消 if (interrupted) selfInterrupt(); throw t; } }
到這裏線程試圖獲取鎖的過程基本展示出來了,tryAcquire是按照特定場景須要開發者去實現的部分,而線程間競爭則是 AQS 經過 Waiter 隊列與 acquireQueued 提供的,在 release方法中,一樣會對隊列進行對應操做。
參考資料:
JAVA併發編程: CAS和AQS
《JAVA併發編程實戰》
極客時間:第22講 | AtomicInteger底層實現原理是什麼?如何在本身的產品代碼中應用CAS操做
源碼剖析AQS在幾個同步工具類中的使用