原理剖析(第 005 篇)AQS工做原理分析

原理剖析(第 005 篇)AQS工做原理分析

-node

1、大體介紹

一、前面章節講解了一下CAS,簡單講就是cmpxchg+lock的原子操做;
二、而在談到併發操做裏面,咱們不得不談到AQS,JDK的源碼裏面好多併發的類都是經過Sync的內部類繼承AQS而實現出五花八門的功能;
三、本章節就和你們分享分析一下AQS的工做原理; 

2、簡單認識AQS

2.1 何爲AQS?

一、AQS是一個抽象類,類名爲AbstractQueuedSynchronizer,抽象的都是一些公用的方法屬性,其自身是沒有實現任何同步接口的;

二、AQS定義了同步器中獲取鎖和釋放鎖,目的來讓自定義同步器組件來使用或重寫;

三、縱觀AQS的子類,絕大多數都是一個叫Sync的靜態內部類來繼承AQS類,經過重寫AQS中的一些方法來實現自定義同步器;

四、AQS定義了兩種資源共享方式:EXCLUSIVE( 獨佔式:每次僅有一個Thread能執行 )、SHARED( 共享式:多個線程可同時執行 );

五、AQS維護了一個FIFO的CLH鏈表隊列,且該隊列不支持基於優先級的同步策略;

2.2 AQS的state關鍵詞

一、private volatile int state:維護了一個volatile的int類型的state字段,該字段是實現AQS的核心關鍵詞; 

二、經過getState、setState、compareAndSetState方法類獲取、設置更新state值;

三、該字段在不一樣的併發類中起着不一樣的紐帶做用,下面會接着講到state字段的一些應用場景;

2.3 Node的waitStatus關鍵詞

一、正常默認的狀態值爲0;

二、對於釋放操做的時候,前一個結點有喚醒後一個結點的任務;

三、當前結點的前置結點waitStatus > 0,則結點處於CANCELLED狀態,應該須要踢出隊列;

四、當前結點的前置結點waitStatus = 0,則須要將前置結點改成SIGNAL狀態;

2.4 CLH隊列

一、隊列模型:
      +------+  prev +------+  prev +------+
      |      | <---- |      | <---- |      |  
 head | Node |  next | Node |  next | Node |  tail
      |      | ----> |      | ----> |      |  
      +------+       +------+       +------+

二、鏈表結構,在頭尾結點中,須要特別指出的是頭結點是一個空對象結點,無任何意義,即傀儡結點;
      
三、每個Node結點都維護了一個指向前驅的指針和指向後驅的指針,結點與結點之間相互關聯構成鏈表;

四、入隊在尾,出隊在頭,出隊後須要激活該出隊結點的後繼結點,若後繼結點爲空或後繼結點waitStatus>0,則從隊尾向前遍歷取waitStatus<0的觸發阻塞喚醒;

2.5 state在AQS簡單應用舉例

一、CountDownLatch,簡單大體意思爲:A組線程等待另外B組線程,B組線程執行完了,A組線程才能夠執行;
   state初始化假設爲N,後續每countDown()一次,state會CAS減1。
   等到全部子線程都執行完後(即state=0),會unpark()主調用線程,而後主調用線程就會從await()函數返回,繼續後餘動做。

二、ReentrantLock,簡單大體意思爲:獨佔式鎖的類;
   state初始化爲0,表示未鎖定狀態,而後每lock()時調用tryAcquire()使state加1,
   其餘線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)爲止,其它線程纔有機會獲取該鎖;

三、Semaphore,簡單大體意思爲:A、B、C、D線程同時爭搶資源,目前卡槽大小爲2,若A、B正在執行且未執行完,那麼C、D線程在門外等着,一旦A、B有1個執行完了,那麼C、D就會競爭看誰先執行;
   state初始值假設爲N,後續每tryAcquire()一次,state會CAS減1,當state爲0時其它線程處於等待狀態,
   直到state>0且<N後,進程又能夠獲取到鎖進行各自操做了;

2.6 經常使用重要的方法

一、protected boolean isHeldExclusively()
   // 須要被子類實現的方法,調用該方法的線程是否持有獨佔鎖,通常用到了condition的時候才須要實現此方法

