java 併發編程-AQS源碼分析

什麼是AQS

  • AQS全稱是 AbstractQueuedSynchronizer (抽象隊列同步器),是經過一個先進先出的隊列(存儲等待的線程)來實現同步器的一個框架是一個抽象類,是java.util.concurrent包下不少多線程工具類的實現基礎。Lock、CountDownLatch、Semaphore等都是基於AQS實現的。因此若是想研究Lock、CountDownLatch、Semaphore等基於AQS實現的類的源碼,明白AQS原理是很重要的一步。

AQS實現

  • AQS支持兩種鎖一種是獨佔鎖(獨佔模式),一種是共享鎖(共享模式)java

    • 獨佔鎖:好比像ReentrantLock就是一種獨佔鎖模式,多個線程去同時搶一個鎖,只有一個線程能搶到這個鎖,其餘線程就只能阻塞等待鎖被釋放後從新競爭鎖。
    • 共享鎖:好比像讀寫鎖裏面的讀鎖,一個鎖能夠同時被多個線程擁有(多個線程能夠同時擁有讀鎖),再好比Semaphore 設置一個資源數目(能夠理解爲一個鎖能同時被多少個線程擁有)。
  • ps:共享鎖跟獨佔鎖能夠同時存在,好比好比讀寫鎖,讀鎖寫鎖分別對應共享鎖和獨佔鎖node

  • 先來介紹一下AQS的幾個主要成員變量bash

//AQS等待隊列的頭結點,AQS的等待隊列是基於一個雙向鏈表來實現的,這個頭結點並不包含具體的線程是一個空結點(注意不是null)
private transient volatile Node head;
//AQS等待隊列的尾部結點
private transient volatile Node tail;
//AQS同步器狀態,也能夠說是鎖的狀態,注意volatile修飾證實這個變量狀態要對多線程可見
private volatile int state;
複製代碼
  • AQS的內部類Node
    • Node顧名思義就是接點的意思,前面說過AQS等待隊列是一個雙鏈表,每一個線程進入AQS的等待隊列的時候都會被包裝成一個Node節點
    static final class Node {
          //下面兩個屬性都是說明這個結點是共享模式仍是獨佔模式,具體怎麼表示下面會分析
          static final Node SHARED = new Node();
          static final Node EXCLUSIVE = null;
    
          //下面這四個屬性就是說明結點的狀態
        static final int CANCELLED =  1;//因爲超時或中斷,節點已被取消
        static final int SIGNAL    = -1;//表示下一個節點是經過park阻塞的,須要經過unpark喚醒
        static final int CONDITION = -2;//表示線程在等待條件變量(先獲取鎖,加入 
        到條件等待隊列,而後釋放鎖,等待條件變量知足條件;只有從新獲取鎖之 
        後才能返回)
      static final int PROPAGATE = -3;//表示後續結點會傳播喚醒的操做,共享模式下起做用
          
          volatile int waitStatus;
          //前驅結點(雙鏈表)
          volatile Node prev;
          //後繼結點(雙鏈表)
          volatile Node next;
         //  結點所包裝的線程
          volatile Thread thread;
        //對於Condtion表示下一個等待條件變量的節點;其它狀況下用於區分共享模式和獨佔模式
          Node nextWaiter;
    
          final boolean isShared() {
              return nextWaiter == SHARED;
          }
    
          //取得前驅結點
          final Node predecessor() throws NullPointerException {
              Node p = prev;
              if (p == null)
                  //null的時候拋出異常
                  throw new NullPointerException();
              else
                  return p;
          }
    
          Node() {  
          }
    
          Node(Thread thread, Node mode) {  
              this.nextWaiter = mode;
              this.thread = thread;
          }
    
          Node(Thread thread, int waitStatus) {
              this.waitStatus = waitStatus;
              this.thread = thread;
          }
      }
    複製代碼

獨佔模式具體分析

acquire方法分析

  • 首先是acquire方法
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
複製代碼
  • acquire方法簡要執行說明
    • 首先是tryAcquire方法會嘗試的獲取一下鎖,成功的話就直接證實獲取鎖成功,acquire方法執行完畢(注意tryAcquire方法須要實現的子類根據本身的須要實現怎麼搶鎖,AQS不實現能夠看到,只是拋出了異常,後面咱們分析一個ReentrantLock的tryAcquire給你們感覺下,暫時你只須要知道這個方法只是嘗試獲取鎖)
    protected boolean tryAcquire(int arg) {
          throw new UnsupportedOperationException();
      }
    複製代碼
    • tryAcquire方法搶鎖失敗就執行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
    • 先是addWaiter(Node.EXCLUSIVE)方法,這個方法表示將當前搶鎖線程包裝成結點並加入等待隊列,返回包裝後的結點
    • addWaiter方法返回的結點,做爲acquireQueued方法的參數,該方法主要是等待隊列順序獲取資源
    • 注意acquireQueued返回true表示線程發生中斷,這時就會執行selfInterrupt方法響應中斷。
    • 因爲tryAcquireAQS沒有具體實現,下面咱們就接着看下addWaiter這個方法

