經過一個生活中的案例場景,揭開併發包底層AQS的神祕面紗

本文導讀

  • 生活中案例場景介紹
  • 聯想到 AQS 究竟是什麼
  • AQS 的設計初衷
  • 揭祕 AQS 底層實現
  • 最後的總結

當你在學習某一個技能的時候,是否曾有過這樣的感受,就是同一個技能點學完了以後,過了一段時間,若是你沒有任何總結,或者是不常常回顧,遺忘的速度是很是之快的。node

忘記了以後,而後再從新學,由於已經間隔了一段時間,再次學習又當作了新的知識點來學。這種狀態如此反覆,浪費了相同的時間,但學習效果卻收效甚微。面試

每當遇到這種狀況,咱們能夠停下來,思考一下。對於某一個技術知識點理解起來不是那麼好懂的時候,或者是學習起來有點吃力的時候,我們能夠嘗試找找生活中的例子來聯想下。算法

由於技術源於生活。編程

找到一個合適的生活案例,而後結合你本身作筆記總結和動手實踐的過程。按期的去回顧一下,慢慢的就會理解的更加透徹。c#

一、生活中案例場景介紹

今天咱們就舉一個生活中的例子來理解下併發底層的AQS。設計模式

你們若是去過某些大醫院的話,就能知道,因爲互聯網的快速發展,醫院的掛號、交費、取藥的流程都是比較方便的,交費也可使用支付寶、微信支付了,而不用帶現金了。安全

醫生開完單子,交費完成 ,單子上都會有一個長條二維碼,能夠直接在取藥的地方自助掃碼,叫號系統自動分配取藥窗口,而後你在關注下指定窗口等待着叫號就能夠了,叫到你的時候再過去取藥,而不須要一直在等待着。微信

咱們用一張圖來直觀的感覺下:數據結構

file

這裏面涉及到了幾個角色:多線程

1)藥房,提供取藥窗口的,內部有自助取藥機或人工取藥

2)取藥叫號系統,當用戶掃碼藥單後,自動錄入到該系統中

3)取藥用戶

接下來我們細化下取藥流程。

當取藥用戶在自助機器上掃碼時,能夠直觀的看下下面的流程圖:

取藥流程圖1

第一個用戶是程序猿,由於有多個自助掃碼機,他一看二維碼就知道咋回事了,因此第一個在自助機上掃碼完成,能夠優先第一個去取藥窗口(State窗口)。

此時叫號系統的藥單隊列中尚未其餘人,程序猿掃碼後,就能夠直接去窗口等待着取藥了。

接下來,原本是張大爺和王大媽看着先前程序猿的操做,也跟着在自助機上來回掃碼一把,因爲不大懂掃哪裏,掃了半天也沒有個反應,老頭此時有點懵 : (。

後來熱心的程序猿看到了,給指點了一下 : ),幫助順利的掃碼完成。

再看下面這個流程圖:

取藥流程圖2

正好,張大爺和王大媽的取藥單,也被分配到跟程序猿同一個取藥窗口中 ,此時只能排隊了,按照他們的掃碼順序排隊,如上圖所示。

當程序猿取藥完成,叫號系統會自動呼叫下一位用戶,即隊列中的排在首節點的張大爺,自助取藥機收到消息會自動給張大爺取藥。此時,王大媽仍是要等一會。後面的用戶 CCC 掃碼完成後,會繼續放到藥單隊列中,藥單隊列是按照 FIFO,也就是誰先掃碼誰就在前面,因此 CCC 排在王大媽的後面。

再看下面的流程圖:

取藥流程圖3

張大爺還在等待取藥過程當中,王大媽也知道下一個可能就是她了,因此王大媽會時不時的,擡頭看看叫號窗口是否顯示了本身的名字。
此時,王大媽能夠稍微在等待區休息一會,等待系統叫號就能夠了。

二、聯想到 AQS 究竟是什麼

