提及 JUC,咱們經常會想起其中的線程池(ExecutorService)。然而,咱們今天來看看另外一個核心模塊 AQS。php
AQS 是 AbstractQueuedSynchronizer 的簡稱,在 JUC 中做爲各類同步器的基石。舉個例子,常見的 ReentrantLock 就是由它實現的。java
咱們知道,java 有一個關鍵字 synchronized 來給一段代碼加鎖,但是這是 JVM 層面的事情。那麼問題來了,如何在 java 代碼層面來實現模擬一個鎖?node
即實現這樣一個接口:算法
package java.util.concurrent.locks;
public interface Lock {
void lock();
void unlock();
}
複製代碼
一個簡單的想法是:讓全部線程去競爭一個變量owner,確保只有一個線程成功,並設置本身爲owner,其餘線程陷入死循環等待。這即是所謂的自旋鎖。編程
一個簡單的代碼實現:api
import java.util.concurrent.atomic.AtomicReference;
public class SpinLock {
private AtomicReference<Thread> owner = new AtomicReference<Thread>();
public void lock() {
Thread currentThread = Thread.currentThread();
// 若是鎖未被佔用,則設置當前線程爲鎖的擁有者
while (!owner.compareAndSet(null, currentThread)) {
}
}
public void unlock() {
Thread currentThread = Thread.currentThread();
// 只有鎖的擁有者才能釋放鎖
owner.compareAndSet(currentThread, null);
}
}
複製代碼
扯這個自旋鎖,主要是爲了引出 AQS 背後的算法 CLH鎖
。安全
關於CLH鎖
更多細節能夠參考篇文章:多線程
CLH鎖的思想,簡單的說就是:一羣人去ATM取錢,頭一我的拿到鎖,在裏面用銀行卡取錢,其他的人在後面排隊等待;前一我的取完錢出來,喚醒下一我的進去取錢。函數
關鍵部分翻譯成代碼就是:
AQS 使用節點爲 Node 的雙向鏈表做爲同步隊列。拿到鎖的線程能夠繼續執行代碼,沒拿到的線程就進入這個隊列排隊。
public abstract class AbstractQueuedSynchronizer ... {
// 隊列頭
private transient volatile Node head;
// 隊列尾
private transient volatile Node tail;
static final class Node {
/** 共享模式,可用於實現 CountDownLatch */
static final Node SHARED = new Node();
/** 獨佔模式,可用於實現 ReentrantLock */
static final Node EXCLUSIVE = null;
/** 取消 */
static final int CANCELLED = 1;
/** 意味着它的後繼節點的線程在排隊,等待被喚醒 */
static final int SIGNAL = -1;
/** 等待在條件上(與Condition相關,暫不解釋) */
static final int CONDITION = -2;
/** * 與共享模式相關,暫不解釋 */
static final int PROPAGATE = -3;
// 可取值:CANCELLED, 0, SIGNAL, CONDITION, PROPAGATE
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
}
複製代碼
這個隊列大致上長這樣:圖片來源
條件隊列是爲了支持 Lock.newCondition() 這個功能,暫時不care,先跳過。
AQS 支持獨佔鎖(Exclusive)和共享鎖(Share)兩種模式:
這邊咱們只看獨佔模式,它對外提供一套 api:
簡單看一眼怎麼用的 (ReentrantLock 的例子):
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {...}
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
}
複製代碼
能夠看到,AQS 封裝了排隊、阻塞、喚醒之類的操做,使得實現一個鎖變的如此簡潔。
獲取資源
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
複製代碼
這個函數很短,其中 tryAcquire(int) 爲模板方法,留給子類實現。相似 Activity.onCreate()。
根據 tryAcquire(arg) 的結果,分兩種狀況:
這一步把當前線程(Thread.currentThread())做爲一個Node節點,加入同步隊列的尾部,並標記爲獨佔模式。
固然,加入隊列這個動做,要保證線程安全。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 1處嘗試更快地 enq(), 成功的話直接 return。失敗的話, 在2處退化爲完整版的 enq(),相對更慢些
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) { // 1
pred.next = node;
return node;
}
}
enq(node); // 2
return node;
}
private Node enq(final Node node) {
// 神奇的死循環 + CAS
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
複製代碼
能夠看到,這邊有一個死循環 + CAS的神奇操做,這是非阻塞算法的經典操做,可自行查閱相關資料。簡單的說,非阻塞算法就是在多線程的狀況下,不加鎖同時保證某個變量(本例中爲雙向鏈表)的線程安全,並且一般比 synchronized 的效率要高。
這個函數主要作兩件事:
結合這張圖來看,每次出隊完須要確保 head 始終指向佔用資源的線程:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 這又是一個死循環 + CAS,此次CAS比較隱蔽,在 shouldParkAfterFailedAcquire()裏邊
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // 排在隊首的後面,看看能不能得到鎖,成功的話本身變成隊首
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 這裏失敗會作一些回滾操做,不分析
if (failed)
cancelAcquire(node);
}
}
複製代碼
這邊的 interrupted 主要是保證這樣一個功能。線程在排隊的時候不響應中斷,直到出來之後,若是等待的過程當中被中斷過,做爲彌補,當即相應中斷(即調用selfInterrupt())。
查看prev的waitStatus,看是否是須要阻塞。能夠預見的是,通過幾回死循環,所有都會變成SIGNAL狀態。以後所有陷入阻塞。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 查看前驅節點的狀態
if (ws == Node.SIGNAL) // SIGNAL: 能夠安全的阻塞
return true;
if (ws > 0) { // CANCEL: 取消排隊的節點,直接從隊列中清除。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { // 0 or PROPAGATE: 須要變成 SIGNAL,但不能當即阻塞,須要重走外層的死循環二次確認。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
複製代碼
值得一提的是,阻塞和喚醒沒有使用常說的 wait()/notify(),而是使用了 LockSupport.park()/unpark()。這應該是出於效率上的考慮。
釋放資源
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/* * 喚醒next節點對應的線程,一般就是老二(直接後繼)。 * 若是是null,或者是cancel狀態(出現異常如線程遇到空指針掛掉了), * 那麼跳過cancel節點,找到後繼節點。 */
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 喚醒 node.next
if (s != null)
LockSupport.unpark(s.thread);
}
複製代碼
釋放的邏輯比較簡單。注意一點,對於 next 節點 unpark(),至關於在把 next 節點從 acquireQueued() 中的死循環中解放出來。
回到 ATM 的例子,至關於,他取完錢,輪到後一我的取錢了。這樣邏輯所有都串起來了。
這樣,順着獨佔鎖這條線,AQS 的獨佔模式就分析完了。其餘還有用於實現閉鎖的共享模式,用於實現 Condition 的條件隊列就不展開了。
Java併發編程實戰(chapter_4)(AQS源碼分析)
《JAVA併發編程實踐》