Java併發包基石-AQS詳解

目錄java

    1 基本實現原理node

      1.1 如何使用設計模式

       1.2 設計思想安全

    2 自定義同步器多線程

      2.1 同步器代碼實現併發

       2.2 同步器代碼測試框架

    3 源碼分析ide

      3.1 Node結點工具

       3.2 獨佔式源碼分析

       3.3 共享式

    4 總結 

 Java併發包(JUC)中提供了不少併發工具,這其中,不少咱們耳熟能詳的併發工具,譬如ReentrangLock、Semaphore,它們的實現都用到了一個共同的基類--AbstractQueuedSynchronizer,簡稱AQS。AQS是一個用來構建鎖和同步器的框架,使用AQS能簡單且高效地構造出應用普遍的大量的同步器,好比咱們提到的ReentrantLock,Semaphore,其餘的諸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基於AQS的。固然,咱們本身也能利用AQS很是輕鬆容易地構造出符合咱們本身需求的同步器。

  本章咱們就一塊兒探究下這個神奇的東東,並對其實現原理進行剖析理解

基本實現原理

  AQS使用一個int成員變量來表示同步狀態,經過內置的FIFO隊列來完成獲取資源線程的排隊工做。

    private volatile int state;//共享變量,使用volatile修飾保證線程可見性

狀態信息經過procted類型的getStatesetStatecompareAndSetState進行操做

AQS支持兩種同步方式:

  1.獨佔式

  2.共享式

  這樣方便使用者實現不一樣類型的同步組件,獨佔式如ReentrantLock,共享式如Semaphore,CountDownLatch,組合式的如ReentrantReadWriteLock。總之,AQS爲使用提供了底層支撐,如何組裝實現,使用者能夠自由發揮。

同步器的設計是基於模板方法模式的,通常的使用方式是這樣:

  1.使用者繼承AbstractQueuedSynchronizer並重寫指定的方法。(這些重寫方法很簡單,無非是對於共享資源state的獲取和釋放)

  2.將AQS組合在自定義同步組件的實現中,並調用其模板方法,而這些模板方法會調用使用者重寫的方法。

這實際上是模板方法模式的一個很經典的應用。

咱們來看看AQS定義的這些可重寫的方法:

    protected boolean tryAcquire(int arg) : 獨佔式獲取同步狀態,試着獲取,成功返回true,反之爲false

    protected boolean tryRelease(int arg) :獨佔式釋放同步狀態,等待中的其餘線程此時將有機會獲取到同步狀態;

    protected int tryAcquireShared(int arg) :共享式獲取同步狀態,返回值大於等於0,表明獲取成功;反之獲取失敗;

    protected boolean tryReleaseShared(int arg) :共享式釋放同步狀態,成功爲true,失敗爲false

    protected boolean isHeldExclusively() : 是否在獨佔模式下被線程佔用。

關於AQS的使用,咱們來簡單總結一下:

  如何使用

  首先,咱們須要去繼承AbstractQueuedSynchronizer這個類,而後咱們根據咱們的需求去重寫相應的方法,好比要實現一個獨佔鎖,那就去重寫tryAcquire,tryRelease方法,要實現共享鎖,就去重寫tryAcquireShared,tryReleaseShared;最後,在咱們的組件中調用AQS中的模板方法就能夠了,而這些模板方法是會調用到咱們以前重寫的那些方法的。也就是說,咱們只須要很小的工做量就能夠實現本身的同步組件,重寫的那些方法,僅僅是一些簡單的對於共享資源state的獲取和釋放操做,至於像是獲取資源失敗,線程須要阻塞之類的操做,天然是AQS幫咱們完成了。

  設計思想

  對於使用者來說,咱們無需關心獲取資源失敗,線程排隊,線程阻塞/喚醒等一系列複雜的實現,這些都在AQS中爲咱們處理好了。咱們只須要負責好本身的那個環節就好,也就是獲取/釋放共享資源state的姿式T_T。很經典的模板方法設計模式的應用,AQS爲咱們定義好頂級邏輯的骨架,並提取出公用的線程入隊列/出隊列,阻塞/喚醒等一系列複雜邏輯的實現,將部分簡單的可由使用者決定的操做邏輯延遲到子類中去實現便可。

自定義同步器

  同步器代碼實現

