AQS之ReentLock源碼解析

前言:

Java中的同步類 ReentrantLock是基於AbstractQueuedSynchronizer(簡稱爲 AQS)實現的。

今天從源碼來了解下ReentrantLock中非公平鎖的加鎖和釋放鎖(ReentrantLock中支持公平鎖和非公平鎖,默認是非公平鎖的,但能夠經過建立ReentrantLock對象時傳入參數指定使用公平鎖)。html

在瞭解ReentrantLock前,須要對AQS有必定的瞭解,不然在學習時會比較困難的,而且在經過源碼學習ReentrantLock時也會穿插着講解AQS內容。java

AQS掃蕩:

1.0、AQS中state變量

​AQS中提供了一個int類型的state變量,而且state變量被volatile修飾,表示state變量的讀寫操做能夠保證原子性;而且AQS還提供了針對state變量的讀寫方法,以及使用CAS算法更新state變量的方法。 AQS使用state變量這個狀態變量來實現同步狀態。node

①、源碼展現算法

/**
 * The synchronization state. 
 */
private volatile int state;

/**
 * get 獲取state變量值 
 */
protected final int getState() {
    return state;
}

/**
 * set 更新state變量值 
 * @param newState  新的狀態變量值
 */
protected final void setState(int newState) {
    state = newState;
}


/**
 * 使用CAS算法更新state變量值; 當從共享內存中讀取出的state變量值與expect指望值一致的話,
 * 就將其更新爲update值。使用CAS算法保證其操做的原子性
 *
 * @param expect  指望值
 * @param update  更新值
 */
