解讀 JUC —— AQS 獨佔模式

1. 前言

提及 JUC,咱們經常會想起其中的線程池(ExecutorService)。然而,咱們今天來看看另外一個核心模塊 AQS。php

AQS 是 AbstractQueuedSynchronizer 的簡稱,在 JUC 中做爲各類同步器的基石。舉個例子,常見的 ReentrantLock 就是由它實現的。java

2. 如何實現一個鎖?

咱們知道,java 有一個關鍵字 synchronized 來給一段代碼加鎖,但是這是 JVM 層面的事情。那麼問題來了,如何在 java 代碼層面來實現模擬一個鎖?node

即實現這樣一個接口:算法

package java.util.concurrent.locks;

public interface Lock {
    void lock();

    void unlock();
}
複製代碼

自旋鎖

一個簡單的想法是:讓全部線程去競爭一個變量owner,確保只有一個線程成功,並設置本身爲owner,其餘線程陷入死循環等待。這即是所謂的自旋鎖編程

一個簡單的代碼實現:api

import java.util.concurrent.atomic.AtomicReference;

public class SpinLock {
   private AtomicReference<Thread> owner = new AtomicReference<Thread>();

   public void lock() {
       Thread currentThread = Thread.currentThread();

       // 若是鎖未被佔用,則設置當前線程爲鎖的擁有者
       while (!owner.compareAndSet(null, currentThread)) {
       }
   }

   public void unlock() {
       Thread currentThread = Thread.currentThread();

       // 只有鎖的擁有者才能釋放鎖
       owner.compareAndSet(currentThread, null);
   }
}
複製代碼

扯這個自旋鎖,主要是爲了引出 AQS 背後的算法 CLH鎖安全

關於CLH鎖更多細節能夠參考篇文章:多線程

自旋鎖、排隊自旋鎖、MCS鎖、CLH鎖併發

3. AQS 的實現

CLH鎖的思想,簡單的說就是:一羣人去ATM取錢,頭一我的拿到鎖,在裏面用銀行卡取錢,其他的人在後面排隊等待;前一我的取完錢出來,喚醒下一我的進去取錢。函數

關鍵部分翻譯成代碼就是:

  • 排隊 -> 隊列
  • 等待/喚醒 -> wait()/notify() 或者別的什麼 api

3.1 同步隊列

AQS 使用節點爲 Node 的雙向鏈表做爲同步隊列。拿到鎖的線程能夠繼續執行代碼,沒拿到的線程就進入這個隊列排隊。

public abstract class AbstractQueuedSynchronizer ... {
    // 隊列頭
    private transient volatile Node head;
    // 隊列尾
    private transient volatile Node tail;

    static final class Node {
        /** 共享模式,可用於實現 CountDownLatch */
        static final Node SHARED = new Node();
        /** 獨佔模式,可用於實現 ReentrantLock */
        static final Node EXCLUSIVE = null;
	
        /** 取消 */
        static final int CANCELLED =  1;
        /** 意味着它的後繼節點的線程在排隊,等待被喚醒 */
        static final int SIGNAL    = -1;
        /** 等待在條件上(與Condition相關,暫不解釋) */
        static final int CONDITION = -2;
        /** * 與共享模式相關,暫不解釋 */
        static final int PROPAGATE = -3;
	
        // 可取值:CANCELLED, 0, SIGNAL, CONDITION, PROPAGATE
        volatile int waitStatus;
	
        volatile Node prev;
	
        volatile Node next;
	
        volatile Thread thread;
	    
        Node nextWaiter;
    }
}
複製代碼

這個隊列大致上長這樣:圖片來源

sync queue

條件隊列是爲了支持 Lock.newCondition() 這個功能,暫時不care,先跳過。

3.2 獨佔模式的 api

AQS 支持獨佔鎖(Exclusive)和共享鎖(Share)兩種模式:

  • 獨佔鎖:只能被一個線程獲取到 (ReentrantLock);
  • 共享鎖:能夠被多個線程同時獲取 (CountDownLatch、ReadWriteLock 的讀鎖)。

這邊咱們只看獨佔模式,它對外提供一套 api:

  • acquire(int n):獲取n個資源(鎖)
  • release(int n):釋放n個資源(鎖)

簡單看一眼怎麼用的 (ReentrantLock 的例子):

public class ReentrantLock implements Lock, java.io.Serializable {
    private final Sync sync;
    
    abstract static class Sync extends AbstractQueuedSynchronizer {...}

    public void lock() {
        sync.acquire(1);
    }

