AbstractQueuedSynchronizer 隊列同步器(AQS)

AbstractQueuedSynchronizer 隊列同步器(AQS)

隊列同步器 (AQS), 是用來構建鎖或其餘同步組件的基礎框架,它經過使用 int 變量表示同步狀態,經過內置的 FIFO 的隊列完成資源獲取的排隊工做。(摘自《Java併發編程的藝術》)java

咱們知道獲取同步狀態有獨佔和共享兩種模式,本文先針對獨佔模式進行分析。node

變量定義

private transient volatile Node head;
複製代碼

head 同步隊列頭節點編程

private transient volatile Node tail;
複製代碼

tail 同步隊列尾節點安全

private volatile int state;
複製代碼

state 同步狀態值併發

Node - 同步隊列節點定義

volatile int waitStatus;
複製代碼

waitStatus 節點的等待狀態,可取值以下 :app

  • 0 : 初始狀態
  • -1 : SIGNAL 處於該狀態的節點,說明其後置節點處於等待狀態; 若當前節點釋放了鎖可喚醒後置節點
  • -2 : CONDITION 該狀態與 Condition 操做有關後續在說明
  • -3 : PROPAGATE 該狀態與共享式獲取同步狀態操做有關後續在說明
  • 1 : CANCELLED 處於該狀態的節點會取消等待,從隊列中移除
volatile Node prev;
複製代碼

prev 指向當前節點的前置節點框架

volatile Node next;
複製代碼

next 指向當前節點的後置節點ui

volatile Thread thread;
複製代碼

thread 節點對應的線程也是指當前獲取鎖失敗的線程this

Node nextWaiter;
複製代碼

acquire()

獨佔模式下獲取同步狀態, 既是當前只容許一個線程獲取到同步狀態atom

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
複製代碼

從 acquire 方法中咱們能夠大概猜想下,獲取鎖的過程以下:

  • tryAcquire 嘗試獲取同步狀態, 具體如何斷定獲取到同步狀態由子類實現
  • 當獲取同步狀態失敗時,執行 addWaiter 建立獨佔模式下的 Node 並將其添加到同步隊列尾部
  • 加入同步隊列以後,再次嘗試獲取同步狀態,當達到某種條件的時候將當前線程掛起等待喚醒

下面具體看下各個階段如何實現:

private Node addWaiter(Node mode) {
	// 綁定當前線程 建立 Node 節點
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    // 判斷同步隊列尾節點是否爲空
    if (pred != null) {
    	// node 的前置節點指向隊列尾部
        node.prev = pred;
        // 將同步隊列的 tail 移動指向 node
        if (compareAndSetTail(pred, node)) {
        	// 將原同步隊列的尾部後置節點指向 node
            pred.next = node;
            return node;
        }
    }
    // tail 爲空說明同步隊列還未初始化
    // 此時調用 enq 完成隊列的初始化及 node 入隊
    enq(node);
    return node;
}
複製代碼
private Node enq(final Node node) {
	// 輪詢的方式執行
	// 成功入隊後退出
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
        	// 建立 Node, 並將 head 指向該節點
        	// 同時將 tail 指向該節點
        	// 完成隊列的初始化
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
        	// node 的前置節點指向隊列尾部
            node.prev = t;
            // 將同步隊列的 tail 移動指向 node
            if (compareAndSetTail(t, node)) {
            	// 將原同步隊列的尾部後置節點指向 node
                t.next = node;
                return t;
            }
        }
    }
}
複製代碼

從代碼中能夠看出經過 CAS 操做保證節點入隊的有序安全,其入隊過程當中以下圖所示:

