Java顯式鎖學習總結之五:ReentrantReadWriteLock源碼分析

概述

咱們在介紹AbstractQueuedSynchronizer的時候介紹過,AQS支持獨佔式同步狀態獲取/釋放、共享式同步狀態獲取/釋放兩種模式,對應的典型應用分別是ReentrantLock和Semaphore,AQS還能夠混合兩種模式使用,讀寫鎖ReentrantReadWriteLock就是如此。java

設想如下情景:咱們在系統中有一個多線程訪問的緩存,多個線程均可以對緩存進行讀或寫操做,可是讀操做遠遠多於寫操做,要求寫操做要線程安全,且寫操做執行完成要求對當前的全部讀操做立刻可見。緩存

分析上面的需求:由於有多個線程可能會執行寫操做,所以多個線程的寫操做必須同步串行執行;而寫操做執行完成要求對當前的全部讀操做立刻可見,這就意味着當有線程正在讀的時候,要阻塞寫操做,當正在執行寫操做時,要阻塞讀操做。一個簡單的實現就是將數據直接加上互斥鎖,同一時刻不論是讀仍是寫線程,都只能有一個線程操做數據。可是這樣的問題就是若是當前只有N個讀線程,沒有寫線程,這N個讀線程也要傻呵呵的排隊讀,儘管實際上是能夠安全併發提升效率的。所以理想的實現是:安全

當有寫線程時,則寫線程獨佔同步狀態。多線程

當沒有寫線程時只有讀線程時,則多個讀線程能夠共享同步狀態。併發

讀寫鎖就是爲了實現這種效果而生。函數

使用示例

咱們先來看一下讀寫鎖怎麼使用,這裏咱們基於hashmap(自己線程不安全)作一個多線程併發安全的緩存:源碼分析

public class ReadWriteCache {
    private static Map<String, Object> data = new HashMap<>();
    private static ReadWriteLock lock = new ReentrantReadWriteLock(false);
    private static Lock rlock = lock.readLock();
    private static Lock wlock = lock.writeLock();

    public static Object get(String key) {
        rlock.lock();
        try {
            return data.get(key);
        } finally {
            rlock.unlock();
        }
    }

    public static Object put(String key, Object value) {
        wlock.lock();
        try {
            return data.put(key, value);
        } finally {
            wlock.unlock();
        }
    }

}

限於篇幅咱們只實現2個方法,get和put。從代碼能夠看出,咱們先建立一個  ReentrantReadWriteLock 對象,構造函數 false 表明是非公平的(非公平的含義和ReentrantLock相同)。而後經過readLock、writeLock方法分別獲取讀鎖和寫鎖。在作讀操做的時候,也就是get方法,咱們要先獲取讀鎖;在作寫操做的時候,即put方法,咱們要先獲取寫鎖。ui

經過以上代碼,咱們就構造了一個線程安全的緩存,達到咱們以前說的:寫線程獨佔同步狀態,多個讀線程能夠共享同步狀態。this

源碼分析

咱們先來看下 ReentrantReadWriteLock 類的總體結構:spa

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** Inner class providing writelock */
    private final ReentrantReadWriteLock.WriteLock writerLock;
    /** Performs all synchronization mechanics */
    final Sync sync;

    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }


    abstract static class Sync extends AbstractQueuedSynchronizer {}
static final class NonfairSync extends Sync {}
static final class FairSync extends Sync {}
public static class ReadLock implements Lock, java.io.Serializable {}
public static class WriteLock implements Lock, java.io.Serializable {} }

能夠看到,在公平鎖與非公平鎖的實現上,與ReentrantLock同樣,也是有一個繼承AQS的內部類Sync,而後NonfairSync和FairSync都繼承Sync,經過構造函數傳入的布爾值決定要構造哪種Sync實例。

讀寫鎖比ReentrantLock多出了兩個內部類:ReadLock和WriteLock, 用來定義讀鎖和寫鎖,而後在構造函數中,會構造一個讀鎖和一個寫鎖實例保存到成員變量 readerLock 和 writerLock。咱們在上面的示例中使用到的 readLock() 和 writeLock() 方法就是返回這兩個成員變量保存的鎖實例。

咱們在Sync類中能夠看到下列代碼:

        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT); //每次要讓共享鎖+1,就應該讓state加 1<<16
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;  //每種鎖的最大重入數量
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        /** Returns the number of shared holds represented in count  */
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        /** Returns the number of exclusive holds represented in count  */
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

能夠看到主要是幾個位移操做,經過上面的總體結構,咱們知道了在讀寫鎖內保存了讀鎖和寫鎖的兩個實例。以前在ReentrantLock中,咱們知道鎖的狀態是保存在Sync實例的state字段中的(繼承自父類AQS),如今有了讀寫兩把鎖,然而能夠看到仍是隻有一個Sync實例,那麼一個Sync實例的state是如何同時保存兩把鎖的狀態的呢?答案就是用了位分隔:

state字段是32位的int,讀寫鎖用state的低16位保存寫鎖(獨佔鎖)的狀態;高16位保存讀鎖(共享鎖)的狀態。