其實,上面的場景介紹中,在醫院裏是很常見的。那麼這個場景對應的,咱們能夠聯想到 Java 中的併發編程。

若是沒有中間的叫號系統來作控制,若是醫院沒有限制,不少用戶要麼蜂擁而上沒有秩序的亂擠,要麼就有秩序的都在窗口站着排成長隊等待着。

因此中間的叫號系統解決了不少問題,解決了不少取藥用戶的有序性、安全性,並且不須要用戶一直等着,用戶線程無阻塞,當收到系統通知信號後,用戶再繼續執行取藥動做。

這個生活中的例子,能夠很好的聯想到 Java 中咱們經常使用的,併發包的底層技術:AQS (AbstractQueuedSynchronizer)隊列同步器(簡稱同步器)。

就像咱們舉得例子中的提到的幾個角色,有不少用戶(理解爲用戶線程),有共享資源(取藥窗口)。在用戶線程和共享資源之間,是經過中間系統來協調控制的,這裏面就會涉及的概念。

是用來控制多個線程訪問共享資源的方式。一個鎖能防止多個線程對共享資源的同時訪問,有些鎖也容許多個線程併發訪問共享資源,好比讀寫鎖。

在 Java 中常用的鎖是 synchronized,synchronized 會隱式的得到鎖,但它必須是先得到鎖再釋放鎖。這種方式簡化了同步的管理,但擴展性不如 Lock 顯示的得到鎖和釋放鎖更加靈活。

synchronized 和 Lock 鎖之間的區別:

synchronized和Lock鎖區別

從性能上來說,當併發量高、競爭激烈的場景下,Lock 鎖會較 synchronized 性能上表現的
更穩定些。反之,當併發量不高的狀況下,synchronized 有分級鎖的優點,所以二者性能差很少,synchronized 相對來講使用上更加簡單,不用考慮手工釋放鎖。

直觀感覺下二者的性能對比:

性能對比

Lock 顯示的鎖使用,由於使用上更加靈活,這得益於其底層基礎同步框架的實現機制,它就是 AQS。

以下圖所示:

多線程訪問共享資源

上述圖中列出了多個併發包中的類,每個併發工具類解決的問題場景不一樣,可是其底層同步框架基本都是使用的 AQS 來實現的。

三、AQS 的設計初衷

Java 大佬考慮併發底層使用 AQS 的設計思想初衷,就是爲了可以抽象出來統一的同步協調處理器,設計好頂層結構,做爲併發包構建的基本骨架,該骨架裏封裝了多線程的入隊/出隊、線程阻塞/喚醒等一系列複雜的操做。Java SDK 中面向開發者針對不一樣需求場景提供了多個併發包工具。

儘管,提供的這些併發包的實現方式是不同的,但都是基於頂層抽象出來的 AQS 所定義的統一接口基礎上,而後部分定製邏輯延遲到子類去自行實現。同時,部分定義的方法中是按照既定的順序執行的,由此,咱們也可以想到,AQS 使用了模板方法模式。

在上一節圖中提到的幾個併發包中,咱們來簡單介紹下實現場景。

多線程獨佔式併發工具:

1)ReentrantLock

可重入鎖,同一時刻僅容許一個線程訪問,因此能夠稱做 獨佔鎖,線程能夠重複獲取同一把鎖。

多線程共享式併發工具:

1)ReentrantReadWriteLock

可重入的讀寫鎖,容許多個讀線程同時進行,但不容許寫-讀、寫-寫線程同時訪問。

適用於讀多寫少的場景下。

2)CountDownLatch

主要用來解決一個線程等待 N 個線程的場景。

就像短跑運動員比賽,等到全部運動員所有都跑完纔算競賽結束。

3)CycliBarrier

主要用於 N 個線程之間互相等待。

就像幾個驢友約好登山,要等待全部驢友都到齊後才能統一出發。

4)Semaphore

