Java高併發編程基礎之AQS

引言

曾經有一道比較比較經典的面試題「你可以說說java的併發包下面有哪些常見的類?」大多數人應該均可以說出
CountDownLatch、CyclicBarrier、Sempahore多線程併發三大利器。這三大利器都是經過AbstractQueuedSynchronizer抽象類(下面簡寫AQS)來實現的,因此學習三大利器以前咱們有必要先來學習下AQShtml

AQS是一種提供了原子式管理同步狀態、阻塞和喚醒線程功能以及隊列模型的簡單框架java

AQS結構

說到同步咱們如何來保證同步?你們第一印象確定是加鎖了,說到鎖的話你們確定首先會想到的是Synchronized。
Synchronized你們應該基本上都會使用,加鎖和釋放鎖都是jvm 來幫咱們實現的,咱們只須要簡單的加個 Synchronized關鍵字就能夠了。
用起來超級方便。可是有沒有一種狀況咱們設置一個鎖的超時時間Synchronized就有點實現不了,這時候咱們就能夠用ReentrantLock來實現,ReentrantLock是經過aqs來實現的,今天咱們就經過ReentrantLock來學習一下aqs。node

CAS && 公平鎖和非公平鎖

AQS裏面用到了大量的CAS學習AQS以前咱們仍是有必要簡單的先了解下CAS、公平鎖和非公平鎖。面試

CAS
  • CAS 全稱是 compare and swap,是一種用於在多線程環境下實現同步功能的機制。CAS 操做包含三個操做數 -- 內存位置、預期數值和新值。CAS 的實現邏輯是將內存位置處的數值與預期數值想比較,若相等,則將內存位置處的值替換爲新值。若不相等,則不作任何操做,這個操做是個原子性操做,java裏面的AtomicInteger等類都是經過cas來實現的。
    公平鎖和非公平鎖
  • 公平鎖:多個線程按照申請鎖的順序去得到鎖,線程會直接進入隊列去排隊,隊列中第一個才能得到到鎖。
    優勢:等待鎖的線程不會餓死,每一個線程均可以獲取到鎖。
    缺點:總體吞吐效率相對非公平鎖要低,等待隊列中除第一個線程之外的全部線程都會阻塞,CPU喚醒阻塞線程的開銷比非公平鎖大。
  • 非公平鎖:多個線程去獲取鎖的時候,會直接去嘗試獲取,獲取不到,再去進入等待隊列,若是能獲取到,就直接獲取到鎖。
    優勢:能夠減小CPU喚醒線程的開銷,總體的吞吐效率會高點,CPU也沒必要取喚醒全部線程,會減小喚起線程的數量。
    缺點:處於等待隊列中的線程可能會餓死,或者等好久纔會得到鎖。
    文字有點拗口,咱們來個實際的例子說明下。好比咱們去食堂就餐的時候都要排隊,你們都按照先來後到的順序排隊打飯,這就是公平鎖。若是等到你準備拿盤子打飯的時候
    直接蹦出了一個五大三粗的胖子插隊到你前面,你看打不贏他只能忍氣吞聲讓他插隊,等胖子打完飯了又來個小個子也來插你隊,這時候你無法忍了,直接大吼一聲讓他滾,這個
    小個子只能屁顛屁顛到隊尾去排隊了這就是非公平鎖。
    咱們先來看看AQS有哪些屬性
    // 頭結點
    private transient volatile Node head;

// 阻塞的尾節點,每一個新的節點進來,都插入到最後,也就造成了一個鏈表
private transient volatile Node tail;編程

// 這個是最重要的,表明當前鎖的狀態,0表明沒有被佔用,大於 0 表明有線程持有當前鎖
// 這個值能夠大於 1,是由於鎖能夠重入,每次重入都加上 1
private volatile int state;多線程

// 表明當前持有獨佔鎖的線程,舉個最重要的使用例子,由於鎖能夠重入
// reentrantLock.lock()能夠嵌套調用屢次,因此每次用這個來判斷當前線程是否已經擁有了鎖
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread; //繼承自AbstractOwnableSynchronizer併發

