AQS源碼分析

概述

當咱們提到 juc 包下的鎖,就不得不聯繫到 AbstractQueuedSynchronizer 這個類,這個類就是大名鼎鼎的 AQS,AQS 按字面意思翻譯爲抽象隊列同步器,調用者能夠經過繼承該類快速的實現同步多線程下的同步容器。不論是咱們熟悉的 ReadWriteLock 亦或是 ReentrantLock,或者 CountDownLatch 與 Semaphore,甚至是線程池類 ThreadPoolExecutor 都繼承了 AQS。java

在本文,將深刻源碼,瞭解 AQS 的運行機制,瞭解經過 AQS 實現非公平鎖,公平鎖,可重入鎖等的原理。node

1、AQS 中的數據結構

AQS 的底層數據結構實際上是一條雙向鏈表以及一個表明鎖狀態的變量 state。當加鎖後,state會改變,而競爭鎖的線程會被封裝到節點中造成鏈表,而且嘗試改變 state以獲取鎖。數據結構

1.等待隊列

在 AQS 中有一個 Node 內部類,該類即爲鏈表的節點類。當經過 AQS 競爭鎖的時候,線程會被封裝到一個對應的節點中,多個競爭不到鎖的線程最終會連成一條鏈表,這條鏈表上節點表明的線程處於等待狀態,所以咱們稱之爲等待隊列,也就是 CLH多線程

節點類中封裝了競爭鎖的線程的等待狀態:工具

  • CANCELLED:1,表示當前結點已取消等待。當timeout或被中斷(響應中斷的狀況下),會觸發變動爲此狀態,進入該狀態後的結點將不會再變化。
  • SIGNAL:-1,表示後繼結點在等待當前結點喚醒。後繼結點入隊時,會將前繼結點的狀態更新爲SIGNAL。
  • CONDITION:-2,表示結點等待在Condition上,當其餘線程調用了Condition的signal()方法後,CONDITION狀態的結點將從等待隊列轉移到同步隊列中,等待獲取同步鎖。
  • PROPAGATE:-3,共享模式下,前繼結點不只會喚醒其後繼結點,同時也可能會喚醒後繼的後繼結點
  • 0:新節點入隊時的默認狀態。

和線程池中的狀態同樣,Node 只有小於 0 的時候才處於正常的等待狀態中,所以不少地方經過判斷是否小於 0 來肯定節點是否處於等待狀態oop

static final class Node {
    
    static final Node SHARED = new Node();
    
    static final Node EXCLUSIVE = null;
    
    // 等待狀態
    volatile int waitStatus;
    
    volatile Node prev;

    volatile Node next;

    // 等待線程
    volatile Thread thread;

    // 下一等待節點
    Node nextWaiter;
}

2.鎖狀態

private volatile int state;

AQS 中提供了 state變量作爲鎖狀態,通常來講,0 被視爲無鎖狀態,1 被視爲加鎖狀態,若是是可重入鎖,就會大於 1。ui

所以,AQS 中的加鎖解鎖實際上就是經過 CAS 改變 state的過程,即下列三個方法:this

protected final int getState() {
    return state;
}

protected final void setState(int newState) {
    state = newState;
}

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

2、AQS 獨佔鎖的加鎖過程

AQS 的同步過程其實就是同步隊列節點中依次獲取鎖的過程。AQS 一共提供了獨佔和非獨佔兩種獲取資源的方法線程

  • acquire():以獨佔模式獲取鎖;
  • release():以獨佔模式釋放鎖;
  • acquireShared():以共享模式獲取鎖;
  • releaseShared():以共享模式釋放鎖;

1.獨佔鎖

獨佔鎖和非獨佔鎖二者從流程上來講都差很少,只在一些實現上有區別。翻譯

獨佔鎖,顧名思義,即只有佔有鎖的線程才能操做資源,在 synchronize 底層的鎖中,獨佔經過鎖對象對象頭中的指針來聲明獨佔的線程,而在 AQS 中則經過父類 AbstractOwnableSynchronizer 提供的 exclusiveOwnerThread 變量來聲明獨佔的線程:

private transient Thread exclusiveOwnerThread;