限流場景使用,限定最多容許N個線程能夠訪問某些資源。

就像車輛行駛到路口,必需要看紅綠燈指示,要等到綠燈才能通行。

基於上述這些併發包工具,咱們能夠根據多線程的不一樣使用場景去選擇。JDK 提供的這些併發包基本可以知足了大部分的開發者的使用需求。

四、揭祕 AQS 底層實現

在用戶取藥的這個例子中,咱們能夠把多個用戶掃碼取藥行爲,聯想爲多線程共用爭搶一個窗口的鎖,窗口就做爲共享資源來看待。因此,哪一個用戶先掃碼,這個用戶就優先有機會能提早取藥。

對應聯想到 AQS 內部結構,以下圖所示:

AQS內部結構模擬圖

咱們根據用戶取藥的流程,對應畫出來的一個 AQS 底層的大體結構圖。通過舉例分析,多個用戶(線程)掃碼取藥會爭搶一把鎖(同一個取藥窗口,共享資源),因此用 Java 併發包裏的 ReentrantLock 鎖的使用來描繪一下也更加貼切,由於 ReentrantLock 是一個獨佔鎖,同一個時刻只容許一個用戶執行。

結構圖中的 AQS 裏,包含了幾個關鍵的屬性:

  • state 變量:表示同步狀態
  • exclusiveOwnerThread 變量:表示當前加鎖的線程
  • Node:CLH 隊列,是一個 FIFO 的雙端雙向鏈表隊列

啥是CLH?在 AQS 源碼中你能找到一段話,The wait queue is a variant of a "CLH" (Craig, Landin, and Hagersten) lock queue,看上去像是三我的的名字,他們來發明的自旋算法,沒具體查資料。

AQS 隊列同步器主要包括:

  • 獨佔式同步狀態獲取和釋放,如:ReentrantLock
  • 共享式同步狀態獲取和釋放,如:Semaphore、CountDownLatch、CycliBarrier

接下來,咱們就用獨佔式 ReentrantLock 可重入鎖來分析下 AQS 底層到底了作了哪些事情。

使用 ReentrantLock 顯示加鎖解鎖代碼很簡單,以下所示:

Lock lock = new ReentrantLock();
lock.lock();

// doSomething...

lock.unlock();

先來一張類圖:

類圖

列出了 Lock 接口和 ReentrantLock 實現類裏的核心方法,其中 ReentrantLock 裏的有個很是核心的屬性是 Sync ,它纔是最最關鍵的組件,繼承了 AbstractQueuedSynchronizer 抽象類,做爲子類實現了加鎖和解鎖。

再看一張全景類圖:

file

這張類圖中列出了 ReentrantLock 類裏的 Sync 及其兩個子類 FairSync 公平鎖 和 NonfairSync 非公平鎖的核心方法,AQS 類裏的核心屬性和方法。

AQS 中的 Node同步隊列關鍵屬性介紹:

waitStatus 等待狀態:

CANCELLED:值爲1,等待的線程等待超時或被中斷,需從同步隊列中取消等待,節點進入該狀態不在變化。

SIGNAL:值爲 -1,後繼節點的狀態處於等待狀態,而當前節點線程若是釋放了同步狀態或被取消,將會通知後繼節點,使得後繼節點的線程得以運行。

CONDITION:值爲 -2,節點在等待隊列中,節點線程等待在 Condition 上,當其餘線程對 Condition 調用了 signal() 方法後,該節點將會從等待隊列轉移到同步隊列中,獲取同步狀態。

PROPAGATE:值爲 -3,表示下一次共享式同步狀態獲取將會無條件的被傳播下去。

INITAL:值爲 0,初始狀態,當你建立新的節點時,默認就是這個狀態值。

雙向雙端隊列:

在 AQS 結構圖中已經有所描述,Node 是一個雙端雙向鏈表的隊列,雙端表示有 head (頭節點)和 tail(尾節點)。

