閱讀 JDK 源碼:AQS 中的獨佔模式

AbstractQueuedSynchronizer,簡稱 AQS,是一個用於構建鎖和同步器的框架。
JUC 包下常見的鎖工具如 ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch 都是基於 AQS 實現的。
本文將介紹 AQS 的數據結構及獨佔模式的實現原理。java

本文基於 jdk1.8.0_91

1. AQS 框架

AQS 全部操做都圍繞着同步資源(synchronization state)來展開,解決了資源訪問的互斥和同步問題。node

  • 支持獨佔、共享方式來訪問資源。
  • 對於沒法獲取資源的線程,在同步隊列中等待,直到獲取資源成功(或超時、中斷)並出隊。
  • 線程成功獲取資源以後,若指定條件不成立,則釋放資源並進入條件隊列中等待,直到被喚醒,再轉移到同步隊列中等待再次獲取資源。

AQS框架將剩下的一個問題留給用戶:獲取、釋放資源的具體方式和結果。
這實際上是一種典型的模板方法設計模式:父類(AQS框架)定義好骨架和內部操做細節,具體規則由子類去實現。segmentfault

1.1 繼承體系

繼承體系

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable

AbstractQueuedSynchronizer 繼承 AbstractOwnableSynchronizer,後者具備屬性 exclusiveOwnerThread,用於記錄獨佔模式下得到鎖的線程。設計模式

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
    /**
     * The current owner of exclusive mode synchronization.
     */
    private transient Thread exclusiveOwnerThread;
}

AbstractQueuedSynchronizer 具備 ConditionObject 和 Node 兩個內部類。數據結構

  • ConditionObject 是對 Condition 接口的實現,能夠與 Lock 配合使用。
  • Node 是 AQS 中同步隊列、條件隊列的節點。

1.2 模板方法

AQS 定義了一系列模板方法以下:併發

// 獨佔獲取(資源數)
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

// 獨佔釋放(資源數)
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

// 共享獲取(資源數)
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

// 共享獲取(資源數)
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

// 是否排它狀態
protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}

Java 中經常使用的鎖工具都是基於 AQS 來實現的。app

AQS工具

2. 數據結構

2.1 資源定義

鎖和資源是同一個概念,是多個線程爭奪的對象。
AQS 使用 state 來表示資源/鎖,經過內置的等待隊列來完成獲取資源/鎖的排隊工做。
等待隊列(wait queue)是嚴格的 FIFO 隊列,是 CLH 鎖隊列的變種。框架

state

因爲 state 是共享的,使用 volatile 來保證其可見性,並提供了getState/setState/compareAndSetState三個方法來操做 state。ide

/**
 * The synchronization state.
 */                          
private volatile int state;// 資源/鎖

/**
 * Returns the current value of synchronization state.
 * This operation has memory semantics of a {@code volatile} read.
 * @return current state value
 */
protected final int getState() {
    return state;
}

/**
 * Sets the value of synchronization state.
 * This operation has memory semantics of a {@code volatile} write.
 * @param newState the new state value
 */
protected final void setState(int newState) {
    state = newState;
}

/**
 * Atomically sets synchronization state to the given updated
 * value if the current state value equals the expected value.
 * This operation has memory semantics of a {@code volatile} read
 * and write.
 *
 * @param expect the expected value
 * @param update the new value
 * @return {@code true} if successful. False return indicates that the actual
 *         value was not equal to the expected value.
 */
