5、聊聊併發- 聊聊AQS獨佔鎖

前言

AQS全稱是(Abstract Queued Synchronizer),單從名字能夠翻譯爲抽象隊列同步器,它是構建J.U.C(java.util.concurrent)包下併發工具類的基礎框架,AQS除了提供了可中斷鎖(等待中斷),超時鎖、獨佔鎖、共享鎖等等功能以外,又在這些基礎的功能上進行擴展,衍生除了其餘的一些工具類,這些工具類基本上能夠知足咱們實際應用中對於鎖的各類需求。java

在我沒有看過AQS源碼以前,感受它的實現和synchronized原理是同樣的,感受都是經過對象鎖來實現併發訪問控制,但事實上它僅僅是一個普通的工具類,就比如咱們平時開發過程當中寫的utils類同樣,AQS的實現沒有像synchronized關鍵字同樣,利用高級的機器指令和內存模型的規則,它沒有利用高級機器指令,也沒有利用JDK編譯時的特殊處理,僅僅是一個普通的類,就實現了併發的控制。這是令我很是有興趣想去深刻的探索和學習它的設計思想和實現原理。node

咱們爲何要研究AQS的實現呢?由於synchronized和J.U.C包下的工具類是咱們併發編程中常用到的,J.U.C包下的大部分工具類都是基於AQS進行的擴展實現,想要掌握和了解J.U.C包下工具類的實現原理,瞭解AQS的實現是必不可少的。web

爲何有Synchronized還要設計AQS呢?

既然JVM已經提供了像synchronized、volatile這樣的關鍵字,已經能夠解決併發中的三個問題,也能夠解決線程執行順序的問題,那爲何還要去創造一個AQS框架,重複造造輪子的意義又在哪裏?編程

其實咱們在使用synchronized的工程中設計模式

一個框架或者技術的出現確定是爲了解決某些問題,那功能和性能是否能成爲重複造輪子的理由呢?那AQS同步框架的出現是爲了解決synchronized沒有辦法知足的使用場景,咱們來看一下AQS提供的功能特色。併發

  1. 等待中斷。synchronized不能夠被中斷,指的是synchronized等待不可中斷,一旦進入阻塞狀態,就沒法被中斷。只能經過調用的方法拋出InterruptedException異常,那麼它就能夠被中斷,不拋出InterruptedException的方法是不可中斷的。
  2. 鎖超時。AQS支持超時鎖,能夠指定一個時間,若是指定時間內沒有獲取鎖,就直接退出獲取鎖。
  3. 非阻塞。嘗試獲取鎖失敗,並不進入阻塞狀態,而是直接返回,那這個線程也有機會釋放曾經持有的鎖。

上述所說的幾個特色,都是synchronized這個關鍵字不不具有的特色,AQS除了知足synchronized的全部功能以外呢,又基於實現了擴展讀寫鎖(ReadWriteLock)、信號量(Semaphore)、柵欄(CyclicBarrier)等額外的功能鎖,極大的提升的使用場景和靈活性。那咱們接下就一塊兒看看AQS的詳細實現。框架

AQS的設計思想

咱們進入到AQS的源碼中能夠看到AQS是一個抽象類,可是咱們發現AQS中並無一個抽象方法,這是由於AQS是被設計來支持多種用途的,它是做爲不少工具類的基礎框架來使用的,若是有抽象方法則子類在繼承時必需要重寫全部的抽象方法,這顯然是不符合AQS的設計初衷;因此,AQS框架採用了模板方法的設計模式,AQS將一些須要子類覆寫的方法都設計成protect方法,將其默認實現爲拋出UnsupportedOperationException異常,若是子類須要使用到此方法,則重寫此方法。編輯器