雙向鏈表表示有 prev (指向前驅節點)和 next (指向後繼節點)兩個指針來標識 ,在上述 AbstractQueuedSynchronizer.Node 類圖中也可以看獲得。
此外,Node 中還有 thread 屬性表示當前的線程。

介紹完了類圖中的關鍵屬性和數據結構,咱們來分析下,ReentrantLock 對象調用了 lock() 方法加鎖的過程。

找到 ReentrantLock 類裏的 lock() 方法以下:

public void lock() {
        sync.lock();
}

看到沒,sync 變量就是 Sync 剛提到的 AQS 的子類,調用了 sync 的 lock() 方法。

當咱們點擊進去 sync#lock() 方法時,發現是個抽象方法,能夠找到兩個實現類,以下所示:

lock方法

此時,若是不常常看源碼的同窗,可能有點懵,究竟是走那個方法?一種方式,你能夠在 NonfairSync 和 FairSync 兩個類的 lock() 方法上都打上斷點,直接調試看究竟是哪一個類;另外,你能夠猜想下,這個實現類應該是在對象初始化時建立的,因此你就直接去找構造方法。

public ReentrantLock() {
        sync = new NonfairSync();
}

咱們是經過默認構造方法建立的 ReentrantLock,跟進去看到的是建立的 NonfairSync,即默認建立的是非公平鎖方式。

看下 NonfairSync#lock() 方法實現:

final void lock() {
    if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
    else
            acquire(1);
}

有兩條執行路徑:

1)直接經過 compareAndSetState(0, 1) 方法,使用了 CAS 能夠無鎖化的保證一個數值修改的原子性,判斷下若是 state 變量是 0,說明沒有線程加鎖,能夠把 state 設置爲 1。設置成功後,

調用 setExclusiveOwnerThread(Thread.currentThread()) 方法,將當前線程設置爲加鎖線程,即將 exclusiveOwnerThread 變量賦值爲當前線程。

protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

compareAndSetState(int expect, int update) 底層調用了 UnSafe 類的 compareAndSwapInt(this, stateOffset, expect, update) 方法,該方法爲 JDK 內部使用的API,進行的是指針操做,基於 CPU 指令實現的原子性的 CAS。

圖示以下:

線程1得到鎖

2)若是 state 變量不是 0,說明有線程已經加鎖了,compareAndSetState(0, 1) 方法返回 false,執行 acquire(1) 方法。

當咱們點擊 acquire(1) 方法後,就進入到了 AbstractQueuedSynchronizer 類裏面了。

acquire(int arg) 方法源碼:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}

其餘線程加入同步隊列,圖示以下:

線程加入AQS同步隊列

上述代碼完成以下幾個步驟:

1)首先調用 tryAcquire(int arg) 方法,保證線程安全的獲取同步狀態,若是同步狀態獲取失敗,進入步驟2)。

2)調用 addWaiter(Node node) 方法,參數爲構建的獨佔式 Node.EXCLUSIVE 節點,將構建好的節點經過 CAS 無鎖化方式添加到同步隊列的尾部,並返回該節點。

3)最後調用 acquireQueued(Node node, int arg) 方法,使得該節點按「死循環」方式獲取同步狀態。若是節點獲取不到同步狀態,則會調用 LockSupport#park() 方法掛起,阻塞節點中的線程,被阻塞的線程等待喚醒,喚醒方式主要是前驅節點出隊或被中斷來實現的。

下面結合源碼具體剖析下上述的幾個步驟。

當調用 tryAcquire(int arg) 方法,注意 AQS 裏的 方法是這樣的:

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

1)tryAcquire(int arg) 嘗試獲取同步狀態分析:

這就是 AQS 提供的模板方法,因爲子類自定義同步器去實現的。

因此,會跳轉到 NonfairSync 裏的 tryAcquire(int arg) 方法:

protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
}

內部調用了 nonfairTryAcquire(int acquires) 方法,該方法是 Sync 父類的,以下所示:

