按部就班理解AQS(2):AQS實現(未完)

概述

上一篇 咱們已經討論了鎖的實現思路,那麼AbstractQueuedSynchronizer(如下簡稱AQS)和前面的實現有不一樣?node

  1. 當線程獲取不到鎖時,AQS使用自旋和阻塞;
  2. 爲了支持取消和超時操做,AQS對CLH鎖的隊列進行了改進,增長顯式的連接指向前繼節點。若是直接前繼節點取消或者超時了,就尋找直接前繼的前繼;
  3. 因爲釋放鎖須要通知後繼節點,AQS又增長了後繼節點連接進行優化(非必要)。

功能

一個同步器通常須要包含如下兩個操做:bash

  1. 獲取操做:acquire

阻塞調用的線程,直到同步狀態容許其繼續執行。app

while (同步狀態獲取失敗) {
  若是當前線程還未入隊,則加入隊列;
  阻塞當前線程;
}
若是當前線程在隊列中,則移除
複製代碼
  1. 釋放操做:release

經過某種方式改變同步狀態,使得一或多個被acquire阻塞的線程繼續執行。post

更新同步狀態
if (同步狀態許可一個阻塞線程進行獲取) {
  喚醒一個或多個隊列中的線程
}
複製代碼

所以咱們能夠從這兩個接口入手對源碼進行解讀。另外須要補充說明的是,鎖的實現能夠分爲獨佔鎖和共享鎖,簡單起見,咱們先聚焦獨享鎖的代碼實現,後續再看共享鎖的差別性。優化

源碼解讀

acquire

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
複製代碼
  1. 嘗試獲取鎖,獲取成功直接返回,不執行後續操做;
  2. 建立表明當前線程的節點,加入等待隊列;
  3. 自旋前繼節點狀態和阻塞線程
tryAcquire

該方法檢查同步狀態state是否被許可,通俗來說就是看看是否能取到鎖。AQS中的實現只拋出異常,因此基於AQS實現的鎖須要實現這個方法。ui

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
複製代碼
Node

當同步狀態沒有被許可時,須要在等待隊列中排隊,所以須要建立一個表明該線程的節點加入隊列。下面咱們來看節點的定義(刪減了部分目前無須關注的屬性)。this

static final class Node {
    static final Node EXCLUSIVE = null;

    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    
    volatile int waitStatus;
    volatile Thread thread;
}
複製代碼
  • EXCLUSIVE:表示節點類型是獨佔鎖。
  • waitStatus:描述狀態,目前咱們只關注CANCELLED(因爲超時或線程中斷致使取消等待)和SIGNAL(表示若是該節點釋放了鎖,須要通知後繼節點,後繼節點在等待被喚醒)兩種狀態。
  • thread:節點對應的線程。
addWaiter

接下來須要把節點加入到等待隊列,總體思路是在隊尾插入節點。spa

入隊的時候須要考慮隊尾爲空和不爲空兩種狀況,不過AQS的實現上是認爲多數狀況下隊尾都不爲空,所以先按照隊尾不爲空的方式嘗試快速入隊,若是失敗才用完整的入隊邏輯去入隊。線程

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
複製代碼
enq

初始化隊列的頭節點和尾節點:code

compareAndSetHead

在隊尾插入新節點,addWaiter中快速插入新節點的路徑就是這塊邏輯:

addWaiter

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
複製代碼
acquireQueued

節點加入隊列後,接下來要作的事就是不斷檢查狀態是否可用。這裏實現的思路是先看前繼節點是不是頭節點(由於只有頭節點釋放鎖後,後繼節點纔有可能獲取到鎖),而後再去檢查狀態;若是前繼不是頭節點,則修改前繼節點的狀態waitStatus = SIGNAL(表示後繼在等待喚醒),而後阻塞線程。

若是頭節點的後繼成功獲取到鎖了,則頭節點能夠出隊了:

  1. 修改頭節點的指向到新節點(原頭節點的後繼);
  2. 新頭節點的前繼prev置爲null(新頭節點的前繼就是原頭節點)

爲了幫助GC回收原頭節點,把原頭結點的後繼也置爲null

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
複製代碼

接下來是釋放鎖的操做,從節點入隊的流程來看,釋放鎖時除了須要修改同步狀態status,還須要喚醒後繼節點。

release

整個實現主要涉及下面三個事情:

  • 修改同步狀態
  • 檢查是否有後繼節點須要喚醒
  • 喚醒後繼節點
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
複製代碼
unparkSuccessor

unparkSuccessor(h)

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
複製代碼
相關文章
相關標籤/搜索