protected final boolean compareAndSetState(int expect, int update) {
    // 使用Unsafe類的本地方法來實現CAS
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

1.一、state同步狀態的競爭

​ 多個線程同時競爭AQS的state同步狀態,在同一時刻只能有一個線程獲取到同步狀態(獲取到鎖),那其它沒獲取到鎖的線程該怎麼辦呢app

它們會進去到一個同步隊列中,在隊列中等待同步鎖的釋放;

這個同步隊列是一個基於鏈表的雙向隊列, 基於鏈表的話,就會存在Node節點,那麼AQS中節點是怎麼實現的呢源碼分析

①、Node節點:學習

AQS中本身實現了一個內部Node節點類,Node節點類中定義了一些屬性,下面來簡單說說屬性的意思:ui

static final class Node {
        // 標誌在同步隊列中Node節點的模式,共享模式 
        static final Node SHARED = new Node();
        // 標誌在同步隊列中Node節點的模式,獨佔(排他)模式 
        static final Node EXCLUSIVE = null;

        // waitStatus值爲1時表示該線程節點已釋放(超時等),已取消的節點不會再阻塞。 
        static final int CANCELLED =  1;
    
        // waitStatus值爲-1時表示當此節點的前驅結點釋放鎖時,而後當前節點中的線程就能夠去獲取鎖運行 
        static final int SIGNAL    = -1;
    
        /**
         * waitStatus爲-2時,表示該線程在condition隊列中阻塞(Condition有使用),
         * 當其餘線程調用了Condition的signal()方法後,CONDITION狀態的結點將從
         * 等待隊列轉移到同步隊列中,等待獲取同步鎖。
         */ 
        static final int CONDITION = -2;
    
        /**
         * waitStatus爲-3時,與共享模式有關,在共享模式下,該狀態表示可運行
         * (CountDownLatch中有使用)。
         */
        static final int PROPAGATE = -3;

        /**
         * waitStatus:等待狀態,指的是當前Node節點中存放的線程的等待狀態,
         * 等待狀態值就是上面的四個狀態值:CANCELLED、SIGNAL、CONDITION、PROPAGATE
         */
        volatile int waitStatus;

        /**
         * 由於同步隊列是雙向隊列,那麼每一個節點都會有指向前一個節點的 prev 指針
         */
        volatile Node prev;

        /**
         * 由於同步隊列是雙向隊列,那麼每一個節點也都會有指向後一個節點的 next 指針
         */
        volatile Node next;

        /**
         * Node節點中存放的阻塞的線程引用
         */
        volatile Thread thread;

        /**
         * 當前節點與其next後繼結點的所屬模式,是SHARED共享模式,仍是EXCLUSIVE獨佔模式,
         *
         * 注:好比說當前節點A是共享的,那麼它的這個字段是shared,也就是說在這個等待隊列中,
         * A節點的後繼節點也是shared。
         */
        Node nextWaiter;

        /**
         * 獲取當前節點是否爲共享模式
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * 獲取當前節點的 prev前驅結點
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() { }
    
        // 在後面的addWaiter方法會使用到,線程競爭state同步鎖失敗時,會建立Node節點存放thread
        Node(Thread thread, Node mode) {     
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

②、同步隊列結構圖(雙向隊列):this

1.二、圖解AQS原理

​ 經過前面兩點,能夠了解到AQS的原理究竟是什麼了,總結爲一句話:AQS使用一個Volatile的int類型的成員變量來表示同步狀態,經過內置的FIFO隊列來完成資源獲取的排隊工做,經過CAS完成對State值的修改spa

而後再來一張圖,使得理解更加深入:

圖片來源:Java技術之AQS詳解

好了, AQS暫時能夠先了解到這裏了,知道這些後,在後面瞭解 ReentrantLock時就會變的容易些,而且後面經過源碼學習 ReentrantLock時,因爲會使用到AQS的模版方法,因此也會講解到AQS的內容。

劍指ReentrantLock源碼:

2.0、ReentrantLock   vs   Synchronized

​ 在瞭解ReentrantLock以前,先將ReentrantLockSynchronized進行比較下,這樣能夠更加了解ReentrantLock的特性,也有助於下面源碼的閱讀;

2.一、ReentrantLock的公平鎖與非公平鎖

建立一個ReentrantLock對象,在建立對象時,若是不指定公平鎖的話,默認是非公平鎖;

①、簡單瞭解下什麼是公平鎖,什麼是非公平鎖?

公平鎖:按照申請同步鎖的順序來獲取鎖;

非公平鎖:不會按照申請鎖的順序獲取鎖,存在鎖的搶佔;

注:後面會經過源碼瞭解下非公平鎖和公平鎖是怎樣獲取鎖的。

②、源碼以下:

// 默認是非公平的鎖
ReentrantLock lock = new ReentrantLock();
// 構造方法默認建立了一個 NonfairSync 非公平鎖對象
public ReentrantLock() {
    // NonfairSync繼承了Sync類,Sync類又繼承了AQS類
    sync = new NonfairSync();
}


// 傳入參數 true,指定爲公平鎖
ReentrantLock lock = new ReentrantLock(true);
// 傳入參數的構造方法,當fair爲true時,建立一個公平鎖對象,不然建立一個非公平鎖對象
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

2.二、經過源碼看下非公平鎖的加鎖機制:(獨佔模式)

①、開始先經過一個簡單流程圖來看下獨佔模式下加鎖的流程:

​ 圖片來源:美團技術團隊

②、源碼分析:加鎖時首先使用CAS算法嘗試將state狀態變量設置爲1,設置成功後,表示當前線程獲取到了鎖,而後將獨佔鎖的擁有者設置爲當前線程;若是CAS設置不成功,則進入Acquire方法進行後續處理。

final void lock() {
    // 使用CAS算法嘗試將state狀態變量設置爲1
    if (compareAndSetState(0, 1))
        // 設置成功後,表示當前線程獲取到了鎖,而後將獨佔鎖的擁有者設置爲當前線程
        setExclusiveOwnerThread(Thread.currentThread());
    else
        // 進行後續處理,會涉及到重入性、建立Node節點加入到隊列尾等
        acquire(1);
}

③、探究下acquire(1) 方法裏面是什麼呢 acquire(1) 方法是AQS提供的方法:

public final void acquire(int arg) {
    /**
     * 使用tryAcquire()方法,讓當前線程嘗試獲取同步鎖,獲取成的話,就不會執行後面的acquireQueued()
     * 方法了,這是因爲 && 邏輯運算符的特性決定的。
     *
     * 若是使用tryAcquire()方法獲取同步鎖失敗的話,就會繼續執行acquireQueued()方法,它的做用是
     * 一直死循環遍歷同步隊列,直到使addWaiter()方法建立的節點中線程獲取到鎖。
     *
     * 若是acquireQueued()返回的true,這個true不是表明成功的獲取到鎖,而是表明當前線程是否存在
     * 中斷標誌,若是存在的話,在獲取到同步鎖後,須要使用selfInterrupt()對當前線程進行中斷。
     */
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

1)tryAcquire(arg) 方法源碼解讀:NonfairSync 非公平鎖中重寫了AQS的tryAcquire()方法

final boolean nonfairTryAcquire(int acquires) {
    // 當前線程
    final Thread current = Thread.currentThread();
    // 獲取當前state同步狀態變量值,因爲使用volatile修飾,單獨的讀寫操做具備原子性
    int c = getState();
    // 若是狀態值爲0
    if (c == 0) {
        // 使用compareAndSetState方法這個CAS算法嘗試將state同步狀態變量設置爲1 獲取同步鎖
        if (compareAndSetState(0, acquires)) {
            // 而後將獨佔鎖的擁有者設置爲當前線程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 若是擁有獨佔鎖的的線程是當前線程的話,表示當前線程須要重複獲取鎖(重入鎖)
    else if (current == getExclusiveOwnerThread()) {
        // 當前同步狀態state變量值加1
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 寫入state同步狀態變量值,因爲使用volatile修飾,單獨的讀寫操做具備原子性
        setState(nextc);
        return true;
    }
    return false;
}

2)addWaiter( Node.EXCLUSIVE ) :建立一個同步隊列Node節點,同時綁定節點的模式爲獨佔模式,而且將建立的節點插入到同步隊列尾部;addWaiter( ) 方法是AQS提供方法。

private Node addWaiter(Node mode) {
    // model參數是獨佔模式,默認爲null;
    Node node = new Node(Thread.currentThread(), mode);
    // 將當前同步隊列的tail尾節點的地址引用賦值給pre變量
    Node pred = tail;
    // 若是pre不爲null,說明同步隊列中存在節點
    if (pred != null) {
        // 當前節點的前驅結點指向pre尾節點
        node.prev = pred;
        // 使用CAS算法將當前節點設置爲尾節點,使用CAS保證其原子性
        if (compareAndSetTail(pred, node)) {
            // 尾節點設置成功,將pre舊尾節點的後繼結點指向新尾節點node
            pred.next = node;
            return node;
        }
    }
    // 若是尾節點爲null,表示同步隊列中尚未節點,enq()方法將當前node節點插入到隊列中
    enq(node);
    return node;
}

3)、說完addWaiter( Node.EXCLUSIVE )方法,接下來講下acquireQueued()方法,它是怎樣使addWaiter()建立的節點中的線程獲取到state同步鎖的。(這個方法也是AQS提供的)

源碼走起:

final boolean acquireQueued(final Node node, int arg) {
    // 標誌cancelAcquire()方法是否執行
    boolean failed = true;
    try {
        // 標誌是否中斷,默認爲false不中斷
        boolean interrupted = false;
        for (;;) {
            // 獲取當前節點的前驅結點
            final Node p = node.predecessor();
            /**
             * 若是當前節點的前驅結點已是同步隊列的頭結點了,說明了兩點內容:
             * 一、其前驅結點已經獲取到了同步鎖了,而且鎖還沒釋放
             * 二、其前驅結點已經獲取到了同步鎖了,可是鎖已經釋放了
             *
             * 而後使用tryAcquire()方法去嘗試獲取同步鎖,若是前驅結點已經釋放了鎖,那麼就會獲取成功,
             * 不然同步鎖獲取失敗,繼續循環
             */
            if (p == head && tryAcquire(arg)) {
                // 將當前節點設置爲同步隊列的head頭結點
                setHead(node);
                // 而後將當前節點的前驅結點的後繼結點置爲null,幫助進行垃圾回收
                p.next = null; // help GC
                failed = false;
                // 返回中斷的標誌
                return interrupted;
            }
            /**
             * shouldParkAfterFailedAcquire()是對當前節點的前驅結點的狀態進行判斷,以及去針對各類
             * 狀態作出相應處理,因爲文章篇幅問題,具體源碼本文不作講解;只需知道若是前驅結點p的狀態爲
             * SIGNAL的話,就返回true。
             *
             * parkAndCheckInterrupt()方法會使當前線程進去waiting狀態,而且查看當前線程是否被中斷,
             * interrupted() 同時會將中斷標誌清除。
             */
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 中斷標誌置爲true
                interrupted = true;
        }
    } finally {
        if (failed)
            /**
             * 若是for(;;)循環中出現異常,而且failed=false沒有執行的話,cancelAcquire方法
             * 就會將當前線程的狀態置爲 node.CANCELLED 已取消狀態,而且將當前節點node移出
             * 同步隊列。
             */
            cancelAcquire(node);
    }
}

4)、最後說下 selfInterrupt() 方法, 這個方法就是將當前線程進行中斷:

static void selfInterrupt() {
    // 中斷當前線程
    Thread.currentThread().interrupt();
}

2.三、公平鎖與非公平鎖在加鎖時的區別:

①、公平鎖 FairSync 的加鎖 lock() 加鎖方法:

final void lock() {
    acquire(1);
}

②、非公平鎖 NonfairSync 的加鎖 lock() 加鎖方法:上面講解源碼的時候有提到喲,還有印象嗎,沒印象的話也不要緊,不要哭 , 嘿嘿,我都準備好了。 源碼奉上:

final void lock() {
    /** 
     * 看到這,是否是發現了什麼,非公平鎖在此處直觀看的話,發現比公平鎖多了這幾行代碼; 
     * 這裏就是使得線程存在了一個搶佔,若是當前同步隊列中的head頭結點中 線程A 恰好釋放了同步鎖,
     * 而後此時 線程B 正好來了,那麼此時線程B就會獲取到鎖,而此時同步隊列中head頭結點的後繼結點中的
     * 線程C 就沒法獲取到同步鎖,只能等待線程B釋放鎖後,嘗試獲取鎖了。
     */
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

③、除了上面那處不一樣以外,還有別的地方嗎;別急,再看看 acquire(1) 方法是否同樣呢?

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

​ 誒呀,方法點進去都是同樣的呀,可不嘛,都是調用的AQS提供的 acquire(1) 方法;可是彆着急,上面在講解非公平鎖加鎖時,有提到的 tryAcquire(arg) 方法在AQS的不一樣子孫類中都有各自的實現的。如今打開公平鎖的 tryAcquire(arg) 方法看看其源碼與非公平鎖有什麼區別:

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        /**
         * 經過對比源碼發現,公平鎖比非公平鎖多了這塊代碼: !hasQueuedPredecessors() 
         * hasQueuedPredecessors() 是作什麼呢?就是判斷當前同步隊列中是否存在節點,若是存在節點呢,
         * 就返回true,因爲前面有個 !,那麼就是false,再根據 && 邏輯運算符的特性,不會繼續執行了;
         * 
         * tryAcquire()方法直接返回false,後面的邏輯就和非公平鎖的一致了,就是建立Node節點,並將
         * 節點加入到同步隊列尾; 公平鎖:發現當前同步隊列中存在節點,有線程在本身前面已經申請可鎖,那
         * 本身就得乖乖的向後面排隊去。
         *
         * 友情提示:在生活中,咱們也須要按照先來後到去排隊,保證素質; 還有就是怕大家不排隊被別人打了。
         */
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
鬆口氣,從中午一直寫到下午快四點了,先讓我歇口氣,快累成狗了;本文還剩下釋放鎖部分沒寫呢,歇口氣,喝口水繼續

注意:ReentrantLock在釋放鎖的時候,並不區分公平鎖和非公平鎖

2.四、經過源碼看下釋放鎖機制:(獨佔模式)

①、unlock() 釋放鎖的方法:

public void unlock() {
    // 釋放鎖時,須要將state同步狀態變量值進行減 1,傳入參數 1
    sync.release(1);
}

②、release( int arg ) 方法解析:(此方法是AQS提供的)

public final boolean release(int arg) {
    // tryRelease方法:嘗試釋放鎖,成功true,失敗false
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 頭結點不爲空而且頭結點的waitStatus不是初始化節點狀況,而後喚醒此阻塞的線程
            unparkSuccessor(h);
        return true;
    }
    return false;
}