final boolean nonfairTryAcquire(int acquires) {
  // 獲取當前線程
    final Thread current = Thread.currentThread();
  //  獲取同步狀態
    int c = getState();
    // 若是同步狀態是0,沒人加鎖
    if (c == 0) {
      // 經過CAS方式設置同步狀態,嘗試將0修改成1
        if (compareAndSetState(0, acquires)) {
                // 設置當前加鎖的線程,給exclusiveOwnerThread變量賦值
                setExclusiveOwnerThread(current);
                return true;
        }
    }
    // 當前線程等於當前加鎖線程
    else if (current == getExclusiveOwnerThread()) {
        // 計算新的同步狀態值,nextc = 1 + 1 = 2
        int nextc = c + acquires;
        // 判斷下nextc,防止溢出
        if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
        // 更新同步狀態值
        setState(nextc);
        return true;
    }
    return false;
}

即便是多線程訪問,同一時刻老是僅有一個線程可以得到同步狀態,就會走上述的 c == 0 裏的邏輯。

若是是在同一個線程中進行了第二次調用 ReentrantLock#lock() 和 unlock() 方法呢?此時 c = 1,因此會走到 current == getExclusiveOwnerThread() 判斷當前線程是等於加鎖線程的,那麼就會計算 nextc 新的同步狀態 ,若是該值不會溢出,則調用 setState(int newState) 更新同步狀態值,state 同步狀態值變爲 2。

** 2)addWaiter(Node node) 添加到同步隊列分析:**

addWaiter(Node node) 主要是將節點加入到同步隊列隊尾,源碼以下所示:

private Node addWaiter(Node mode) {
  // mode傳進來的參數爲Node.EXCLUSIVE,構建Node節點
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    // 尾節點不爲空
    if (pred != null) {
            node.prev = pred;
            // 1. 將當前節點做爲尾節點添加到同步隊列
            // 2. 原尾節點做爲當前節點的前驅節點
            if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
            }
    }
    // 同步隊列爲空,調用enq(node)方法
    enq(node);
    return node;
}

繼續看 enq(Node node) 方法源碼:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 第一次循環,尾節點爲空
        if (t == null) { // Must initialize
                // 建立空Node節點做爲Head頭節點
                if (compareAndSetHead(new Node()))
                        tail = head;
        } else {
             // 第二次循環過來,只有一個節點,就是頭結點
                node.prev = t;
                // 將當前節點做爲尾節點添加到同步隊列中
                if (compareAndSetTail(t, node)) {
                     // 當前節點做爲頭結點的後繼節點
                        t.next = node;
                        return t;
                }
        }
    }
}

也是使用了 CAS 無鎖化保證節點,能夠正確的添加到同步隊列中。

第一次循環,尾節點爲空,調用了 compareAndSetHead(new Node()) 方法,底層調用了 unsafe.compareAndSwapObject(this, headOffset, null, update) 若是 head 變量所在位置爲 null,則更新爲空 Node 節點。

第二次循環,尾節點不空,調用了 compareAndSetTail(t, node) 方法,底層調用了 unsafe.compareAndSwapObject(this, tailOffset, expect, update) ,此時 tail 變量所在位置爲空 Node 節點,更新爲當前節點,即 Node.EXCLUSIVE 獨佔式節點。

** 3)acquireQueued(Node node, int arg) 得到同步狀態分析:**

節點加入到同步隊列後,就進入到了自旋的過程,每一個節點都在不斷的觀察,是否能夠得到同步狀態,成功得到同步狀態,就會從這個自旋過程當中退出。以下所示是自旋過程的實現代碼。

