Java併發-同步器AQS

什麼是AQS

aqs全稱爲AbstractQueuedSynchronizer,它提供了一個FIFO隊列,能夠當作是一個用來實現同步鎖以及其餘涉及到同步功能的核心組件,常見的有:ReentrantLock、CountDownLatch等。
AQS是一個抽象類,主要是經過繼承的方式來使用,它自己沒有實現任何的同步接口,僅僅是定義了同步狀態的獲取以及釋放的方法來提供自定義的同步組件。
能夠這麼說,只要搞懂了AQS,那麼J.U.C中絕大部分的api都能輕鬆掌握。html

AQS的兩種功能

從使用層面來講,AQS的功能分爲兩種:獨佔和共享java

  • 獨佔鎖,每次只能有一個線程持有鎖,好比前面給你們演示的ReentrantLock就是以獨佔方式實現的互斥鎖
  • 共享鎖,容許多個線程同時獲取鎖,併發訪問共享資源,好比ReentrantReadWriteLock

ReentrantLock的類圖

仍然以ReentrantLock爲例,來分析AQS在重入鎖中的使用。畢竟單純分析AQS沒有太多的含義。先理解這個類圖,能夠方便咱們理解AQS的原理
ReentrantLock的類圖node

AQS的內部實現

AQS的實現依賴內部的同步隊列,也就是FIFO的雙向隊列,若是當前線程競爭鎖失敗,那麼AQS會把當前線程以及等待狀態信息構形成一個Node加入到同步隊列中,同時再阻塞該線程。當獲取鎖的線程釋放鎖之後,會從隊列中喚醒一個阻塞的節點(線程)。
AQS同步隊列面試

AQS隊列內部維護的是一個FIFO的雙向鏈表,這種結構的特色是每一個數據結構都有兩個指針,分別指向直接的後繼節點和直接前驅節點。因此雙向鏈表能夠從任意一個節點開始很方便的訪問前驅和後繼。每一個Node實際上是由線程封裝,當線程爭搶鎖失敗後會封裝成Node加入到ASQ隊列中去編程

釋放鎖以及添加線程對於隊列的變化

添加節點

當出現鎖競爭以及釋放鎖的時候,AQS同步隊列中的節點會發生變化,首先看一下添加節點的場景。
節點添加到同步隊列
這裏會涉及到兩個變化segmentfault

  • 新的線程封裝成Node節點追加到同步隊列中,設置prev節點以及修改當前節點的前置節點的next節點指向本身
  • 經過CAS講tail從新指向新的尾部節點

釋放鎖移除節點

head節點表示獲取鎖成功的節點,當頭結點在釋放同步狀態時,會喚醒後繼節點,若是後繼節點得到鎖成功,會把本身設置爲頭結點,節點的變化過程以下
移除節點的變化
這個過程也是涉及到兩個變化api

  • 修改head節點指向下一個得到鎖的節點
  • 新的得到鎖的節點,將prev的指針指向null

這裏有一個小的變化,就是設置head節點不須要用CAS,緣由是設置head節點是由得到鎖的線程來完成的,而同步鎖只能由一個線程得到,因此不須要CAS保證,只須要把head節點設置爲原首節點的後繼節點,而且斷開原head節點的next引用便可數據結構

state在AQS簡單應用舉例

一、CountDownLatch,簡單大體意思爲:A組線程等待另外B組線程,B組線程執行完了,A組線程才能夠執行;
   state初始化假設爲N,後續每countDown()一次,state會CAS減1。
   等到全部子線程都執行完後(即state=0),會unpark()主調用線程,而後主調用線程就會從await()函數返回,繼續後餘動做。

二、ReentrantLock,簡單大體意思爲:獨佔式鎖的類;
   state初始化爲0,表示未鎖定狀態,而後每lock()時調用tryAcquire()使state加1,
   其餘線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)爲止,其它線程纔有機會獲取該鎖;

三、Semaphore,簡單大體意思爲:A、B、C、D線程同時爭搶資源,目前卡槽大小爲2,若A、B正在執行且未執行完,那麼C、D線程在門外等着,一旦A、B有1個執行完了,那麼C、D就會競爭看誰先執行;
   state初始值假設爲N,後續每tryAcquire()一次,state會CAS減1,當state爲0時其它線程處於等待狀態,
   直到state>0且<N後,進程又能夠獲取到鎖進行各自操做了;