上面大概講了一些關於AQS如何使用的理論性的東西,接下來,咱們就來看下實際如何使用,直接採用JDK官方文檔中的小例子來講明問題

 1 package juc;  2 
 3 import java.util.concurrent.locks.AbstractQueuedSynchronizer;  4 
 5 /**
 6  * Created by chengxiao on 2017/3/28.  7  */
 8 public class Mutex implements java.io.Serializable {  9     //靜態內部類,繼承AQS
10     private static class Sync extends AbstractQueuedSynchronizer { 11         //是否處於佔用狀態
12         protected boolean isHeldExclusively() { 13             return getState() == 1; 14  } 15         //當狀態爲0的時候獲取鎖,CAS操做成功,則state狀態爲1,
16         public boolean tryAcquire(int acquires) { 17             if (compareAndSetState(0, 1)) { 18  setExclusiveOwnerThread(Thread.currentThread()); 19                 return true; 20  } 21             return false; 22  } 23         //釋放鎖,將同步狀態置爲0
24         protected boolean tryRelease(int releases) { 25             if (getState() == 0) throw new IllegalMonitorStateException(); 26             setExclusiveOwnerThread(null); 27             setState(0); 28             return true; 29  } 30  } 31         //同步對象完成一系列複雜的操做,咱們僅需指向它便可
32         private final Sync sync = new Sync(); 33         //加鎖操做,代理到acquire(模板方法)上就行,acquire會調用咱們重寫的tryAcquire方法
34         public void lock() { 35             sync.acquire(1); 36  } 37         public boolean tryLock() { 38             return sync.tryAcquire(1); 39  } 40         //釋放鎖,代理到release(模板方法)上就行,release會調用咱們重寫的tryRelease方法。
41         public void unlock() { 42             sync.release(1); 43  } 44         public boolean isLocked() { 45             return sync.isHeldExclusively(); 46  } 47 }

  同步器代碼測試

測試下這個自定義的同步器,咱們使用以前文章中作過的併發環境下a++的例子來講明問題(a++的原子性其實最好使用原子類AtomicInteger來解決,此處用Mutex有點大炮打蚊子的意味,好在能說明問題就好)

 1 package juc;
 2 
 3 import java.util.concurrent.CyclicBarrier;
 4 
 5 /**
 6  * Created by chengxiao on 2017/7/16.
 7  */
 8 public class TestMutex {
 9     private static CyclicBarrier barrier = new CyclicBarrier(31);
10     private static int a = 0;
11     private static  Mutex mutex = new Mutex();
12 
13     public static void main(String []args) throws Exception {
14         //說明:咱們啓用30個線程,每一個線程對i自加10000次,同步正常的話,最終結果應爲300000;
15         //未加鎖前
16         for(int i=0;i<30;i++){
17             Thread t = new Thread(new Runnable() {
18                 @Override
19                 public void run() {
20                     for(int i=0;i<10000;i++){
21                         increment1();//沒有同步措施的a++;
22                     }
23                     try {
24                         barrier.await();//等30個線程累加完畢
25                     } catch (Exception e) {
26                         e.printStackTrace();
27                     }
28                 }
29             });
30             t.start();
31         }
32         barrier.await();
33         System.out.println("加鎖前,a="+a);
34         //加鎖後
35         barrier.reset();//重置CyclicBarrier
36         a=0;
37         for(int i=0;i<30;i++){
38             new Thread(new Runnable() {
39                 @Override
40                 public void run() {
41                     for(int i=0;i<10000;i++){
42                         increment2();//a++採用Mutex進行同步處理
43                     }
44                     try {
45                         barrier.await();//等30個線程累加完畢
46                     } catch (Exception e) {
47                         e.printStackTrace();
48                     }
49                 }
50             }).start();
51         }
52         barrier.await();
53         System.out.println("加鎖後,a="+a);
54     }
55     /**
56      * 沒有同步措施的a++
57      * @return
58      */
59     public static void increment1(){
60         a++;
61     }
62     /**
63      * 使用自定義的Mutex進行同步處理的a++
64      */
65     public static void increment2(){
66         mutex.lock();
67         a++;
68         mutex.unlock();
69     }
70 }
TestMutex

測試結果:

加鎖前,a=279204 加鎖後,a=300000