protected final boolean compareAndSetState(int expect, int update) { // 原子操做
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

2.2 節點定義

AQS 的內部實現了兩個隊列:同步隊列和條件隊列。這兩種隊列都使用了 Node 做爲節點。工具

節點的定義主要包含三部份內容:

  1. 節點的狀態:SIGNAL、CANCELLED、CONDITION、PROPAGATE、0。
  2. 節點的模式:同步隊列中的節點具備兩種模式,獨佔(SHARED)、共享(EXCLUSIVE)。
  3. 節點的指向:同步隊列是雙向鏈表(prev/next),條件隊列是單向鏈表(nextWaiter)。

節點的狀態

  • CANCELLED:值爲 1,表示當前節點因爲超時或中斷被取消。
  • SIGNAL: 值爲 -1,喚醒信號,表示當前節點的後繼節點正在等待獲取鎖。該狀態下的節點在 release 或 cancel 時須要執行 unpark 來喚醒後繼節點。
  • CONDITION:值爲 -2,表示當前節點爲條件隊列節點,同步隊列的節點不會有這個狀態。當節點從條件隊列轉移到同步隊列時,狀態會初始化爲 0。
  • PROPAGATE:值爲 -3,只有共享模式下,同步隊列的頭節點纔會設置爲該狀態(見 doReleaseShared),表示後繼節點能夠發起獲取共享資源的操做。
  • 0:初始狀態,表示當前節點在同步隊列中,等待獲取鎖。

java.util.concurrent.locks.AbstractQueuedSynchronizer.Node

static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /** waitStatus value to indicate the next acquireShared should unconditionally propagate */ 
    static final int PROPAGATE = -3;

    // 等待狀態:SIGNAL、CANCELLED、CONDITION、PROPAGATE、0
    volatile int waitStatus;

    // 指向同步隊列中的上一個節點
    volatile Node prev;

    // 指向同步隊列中的下一個節點
    volatile Node next;

    volatile Thread thread;

    // 在同步隊列中,nextWaiter用於標記節點的模式:獨佔、共享
    // 在條件隊列中,nextWaiter指向條件隊列中的下一個節點
    Node nextWaiter;

    /**
     * Returns true if node is waiting in shared mode.
     */
    // 節點模式是否爲共享
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    /**
     * Returns previous node, or throws NullPointerException if null.
     * Use when predecessor cannot be null.  The null check could
     * be elided, but is present to help the VM.
     *
     * @return the predecessor of this node
     */
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;          
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

2.3 同步隊列

同步隊列
同步隊列是等待獲取鎖的隊列,是一個雙向鏈表(prev/next),使用 head/tail 執行隊列的首尾節點。

java.util.concurrent.locks.AbstractQueuedSynchronizer

/**
 * Head of the wait queue, lazily initialized.  Except for         
 * initialization, it is modified only via method setHead.  Note:
 * If head exists, its waitStatus is guaranteed not to be          
 * CANCELLED.
 */
// 等待隊列的頭節點,懶初始化。 
// 注意,若是頭節點存在,那麼它的 waitStatus 必定不是 CANCELLED
private transient volatile Node head;

/**
 * Tail of the wait queue, lazily initialized.  Modified only via  
 * method enq to add new wait node.                                
 */
// 等待隊列的尾節點,懶初始化。 
// 只能經過 enq 方法給等待隊列添加新的節點。
private transient volatile Node tail;

/**
 * The synchronization state.
 */
private volatile int state;

在線程嘗試獲取資源失敗後,會進入同步隊列隊尾,給前繼節點設置一個喚醒信號後,經過 LockSupport.park(this) 讓自身進入等待狀態,直到被前繼節點喚醒。

當線程在同步隊列中等待,獲取資源成功後,經過執行 setHead(node) 將自身設爲頭節點。
同步隊列的頭節點是一個 dummy node,它的 thread 爲空(某些狀況下能夠看作是表明了當前持有鎖的線程)。

/**
 * Sets head of queue to be node, thus dequeuing. Called only by
 * acquire methods.  Also nulls out unused fields for sake of GC
 * and to suppress unnecessary signals and traversals.
 *
 * @param node the node
 */
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

AQS 不會在初始化隊列的時候構建空的頭節點(dummy node),而是在第一次發生爭用時構造:
第一個線程獲取鎖,第二個線程獲取鎖失敗入隊,此時纔會初始化隊列,構造空節點並將 head/tail 指向該空節點。
具體見 AbstractQueuedSynchronizer#enq。

2.4 條件隊列

條件隊列

條件隊列是等待條件成立的隊列,是一個單向鏈表(nextWaiter),使用 firstWaiter/lastWaiter 指向隊列的首尾節點。

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject

/** First node of condition queue. */
private transient Node firstWaiter; // 條件隊列的頭節點
/** Last node of condition queue. */
private transient Node lastWaiter;  // 條件隊列的尾節點

當線程獲取鎖成功以後,執行 Conition.await(),釋放鎖並進入條件隊列中等待,直到其餘線程執行 Conition.signal 喚醒當前線程。

當前線程被喚醒後,從條件隊列轉移到同步隊列,從新等待獲取鎖。

3. 獨佔模式

獨佔模式下,只要有一個線程佔有鎖,其餘線程試圖獲取該鎖將沒法取得成功。

3.1 獲取鎖-acquire

獨佔模式下獲取鎖/資源,無視中斷,Lock#lock的內部實現

java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire

public final void acquire(int arg) { 
    if (!tryAcquire(arg) &&                            
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
        selfInterrupt();
}
  1. tryAcquire:嘗試直接獲取資源/鎖,若是成功則直接返回,失敗進入下一步;
  2. addWaiter:獲取資源/鎖失敗後,將當前線程加入同步隊列的尾部,並標記爲獨佔模式,返回新入隊的節點;
  3. acquireQueued:使線程在同步隊列等待獲取資源,一直獲取到後才返回,若是在等待過程當中被中斷過,則返回true,不然返回false。
  4. selfInterrupt:若是線程在等待過程當中被中斷過,在獲取資源成功以後,把中斷狀態補上。

3.1.1 tryAcquire

嘗試獲取資源,成功返回true。具體資源獲取方式交由自定義同步器實現。

java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquire

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

3.1.2 addWaiter

獲取資源/鎖失敗後,將當前線程封裝爲新的節點,設置節點的模式(獨佔、共享),加入同步隊列的尾部,返回該新節點。

java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter

/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared // 獨佔模式、共享模式
 * @return the new node
 */
// 從隊列尾部入隊
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) { // 設置新的尾節點
            pred.next = node;
            return node;
        }
    }
    enq(node); // tail爲空,入隊
    return node; // 返回當前的新節點
}

