Java併發(十):讀寫鎖ReentrantReadWriteLock

先作總結:

一、爲何用讀寫鎖 ReentrantReadWriteLock?html

重入鎖ReentrantLock是排他鎖,在同一時刻僅有一個線程能夠進行訪問,可是在大多數場景下,大部分時間都是提供讀服務,而寫服務佔有的時間較少。然而讀服務不存在數據競爭問題,若是一個線程在讀時禁止其餘線程讀勢必會致使性能下降。因此就提供了讀寫鎖。java

讀寫鎖維護着一對鎖,一個讀鎖和一個寫鎖。經過分離讀鎖和寫鎖,使得併發性比通常的排他鎖有了較大的提高:在同一時間能夠容許多個讀線程同時訪問,可是在寫線程訪問時,全部讀線程和寫線程都會被阻塞。緩存

二、讀寫鎖實現原理:併發

(1)每一個ReentrantReadWriteLock對象都對應着讀鎖和寫鎖兩個鎖。oop

(2)ReentrantReadWriteLock經過其屬性sync(繼承了AQS),一個對象實現了讀寫兩個鎖。源碼分析

(3)sync.state(int)分爲高 16 位和低16位,高16位用於共享模式ReadLock,低16位用於獨佔模式WriteLock
post

(4)獲取寫鎖標誌:性能

    1.sync.state的低16位(0表明沒有被佔用,大於0表明有線程持有當前鎖(鎖能夠重入,每次重入都+1) 最多2^16-1次重入ui

    2.sync.exclusiveOwnerThread == Thread.currentThread()url

(5)獲取讀鎖標誌:

  1.state的高16位(0表明沒有被佔用,大於0表明有線程持有當前鎖(鎖能夠重入,每次重入都+1) 最多2^16-1次重入

  2.ThreadLocalHoldCounter readHolds; // 記錄線程持有的讀鎖數量(ThreadLocalHoldCounter extends ThreadLocal)

    readHolds.threadLocals - Map<ThreadLocal, HoldCounter>

    HoldCounter - count  tid

    (關於ThreadLocal:Java併發(二十):線程本地變量ThreadLocal

  3.sync.cachedHoldCounter 記錄最後一個獲取讀鎖的線程的讀鎖重入次數,用於緩存提升性能

  4.sync.firstReader 第一個獲取讀鎖的線程(而且其未釋放讀鎖),以及它持有的讀鎖數量  提升性能

(6)ReentrantReadWriteLock的內部類WriteLock/ReadLock經過操做sync的屬性實現的鎖的操做。

1、類結構

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
    // 屬性
    private final ReentrantReadWriteLock.ReadLock readerLock;
    private final ReentrantReadWriteLock.WriteLock writerLock;
    final Sync sync; //// 內部類
    abstract static class Sync extends AbstractQueuedSynchronizer {}
    static final class FairSync extends Sync {}
    static final class NonfairSync extends Sync {}
    public static class ReadLock implements Lock, java.io.Serializable {}
    public static class WriteLock implements Lock, java.io.Serializable {}
}

2、讀寫鎖實現

ReadLock 使用了共享模式,WriteLock 使用了獨佔模式。

ReadLock 和 WriteLock都是經過同一個Sync實例實現的。

AQS 將 state(int)分爲高 16 位和低16位,高16位用於共享模式ReadLock,低16位用於獨佔模式WriteLock 。

    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; } // 無符號補0右移16位 - 讀鎖
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } // 抹掉高16位 - 寫鎖

WriteLock:

1.state的低16位(0表明沒有被佔用,大於0表明有線程持有當前鎖(鎖能夠重入,每次重入都+1) 最多2^16-1次重入

2.exclusiveOwnerThread == Thread.currentThread()

ReadLock:

1.state的高16位(0表明沒有被佔用,大於0表明有線程持有當前鎖(鎖能夠重入,每次重入都+1) 最多2^16-1次重入

2.ThreadLocalHoldCounter readHolds; // 記錄線程持有的讀鎖數量

  readHolds.threadLocals - Map<Thread, HoldCounter>

  HoldCounter - count  tid

3、源碼分析

 寫鎖獲取:

    // ReentrantReadWriteLock.WriteLock.lock()
    public void lock() {
        sync.acquire(1);
    }
    
    // AbstractQueuedSynchronizer.acquire(int)
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    /**
     * ReentrantReadWriteLock.Sync.tryAcquire(int)
     * 能夠獲取寫鎖的兩種狀況:
     * 1.沒有線程佔用該鎖(讀鎖和寫鎖都沒有被佔用)
     * 2.當前線程已經拿到過該寫鎖,重入
     */
    protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCount(c); // 寫鎖
        if (c != 0) { // 有鎖 
            if (w == 0 || current != getExclusiveOwnerThread()) // 有鎖且沒有寫鎖(即有讀鎖) || 其餘線程佔用了寫鎖
                return false;
            if (w + exclusiveCount(acquires) > MAX_COUNT) // 重入鎖上限 2^16-1
                throw new Error("Maximum lock count exceeded");
            setState(c + acquires);
            return true;
        }
        // 沒有線程佔用該鎖,直接獲取鎖
        if (writerShouldBlock() || // 若是是公平鎖須要排隊
            !compareAndSetState(c, c + acquires))
            return false;
        setExclusiveOwnerThread(current);
        return true;
    }

 寫鎖釋放:

    // ReentrantReadWriteLock.WriteLock.unlock()
    public void unlock() {
        sync.release(1);
    }
    
    // AbstractQueuedSynchronizer.release(int)
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    /**
     * ReentrantReadWriteLock.Sync.tryRelease(int)
     * 釋放寫鎖:維護state和exclusiveOwnerThread
     * 
     */
    protected final boolean tryRelease(int releases) {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        int nextc = getState() - releases;
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
            setExclusiveOwnerThread(null);
        setState(nextc);
        return free;
    }

讀鎖

    abstract static class Sync extends AbstractQueuedSynchronizer {
        // 這個嵌套類的實例用來記錄每一個線程持有的讀鎖數量(讀鎖重入)
        static final class HoldCounter {
            int count = 0; // 持有的讀鎖數
            final long tid = getThreadId(Thread.currentThread()); // 線程 id
        }

        static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }
        
        /**
          * 組合使用上面兩個類,用一個 ThreadLocal 來記錄線程持有的讀鎖數量
          */ 
        private transient ThreadLocalHoldCounter readHolds;

        /**
         * 用於緩存,記錄最後一個獲取讀鎖的線程的讀鎖重入次數
         * 無論哪一個線程獲取到讀鎖後,就把這個值佔爲已用,這樣就不用到 ThreadLocal 中查詢 map 了
         * 一般讀鎖的獲取很快就會伴隨着釋放,在 獲取->釋放 讀鎖這段時間,若是沒有其餘線程獲取讀鎖的話,此緩存就能幫助提升性能
         */
        private transient HoldCounter cachedHoldCounter;

        /**
         * 第一個獲取讀鎖的線程(而且其未釋放讀鎖),以及它持有的讀鎖數量
         * 提升性能用
         */
        private transient Thread firstReader = null;
        private transient int firstReaderHoldCount;

    }    