    public void unlock() {
        sync.release(1);
    }
}
複製代碼

能夠看到,AQS 封裝了排隊、阻塞、喚醒之類的操做,使得實現一個鎖變的如此簡潔。

3.2.1 acquire(int)

獲取資源

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
複製代碼

這個函數很短,其中 tryAcquire(int) 爲模板方法,留給子類實現。相似 Activity.onCreate()。

根據 tryAcquire(arg) 的結果,分兩種狀況:

  • 返回 true: 該線程拿到鎖,因爲短路,直接跳出 if,該線程能夠往下執行本身的業務代碼。
  • 返回 false: 該線程沒有拿到鎖,會繼續走 acquireQueued(),執行排隊等待邏輯。

3.2.1.1 addWaiter(Node)

這一步把當前線程(Thread.currentThread())做爲一個Node節點,加入同步隊列的尾部,並標記爲獨佔模式。

固然,加入隊列這個動做,要保證線程安全

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 1處嘗試更快地 enq(), 成功的話直接 return。失敗的話, 在2處退化爲完整版的 enq(),相對更慢些
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) { // 1
            pred.next = node;
            return node;
        }
    }
    enq(node); // 2
    return node;
}

private Node enq(final Node node) {
    // 神奇的死循環 + CAS
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
複製代碼

能夠看到,這邊有一個死循環 + CAS的神奇操做,這是非阻塞算法的經典操做,可自行查閱相關資料。簡單的說,非阻塞算法就是在多線程的狀況下,不加鎖同時保證某個變量(本例中爲雙向鏈表)的線程安全,並且一般比 synchronized 的效率要高。

3.2.1.2 acquireQueued(Node,int)

這個函數主要作兩件事:

  • 查看prev的waitStatus,看是否是須要阻塞,須要的話阻塞該線程
  • 排在隊首的傢伙調用了release(),會喚醒老二。老二嘗試去得到鎖,成功的話本身變成隊首,跳出循環。

結合這張圖來看,每次出隊完須要確保 head 始終指向佔用資源的線程:

sync queue

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 這又是一個死循環 + CAS,此次CAS比較隱蔽,在 shouldParkAfterFailedAcquire()裏邊
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) { // 排在隊首的後面,看看能不能得到鎖,成功的話本身變成隊首
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 這裏失敗會作一些回滾操做,不分析
        if (failed)
            cancelAcquire(node);
    }
}
複製代碼

這邊的 interrupted 主要是保證這樣一個功能。線程在排隊的時候不響應中斷,直到出來之後,若是等待的過程當中被中斷過,做爲彌補,當即相應中斷(即調用selfInterrupt())。

shouldParkAfterFailedAcquire()

查看prev的waitStatus,看是否是須要阻塞。能夠預見的是,通過幾回死循環,所有都會變成SIGNAL狀態。以後所有陷入阻塞。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; // 查看前驅節點的狀態
    if (ws == Node.SIGNAL) // SIGNAL: 能夠安全的阻塞
        return true;
    if (ws > 0) { // CANCEL: 取消排隊的節點,直接從隊列中清除。
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else { // 0 or PROPAGATE: 須要變成 SIGNAL,但不能當即阻塞,須要重走外層的死循環二次確認。
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
複製代碼

值得一提的是,阻塞和喚醒沒有使用常說的 wait()/notify(),而是使用了 LockSupport.park()/unpark()。這應該是出於效率上的考慮。

3.2.2 release(int)

釋放資源

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /* * 喚醒next節點對應的線程,一般就是老二(直接後繼)。 * 若是是null,或者是cancel狀態(出現異常如線程遇到空指針掛掉了), * 那麼跳過cancel節點,找到後繼節點。 */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 喚醒 node.next
    if (s != null)
        LockSupport.unpark(s.thread);
}
複製代碼

釋放的邏輯比較簡單。注意一點,對於 next 節點 unpark(),至關於在把 next 節點從 acquireQueued() 中的死循環中解放出來。

回到 ATM 的例子,至關於,他取完錢,輪到後一我的取錢了。這樣邏輯所有都串起來了。

4. 總結

這樣,順着獨佔鎖這條線,AQS 的獨佔模式就分析完了。其餘還有用於實現閉鎖的共享模式,用於實現 Condition 的條件隊列就不展開了。

5. 參考

Java併發編程實戰(chapter_4)(AQS源碼分析)

JUC源碼分析—AQS

自旋鎖、排隊自旋鎖、MCS鎖、CLH鎖

《JAVA併發編程實踐》

相關文章
相關標籤/搜索