AQS底層設計並非特別複雜,它底層採用的是狀態標誌位(state變量)+FIFO隊列的方式來記錄獲取鎖、釋放鎖、競爭鎖等一系列鎖操做;對於AQS而言,其中的state變量能夠看作是鎖,隊列採用的是先進先出的雙向鏈表,state共享狀態變量表示鎖狀態,內部使用CAS對state進行原子操做修改來完成鎖狀態變動(鎖的持有和釋放)。函數

隊列結構.png

當某個線程請求持有鎖時,經過判斷state當前狀態,判斷鎖是否被其餘線程持有,若是沒有被佔用,那就讓請求線程持有鎖;若是鎖被佔用,那請求線程將進入阻塞狀態,將其封裝成Node節點,而後經過節點之間進行關聯,組成了一個雙向鏈表。當持有鎖的線程完成操做之後,會釋放鎖資源,而後喚醒在隊列中的節點(固然這是公平鎖的作法,咱們下面會說到),就這樣經過隊列來實現了線程的阻塞和喚醒。那下面咱們就經過具體的代碼來看一下AQS的實現。工具

AQS核心實現

AQS核心要素

狀態(state)

    private volatile int state;

    protected final int getState() {
        return state;
    }

    protected final void setState(int newState) {
        state = newState;
    }

    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
複製代碼

state狀態這裏仍是比較簡單的,使用volatile修飾,保證state變量的可見性, setState(int newState)方法只是用做給state進行初始化,而compareAndSetState(int expect, int update)用做了在運行期間對state變量的修改。

爲何要單獨多出來一個compareAndSetState方法對state變量進行修改呢?由於對共享變量的賦值,不是原子操做須要額外的鎖同步,咱們可能想到使用synchronized來保證原子性,可是synchronizedh會使線程阻塞,致使線程上下文的切換,影響其性能。這裏採用的是CAS無鎖操做,可是CAS也是有不足的,它會進行自旋操做,這樣也會對CPU的資源形成浪費。

compareAndSet.gif
compareAndSet.gif

同步隊列(FIFO)

AQS會把沒有爭搶到鎖的線程包裝成Node節點,加入到隊列中,咱們看一下Node的結構

static final class Node {
   //標記節點是共享模式
    static final Node SHARED = new Node();
    //標記節點是獨佔的
    static final Node EXCLUSIVE = null;

   //表明此節點的線程取消了爭搶資源
    static final int CANCELLED =  1;
  
   //表示當前node的後繼節點對應的線程須要被喚醒
    static final int SIGNAL    = -1;
  
  //這兩個狀態和condition有關係,這裏先不說condition
    static final int CONDITION = -2;
   
    static final int PROPAGATE = -3;

   // 取值爲上面的一、-一、-二、-3 或者 0
    volatile int waitStatus;

    volatile Node prev;

    volatile Node next;
   //等待線程
    volatile Thread thread;

    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {   
    }
 
   //線程入隊。
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

  //使用condition用到
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
複製代碼

同步隊列是AQS的核心,用來實現線程的阻塞和喚醒操做,waitStatus它表示了當前Node所表明的線程的等待鎖的狀態,在獨佔鎖模式下,咱們只須要關注CANCELLED、SIGNAL兩種狀態便可。nextWaiter屬性,它在獨佔鎖模式下永遠爲null,僅僅起到一個標記做用。下圖是基於獨佔鎖畫的。

CHL.jpg
CHL.jpg

獨佔鎖&共享鎖

AQS定義兩種資源共享方式:

  • Exclusive (獨佔模式):只有一個線程能訪問共享資源。如 ReentrantLock
  • Share(共享模式):多個線程可同時訪問共享資源,如Semaphore/CountDownLatch)。

AQS的設計是基於模板方法模式,隊列維護和Node節點的入隊出隊或者獲取資源失敗等操做,AQS都已經實現了。資源的實際獲取邏輯交由子類去實現。並且它提供了兩種資源訪問的方式:Exclusive(獨佔模式)和Share(共享模式);須要實現什麼樣的資源訪問模式,子類只須要重寫AQS預留的方法,利用其提供的原子操做方法,來修改state變量實現相應的同步邏輯就能夠了。通常狀況下,子類只須要根據需求實現其中一種模式,固然也有同時實現兩種模式的同步類,如ReadWriteLock。