所以要獲取獨佔鎖當前的重入數量,就是 state & ((1 << 16) -1) (即 exclusiveCount 方法)

要獲取共享鎖當前的重入數量,就是 state >>> 16 (即 sharedCount 方法)

下面咱們具體看寫鎖和讀鎖的實現。

寫鎖

看下WriteLock類中的lock和unlock方法:

        public void lock() {
            sync.acquire(1);
        }

        public void unlock() {
            sync.release(1);
        }

能夠看到就是調用的獨佔式同步狀態的獲取與釋放,所以真實的實現就是Sync的 tryAcquire和 tryRelease。

寫鎖的獲取

看下tryAcquire:

        protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c); //獲取獨佔鎖的重入數
            if (c != 0) {
                // 當前state不爲0,此時:若是寫鎖狀態爲0說明讀鎖此時被佔用返回false;若是寫鎖狀態不爲0且寫鎖沒有被當前線程持有返回false
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded"); //寫鎖重入數溢出
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }
//到這裏了說明state爲0,嘗試直接cas。writerShouldBlock是爲了實現公平或非公平策略的
if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }

邏輯很簡單,直接看註釋就能理解。

寫鎖的釋放

看下tryRelease:

        protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();  //非獨佔模式直接拋異常
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            if (free) 
                setExclusiveOwnerThread(null); //若是獨佔模式重入數爲0了,說明獨佔模式被釋放
            setState(nextc);  //無論獨佔模式是否被釋放,更新獨佔重入數
            return free;
        }

邏輯很簡單,直接看註釋就能理解。

讀鎖

相似於寫鎖,讀鎖的lock和unlock的實際實現對應Sync的 tryAcquireShared 和 tryReleaseShared方法。

讀鎖的獲取

        protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1; //若是獨佔模式被佔且不是當前線程持有,則獲取失敗
            int r = sharedCount(c);
//若是公平策略沒有要求阻塞且重入數沒有到達最大值,則直接嘗試CAS更新state
if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
//更新成功後會在firstReaderHoldCount中或readHolds(ThreadLocal類型的)的本線程副本中記錄當前線程重入數(淺藍色代碼),這是爲了實現jdk1.6中加入的getReadHoldCount()方法的,這個方法能獲取當前線程重入共享鎖的次數(state中記錄的是多個線程的總重入次數),加入了這個方法讓代碼複雜了很多,可是其原理仍是很簡單的:若是當前只有一個線程的話,還不須要動用ThreadLocal,直接往firstReaderHoldCount這個成員變量裏存重入數,當有第二個線程來的時候,就要動用ThreadLocal變量readHolds了,每一個線程擁有本身的副本,用來保存本身的重入數。
if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); //用來處理CAS沒成功的狀況,邏輯和上面的邏輯是相似的,就是加了無限循環 }

 下面這個方法就不用細說了,和上面的處理邏輯相似,加了無限循環用來處理CAS失敗的狀況。

        final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                } else if (readerShouldBlock()) {
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

讀鎖的釋放

 

        protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
//淺藍色代碼也是爲了實現jdk1.6中加入的getReadHoldCount()方法,在更新當前線程的重入數。
if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; }
//這裏是真正的釋放同步狀態的邏輯,就是直接同步狀態-SHARED_UNIT,而後CAS更新,沒啥好說的
for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }

 

補充內容

經過上面的源碼分析,咱們能夠發現一個現象:

在線程持有讀鎖的狀況下,該線程不能取得寫鎖(由於獲取寫鎖的時候,若是發現當前的讀鎖被佔用,就立刻獲取失敗,無論讀鎖是否是被當前線程持有)

在線程持有寫鎖的狀況下,該線程能夠繼續獲取讀鎖(獲取讀鎖時若是發現寫鎖被佔用,只有寫鎖沒有被當前線程佔用的狀況纔會獲取失敗)

仔細想一想,這個設計是合理的:由於當線程獲取讀鎖的時候,可能有其餘線程同時也在持有讀鎖,所以不能把獲取讀鎖的線程「升級」爲寫鎖;而對於得到寫鎖的線程,它必定獨佔了讀寫鎖,所以能夠繼續讓它獲取讀鎖,當它同時獲取了寫鎖和讀鎖後,還能夠先釋放寫鎖繼續持有讀鎖,這樣一個寫鎖就「降級」爲了讀鎖。

綜上:

一個線程要想同時持有寫鎖和讀鎖,必須先獲取寫鎖再獲取讀鎖;

寫鎖能夠「降級」爲讀鎖;

讀鎖不能「升級」爲寫鎖。

總結

讀寫鎖仍是很實用的,由於通常場景下,數據的併發操做都是讀多於寫,在這種狀況下,讀寫鎖可以提供比排它鎖更好的併發性。

在讀寫鎖的實現方面,原本覺得會比較複雜,結果看完源碼的感覺也是快刀切西瓜,看來AQS的設計真的很棒,在AQS的基礎上構建的組件實現都很簡單。

相關文章
相關標籤/搜索