注意:這裏的判斷條件爲何是 h != null && h.waitStatus != 0 ?

h == null Head還沒初始化。初始狀況下,head == null,第一個節點入隊,Head會被初始化一個虛擬節點。因此說,這裏若是還沒來得及入隊,就會出現head == null 的狀況。

h != null && waitStatus == 0 代表後繼節點對應的線程仍在運行中,不須要喚醒。

h != null && waitStatus < 0 代表後繼節點可能被阻塞了,須要喚醒。

③、而後再來看看tryRelease(arg) 方法:

protected final boolean tryRelease(int releases) {
    // 當前state狀態值進行減一
    int c = getState() - releases;
    // 若是當前獨佔鎖的擁有者不是當前線程,則拋出 非法監視器狀態 異常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 更新state同步狀態值
    setState(c);
    return free;
}

④、最後看看unparkSuccessor(Node node) 方法:

private void unparkSuccessor(Node node) {
    // 獲取頭結點waitStatus
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 獲取當前節點的下一個節點
    Node s = node.next;
    // 若是下個節點是null或者下個節點被cancelled,就找到隊列最開始的非cancelled狀態的節點
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 就從尾部節點開始找,到隊首,找到隊列第一個waitStatus<0的節點。
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 若是當前節點的後繼結點不爲null,則將其節點中處於阻塞狀態的線程unpark喚醒
    if (s != null)
        LockSupport.unpark(s.thread);
}