acquireQueued() 方法源碼以下:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
                // 得到當前節點的前驅節點
                final Node p = node.predecessor();
                // 若是p是頭節點,則嘗試得到同步狀態
                if (p == head && tryAcquire(arg)) {
                        // 成功得到同步狀態,把本身做爲Head頭節點
                        setHead(node);
                        // 原頭節點從同步隊列移除,不須要CAS操做
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                }
                // 1. 若是不是頭節點,失敗得到同步狀態,判斷下是否能夠掛起
                // 2. 容許掛起 ,調用 LockSupport#park() 方法完成線程掛起,釋放鎖
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
        }
    } finally {
        if (failed)
                cancelAcquire(node);
    }
}

當前線程掛起過程,先調用 shouldParkAfterFailedAcquire(Node pred, Node node) 方法,以下所示:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // 前驅節點狀態爲SIGNAL,返回true
    if (ws == Node.SIGNAL)
            return true;
    if (ws > 0) {
        // 跳過 CACALLED 狀態的節點
            do {
                    node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
    } else {
            // 前驅節點的狀態小於0,則更新爲SIGNAL狀態
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

圖示以下:

AQS同步隊列節點自旋過程

線程1首先得到了同步狀態,線程二、線程3發現 AQS 類裏的 state 不爲 0,因此都被添加到 AQS 的同步隊列尾部。

此時,同步隊列中的線程2和線程3的節點會進行自旋過程,線程2的前驅節點是頭節點,知足這個條件,而後調用 tryAcquire(int arg) 方法嘗試得到同步狀態。

當線程1業務處理完成,須要釋放同步狀態,是的後續節點線程可以得到同步狀態。示例中會使用 ReentrantLock#unlock() 方法來解鎖。

繼續來分析 unlock() 方法,以下代碼所示:

public void unlock() {
    sync.release(1);
}

在 unlock() 方法中,調用的 Sync 類的 release(int arg),進入到該方法中。

public final boolean release(int arg) {
  // 釋放同步狀態
    if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                // 喚醒後繼節點
                    unparkSuccessor(h);
            return true;
    }
    return false;
}

這個 release(int arg) 是在 AQS 類裏的了,其內部會調用 tryRelease(int arg) 方法嘗試釋放同步狀態,若是成功釋放,得到同步隊列裏的 head 頭節點,頭節點不爲空而且它的 waitStatus 狀態不爲 0(即不爲 INITAL 初始狀態),則會調用 unparkSuccessor(Node node) 喚醒後續節點。

當直接點擊進入 tryRelease(int arg) 方法,仍是在 AQS 類裏,以下所示:

protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
}

AQS 類的該方法並無提供實現,跟 tryAcquire(int arg) 方法相似的,會由 Sync子類裏的 tryRelease(int arg) 重寫該方法實現,以下所示:

protected final boolean tryRelease(int releases) {
    // 得到同步狀態爲1,releases爲1,因此c計算獲得0
    int c = getState() - releases;
    // 當前線程不是加鎖線程,則拋出IllegalMonitorStateException異常
    if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
            free = true;
            // 將加鎖線程變量設置爲null
            setExclusiveOwnerThread(null);
    }
    // 將state變量更新爲計算獲得的0,即更新同步狀態
    setState(c);
    return free;
}

若是釋放同步狀態成功,上述方法將會返回 true。完成的事情很簡單,就是將 state 變量的同步狀態更新一下,而後將加鎖線程 exclusiveOwnerThread 變量設置爲 null。

而後,調用 unparkSuccessor(Node node) 方法通知後繼節點,源碼以下:

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
  // 等待狀態小於0,則經過CAS更新等待狀態值爲0
    if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
  // 得到頭節點的後繼節點,即線程2
    Node s = node.next;
    // 若是後繼節點等待狀態大於0,說明是CACELLED失效節點
    if (s == null || s.waitStatus > 0) {
            s = null;
            // 同步隊列從尾向頭遍歷,獲得一個正常節點
            for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                            s = t;
    }
    if (s != null)
        // 喚醒後續節點
            LockSupport.unpark(s.thread);
}

圖示以下:

線程1釋放鎖

