一文帶你學會AQS和併發工具類的關係

1. 存在的意義

  AQS(AbstractQueuedSynchronizer)是JAVA中衆多鎖以及併發工具的基礎,其底層採用樂觀鎖,大量使用了CAS操做, 而且在衝突時,採用自旋方式重試,以實現輕量級和高效地獲取鎖。java

  提供一個框架,用於實現依賴於先進先出(FIFO)等待隊列的阻塞鎖和相關的同步器(semaphore等)。 此類旨在爲大多數依賴單個原子int值表示狀態的同步器提供有用的基礎。 子類必須定義更改此狀態的受保護方法,並定義該狀態對於獲取或釋放此對象而言意味着什麼。 鑑於這些,此類中的其餘方法將執行全部排隊和阻塞機制。 子類能夠維護其餘狀態字段,可是相對於同步,僅跟蹤使用方法getState,setState和compareAndSetState操做的原子更新的int值。node

  此類支持默認獨佔模式和共享模式之一或二者。 當以獨佔方式進行獲取時,其餘線程嘗試進行的獲取將沒法成功。 由多個線程獲取的共享模式可能(但不必定)成功。 該類不「理解」這些差別,當共享模式獲取成功時,下一個等待線程(若是存在)還必須肯定它是否也能夠獲取。 在不一樣模式下等待的線程共享相同的FIFO隊列。 一般,實現子類僅支持這些模式之一,但例如能夠在ReadWriteLock發揮做用。 僅支持獨佔模式或僅支持共享模式的子類無需定義支持未使用模式的方法。安全


2. 核心知識點

2.1 state

private volatile int state; // 同步狀態

  state是整個工具的核心,一般整個工具都是在設置和修改狀態,不少方法的操做都依賴於當前狀態是什麼。因爲狀態是全局共享的,通常會被設置成volatile類型,爲了保證其修改的可見性;數據結構

2.2 CLH隊列

  AQS中的隊列是CLH變體的虛擬雙向隊列(FIFO),AQS是經過將每條請求共享資源的線程封裝成一個節點來實現鎖的分配。隊列採用的是悲觀鎖的思想,表示當前所等待的資源,狀態或者條件短期內可能沒法知足。所以,它會將當前線程包裝成某種類型的數據結構,扔到一個等待隊列中,當必定條件知足後,再從等待隊列中取出。多線程

2.3 CAS操做

  CAS操做是最輕量的併發處理,一般咱們對於狀態的修改都會用到CAS操做,由於狀態可能被多個線程同時修改,CAS操做保證了同一個時刻,只有一個線程能修改爲功,從而保證了線程安全。CAS採用的是樂觀鎖的思想,所以經常伴隨着自旋,若是發現當前沒法成功地執行CAS,則不斷重試,直到成功爲止。併發


3. 核心實現原理

3.1 做爲同步器的基礎

  要將此類用做同步器的基礎,請使用getState,setState或compareAndSetState檢查或修改同步狀態,以從新定義如下方法(如適用):框架

  1. tryAcquire

獨佔方式,arg爲獲取鎖的次數,嘗試獲取資源,成功則返回True,失敗則返回False。函數

  1. tryRelease

獨佔方式,arg爲釋放鎖的次數,嘗試釋放資源,成功則返回True,失敗則返回False。工具

  1. tryAcquireShared

共享方式,arg爲獲取鎖的次數,嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。ui

  1. tryReleaseShared

共享方式,arg爲釋放鎖的次數,嘗試釋放資源,若是釋放後容許喚醒後續等待結點返回True,不然返回False。

  1. isHeldExclusively

該線程是否正在獨佔資源。只有用到Condition才須要去實現它。

  默認狀況下,這些方法中的每個都會引起UnsupportedOperationException 。 這些方法的實現必須在內部是線程安全的,而且一般應簡短且不阻塞。 定義這些方法是使用此類的惟一受支持的方法。 全部其餘方法都被聲明爲final方法,由於它們不能獨立變化。