addWaiter方法分析

private Node addWaiter(Node mode) {
        //首先把當前競爭鎖的線程包裝成一個節點
        Node node = new Node(Thread.currentThread(), mode);
        //若是之前的尾結點不爲null(爲null表示當前結點就是等待隊列的第一個結點),
        就將當前結點設置爲尾結點,並經過cas操做更新tail尾新入隊的結點
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //cas更新tail失敗就以自旋的方式繼續嘗試入隊列
        enq(node);
        return node;
    }
複製代碼
  • 繼續往下看下enq方法
private Node enq(final Node node) {
      //死循環進行自旋操做
        for (;;) {
            Node t = tail;
            //這裏利用了延遲加載,尾結點爲空的時候生成tail結點,初始化
            if (t == null) { 
              //  隊列爲空的時候天然尾結點就等於頭結點,因此經過cas操做設置tail = head
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
               //尾結點初始化成功後就一直自旋的更新尾結點爲當前結點
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
複製代碼
  • 總結一下addWaiter方法仍是比較簡單的,主要是將當前競爭鎖的線程包裝成爲Node結點而後經過先嚐試失敗在自旋的方式加入到等待隊列的尾部,同時更新尾結點tail
  • addWaiter執行完畢以後就證實結點已經成功入隊,因此就要開始執行acquireQueued方法進行資源的獲取。

acquireQueued方法分析

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //獲取結點的前驅結點
                final Node p = node.predecessor();
                /**
                若是結點的前驅結點是head表示當前結點就是等待隊列的第一個,
                由於head結點並不指向一個實際的線程,因此這個時候就會執行下
                tryAcquire函數嘗試性的獲取下鎖。由於這個時候頗有可能競爭成功
                **/
                if (p == head && tryAcquire(arg)) {
                    /**
                     拿到鎖以後就更新頭結點爲當前結點(這個結點的線程已經拿到鎖了,
                     因此更新爲頭結點也不會繼續參與鎖競爭,再次提示頭結點不會參加競爭)
                    **/
                    setHead(node);
                    //  設置之前的頭結點不指向其餘結點,幫助gc
                    p.next = null; 
                    failed = false;
                    return interrupted;
                }
               /**
                上面前驅不是頭結點或者獲取鎖失敗就會執行shouldParkAfterFailedAcquire
                函數判斷是否應該掛起線程,注意只是判斷並不會執行掛起線程的操做,掛起線程的
                操做由後面的parkAndCheckInterrupt函數執行
               **/
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            //  當出現異常的時候會執行cancelAcquire方法,來取消當前結點並從等待隊列中清除出去
            if (failed)
                cancelAcquire(node);
        }
    }
複製代碼
  • 來一塊兒看下shouldParkAfterFailedAcquire函數
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //獲取下前驅結點的waitStatus,這個決定着是否要對後續結點進行掛起操做
        int ws = pred.waitStatus;
        /**
        若是ws的waitStatus=-1時,證實他的後繼結點已經被park阻塞了後面到了競爭的時候會unpark喚醒後繼結        
        點,因此若是結點的前驅結點waitStatus是-1,shouldParkAfterFailedAcquire就會判斷須要park當前線程 
        因此返回true。
        **/
        if (ws == Node.SIGNAL)
            return true;
      //ws>0證實前驅結點已經被取消因此須要往前找到沒有被取消的結點ws>0
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /**
            前驅結點生成時,ws=0,因此若是是第一次執行shouldParkAfterFailedAcquire函數就會發現前驅結點
            的ws = 0就會由於須要阻塞後面的結點設置爲-1。
            **/
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        /**
        注意把前驅結點ws設置爲-1以後會雖然返回false,不掛起當前線程,注意只是這一次循環不掛起,由於
        acquireQueued函數是一個死循環,因此到下一個循環若是前驅結點不是head而且tryAcquire競爭鎖失敗還 
        是會執行shouldParkAfterFailedAcquire方法,這個時候前驅結點已經爲-1,因此就會直接返回true
       **/
        return false;
    }
複製代碼
  • parkAndCheckInterrupt方法主要是park阻塞線程並在unpark的時候返回中斷狀態
