AQS 是用來構建鎖和同步工具的基本框架。本文主要基於 AQS 做者 Doug Lea 的論文 The java.util.concurrent Synchronizer Framework 和 JDK 1.8 的文檔。java
這篇文章也同時發佈在個人博客中。node
不過英文好的話,仍是直接看論文吧。編程
同步(維持變量在各個線程間狀態的一致性)至少須要兩種操做:設計模式
同時支持兩種模式:bash
同時框架須要一些高級功能:併發
tryLock
和lock
)同步器的基本思想很直接簡潔,用僞代碼表示以下:框架
acquireide
while (syncronization state does not allow acquire) {
enqueue current thread if not already queued;
possiblly block current thread;
}
複製代碼
release工具
update synchronization state;
if (state may premit a blocked thread acuire) {
unblock one or more queued thread;
}
複製代碼
要實現這兩個操做,須要三個基本模塊的配合:優化
AQS 使用一個 32 位整數(int)來表明共享資源,也就是同步狀態。
該整數能夠表現任何狀態。好比,
Semaphore
用它來表現剩餘的許可數,ReentrantLock
用它來表現擁有它的線程已經請求了多少次鎖;FutureTask
用它來表現任務的狀態 (還沒有開始、運行、完成和取消)
AQS 使用 JUC 包下LockSupport
中的pack()
和unpack()
方法來阻塞和喚醒進程。最終會調用Unsafe.park()
和Unsafe.unpack()
兩個 native 方法,最終的阻塞線程和喚醒線程具體實現仍是由操做系統來實現的。
AQS 維護一個 FIFO 的隊列,來管理阻塞的線程,能夠實現公平性(也能夠不公平),也就是同時支持公平鎖和非公平鎖兩種模式。內部使用 CLH Lock,可是作了不少優化,好比CLH 鎖不是自旋的而是阻塞的。
AQS 中的 CLH lock 和原汁原味的 CLH lock 相比,主要有兩點不一樣:
不使用自旋鎖而是阻塞鎖,調用pack()
和unpack()
實現。
節點有顯式的後繼節點next
,原來的 CLH lock 不須要顯式的鏈表由於當前一個節點爲釋放鎖時,後一個節點在一直輪詢,因此它可以拿到鎖。而 AQS 的鎖是阻塞的,須要調用unpack(Thread)
來喚醒請求鎖的線程,因此須要知道它的後繼節點。
AQS 同時還設置了一個
signal bit
來避免沒必要要的pack()
和unpack()
調用。在調用pack()
以前,首先設置signal bit
爲 true,而後再次檢查節點狀態,若是還不能拿到鎖,就調用pack()
阻塞線程。
這樣就能夠用更加詳細的僞代碼來描述acquire
和release
,這裏只考慮exclusive mode、不可中斷的、沒有超時功能的狀況:
acquire:
if (!tryAcquire(arg)) {
node = create and enqueue new node;
pred = node's effective predecessor; while (pred is not head node || !tryAcquire(arg)) { if (pred's signal bit is set)
park();
else
compareAndSet pred's signal bit to ture pred = node's effective predecessor;
}
head = node;
}
複製代碼
release:
if (tryRelease(arg) && head node's signal bit is set) { compareAndSet head's signal bit to false;
unpack head's successor, if one exists }s 複製代碼
實現一個同步器須要實現下面的方法:
tryAcquire()
tryRelease()
tryAcquireShared()
tryReleaseShared()
isHeldExclusively()
複製代碼
以上方法不須要所有實現,根據獲取的鎖的種類能夠選擇實現不一樣的方法.
tryAcquire
、 tryRelease
、isHeldExclusively
tryAcquireShared
、tryReleaseShared
、isHeldExclusively
ReentrantReadWriteLock
實現一個同步器最好的設計模式是把功能委託給一個AQS的私有內部子類,而不是直接繼承 AQS 來實現(這樣會破壞同步器的簡潔性,調用者可能會調用 AQS 的其餘方法破壞同步狀態)。
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/** * @author leer * Created at 4/25/19 6:24 PM * 一個不可重入的互斥鎖 */
public class Mutex {
static final class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int ignore) {
return compareAndSetState(0, 1);
}
@Override
protected boolean tryRelease(int ignore) {
setState(0);
return true;
}
}
private final Sync sync = new Sync();
public void lock() {
sync.acquire(0);
}
public void unlock() {
sync.release(0);
}
}
複製代碼
非公平鎖的tryAcquire
實現:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
複製代碼
ReentrantReadWriteLock 使用 同步狀態的 16 位來存放讀鎖計數,另外的 16 位存放寫鎖計數。
ReentrantLock
相似Semaphore
使用同步狀態來保存當前可用許可數量。它重寫tryAcquireShared
來減小計數來模擬獲取資源,若是計數小於 0 則會阻塞線程;重寫tryReleaseShared
來模擬釋放資源。同時它也有公平模式和非公平模式。
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
複製代碼
和Semaphore
相似,同步狀態保存當前的計數值。countDown()
調用releaseShared()
,await()
方法調用acquireShared()
,等待計數器到零。
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
複製代碼
Only when JDK version < 1.7
FutureTask
使用同步狀態保存Future
任務的狀態(initial、running、cancelled、done)。
設置和取消一個任務將調用release()
,調用Future.get()
等待結果將會調用acquire()
。
Doug Lea 的論文: The java.util.concurrent Synchronizer Framework
《Java併發編程實戰》第14章