自定義同步器實現時主要實現如下幾種方法:

  • tryAcquire(int):獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。
  • tryRelease(int):獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。
  • tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
  • tryReleaseShared(int):共享方式。嘗試釋放資源,若是釋放後容許喚醒後續等待結點返回true,不然返回false。

在獨佔模式下和synchronized實現的效果是同樣的,一次只能有一個線程訪問。state 等於0 表明沒有線程持有鎖,大於0表明有線程持有當前鎖。這個值能夠大於1,是由於鎖能夠重入,每次重入都加上 1,也須要對應的屢次釋放。

在共享模式下,state的值表明着有多少個許可,可是它在每一個具體的工具類裏的應用仍是有一些差異的。經過下面的動畫來感覺一下什麼是共享鎖的用法。

Apr-15-2020 07-22-21.gif
Apr-15-2020 07-22-21.gif
CountDownLatch.gif
CountDownLatch.gif

Exclusive(獨佔模式): ReenTranLock example

前面咱們說AQS獲取鎖的邏輯都是交由子類去實現,那咱們就經過具體代碼來看一會兒類究竟是如何實現的,以ReentranLock爲例,來看一下實現的細節。

ReentrantLock有公平鎖 和 非公平鎖 兩種實現, 默認實現爲非公平鎖, 這體如今它的構造函數中,咱們接下來就以獨佔鎖開始分析一下ReentranLock,咱們先來看一下ReentranLock的結構。

public class ReentrantLock implements Lockjava.io.Serializable {
  
    private final Sync sync;
    
    //ReentranLock的內部類,
    abstract static class Sync extends AbstractQueuedSynchronizer {
        ...
    }
    
    // 非公平鎖
    static final class NonfairSync extends Sync{...}
    
    //公平鎖
    static final class FairSync extends Sync {...}
    
    //構造函數
    public ReentrantLock() {
      sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    
    // 獲取鎖
    public void lock() {
        sync.lock();
    }
    
    // 釋放鎖
    public void unlock() {
        sync.release(1);
    }
    ...
}
複製代碼

能夠看到FairSync和NonfairSync都是繼承自Sync,而Sync又繼承自AQS。ReentrantLock獲取鎖的邏輯是直接調用了FairSync或者NonfairSync的邏輯.咱們就以FairSync爲例,來看一下具體實現。

FairSync(公平鎖)

 static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
        //搶鎖
        final void lock() {
        //這裏直接調用的AQS的acquire()方法
            acquire(1);
        }
    
     //====此方法來自AQS,爲了方便閱讀,貼過來了====
     /**
     經過代碼咱們能看到,若是tryAcquire(arg)這個方法返回true,直接就退出了,後續也就不會進行了。
     因此咱們能夠推斷出來,大部分狀況下,應該返回的是false。咱們一個方法一個方法來看。
     */

      public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
          acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
            selfInterrupt();
          }
      }
    //====================================== 
      
      protected final boolean tryAcquire(int acquires) {
         ...
      }
}
複製代碼

tryAcquire

tryAcquire 判斷當前鎖有沒有被佔用:

  1. 若是鎖沒有被佔用, 嘗試以公平的方式獲取鎖
  2. 若是鎖已經被佔用, 檢查是否是鎖重入