二、protected boolean tryAcquire(int arg)
   // 須要被子類實現的方法,獨佔方式嘗試獲取鎖,獲取鎖成功後返回true,獲取鎖失敗後返回false

三、protected boolean tryRelease(int arg)  
   // 須要被子類實現的方法,獨佔方式嘗試釋放鎖,釋放鎖成功後返回true,釋放鎖失敗後返回false
   
四、protected int tryAcquireShared(int arg)  
   // 須要被子類實現的方法,共享方式嘗試獲取鎖,獲取鎖成功後返回正數1,獲取鎖失敗後返回負數-1
   
五、protected boolean tryReleaseShared(int arg)   
   // 須要被子類實現的方法,共享方式嘗試釋放鎖,釋放鎖成功後返回正數1,釋放鎖失敗後返回負數-1
   
六、final boolean acquireQueued(final Node node, int arg)
   // 對於進入隊尾的結點,檢測本身能夠休息了,若是能夠修改則進入SIGNAL狀態且進入park()阻塞狀態

七、private Node addWaiter(Node mode)
   // 添加結點到鏈表隊尾

八、private Node enq(final Node node)
   // 若是addWaiter嘗試添加隊尾失敗,則再次調用enq此方法自旋將結點加入隊尾

九、private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)
   // 檢測結點狀態,若是能夠休息的話則設置waitStatus=SIGNAL並調用LockSupport.park休息;

十、private void unparkSuccessor(Node node)   
   // 釋放鎖時,該方法須要負責喚醒後繼節點

2.7 設計與實現僞代碼

一、獲取獨佔鎖:
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && 
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    acquire{
        若是嘗試獲取獨佔鎖失敗的話( 嘗試獲取獨佔鎖的各類方式由AQS的子類實現 ),
        那麼就新增獨佔鎖結點經過自旋操做加入到隊列中,而且根據結點中的waitStatus來決定是否調用LockSupport.park進行休息
    }
    
    
二、釋放獨佔鎖:
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    release{
        若是嘗試釋放獨佔鎖成功的話( 嘗試釋放獨佔鎖的各類方式由AQS的子類實現 ),
        那麼取出頭結點並根據結點waitStatus來決定是否有義務喚醒其後繼結點
    }

三、獲取共享鎖:
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    
    acquireShared{
        若是嘗試獲取共享鎖失敗的話( 嘗試獲取共享鎖的各類方式由AQS的子類實現 ),
        那麼新增共享鎖結點經過自旋操做加入到隊尾中,而且根據結點中的waitStatus來決定是否調用LockSupport.park進行休息
    }

四、釋放共享鎖:
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    releaseShared{
        若是嘗試釋放共享鎖失敗的話( 嘗試釋放共享鎖的各類方式由AQS的子類實現 ),
        那麼經過自旋操做喚完成阻塞線程的喚起操做
    }

3、舉例ReentrantLock

3.一、ReentrantLock

一、在分析AQS源碼前,咱們須要依賴一個載體來講,畢竟AQS的一些方法都是空方法且拋異常的,因此單講AQS不太生動形象;

二、所以咱們決定採用ReentrantLock來說解,其餘都大體差很少,由於瞭解了一個,其餘均可以依葫蘆畫瓢秒懂;

3.二、ReentrantLock生活細節化理解

好比咱們每天在外面吃快餐,我就以吃快餐爲例生活化闡述該ReentrantLock原理:

一、場景:餐廳只有一個排隊的走廊,只有一個打飯菜的師傅;

二、開飯時間點,你們都爭先恐後的去吃飯,所以排上了隊,挨個挨個排隊打飯菜,任何一我的只要排到了打飯師傅的前面,均可以打到飯菜;

三、可是有時候隊很長,有些人之間的關係是家眷關係,若是後來的人看到本身家眷正在打飯菜,這個時候能夠不用排隊直接跑到前面打飯菜;

四、總之你們都挨個挨個排隊打飯,有家眷關係的直接跑到前面打飯菜;

五、到此打止,一、二、三、4能夠認爲是一種公平方式的獨佔鎖,3能夠理解爲重入鎖;

