死磕 java同步系列之ReentrantReadWriteLock源碼解析

問題

(1)讀寫鎖是什麼?java

(2)讀寫鎖具備哪些特性?node

(3)ReentrantReadWriteLock是怎麼實現讀寫鎖的?緩存

(4)如何使用ReentrantReadWriteLock實現高效安全的TreeMap?安全

簡介

讀寫鎖是一種特殊的鎖,它把對共享資源的訪問分爲讀訪問和寫訪問,多個線程能夠同時對共享資源進行讀訪問,可是同一時間只能有一個線程對共享資源進行寫訪問,使用讀寫鎖能夠極大地提升併發量。併發

特性

讀寫鎖具備如下特性:高併發

是否互斥

能夠看到,讀寫鎖除了讀讀不互斥,讀寫、寫讀、寫寫都是互斥的。oop

那麼,ReentrantReadWriteLock是怎麼實現讀寫鎖的呢?源碼分析

類結構

在看源碼以前,咱們仍是先來看一下ReentrantReadWriteLock這個類的主要結構。ui

ReentrantReadWriteLock

ReentrantReadWriteLock中的類分紅三個部分:this

(1)ReentrantReadWriteLock自己實現了ReadWriteLock接口,這個接口只提供了兩個方法readLock()writeLock()

(2)同步器,包含一個繼承了AQS的Sync內部類,以及其兩個子類FairSync和NonfairSync;

(3)ReadLock和WriteLock兩個內部類實現了Lock接口,它們具備鎖的一些特性。

源碼分析

主要屬性

// 讀鎖
private final ReentrantReadWriteLock.ReadLock readerLock;
// 寫鎖
private final ReentrantReadWriteLock.WriteLock writerLock;
// 同步器
final Sync sync;

維護了讀鎖、寫鎖和同步器。

主要構造方法

// 默認構造方法
public ReentrantReadWriteLock() {
    this(false);
}
// 是否使用公平鎖的構造方法
public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}

它提供了兩個構造方法,默認構造方法使用的是非公平鎖模式,在構造方法中初始化了讀鎖和寫鎖。

獲取讀鎖和寫鎖的方法

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

屬性中的讀鎖和寫鎖是私有屬性,經過這兩個方法暴露出去。

下面咱們主要分析讀鎖和寫鎖的加鎖、解鎖方法,且都是基於非公平模式的。

ReadLock.lock()

// ReentrantReadWriteLock.ReadLock.lock()
public void lock() {
    sync.acquireShared(1);
}
// AbstractQueuedSynchronizer.acquireShared()
public final void acquireShared(int arg) {
    // 嘗試獲取共享鎖(返回1表示成功,返回-1表示失敗)
    if (tryAcquireShared(arg) < 0)
        // 失敗了就可能要排隊
        doAcquireShared(arg);
}
// ReentrantReadWriteLock.Sync.tryAcquireShared()
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    // 狀態變量的值
    // 在讀寫鎖模式下,高16位存儲的是共享鎖(讀鎖)被獲取的次數,低16位存儲的是互斥鎖(寫鎖)被獲取的次數
    int c = getState();
    // 互斥鎖的次數
    // 若是其它線程得到了寫鎖,直接返回-1
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    // 讀鎖被獲取的次數
    int r = sharedCount(c);
    
    // 下面說明此時尚未寫鎖,嘗試去更新state的值獲取讀鎖
    // 讀者是否須要排隊(是不是公平模式)
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        // 獲取讀鎖成功
        if (r == 0) {
            // 若是以前尚未線程獲取讀鎖
            // 記錄第一個讀者爲當前線程
            firstReader = current;
            // 第一個讀者重入的次數爲1
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            // 若是有線程獲取了讀鎖且是當前線程是第一個讀者
            // 則把其重入次數加1
            firstReaderHoldCount++;
        } else {
            // 若是有線程獲取了讀鎖且當前線程不是第一個讀者
            // 則從緩存中獲取重入次數保存器
            HoldCounter rh = cachedHoldCounter;
            // 若是緩存不屬性當前線程
            // 再從ThreadLocal中獲取
            // readHolds自己是一個ThreadLocal,裏面存儲的是HoldCounter
            if (rh == null || rh.tid != getThreadId(current))
                // get()的時候會初始化rh
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                // 若是rh的次數爲0,把它放到ThreadLocal中去
                readHolds.set(rh);
            // 重入的次數加1(初始次數爲0)
            rh.count++;
        }
        // 獲取讀鎖成功,返回1
        return 1;
    }
    // 經過這個方法再去嘗試獲取讀鎖(若是以前其它線程獲取了寫鎖,同樣返回-1表示失敗)
    return fullTryAcquireShared(current);
}
// AbstractQueuedSynchronizer.doAcquireShared()
private void doAcquireShared(int arg) {
    // 進入AQS的隊列中
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 當前節點的前一個節點
            final Node p = node.predecessor();
            // 若是前一個節點是頭節點(說明是第一個排隊的節點)
            if (p == head) {
                // 再次嘗試獲取讀鎖
                int r = tryAcquireShared(arg);
                // 若是成功了
                if (r >= 0) {
                    // 頭節點後移並傳播
                    // 傳播即喚醒後面連續的讀節點
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 沒獲取到讀鎖,阻塞並等待被喚醒
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
// AbstractQueuedSynchronizer.setHeadAndPropagate()
private void setHeadAndPropagate(Node node, int propagate) {
    // h爲舊的頭節點
    Node h = head;
    // 設置當前節點爲新頭節點
    setHead(node);
    
    // 若是舊的頭節點或新的頭節點爲空或者其等待狀態小於0(表示狀態爲SIGNAL/PROPAGATE)
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        // 須要傳播
        // 取下一個節點
        Node s = node.next;
        // 若是下一個節點爲空,或者是須要獲取讀鎖的節點
        if (s == null || s.isShared())
            // 喚醒下一個節點
            doReleaseShared();
    }
}
// AbstractQueuedSynchronizer.doReleaseShared()
// 這個方法只會喚醒一個節點
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 若是頭節點狀態爲SIGNAL,說明要喚醒下一個節點
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 喚醒下一個節點
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     // 把頭節點的狀態改成PROPAGATE成功纔會跳到下面的if
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 若是喚醒後head沒變,則跳出循環
        if (h == head)                   // loop if head changed
            break;
    }
}