經常使用重要的方法

一、protected boolean isHeldExclusively()
   // 須要被子類實現的方法,調用該方法的線程是否持有獨佔鎖,通常用到了condition的時候才須要實現此方法

二、protected boolean tryAcquire(int arg)
   // 須要被子類實現的方法,獨佔方式嘗試獲取鎖,獲取鎖成功後返回true,獲取鎖失敗後返回false

三、protected boolean tryRelease(int arg)  
   // 須要被子類實現的方法,獨佔方式嘗試釋放鎖,釋放鎖成功後返回true,釋放鎖失敗後返回false
   
四、protected int tryAcquireShared(int arg)  
   // 須要被子類實現的方法,共享方式嘗試獲取鎖,獲取鎖成功後返回正數1,獲取鎖失敗後返回負數-1
   
五、protected boolean tryReleaseShared(int arg)   
   // 須要被子類實現的方法,共享方式嘗試釋放鎖,釋放鎖成功後返回正數1,釋放鎖失敗後返回負數-1
   
六、final boolean acquireQueued(final Node node, int arg)
   // 對於進入隊尾的結點,檢測本身能夠休息了,若是能夠修改則進入SIGNAL狀態且進入park()阻塞狀態

七、private Node addWaiter(Node mode)
   // 添加結點到鏈表隊尾

八、private Node enq(final Node node)
   // 若是addWaiter嘗試添加隊尾失敗,則再次調用enq此方法自旋將結點加入隊尾

九、private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)
   // 檢測結點狀態,若是能夠休息的話則設置waitStatus=SIGNAL並調用LockSupport.park休息;

十、private void unparkSuccessor(Node node)   
   // 釋放鎖時,該方法須要負責喚醒後繼節點

設計與實現僞代碼

一、獲取獨佔鎖:
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && 
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    acquire{
        若是嘗試獲取獨佔鎖失敗的話( 嘗試獲取獨佔鎖的各類方式由AQS的子類實現 ),
        那麼就新增獨佔鎖結點經過自旋操做加入到隊列中,而且根據結點中的waitStatus來決定是否調用LockSupport.park進行休息
    }
    
    
二、釋放獨佔鎖:
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    release{
        若是嘗試釋放獨佔鎖成功的話( 嘗試釋放獨佔鎖的各類方式由AQS的子類實現 ),
        那麼取出頭結點並根據結點waitStatus來決定是否有義務喚醒其後繼結點
    }

三、獲取共享鎖:
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    
    acquireShared{
        若是嘗試獲取共享鎖失敗的話( 嘗試獲取共享鎖的各類方式由AQS的子類實現 ),
        那麼新增共享鎖結點經過自旋操做加入到隊尾中,而且根據結點中的waitStatus來決定是否調用LockSupport.park進行休息
    }

四、釋放共享鎖:
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    releaseShared{
        若是嘗試釋放共享鎖失敗的話( 嘗試釋放共享鎖的各類方式由AQS的子類實現 ),
        那麼經過自旋操做喚完成阻塞線程的喚起操做
    }

前置源碼

AbstractQueuedSynchronizer

public abstract class AbstractQueuedSynchronizer

    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private static final long serialVersionUID = 7373984972572414691L;

    /**
     * 同步隊列頭節點
     */
    private transient volatile Node head;


    /**
     * 同步隊列尾節點
     */
    private transient volatile Node tail;


    /**
     * 同步狀態
     */
    private volatile int state;
    
     /**
     * 狀態在內存中的偏移位置
     */
    private static final long stateOffset;
    /**
     * 同步隊列頭節點在內存中的偏移位置
     */
    private static final long headOffset;
    /**
     * 同步隊列尾節點在內存中的偏移位置
     */
    private static final long tailOffset;
    /**
     * 節點等待狀態在內存中的偏移位置
     */
    private static final long waitStatusOffset;
    /**
     * 節點next節點在內存中的偏移位置
     */
    private static final long nextOffset;

    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));

        } catch (Exception ex) { throw new Error(ex); }
    }

    /**
     * 使用CAS 初始化同步對了頭節點
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

    /**
     * 使用CAS,更新同步隊列的頭節點
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

    /**
     * 使用CAS 更新節點中等待狀態waitStatus
     */
    private static final boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                        expect, update);
    }

    /**
     * 使用CAS 更新節點中next
     */
    private static final boolean compareAndSetNext(Node node,
                                                   Node expect,
                                                   Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }
    
    ....省略