private final boolean parkAndCheckInterrupt() {
        //park掛起線程
        LockSupport.park(this);
        /**
        線程被unpark喚醒的時候會檢查終端狀態並返回,這個終端狀態會在acquireQueued方法中最後返回,
        因此acquireQueued函數並不響應中斷而是返回中斷狀態由外層函數處理。
        **/
        return Thread.interrupted();
    }
複製代碼
  • cancelAcquire函數
private void cancelAcquire(Node node) {
        if (node == null)
            return;
        node.thread = null;
        Node pred = node.prev;
      //循環往前找到沒有被取消的結點,直到找到正常狀態的結點
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        Node predNext = pred.next;
        //由於要取消當前結點因此修改當前結點得ws爲CANCELLED
        node.waitStatus = Node.CANCELLED;
        //若是node爲尾結點就修改尾結點並將尾結點得next設爲null
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            //若是不是尾結點
            /**
              知足下面三個條件,將pred的next指向node的下一節點
              1.pred不是head節點:若是pred爲頭節點,
              而node又被cancel,則node.next爲等待隊列中的第  一個節點,須要unpark喚醒
              2.pred節點狀態爲SIGNAL或能更新爲SIGNAL
              3.pred的thread變量不能爲null
            **/
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                // 
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                //若是pred爲頭節點,則喚醒node的後節點,注意unparkSuccessor方法爲喚醒當前結點得下一個結點
                unparkSuccessor(node);
            }
            node.next = node; // help GC
        }
    }
複製代碼
  • cancleAcquire函數主要是取消當前結點,將當前結點從等待隊列中移出多線程

  • 同時遍歷前面的結點將被取消的結點從隊列中清除出去框架

  • unparkSuccessor 方法函數

private void unparkSuccessor(Node node) {
        
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
      //若是下一個結點爲null或者ws爲取消狀態就未開始遍歷找到正常狀態的結點
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            //  經過LockSupport.unpark()方法喚醒阻塞的線程,注意被阻塞的線程從哪開始繼續執行
            LockSupport.unpark(s.thread);
    }
複製代碼
  • 簡單回顧下整個過程
    • 首先會經過addWaiter方法來將第一次競爭失敗的線程包裝成Node結點自旋的方式加入到等待隊列
    • 加入到等待隊列以後就會該結點就會運行acquireQueued方法開始同等待隊列的其餘結點一塊兒獲取鎖
    • 先是判斷該結點是否是第一個實際的等待的結點(不是head結點,head結點是空節點),若是是就用tryAcquire方法嘗試獲取鎖,成功就更新head結點。
    • 若是上面的操做失敗,就會判斷該線程是否須要被掛起(當前驅結點的ws爲signal就會被掛起),而後就掛起該線程,當被喚醒以後就繼續重複上面的步驟獲取鎖
    • 當獲取到鎖之後就會有一個釋放鎖,釋放鎖的方法主要是release方法

release方法

public final boolean release(int arg) {
        //首先是執行tryRelease()方法,主要如何去釋放獲取到的鎖,這個方法須要子類本身去實現
        if (tryRelease(arg)) {
            //釋放成功之後若是發現等待隊列還有在等待獲取鎖的Node就用unparkSuccessor喚醒頭結點的下一個結點
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
    //tryRelease方法調用失敗會返回false
        return false;
    }
複製代碼
  • release方法邏輯仍是比較簡單的,釋放掉獲取的鎖以後喚醒等待隊列的後續Node。

  • 到這裏咱們獨佔模式下的AQS獲取鎖的過程就分析完了。基本上了解了獨佔模式得鎖競爭過程,在看共享模式下的鎖競爭過程就比較簡單了,看的時候注意對比着看。

共享模式具體分析

  • 這裏我建議你們在看下文章前面我講的共享模式與獨佔模式得區別

首先是acquireShared方法(對應獨佔模式得acquire方法)

//注意方法會忽略中斷,沒有selfInterrupt這個方法來響應中斷
 public final void acquireShared(int arg) {
    /**
      這個tryAcquireShared方法對應獨佔模式的tryAcquire方法,也是須要子類本身去實現的。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。<0就表示獲取失敗就進doAcquireShared方法來開始進入等待隊列等待獲取資源
   **/
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
複製代碼

doAcquireShared方法

private void doAcquireShared(int arg) {
        //  構造一個共享模式得結點加入到等待隊列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    //若是當前結點是除了head得隊列中的第一個結點那麼就嘗試獲取資源
                    int r = tryAcquireShared(arg);
                    //tryAcquireShared返回值>=0資源獲取成功,就開始進行更新結點操做
                    if (r >= 0) {
                        //這裏注意下獨佔模式調用的是setHead方法,可是共享模式調用的是setHeadAndPropagate方法
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        //這裏注意下,這裏響應中斷,跟獨佔模式不一樣
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                } 
        //  獲取資源失敗就開始繼續判斷是否須要掛起線程,與獨佔模式得相同
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            //  出現異常就取消結點
            if (failed)
                cancelAcquire(node);
        }
    }
複製代碼
  • 能夠看到共享模式下的鎖競爭同非共享模式下的步驟大致上相同
  • tryAcquireShared不一樣的是他會返回三種狀態,負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。由於前面說過共享模式下資源是有不少的(或者說是有多個鎖),容許最多由對應數量的線程持有相應的資源,一個線程持有一個資源量就-1直到0。
  • 還有就是共享模式下的setHeadAndPropagate方法,下面一塊兒來看下

setHeadAndPropagate方法

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; 
        //首先更新head結點
        setHead(node);
        /** 
        注意propagate表示的上次執行tryAcquireShared    方法後的返回值。>0表示還有剩餘資源,既然有剩餘資  源就繼續喚醒後面等待獲取資源的而且是共享模式得  Node
        或者h == null
        或者當前獲取到資源的得結點<0,signal須要喚醒後續結點
        **/
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
               //喚醒後續共享模式得Node結點
                doReleaseShared();
        }
    }