獲取鎖成功返回true, 失敗則返回false

   /**
   if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
        selfInterrupt();
    } 
    嘗試獲取鎖,返回boolean,是否獲取鎖成功。
    true:1.表明沒有線程在等待鎖。2.自己就持有鎖,可是是重入鎖。
    */

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        //若是等於0,那麼表明沒有線程持有鎖。
        if (c == 0) {
           /**
           到這裏說明沒有線程搶鎖,再去判斷隊列中是否有線程在等待獲取鎖。
           由於是公平鎖,老是先來後到的
           若是隊列中沒有線程等待獲取鎖,那就嘗試去獲取鎖。
           若是獲取成功了,那就把當前佔用鎖的線程,更新爲當前線程。
           */

           if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
           /**
           hasQueuedPredecessors方法,主要來判斷隊列是否爲空,
           判斷頭結點的後節點是否是當前節點。
            public final boolean hasQueuedPredecessors() {
             Node t = tail;
             Node h = head;
             Node s;
             return h != t &&
                    ((s = h.next) == null || s.thread != Thread.currentThread());
            }
           */

             // 將當前線程設置爲佔用鎖的線程
              setExclusiveOwnerThread(current);
    
              return true;
           }
        }else if (current == getExclusiveOwnerThread()) {
         /**
         若是到這裏,說明當前佔用鎖的線程就是自己。就是咱們所說的重入鎖。
         */

            int nextc = c + acquires;
            
            if (nextc < 0){
              throw new Error("Maximum lock count exceeded");
            }
            
            setState(nextc);
            
            return true;
        }
     //若是到這裏都沒有返回true,說明沒有獲取到鎖。
      return false;
    }


複製代碼

addWaiter

若是tryAcquire()方法返回false說明搶鎖失敗了,那就繼續執行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法, 這一步主要是將沒有搶到鎖的線程加入到隊列中,咱們先來看一下addWaiter()方法。

  /**
  if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
        selfInterrupt();
    } 
  */
  
private Node addWaiter(Node mode) {
   /** Node構造方法
        Node(Thread thread, Node mode) {     
            this.nextWaiter = mode;
            this.thread = thread;
        }
        Node(Thread thread, int waitStatus) { 
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
   */

        Node node = new Node(Thread.currentThread(), mode);
      
        Node pred = tail;
       // tail !=null 說明隊列不爲空。當隊列爲空時tail = head 是爲null的,
        if (pred != null) {
          //將新節點的前驅節點指向舊的尾結點。
           node.prev = pred;
           //將新的節點變成尾結點。
           if (compareAndSetTail(pred, node)) {
                //若是設置成功,那就將舊的尾結點的後繼節點,指向新的節點。直接返回Node
                pred.next = node;
                return node;
            }
        }
        //若是執行到這裏說明有兩種狀況 :1.隊列爲空。2.CAS失敗(有線程在競爭入隊)
        //這時會執行enq()方法
        enq(node);
        return node;
    }
 
複製代碼

此方法主要是將等待的線程包裝成 Node節點。可見,每個處於獨佔鎖模式下的節點,nextWaiter 必定是null。此方法會先判斷隊列是否爲空,若是不爲空,嘗試將Node節點添加到隊列的隊尾。若是入隊失敗了或者隊列爲空,就執行enq方法。

若是執行了enq()方法會有兩種可能:

  1. 等待隊列如今是空的,尚未被初始化。
  2. 其餘線程在當前線程入隊的過程當中率先完成了入隊,致使尾節點的值已經改變了,CAS操做失敗。

enq