五、可是呢,還有那麼些緊急趕時間的人,並且又跟排隊的人沒半點瓜葛,來餐廳時恰好看到師傅剛剛打完一我的的飯菜,因而插入去打飯菜敢時間;

六、若是敢時間人的來的時候發現師傅還在打飯菜,那麼就只得乖乖的排隊等候打飯菜咯;

七、到此打止,一、二、五、6能夠認爲是一種非公平方式的獨佔鎖;

4、源碼分析ReentrantLock

4.一、ReentrantLock構造器

一、構造器源碼:
    /**
     * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    /**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    
二、默認構造方法爲非公平鎖,帶參構造方法還可經過傳入變量還決定調用方是使用公平鎖仍是非公平鎖;

4.二、Sync同步器

一、AQS --> Sync ---> FairSync // 公平鎖
                  |
                  |> NonfairSync // 非公平鎖
                  
二、ReentrantLock內的同步器都是經過Sync抽象接口來操做調用關係的,細看會發現基本上都是經過sync.xxx之類的這種調用方式的;

4.三、lock()

一、源碼:
    public void lock() {
        sync.lock();
    }
    
    // FairSync 公平鎖調用方式
    final void lock() {
        acquire(1); // 嘗試獲取獨佔鎖
    }    
    
    // NonfairSync 非公平鎖調用方式
    final void lock() {
        if (compareAndSetState(0, 1)) // 首先判斷state資源是否爲0,若是恰巧爲0則代表目前沒有線程佔用鎖,則利用CAS佔有鎖
            setExclusiveOwnerThread(Thread.currentThread()); // 當獨佔鎖以後則將設置exclusiveOwnerThread爲當前線程
        else
            acquire(1); // 若CAS佔用鎖失敗的話,則再嘗試獲取獨佔鎖
    }
    
二、這裏的區別就是非公平鎖在調用lock時首先檢測了是否經過CAS獲取鎖,發現鎖一旦空着的話,則搶先一步佔爲己有,
   無論有沒有阻塞隊列,只要當前線程來的時候發現state資源沒被佔用那麼當前線程就搶先一步試一下CAS,CAS失敗了它纔去排隊;

4.四、acquire(int)

一、源碼:
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && // 嘗試獲取鎖資源,若獲取到資源的話則線程直接返回,此方法由AQS的具體子類實現
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 不然獲取資源失敗的話,那麼就進入等待隊列
            selfInterrupt();
    }
    
二、該方法是獨佔模式下線程獲取state共享資源的入口,若是獲取到資源的話就返回,不然建立獨佔模式結點加入阻塞隊列,直到獲取到共享資源;

三、並且這裏須要加上自我中斷判斷,主要是由於線程在等待過程當中被中斷的話,它是不響應的,那麼就只有等到線程獲取到資源後經過自我判斷將這個判斷後續補上;

四、獨佔模式的該方法,正常狀況下只要沒有獲取到鎖,該方法一直處於阻塞狀態,獲取到了則跳出該方法區;

4.五、tryAcquire(int)

一、公平鎖tryAcquire源碼:
    // FairSync 公平鎖的 tryAcquire 方法
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState(); // 獲取鎖資源的最新內存值
        if (c == 0) { // 當state=0,說明鎖資源目前尚未被任何線程被佔用
            if (!hasQueuedPredecessors() && // 檢查線程是否有阻塞隊列
                compareAndSetState(0, acquires)) { // 若是沒有阻塞隊列,則經過CAS操做獲取鎖資源
                setExclusiveOwnerThread(current); // 沒有阻塞隊列,且CAS又成功獲取鎖資源,則設置獨佔線程對象爲當前線程
                return true; // 返回標誌,告訴上層該線程已經獲取到了鎖資源
            }
        }
        // 執行到此,鎖資源值不爲0,說明已經有線程正在佔用這鎖資源
        else if (current == getExclusiveOwnerThread()) { // 既然鎖已經被佔用,則看看佔用鎖的線程是否是當前線程
            int nextc = c + acquires; // 若是佔用的鎖的線程是當前線程的話,則爲重入鎖概念,狀態值作加1操做
            // int類型值小於0,是由於該int類型的state狀態值溢出了,溢出了的話那得說明這個鎖有多難獲取啊,可能出問題了
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true; // 返回成功標誌,告訴上層該線程已經獲取到了鎖資源
        }
        return false; // 返回失敗標誌,告訴上層該線程沒有獲取到鎖資源
    }

二、非公平鎖tryAcquire源碼:
    // NonfairSync 非公平鎖的 tryAcquire 方法
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires); // 調用父類的非公平獲取鎖資源方法
    }    

    // NonfairSync 非公平鎖父類 Sync 類的 nonfairTryAcquire 方法    
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState(); // 獲取鎖資源的最新內存值
        if (c == 0) { // 當state=0,說明鎖資源目前尚未被任何線程被佔用
            if (compareAndSetState(0, acquires)) { // 先無論三七二十一,先嚐試經過CAS操做獲取鎖資源
                setExclusiveOwnerThread(current); // CAS一旦成功獲取鎖資源,則設置獨佔線程對象爲當前線程
                return true;// 返回成功標誌,告訴上層該線程已經獲取到了鎖資源
            }
        }
        // 執行到此,鎖資源值不爲0,說明已經有線程正在佔用這鎖資源
        else if (current == getExclusiveOwnerThread()) { // 既然鎖已經被佔用,則看看佔用鎖的線程是否是當前線程
            int nextc = c + acquires; // 若是佔用的鎖的線程是當前線程的話,則爲重入鎖概念,狀態值作加1操做
            // int類型值小於0,是由於該int類型的state狀態值溢出了,溢出了的話那得說明這個鎖有多難獲取啊,可能出問題了
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc); // 
            return true; // 返回成功標誌,告訴上層該線程已經獲取到了鎖資源
        }
        return false; // 返回失敗標誌,告訴上層該線程沒有獲取到鎖資源
    }    

三、tryAcquire方法是AQS的子類實現的,也就是ReentrantLock的兩個靜態內部類實現的,目的就是經過CAS嘗試獲取鎖資源,
   獲取鎖資源成功則返回true,獲取鎖資源失敗則返回false;

4.六、addWaiter(Node)

一、源碼:
    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        // 按照給定的mode模式建立新的結點,模式有兩種:Node.EXCLUSIVE獨佔模式、Node.SHARED共享模式;
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail; // 將先隊尾結點賦值給臨時變量
        if (pred != null) { // 若是pred不爲空,說明該隊列已經有結點了
            node.prev = pred;
            if (compareAndSetTail(pred, node)) { // 經過CAS嘗試將node結點設置爲隊尾結點
                pred.next = node;
                return node;
            }
        }
        // 執行到此,說明隊尾沒有元素,則進入自旋首先設置頭結點,而後將此新建結點添加到隊尾
        enq(node); // 進入自旋添加node結點
        return node;
    }
    
二、    addWaiter經過傳入不一樣的模式來建立新的結點嘗試加入到隊列尾部,若是因爲併發致使添加結點到隊尾失敗的話那麼就進入自旋將結點加入隊尾;

4.七、enq(Node)

一、源碼:
    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) { // 自旋的死循環操做方式
            Node t = tail;
            // 由於是自旋方式,首次鏈表隊列tail確定爲空,可是後續鏈表有數據後就不會爲空了
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node())) // 隊列爲空時,則建立一個空對象結點做爲頭結點,無心思,可認爲傀儡結點
                    tail = head; // 空隊列的話,頭尾都指向同一個對象
            } else {
                // 進入 else 方法裏面,說明鏈表隊列已經有結點了
                node.prev = t;
                // 由於存在併發操做,經過CAS嘗試將新加入的node結點設置爲隊尾結點
                if (compareAndSetTail(t, node)) { 
                    // 若是node設置隊尾結點成功,則將以前的舊的對象尾結點t的後繼結點指向node,node的前驅結點也設置爲t
                    t.next = node;
                    return t;
                }
            }
            
            // 若是執行到這裏,說明上述兩個CAS操做任何一個失敗的話,該方法是不會放棄的,由於是自旋操做,再次循環繼續入隊
        }
    }

二、enq經過自旋這種死循環的操做方式,來確保結點正確的添加到隊列尾部,經過CAS操做若是頭部爲空則添加傀儡空結點,而後在循環添加隊尾結點;

4.八、compareAndSetHead/compareAndSetTail

一、源碼:
    /**
     * CAS head field. Used only by enq.
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }
    
    /**
     * CAS tail field. Used only by enq.
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }    

二、CAS操做,設置頭結點、尾結點;

4.九、acquireQueued(Node, int)

一、源碼:
    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) { // 自旋的死循環操做方式
                final Node p = node.predecessor(); // 若是新建結點的前驅結點是頭結點
                // 若是前驅結點爲頭結點,那麼該結點則是老二,僅次於老大,也但願嘗試去獲取一下鎖,萬一頭結點恰巧剛剛釋放呢?但願仍是要有的,萬一實現了呢。。。
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    // 拿到鎖資源後,則該node結點升級作頭結點,且設置後繼結點指針爲空,便於GC回收
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && // 根據前驅結點看看是否須要休息一下子
                    parkAndCheckInterrupt()) // 阻塞操做,正常狀況下,獲取不到鎖,代碼就在該方法中止了,直到被喚醒
                    interrupted = true;
                    
                // 若是執行到這裏,說明嘗試休息失敗了,由於是自旋操做,因此還會再次循環繼續操做判斷
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

二、acquireQueued也是採用一個自旋的死循環操做方式,只有頭結點才能嘗試獲取鎖資源,其他的結點挨個挨個在那裏等待修改,等待被喚醒,等待機會成爲頭結點;
   而新添加的node結點也天然逃不過如此命運,先看看是否頭結點,而後再看看是否能休息;

4.十、shouldParkAfterFailedAcquire(Node, Node)

一、源碼:
    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus; // 獲取前驅結點的狀態值
        if (ws == Node.SIGNAL) // 若前驅結點的狀態爲SIGNAL狀態的話,那麼該結點就不要想事了,直接返回true準備休息
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            // 若前驅結點的狀態爲CANCELLED狀態的話,那麼就一直向前遍歷,直到找到一個不爲CANCELLED狀態的結點
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
             // 剩下的結點狀態,則設置其爲SIGNAL狀態,而後返回false標誌等外層循環再次判斷
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

二、shouldParkAfterFailedAcquire主要是檢測前驅結點狀態,前驅結點爲SIGNAL的話,則新結點能夠安安心心休息了;
   若是前驅結點大於零,說明前驅結點處於CANCELLED狀態,那麼則以入參pred前驅爲起點,一直往前找,直到找到最近一個正常等待狀態的結點;
   若是前驅結點小於零,那麼就將前驅結點設置爲SIGNAL狀態,而後返回false依賴acquireQueued的自旋再次判斷是否須要進行休息;

4.十一、parkAndCheckInterrupt()

一、源碼:
    /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this); // 阻塞等待
        return Thread.interrupted(); // 被喚醒後查看是否有被中斷過否?
    }

二、parkAndCheckInterrupt首先調用park讓線程進入等待狀態,而後當park阻塞被喚醒後,再次檢測是否曾經被中斷過;
   而被喚醒有兩種狀況,一個是利用unpark喚醒,一個是利用interrupt喚醒;

4.十二、unlock()

一、源碼:
    public void unlock() {
        sync.release(1); // 
    }

二、unlock釋放鎖資源,通常都是在finally中被調用,防止當臨界區由於任何異常時怕鎖不被釋放;
   而釋放鎖不像獲取鎖lock的實現多色多樣,沒有所謂公平或不公平,就是規規矩矩的釋放資源而已;

4.1三、release(int)

一、源碼:
    /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) { // 嘗試釋放鎖資源,此方法由AQS的具體子類實現
            Node h = head;
            if (h != null && h.waitStatus != 0) // 從頭結點開始,喚醒後繼結點
                unparkSuccessor(h); // 踢出CANCELLED狀態結點,而後喚醒後繼結點
            return true;
        }
        return false;
    }

二、release嘗試釋放鎖,而且有義務移除CANCELLED狀態的結點,還有義務喚醒後繼結點繼續運行獲取鎖資源;

4.1四、tryRelease(int)

一、源碼:
    // NonfairSync 和 FairSync 的父類 Sync 類的 tryRelease 方法    
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases; // 獲取鎖資源值並作減1操做
        if (Thread.currentThread() != getExclusiveOwnerThread()) // 查看當前線程是否和持有鎖的線程是否是同一個線程
            // 正常狀況下,須要釋放的線程確定是持有鎖的線程,不然不就亂套了,確定哪裏出問題了,因此拋出異常
            throw new IllegalMonitorStateException(); 
        boolean free = false;
        if (c == 0) { // 若此時鎖資源值作減法操做後正好是0,則全部鎖資源已經釋放乾淨,所以持有鎖的變量也置爲空
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c); // 若此時作減法操做尚未歸零,那麼這種狀況就是那種重入鎖,須要重重釋放後才行
        return free;
    }

二、tryRelease主要經過CAS操做對state鎖資源進行減1操做;

4.1五、unparkSuccessor(Node)

一、源碼:
    /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        // 該node通常都是傳入head進來,也就是說,須要釋放頭結點,也就是當前結點須要釋放鎖操做,順便喚醒後繼結點
        int ws = node.waitStatus;
        if (ws < 0) // 若結點狀態值小於0,則歸零處理,經過CAS歸零,容許失敗,可是無論怎麼着,仍然要往下走去喚醒後繼結點
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next; // 取出後繼結點,這個時候通常都是Head後面的一個結點,因此通常都是老二
        if (s == null || s.waitStatus > 0) { // 若後繼結點爲空或者後繼結點已經處於CANCELLED狀態的話
            s = null;
            // 那麼從隊尾向前遍歷,直到找到一個小於等於0的結點
            // 這裏爲何要從隊尾向前尋找?
            // * 由於在這個隊列中,任何一個結點都有可能被中斷,只是有可能,並不表明絕對的,但有一點是肯定的,
            // * 被中斷的結點會將結點的狀態設置爲CANCELLED狀態,標識這個結點在未來的某個時刻會被踢出;
            // * 踢出隊列的規則很簡單,就是該結點的前驅結點不會指向它,而是會指向它的後面的一個非CANCELLED狀態的結點;
            // * 而這個將被踢出的結點,它的next指針將會指向它本身;
            // * 因此設想一下,若是咱們從head日後找,一旦發現這麼一個處於CANCELLED狀態的結點,那麼for循環豈不是就是死循環了;
            // * 可是全部的這些結點當中,它們的prev前驅結點仍是沒有被誰動過,因此從tail結點向前遍歷最穩妥

            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread); // 喚醒線程
    }

二、unparkSuccessor主要是踢出CANCELLED狀態結點,而後喚醒後繼結點;
   可是這個喚醒的後繼結點爲空的話,那麼則從隊尾一直向前循環查找小於等於零狀態的結點並調用unpark喚醒;

5、總結

一、分析了這麼多,感受是否是有一種豁然開朗的感受,原來你們傳的神乎其神的AQS是否是沒有想象中那麼難以理解;

二、在這裏我簡要總結一下AQS的流程的一些特性:
    • 關鍵獲取鎖、釋放鎖操做由AQS子類實現:acquire-release、acquireShared-releaseShared;
    • 維護了一個FIFO鏈表結構的隊列,經過自旋方式將新結點添加到隊尾;
    • 添加結點時會從前驅結點向前遍歷,跳過那些處於CANCELLED狀態的結點;
    • 釋放結點時會從隊尾向前遍歷,踢出CANCELLED狀態的結點,而後喚醒後繼結點;

三、其實當了解了AQS後,這裏以ReentrantLock爲載體分析了一下,那麼再去分析CountDownLatch、Semaphore、ReentrantReadWriteLock等那些集成AQS而實現不一樣功能的模塊就會順利不少;

6、下載地址

https://gitee.com/ylimhhmily/SpringCloudTutorial.gitgit

SpringCloudTutorial交流QQ羣: 235322432微信

SpringCloudTutorial交流微信羣: 微信溝通羣二維碼圖片連接併發

歡迎關注,您的確定是對我最大的支持!!!app

相關文章
相關標籤/搜索