注意:爲何要從後往前找第一個非Cancelled的節點呢?緣由以下:

因爲以前加鎖時的addWaiter( )方法的緣由;

private Node addWaiter(Node mode) {
    // model參數是獨佔模式,默認爲null;
    Node node = new Node(Thread.currentThread(), mode);
    // 將當前同步隊列的tail尾節點的地址引用賦值給pre變量
    Node pred = tail;
    // 若是pre不爲null,說明同步隊列中存在節點
    if (pred != null) {
        // 當前節點的前驅結點指向pre尾節點
        node.prev = pred;
        // 使用CAS算法將當前節點設置爲尾節點,使用CAS保證其原子性
        if (compareAndSetTail(pred, node)) {
            // 尾節點設置成功,將pre舊尾節點的後繼結點指向新尾節點node
            pred.next = node;
            return node;
        }
    }
    // 若是尾節點爲null,表示同步隊列中尚未節點,enq()方法將當前node節點插入到隊列中
    enq(node);
    return node;
}

從這裏能夠看到,節點入隊並非原子操做,也就是說,node.prev = pred ; compareAndSetTail( pred, node ) 這兩個地方能夠看做Tail入隊的原子操做,可是此時 pred.next = node; 還沒執行,若是這個時候執行了unparkSuccessor方法,就沒辦法從前日後找了,因此須要從後往前找。還有一點緣由,在產生CANCELLED狀態節點的時候,先斷開的是Next指針,Prev指針並未斷開,所以也是必需要從後往前遍歷纔可以遍歷徹底部的Node。

end! 長吸一口氣,終於本文算是寫完了,最後再看看有沒有錯別字,以及排排版。

後續還會出一篇結合CountDownLatch源碼學習共享鎖(共享模式)的文章。

參考資料:

一、從ReentrantLock的實現看AQS的原理及應用

二、Java技術之AQS詳解

相關文章
相關標籤/搜索