看完【死磕 java同步系列之ReentrantLock源碼解析(一)——公平鎖、非公平鎖】的分析再看這章的內容應該會比較簡單,中間同樣的方法咱們這裏直接跳過了。

咱們來看看大體的邏輯:

(1)先嚐試獲取讀鎖;

(2)若是成功了直接結束;

(3)若是失敗了,進入doAcquireShared()方法;

(4)doAcquireShared()方法中首先會生成一個新節點並進入AQS隊列中;

(5)若是頭節點正好是當前節點的上一個節點,再次嘗試獲取鎖;

(6)若是成功了,則設置頭節點爲新節點,並傳播;

(7)傳播即喚醒下一個讀節點(若是下一個節點是讀節點的話);

(8)若是頭節點不是當前節點的上一個節點或者(5)失敗,則阻塞當前線程等待被喚醒;

(9)喚醒以後繼續走(5)的邏輯;

在整個邏輯中是在哪裏連續喚醒讀節點的呢?

答案是在doAcquireShared()方法中,在這裏一個節點A獲取了讀鎖後,會喚醒下一個讀節點B,這時候B也會獲取讀鎖,而後B繼續喚醒C,依次往復,也就是說這裏的節點是一個喚醒一個這樣的形式,而不是一個節點獲取了讀鎖後一次性喚醒後面全部的讀節點。

ReentrantReadWriteLock1

ReadLock.unlock()