在該方法中使用了死循環, 即以自旋方式將節點插入隊列,若是失敗則不停的嘗試, 直到成功爲止, 另外, 該方法也負責在隊列爲空時, 初始化隊列,這也說明,隊列是延時初始化的(lazily initialized):

   /*咱們再看一下enq()這個方法的代碼。
   這個方法採用了自旋式入隊列的方式。
   若是沒有搶到鎖,那就一直循環,直到入隊。
   */

     private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
          if (t == null) { 
           /**若是tail==null 說明隊列爲空,咱們在剛開始的時候會發現,head和tail都爲null,
            是沒有進行初始化的。這裏仍是使用的cas設置頭結點,跟設置尾結點同樣。
            */

              if (compareAndSetHead(new Node())){
               /**
              這裏設置了頭節點,可是尾結點仍是爲null,
              將尾結點也設置一下,注意,此時尚未return,繼續循環。
               */

                  tail = head;
              }
         
           }else {
           //這個其實和addWaiter()方法是相似的,都是將線程添加到隊尾。
           //只不過是若是不成功一直循環,直到成功爲止。
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
複製代碼

咱們這裏能夠看到,當隊列爲空的時候,初始化隊列沒有使傳入的那個Node節點,而是新建了一個Node節點。初始化之後裏面沒有返回,而是直接進入下一次循環,此時隊列已經不爲空了,就將傳入的Node節點添加到隊尾。這也說明了爲何在咱們剛開始說FIFO隊列的時候頭結點是空節點了。

這裏咱們能夠看到enq()方法是有返回值的,返回的是node結點的前驅節點,只不過在這裏沒有用到它的返回值,可是在其餘的地方用到了它的返回值。

acquireQueued

代碼能走到這裏已經說明,通過addWaiter(Node.EXCLUSIVE),此時節點添加到了隊列中。

注意:若是acquireQueued(addWaiter(Node.EXCLUSIVE),arg))返回true的話,意味着上面這段代碼將進入selfInterrupt(),因此正常狀況下,下面應該返回false。

 /**
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
        selfInterrupt();
    }
 */

  final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //當前節點的前驅節點。addWaiter方法返回的是通過封裝的Node節點。
                final Node p = node.predecessor();
               /**
              p == head 說明當前節點已經進到了阻塞隊列中,可是Node節點是阻塞隊列的第一個,由於它的前驅是         head。正常狀況下,咱們是將Node節點添加到隊尾的,若是說Node的前驅節點是head節點,說明Node節點是        阻塞隊列中的第一個,能夠再去嘗試獲取鎖。
               */

                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                /**
                private void setHead(Node node) {
                     head = node;
                     node.thread = null;
                     node.prev = null;
                }
                */

                    p.next = null// help GC
                    failed = false;
                    return interrupted;
                }
     
                //當前Node不是在CLH隊列的第一位或者是當前線程獲取鎖失敗,判斷是否須要把當前線程掛起。
                if(shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()){
                    interrupted = true;
                }
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
複製代碼

shouldParkAfterFailedAcquire

咱們在分析FIFO隊列的結構時,看到節點組成中有 waitStatus這個狀態,它的取值有四個

static final int CANCELLED =  1;
static final int SIGNAL    = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
複製代碼

在獨佔鎖的狀況下只會用到 CANCELLED 和 SIGNAL這兩個狀態,怎麼理解這個狀態表明的含義呢。

CANCELLED這個比較好理解,它表示當前的節點取消了排隊,即取消了搶鎖。SIGNAL這個狀態它不表示當前節點的狀態,它表明當前節點前驅節點的狀態,當一個節點的waitStatus被置爲SIGNAL,就說明它的下一個節點已經被掛起了(或者立刻就要被掛起了),所以在當前節點釋放了鎖或者放棄獲取鎖時,若是它的waitStatus屬性爲SIGNAL,它還要完成一個額外的操做——喚醒它的後繼節點。

/**if(shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()){}
*/


private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
  //我認爲這裏只是對前驅節點狀態進行判斷,判斷前驅節點時候是正常狀態,由於咱們知道若是當前節點被掛
  //喚醒時須要前驅節點來進行喚醒的,若是當前節點的前驅節點是正常狀態,就能保證當前節點能夠被正常喚醒,
  //由於在等待隊列中的節點有可能退出了所等待,因此須要判斷前驅節點狀態是否正常。
    if (ws == Node.SIGNAL)
   //若是前驅節點的狀態已是SIGNAL,就直接返回true,接下來就會直接去執行parkAndCheckInterrupt()將線程掛起
    //由於前驅節點狀態正常,當前節點能夠被掛起。
      return true;
  
        /*
         * 當前驅節點的status大於0說明前驅節點取消了搶鎖,退出了隊列。
         若是前驅節點取消了搶鎖,就繼續往前找,找到一個節點是正常狀態的節點,而後直接跳過那些不排隊的節點,添加到               第一個正常等待節點的後面
         */

    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         前驅節點的狀態既不是SIGNAL,也不是CANCELLED
         用CAS設置前驅節點的ws爲 Node.SIGNAL。
         */

        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