經過圖示並結合源碼,相信你們理解起來就更加清晰了。

注意,線程1釋放同步狀態後,會通知 後繼節點是線程2,不是 Head 頭節點。

上述圖中,同步隊列中的線程2被喚醒後,咱們回到 acquireQueued(final Node node, int arg) 這個節點自旋過程的源碼看下。能夠在上面找一下這個方法的源碼,其中線程2調用了 parkAndCheckInterrupt() 方法將線程掛起着,以下所示:

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
}

喚醒以後,繼續執行,調用 Thread.interrupted() 方法檢測下當前線程中斷狀況。若是沒有被中斷,則繼續循環,執行以下代碼:

final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return interrupted;
}

node 變量爲線程2,調用 p = node.predecessor() 方法得到前驅節點爲頭節點,知足 p == head 條件,而後調用 tryAcquire(int arg) 嘗試得到同步狀態,通過上述分析,由於 state 爲 0,說明沒有線程加鎖,因此得到同步狀態成功,該方法返回 true。

調用 setHead(node) 方法,以下所示:

private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
}

將 node 做爲頭節點,node 的 prev 前驅節點指針和 thread 線程變量設置爲 null。

圖示以下所示:

AQS同步隊列節點喚醒

上述圖中,看到原來的頭節點,已經沒有任何引用了,未來會被 JVM 垃圾回收掉。

剛剛被喚醒的線程2當作了頭節點,但實際也是個空節點了, 由於該節點的 thread 設置爲 null了。此時,線程3的節點還在自旋狀態,等線程2釋放鎖後,通知後繼節點,喚醒線程3。都會執行咱們上面分析的同一個套路。

最後,通過對上述源碼和圖示的分析,我們來兩張完整的流程圖,方便你們記憶。

ReentrantLock#lock() 方法得到鎖流程圖:

ReentrantLock#lock()得到鎖流程圖

ReentrantLock#unlock() 方法釋放鎖流程圖:

ReentrantLock#unlock()釋放鎖流程圖

五、最後的總結

本文以生活案例場景(醫院窗口取藥流程)介紹爲例,聯想到 AQS 究竟是什麼,接着介紹對 AQS 設計初衷, 而且以 ReentrantLock 獨佔式鎖爲例,深刻剖析了 AQS 底層數據結構,以及源碼的實現細節。

AQS 是 Java 併發包中不少同步組件的構建基石,它內部主要是由同步狀態 state 變量和一個 CLH 同步 FIFO 隊列協做來完成的,CLH是一個雙端雙向鏈表數據結構。

當新的線程節點沒法得到同步狀態,將會加入到同步隊列隊尾,此時會採用 CAS 無鎖化來確保該操做的線程安全,保證原子性。線程加入到同步隊列後會被掛起,等待釋放鎖喚醒後繼節點,使得繼續得到同步狀態。

AQS 採用了模板方法設計模式,根據不一樣併發包組件同步需求場景,子類同步器只需重寫 tryAcquire(),tryAcquireShared(),tryRelease(),tryReleaseShared() 幾個方法來決定同步狀態的獲取和釋放,tryAcquire() 和 tryRelease() 方法同於獨佔式,tryAcquireShared() 和 tryReleaseShared() 用於共享式。

對於 Java 中不少併發包背後複雜的入隊/出隊,線程阻塞/喚醒,線程安全的保證等,所有都由 AQS 來幫助你完成了,Doug Lea 大神非常牛逼呀!

弄懂了 AQS,大部分併發包裏的工具類都是很容易理解了。另外,對於共享式併發包的源碼,你們若是感興趣,能夠藉助本文的源碼分析過程,去自行畫圖分析一下。

但願本文給你們能帶來一點點的幫助!可以抵擋得住面試官的N個連環炮式發問。

歡迎關注個人公衆號,掃二維碼關注得到更多精彩文章,與你一同成長~
Java愛好者社區

相關文章
相關標籤/搜索