下面咱們來寫一個demo分析下lock 加鎖和釋放鎖的過程
```java
   final void lock() {
            // 上來先試試直接把狀態置位1,若是此時沒人獲取鎖就直接
            if (compareAndSetState(0, 1))
                 // 爭搶成功則修改得到鎖狀態的線程
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

cas嘗試失敗,說明已經有人再持有鎖,因此進入acquire方法app

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

tryAcquire方法,看名字大概能猜出什麼意思,就是試一試。
tryAcquire其實是調用了父類Sync的nonfairTryAcquire方法框架

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
             // 獲取下當前鎖的狀態
            int c = getState();
            // 這個if 邏輯跟前面一進來就獲取鎖的邏輯同樣都是經過cas嘗試獲取下鎖
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 進入這個判斷說明 鎖重入了 狀態須要進行+1
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                 // 若是鎖的重入次數大於int的最大值,直接就拋出異常了,正常狀況應該不存在這種狀況,不過jdk仍是嚴謹的
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            // 返回false 說明嘗試獲取鎖失敗了,失敗了就要進行acquireQueued方法了
            return false;
        }

tryAcquire方法若是獲取鎖失敗了,那麼確定就要排隊等待獲取鎖。排隊的線程須要待在哪裏等待獲取鎖?這個就跟咱們線程池執行任務同樣,線程池把任務都封裝成一個work,而後當線程處理任務不過來的時候,就把任務放到隊列裏面。AQS一樣也是相似的,把排隊等待獲取鎖的線程封裝成一個NODE。而後再把NODE放入到一個隊列裏面。隊列以下所示,不過須要注意一點head是不存NODE的。
在這裏插入圖片描述jvm

接下來咱們繼續分析源碼,看下獲取鎖失敗是如何被加入隊列的。
就要執行acquireQueued方法,執行acquireQueued方法以前須要先執行addWaiter方法

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // cas 加入隊列隊尾
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 尾結點不爲空 || cas 加入尾結點失敗
        enq(node);
        return node;
    }

enq

接下來再看看enq方法

// 經過自旋和CAS必定要當前node加入隊尾
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            // 尾結點爲空說明隊列仍是空的,尚未被初始化,因此初始化頭結點,能夠看到頭結點的node 是沒有綁定線程的也就是不存數據的
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

經過addWaiter方法已經把獲取鎖的線程經過封裝成一個NODE加入對列。上述方法的一個執行流程圖以下:
在這裏插入圖片描述
,接下來就是繼續執行acquireQueued方法

acquireQueued

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                 // 經過自旋去獲取鎖 前驅節點==head的時候去嘗試獲取鎖,這個方法在前面已經分析過了。
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
               // 進入這個if說明node的前驅節點不等於head 或者嘗試獲取鎖失敗了
               // 判斷是否須要掛起當前線程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
               // 異常狀況進入cancelAcquire,在jdk11的時候這個源碼直接是catch (Throwable e){ cancelAcquire(node);} 簡單明瞭
            if (failed)
                cancelAcquire(node);
        }
    }

setHead

這個方法每當有一個node獲取到鎖了,就把當前node節點設置爲頭節點,能夠簡單的看作當前節點獲取到鎖了就把當前節點」移除「(變爲頭結點)隊列。

shouldParkAfterFailedAcquire

說到這個方法咱們就要先看下NODE可能會有哪些狀態在源碼裏面咱們能夠看到總共會有四種狀態

  • CANCELLED:值爲1,在同步隊列中等待的線程等待超時或被中斷,須要從同步隊列中取消該Node的結點,其結點的waitStatus爲CANCELLED,即結束狀態,進入該狀態後的結點將不會再變化。
  • SIGNAL:值爲-1,被標識爲該等待喚醒狀態的後繼結點,當其前繼結點的線程釋放了同步鎖或被取消,將會通知該後繼結點的線程執行。說白了,就是處於喚醒狀態,只要前繼結點釋放鎖,就會通知標識爲SIGNAL狀態的後繼結點的線程執行。
  • CONDITION:值爲-2,與Condition相關,該標識的結點處於等待隊列中,結點的線程等待在Condition上,當其餘線程調用了Condition的signal()方法後,CONDITION狀態的結點將從等待隊列轉移到同步隊列中,等待獲取同步鎖。
  • PROPAGATE:值爲-3,與共享模式相關,在共享模式中,該狀態標識結點的線程處於可運行狀態。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // 前驅節點狀態 若是這個狀態爲-1 則返回true,把當前線程掛起
        if (ws == Node.SIGNAL)
            return true;
        // 大於0,說明狀態爲CANCELLED 
        if (ws > 0) {
            do {
               // 刪除被取消的node(讓被取消的node成爲一個沒有引用的node等着下次GC被回收)
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 進入這裏只能是 0,-2,-3。NODE節點初始化的時候waitStatus默認值是0,因此只有這裏纔有修改waitStatus的地方
            // 經過cas 把前驅節點的狀態設置爲-1,而後返回false ,外面調用這個方法的是個循環,又會調用一次這個方法
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

parkAndCheckInterrupt

掛起當前線程,而且阻塞

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // 掛起當前線程,阻塞
    return Thread.interrupted();
}

在這裏插入圖片描述

解鎖

加鎖成功了,那鎖用完了就應該釋放鎖了,釋放鎖重點看下unparkSuccessor這個方法就行了

private void unparkSuccessor(Node node) {
          // 頭結點狀態
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        // s==null head的successor節點獲取鎖成功後,執行了head.next=null的操做後,解鎖線程讀取了head.next,所以s==null
        // head的successor節點被取消(cancelAcquire)時,執行了以下操做:successor.waitStatus=1 ; successor.next = successor;
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 從尾節點開始往前找,找到最前面的非取消的節點 這裏沒有break 哦
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
             // 喚醒線程 ,喚醒的線程會從acquireQueued去獲取鎖
            LockSupport.unpark(s.thread);
    }

釋放鎖代碼比較簡單,基本都寫在代碼註釋裏面了,流程以下:
在這裏插入圖片描述
這段代碼裏面有一個比較經典的面試題:
若是頭結點的下一個節點爲空或者頭結點的下一個節點的狀態爲取消的時候爲何要從後往前找,找到最前面非取消的節點?

  • node.prev = pred; compareAndSetTail(pred, node) 這兩個地方能夠看做Tail入隊的原子操做,可是此時pred.next = node;還沒執行,若是這個時候執行了unparkSuccessor方法,就沒辦法從前日後找了,因此須要從後往前找。
  • 在產生CANCELLED狀態節點的時候,先斷開的是Next指針,Prev指針並未斷開,所以也是必需要從後往前遍歷纔可以遍歷徹底部的Node

    總結

  • reentrantLock的獲取鎖和釋放鎖基本就講完了,裏面還涉及多比較多的細節,感興趣的同窗能夠對着源碼一行一行去debug試試。
  • 適當的瞭解aqs才能更好的學習CountDownLatch、CyclicBarrier、Sempahore,由於這三個利器都是基於aqs來實現的。

    結束

  • 因爲本身才疏學淺,不免會有紕漏,假如你發現了錯誤的地方,還望留言給我指出來,我會對其加以修正。
  • 若是你以爲文章還不錯,你的轉發、分享、讚揚、點贊、留言就是對我最大的鼓勵。
  • 感謝您的閱讀,十分歡迎並感謝您的關注。
    Java高併發編程基礎之AQS
    站在巨人的肩膀上摘蘋果:
    https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html
    https://javadoop.com/post/AbstractQueuedSynchronizer
    https://www.cnblogs.com/yanlong300/p/10953185.html
相關文章
相關標籤/搜索