J.U.C|AQS獨佔式源碼分析

1、寫在前面


上篇文章經過ReentrantLock 的加鎖和釋放鎖過程給你們聊了聊AQS架構以及實現原理,具體參見《J.U.C|AQS的原理》node

理解了原理,咱們在來看看再來一步一步的聊聊其源碼是如何實現的。segmentfault

本章給你們聊聊AQS中獨佔式獲取和釋放共享狀態的流程,主要根據tryAcquire(int arg) -- > tryRelease(int arg)來說。安全

2、什麼是獨佔式


AQS的同步隊列提供兩種模式即獨佔式(EXCLUSIVE) 和 共享式(SHARED)。架構

本章咱們主要聊獨佔式: 即同一時刻只能有一個線程獲取同步狀態,其它獲取同步狀態失敗的線程則會加入到同步隊列中進行等待。函數

主要講解方法:源碼分析

  • tryAcquire(int):獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。
  • tryRelease(int):獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。

有對同步隊列不明白的請看《J.U.C|同步隊列(CLH)》ui

3、核心方法分析


3.1 共享狀態的獲取

acquire(int arg)this

獨佔式獲取同步狀態的頂級入口acquire(int arg)方法,若是線程獲取到共享狀態則直接返回, 不然把當前線程構形成獨佔式(node.EXCLUSIVE)模式節點並添加到同步隊列尾部,直到獲取到共享狀態爲止,整個過程忽略中斷。

方法源碼spa

public final void acquire(int arg) { 
        if (!tryAcquire(arg) && 
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
            selfInterrupt(); 
    } 
 }

方法函數:線程

  • tryAcquire(arg):嘗試獲取同步狀態、獲取成功則直接返回。
  • addWaiter(Node.EXCLUSIVE):當同步狀態獲取失敗時,構建一個獨佔式節點並將其加入到同步隊列的尾部。
  • acquireQueued(Node, arg)) : 獲取該節點指定數量的資源,經過自旋的方式直到獲取成功,返回是該節點線程的中斷狀態。
  • selfInterrupt(): 將中斷補上(因其獲取資源的整個過程是忽略中斷的因此最後手動將中斷補上)

    源碼分析

tryAcquire(arg)

protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

??? 什麼鬼? 直接拋出異常? AQS 中對共享狀態的獲取沒有提供具體的實現,等待子類根據本身的場景去實現。有沒有人疑惑,那爲何不是 abstract 的尼? 由於AQS不止是獨佔模式的鎖須要繼承它還有別人也須要繼承它,總不能讓別人也來實現一個無關的方法吧。

addWaiter(Node node)

private Node addWaiter(Node mode) {
// 以給定的模式來構建節點, mode有兩種模式 
//  共享式SHARED, 獨佔式EXCLUSIVE;
  Node node = new Node(Thread.currentThread(), mode);
    // 嘗試快速將該節點加入到隊列的尾部
    Node pred = tail;
     if (pred != null) {
        node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 若是快速加入失敗,則經過 anq方式入列
        enq(node);
        return node;
    }

addWaiter(Node mode) 方法嘗試將當前Node節點快速加入到隊列的尾部,若是快速加入失敗則經過enq(node)方法自旋加入。

enq(final Node node)

private Node enq(final Node node) {
// CAS自旋,直到加入隊尾成功        
for (;;) {
    Node t = tail;
        if (t == null) { // 若是隊列爲空,則必須先初始化CLH隊列,新建一個空節點標識做爲Hader節點,並將tail 指向它
            if (compareAndSetHead(new Node()))
                tail = head;
            } else {// 正常流程,加入隊列尾部
                node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                }
            }
        }
    }

enq(final Node node) 方法經過自旋的方式將當前Node節點加入到隊列尾部,直到成功爲止。

注: 在這無論是快速仍是自旋的方式將當前Node節點加入到隊列尾部都是經過compareAndSetTail(t, node) 來保證線程安全的,這也是典型實現無鎖化線程安全的方式,CAS自旋volatile變量。

acquireQueued(final Node, int arg)