此外,AQS 並未提供其餘具體實現。AQS 獨佔鎖加鎖的方法是 acquire(),其中涉及到 tryAcquire()方法是一個空實現,須要由子類實現並在在裏面進行具體的獨佔判斷:

public final void acquire(int arg) {
    // 嘗試獲取鎖
    if (!tryAcquire(arg) &&
        // 添加到等待隊列
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 進入等待隊列後阻塞
        selfInterrupt();
}

裏面還涉及到 addWaiter(),acquireQueued()selfInterrupt()四個方法。

2.獲取鎖資源

在 AQS 中,tryAccquire() 是一個未實現的方法:

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

他須要由具體的實現類去實現,並完成獲取資源的功能。這裏咱們以用可重入鎖 ReentrantLock 內爲例(後文無聲明亦同)。

在 ReentrantLock 中,鎖分爲公平鎖和非公平鎖兩種,兩者的區別在於公平鎖中等待隊列中的線程嚴格按順序獲取鎖,非公平鎖中的線程可能不會按順序獲取鎖。ReentrantLock 有一個內部類 Sync 繼承了 AQS,提供基本的加鎖解鎖方法。

而後分別有非公平鎖 NonfairSync 類與公平鎖類 FairSync 去繼承 Sync,進一步區別公平鎖與非公平的鎖的實現邏輯。咱們先看公平鎖 FairSync 的tryAccquire()方法:

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 若是當前等待隊列中沒有線程在等待
        if (!hasQueuedPredecessors() &&
            // 嘗試CAS修改state
            compareAndSetState(0, acquires)) {
            // 將當前鎖設爲本身獨佔
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 若是鎖已經被本身獲取過了,即重入
    else if (current == getExclusiveOwnerThread()) {
        // state + 1,即多獲取一次鎖
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    // 沒有獲取鎖
    return false;
}

而非公平鎖與公平鎖的tryAccquire()主要差異在於,公平鎖會先看看有沒有線程在等待,沒有才去競爭鎖,而非公平鎖不會看有沒有線程在等待,不管如何都會先去競爭一次鎖。

其餘鎖的 tryAccquire()與 ReentrantLock 的大致相同。

3.添加節點至等待隊列

addWaiter()方法用於建立並添加等待節點。

private Node addWaiter(Node mode) {
    // 以共享或者獨佔模式建立節點
    Node node = new Node(Thread.currentThread(), mode);
    // 尾插法插入節點
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 若是隊列爲空
    enq(node);
    return node;
}

這裏涉及到一個 enq()方法,這個方法不復雜,主要是自旋初始化 AQS 中的頭結點和尾節點,值得注意的是,這裏的頭結點其實是一個哨兵節點,自己並沒有意義,當等待隊列排隊獲取資源的時候,會直接從 head.next 開始。

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

4.在等待隊列中獲取鎖

這個方法主要作兩件事:

  • 若是當前節點已是隊列第二個結點了,而且獲取鎖成功,就設置當前節點爲新頭結點,而後執行完畢後設置爲中斷;
  • 若是當前節點不是隊列第二個節點,或者獲取鎖不成功,就掛起當前節點,等待上一節點的喚醒。
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 獲取當前節點的上一節點
            final Node p = node.predecessor();
            // 再次嘗試獲取鎖,若是前驅節點已是頭節點了,或者獲取資源成功
            if (p == head && tryAcquire(arg)) {
                // 將當前節點設置爲頭結點
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            
            // 前驅節點不是頭結點或者獲取鎖失敗
            // 若是前驅節點須要被 park 掛起
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 掛起當前線程
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 舊頭結點已經處理完了,直接刪除
        if (failed)
            cancelAcquire(node);
    }
}

這裏涉及到一個 shouldParkAfterFailedAcquire()方法:

這個方法主要是根據前驅節點的狀態判斷當前節點是否須要被 park 的。若是這個方法返回 true,那麼說明前驅節點被設置爲 SIGNAL 狀態,而後進入 parkAndCheckInterrupt()方法把當前線程掛起,等待前驅節點的喚醒。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // 若是前驅節點狀態爲SIGNAL
    if (ws == Node.SIGNAL)
        return true;
    // 若是前驅節點已經失效
    if (ws > 0) {
        // 移除所有失效節點,直到前驅節點爲正常等待狀態的節點爲止
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 將前驅節點設置爲SIGNAL,確保不影響後續節點的喚醒
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

若是上述shouldParkAfterFailedAcquire()返回 ture,那麼就會接着執行 parkAndCheckInterrupt()方法掛起線程:

private final boolean parkAndCheckInterrupt() {
    // 讓當前線程等待,並中斷任務
    LockSupport.park(this);
    return Thread.interrupted();
}

5.總結

當線程使用 acquire()方法獲取鎖的時候:

  • 先執行tryAcquire()方法,這是個須要由子類實現的空方法,公平或者非公平在這個方法中讓線程去獲取鎖,得到鎖的線程要修改 state
  • 而後失敗的線程須要執行addWaiter()方法,這個方法用於將線程封裝到節點中,並以尾插法插入等待隊列的鏈表,同時,若是等待隊列沒有初始化就會在此處先初始化;
  • 接着添加完成的節點執行acquireQueued()方法,此時會再次試圖獲取鎖,若是此時仍是失敗,就會判斷當前節點的前驅節點是否失效,若是不是就直接將前驅節點狀態改成 SIGNAL ,而後執行 parkAndCheckInterrupt()方法掛起當前線程,若是是就一直找到一個正常等待的前驅節點爲止,改前驅節點狀態而後再掛起線程。

3、AQS 獨佔鎖的釋放過程

和 AQS 使用 acquire() 方法加鎖的過程相似,AQS 也有一個 release()的解鎖方法,他們一樣須要實現類本身去實現 tryRelease()方法。

public final boolean release(int arg) {
    // 嘗試釋放鎖
    if (tryRelease(arg)) {
        Node h = head;
        // 若是當前頭節點爲空且不爲初始狀態
        if (h != null && h.waitStatus != 0)
            // 喚醒後繼節點
            unparkSuccessor(h);
        return true;
    }
    return false;
}

1.釋放鎖

tryAcquire()同樣,AQS 不提供 tryRelease()的具體實現,而是交由子類去實現它。

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

咱們依然以可重入鎖 ReentrantLock 爲例,去了解 ReentrantLock 中 tryRelease()的實現。

雖然 ReentrantLock 中有公平鎖和非公平鎖兩種實現,可是他們是釋放過程都是同樣的,都經過他們的父類,即繼承 AQS 的內部類 Sync 的 tryRelease()方法來實現釋放的功能:

protected final boolean tryRelease(int releases) {
    // 可重入鎖,減去一次持鎖次數
    int c = getState() - releases;
    // 若是當前線程不是持有鎖的線程則拋出異常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 若是可重入次數爲0,說明確實釋放鎖了
    if (c == 0) {
        free = true;
        // 獨佔線程設置爲null
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

這個地方也很好理解,就是讓 tryRelease()去執行釋放鎖的過程,換句話說,就是改變 state

2.喚醒等待隊列的後繼節點

unparkSuccessor()方法的主要用途是

  • 在前驅節點(其實就是等待隊列的頭結點)釋放鎖後,去喚醒等待隊列中的後繼節點;
  • 若是後繼節點處於 CANCELLED 狀態,說明該節點已經掛掉了,就從尾節點向前找到離後繼節點最近的節點去喚醒,不然直接喚醒後繼節點。
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        // 若是頭節點狀態還處於等待狀態,則改回初始狀態
        compareAndSetWaitStatus(node, ws, 0);
    
    Node s = node.next;
    // 若是後繼節點存在並被標記爲CANCELLED狀態
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 從尾節點開始,找到離node最近的處於等待狀態的節點
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 喚醒節點
        LockSupport.unpark(s.thread);
}

3.總結

當線程使用 release()方法釋放鎖的時候:

  • 先執行tryRelease()方法釋放資源,改變 state以釋放鎖
  • 再執行 unparkSuccessor()方法喚醒後繼節點,若是後繼節點掛了,就找到最近的下一個處於等待狀態的有效節點喚醒。

4、AQS 共享鎖的加鎖釋放過程

相對 AQS 獨佔鎖,共享鎖在 AQS 中以及提供好的相關的實現。共享鎖經過 acquireShared()方法加鎖,經過releaseShared()方法解鎖。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

1.獲取鎖資源

tryAcquireShared()也是一個空實現方法,須要由子類去實現。根據註釋,咱們不難理解它的做用:

  • 檢查是否支持共享鎖,若是是才能獲取鎖;
  • 根據後繼等待節點的狀況返回值:大於 0 說明有後繼等待節點,執行完之後繼續喚醒後繼節點;等於 0 說明當前已經是最後一個能夠獲取共享鎖的節點,再也不喚醒後繼節點;小於 0 說明鎖獲取失敗,須要進入等待隊列。

其中,針對共享鎖,比較具備表明性的是讀寫鎖 ReentrantReadWriteLock,它經過 state的高 16 位記錄讀鎖,低 16 位記錄寫鎖,在獲取鎖資源的時候,若是檢測存在寫鎖則沒法得到鎖,若是是讀鎖則獲取資源並遞增讀鎖計數器,這部分的邏輯就是在其子類中獲得的實現。

2.喚醒後繼節點

基於上面的 tryAcquireShared()方法,doAcquireShared()要作的事情顯然很明瞭了:

  • 若是後繼節點能夠以共享模式喚醒,就直接依次喚醒;
  • 不然,則跟獲取獨佔鎖的流程同樣,再次嘗試獲取資源無果後將後將節點表明的線程掛起。
private void doAcquireShared(int arg) {
    // 建立共享模式的節點
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 獲取前驅節點
            final Node p = node.predecessor();
            // 若是前驅節點已是頭結點,即當前節點須要獲取鎖
            if (p == head) {
                // 嘗試獲取共享鎖
                int r = tryAcquireShared(arg);
                // 喚醒後繼節點
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            
            // 判斷當前線程是否須要被掛起
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

這裏有一個 setHeadAndPropagate()方法,根據方法名能夠猜出是用來設置頭結點和喚醒後繼共享節點的:

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    // 設置頭結點
    setHead(node);
    
    // 若是後續有須要喚醒的節點,而且當前節點沒有被CANCELLED
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // 若是下一節點處於共享狀態
        if (s == null || s.isShared())
            // 釋放共享鎖
            doReleaseShared();
    }
}

這裏其實只作了一些條件判斷,確保有後繼節點而且後繼節點是正常節點,核心邏輯實際上是 doReleaseShared()方法:

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 喚醒節點
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     // 若是後續節點不須要喚醒,則設置爲PROPAGATE避免影響後繼節點的喚醒
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

3.解鎖過程

解鎖使用的releaseShared()方法:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

這裏的 tryReleaseShared()其實跟獨佔鎖的tryRelease()相似,即改變狀態以表示釋放資源,而 doReleaseShared()即上文喚醒後繼節點的方法。

4.總結

共享鎖和獨佔鎖的根本區別在於,當頭是共享模式時,它被喚醒後會直接嘗試喚醒後繼全部共享模式的節點,直到遇到第一個非共享模式的節點爲止,而不是跟獨佔鎖同樣只喚醒後繼節點。

5、總結

AQS 在內部爲此了一個變量 state,用於記錄鎖狀態,線程經過 CAS 修改 state便是加鎖解鎖過程。

AQS 內存維護了一條雙向鏈表,即等待隊列 CLH,等待鎖的線程被封裝爲 Node 節點連成鏈表,經過 LockSuppor 工具類的 park()unpark()方法切換等待狀態。

AQS 提供了獨佔和非獨佔兩種鎖實現方式,分別提供了 acquire()/release()acquireShared()/releaseShared()兩套加鎖解鎖方式,同時,基於 state有衍生出可重入和非可重入鎖的實現——即重入鎖在state=1的狀況下繼續遞增,解鎖在 state上遞減直到爲 0 爲止。而且,根據是否先判斷等待隊列中是否已存在等待線程,而後再嘗試獲取鎖的狀況,又分出了公平鎖和非公平鎖兩種實現。

相關文章
相關標籤/搜索