AQS內部類Node

AQS 內部提供了一個內部類.用來做爲同步隊列和等待隊列的節點對象.
不一樣隊列的節點.其使用的屬性和含義是不一樣的多線程

static final class Node {
    /** 共享 */
    static final Node SHARED = new Node();

    /** 獨佔 */
    static final Node EXCLUSIVE = null;

    /**
     * 雙向同步隊列節點時使用,由於超時或者中斷,節點會被設置爲取消狀態,被取消的節點時不會參與到競爭中的,他會一直保持取消狀態不會轉變爲其餘狀態;
     */
    static final int CANCELLED =  1;

    /**
     * 雙向同步隊列節點時使用,後繼節點的線程處於等待狀態.
     * 而當前節點的線程若是釋放了同步狀態或者被取消,將會通知後繼節點,使後繼節點的線程得以運行
     */
    static final int SIGNAL    = -1;

    /**
     * 單向等待隊列節點時使用,等待節點須要被喚醒
     */
    static final int CONDITION = -2;

    /**
     * 雙向同步隊列節點時使用,共享模式釋放時,會將節點設置爲此狀態,並一直傳播通知後續節點中止阻塞。嘗試獲取鎖。
     */
    static final int PROPAGATE = -3;

    /** 等待狀態 */
    volatile int waitStatus;

    /** 雙向同步隊列節點時使用,前置節點指針 */
    volatile Node prev;

    /** 雙向同步隊列節點時使用,後置節點指針 */
    volatile Node next;

    /** 獲取同步狀態的線程 */
    volatile Thread thread;

    /** 單項等待隊列節點時使用,後置節點指針**/
    Node nextWaiter;


    //是否時CLH隊列的節點同時時共享式獲取同步狀態
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    //獲取當前節點的前置節點
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {
    }
    //建立同步隊列節點,node傳入Node.SHARED或Node.EXCLUSIVE
    Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }

    //建立等待隊列節點,waitStatus傳入Node.CONDITION
    Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

具體執行源碼解讀

ReentrantLock構造器