final boolean acquireQueued(final Node node, int arg) {
    // 是否拿到資源
    boolean failed = true;
        try {
            // 標記等待過程當中是否被中斷過
            boolean interrupted = false;
            // 自旋
           for (;;) {
            // 獲取當前節點的前驅節點
           final Node p = node.predecessor();
           // 若是其前驅節點爲head 節點,說明此節點有資格去獲取資源了。(多是被前驅節點喚醒,也可能被interrupted了的)
          if (p == head && tryAcquire(arg)) {
            // 拿到資源後將本身設置爲head節點,
            setHead(node);
           // 將前驅節點 p.next = nul 在setHead(node); 中已經將node.prev = null 設置爲空了,方便GC回收前驅節點,也至關於出列。
          p.next = null; // help GC
         failed = false;
         return interrupted;
        }
    // 若是不符合上述條件,說明本身能夠休息了,進入waiting狀態,直到被unpark()
        if (shouldParkAfterFailedAcquire(p, node) &&
            parkAndCheckInterrupt())
         interrupted = true;
     } finally {
        if (failed)
            cancelAcquire(node);
     }
}

當前節點的線程在‘死循環’中嘗試獲取同步狀態,前提是隻有其前驅節點爲head節點時纔有嘗試獲取同步狀態的資格,不然繼續在同步隊列中等待被喚醒。

Why?

  • 由於只有head是成功獲取同步狀態的節點,而head節點的線程在釋放同步狀態的同時,會喚醒後繼節點,後繼節點在被喚醒後檢測本身的前驅節點是不是head節點,若是是則會經過自旋嘗試獲取同步狀態。
  • 維護CLH的FIFO原則。該方法中節點自旋獲取同步狀態。

以下圖

圖片描述

shouldParkAfterFailedAcquire(Node pred, Node node)

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 拿到前驅的狀態
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
           // 若是已經告訴過前驅節點,獲取到資源後通知本身下,那就能夠安心的去休息了。
            return true;
        if (ws > 0) {
           // 若是前驅節點放棄了,那就循環一直往前找,直到找到一個正常等待狀態的節點,排在他後面
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
        // 若是前驅狀態爲0 或者 PROPAGATE 狀態, 那就把前驅狀態設置成SIGNAL,告訴它獲取資源後通知下本身。
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

此方法檢測本身前驅節點是否時head節點,若是是則嘗試獲取同步狀態,不是則再次回到同步隊列中找到一個溫馨地方(也就是找到一個waitStatus > 0 的節點,排在他後面繼續等待)休息,並告訴前驅節點釋放同步狀態或者被中斷後通知本身下(compareAndSetWaitStatus(pred, ws, Node.SIGNAL))。

注意:在此查找一個溫馨區域休息(waitStatus > 0 的節點)時那些不符合條件的節點會造成了一個無效鏈,等待GC回收。

private final boolean parkAndCheckInterrupt() {
        // 調用park方法是線程進入waiting 狀態
        LockSupport.park(this);
        //若是被喚醒查看是否是被中斷狀態
        return Thread.interrupted();
    }

最後調用park方法使節點中線程進入wating狀態,等待被unpark()喚醒。

小結

  1. 請求線程首先調用tryAcquire(arg) 方法嘗試獲取同步狀態,成功則直接返回。
  2. 若是失敗:
  • 構造一個獨佔式節點Node.EXCLUSIVE
  • addWaiter(Node.EXCLUSIVE) 將該節點嘗試快速加入到隊列尾部,成功則直接返回該節點,失敗則調用enq(final Node node)方法利用自旋CAS將該節點加入到隊列尾部 。
  1. 調用acquireQueued(final Node, int arg) 方法找到一個溫馨的休息區,並通知前驅節點在釋放同步狀態或者被中斷後喚醒本身重新嘗試獲取同步狀態。
  2. 最後若是節點線程在等待時被中斷,則將中斷補上selfInterrupt()

到這獨佔式獲取共享狀態已經聊完了,下面咱們一塊兒來看看釋放共享狀態的過程。

3.2共享狀態的釋放

release(int arg)

獨佔式釋放共享資源的頂級入口release(int arg) 方法,完全釋放共享狀態(state = 0)並喚醒其後繼節點來獲取共享狀態。

方法源碼

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                // 喚醒head節點的後繼節點。
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

源碼分析
tryRelease(arg)

protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

tryRelease(int arg) 和 tryAcquire(arg) 語義基本相同,留給子類去實現。

unparkSuccessor(h)

private void unparkSuccessor(Node node) {
        // 獲取當前節點的等待狀態
        int ws = node.waitStatus;
        
        if (ws < 0)
            // 若是節點狀態小於0 (),將其狀態設置爲0
            compareAndSetWaitStatus(node, ws, 0);
         // 獲取其下一個須要喚醒的節點
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 若是下一個節點爲null,或者等待狀態大於0(被取消的狀態)繼續往下查找
            直到等待狀態小於等於0的節點
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            // 喚醒該節點等待線程
            LockSupport.unpark(s.thread);
    }

小結

  • 首先調用實現者的tryRelease(),失敗則返回false
  • 成功則找到下一個有效的節點並喚醒它。

到這獨佔式獲取同步和釋放同步狀態的源碼已經分析完了。 有沒有懵尼? 懵了也別怕最後咱們再來張流程圖幫助你們理解。
圖片描述
結合上面源碼分析,應該對AQS獨佔式獲取和釋放共享狀態的源碼有所瞭解了吧。

4、總結


分析了獨佔式同步狀態的獲取和釋放過程,適當作下總結: 在獲取同步狀態時,同步器維持一個同步隊列,獲取狀態失敗的線程都會加入到隊列中並在隊列中進行自旋,出列(或者中止自旋的)的條件是前驅節點爲頭節點且成功獲取了同步狀態。在釋放同步狀態時,同步器調用tryRelease(int arg)方法釋放同步狀態,而後喚醒頭節點的後繼節點。

相關文章
相關標籤/搜索