3.2 同步狀態state

  AQS中維護了一個名爲state的字段,意爲同步狀態,是由volatile修飾的,用於展現當前臨界資源的獲鎖狀況。

private volatile int state;

下面提供了幾個訪問這個state字段的方法:
返回同步狀態的當前值。 此操做具備volatile讀取的內存語義

protected final int getState() {
    return state;
}

設置同步狀態的值。 此操做具備volatile寫操做的內存語義。

protected final void setState(int newState) {
    state = newState;
}

  若是當前狀態值等於指望值,則以原子方式將同步狀態設置爲給定的更新值。 此操做具備volatile讀寫的內存語義

protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

  這幾個方法都是Final修飾的,說明子類中沒法重寫它們。咱們能夠經過修改State字段表示的同步狀態來實現多線程的獨佔模式和共享模式
state的值即表示了鎖的狀態,state爲0表示鎖沒有被佔用,state大於0表示當前已經有線程持有該鎖,這裏之因此說大於0而不說等於1是由於可能存在可重入的狀況。你能夠把state變量當作是當前持有該鎖的線程數量。

public abstract class AbstractOwnableSynchronizer
    protected AbstractOwnableSynchronizer() {
      
    private transient Thread exclusiveOwnerThread;
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

exclusiveOwnerThread 屬性的值即爲當前持有鎖的線程獨佔模式獲取鎖流程:

圖片

共享模式獲取鎖流程:

圖片

3.3 數據結構

  1. AQS中最基本的數據結構是Node,Node即爲CLH變體隊列中的節點。
static final class Node {
    // 表示線程以共享的模式等待鎖
    static final Node SHARED = new Node();
    // 表示線程正在以獨佔的方式等待鎖
    static final Node EXCLUSIVE = null;
    // 爲1,表示線程獲取鎖的請求已經取消了
    static final int CANCELLED =  1;
    // 爲-1,表示線程已經準備好了,就等資源釋放了
    static final int SIGNAL    = -1;
    // 爲-2,表示節點在等待隊列中,節點線程等待喚醒
    static final int CONDITION = -2;
    // 爲-3,當前線程處在SHARED狀況下,該字段纔會使用
    static final int PROPAGATE = -3;
    // 當前節點在隊列中的狀態
    volatile int waitStatus;
    // 前驅節點
    volatile Node prev;
    // 後續節點
    volatile Node next;
    // 當前節點的線程
    volatile Thread thread;
    // 指向下一個處於CONDITION狀態的節點
    Node nextWaiter;
    ...
}
  1. AQS中CLH變體的虛擬雙向隊列(FIFO),AQS是經過將每條請求共享資源的線程封裝成一個節點來實現鎖的分配。
// 隊列頭節點
private transient volatile Node head;
// 隊列尾節點
private transient volatile Node tail;

圖片

  在AQS中的隊列是一個FIFO隊列,它的head節點永遠是一個虛擬結點(dummy node), 它不表明任何線程,所以head所指向的Node的thread屬性永遠是null。可是咱們不會在構建過程當中建立它們,由於若是沒有爭用,這將是浪費時間。 而是構造節點,並在第一次爭用時設置頭和尾指針。只有從次頭節點日後的全部節點才表明了全部等待鎖的線程。也就是說,在當前線程沒有搶到鎖被包裝成Node扔到隊列中時,即便隊列是空的,它也會排在第二個,咱們會在它的前面新建一個虛擬節點。

4. 獲取鎖實現

4.1 ReentrantLock 獨佔鎖內部結構

構造函數源代碼

// 默認建立非公平鎖
public ReentrantLock() {
    sync = new NonfairSync();
}
// 經過傳值爲true來進行建立公平鎖
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

ReentrantLock 裏面有三個內部類:

  1. 一個是抽象的 Sync 實現了 AbstractQueuedSynchronizer
  2. NonfairSync 繼承了 Sync
  3. FairSync 繼承了 Sync

圖片

4.2 非公平鎖的實現

ReentrantLock 種獲取鎖的方法

public void lock() {
    sync.lock();
}

ReentrantLock 的非公平鎖實現

static final class NonfairSync extends Sync {
    final void lock() {
    // 若是設置state的值從0變爲1成功
        if (compareAndSetState(0, 1))
        // 則將當前線程設置爲獨佔線程
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

compareAndSetState(0,1)

protected final boolean compareAndSetState(int expect, int update) {
    // 經過unsafe.compareAndSwapInt方法來進行設置值
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

stateOffset 爲AQS種維護的state屬性的偏移量
圖片

setExclusiveOwnerThread(Thread.currentThread());

protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}

acquire(1); 調用的是AQS 中的acquire(int arg) 方法

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire(arg) 該方法是protected的由子類去具體實現的
圖片

圖片

  咱們須要看的是NonfairSync中實現的tryAcquire方法,裏面又調用了nonfairTryAcquire方法,再進去看看

static final class NonfairSync extends Sync {
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

nonfairTryAcquire(int acquires) 方法實現

final boolean nonfairTryAcquire(int acquires) {
    // 獲取當前線程
    final Thread current = Thread.currentThread();
    // 獲取當前state的值
    int c = getState(); 
    if (c == 0) {
      // 看看設置值是否能成功
            if (compareAndSetState(0, acquires)) {
           // 則將當前線程設置爲獨佔線程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
 // 返回由setExclusiveOwnerThread設置的最後一個線程;若是從不設置,則返回null 
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 設置state的值
        setState(nextc);
        return true;
    }
    return false;
}

acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 方法實現,先看裏addWaiter(Node.EXCLUSIVE)方法注意:Node.EXCLUSIVE 此時是空值,因此mode 就是空的,因此此時建立的Node節點中的nextWaiter是空值。

static final class Node {
  Node(Thread thread, Node mode) {     // Used by addWaiter
    this.nextWaiter = mode;
    this.thread = thread;
  }
}
private Node addWaiter(Node mode) {
    // 建立一個新的節點
    Node node = new Node(Thread.currentThread(), mode);
    // 將當前CLH隊列的尾部節點賦予給 pred
    Node pred = tail;
    if (pred != null) { // 若是尾節點不爲空
        node.prev = pred; // 將當前node節點的前驅節點指向CLH隊列的尾部節點
        if (compareAndSetTail(pred, node)) { // CAS設置值
            pred.next = node; // CLH隊列的尾部節點的後繼節點指向新的node節點
            return node;
        }
    }
    enq(node);
    return node;
}

若是CLH隊列的尾部節點爲空值的話,執行enq(node)方法

// 經過CAS方式設置隊列的頭節點
private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
// 經過CAS方式設置隊列的尾部節點
private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
// 節點入隊操做
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 若是尾部節點爲空值
        if (t == null) {
        // 則進行初始化一個節點,將其設置爲頭節點
            if (compareAndSetHead(new Node()))
                tail = head; //頭尾指向同一個空節點
        } else {
      // 若是尾部節點不爲空,那麼將傳進來的入隊節點的前驅節點指向當前隊列的尾部節點
            node.prev = t;
      // 將當前傳進來的入隊節點設置爲當前隊列的尾部節點
            if (compareAndSetTail(t, node)) { 
          // 將當前隊列的尾部節點的後繼節點指向傳進來的新節點
                 t.next = node;
                return t;
            }
        }
    }
}

查看 acquireQueued 方法實現

// 獲取當前節點的前驅節點
final Node predecessor() throws NullPointerException {
    Node p = prev;
    if (p == null)
        throw new NullPointerException();
    else
        return p;
}
// 檢查並更新沒法獲取的節點的狀態。 若是線程應阻塞,則返回true
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//SIGNAL這個狀態就有點意思了,它不是表徵當前節點的狀態,而是當前節點的下一個節點 //的狀態。當一個節點的waitStatus被置爲SIGNAL,就說明它的下一個節點(即它的後繼 // 節點)已經被掛起了(或者立刻就要被掛起了),所以在當前節點釋放了鎖或者放棄獲取 // 鎖時,若是它的waitStatus屬性爲SIGNAL,它還要完成一個額外的操做——喚醒它的後繼節點。
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
     // 當前節點的 ws > 0, 則爲 Node.CANCELLED 說明前驅節點                                        // 已經取消了等待鎖(因爲超時或者中斷等緣由)
        // 既然前驅節點不等了, 那就繼續往前找, 直到找到一個還在等待鎖的節點
        // 而後咱們跨過這些不等待鎖的節點, 直接排在等待鎖的節點的後面 
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
     // 前驅節點的狀態既不是SIGNAL,也不是CANCELLED
        // 用CAS設置前驅節點的ws爲 Node.SIGNAL,給本身定一個鬧鐘
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
// 中止的便捷方法,而後檢查是否中斷
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
// 取消正在進行的獲取嘗試
private void cancelAcquire(Node node) {
    if (node == null)
        return;
    node.thread = null;
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    Node predNext = pred.next;
    node.waitStatus = Node.CANCELLED;
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }
        node.next = node;
    }
}
// 能執行到該方法, 說明addWaiter 方法已經成功將包裝了當前Thread的節點添加到了等待隊列的隊尾
// 該方法中將再次嘗試去獲取鎖
// 在再次嘗試獲取鎖失敗後, 判斷是否須要把當前線程掛起
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
         // 獲取當前節點的前驅節點
            final Node p = node.predecessor();
         // 在前驅節點就是head節點的時候,繼續嘗試獲取鎖
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 將當前線程掛起,使CPU再也不調度它
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed) 
            cancelAcquire(node);
    }
}

  爲何前面獲取鎖失敗了, 這裏還要再次嘗試獲取鎖呢?
