沉澱再出發:關於java中的AQS理解

沉澱再出發:關於java中的AQS理解

1、前言

    在java中有不少鎖結構都繼承自AQS(AbstractQueuedSynchronizer)這個抽象類若是咱們仔細瞭解能夠發現AQS的做用是很是大的,可是AQS的底層其實也是使用了大量的CAS,所以咱們能夠看到CAS的重要性了,可是CAS也是有缺陷的,可是在大部分使用的狀況下,咱們每每忽略了這種缺陷。html

2、AQS的認識

  2.一、AQS的基本概念

    AQS(AbstractQueuedSynchronizer)就是抽象的隊列式的同步器,AQS定義了一套多線程訪問共享資源的同步器框架,許多同步類實現都依賴於它,AQS是一個Java提供的底層同步工具類,用一個int類型的變量表示同步狀態,並提供了一系列的CAS操做來管理這個同步狀態。AQS的主要做用是爲Java中的併發同步組件提供統一的底層支持,如經常使用的ReentrantLock/Semaphore/CountDownLatch等等就是基於AQS實現的,用法是經過繼承AQS實現其模版方法,而後將子類做爲同步組件的內部類。java

    同步隊列是AQS很重要的組成部分,它是一個雙端隊列,遵循FIFO原則,主要做用是用來存放在鎖上阻塞的線程,當一個線程嘗試獲取鎖時,若是已經被佔用,那麼當前線程就會被構形成一個Node節點加入到同步隊列的尾部,隊列的頭節點是成功獲取鎖的節點,當頭節點線程釋放鎖時,會喚醒後面的節點並釋放當前頭節點的引用。node

       它維護了一個volatile int state(表明共享資源)和一個FIFO線程等待隊列(多線程爭用資源被阻塞時會進入此隊列)。state的訪問方式有三種:安全

1     getState()
2     setState()
3     compareAndSetState()

     AQS定義兩種資源共享方式:Exclusive(獨佔,只有一個線程能執行,如ReentrantLock)和Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)。
    不一樣的自定義同步器爭用共享資源的方式也不一樣。自定義同步器在實現時只須要實現共享資源state的獲取與釋放方式便可,至於具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。自定義同步器實現時主要實現如下幾種方法:數據結構