複製代碼
  • setHeadAndPropagate方法主要乾了兩件事工具

      1. 更新頭結點
    • 2.檢查是否須要喚醒後續結點,知足上面說的三個條件
  • 看下doReleaseShared方法ui

private void doReleaseShared() {
        for (;;) {
            Node h = head;
            //若是頭結點不爲空,而且不是tail,隊列還有結點
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //若是head結點ws爲signal就更新爲0並喚醒後續結點
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                
            }
            if (h == head)                   
                break;
        }
    }
複製代碼
  • 能夠看到doReleaseShared方法主要就是喚醒阻塞隊列中等待的結點
  • 這裏引入另一篇博文的分析(地址:www.jianshu.com/p/c244abd58…
    • 從上面的分析能夠知道,獨佔模式和共享模式的最大區別在於獨佔模式只容許一個線程持有資源,而共享模式下,當調用doAcquireShared時,會看後續的節點是不是共享模式,若是是,會經過unpark喚醒後續節點; 從前面的分析能夠知道,被喚醒的節點是被堵塞在doAcquireShared的parkAndCheckInterrupt方法,所以喚醒以後,會再次調用setHeadAndPropagate,從而將等待共享鎖的線程都喚醒,也就是說會將喚醒傳播下去;this

    • 加入同步隊列並阻塞的節點,它的前驅節點只會是SIGNAL,表示前驅節點釋放鎖時,後繼節點會被喚醒。shouldParkAfterFailedAcquire()方法保證了這點,若是前驅節點不是SIGNAL,它會把它修改爲SIGNAL。 形成前驅節點是PROPAGATE的狀況是前驅節點得到鎖時,會喚醒一次後繼節點,但這時候後繼節點尚未加入到同步隊列,因此暫時把節點狀態設置爲PROPAGATE,當後繼節點加入同步隊列後,會把PROPAGATE設置爲SIGNAL,這樣前驅節點釋放鎖時會再次doReleaseShared,這時候它的狀態已是SIGNAL了,就能夠喚醒後續節點了。(補充下,想一下若是不考慮,沒有後繼結點的時候直接講ws置爲signal,那麼每次doReleaseShared執行的以後就直接unparkSuccessor喚醒後繼結點那麼就沒意義,由於沒有後繼結點。因此在沒有後繼節點的時候ws = 0,那麼就先ws置爲PROPAGATE,反正後繼結點加入的時候shouldParkAfterFailedAcquire會將前面的結點的ws置爲signal)spa

    • 舉例說明:例如讀寫鎖,寫讀操做和寫寫操做互斥,讀讀之間不互斥;當調用acquireShared獲取讀鎖時,會檢查後續節點是不是獲取讀鎖,若是是,則一樣釋放;

總結

  • 看完整篇博文以後各位應該就對aqs實現有必定了解了,第一次看的時候獲取會很懵逼,但其實多看幾遍慢慢就懂了,看的時候注意線程在何時阻塞,以及被喚醒後從哪裏開始執行
  • 第二個就是注意共享模式和獨佔模式的區別。另外建議各位看下基於aqs框架實現的鎖的tryAcquire方法以及tryRelease方法,好比reentrantLock。源碼比較簡單就是搶到資源該怎麼處理,釋放資源該怎麼處理。
相關文章
相關標籤/搜索