AQS(AbstractQueuedSynchronizer)隊列同步器,是JUC中很是重要的一個組件,基於它能夠簡單高效地構建一些通用的鎖和同步器,如ReentrantLock、Semaphore等(本文學習內容基於JDK1.8),本文主要關注AQS的源碼實現及基於AQS實現的一些經常使用的同步組件html
經過使用JUC中的同步組件,能夠比較簡潔地進行併發編程,而在不少同步組件的實現中都出現了Sync extends AbstractQueuedSynchronizer
的身影,經過對AQS的一些方法的重寫,實現了相應的組件的功能。AQS是實現鎖的關鍵,其中鎖是面向鎖的使用者的,定義了鎖的使用方式,而AQS是面向鎖的實現者的,簡化了鎖的實現方式,屏蔽了同步狀態管理、線程的排隊、等待與喚醒等底層操做。java
AQS採用了模板方法設計模式,支持經過子類重寫相應的方法實現不一樣的同步器。在AQS中,有一個state變量,表示同步狀態(這裏的同步狀態就能夠看做是一種資源,對同步狀態的獲取能夠看做是對同步資源的競爭),AQS提供了多種獲取同步狀態的方式,包括獨佔式獲取、共享式獲取以及超時獲取等,下面會進行具體的介紹。node
下面將結合源碼從模板方法、同步狀態管理、CLH鎖隊列、獨佔式獲取方式、共享式獲取方式、超時獲取方式等方面分析AQS的原理及實現git
能夠經過子類重寫的方法列表以下github
方法名稱 | 用途 |
---|---|
tryAcquire(int arg) | 主要用於實現獨佔式獲取同步狀態,實現該方法須要查詢當前狀態是否符合預期,而後進行相應的狀態更新實現控制(獲取成功返回true,不然返回false,成功一般是能夠更新同步狀態,失敗則是不符合更新同步狀態的條件),其中arg表示須要獲取的同步狀態數 |
tryRelease(int arg) | 主要用於實現獨佔式釋放同步狀態,同時更新同步狀態(一般在同步狀態state更新爲0纔會返回true,表示已經完全釋放同步資源),其中arg表示須要釋放的同步狀態數 |
tryAcquireShared(int arg) | 主要用於實現共享式獲取同步狀態,同時更新同步狀態 |
tryReleaseShared(int arg) | 主要用於實現共享式釋放同步狀態,同時更新同步狀態 |
isHeldExclusively() | 通常用於判斷同步器是否被當前線程獨佔 |
對線程進行加鎖在AQS中體現爲對同步狀態的操做,經過的同步狀態地管理,能夠實現不一樣的同步任務,同步狀態state
是AQS很關鍵的一個域面試
// 由於state是volatile的,因此get、set方法均爲原子操做,而compareAndSetState方法
// 使用了Unsafe類的CAS操做,因此也是原子的
// 同步狀態
private volatile int state;
// 同步狀態的操做包括
// 獲取同步狀態
protected final int getState() { return state;}
// 設置同步狀態
protected final void setState(int newState) { state = newState;}
// CAS操做更新同步狀態
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
複製代碼
CLH(Craig, Landin, and Hagersten)鎖,是自旋鎖的一種。AQS中使用了CLH鎖的一個變種,實現了一個雙向隊列,並使用其實現阻塞的功能,經過將請求共享資源的線程封裝爲隊列中的一個結點實現鎖的分配。編程
雙向隊列的頭結點記錄工做狀態下的線程,後繼結點若獲取不了同步狀態則會進入阻塞狀態,新的結點會從隊尾加入隊列,競爭同步狀態設計模式
// 隊列的數據結構以下
// 結點的數據結構
static final class Node {
// 表示該節點等待模式爲共享式,一般記錄於nextWaiter,
// 經過判斷nextWaiter的值能夠判斷當前結點是否處於共享模式
static final Node SHARED = new Node();
// 表示節點處於獨佔式模式,與SHARED相對
static final Node EXCLUSIVE = null;
// waitStatus的不一樣狀態,具體內容見下文的表格
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
// 記錄前置結點
volatile Node prev;
// 記錄後置結點
volatile Node next;
// 記錄當前的線程
volatile Thread thread;
// 用於記錄共享模式(SHARED), 也能夠用來記錄CONDITION隊列(見擴展分析)
Node nextWaiter;
// 經過nextWaiter的記錄值判斷當前結點的模式是否爲共享模式
final boolean isShared() { return nextWaiter == SHARED;}
// 獲取當前結點的前置結點
final Node predecessor() throws NullPointerException { ... }
// 用於初始化時建立head結點或者建立SHARED結點
Node() {}
// 在addWaiter方法中使用,用於建立一個新的結點
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
// 在CONDITION隊列中使用該構造函數新建結點
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
// 記錄頭結點
private transient volatile Node head;
// 記錄尾結點
private transient volatile Node tail;
複製代碼
Node狀態表(waitStatus,初始化時默認爲0)數據結構
狀態名稱 | 狀態值 | 狀態描述 |
---|---|---|
CANCELLED | 1 | 說明當前結點(即相應的線程)是由於超時或者中斷取消的,進入該狀態後將沒法恢復 |
SIGNAL | -1 | 說明當前結點的後繼結點是(或者將要)由park致使阻塞的,當結點被釋放或者取消時,須要經過unpark喚醒後繼結點(表現爲unparkSuccessor()方法) |
CONDITION | -2 | 該狀態是用於condition隊列結點的,代表結點在等待隊列中,結點線程等待在Condition上,當其餘線程對Condition調用了signal()方法時,會將其加入到同步隊列中去,關於這一部分的內容會在擴展中說起。 |
PROPAGATE | -3 | 說明下一次共享式同步狀態的獲取將會無條件地向後繼結點傳播 |
下圖展現該隊列的基本結構併發
獨佔式(EXCLUSIVE)獲取需重寫tryAcquire
、tryRelease
方法,並訪問acquire
、release
方法實現相應的功能。
acquire的流程圖以下:
上述流程圖比較複雜,這裏簡單概述一下其中的過程
主要代碼以下:
// 這裏不去看tryAcquire、tryRelease方法的具體實現,只知道它們的做用分別爲嘗試獲取同步狀態、
// 嘗試釋放同步狀態
public final void acquire(int arg) {
// 若是線程直接獲取成功,或者再嘗試獲取成功後都是直接工做,
// 若是是從阻塞狀態中喚醒開始工做的線程,將當前的線程中斷
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 包裝線程,新建結點並加入到同步隊列中
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 嘗試入隊, 成功返回
if (pred != null) {
node.prev = pred;
// CAS操做設置隊尾
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 經過CAS操做自旋完成node入隊操做
enq(node);
return node;
}
// 在同步隊列中等待獲取同步狀態
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋
for (;;) {
final Node p = node.predecessor();
// 檢查是否符合開始工做的條件
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
// 獲取不到同步狀態,將前置結點標爲SIGNAL狀態而且經過park操做將node包裝的線程阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 若是獲取失敗,將node標記爲CANCELLED
if (failed)
cancelAcquire(node);
}
}
複製代碼
release流程圖以下
release的過程比較簡單,主要就是經過tryRelease更新同步狀態,而後若是須要,喚醒後置結點中被阻塞的線程
主要代碼以下
// release
public final boolean release(int arg) {
// 首先嚐試釋放並更新同步狀態
if (tryRelease(arg)) {
Node h = head;
// 檢查是否須要喚醒後置結點
if (h != null && h.waitStatus != 0)
// 喚醒後置結點
unparkSuccessor(h);
return true;
}
return false;
}
// 喚醒後置結點
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 經過CAS操做將waitStatus更新爲0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 檢查後置結點,若爲空或者狀態爲CANCELLED,找到後置非CANCELLED結點
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 喚醒後置結點
if (s != null)
LockSupport.unpark(s.thread);
}
複製代碼
共享式(SHARED)獲取需重寫tryAcquireShared
、tryReleaseShared
方法,並訪問acquireShared
、releaseShared
方法實現相應的功能。與獨佔式相對,共享式支持多個線程同時獲取到同步狀態並進行工做
acquireShared
acquireShared過程和acquire很是類似,流程大體相同,下面簡單歸納一下
//
public final void acquireShared(int arg) {
// 嘗試共享式獲取同步狀態,若是成功獲取則能夠繼續執行,不然執行doAcquireShared
if (tryAcquireShared(arg) < 0)
// 以共享式不停得嘗試獲取同步狀態
doAcquireShared(arg);
}
// Acquires in shared uninterruptible mode.
private void doAcquireShared(int arg) {
// 向同步隊列中新增一個共享式的結點
final Node node = addWaiter(Node.SHARED);
// 標記獲取失敗狀態
boolean failed = true;
try {
// 標記中斷狀態(若在該過程當中被中斷是不會響應的,須要手動中斷)
boolean interrupted = false;
// 自旋
for (;;) {
// 獲取前置結點
final Node p = node.predecessor();
// 若前置結點爲頭結點
if (p == head) {
// 嘗試獲取同步狀態
int r = tryAcquireShared(arg);
// 若獲取到同步狀態。
if (r >= 0) {
// 此時,當前結點存儲的線程恢復執行,須要將當前結點設置爲頭結點而且向後傳播,
// 通知符合喚醒條件的結點一塊兒恢復執行
setHeadAndPropagate(node, r);
p.next = null; // help GC
// 須要中斷,中斷當前線程
if (interrupted)
selfInterrupt();
// 獲取成功
failed = false;
return;
}
}
// 獲取同步狀態失敗,須要進入阻塞狀態
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 獲取失敗,CANCELL node
if (failed)
cancelAcquire(node);
}
}
// 將node設置爲同步隊列的頭結點,而且向後通知當前結點的後置結點,完成傳播
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
// 向後傳播
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if(s == null || s.isShared())
doReleaseShared();
}
}
複製代碼
releasShared
releaseShared在嘗試釋放同步狀態成功後,會喚醒後置結點,而且保證傳播性
public final boolean releaseShared(int arg) {
// 嘗試釋放同步狀態
if (tryReleaseShared(arg)) {
// 成功後喚醒後置結點
doReleaseShared();
return true;
}
return false;
}
// 喚醒後置結點
private void doReleaseShared() {
// 循環的目的是爲了防止新結點在該過程當中進入同步隊列產生的影響,同時要保證CAS操做的完成
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
複製代碼
超時獲取使經過AQS實現的鎖支持超時獲取鎖,這是synchronized關鍵字所不具有的,關於其具體的實現,和上述實現方式類似,只是在獨佔式、共享式獲取的基礎上增長了時間的約束,同時經過parkNanos()方法爲阻塞定時,這裏再也不過多展開。
下面列舉幾個經常使用的併發組件
ReentrantLock,重入鎖。經過AQS獨佔式實現加鎖、解鎖操做,支持同一線程重複獲取鎖。主要操做爲lock,unlock,其實現分別依賴acquire和release
private final Sync sync;
// 繼承AQS,重寫相應方法
abstract static class Sync extends AbstractQueuedSynchronizer {
abstract void lock();
final boolean nonfairTryAcquire(int acquires) { ... }
protected final boolean tryRelease(int releases) { ... }
// ...略
}
static final class NonfairSync extends Sync {
final void lock() { ... }
protected final boolean tryAcquire(int acquires) { ... }
}
static final class FairSync extends Sync {
final void lock() { ... }
protected final boolean tryAcquire(int acquires) { ... }
}
複製代碼
相關總結
CountDownLatch,一種同步工具,可使一個或多個線程一直等待,直到指定數量線程所有執行完畢後纔再執行。經過AQS共享式實現。主要操做爲await(讓當前線程等待,調用了AQS的acquireSharedInterruptibly方法,能夠簡單將其看成acquireShared方法,實現基本相同),countDown(執行同步狀態釋放的操做),其實大致的思路就是,初始化CountDownLatch必定的同步狀態數,執行await操做的線程需等待同步狀態數徹底釋放(爲0)時才能夠執行,而須要先完成的任務在完成後都經過countDown釋放必定的同步狀態數
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
// 檢查同步狀態數是否已經爲0,不爲0則同步狀態獲取失敗
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 釋放必定的同步狀態數
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
複製代碼
ReentrantReadWriteLock,可重入的讀寫鎖,同時使用了AQS的獨佔式和共享式,當進行寫操做時,鎖由寫線程獨佔,其餘寫線程和讀線程阻塞。當進行讀操做時,寫線程阻塞,全部讀線程能夠共享鎖。讀寫鎖的實現相對複雜,這裏再也不貼過多的代碼,簡單歸納一下其實現的方式:
在synchronized加鎖的時候,能夠經過Object類的方法的wait()、notify()方法實現等待通知,那麼在Lock鎖的過程當中,也存在相似的操做,即Condition接口,該接口提供了await()、signal()方法,具備相同的功能。
在AQS中,有一個類ConditionObject,實現了Condition接口。它一樣使用了Node的數據結構,構成了一個隊列(FIFO),與同步隊列區別,能夠叫它等待隊列。獲取Condition須要經過Lock接口的newCondition方法,這意味着一個Lock能夠有多個等待隊列,而Object監視器模型提供的一個對象僅有一個等待隊列
// Condition的數據結構
static final class Node {
// next 指針
Node nextWaiter;
// ...
}
public class ConditionObject implements Condition, java.io.Serializable {
// head
private transient Node firstWaiter;
// tail
private transient Node lastWaiter;
// ...
}
複製代碼
下面具體來看await和signal操做
await()
// 涉及中斷的操做,暫時忽略
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 向等待隊列的隊尾新建一個CONDITION結點
Node node = addConditionWaiter();
// 由於要進入等待狀態,因此須要釋放同步狀態(即釋放鎖),若是失敗,該結點會被CANCELLED
int savedState = fullyRelease(node);
int interruptMode = 0;
// 判讀該結點是否在同步隊列上,若是不在就經過park操做將其阻塞,進入等待狀態
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 從等待狀態恢復,進入同步隊列競爭同步狀態
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
// 向等待隊列的隊尾新建一個CONDITION結點
private Node addConditionWaiter() {
Node t = lastWaiter;
// 若是最後一個結點的waitStatus並不是CONDITION,說明該結點被CANCELLED了,須要
// 從隊列中清除掉
if (t != null && t.waitStatus != Node.CONDITION) {
// 將CANCELLED結點從等待隊列中清除出去
unlinkCancelledWaiters();
t = lastWaiter;
}
// 新建CONDITION結點而且將其加入隊尾
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
複製代碼
關於await的操做
signal()
//
public final void signal() {
// 校驗
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
// 喚醒等待隊列的頭結點
if (first != null)
doSignal(first);
}
// 執行喚醒操做
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 喚醒結點而且將其加入同步隊列
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
// 將喚醒的結點加入到同步隊列中競爭同步狀態,恢復執行
final boolean transferForSignal(Node node) {
// 將node的狀態從CONDITION恢復到默認狀態,該CAS操做由外層doSignal的循環保證成功操做
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 將node加入到同步隊列中
Node p = enq(node);
int ws = p.waitStatus;
// 若是前置結點已經被取消或者將前置結點設置爲SIGNAL失敗,就經過unpark喚醒node包裝的線程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
複製代碼
關於signal的操做
將等待隊列的頭結點喚醒,從等待隊列中移除,並將其加入到同步隊列中競爭同步狀態,恢復執行
還有一些操做,如signalAll()則是將等待隊列中的所有結點從等待隊列中移除並加入到同步隊列中競爭同步狀態
StampedLock是Java8中新增的一個鎖,是對讀寫鎖的改進。讀寫鎖雖然分離了讀與寫的功能,可是它在處理讀與寫的併發上,採起的是一種悲觀的策略,這就致使了,當讀取的狀況不少而寫入的狀況不多時,寫入線程可能遲遲沒法競爭到鎖並被阻塞,遭遇飢餓問題。
StampedLock提供了3種控制鎖的模式,寫、讀、樂觀讀。在加鎖時能夠獲取一個stamp做爲校驗的憑證,在釋放鎖的時候須要校驗這個憑證,若是憑證失效的話(好比在讀的過程當中,寫線程產生了修改),就須要從新獲取憑證,而且從新獲取數據。這很適合在寫入操做較少,讀取操做較多的情景,能夠樂觀地認爲寫入操做不會發生在讀取數據的過程當中,而是在讀取線程解鎖前進行憑證的校驗,在必要的狀況下,切換成悲觀讀鎖,完成數據的獲取。這樣能夠大幅度提升程序的吞吐量。
StampedLock在實現上沒有藉助AQS,可是其不少設計的思想、方法都是參照AQS並進行了一些修改完成的。在StampedLock內部一樣維護了一個CLH隊列完成相關的功能。
與ReentrantReadWriteLock相比,StamptedLock的API調用相對複雜一些,因此在不少時候仍是會用ReentrantReadWriteLock。
更多的關於StampedLock的內容後續再補充。
若有問題,還請指出