源碼分析

   咱們先來簡單描述下AQS的基本實現,前面咱們提到過,AQS維護一個共享資源state,經過內置的FIFO來完成獲取資源線程的排隊工做。(這個內置的同步隊列稱爲"CLH"隊列)。該隊列由一個一個的Node結點組成,每一個Node結點維護一個prev引用和next引用,分別指向本身的前驅和後繼結點。AQS維護兩個指針,分別指向隊列頭部head和尾部tail。

  其實就是個雙端雙向鏈表

  當線程獲取資源失敗(好比tryAcquire時試圖設置state狀態失敗),會被構形成一個結點加入CLH隊列中,同時當前線程會被阻塞在隊列中(經過LockSupport.park實現,實際上是等待態)。當持有同步狀態的線程釋放同步狀態時,會喚醒後繼結點,而後此結點線程繼續加入到對同步狀態的爭奪中。

  Node結點

  Node結點是AbstractQueuedSynchronizer中的一個靜態內部類,咱們撿Node的幾個重要屬性來講一下

 1 static final class Node {  2         /** waitStatus值,表示線程已被取消(等待超時或者被中斷)*/
 3         static final int CANCELLED =  1;  4         /** waitStatus值,表示後繼線程須要被喚醒(unpaking)*/
 5         static final int SIGNAL    = -1;  6         /**waitStatus值,表示結點線程等待在condition上,當被signal後,會從等待隊列轉移到同步到隊列中 */
 7         /** waitStatus value to indicate thread is waiting on condition */
 8         static final int CONDITION = -2;  9        /** waitStatus值,表示下一次共享式同步狀態會被無條件地傳播下去 10  static final int PROPAGATE = -3; 11  /** 等待狀態,初始爲0 */
12         volatile int waitStatus; 13         /**當前結點的前驅結點 */
14         volatile Node prev; 15         /** 當前結點的後繼結點 */
16         volatile Node next; 17         /** 與當前結點關聯的排隊中的線程 */
18         volatile Thread thread; 19         /** ...... */
20     }  

獨佔式

  獲取同步狀態--acquire()

  來看看acquire方法,lock方法通常會直接代理到acquire上

1  public final void acquire(int arg) { 2         if (!tryAcquire(arg) &&
3  acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 4  selfInterrupt(); 5     }

  咱們來簡單理一下代碼邏輯:

    a.首先,調用使用者重寫的tryAcquire方法,若返回true,意味着獲取同步狀態成功,後面的邏輯再也不執行;若返回false,也就是獲取同步狀態失敗,進入b步驟;

    b.此時,獲取同步狀態失敗,構造獨佔式同步結點,經過addWatiter將此結點添加到同步隊列的尾部(此時可能會有多個線程結點試圖加入同步隊列尾部,須要以線程安全的方  式添加);

    c.該結點以在隊列中嘗試獲取同步狀態,若獲取不到,則阻塞結點線程,直到被前驅結點喚醒或者被中斷。

  addWaiter

    爲獲取同步狀態失敗的線程,構形成一個Node結點,添加到同步隊列尾部

 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode);//構造結點 //指向尾結點tail
        Node pred = tail; //若是尾結點不爲空,CAS快速嘗試在尾部添加,若CAS設置成功,返回;不然,eng。
        if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }

  先cas快速設置,若失敗,進入enq方法  

  將結點添加到同步隊列尾部這個操做,同時可能會有多個線程嘗試添加到尾部,是非線程安全的操做。

  以上代碼能夠看出,使用了compareAndSetTail這個cas操做保證安全添加尾結點。

  enq方法

 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { //若是隊列爲空,建立結點,同時被head和tail引用
                if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) {//cas設置尾結點,不成功就一直重試
                    t.next = node; return t; } } } }

  enq內部是個死循環,經過CAS設置尾結點,不成功就一直重試。很經典的CAS自旋的用法,咱們在以前關於原子類的源碼分析中也提到過。這是一種樂觀的併發策略

  最後,看下acquireQueued方法

  acquireQueued