// java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock.unlock
public void unlock() {
    sync.releaseShared(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer.releaseShared
public final boolean releaseShared(int arg) {
    // 若是嘗試釋放成功了,就喚醒下一個節點
    if (tryReleaseShared(arg)) {
        // 這個方法實際是喚醒下一個節點
        doReleaseShared();
        return true;
    }
    return false;
}
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync.tryReleaseShared
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {
        // 若是第一個讀者(讀線程)是當前線程
        // 就把它重入的次數減1
        // 若是減到0了就把第一個讀者置爲空
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        // 若是第一個讀者不是當前線程
        // 同樣地,把它重入的次數減1
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
        // 共享鎖獲取的次數減1
        // 若是減爲0了說明徹底釋放了,才返回true
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer.doReleaseShared
// 行爲跟方法名有點不符,實際是喚醒下一個節點
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 若是頭節點狀態爲SIGNAL,說明要喚醒下一個節點
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 喚醒下一個節點
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     // 把頭節點的狀態改成PROPAGATE成功纔會跳到下面的if
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 若是喚醒後head沒變,則跳出循環
        if (h == head)                   // loop if head changed
            break;
    }
}

解鎖的大體流程以下:

(1)將當前線程重入的次數減1;

(2)將共享鎖總共被獲取的次數減1;

(3)若是共享鎖獲取的次數減爲0了,說明共享鎖徹底釋放了,那就喚醒下一個節點;

以下圖,ABC三個節點各獲取了一次共享鎖,三者釋放的順序分別爲ACB,那麼最後B釋放共享鎖的時候tryReleaseShared()纔會返回true,進而纔會喚醒下一個節點D。

ReentrantReadWriteLock2

WriteLock.lock()

// java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock.lock()
public void lock() {
    sync.acquire(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire()
public final void acquire(int arg) {
    // 先嚐試獲取鎖
    // 若是失敗,則會進入隊列中排隊,後面的邏輯跟ReentrantLock如出一轍了
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync.tryAcquire()
protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    // 狀態變量state的值
    int c = getState();
    // 互斥鎖被獲取的次數
    int w = exclusiveCount(c);
    if (c != 0) {
        // 若是c!=0且w==0,說明共享鎖被獲取的次數不爲0
        // 這句話整個的意思就是
        // 若是共享鎖被獲取的次數不爲0,或者被其它線程獲取了互斥鎖(寫鎖)
        // 那麼就返回false,獲取寫鎖失敗
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 溢出檢測
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 到這裏說明當前線程已經獲取過寫鎖,這裏是重入了,直接把state加1便可
        setState(c + acquires);
        // 獲取寫鎖成功
        return true;
    }
    // 若是c等於0,就嘗試更新state的值(非公平模式writerShouldBlock()返回false)
    // 若是失敗了,說明獲取寫鎖失敗,返回false
    // 若是成功了,說明獲取寫鎖成功,把本身設置爲佔有者,並返回true
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}
// 獲取寫鎖失敗了後面的邏輯跟ReentrantLock是一致的,進入隊列排隊,這裏就不列源碼了

寫鎖獲取的過程大體以下:

(1)嘗試獲取鎖;

(2)若是有讀者佔有着讀鎖,嘗試獲取寫鎖失敗;

(3)若是有其它線程佔有着寫鎖,嘗試獲取寫鎖失敗;

(4)若是是當前線程佔有着寫鎖,嘗試獲取寫鎖成功,state值加1;

(5)若是沒有線程佔有着鎖(state==0),當前線程嘗試更新state的值,成功了表示嘗試獲取鎖成功,不然失敗;

(6)嘗試獲取鎖失敗之後,進入隊列排隊,等待被喚醒;

(7)後續邏輯跟ReentrantLock是一致;

WriteLock.unlock()

// java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock.unlock()
public void unlock() {
    sync.release(1);
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer.release()
public final boolean release(int arg) {
    // 若是嘗試釋放鎖成功(徹底釋放鎖)
    // 就嘗試喚醒下一個節點
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync.tryRelease()
protected final boolean tryRelease(int releases) {
    // 若是寫鎖不是當前線程佔有着,拋出異常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 狀態變量的值減1
    int nextc = getState() - releases;
    // 是否徹底釋放鎖
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    // 設置狀態變量的值
    setState(nextc);
    // 若是徹底釋放了寫鎖,返回true
    return free;
}

寫鎖釋放的過程大體爲:

(1)先嚐試釋放鎖,即狀態變量state的值減1;

(2)若是減爲0了,說明徹底釋放了鎖;

(3)徹底釋放了鎖才喚醒下一個等待的節點;

總結

(1)ReentrantReadWriteLock採用讀寫鎖的思想,能提升併發的吞吐量;

(2)讀鎖使用的是共享鎖,多個讀鎖能夠一塊兒獲取鎖,互相不會影響,即讀讀不互斥;

(3)讀寫、寫讀和寫寫是會互斥的,前者佔有着鎖,後者須要進入AQS隊列中排隊;

(4)多個連續的讀線程是一個接着一個被喚醒的,而不是一次性喚醒全部讀線程;

(5)只有多個讀鎖都徹底釋放了纔會喚醒下一個寫線程;

(6)只有寫鎖徹底釋放了纔會喚醒下一個等待者,這個等待者有多是讀線程,也多是寫線程;

彩蛋

(1)若是同一個線程先獲取讀鎖,再獲取寫鎖會怎樣?

ReentrantReadWriteLock3

分析上圖中的代碼,在tryAcquire()方法中,若是讀鎖被獲取的次數不爲0(c != 0 && w == 0),返回false,返回以後外層方法會讓當前線程阻塞。

能夠經過下面的方法驗證:

readLock.lock();
writeLock.lock();
writeLock.unlock();
readLock.unlock();

運行程序後會發現代碼中止在writeLock.lock();,固然,你也能夠打個斷點跟蹤進去看看。

(2)若是同一個線程先獲取寫鎖,再獲取讀鎖會怎樣?

ReentrantReadWriteLock4

分析上面的代碼,在tryAcquireShared()方法中,第一個紅框處並不會返回,由於不知足getExclusiveOwnerThread() != current;第二個紅框處若是原子更新成功就說明獲取了讀鎖,而後就會執行第三個紅框處的代碼把其重入次數更改成1。

能夠經過下面的方法驗證:

writeLock.lock();
readLock.lock();
readLock.unlock();
writeLock.unlock();

你能夠打個斷點跟蹤一下看看。

(3)死鎖了麼?

經過上面的兩個例子,咱們能夠感覺到同一個線程先讀後寫和先寫後讀是徹底不同的,爲何不同呢?

先讀後寫,一個線程佔有讀鎖後,其它線程仍是能夠佔有讀鎖的,這時候若是在其它線程佔有讀鎖以前讓本身佔有了寫鎖,其它線程又不能佔有讀鎖了,這段程序會很是難實現,邏輯也很奇怪,因此,設計成只要一個線程佔有了讀鎖,其它線程包括它本身都不能再獲取寫鎖。

先寫後讀,一個線程佔有寫鎖後,其它線程是不能佔有任何鎖的,這時候,即便本身佔有一個讀鎖,對程序的邏輯也不會有任何影響,因此,一個線程佔有寫鎖後是能夠再佔有讀鎖的,只是這個時候其它線程依然沒法獲取讀鎖。

若是你仔細思考上面的邏輯,你會發現一個線程先佔有讀鎖後佔有寫鎖,會有一個很大的問題——鎖沒法被釋放也沒法被獲取了。這個線程先佔有了讀鎖,而後本身再佔有寫鎖的時候會阻塞,而後它就本身把本身搞死了,進而把其它線程也搞死了,它沒法釋放鎖,其它線程也沒法得到鎖了。

這是死鎖嗎?彷佛不是,死鎖的定義是線程A佔有着線程B須要的資源,線程B佔有着線程A須要的資源,兩個線程相互等待對方釋放資源,經典的死鎖例子以下:

Object a = new Object();
Object b = new Object();

new Thread(()->{
    synchronized (a) {
        LockSupport.parkNanos(1000000);
        synchronized (b) {

        }
    }
}).start();

new Thread(()->{
    synchronized (b) {
        synchronized (a) {

        }
    }
}).start();

簡單的死鎖用jstack是能夠看到的:

"Thread-1":
        at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest.lambda$main$1(ReentrantReadWriteLockTest.java:40)
        - waiting to lock <0x000000076baa9068> (a java.lang.Object)
        - locked <0x000000076baa9078> (a java.lang.Object)
        at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest$$Lambda$2/1831932724.run(Unknown Source)
        at java.lang.Thread.run(Thread.java:748)
"Thread-0":
        at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest.lambda$main$0(ReentrantReadWriteLockTest.java:32)
        - waiting to lock <0x000000076baa9078> (a java.lang.Object)
        - locked <0x000000076baa9068> (a java.lang.Object)
        at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest$$Lambda$1/1096979270.run(Unknown Source)
        at java.lang.Thread.run(Thread.java:748)

Found 1 deadlock.

(4)如何使用ReentrantReadWriteLock實現一個高效安全的TreeMap?

class SafeTreeMap {
    private final Map<String, Object> m = new TreeMap<String, Object>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Lock readLock = lock.readLock();
    private final Lock writeLock = lock.writeLock();

    public Object get(String key) {
        readLock.lock();
        try {
            return m.get(key);
        } finally {
            readLock.unlock();
        }
    }

    public Object put(String key, Object value) {
        writeLock.lock();
        try {
            return m.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }
}

推薦閱讀

  1. 死磕 java同步系列之ReentrantLock VS synchronized

  2. 死磕 java同步系列之ReentrantLock源碼解析(二)——條件鎖

  3. 死磕 java同步系列之ReentrantLock源碼解析(一)——公平鎖、非公平鎖

  4. 死磕 java同步系列之AQS起篇

  5. 死磕 java同步系列之本身動手寫一個鎖Lock

  6. 死磕 java魔法類之Unsafe解析

  7. 死磕 java同步系列之JMM(Java Memory Model)

  8. 死磕 java同步系列之volatile解析

  9. 死磕 java同步系列之synchronized解析

歡迎關注個人公衆號「彤哥讀源碼」,查看更多源碼系列文章, 與彤哥一塊兒暢遊源碼的海洋。

qrcode

相關文章
相關標籤/搜索