首先, 這裏再次嘗試獲取鎖是基於必定的條件的,即:當前節點的前驅節點就是HEAD節點,由於咱們知道,head節點就是個虛擬節點,它不表明任何線程,或者表明了持有鎖的線程,若是當前節點的前驅節點就是head節點,那就說明當前節點已是排在整個等待隊列最前面的了。

setHead(node); 方法

// 這個方法將head指向傳進來的node,而且將node的thread和prev屬性置爲null
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

  能夠看出,這個方法的本質是丟棄原來的head,將head指向已經得到了鎖的node。可是接着又將該node的thread屬性置爲null了,這某種意義上致使了這個新的head節點又成爲了一個虛擬節點,它不表明任何線程。爲何要這樣作呢,由於在tryAcquire調用成功後,exclusiveOwnerThread屬性就已經記錄了當前獲取鎖的線程了,此處沒有必要再記錄。這某種程度上就是將當前線程從等待隊列裏面拿出來了,是一個變相的出隊操做。
shouldParkAfterFailedAcquire(Node pred, Node node)方法

  1. 若是爲前驅節點的waitStatus值爲 Node.SIGNAL 則直接返回 true
  2. 若是爲前驅節點的waitStatus值爲 Node.CANCELLED (ws > 0), 則跳過那些節點, 從新尋找正常等待中的前驅節點,而後排在它後面,返回false
  3. 其餘狀況, 將前驅節點的狀態改成 Node.SIGNAL, 返回false