final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) {//死循環
                final Node p = node.predecessor();//找到當前結點的前驅結點
                if (p == head && tryAcquire(arg)) {//若是前驅結點是頭結點,才tryAcquire,其餘結點是沒有機會tryAcquire的。
                    setHead(node);//獲取同步狀態成功,將當前結點設置爲頭結點。
                    p.next = null; // 方便GC
                    failed = false; return interrupted; } // 若是沒有獲取到同步狀態,經過shouldParkAfterFailedAcquire判斷是否應該阻塞,parkAndCheckInterrupt用來阻塞線程
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

  acquireQueued內部也是一個死循環,只有前驅結點是頭結點的結點,也就是老二結點,纔有機會去tryAcquire;若tryAcquire成功,表示獲取同步狀態成功,將此結點設置爲頭結點;如果非老二結點,或者tryAcquire失敗,則進入shouldParkAfterFailedAcquire去判斷判斷當前線程是否應該阻塞,若能夠,調用parkAndCheckInterrupt阻塞當前線程,直到被中斷或者被前驅結點喚醒。若還不能休息,繼續循環。

 shouldParkAfterFailedAcquire

shouldParkAfterFailedAcquire用來判斷當前結點線程是否能休息
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //獲取前驅結點的wait值 
        int ws = pred.waitStatus; if (ws == Node.SIGNAL)//若前驅結點的狀態是SIGNAL,意味着當前結點能夠被安全地park
            return true; if (ws > 0) { // ws>0,只有CANCEL狀態ws才大於0。若前驅結點處於CANCEL狀態,也就是此結點線程已經無效,從後往前遍歷,找到一個非CANCEL狀態的結點,將本身設置爲它的後繼結點
            do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 若前驅結點爲其餘狀態,將其設置爲SIGNAL狀態
 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }   

  若shouldParkAfterFailedAcquire返回true,也就是當前結點的前驅結點爲SIGNAL狀態,則意味着當前結點能夠放心休息,進入parking狀態了。parkAncCheckInterrupt阻塞線程並處理中斷。

private final boolean parkAndCheckInterrupt() { LockSupport.park(this);//使用LockSupport使線程進入阻塞狀態
        return Thread.interrupted();// 線程是否被中斷過
    }

  至此,關於acquire的方法源碼已經分析完畢,咱們來簡單總結下

    a.首先tryAcquire獲取同步狀態,成功則直接返回;不然,進入下一環節;

    b.線程獲取同步狀態失敗,就構造一個結點,加入同步隊列中,這個過程要保證線程安全;

    c.加入隊列中的結點線程進入自旋狀態,如果老二結點(即前驅結點爲頭結點),纔有機會嘗試去獲取同步狀態;不然,當其前驅結點的狀態爲SIGNAL,線程即可安心休息,進入阻塞狀態,直到被中斷或者被前驅結點喚醒。

  釋放同步狀態--release()

  當前線程執行完本身的邏輯以後,須要釋放同步狀態,來看看release方法的邏輯

 public final boolean release(int arg) { if (tryRelease(arg)) {//調用使用者重寫的tryRelease方法,若成功,喚醒其後繼結點,失敗則返回false
            Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h);//喚醒後繼結點
            return true; } return false; }
  unparkSuccessor:喚醒後繼結點 
 1 private void unparkSuccessor(Node node) {  2         //獲取wait狀態
 3         int ws = node.waitStatus;  4         if (ws < 0)  5             compareAndSetWaitStatus(node, ws, 0);// 將等待狀態waitStatus設置爲初始值0
 6         Node s = node.next;//後繼結點
 7         if (s == null || s.waitStatus > 0) {//若後繼結點爲空,或狀態爲CANCEL(已失效),則從後尾部往前遍歷找到一個處於正常阻塞狀態的結點     進行喚醒
 8             s = null;  9             for (Node t = tail; t != null && t != node; t = t.prev) 10                 if (t.waitStatus <= 0) 11                     s = t; 12  } 13         if (s != null) 14  LockSupport.unpark(s.thread);//使用LockSupprot喚醒結點對應的線程 15     }    

  release的同步狀態相對簡單,須要找到頭結點的後繼結點進行喚醒,若後繼結點爲空或處於CANCEL狀態,從後向前遍歷找尋一個正常的結點,喚醒其對應線程。

共享式

  共享式:共享式地獲取同步狀態。對於獨佔式同步組件來說,同一時刻只有一個線程能獲取到同步狀態,其餘線程都得去排隊等待,其待重寫的嘗試獲取同步狀態的方法tryAcquire返回值爲boolean,這很容易理解;對於共享式同步組件來說,同一時刻能夠有多個線程同時獲取到同步狀態,這也是「共享」的意義所在。其待重寫的嘗試獲取同步狀態的方法tryAcquireShared返回值爲int。

 protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }

 

  1.當返回值大於0時,表示獲取同步狀態成功,同時還有剩餘同步狀態可供其餘線程獲取;

  2.當返回值等於0時,表示獲取同步狀態成功,但沒有可用同步狀態了;

  3.當返回值小於0時,表示獲取同步狀態失敗。

  獲取同步狀態--acquireShared  