讀鎖獲取:

// ReentrantReadWriteLock.ReadLock.lock()
    public void lock() {
        sync.acquireShared(1);
    }
    
    // AbstractQueuedSynchronizer.acquireShared(int)
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    
    /**
     * ReentrantReadWriteLock.Sync.tryAcquireShared(int)
     * 能夠獲取讀鎖狀況:
     * 1.沒有線程佔用該鎖
     * 2.只有讀鎖
     * 3.有寫鎖,寫鎖被當前線程佔用,鎖降級
     * 三種狀況 - 只要沒有其餘線程佔用寫鎖就能夠獲取讀鎖
     */
    protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        int c = getState();
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current) // 其餘線程佔用寫鎖
            return -1;
        int r = sharedCount(c); // 讀鎖
        if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // 進入此if,表示能夠拿到讀鎖了 
            if (r == 0) { // 將"第一個"獲取讀鎖的線程記錄在firstReader屬性中
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else { // 1.當前線程及對應讀鎖次數存入cachedHoldCounter 2.當前線程及對應讀鎖次數存入readHolds
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();// readHolds中取當先線程的ThreadLocal(沒有就建立一個)
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
        return fullTryAcquireShared(current);// 公平鎖排隊非公平鎖下一個是寫鎖/讀鎖重入次數上限/CAS失敗  從新拿讀鎖
    }
    
    /**
     * ReentrantReadWriteLock.Sync.fullTryAcquireShared(Thread)
     */
    final int fullTryAcquireShared(Thread current) {
        HoldCounter rh = null;
        for (;;) { // 循環CAS拿鎖 int c = getState();
            if (exclusiveCount(c) != 0) {
                if (getExclusiveOwnerThread() != current) // 其餘線程佔用寫鎖
                    return -1;
            } else if (readerShouldBlock()) { // 處理讀鎖重入,將cachedHoldCounter設置爲當前線程
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                } else {
                    if (rh == null) {
                        rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current)) {
                            rh = readHolds.get();
                            if (rh.count == 0)
                                readHolds.remove();
                        }
                    }
                    if (rh.count == 0) // 不是重入,返回-1
                        return -1;
                }
            }
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            if (compareAndSetState(c, c + SHARED_UNIT)) { // 正常拿讀鎖
                if (sharedCount(c) == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    if (rh == null)
                        rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                    cachedHoldCounter = rh; // cache for release
                }
                return 1;
            }
        }
    }

鎖降級:

持有寫鎖的線程,去獲取讀鎖的過程稱爲鎖降級

讀鎖釋放:

    // ReentrantReadWriteLock.ReadLock.unlock()
    public void unlock() {
        sync.releaseShared(1);
    }
    
    // AbstractQueuedSynchronizer.releaseShared(int)
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    /**
     * ReentrantReadWriteLock.Sync.tryReleaseShared(int)
     * 維護readHolds state
     */
    protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
        if (firstReader == current) { // 第一個獲取讀鎖的線程
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                firstReaderHoldCount--;
        } else { // readHolds中次數-1
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
            int count = rh.count;
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            --rh.count;
        }
        for (;;) { // state
            int c = getState();
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                // Releasing the read lock has no effect on readers,
                // but it may allow waiting writers to proceed if
                // both read and write locks are now free.
                return nextc == 0;
        }
    }

 

 

參考資料 / 相關推薦

Java 讀寫鎖 ReentrantReadWriteLock 源碼分析

【死磕Java併發】—–J.U.C之讀寫鎖:ReentrantReadWriteLock

相關文章
相關標籤/搜索