一、構造器源碼:
    /**
     * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    /**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    
二、默認構造方法爲非公平鎖,帶參構造方法還可經過傳入變量還決定調用方是使用公平鎖仍是非公平鎖;

Sync同步器

一、AQS --> Sync ---> FairSync // 公平鎖
                  |
                  |> NonfairSync // 非公平鎖
                  
二、ReentrantLock內的同步器都是經過Sync抽象接口來操做調用關係的,細看會發現基本上都是經過sync.xxx之類的這種調用方式的;

lock()

一、源碼:
    public void lock() {
        sync.lock();
    }
    
    // FairSync 公平鎖調用方式
    final void lock() {
        acquire(1); // 嘗試獲取獨佔鎖
    }    
    
    // NonfairSync 非公平鎖調用方式
    final void lock() {
        if (compareAndSetState(0, 1)) // 首先判斷state資源是否爲0,若是恰巧爲0則代表目前沒有線程佔用鎖,則利用CAS佔有鎖
            setExclusiveOwnerThread(Thread.currentThread()); // 當獨佔鎖以後則將設置exclusiveOwnerThread爲當前線程
        else
            acquire(1); // 若CAS佔用鎖失敗的話,則再嘗試獲取獨佔鎖
    }
    
二、這裏的區別就是非公平鎖在調用lock時首先檢測了是否經過CAS獲取鎖,發現鎖一旦空着的話,則搶先一步佔爲己有,
   無論有沒有阻塞隊列,只要當前線程來的時候發現state資源沒被佔用那麼當前線程就搶先一步試一下CAS,CAS失敗了它纔去排隊;

acquire(int)

一、源碼:
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && // 嘗試獲取鎖資源,若獲取到資源的話則線程直接返回,此方法由AQS的具體子類實現
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 不然獲取資源失敗的話,那麼就進入等待隊列
            selfInterrupt();
    }
    
二、該方法是獨佔模式下線程獲取state共享資源的入口,若是獲取到資源的話就返回,不然建立獨佔模式結點加入阻塞隊列,直到獲取到共享資源;

三、並且這裏須要加上自我中斷判斷,主要是由於線程在等待過程當中被中斷的話,它是不響應的,那麼就只有等到線程獲取到資源後經過自我判斷將這個判斷後續補上;

四、獨佔模式的該方法,正常狀況下只要沒有獲取到鎖,該方法一直處於阻塞狀態,獲取到了則跳出該方法區;

tryAcquire(int)

一、公平鎖tryAcquire源碼:
    // FairSync 公平鎖的 tryAcquire 方法
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState(); // 獲取鎖資源的最新內存值
        if (c == 0) { // 當state=0,說明鎖資源目前尚未被任何線程被佔用
            if (!hasQueuedPredecessors() && // 檢查線程是否有阻塞隊列
                compareAndSetState(0, acquires)) { // 若是沒有阻塞隊列,則經過CAS操做獲取鎖資源
                setExclusiveOwnerThread(current); // 沒有阻塞隊列,且CAS又成功獲取鎖資源,則設置獨佔線程對象爲當前線程
                return true; // 返回標誌,告訴上層該線程已經獲取到了鎖資源
            }
        }
        // 執行到此,鎖資源值不爲0,說明已經有線程正在佔用這鎖資源
        else if (current == getExclusiveOwnerThread()) { // 既然鎖已經被佔用,則看看佔用鎖的線程是否是當前線程
            int nextc = c + acquires; // 若是佔用的鎖的線程是當前線程的話,則爲重入鎖概念,狀態值作加1操做
            // int類型值小於0,是由於該int類型的state狀態值溢出了,溢出了的話那得說明這個鎖有多難獲取啊,可能出問題了
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true; // 返回成功標誌,告訴上層該線程已經獲取到了鎖資源
        }
        return false; // 返回失敗標誌,告訴上層該線程沒有獲取到鎖資源
    }

二、非公平鎖tryAcquire源碼:
    // NonfairSync 非公平鎖的 tryAcquire 方法
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires); // 調用父類的非公平獲取鎖資源方法
    }    

    // NonfairSync 非公平鎖父類 Sync 類的 nonfairTryAcquire 方法    
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState(); // 獲取鎖資源的最新內存值
        if (c == 0) { // 當state=0,說明鎖資源目前尚未被任何線程被佔用
            if (compareAndSetState(0, acquires)) { // 先無論三七二十一,先嚐試經過CAS操做獲取鎖資源
                setExclusiveOwnerThread(current); // CAS一旦成功獲取鎖資源,則設置獨佔線程對象爲當前線程
                return true;// 返回成功標誌,告訴上層該線程已經獲取到了鎖資源
            }
        }
        // 執行到此,鎖資源值不爲0,說明已經有線程正在佔用這鎖資源
        else if (current == getExclusiveOwnerThread()) { // 既然鎖已經被佔用,則看看佔用鎖的線程是否是當前線程
            int nextc = c + acquires; // 若是佔用的鎖的線程是當前線程的話,則爲重入鎖概念,狀態值作加1操做
            // int類型值小於0,是由於該int類型的state狀態值溢出了,溢出了的話那得說明這個鎖有多難獲取啊,可能出問題了
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc); // 
            return true; // 返回成功標誌,告訴上層該線程已經獲取到了鎖資源
        }
        return false; // 返回失敗標誌,告訴上層該線程沒有獲取到鎖資源
    }    

三、tryAcquire方法是AQS的子類實現的,也就是ReentrantLock的兩個靜態內部類實現的,目的就是經過CAS嘗試獲取鎖資源,
   獲取鎖資源成功則返回true,獲取鎖資源失敗則返回false;

addWaiter(Node)

一、源碼:
    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        // 按照給定的mode模式建立新的結點,模式有兩種:Node.EXCLUSIVE獨佔模式、Node.SHARED共享模式;
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail; // 將先隊尾結點賦值給臨時變量
        if (pred != null) { // 若是pred不爲空,說明該隊列已經有結點了
            node.prev = pred;
            if (compareAndSetTail(pred, node)) { // 經過CAS嘗試將node結點設置爲隊尾結點
                pred.next = node;
                return node;
            }
        }
        // 執行到此,說明隊尾沒有元素,則進入自旋首先設置頭結點,而後將此新建結點添加到隊尾
        enq(node); // 進入自旋添加node結點
        return node;
    }
    
二、    addWaiter經過傳入不一樣的模式來建立新的結點嘗試加入到隊列尾部,若是因爲併發致使添加結點到隊尾失敗的話那麼就進入自旋將結點加入隊尾;

enq(Node)

一、源碼:
    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) { // 自旋的死循環操做方式
            Node t = tail;
            // 由於是自旋方式,首次鏈表隊列tail確定爲空,可是後續鏈表有數據後就不會爲空了
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node())) // 隊列爲空時,則建立一個空對象結點做爲頭結點,無心思,可認爲傀儡結點
                    tail = head; // 空隊列的話,頭尾都指向同一個對象
            } else {
                // 進入 else 方法裏面,說明鏈表隊列已經有結點了
                node.prev = t;
                // 由於存在併發操做,經過CAS嘗試將新加入的node結點設置爲隊尾結點
                if (compareAndSetTail(t, node)) { 
                    // 若是node設置隊尾結點成功,則將以前的舊的對象尾結點t的後繼結點指向node,node的前驅結點也設置爲t
                    t.next = node;
                    return t;
                }
            }
            
            // 若是執行到這裏,說明上述兩個CAS操做任何一個失敗的話,該方法是不會放棄的,由於是自旋操做,再次循環繼續入隊
        }
    }

二、enq經過自旋這種死循環的操做方式,來確保結點正確的添加到隊列尾部,經過CAS操做若是頭部爲空則添加傀儡空結點,而後在循環添加隊尾結點;

compareAndSetHead/compareAndSetTail

一、源碼:
    /**
     * CAS head field. Used only by enq.
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }
    
    /**
     * CAS tail field. Used only by enq.
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }    

二、CAS操做,設置頭結點、尾結點;

acquireQueued(Node, int)

一、源碼:
    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) { // 自旋的死循環操做方式
                final Node p = node.predecessor(); // 若是新建結點的前驅結點是頭結點
                // 若是前驅結點爲頭結點,那麼該結點則是老二,僅次於老大,也但願嘗試去獲取一下鎖,萬一頭結點恰巧剛剛釋放呢?但願仍是要有的,萬一實現了呢。。。
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    // 拿到鎖資源後,則該node結點升級作頭結點,且設置後繼結點指針爲空,便於GC回收
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && // 根據前驅結點看看是否須要休息一下子
                    parkAndCheckInterrupt()) // 阻塞操做,正常狀況下,獲取不到鎖,代碼就在該方法中止了,直到被喚醒
                    interrupted = true;
                    
                // 若是執行到這裏,說明嘗試休息失敗了,由於是自旋操做,因此還會再次循環繼續操做判斷
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

二、acquireQueued也是採用一個自旋的死循環操做方式,只有頭結點才能嘗試獲取鎖資源,其他的結點挨個挨個在那裏等待修改,等待被喚醒,等待機會成爲頭結點;
   而新添加的node結點也天然逃不過如此命運,先看看是否頭結點,而後再看看是否能休息;

shouldParkAfterFailedAcquire(Node, Node)

一、源碼:
    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus; // 獲取前驅結點的狀態值
        if (ws == Node.SIGNAL) // 若前驅結點的狀態爲SIGNAL狀態的話,那麼該結點就不要想事了,直接返回true準備休息
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            // 若前驅結點的狀態爲CANCELLED狀態的話,那麼就一直向前遍歷,直到找到一個不爲CANCELLED狀態的結點
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
             // 剩下的結點狀態,則設置其爲SIGNAL狀態,而後返回false標誌等外層循環再次判斷
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

二、shouldParkAfterFailedAcquire主要是檢測前驅結點狀態,前驅結點爲SIGNAL的話,則新結點能夠安安心心休息了;
   若是前驅結點大於零,說明前驅結點處於CANCELLED狀態,那麼則以入參pred前驅爲起點,一直往前找,直到找到最近一個正常等待狀態的結點;
   若是前驅結點小於零,那麼就將前驅結點設置爲SIGNAL狀態,而後返回false依賴acquireQueued的自旋再次判斷是否須要進行休息;

parkAndCheckInterrupt()

一、源碼:
    /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this); // 阻塞等待
        return Thread.interrupted(); // 被喚醒後查看是否有被中斷過否?
    }

二、parkAndCheckInterrupt首先調用park讓線程進入等待狀態,而後當park阻塞被喚醒後,再次檢測是否曾經被中斷過;
   而被喚醒有兩種狀況,一個是利用unpark喚醒,一個是利用interrupt喚醒;

unlock()

一、源碼:
    public void unlock() {
        sync.release(1); // 
    }

二、unlock釋放鎖資源,通常都是在finally中被調用,防止當臨界區由於任何異常時怕鎖不被釋放;
   而釋放鎖不像獲取鎖lock的實現多色多樣,沒有所謂公平或不公平,就是規規矩矩的釋放資源而已;

release(int)

一、源碼:
    /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) { // 嘗試釋放鎖資源,此方法由AQS的具體子類實現
            Node h = head;
            if (h != null && h.waitStatus != 0) // 從頭結點開始,喚醒後繼結點
                unparkSuccessor(h); // 踢出CANCELLED狀態結點,而後喚醒後繼結點
            return true;
        }
        return false;
    }

二、release嘗試釋放鎖,而且有義務移除CANCELLED狀態的結點,還有義務喚醒後繼結點繼續運行獲取鎖資源;

tryRelease(int)

一、源碼:
    // NonfairSync 和 FairSync 的父類 Sync 類的 tryRelease 方法    
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases; // 獲取鎖資源值並作減1操做
        if (Thread.currentThread() != getExclusiveOwnerThread()) // 查看當前線程是否和持有鎖的線程是否是同一個線程
            // 正常狀況下,須要釋放的線程確定是持有鎖的線程,不然不就亂套了,確定哪裏出問題了,因此拋出異常
            throw new IllegalMonitorStateException(); 
        boolean free = false;
        if (c == 0) { // 若此時鎖資源值作減法操做後正好是0,則全部鎖資源已經釋放乾淨,所以持有鎖的變量也置爲空
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c); // 若此時作減法操做尚未歸零,那麼這種狀況就是那種重入鎖,須要重重釋放後才行
        return free;
    }

二、tryRelease主要經過CAS操做對state鎖資源進行減1操做;

unparkSuccessor(Node)

一、源碼:
    /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        // 該node通常都是傳入head進來,也就是說,須要釋放頭結點,也就是當前結點須要釋放鎖操做,順便喚醒後繼結點
        int ws = node.waitStatus;
        if (ws < 0) // 若結點狀態值小於0,則歸零處理,經過CAS歸零,容許失敗,可是無論怎麼着,仍然要往下走去喚醒後繼結點
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next; // 取出後繼結點,這個時候通常都是Head後面的一個結點,因此通常都是老二
        if (s == null || s.waitStatus > 0) { // 若後繼結點爲空或者後繼結點已經處於CANCELLED狀態的話
            s = null;
            // 那麼從隊尾向前遍歷,直到找到一個小於等於0的結點
            // 這裏爲何要從隊尾向前尋找?
            // * 由於在這個隊列中,任何一個結點都有可能被中斷,只是有可能,並不表明絕對的,但有一點是肯定的,
            // * 被中斷的結點會將結點的狀態設置爲CANCELLED狀態,標識這個結點在未來的某個時刻會被踢出;
            // * 踢出隊列的規則很簡單,就是該結點的前驅結點不會指向它,而是會指向它的後面的一個非CANCELLED狀態的結點;
            // * 而這個將被踢出的結點,它的next指針將會指向它本身;
            // * 因此設想一下,若是咱們從head日後找,一旦發現這麼一個處於CANCELLED狀態的結點,那麼for循環豈不是就是死循環了;
            // * 可是全部的這些結點當中,它們的prev前驅結點仍是沒有被誰動過,因此從tail結點向前遍歷最穩妥

            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread); // 喚醒線程
    }

二、unparkSuccessor主要是踢出CANCELLED狀態結點,而後喚醒後繼結點;
   可是這個喚醒的後繼結點爲空的話,那麼則從隊尾一直向前循環查找小於等於零狀態的結點並調用unpark喚醒;

參考:

Java併發包基石-AQS詳解併發

併發編程面試必備:AQS 原理以及 AQS 同步組件總結

深刻學習java同步器AQS

Java併發之AQS源碼分析(一)

淺談Java併發編程系列(九)—— AQS結構及原理分析

深刻分析AQS實現原理

原理剖析(第 005 篇)AQS工做原理分析

Java多線程進階(十)—— J.U.C之locks框架:基於AQS的讀寫鎖(5)

AbstractQueuedSynchronizer 原理分析 - Condition 實現原理

AbstractQueuedSynchronizer(AQS源碼解讀)

相關文章
相關標籤/搜索