enq

從同步隊列的尾部入隊,若是隊列不存在則進行初始化。

java.util.concurrent.locks.AbstractQueuedSynchronizer#enq

/**
 * Inserts node into queue, initializing if necessary. See picture above.
 * @param node the node to insert
 * @return node's predecessor
 */
private Node enq(final Node node) { // 從同步隊列的尾部入隊
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize   // 隊列爲空,則建立一個空節點,做頭節點
            if (compareAndSetHead(new Node()))
                tail = head;                  // 初始化完成後並無返回,而是進行下一次循環
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) { // 隊列不爲空,則當前節點做爲新的tail // CAS失敗,可能會出現尾分叉的現象,由下一次循環消除分叉
                t.next = node;                // 因爲不是原子操做,入隊操做先設置prev指針,再設置next指針,會致使併發狀況下沒法經過next遍歷到尾節點
                return t;                     // 返回當前節點的上一個節點(舊的尾節點)
            }
        }
    }
}

注意:

  1. 當第一次發生爭用時,爭奪鎖失敗的線程入隊,會先構造空節點(dummy node)做爲 head/tail 節點進行初始化隊列,再從隊列尾部入隊。
  2. 入隊時,依次設置 node.prev、tail、pred.next 指針,是非原子操做。
  3. 設置 prev 以後,若 CAS 設置 tail 失敗,說明其餘線程先一步入隊了,此時進入下一次循環會修正 prev 的指向。
  4. 因爲入隊是非原子操做,併發狀況下可能沒法從 head 開始經過 next 遍歷到尾節點 tail,可是從尾節點 tail 開始經過 prev 向前遍歷能夠訪問到完整的隊列。

3.1.3 acquireQueued

在同步隊列自旋、等待獲取資源直到成功,返回等待期間的中斷狀態。

java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued

/**
 * Acquires in exclusive uninterruptible mode for thread already in
 * queue. Used by condition wait methods as well as acquire.
 *
 * @param node the node
 * @param arg the acquire argument
 * @return {@code true} if interrupted while waiting
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) { // 上一個節點若是是頭結點,說明當前節點的線程能夠嘗試獲取鎖資源
                // 獲取鎖成功,當前節點做爲新的頭節點,而且清理掉當前節點中的線程信息(也就是說頭節點是個dummy node)
                // 這裏不會發生爭用,不須要CAS
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 上一個節點不是頭節點,或者當前節點的線程獲取鎖失敗,須要判斷是否進入阻塞:
            // 1. 不能進入阻塞,則重試獲取鎖。2. 進入阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) // 阻塞當前線程,當從阻塞中被喚醒時,檢測當前線程是否已中斷,並清除中斷狀態。接着繼續重試獲取鎖。
                interrupted = true;      // 標記當前線程已中斷(若是線程在阻塞時被中斷喚醒,會重試獲取鎖直到成功以後,再響應中斷)
        }
    } finally {
        if (failed)              // 自旋獲取鎖和阻塞過程當中發生異常
            cancelAcquire(node); // 取消獲取鎖
    }
}

在 acquireQueued 方法中,線程在自旋中主要進行兩個判斷:

  1. 可否獲取鎖
  2. 可否進入阻塞

具體代碼流程:

  1. 在同步隊列中自旋,若判斷前繼節點爲頭節點,則當前節點嘗試獲取鎖。
  2. 若當前線程獲取鎖成功,則將當前節點設爲頭節點,返回當前線程的中斷狀態。
  3. 若當前線程沒法獲取鎖、獲取鎖失敗,則判斷是否進入阻塞。
  4. 若是沒法進入阻塞,則繼續自旋,不然進入阻塞。
  5. 線程從阻塞中被喚醒後,檢查並標記線程的中斷狀態,從新進入自旋。

shouldParkAfterFailedAcquire

當前節點獲取鎖失敗以後,經過校驗上一個節點的等待狀態,判斷當前節點可否進入阻塞。
返回 true,可進入阻塞;返回 false,不可進入阻塞,需重試獲取鎖。

java.util.concurrent.locks.AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire

/**
 * Checks and updates status for a node that failed to acquire. 
 * Returns true if thread should block. This is the main signal
 * control in all acquire loops.  Requires that pred == node.prev.
 *
 * @param pred node's predecessor holding status
 * @param node the node
 * @return {@code true} if thread should block
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release // 當前節點已經給它的上一個節點設置了喚醒信號
         * to signal it, so it can safely park.              // 當前節點能夠進入阻塞
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and // 上一個節點狀態大於 0,說明是已取消狀態 CANCELLED,不會通知當前節點
         * indicate retry.                                       // 則一直往前找到一個等待狀態的節點,並排在它的後邊
         */                                                      // 當前節點不能進入阻塞,需重試獲取鎖
        do {
            node.prev = pred = pred.prev; // pred = pred.prev; node.prev = pred; // 跳過上一個節點,直到找到 waitStatus > 0 的節點
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we    // 上一個節點狀態等於 0 或 PROPAGATE,說明正在等待獲取鎖/資源
         * need a signal, but don't park yet.  Caller will need to // 此時須要給上一個節點設置喚醒信號SIGNAL,但不直接阻塞
         * retry to make sure it cannot acquire before parking.    // 由於在阻塞前調用者須要重試來確認它確實不能獲取資源
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 經過 CAS 將上一個節點的狀態改成 SIGNAL
    }
    return false;
}

當前節點可以進入阻塞的條件是:具備其餘線程來喚醒它。
經過設置上一個節點狀態爲 SIGNAL,以確保上一個節點在釋放鎖以後,可以喚醒當前節點。

分爲三種狀況:

  1. 上一個節點狀態爲 Node.SIGNAL,說明當前節點已具有被喚醒的條件,能夠進入阻塞。
  2. 上一個節點狀態爲已取消,則把當前節點排到未取消的節點後面,繼續自旋不進入阻塞。
  3. 上一個節點狀態爲 0 或 PROPAGATE,說明正在等待獲取鎖,則當前節點將上一個節點設爲 SIGNAL,繼續自旋不進入阻塞。

parkAndCheckInterrupt

進入阻塞,阻塞結束後,檢查中斷狀態。

java.util.concurrent.locks.AbstractQueuedSynchronizer#parkAndCheckInterrupt

/**
 * Convenience method to park and then check if interrupted
 *
 * @return {@code true} if interrupted
 */
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

cancelAcquire

線程在 acquireQueued 中自旋嘗試獲取鎖的過程當中,若是發生異常,會在 finally 代碼塊中執行 cancelAcquire,終止獲取鎖。

/**
 * Cancels an ongoing attempt to acquire.
 *
 * @param node the node
 */
private void cancelAcquire(Node node) { // 取消獲取鎖
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // Skip cancelled predecessors // 跳過已取消的前繼節點,爲當前節點找出一個有效的前繼節點
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.        // 寫操做具備可見性(volatile),所以這裏無需使用 CAS
    // After this atomic step, other Nodes can skip past us.   // 把當前節點設爲已取消以後,其餘節點尋找有效前繼節點時會跳過當前節點
    // Before, we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) { // 若是是尾節點,則出隊
        compareAndSetNext(pred, predNext, null);
    } else {                                                      // 進入這裏,說明不是尾節點,或者是尾節點但出隊失敗,須要處理後繼節點
        // If successor needs signal, try to set pred's next-link // 若是後繼節點須要獲得通知,則嘗試給它找一個新的前繼節點
        // so it will get one. Otherwise wake it up to propagate. // 不然把後繼節點喚醒
        int ws;
        if (pred != head &&                                       // 前繼節點不是頭節點
            ((ws = pred.waitStatus) == Node.SIGNAL ||             // 前繼節點的狀態爲SIGNAL 或者 前繼節點的狀態爲未取消且嘗試設置爲SIGNAL成功
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)    // 後繼節點存在且未取消
                compareAndSetNext(pred, predNext, next); // 給後繼節點設置一個新的前繼節點(即前面找的有效節點),當前節點出隊
        } else {
            unparkSuccessor(node); // 若是存在後繼節點,這裏說明沒法給後繼節點找到新的前繼節點(可能前繼節點是head,或者前繼節點失效了),直接喚醒該後繼節點
        }

        node.next = node; // help GC
    }
}