public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0)//返回值小於0,獲取同步狀態失敗,排隊去;獲取同步狀態成功,直接返回去幹本身的事兒。 doAcquireShared(arg); }

  doAcquireShared

 1  private void doAcquireShared(int arg) {  2         final Node node = addWaiter(Node.SHARED);//構造一個共享結點,添加到同步隊列尾部。若隊列初始爲空,先添加一個無心義的傀儡結點,再將新節點添加到隊列尾部。
 3         boolean failed = true;//是否獲取成功
 4         try {  5             boolean interrupted = false;//線程parking過程當中是否被中斷過
 6             for (;;) {//死循環
 7                 final Node p = node.predecessor();//找到前驅結點
 8                 if (p == head) {//頭結點持有同步狀態,只有前驅是頭結點,纔有機會嘗試獲取同步狀態
 9                     int r = tryAcquireShared(arg);//嘗試獲取同步裝填
10                     if (r >= 0) {//r>=0,獲取成功
11                         setHeadAndPropagate(node, r);//獲取成功就將當前結點設置爲頭結點,若還有可用資源,傳播下去,也就是繼續喚醒後繼結點
12                         p.next = null; // 方便GC
13                         if (interrupted) 14  selfInterrupt(); 15                         failed = false; 16                         return; 17  } 18  } 19                 if (shouldParkAfterFailedAcquire(p, node) &&//是否能安心進入parking狀態
20                     parkAndCheckInterrupt())//阻塞線程
21                     interrupted = true; 22  } 23         } finally { 24             if (failed) 25  cancelAcquire(node); 26  } 27     }

  大致邏輯與獨佔式的acquireQueued差距不大,只不過因爲是共享式,會有多個線程同時獲取到線程,也可能同時釋放線程,空出不少同步狀態,因此當排隊中的老二獲取到同步狀態,若是還有可用資源,會繼續傳播下去。

  setHeadAndPropagate

 private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below
 setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }

  釋放同步狀態--releaseShared

 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared();//釋放同步狀態 return true; } return false; }

  doReleaseShared

 private void doReleaseShared() {
        for (;;) {//死循環,共享模式,持有同步狀態的線程可能有多個,採用循環CAS保證線程安全
            Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue;          
                    unparkSuccessor(h);//喚醒後繼結點
 } else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue;                
 } if (h == head)              
                break; } }

  代碼邏輯比較容易理解,須要注意的是,共享模式,釋放同步狀態也是多線程的,此處採用了CAS自旋來保證。

總結

  關於AQS的介紹及源碼分析到此爲止了。

  AQS是JUC中不少同步組件的構建基礎,簡單來說,它內部實現主要是狀態變量state和一個FIFO隊列來完成,同步隊列的頭結點是當前獲取到同步狀態的結點,獲取同步狀態state失敗的線程,會被構形成一個結點(或共享式或獨佔式)加入到同步隊列尾部(採用自旋CAS來保證此操做的線程安全),隨後線程會阻塞;釋放時喚醒頭結點的後繼結點,使其加入對同步狀態的爭奪中。

  AQS爲咱們定義好了頂層的處理實現邏輯,咱們在使用AQS構建符合咱們需求的同步組件時,只需重寫tryAcquire,tryAcquireShared,tryRelease,tryReleaseShared幾個方法,來決定同步狀態的釋放和獲取便可,至於背後複雜的線程排隊,線程阻塞/喚醒,如何保證線程安全,都由AQS爲咱們完成了,這也是很是典型的模板方法的應用。AQS定義好頂級邏輯的骨架,並提取出公用的線程入隊列/出隊列,阻塞/喚醒等一系列複雜邏輯的實現,將部分簡單的可由使用者決定的操做邏輯延遲到子類中去實現。 

相關文章
相關標籤/搜索