AbstractQueuedSynchronizer(如下簡稱AQS)是Java中用於構建鎖和同步器的框架,許多同步器均可以經過AQS很容易而且高效地構造出來。不少文章已經基於論文和源碼對實現進行了解讀,本文試着從另外的角度入手:先不考慮AQS的實現,假設讓咱們本身實現鎖,咱們能夠怎麼作?最後再來看AQS的實現,才能更好地理解爲何要這麼實現。html
咱們能夠形象地把鎖理解成門票,只有當線程拿到了門票,才能進入臨界區。所以咱們能夠用一個狀態變量state
表示鎖,state = true
表示能夠獲取到鎖,反之就是表示鎖已經被佔用。那麼當鎖被佔用時,應該怎麼處理?這裏有兩種思路:node
基於第一種思路實現的鎖叫作自旋鎖(SpinLock)。下面咱們先看自選鎖中最簡單的實現,這個實現叫作Test-And-Set-LOCK,簡稱TAS Lock。linux
public class TASLock {
AtomicBoolean state = new AtomicBoolean(false);
public void lock() {
while (!state.getAndSet(true)) {} // 循環檢測直到狀態可用
}
public void unlock() {
state.set(false);
}
}
複製代碼
從實現上咱們能夠看出,獲取鎖的線程一直處於活躍狀態,可是並無執行任何有效的任務,所以使用自旋鎖會形成busy-waiting
。git
在對TAS鎖提出優化思路前,先介紹一下緩存一致性。下面這張圖描述的是每一個處理器都有本身的緩存,但共享一個內存,緩存的內容來自內存。一旦處理器更新了本身的緩存,若是這個更新須要被其餘處理器感知,就須要經過總線來通知。所以頻繁更新會佔用大量總線流量。github
目前咱們是用一個狀態變量來標識鎖的狀態。TAS鎖每次循環都會調用getAndSet()
,這是一個更新指令,會致使其餘線程的緩存都失效,從而都會去內存中獲取值,所以佔用總線流量資源。數組
TAS鎖
的問題在於每次循環都修改狀態,實際上只有狀態是可用的狀況下,纔有必要去修改。TTAS
(Test-Test-And-Set
)改進就是在加鎖前先檢查狀態變量是否爲false,只有條件知足纔去修改。緩存
public class TTASLock {
AtomicBoolean state = new AtomicBoolean(false);
public void lock() {
while (true) {
while (state.get()) {} // 循環讀取state狀態
if (!state.getAndSet(true)) { // 只有當state爲false,纔會修改
return;
}
}
}
public void unlock() {
state.set(false);
}
}
複製代碼
可是當釋放鎖時,其餘線程檢測到state
都是false
,這時都會調用state.getAndSet(true)
,又退化到TAS
的情形。bash
TTAS
的問題關鍵在於全部線程都同時去獲取鎖,所以引入延遲能夠解決問題:當獲取鎖失敗時,在重試前先睡眠一段時間,再次失敗則延遲時間翻倍——指數退避。架構
public class BackoffLock {
AtomicBoolean state = new AtomicBoolean(false);
private int minDelay;
private int maxDelay;
public BackoffLock(int minDelay, int maxDelay) {
this.minDelay = minDelay;
this.maxDelay = maxDelay;
}
public void lock() throws InterruptedException {
int delay = minDelay;
while (true) {
while (state.get()) {}
if (!state.getAndSet(true)) {
return;
}
Thread.sleep((int) (Math.random() * delay));
if (delay < maxDelay) {
delay = 2 * delay;
}
}
}
public void unlock() {
state.set(false);
}
}
複製代碼
指數退避自旋的不足在於須要設置好延遲參數,極可能就在線程睡眠過程當中,獲取鎖的線程恰好就釋放了鎖。框架
一開始由於咱們都是基於一個狀態變量來標識鎖,纔會致使頻繁佔用總線流量,那麼若是每一個線程都有一個狀態,就能夠大幅減小佔用。
基於數組的隊列鎖lock()
時從數組中按順序找到一個可用的位置,用來表明當前線程。unlock()
時通知下一個線程。
public class ArrayLock {
private int n;
private volatile boolean[] flags;
private AtomicInteger next = new AtomicInteger(0);
private ThreadLocal<Integer> slot = new ThreadLocal<>();
public ArrayLock(int n) {
this.n = n;
flags = new boolean[n];
flags[0] = true;
}
public void lock() {
int index = next.getAndIncrement();
slot.set(index);
while (!flags[index % n]) {}
}
public void unlock() {
int index = slot.get();
flags[index % n] = false; // 爲複用作好準備
flags[(index + 1) % n] = true; // 通知下一個線程
}
}
複製代碼
顯然,基於數組的隊列鎖的不足之處就是鎖的數量受限於數組長度。所以,可用考慮經過鏈表來改進。
CLH鎖
內部就維護了一個隱式的鏈表。CLH
是Craig, Landin, and Hagersten的縮寫。
public class CLHSpinLock {
private final ThreadLocal<QNode> node;
private final ThreadLocal<QNode> prev;
AtomicReference<QNode> tail = new AtomicReference<>(new QNode());
public CLHSpinLock() {
node = new ThreadLocal<QNode>() {
@Override
protected QNode initialValue() {
return new QNode();
}
};
prev = new ThreadLocal<QNode>() {
@Override
protected QNode initialValue() {
return null;
}
};
}
public void lock() {
QNode myNode = node.get();
myNode.locked = true;
QNode pred = tail.getAndSet(myNode);
prev.set(pred);
// 在前繼節點自旋
while (pred.locked) {};
}
public void unlock() {
QNode myNode = node.get();
myNode.locked = false;
node.set(prev.get());
}
class QNode {
volatile boolean locked = false;
}
}
複製代碼
因爲CLH是在前繼節點上自旋,在NUMA架構下,可能須要頻繁訪問遠端內存,影響性能。那麼能不能直接在本地節點自旋呢?
MCS鎖
就是在本地節點自旋,把CLH的屢次對遠端內存的監聽 + 一次對本地內存的更新,簡化成了屢次對本地內存的監聽 + 一次對遠端內存的更新。
public class MCSSpinLock {
ThreadLocal<QNode> node = new ThreadLocal<QNode>() {
@Override
protected QNode initialValue() {
return new QNode();
}
};
AtomicReference<QNode> tail = new AtomicReference<>(null);
public void lock() {
QNode qNode = node.get();
QNode pred = tail.getAndSet(qNode);
if (pred != null) {
qNode.locked = true;
pred.next = qNode; // QNode.next是volatile,保證了線程可見性
while (qNode.locked) {};
}
}
public void unlock() {
QNode qNode = node.get();
if (qNode.next == null) { // 當前節點沒有發現後繼節點
if (tail.compareAndSet(qNode, null)) { // 確實沒有後繼節點
return;
}
while (qNode.next == null) {}; // 有後繼節點,可是尚未關聯上,須要等待
}
qNode.next.locked = false;
}
class QNode {
volatile boolean locked = false;
volatile QNode next = null;
}
}
複製代碼