Java併發——讀寫鎖ReentrantReadWriteLock

簡介

ReentrantReadWriteLock便可重入讀寫鎖,一樣也依賴於AQS來實現。在介紹ReentrantLock咱們知道其依託AQS的同步狀態來判斷鎖是否佔有,而ReentrantReadWriteLock既有讀鎖又有寫鎖,是如何依靠一個狀態來維持的?
編程

ReentrantReadWriteLock

ReentrantReadWriteLock讀寫鎖,與ReentrantLock同樣默認非公平,內部定義了讀鎖ReadLock()和寫鎖WriteLock(),在同一時間容許被多個讀線程訪問,但在寫線程訪問時,全部讀線程和寫線程都會被阻塞。讀寫鎖主要特性:公平性、可重入性、鎖降級安全

寫鎖的獲取與釋放

  • 寫鎖獲取
  • 寫鎖是一個支持重進入的排它鎖,其獲取的核心方法:併發

    protected final boolean tryAcquire(int acquires) {
                // 獲取當前線程
                Thread current = Thread.currentThread();
                // 獲取ReentrantReadWriteLock鎖總體同步狀態
                int c = getState();
                // 獲取寫鎖同步狀態
                int w = exclusiveCount(c);
                // 存在讀鎖或寫鎖
                if (c != 0) {
                    // c != 0 && w == 0 即若存在讀鎖或寫鎖持有線程不是當前線程,獲取寫鎖失敗
                    if (w == 0 || current != getExclusiveOwnerThread())
                        return false;
                    // 最多65535次重入,若超過報錯
                    if (w + exclusiveCount(acquires) > MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    // 可重入,設置同步狀態
                    setState(c + acquires);
                    return true;
                }
                // 公平與非公平,同步隊列是否有節點,同時cas設置狀態
                if (writerShouldBlock() ||
                    !compareAndSetState(c, c + acquires))
                    return false;
                // 設置獲取鎖的線程爲當前線程
                setExclusiveOwnerThread(current);
                return true;
            }
    複製代碼

    從源碼中咱們能夠發現getState()獲取的是讀鎖與寫鎖總同步狀態,再經過exclusiveCount()方法單獨獲取寫鎖同步狀態post

    static final int SHARED_SHIFT   = 16;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
            
        static int exclusiveCount(int c) {
            return c & EXCLUSIVE_MASK; 
        }
    複製代碼

    ReentrantReadWriteLock經過按位切割state變量,同步狀態的低16位表示寫鎖獲取次數,高16位表示讀鎖獲取次數,如圖示意 ui

    因此這解釋了爲何寫鎖獲取次數最多65535次
    spa

    寫鎖獲取總體思路:當讀鎖已經被讀線程獲取或者寫鎖已經被其餘寫線程獲取,則寫鎖獲取失敗;不然,獲取成功並可重入,增長寫鎖同步狀態線程

  • 寫鎖釋放
  • protected final boolean tryRelease(int releases) {
                // 若釋放的線程不爲鎖的持有者
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                // 從新設置同步狀態
                int nextc = getState() - releases;
                // 若新的寫鎖持有線程數爲0,則將鎖的持有線程置爲null
                boolean free = exclusiveCount(nextc) == 0;
                if (free)
                    setExclusiveOwnerThread(null);
                // 更新同步狀態    
                setState(nextc);
                return free;
            }
    複製代碼
    寫鎖的釋放與ReentrantLock的釋放過程基本相似,每次釋放均減小寫狀態,當寫狀態爲0 時表示寫鎖已被釋放,從而等待的讀寫線程可以繼續訪問讀寫鎖,同時前次寫線程的修改對後續讀寫線程可見

    讀鎖的獲取與釋放

    讀鎖相對於寫鎖(獨佔鎖或排他鎖),讀鎖是一個支持重進入的共享鎖,它可以被多個線程同時獲取,在沒有其餘寫線程訪問(或者寫狀態爲0)時,讀鎖總會被成功地獲取,而所作的也只是(線程安全的)增長讀狀態
    3d

  • 讀鎖的獲取
  • protected final int tryAcquireShared(int unused) {
                // 獲取當前線程
                Thread current = Thread.currentThread();
                int c = getState();
                // 若寫鎖已被佔有,且寫鎖佔有線程不是當前線程,讀鎖獲取失敗
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return -1;
                // 讀鎖狀態
                int r = sharedCount(c);
                // 判斷讀鎖是否須要公平,讀鎖持有線程數是否小於極值,CAS設置讀鎖狀態
                if (!readerShouldBlock() &&
                    r < MAX_COUNT &&
                    compareAndSetState(c, c + SHARED_UNIT)) {
                    // 若讀鎖未被線程佔有,則更新firstReader和firstReaderHoldCount
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    // 若是獲取讀鎖的線程爲第一次獲取讀鎖的線程,則firstReaderHoldCount重入數 + 1    
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return 1;
                }
                return fullTryAcquireShared(current);
            }
    複製代碼

    讀鎖獲取總體思路
    ①.判斷寫鎖是否被佔有,寫鎖佔有線程是否不是當前線程,若成立則讀鎖獲取失敗
    ②.判斷讀鎖是否須要公平,讀鎖持有線程數是否小於極值,CAS設置讀鎖狀態成功,若條件不知足,會調用fullTryAcquireShared()方法自旋再次嘗試獲取讀鎖;若條件知足修改當前線程HoldCounter的值
    final int fullTryAcquireShared(Thread current) {
                HoldCounter rh = null;
                for (;;) {
                    int c = getState();
                    // 若寫鎖已被佔有,且寫鎖佔有線程不是當前線程
                    if (exclusiveCount(c) != 0) {
                        if (getExclusiveOwnerThread() != current)
                            return -1;
                    // 公平性        
                    } else if (readerShouldBlock()) {
                        // Make sure we're not acquiring read lock reentrantly
                        if (firstReader == current) {
                            // assert firstReaderHoldCount > 0;
                        } else {
                            if (rh == null) {
                                rh = cachedHoldCounter;
                                if (rh == null || rh.tid != getThreadId(current)) {
                                    rh = readHolds.get();
                                    if (rh.count == 0)
                                        readHolds.remove();
                                }
                            }
                            if (rh.count == 0)
                                return -1;
                        }
                    }
                    // 讀鎖佔有線程達到極值
                    if (sharedCount(c) == MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    // cas設置成功    
                    if (compareAndSetState(c, c + SHARED_UNIT)) {
                        // 若讀鎖未被線程佔有,則更新firstReader和firstReaderHoldCount
                        if (sharedCount(c) == 0) {
                            firstReader = current;
                            firstReaderHoldCount = 1;
                        // 若是獲取讀鎖的線程爲第一次獲取讀鎖的線程,則firstReaderHoldCount重入數 + 1    
                        } else if (firstReader == current) {
                            firstReaderHoldCount++;
                        } else {
                            if (rh == null)
                                rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current))
                                rh = readHolds.get();
                            else if (rh.count == 0)
                                readHolds.set(rh);
                            rh.count++;
                            cachedHoldCounter = rh; // cache for release
                        }
                        return 1;
                    }
                }
            }
    複製代碼
  • 讀鎖的釋放
  • protected final boolean tryReleaseShared(int unused) {
                Thread current = Thread.currentThread();
                // 若當前線程爲第一個獲取讀鎖的線程
                if (firstReader == current) {
                    // 若只有獲取一次,將firstReader置爲null
                    if (firstReaderHoldCount == 1)
                        firstReader = null;
                    // 若屢次,firstReaderHoldCount-1
                    else
                        firstReaderHoldCount--;
                } else {
                    // 更新當前線程獲取鎖次數
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    int count = rh.count;
                    if (count <= 1) {
                        readHolds.remove();
                        if (count <= 0)
                            throw unmatchedUnlockException();
                    }
                    --rh.count;
                }
                // 自旋CAS更新讀鎖同步狀態
                for (;;) {
                    int c = getState();
                    int nextc = c - SHARED_UNIT;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
    複製代碼
  • HoldCounter
  • HoldCounter在讀鎖中起到了很重要的做用,用來計算每一個線程的讀鎖重入次數,並使用ThreadLocal類型的HoldCounter,能夠記錄每一個線程的鎖的重入次數。 cachedHoldCounter記錄了最後1個獲取讀鎖的線程的重入次數。 firstReader指向了第一個獲取讀鎖的線程,firstReaderHoldCounter記錄了第一個獲取讀鎖的線程的重入次數

    code

    static final class HoldCounter { int count = 0; final long tid = getThreadId(Thread.currentThread()); }
        static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }
        
        private transient HoldCounter cachedHoldCounter;
        private transient int firstReaderHoldCount;
        private transient Thread firstReader = null;
    複製代碼
    複製代碼static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } private transient HoldCounter cachedHoldCounter; private transient int firstReaderHoldCount; private transient Thread firstReader = null; 複製代碼複製代碼

    鎖降級

    鎖降級指的是寫鎖降級成爲讀鎖,即先獲取寫鎖、獲取讀鎖在釋放寫鎖的過程,目的爲了保證數據的可見性。假設有兩個線程A、B,若線程A獲取到寫鎖,不獲取讀鎖而是直接釋放寫鎖,這時線程B獲取了寫鎖並修改了數據,那麼線程A沒法知道線程B的數據更新。若是線程A獲取讀鎖,即遵循鎖降級的步驟,則線程B將會被阻塞,直到線程A使用數據並釋放讀鎖以後,線程B才能獲取寫鎖進行數據更新。cdn

    總結

    當有線程獲取讀鎖時,不容許再有線程得到寫鎖
    當有線程得到寫鎖時,不容許其餘線程得到讀鎖和寫鎖
    寫鎖能降級爲讀鎖,讀鎖沒法升級成寫鎖

    感謝

    《Java併發編程的藝術》 http://cmsblogs.com/?p=2213

    相關文章
    相關標籤/搜索