節點 node 取消獲取鎖,說明當前節點 node 狀態變爲已取消,成爲一個無效節點。

須要考慮如何處理節點 node 的後繼節點:

  1. 無後繼節點,則須要將最後一個有效節點(waitStatus <= 0)設爲 tail。
  2. 存在後繼節點,則須要將它掛在最後的有效節點以後,後續由該節點來喚醒後繼節點。
  3. 存在後繼節點,且找不到有效的前繼節點,則直接把該後繼節點喚醒。

unparkSuccessor

喚醒當前節點的後繼節點。

java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor

/**
 * Wakes up node's successor, if one exists.
 *
 * @param node the node
 */
// 喚醒當前節點的後繼節點 
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus; // 若是當前節點的狀態爲已取消,則不變;若是小於0(有可能後繼節點須要當前節點來喚醒),則清零。
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0); // CAS失敗也無所謂(說明後繼節點的線程先一步修改了當前節點的狀態),由於接下來會手動喚醒後繼節點

    Node s = node.next;
    if (s == null || s.waitStatus > 0) { // 後繼節點爲空,或已取消,則從tail開始向前遍歷有效節點
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t; // // 注意! 這裏找到了以後並無return, 而是繼續向前找
    }
    if (s != null)
        LockSupport.unpark(s.thread); // 喚醒後繼節點(或者是隊列中距離head節點最近的有效節點)的線程
}

一般狀況下, 要喚醒的節點就是本身的後繼節點。若是後繼節點存在且也在等待鎖, 那就直接喚醒它。
可是有可能存在 後繼節點取消等待鎖 的狀況,此時從尾節點開始向前找起, 直到找到距離 head 節點最近的未取消的節點,對它進行喚醒。

爲何不從當前節點向後遍歷有效節點呢?

  1. 當前節點多是尾節點,不存在後繼節點。
  2. 入隊時先設置 prev 指針,再設置 next 指針(見 AbstractQueuedSynchronizer#enq),是非原子操做,根據 prev 指針往前遍歷比較準確。

3.2 獲取鎖-acquireInterruptibly

對比 acquire,二者對獲取鎖過程當中發生中斷的處理不一樣。

  1. acquire 等待鎖的過程發生中斷,會等到獲取鎖成功以後,再處理中斷。
  2. acquireInterruptibly 等待鎖的過程發生中斷,會當即拋出 InterruptedException,再也不等待獲取鎖。

java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireInterruptibly

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireInterruptibly

/**
 * Acquires in exclusive interruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException(); // 線程在阻塞等待鎖的過程當中,被中斷喚醒,則放棄等待鎖,直接拋出異常
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

3.3 釋放鎖-release

獨佔模式下釋放鎖/資源,是 Lock#unlock 的內部實現。

java.util.concurrent.locks.AbstractQueuedSynchronizer#release

public final boolean release(int arg) {
    if (tryRelease(arg)) { // 釋放鎖資源
        Node h = head;
        if (h != null && h.waitStatus != 0) // head.waitStatus == 0,說明head節點後沒有須要喚醒的節點
            unparkSuccessor(h); // 喚醒head的後繼節點
        return true;
    }
    return false;
}
  1. tryRelease:釋放鎖資源,釋放成功則進入下一步。
  2. unparkSuccessor:若是同步隊列的頭節點存在且知足 waitStatus != 0,則喚醒後繼節點。

java.util.concurrent.locks.AbstractQueuedSynchronizer#tryRelease

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

頭節點 h 的狀態:

  • h.waitStatus == 0,同步隊列中的節點初始狀態爲 0,說明沒有須要喚醒的後繼節點。
  • h.waitStatus < 0,獨佔模式下,說明是 SIGNAL 狀態,此時具備後繼節點等待喚醒,見 AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire。
  • h.waitStatus > 0,說明是 CANCELLED 狀態,頭節點是由取消獲取鎖的節點而來的,保險起見,檢查有沒有須要喚醒的後繼節點。

相關閱讀:
閱讀 JDK 源碼:AQS 中的獨佔模式
閱讀 JDK 源碼:AQS 中的共享模式
閱讀 JDK 源碼:AQS 對 Condition 的實現

做者:Sumkor

相關文章
相關標籤/搜索