AQS節點入隊過程

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 
        for (;;) {
        	// 獲取當前節點的前置節點
            final Node p = node.predecessor();
            // 判斷前置節點是否爲 head 頭節點
            // 若前置節點爲 head 節點,則再次嘗試獲取同步狀態
            if (p == head && tryAcquire(arg)) {
            	// 若獲取同步狀態成功
            	// 則將隊列的 head 移動指向當前節點
                setHead(node);
                // 將原頭部節點的 next 指向爲空,便於對象回收
                p.next = null; // help GC
                failed = false;
                // 退出輪詢過程
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
複製代碼
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /* * This node has already set status asking a release * to signal it, so it can safely park. */
    	// 若前置節點狀態爲 -1 ,則說明後置節點 node 能夠安全掛起了
        return true;
    if (ws > 0) {
        /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */
        do {
        	// ws > 0 說明前置節點狀態爲 CANCELLED , 也就是說前置節點爲無效節點
        	// 此時從前置節點開始向隊列頭節點方向尋找有效的前置節點
        	// 此操做也便是將 CANCELLED 節點從隊列中移除
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */
        // 若前置節點狀態爲初始狀態 則將其狀態設爲 -1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
複製代碼
private final boolean parkAndCheckInterrupt() {
	// 將當前線程掛起
    LockSupport.park(this);
    // 被喚醒後檢查當前線程是否被掛起
    return Thread.interrupted();
}
複製代碼

從 acquireQueued 的實現能夠看出,節點在入隊後會採用輪詢的方式(自旋)重複執行如下過程:

  • 判斷前置節點是否爲 head, 若爲 head 節點則嘗試獲取同步狀態; 若獲取同步狀態成功則移動 head 指向當前節點並退出循環
  • 若前置節點非 head 節點或者獲取同步狀態失敗,則將前置節點狀態修改成 -1, 並掛起當前線程,等待被喚醒重複執行以上過程

以下圖所示:

AQS-節點自旋活動圖

接下來咱們看看同步狀態釋放的實現。

release

釋放同步狀態

public final boolean release(int arg) {
	// 嘗試釋放同步狀態
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
        	// 喚醒後置節點
            unparkSuccessor(h);
        return true;
    }
    return false;
}
複製代碼
private void unparkSuccessor(Node node) {
    /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */
    int ws = node.waitStatus;
    if (ws < 0)
    	// 將 head 節點狀態改成 0
        compareAndSetWaitStatus(node, ws, 0);

    /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */
    // 獲取後置節點
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
    	// 喚醒後置節點上所阻塞的線程
        LockSupport.unpark(s.thread);
}
複製代碼

從上述代碼,咱們能夠明白釋放同步狀態的過程以下:

  • 調用 tryRelease 嘗試釋放同步狀態,一樣其具體的實現由子類控制
  • 成功釋放同步狀態後,將 head 節點狀態改成 0
  • 喚醒後置節點上阻塞的線程

以下圖所示(紅色曲線表示節點自旋過程) :

AQS-釋放鎖

acquireInterruptibly()

獨佔模式下獲取同步狀態,不一樣於 acquire 方法,該方法對中斷操做敏感; 也就是說當前線程在獲取同步狀態的過程當中,若被中斷則會拋出中斷異常

public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
    	// 檢查線程是否被中斷
    	// 中斷則拋出中斷異常由調用方處理
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}
複製代碼
private void doAcquireInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 不一樣於 acquire 的操做,此處在喚醒後檢查是否中斷,若被中斷直接拋出中斷異常
                throw new InterruptedException();
        }
    } finally {
        if (failed)
        	// 拋出中斷異常後最終執行 cancelAcquire
            cancelAcquire(node);
    }
}
複製代碼
private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // Skip cancelled predecessors
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    // 若當前節點爲 tail 節點,則將 tail 移動指向 node 的前置節點
    if (node == tail && compareAndSetTail(node, pred)) {
    	// 同時將node 前置節點的 next 指向 null
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
        	// 當前節點位於隊列中部 
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
            	// 將前置節點的 next 指向 node 的後置節點
                compareAndSetNext(pred, predNext, next);
        } else {
        	// 若 node 的前置節點爲 head 節點則喚醒 node 節點的後置節點
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}
複製代碼

從 acquireInterruptibly 的實現能夠看出,若線程在獲取同步狀態的過程當中出現中斷操做,則會將當前線程對應的同步隊列等待節點從隊列中移除並喚醒可獲取同步狀態的線程。

tryAcquireNanos()

獨佔模式超時獲取同步狀態,該操做與acquireInterruptibly同樣對中斷操做敏感,不一樣在於超過等待時間若未獲取到同步狀態將會返回

public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}
複製代碼
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    // 計算等待到期時間
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
            	// 超時時間到期直接返回
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                // 按指定時間掛起s
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
複製代碼

節點的狀態

同步隊列中的節點在自旋獲取同步狀態的過程當中,會將前置節點的狀態由 0 初始狀態改成 -1 (SIGNAL), 如果中斷敏感的操做則會將狀態由 0 改成 1 (CANCELLED)

同步隊列中的節點在釋放同步狀態的過程當中會將同步隊列的 head 節點的狀態改成 0, 也便是由 -1(SIGNAL) 變爲 0;

小結

本文主要分析了獨佔模式獲取同步狀態的操做,其大概流程以下:

  • 在獲取同步狀態時,AQS 內部維護了一個同步隊列,獲取狀態失敗的線程會被構造一個節點加入到隊列中並進行一系列自旋操做
  • 在釋放同步狀態時,喚醒 head 的後置節點去獲取同步狀態
相關文章
相關標籤/搜索