acquireQueued方法中的Finally代碼

private void cancelAcquire(Node node) {
  // 將無效節點過濾
    if (node == null)
        return;
  // 設置該節點不關聯任何線程,也就是虛節點
    node.thread = null;
    Node pred = node.prev;
  // 經過前驅節點,跳過取消狀態的node
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
  // 獲取過濾後的前驅節點的後繼節點
    Node predNext = pred.next;
  // 把當前node的狀態設置爲CANCELLED
    node.waitStatus = Node.CANCELLED;
  // 若是當前節點是尾節點,將從後往前的第一個非取消狀態的節點設置爲尾節點
  // 更新失敗的話,則進入else,若是更新成功,將tail的後繼節點設置爲null
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
    // 若是當前節點不是head的後繼節點,
    // 1.判斷當前節點前驅節點的是否爲SIGNAL
    // 2.若是不是,則把前驅節點設置爲SINGAL看是否成功
    // 若是1和2中有一個爲true,再判斷當前節點的線程是否爲null
    // 若是上述條件都知足,把當前節點的前驅節點的後繼指針指向當前節點的後繼節點
        if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
   // 若是當前節點是head的後繼節點,或者上述條件不知足,那就喚醒當前節點的後繼節點
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

4.3 非公平鎖獲取流程圖

非公平鎖獲取鎖成功的流程圖

圖片

非公平鎖獲取鎖失敗的流程圖

圖片

5.釋放鎖實現

5.1釋放鎖代碼分析

  嘗試釋放此鎖。若是當前線程是此鎖的持有者,則保留計數將減小。 若是保持計數如今爲零,則釋放鎖定。 若是當前線程不是此鎖的持有者,則拋出IllegalMonitorStateException。

## ReentrantLock
public void unlock() {
    sync.release(1);
}

sync.release(1) 調用的是AbstractQueuedSynchronizer中的release方法

## AbstractQueuedSynchronizer
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

分析tryRelease(arg)方法
圖片

tryRelease(arg)該方法調用的是ReentrantLock中

protected final boolean tryRelease(int releases) {
// 獲取當前鎖持有的線程數量和須要釋放的值進行相減
    int c = getState() - releases; 
    // 若是當前線程不是鎖佔有的線程拋出異常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 若是此時c = 0就意味着state = 0,當前鎖沒有被任意線程佔有
    // 將當前所的佔有線程設置爲空
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 設置state的值爲 0
    setState(c);
    return free;
}

  若是頭節點不爲空,而且waitStatus != 0,喚醒後續節點若是存在的話。
這裏的判斷條件爲何是h != null && h.waitStatus != 0?

  由於h == null的話,Head還沒初始化。初始狀況下,head == null,第一個節點入隊,Head會被初始化一個虛擬節點。因此說,這裏若是還沒來得及入隊,就會出現head == null 的狀況。

  1. h != null && waitStatus == 0 代表後繼節點對應的線程仍在運行中,不須要喚醒
  2. h != null && waitStatus < 0 代表後繼節點可能被阻塞了,須要喚醒
private void unparkSuccessor(Node node) {
// 獲取頭結點waitStatus
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
// 獲取當前節點的下一個節點
    Node s = node.next;
//若是下個節點是null或者下個節點被cancelled,就找到隊列最開始的非cancelled的節點
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 就從尾部節點開始找往前遍歷,找到隊列中第一個waitStatus<0的節點。
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
  // 若是當前節點的下個節點不爲空,並且狀態<=0,就把當前節點喚醒
    if (s != null)
        LockSupport.unpark(s.thread);
}

  爲何要從後往前找第一個非Cancelled的節點呢?
看一下addWaiter方法

private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

  咱們從這裏能夠看到,節點入隊並非原子操做,也就是說,node.prev = pred, compareAndSetTail(pred, node) 這兩個地方能夠看做Tail入隊的原子操做,可是此時pred.next = node;還沒執行,若是這個時候執行了unparkSuccessor方法,就沒辦法從前日後找了,因此須要從後往前找。還有一點緣由,在產生CANCELLED狀態節點的時候,先斷開的是Next指針,Prev指針並未斷開,所以也是必需要從後往前遍歷纔可以遍歷徹底部的Node
因此,若是是從前日後找,因爲極端狀況下入隊的非原子操做和CANCELLED節點產生過程當中斷開Next指針的操做,可能會致使沒法遍歷全部的節點。因此,喚醒對應的線程後,對應的線程就會繼續往下執行。

5.2 釋放鎖流程圖

圖片

6.注意

  因爲篇幅較長公平鎖的實如今下一篇的博客中講述,謝謝你們的關注和支持!有問題但願你們指出,共同進步!!!

相關文章
相關標籤/搜索