1     isHeldExclusively():該線程是否正在獨佔資源。只有用到condition才須要去實現它。
2     tryAcquire(int):獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。
3     tryRelease(int):獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。
4     tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
5     tryReleaseShared(int):共享方式。嘗試釋放資源,若是釋放後容許喚醒後續等待結點返回true,不然返回false

  ReentrantLock:state初始化爲0,表示未鎖定狀態。A線程lock()時,會調用tryAcquire()獨佔該鎖並將state+1。此後,其餘線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)爲止,其它線程纔有機會獲取該鎖。固然,釋放鎖以前,A線程本身是能夠重複獲取此鎖的(state會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多麼次,這樣才能保證state是能回到零態的。
  CountDownLatch:任務分爲N個子線程去執行,state也初始化爲N(注意N要與線程個數一致)。這N個子線程是並行執行的,每一個子線程執行完後countDown()一次,state會CAS減1。等到全部子線程都執行完後(即state=0),會unpark()主調用線程,而後主調用線程就會從await()函數返回,繼續後餘動做。
  通常來講,自定義同步器要麼是獨佔方法,要麼是共享方式,他們也只需實現tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種便可。但AQS也支持自定義同步器同時實現獨佔和共享兩種方式,如ReentrantReadWriteLock。多線程

2.1.一、acquire(int)

  此方法是獨佔模式下線程獲取共享資源的頂層入口。若是獲取到資源,線程直接返回,不然進入等待隊列,直到獲取到資源爲止,且整個過程忽略中斷的影響。這也正是lock()的語義,固然不只僅只限於lock()。獲取到資源後,線程就能夠去執行其臨界區代碼了。下面是acquire()的源碼:併發

1 public final void acquire(int arg) {
2      if (!tryAcquire(arg) &&
3          acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4          selfInterrupt();
5 }
tryAcquire()嘗試直接去獲取資源,若是成功則直接返回;
addWaiter()將該線程加入等待隊列的尾部,並標記爲獨佔模式;
acquireQueued()使線程在等待隊列中獲取資源,一直獲取到資源後才返回。若是在整個等待過程當中被中斷過,則返回true,不然返回false。
若是線程在等待過程當中被中斷過,它是不響應的。只是獲取資源後纔再進行自我中斷selfInterrupt(),將中斷補上。

tryAcquire(int)

  此方法嘗試去獲取獨佔資源。若是獲取成功,則直接返回true,不然直接返回false。以下是tryAcquire()的源碼:框架

protected boolean tryAcquire(int arg) {
     throw new UnsupportedOperationException();
}

   AQS只是一個框架,具體資源的獲取/釋放方式交由自定義同步器去實現,AQS這裏只定義了一個接口,具體資源的獲取交由自定義同步器去實現(經過state的get/set/CAS)。至於能不能重入,能不能阻塞,那就看具體的自定義同步器怎麼去設計了,固然,自定義同步器在進行資源訪問時要考慮線程安全的影響。這裏沒有定義成abstract是由於獨佔模式下只用實現tryAcquire-tryRelease,而共享模式下只用實現tryAcquireShared-tryReleaseShared。若是都定義成abstract,那麼每一個模式也要去實現另外一模式下的接口,這樣設計能夠儘可能減小沒必要要的工做量jvm

 addWaiter(Node)

  此方法用於將當前線程加入到等待隊列的隊尾,並返回當前線程所在的結點。函數

private Node addWaiter(Node mode) {
     //以給定模式構造結點。mode有兩種:EXCLUSIVE(獨佔)和SHARED(共享)
    Node node = new Node(Thread.currentThread(), mode);
     //嘗試快速方式直接放到隊尾。
     Node pred = tail;
     if (pred != null) {
         node.prev = pred;
         if (compareAndSetTail(pred, node)) {
             pred.next = node;
             return node;
         }
     }
     //上一步失敗或者初次加入,則採用終極自旋方式保證必定加入隊尾
    enq(node);
    return node;
 }

    Node結點是對每個訪問同步代碼的線程的封裝,其包含了須要同步的線程自己以及線程的狀態,如是否被阻塞,是否等待喚醒,是否已經被取消等。變量waitStatus則表示當前被封裝成Node結點的等待狀態,共有4種取值CANCELLED、SIGNAL、CONDITION、PROPAGATE。

1     CANCELLED:值爲1,在同步隊列中等待的線程等待超時或被中斷,須要從同步隊列中取消該Node的結點,其結點的waitStatus爲CANCELLED,即結束狀態,
進入該狀態後的結點將不會再變化。
2 SIGNAL:值爲-1,被標識爲該等待喚醒狀態的後繼結點,當其前繼結點的線程釋放了同步鎖或被取消,將會通知該後繼結點的線程執行。說白了,就是處於喚醒狀態,
只要前繼結點釋放鎖,就會通知標識爲SIGNAL狀態的後繼結點的線程執行。
3 CONDITION:值爲-2,與Condition相關,該標識的結點處於等待隊列中,結點的線程等待在Condition上,當其餘線程調用了Condition的signal()方法後,
CONDITION狀態的結點將從等待隊列轉移到同步隊列中,等待獲取同步鎖。
4 PROPAGATE:值爲-3,與共享模式相關,在共享模式中,該狀態標識結點的線程處於可運行狀態。 5 0狀態:值爲0,表明初始化狀態。 6 AQS在判斷狀態時,經過用waitStatus>0表示取消狀態,而waitStatus<0表示有效狀態。

enq(Node)

    此方法用於將node加入隊尾,採用終極自旋方式保證必定加入隊尾。CAS自旋volatile變量,是一種很經典的用法。

private Node enq(final Node node) {
     //CAS"自旋",直到成功加入隊尾
    for (;;) {
         Node t = tail;
         if (t == null) { // 隊列爲空,建立一個空的標誌結點做爲head結點,並將tail也指向它。
             if (compareAndSetHead(new Node()))
                 tail = head;
         } else {//正常流程,放入隊尾
            node.prev = t;
             if (compareAndSetTail(t, node)) {
                 t.next = node;
                 return t;
             }
         }
     }
}

acquireQueued(Node, int)

     經過tryAcquire()和addWaiter(),該線程獲取資源失敗,已經被放入等待隊列尾部了。該線程下一部進入等待狀態休息,直到其餘線程完全釋放資源後喚醒本身,本身再拿到資源,而後就能夠去幹本身想幹的事了。acquireQueued()就是幹這件事:在等待隊列中排隊拿號(中間沒其它事幹能夠休息),直到拿到號後再返回,這個函數很是關鍵。

 1 final boolean acquireQueued(final Node node, int arg) {
 2     boolean failed = true;//標記是否成功拿到資源
 3     try {
 4          boolean interrupted = false;//標記等待過程當中是否被中斷過
 5         //又是一個「自旋」!
 6         for (;;) {  7             final Node p = node.predecessor();//拿到前驅  8             //若是前驅是head,即該結點已成老二,那麼便有資格去嘗試獲取資源(多是老大釋放完資源喚醒本身的,固然也可能被interrupt了)。
 9             if (p == head && tryAcquire(arg)) { 10                 setHead(node);//拿到資源後,將head指向該結點。因此head所指的標杆結點,就是當前獲取到資源的那個結點或null。
11                 p.next = null; 
// setHead中node.prev已置爲null,此處再將head.next置爲null,就是爲了方便GC回收之前的head結點。也就意味着以前拿完資源的結點出隊了! 12 failed = false; 13 return interrupted;//返回等待過程當中是否被中斷過 14 } 15 16 //若是本身能夠休息了,就進入waiting狀態,直到被unpark() 17 if (shouldParkAfterFailedAcquire(p, node) && 18 parkAndCheckInterrupt()) 19 interrupted = true;//若是等待過程當中被中斷過,哪怕只有那麼一次,就將interrupted標記爲true 20 } 21 } finally { 22 if (failed) 23 cancelAcquire(node); 24 } 25 }

 到這裏了,咱們先看看shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()具體幹些什麼。

shouldParkAfterFailedAcquire(Node, Node)

   此方法主要用於檢查狀態,看看本身是否真的能夠去休息了,以避免隊列前邊的線程都放棄了盲等。

 1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 2      int ws = pred.waitStatus;//拿到前驅的狀態
 3       if (ws == Node.SIGNAL)
 4          //若是已經告訴前驅拿完號後通知本身一下,那就能夠安心休息了
 5         return true;
 6         if (ws > 0) {
 7          /*
 8          * 若是前驅放棄了,那就一直往前找,直到找到最近一個正常等待的狀態,並排在它的後邊。
 9          * 注意:那些放棄的結點,因爲被本身「加塞」到它們前邊,它們至關於造成一個無引用鏈,
10          *           稍後就會被GC回收
11         */
12          do {
13              node.prev = pred = pred.prev;
14           } while (pred.waitStatus > 0);
15            pred.next = node;
16      } else {
17           //若是前驅正常,那就把前驅的狀態設置成SIGNAL,告訴它拿完號後通知本身一下。
18          //有可能失敗,前驅說不定剛剛釋放完。
19          compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
20      }
21       return false;
22 }

      整個流程中,若是前驅結點的狀態不是SIGNAL,那麼本身就不能安心去休息,須要去找個安心的休息點,同時能夠再嘗試下看有沒有機會輪到本身拿號。

1 parkAndCheckInterrupt()
2    若是線程找好安全休息點後,那就能夠安心去休息了。此方法就是讓線程去休息,真正進入等待狀態。
3 private final boolean parkAndCheckInterrupt() {
4      LockSupport.park(this);//調用park()使線程進入waiting狀態
5      return Thread.interrupted();//若是被喚醒,查看本身是否是被中斷的。
6 }

    park()會讓當前線程進入waiting狀態。在此狀態下,有兩種途徑能夠喚醒該線程:1)被unpark();2)被interrupt()。須要注意的是,Thread.interrupted()會清除當前線程的中斷標記位。
    至此,咱們看一下前面的總函數就知道了整個流程了:

2.1.二、release(int)

  這裏咱們來說一下acquire()的反操做release()。此方法是獨佔模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,若是完全釋放(即state=0),則喚醒等待隊列裏的其餘線程。

1 public final boolean release(int arg) {
2     if (tryRelease(arg)) {
3          Node h = head;//找到頭結點
4        if (h != null && h.waitStatus != 0)
5             unparkSuccessor(h);//喚醒等待隊列裏的下一個線程
6          return true;
7      }
8      return false;
9 }

   邏輯並不複雜。調用tryRelease()來釋放資源。有一點須要注意的是,它是根據tryRelease()的返回值來判斷該線程是否已經完成釋放掉資源了,因此自定義同步器在設計tryRelease()的時候要明確這一點。

tryRelease(int)

1 protected boolean tryRelease(int arg) {
2     throw new UnsupportedOperationException();
3 }

  跟tryAcquire()同樣,這個方法是須要獨佔模式的自定義同步器去實現。正常來講,tryRelease()都會成功的,由於這是獨佔模式,該線程來釋放資源,那麼它確定已經拿到獨佔資源了,直接減掉相應量的資源便可(state-=arg),也不須要考慮線程安全的問題。但要注意它的返回值,release()是根據tryRelease()的返回值來判斷該線程是否已經完成釋放掉資源了,因此自義定同步器在實現時,若是已經完全釋放資源(state=0),要返回true,不然返回false。

unparkSuccessor(Node)

  此方法用於喚醒等待隊列中下一個線程。

 1 private void unparkSuccessor(Node node) {
 2     //這裏,node通常爲當前線程所在的結點。
 3         int ws = node.waitStatus;
 4      if (ws < 0)//置零,當前線程所在的結點狀態,容許失敗。
 5          compareAndSetWaitStatus(node, ws, 0);
 6      Node s = node.next;//找到下一個須要喚醒的結點s
 7      if (s == null || s.waitStatus > 0) {//若是爲空或已取消
 8         s = null;
 9         for (Node t = tail; t != null && t != node; t = t.prev)
10             if (t.waitStatus <= 0)//從這裏能夠看出,<=0的結點,都是還有效的結點。
11                  s = t;
12      }
13      if (s != null)
14           LockSupport.unpark(s.thread);//喚醒
15  }

     用unpark()喚醒等待隊列中最前邊的那個未放棄線程,這裏咱們也用s來表示吧。此時,再和acquireQueued()聯繫起來,s被喚醒後,進入if (p == head && tryAcquire(arg))的判斷(即便p!=head也不要緊,它會再進入shouldParkAfterFailedAcquire()尋找一個安全點。這裏既然s已是等待隊列中最前邊的那個未放棄線程了,那麼經過shouldParkAfterFailedAcquire()的調整,s也必然會跑到head的next結點,下一次自旋p==head就成立啦),而後s把本身設置成head標杆結點,表示本身已經獲取到資源了,acquire()也返回了。
  release()是獨佔模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,若是完全釋放了(即state=0),它會喚醒等待隊列裏的其餘線程來獲取資源。


    一樣的讓咱們再來看看對於共享鎖的狀況下,資源的獲取和釋放。

2.1.三、acquireShared(int)

  此方法是共享模式下線程獲取共享資源的頂層入口。它會獲取指定量的資源,獲取成功則直接返回,獲取失敗則進入等待隊列,直到獲取到資源爲止,整個過程忽略中斷。

1 public final void acquireShared(int arg) {
2     if (tryAcquireShared(arg) < 0)
3         doAcquireShared(arg);
4     }
5 }

  這裏tryAcquireShared()依然須要自定義同步器去實現。可是AQS已經把其返回值的語義定義好了:負值表明獲取失敗;0表明獲取成功,但沒有剩餘資源;正數表示獲取成功,還有剩餘資源,其餘線程還能夠去獲取。因此這裏acquireShared()的流程就是:
    tryAcquireShared()嘗試獲取資源,成功則直接返回;失敗則經過doAcquireShared()進入等待隊列,直到獲取到資源爲止才返回。

doAcquireShared(int)

  此方法用於將當前線程加入等待隊列尾部休息,直到其餘線程釋放資源喚醒本身,本身成功拿到相應量的資源後才返回。

 1 private void doAcquireShared(int arg) {
 2     final Node node = addWaiter(Node.SHARED);//加入隊列尾部
 3     boolean failed = true;//是否成功標誌
 4     try {
 5          boolean interrupted = false;//等待過程當中是否被中斷過的標誌
 6          for (;;) {
 7              final Node p = node.predecessor();//前驅
 8              if (p == head) {//若是到head的下一個,由於head是拿到資源的線程,此時node被喚醒,極可能是head用完資源來喚醒本身的
 9                  int r = tryAcquireShared(arg);//嘗試獲取資源
10                  if (r >= 0) {//成功
11                      setHeadAndPropagate(node, r);//將head指向本身,還有剩餘資源能夠再喚醒以後的線程
12                      p.next = null; // help GC
13                      if (interrupted)//若是等待過程當中被打斷過,此時將中斷補上
14                          selfInterrupt();
15                      failed = false;
16                      return;
17                  }
18               }
19              //判斷狀態,尋找安全點,進入waiting狀態,等着被unpark()或interrupt()
20              if (shouldParkAfterFailedAcquire(p, node) &&
21                  parkAndCheckInterrupt())
22                  interrupted = true;
23               }
24           }
25      } finally {
26          if (failed)
27              cancelAcquire(node);
28      }
29 }

    其實和acquireQueued()流程並無太大區別。只不過這裏將補中斷的selfInterrupt()放到doAcquireShared()裏了,而獨佔模式是放到acquireQueued()以外。
  跟獨佔模式比,還有一點須要注意的是,這裏只有線程是head.next時(「老二」),纔會去嘗試獲取資源,有剩餘的話還會喚醒以後的隊友。那麼問題就來了,假如老大用完後釋放了5個資源,而老二須要6個,老三須要1個,老四須要2個。老大先喚醒老二,老二一看資源不夠,他是把資源讓給老三呢,仍是不讓?答案是否認的!老二會繼續park()等待其餘線程釋放資源,也更不會去喚醒老三和老四了。獨佔模式,同一時刻只有一個線程去執行,這樣作何嘗不可;但共享模式下,多個線程是能夠同時執行的,如今由於老二的資源需求量大,而把後面量小的老三和老四也都卡住了。固然,這並非問題,只是AQS保證嚴格按照入隊順序喚醒罷了(保證公平,但下降了併發)。

setHeadAndPropagate(Node, int)

    此方法在setHead()的基礎上多了一步,就是本身甦醒的同時,若是條件符合(好比還有剩餘資源),還會去喚醒後繼結點,畢竟是共享模式。

 1 private void setHeadAndPropagate(Node node, int propagate) {
 2      Node h = head;
 3     setHead(node);//head指向本身
 4       //若是還有剩餘量,繼續喚醒下一個鄰居線程
 5      if (propagate > 0 || h == null || h.waitStatus < 0) {
 6          Node s = node.next;
 7          if (s == null || s.isShared())
 8              doReleaseShared();
 9     }
10 }

2.1.四、releaseShared()

  咱們來看acquireShared()的反操做releaseShared(),此方法是共享模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,若是成功釋放且容許喚醒等待線程,它會喚醒等待隊列裏的其餘線程來獲取資源。

1 public final boolean releaseShared(int arg) {
2     if (tryReleaseShared(arg)) {//嘗試釋放資源
3         doReleaseShared();//喚醒後繼結點
4         return true;
5    }
6     return false;
7 }

     此方法的流程也比較簡單,釋放掉資源後,喚醒後繼。跟獨佔模式下的release()類似,可是獨佔模式下的tryRelease()在徹底釋放掉資源(state=0)後,纔會返回true去喚醒其餘線程,這主要是基於獨佔下可重入的考量;而共享模式下的releaseShared()則沒有這種要求,共享模式實質就是控制必定量的線程併發執行,那麼擁有資源的線程在釋放掉部分資源時就能夠喚醒後繼等待結點。例如資源總量是13,A(5)和B(7)分別獲取到資源併發運行,C(4)來時只剩1個資源就須要等待。A在運行過程當中釋放掉2個資源量,而後tryReleaseShared(2)返回true喚醒C,C一看只有3個仍不夠繼續等待;隨後B又釋放2個,tryReleaseShared(2)返回true喚醒C,C一看有5個夠本身用了,而後C就能夠跟A和B一塊兒運行。而ReentrantReadWriteLock讀鎖的tryReleaseShared()只有在徹底釋放掉資源(state=0)才返回true,因此自定義同步器能夠根據須要決定tryReleaseShared()的返回值。

doReleaseShared()

    此方法用於喚醒後繼。

 1 private void doReleaseShared() {
 2     for (;;) {
 3        Node h = head;
 4         if (h != null && h != tail) {
 5            int ws = h.waitStatus;
 6            if (ws == Node.SIGNAL) {
 7                  if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
 8                      continue;
 9                  unparkSuccessor(h);//喚醒後繼
10            }
11            else if (ws == 0 &&
12                       !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
13                  continue;
14          }
15          if (h == head)// head發生變化
16                break;
17      }
18 }

    至此咱們詳解了獨佔和共享兩種模式下獲取-釋放資源(acquire-release、acquireShared-releaseShared)的源碼,值得注意的是,acquire()和acquireShared()兩種方法下,線程在等待隊列中都是忽略中斷的。AQS也支持響應中斷的,acquireInterruptibly()/acquireSharedInterruptibly()便是,這裏相應的源碼跟acquire()和acquireSahred()差很少。

 

2.三、獨佔鎖和共享鎖在實現上的區別

    獨佔鎖的同步狀態值爲1,即同一時刻只能有一個線程成功獲取同步狀態。共享鎖的同步狀態>1,取值由上層同步組件肯定。
    獨佔鎖隊列中頭節點運行完成後釋放它的直接後繼節點。共享鎖隊列中頭節點運行完成後釋放它後面的全部節點。
    共享鎖中會出現多個線程(即同步隊列中的節點)同時成功獲取同步狀態的狀況。

 2.四、簡單使用

   既然明白基本的操做機理,咱們就能夠實現本身的鎖機制了,好比mutex這種不可重入的互斥鎖。

 1 class Mutex implements Lock, java.io.Serializable {
 2     // 自定義同步器
 3     private static class Sync extends AbstractQueuedSynchronizer {
 4         // 判斷是否鎖定狀態
 5         protected boolean isHeldExclusively() {
 6             return getState() == 1;
 7         }
 8 
 9         // 嘗試獲取資源,當即返回。成功則返回true,不然false。
10         public boolean tryAcquire(int acquires) {
11             assert acquires == 1; // 這裏限定只能爲1個量
12             if (compareAndSetState(0, 1)) {//state爲0才設置爲1,不可重入!
13                 setExclusiveOwnerThread(Thread.currentThread());//設置爲當前線程獨佔資源
14                 return true;
15             }
16             return false;
17         }
18 
19         // 嘗試釋放資源,當即返回。成功則爲true,不然false。
20         protected boolean tryRelease(int releases) {
21             assert releases == 1; // 限定爲1個量
22             if (getState() == 0)//既然來釋放,那確定就是已佔有狀態了。只是爲了保險,多層判斷!
23                 throw new IllegalMonitorStateException();
24             setExclusiveOwnerThread(null);
25             setState(0);//釋放資源,放棄佔有狀態
26             return true;
27         }
28     }
29 
30     // 真正同步類的實現都依賴繼承於AQS的自定義同步器!
31     private final Sync sync = new Sync();
32 
33     //lock<-->acquire。二者語義同樣:獲取資源,即使等待,直到成功才返回。
34     public void lock() {
35         sync.acquire(1);
36     }
37 
38     //tryLock<-->tryAcquire。二者語義同樣:嘗試獲取資源,要求當即返回。成功則爲true,失敗則爲false。
39     public boolean tryLock() {
40         return sync.tryAcquire(1);
41     }
42 
43     //unlock<-->release。二者語義同樣:釋放資源。
44     public void unlock() {
45         sync.release(1);
46     }
47 
48     //鎖是否佔有狀態
49     public boolean isLocked() {
50         return sync.isHeldExclusively();
51     }
52 }

    同步類在實現時通常都將自定義同步器(sync)定義爲內部類,供本身使用;而同步類本身(Mutex)則實現某個接口,對外服務。固然,接口的實現要直接依賴sync,它們在語義上也存在某種對應關係,而sync只用實現資源state的獲取-釋放方式tryAcquire-tryRelelase,至於線程的排隊、等待、喚醒等,上層的AQS都已經實現好了,咱們不用關心。
   除了Mutex,ReentrantLock/CountDownLatch/Semphore這些同步類的實現方式都差很少,不一樣的地方就在獲取-釋放資源的方式tryAcquire-tryRelelase。掌握了這點,AQS的核心便被攻破了。

 2.五、重入鎖

    重入鎖指的是當前線成功獲取鎖後,若是再次訪問該臨界區,則不會對本身產生互斥行爲。Java中對ReentrantLock和synchronized都是可重入鎖,synchronized由jvm實現可重入機制,ReentrantLock的可重入性基於AQS實現。同時,ReentrantLock還提供公平鎖和非公平鎖兩種模式。

ReentrantLock重入鎖  

    重入鎖的基本原理是判斷上次獲取鎖的線程是否爲當前線程,若是是則可再次進入臨界區,若是不是,則阻塞。重入鎖的最主要邏輯就鎖判斷上次獲取鎖的線程是否爲當前線程。因爲ReentrantLock是基於AQS實現的,底層經過操做同步狀態來獲取鎖,下面看一下非公平鎖的實現邏輯:

 1 final boolean nonfairTryAcquire(int acquires) {
 2             //獲取當前線程
 3             final Thread current = Thread.currentThread();
 4             //經過AQS獲取同步狀態
 5             int c = getState();
 6             //同步狀態爲0,說明臨界區處於無鎖狀態,
 7             if (c == 0) {
 8                 //修改同步狀態,即加鎖
 9                 if (compareAndSetState(0, acquires)) {
10                     //將當前線程設置爲鎖的owner
11                     setExclusiveOwnerThread(current);
12                     return true;
13                 }
14             }
15             //若是臨界區處於鎖定狀態,且上次獲取鎖的線程爲當前線程
16             else if (current == getExclusiveOwnerThread()) {
17                  //則遞增同步狀態
18                 int nextc = c + acquires;
19                 if (nextc < 0) // overflow
20                     throw new Error("Maximum lock count exceeded");
21                 setState(nextc);
22                 return true;
23             }
24             return false;
25 }

非公平鎖

    非公平鎖是指當鎖狀態爲可用時,無論在當前鎖上是否有其餘線程在等待,新近線程都有機會搶佔鎖。上述代碼即爲非公平鎖和核心實現,能夠看到只要同步狀態爲0,任何調用lock的線程都有可能獲取到鎖,而不是按照鎖請求的FIFO原則來進行的。

公平鎖

    公平鎖是指當多個線程嘗試獲取鎖時,成功獲取鎖的順序與請求獲取鎖的順序相同,下面看一個ReentrantLock的實現:

 1 protected final boolean tryAcquire(int acquires) {
 2             final Thread current = Thread.currentThread();
 3             int c = getState();
 4             if (c == 0) {
 5                 //此處爲公平鎖的核心,即判斷同步隊列中當前節點是否有前驅節點
 6                 if (!hasQueuedPredecessors() &&
 7                     compareAndSetState(0, acquires)) {
 8                     setExclusiveOwnerThread(current);
 9                     return true;
10                 }
11             }
12             else if (current == getExclusiveOwnerThread()) {
13                 int nextc = c + acquires;
14                 if (nextc < 0)
15                     throw new Error("Maximum lock count exceeded");
16                 setState(nextc);
17                 return true;
18             }
19             return false;
20 }

    從上面的代碼中能夠看出,公平鎖與非公平鎖的區別僅在因而否判斷當前節點是否存在前驅節點!hasQueuedPredecessors() ,由AQS可知,若是當前線程獲取鎖失敗就會被加入到AQS同步隊列中,那麼,若是同步隊列中的節點存在前驅節點,也就代表存在線程比當前節點線程更早的獲取鎖,故只有等待前面的線程釋放鎖後才能獲取鎖。

 2.六、讀寫鎖

    Java提供了一個基於AQS到讀寫鎖實現ReentrantReadWriteLock,該讀寫鎖到實現原理是:將同步變量state按照高16位和低16位進行拆分,高16位表示讀鎖,低16位表示寫鎖。

 

1 一次只有一個線程能夠佔有寫模式的讀寫鎖, 可是能夠有多個線程同時佔有讀模式的讀寫鎖. 正是由於這個特性2 當讀寫鎖是寫加鎖狀態時, 在這個鎖被解鎖以前, 全部試圖對這個鎖加鎖的線程都會被阻塞.
3 當讀寫鎖在讀加鎖狀態時, 全部試圖以讀模式對它進行加鎖的線程均可以獲得訪問權, 可是若是線程但願以寫模式對此鎖進行加鎖, 它必須直到全部的線程釋放鎖.
4 一般, 當讀寫鎖處於讀模式鎖住狀態時, 若是有另外線程試圖以寫模式加鎖, 讀寫鎖一般會阻塞隨後的讀模式鎖請求, 這樣能夠避免讀模式鎖長期佔用, 而等待的寫模式鎖請求長期阻塞.
5 讀寫鎖適合於對數據結構的讀次數比寫次數多得多的狀況. 由於, 讀模式鎖定時能夠共享, 以寫模式鎖住時意味着獨佔, 因此讀寫鎖又叫共享-獨佔鎖.

2.6.一、寫鎖的獲取與釋放

  寫鎖是一個獨佔鎖,因此咱們看一下ReentrantReadWriteLock中tryAcquire(arg)的實現:
寫鎖的獲取處理流程以下:

1     獲取同步狀態,並從中分離出低16爲的寫鎖狀態
2     若是同步狀態不爲0,說明存在讀鎖或寫鎖
3     若是存在讀鎖(c !=0 && w == 0),則不能獲取寫鎖(保證寫對讀的可見性)
4     若是當前線程不是上次獲取寫鎖的線程,則不能獲取寫鎖(寫鎖爲獨佔鎖)
5     若是以上判斷均經過,則在低16爲寫鎖同步狀態上利用CAS進行修改(增長寫鎖同步狀態,實現可重入)
6     將當前線程設置爲寫鎖的獲取線程
 1 protected final boolean tryAcquire(int acquires) {
 2             Thread current = Thread.currentThread();
 3             int c = getState();
 4             int w = exclusiveCount(c);
 5             if (c != 0) {
 6                 if (w == 0 || current != getExclusiveOwnerThread())
 7                     return false;
 8                 if (w + exclusiveCount(acquires) > MAX_COUNT)
 9                     throw new Error("Maximum lock count exceeded");
10                 // Reentrant acquire
11                 setState(c + acquires);
12                 return true;
13             }
14             if (writerShouldBlock() ||
15                 !compareAndSetState(c, c + acquires))
16                 return false;
17             setExclusiveOwnerThread(current);
18             return true;
19 }

寫鎖的釋放過程與獨佔鎖基本相同:

 1 protected final boolean tryRelease(int releases) {
 2             if (!isHeldExclusively())
 3                 throw new IllegalMonitorStateException();
 4             int nextc = getState() - releases;
 5             boolean free = exclusiveCount(nextc) == 0;
 6             if (free)
 7                 setExclusiveOwnerThread(null);
 8             setState(nextc);
 9             return free;
10 }

   在釋放的過程當中,不斷減小讀鎖同步狀態,當同步狀態爲0時,寫鎖徹底釋放。

2.6.二、讀鎖的獲取與釋放

    讀鎖是一個共享鎖,獲取讀鎖的步驟以下:

1     獲取當前同步狀態
2     計算高16爲讀鎖狀態+1後的值
3     若是大於可以獲取到的讀鎖的最大值,則拋出異常
4     若是存在寫鎖而且當前線程不是寫鎖的獲取者,則獲取讀鎖失敗
5     若是上述判斷都經過,則利用CAS從新設置讀鎖的同步狀態

   讀鎖的獲取步驟與寫鎖相似,即不斷的釋放寫鎖狀態,直到爲0時,表示沒有線程獲取讀鎖。

3、總結

    經過對java中的AQS鎖機制的剖析,咱們理解了獨佔和共享兩種基本的持有鎖的方式,而且分析了Mutex、可重入鎖、公平鎖、非公平鎖的實現,讀寫鎖等特殊的鎖。經過對這些所的理解,咱們更加深入地理解了AQS的本質,以及其中線程的狀態和活動軌跡。

參考文獻:http://www.javashuo.com/article/p-xcevmtwv-gz.html

          http://www.javashuo.com/article/p-ohfldhlj-dm.html

相關文章
相關標籤/搜索