複製代碼

這裏值得咱們注意的是,只有當前節點的前驅節點狀態等於SIGNAL的時候纔會返回ture,其餘狀況只會返回false。

當返回false以後呢,又會回到acquireQueued方法中循環,由於當前節點的前驅節點發生了變化,說不定前驅節點是頭結點了呢,直到返回true,也就是前驅節點狀態時SIGNAL,就能夠安心的將當前線程掛起了,此時將調用parkAndCheckInterrupt將線程掛起。

parkAndCheckInterrupt

這個方法很簡單,由於前面返回true,因此須要掛起線程,這個方法就是負責掛起線程的,到這裏鎖獲取就已經分析完了。

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // 線程被掛起,停在這裏再也不往下執行了
    return Thread.interrupted();
}
複製代碼

NofairSync(非公平鎖)

非公平鎖的實現,其實和公平鎖的實現差異不大,具體經過代碼來看一下吧。

 static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        //非公平鎖的lock和公平鎖的lock區別在於,非公平鎖直接上來就去直接獲取鎖,無論阻塞隊列是有線程等待
        final void lock() {
            if (compareAndSetState(01))
             setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
    /**
     if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
     }
     */
 
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
    
    //這個方法來自於Sync
    final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
             //這裏非公平鎖直接去獲取鎖。
             //而公平鎖的話,要判斷隊列中是否有線程在等待。
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
複製代碼

公平鎖和非公平鎖的實現有細微的差異,可是差異不是很大。非公平鎖和公平鎖的不一樣在於,非公平鎖在lock()的時候,多了一段代碼

//非公平鎖的lock
final void lock() {
   if (compareAndSetState(01))
      setExclusiveOwnerThread(Thread.currentThread());
   else
      acquire(1);
}

//公平鎖的lock
final void lock() {
      acquire(1);
}
複製代碼

非公平鎖在lock的時候,就會直接去嘗試拿鎖,若是嘗試成功了,就直接佔有鎖。這是第一個不一樣。

在tryAcquire()方法中,公平鎖會多出!hasQueuedPredecessors()行這個代碼,這段代碼主要就是判斷阻塞隊列中是否已經有等待線程。

//公平鎖
if (c == 0) {
    if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {
      setExclusiveOwnerThread(current);
      return true;
   }
}
 /**
     public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
*/

//非公平鎖。
if (c == 0) {
    if (compareAndSetState(0, acquires)) {
      setExclusiveOwnerThread(current);
      return true;
   }
}
複製代碼

這裏公平鎖會去判斷隊列中是否有線程在等待獲取鎖,只有當阻塞隊列爲空時,纔會嘗試去獲取鎖。

可是非公平鎖不會檢查阻塞隊列中是否已經有線程等待,而是會直接去獲取鎖。

公平和非公平鎖的實現差別就這些不一樣,其餘的實現邏輯都是差很少的。

release 釋放鎖

前面咱們說到若是沒有搶到鎖,就會被LockSupport.park(this)掛起線程,那如何解鎖,喚醒線程的呢,接下來咱們看一下。

 public void unlock() {
     sync.release(1);
 }
 
 //====此方法來自AQS================
 public final boolean release(int arg) {
        //嘗試去釋放鎖
        if (tryRelease(arg)) {
         
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
 }
複製代碼

這裏會調用tryRelease()方法嘗試去釋放鎖,若是釋放鎖成功了,判斷頭結點的狀態,去喚醒線程。 這裏須要說明的一點,head != null 咱們能理解,可是爲何waitStatus != 0 呢。 咱們前面看了線程搶鎖,只有一處給waitStatus賦值了。在shouldParkAfterFailedAcquire這個方法中,將前驅節點的 waitStatus設爲Node.SIGNAL。能夠往前翻一下。

除此之外,還有就是在初始化的時候enq()方法中,對waitStatus初始化的時候默認爲0,其餘地方沒有對 waitStatus賦值。若是waitStatus != 0,那說明head後面沒有被掛起等待喚醒的線程,也就不須要去喚醒。

tryRelease

 protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
           //判斷佔有鎖的線程是否是當前線程。
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            //判斷是否徹底釋放鎖,有可能重入
            if (c == 0) {
              free = true;
              //將表示佔有鎖的線程標誌置空。
              setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
 }
複製代碼

太簡單了,沒有什麼好說的。

unparkSuccessor


 private void unparkSuccessor(Node node) {
     //咱們知道阻塞隊列是一個先進先出的隊列,喚醒的話,也是按照順序喚醒的,咱們能夠看到參數的Node是頭結點
    //若是頭結點的waitStatus < 0 ,說
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
    
        Node s = node.next;
    //下面的代碼就是喚醒後繼節點,可是有可能後繼節點取消了等待(waitStatus==1)
    // 從隊尾往前找,找到waitStatus<=0的全部節點中排在最前面的
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //喚醒線程  
        if (s != null)
            LockSupport.unpark(s.thread);
  }

 //喚醒線程之後,被喚醒的線程將從如下代碼中繼續往前走:
  
 private final boolean parkAndCheckInterrupt() {
         LockSupport.park(this); // 剛剛線程被掛起在這裏了
      return Thread.interrupted();
 }
 // 又回到這個方法了:acquireQueued(final Node node, int arg),這個時候,node的前驅是head了
複製代碼

到這裏基本上吧ReenTranLock的獲取鎖、釋放鎖都分析完了,具體的一些細節可能沒有說到,你們就本身去跟一下代碼就能夠了。

總結

本片文章基於ReentranLock獨佔鎖,分析了AQS瞭解到了一下幾點,

  1. AQS中用state屬性表示鎖,在ReentranLock中當state = 1 獲取鎖,state = 0表明釋放鎖, state>1表明重入鎖。exclusiveOwnerThread屬性表明了佔有鎖的線程。
  2. addWaiter負責將當前等待鎖的線程包裝成Node,併成功地添加到隊列的末尾,這一點是由它調用的enq方法保證的,enq方法同時還負責在隊列爲空時初始化隊列。
  3. acquireQueued方法用於在Node成功入隊後,繼續嘗試獲取鎖(取決於Node的前驅節點是否是head),或者將線程掛起
  4. shouldParkAfterFailedAcquire方法用於保證當前線程的前驅節點的waitStatus屬性值爲SIGNAL,從而保證了本身掛起後,前驅節點會負責在合適的時候喚醒本身。
  5. parkAndCheckInterrupt方法用於掛起當前線程,並檢查中斷狀態。
  6. 若是最終成功獲取了鎖,線程會從lock()方法返回,繼續往下執行;不然,線程會阻塞等待。

後記

本來的計劃是一週輸出一篇,可是臨時遇上有一個緊急需求要作,上上週六加班,周天又和chessy大佬面基約了一個飯,上週也是天天很晚回去,週六加班也在加班趕需求,本週要提測,月底要上線,中間也是抽時間磕磕絆絆的寫一點是一點,終於在昨天寫完了。最近兩週感受本身的精力被耗盡了,狀態不是和好好,這幾天把狀態調整一下。上上週跟chessy大佬聊了不少,讓我有不少感想,計劃寫一篇關於持續學習和我的成長方向的分享,你們到時候也能夠互相交流一下心得。

碼了這麼多字也是不容易,那就點個贊支持一下唄。

參考

https://javadoop.com/2017/06/16/AbstractQueuedSynchronizer/

相關文章
相關標籤/搜索