讀寫鎖ReadWriteLock的實現原理

掃描下方二維碼或者微信搜索公衆號菜鳥飛呀飛,便可關注微信公衆號,閱讀更多Spring源碼分析Java併發編程文章。java

微信公衆號

問題

在閱讀本文以前能夠先思考一下幾個問題編程

    1. 什麼是讀寫鎖?
    1. ReadWriteLock存在的意義是什麼?
    1. 讀寫鎖適用於什麼場景?
    1. 什麼是鎖降級和鎖升級?

簡介

  • synchronized和ReentrantLock實現的鎖是排他鎖,所謂排他鎖就是同一時刻只容許一個線程訪問共享資源,可是在平時場景中,咱們一般會碰到對於共享資源讀多寫少的場景。對於讀場景,每次只容許一個線程訪問共享資源,顯然這種狀況使用排他鎖效率就比較低下,那麼該如何優化呢?
  • 這個時候讀寫鎖就應運而生了,讀寫鎖是一種通用技術,並非Java特有的。從名字來看,讀寫鎖擁有兩把鎖,讀鎖寫鎖。讀寫鎖的特色是:同一時刻容許多個線程對共享資源進行讀操做;同一時刻只容許一個線程對共享資源進行寫操做;當進行寫操做時,同一時刻其餘線程的讀操做會被阻塞;當進行讀操做時,同一時刻全部線程的寫操做會被阻塞。對於讀鎖而言,因爲同一時刻能夠容許多個線程訪問共享資源,進行讀操做,所以稱它爲共享鎖;而對於寫鎖而言,同一時刻只容許一個線程訪問共享資源,進行寫操做,所以稱它爲排他鎖。
  • 在Java中經過ReadWriteLock來實現讀寫鎖。ReadWriteLock是一個接口,ReentrantReadWriteLock是ReadWriteLock接口的具體實現類。在ReentrantReadWriteLock中定義了兩個內部類ReadLockWriteLock,分別來實現讀鎖和寫鎖。ReentrantReadWriteLock底層是經過AQS來實現鎖的獲取與釋放的,所以ReentrantReadWriteLock內部還定義了一個繼承了AQS類的同步組件Sync,同時ReentrantReadWriteLock還支持公平與非公平性,所以它內部還定義了兩個內部類FairSync、NonfairSync,它們繼承了Sync。
  • ReentrantReadWriteLock除了提供讀鎖、寫鎖的釋放與獲取外,還提供了一些其餘和鎖狀態有關的方法。以下表所示(表格來源於《Java併發編程的藝術》一書第141頁)。
方法名 功能
int getReadLockCount() 獲取讀鎖的數量,此時讀鎖的數量不必定等於獲取鎖的數量,由於鎖能夠重入,可能有線程重入了讀鎖
int getReadHoldCount() 獲取當前線程重入讀鎖的次數
int getWriteHoldCount() 獲取當前線程重入寫鎖的次數
int isWriteLocked() 判斷鎖的狀態是不是寫鎖,返回true,表示鎖的狀態是寫鎖

實現原理

  • 在AQS中,經過int類型的全局變量state來表示同步狀態,即用state來表示鎖。ReentrantReadWriteLock也是經過AQS來實現鎖的,可是ReentrantReadWriteLock有兩把鎖:讀鎖和寫鎖,它們保護的都是同一個資源,那麼如何用一個共享變量來區分鎖是寫鎖仍是讀鎖呢?答案就是按位拆分
  • 因爲state是int類型的變量,在內存中佔用4個字節,也就是32位。將其拆分爲兩部分:高16位和低16位,其中高16位用來表示讀鎖狀態,低16位用來表示寫鎖狀態。當設置讀鎖成功時,就將高16位加1,釋放讀鎖時,將高16位減1;當設置寫鎖成功時,就將低16位加1,釋放寫鎖時,將第16位減1。以下圖所示。

讀寫鎖

  • 那麼如何根據state的值來判斷當前鎖的狀態時寫鎖仍是讀鎖呢?
  • 假設鎖當前的狀態值爲S,將S和16進制數0x0000FFFF進行與運算,即S&0x0000FFFF,運算時會將高16位全置爲0,將運算結果記爲c,那麼c表示的就是寫鎖的數量。若是c等於0就表示尚未線程獲取鎖;若是c不等於0,就表示有線程獲取到了鎖,c等於幾就表明寫鎖重入了幾回。
  • 將S無符號右移16位(S>>>16),獲得的結果就是讀鎖的數量。當S>>>16獲得的結果不等於0,且c也不等於0時,就表示當前線程既持有了寫鎖,也持有了讀鎖。
  • 當成功獲取到讀鎖時,如何對讀鎖進行加1呢?S +(1<<16)獲得的結果,就是將對鎖加1。釋放讀鎖是,就進行S - (1<<16)運算。
  • 當成功獲取到寫鎖時,令S+1即表示寫鎖狀態+1;釋放寫鎖時,就進行S-1運算。
  • 因爲讀鎖和寫鎖的狀態值都只佔用16位,因此讀鎖的最大數量爲 2^{16}-1,寫鎖可被重入的最大次數爲2^{16}-1。

源碼分析

理解了如何經過state來表示鎖的狀態,接下來將經過源碼來分析讀寫鎖的源碼實現。設計模式

  • 經過以下代碼,便可建立讀鎖和寫鎖。ReentrantReadWriteLock的構造方法中若是不傳參數,默認建立的是非公平的讀寫鎖。在讀寫鎖中,仍然是非公平的讀寫鎖性能要因爲公平的讀寫鎖
ReadWriteLock lock = new ReentrantReadWriteLock();
// 建立讀鎖
Lock readLock = lock.readLock();
// 建立寫鎖
Lock writeLock = lock.writeLock();	
複製代碼

寫鎖加鎖

  • 當調用寫鎖的lock()方法時,線程會嘗試獲取寫鎖,即writeLock.lock()。因爲寫鎖是排他鎖,因此寫鎖的獲取過程幾乎與ReentrantLock獲取鎖的邏輯同樣。當調用lock()方法時,會先調用到AQS的acquire()方法,在acquire()方法中會先調用子類的tryAcquire()方法,所以這裏調用的是ReentrantReadWriteLock的內部類Sync的tryAcquire()方法。該方法的源碼以下。
protected final boolean tryAcquire(int acquires) {
    
    Thread current = Thread.currentThread();
    int c = getState();
    // exclusiveCount()方法的做用是將同步變量與0xFFFF作&運算,計算結果就是寫鎖的數量。
    // 所以w的值的含義就是寫鎖的數量
    int w = exclusiveCount(c);
    // 若是c不爲0就表示鎖被佔用了,可是佔用的是寫鎖仍是讀書呢?這個時候就須要根據w的值來判斷了。
    // 若是c等於0就表示此時鎖尚未被任何線程佔用,那就讓線程直接去嘗試獲取鎖
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        //
        /** * 1. 若是w爲0,說明寫鎖的數量爲0,而此時又由於c不等於0,說明鎖被佔用,可是不是寫鎖,那麼此時鎖的狀態必定是讀鎖, * 既然是讀鎖狀態,那麼寫鎖此時來獲取鎖時,就確定失敗,所以當w等於0時,tryAcquire()方法返回false。 * 2. 若是w不爲0,說明此時鎖的狀態時寫鎖,接着進行current != getExclusiveOwnerThread()判斷,判斷持有鎖的線程是不是當前線程 * 若是不是當前線程,那麼tryAcquire()返回false;若是是當前線程,那麼就進行後面的邏輯。爲何是當前線程持有鎖,就還能執行後面的邏輯呢? * 由於讀寫鎖是支持重入的。 */
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 下面一行代碼是判斷,寫鎖的重入次數或不會超過最大限制,這個最大限制是:2的16次方減1
        // 爲何是2的16次方減1呢?由於state的低16位存放的是寫鎖,所以寫鎖數量的最大值是2的16次方減1
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);
        return true;
    }
    /** * 1. writerShouldBlock()方法的做用是判斷當前線程是否應該阻塞,對於公平的寫鎖和非公平寫鎖的具體實現不同。 * 對於非公平寫鎖而言,直接返回false,由於非公平鎖獲取鎖以前不須要去判斷是否排隊 * 對於公平鎖寫鎖而言,它會判斷同步隊列中是否有人在排隊,有人排隊,就返回true,表示當前線程須要阻塞。無人排隊就返回false。 * * 2. 當writerShouldBlock()返回true時,表示當前線程還不能直接獲取鎖,所以tryAcquire()方法直接返回false。 * 當writerShouldBlock()返回false時,表示當前線程能夠嘗試去獲取鎖,所以會執行if判斷中後面的邏輯,即經過CAS方法嘗試去修改同步變量的值, * 若是修改同步變量成功,則表示當前線程獲取到了鎖,最終tryAcquire()方法會返回true。若是修改失敗,那麼tryAcquire()會返回false,表示獲取鎖失敗。 * */
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}
複製代碼
  • 在tryAcquire()方法中,先經過exclusiveCount()方法來計算寫鎖的數量,怎麼計算的呢?就是將state和0x0000FFFF進行與運算
  • 而後判斷state是否等於0,若是等於0,就表示讀鎖和寫鎖都沒有被獲取,當前線程就調用writerShouldBlock()方法判斷線程是否須要等待,若是須要等待,tryAcquire()方法就返回false,表示獲取鎖失敗,那麼就會回到AQS的acquire()方法中,後面的邏輯與排他鎖的邏輯同樣。若是不須要等待,就嘗試去修改state的值,若是修改爲功,就表示獲取鎖成功,不然失敗。
  • 若是state不等於0,那麼就表示存在讀鎖或者寫鎖,那麼到底是讀鎖仍是寫鎖呢?就須要根據w的值進行判斷了。
  • 若是w爲0,說明寫鎖的數量爲0,而此時又由於c不等於0,說明鎖被佔用,可是不是寫鎖,那麼此時鎖的狀態必定是讀鎖,既然是讀鎖狀態,那麼寫鎖此時來獲取鎖時,就確定失敗,由於讀鎖存在時,是不能去獲取寫鎖的。所以當w等於0時,tryAcquire()方法返回false。
  • 若是w不爲0,說明此時鎖的狀態是寫鎖,接着進行current != getExclusiveOwnerThread()判斷,判斷持有鎖的線程是不是當前線程。若是不是當前線程,那麼tryAcquire()返回false;若是是當前線程,那麼就進行後面的邏輯。爲何是當前線程持有鎖,就能執行後面的邏輯呢? 由於讀寫鎖是支持重入的。
  • 若是是當前線程獲取的寫鎖,接着就判斷,再次對寫鎖進行重入時,會不會超出寫鎖的最大重入次數,若是是,就拋出異常。(由於state的低16位表示寫鎖,因此寫鎖最大可被重入的次數是2^{16}-1)。

寫鎖釋放

  • 寫鎖的釋放與排他鎖的釋放邏輯也幾乎同樣。當調用writeLock.unlock()時,先調用到AQS的release()方法,在release()方法中會先調用子類的tryRelease()方法。在這裏調用的是ReentrantReadWriteLock的內部類Sync的tryRelease()方法。寫鎖的釋放邏輯比較簡單,能夠參考下面源碼中的註釋。方法的源碼和註釋以下。
protected final boolean tryRelease(int releases) {
    // 判斷是不是當前線程持有鎖
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 將state的值減去releases
    int nextc = getState() - releases;
    // 調用exclusiveCount()方法,計算寫鎖的數量。若是寫鎖的數量爲0,表示寫鎖被徹底釋放,此時將AQS的exclusiveOwnerThread屬性置爲null
    // 並返回free標識,表示寫鎖是否被徹底釋放
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}
複製代碼

讀鎖加鎖

  • 讀鎖是共享鎖,因此當調用readLock.lock()方法時,會先調用到AQS的acquiredShared()方法,在acquireShared()方法中會先調用子類的tryAcquireShared()方法。在這裏會調用的是ReentrantReadWriteLock的內部類Sync的tryAcquireShared()方法。該方法的源碼以下。
protected final int tryAcquireShared(int unused) {
    
    Thread current = Thread.currentThread();
    int c = getState();
    // exclusiveCount(c)返回的是寫鎖的數量,若是它不爲0,說明寫鎖被佔用,若是此時佔用寫鎖的線程不是當前線程,就返回-1,表示獲取鎖失敗
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    // r表示的是讀鎖的數量
    int r = sharedCount(c);
    /** * 在下面的代碼中進行了三個判斷: * 一、讀鎖是否應該排隊。若是沒有人排隊,就進行if後面的判斷。有人排隊,就不會進行if後面的判斷,而是最終調用fullTryAcquireShared()方法 * 二、讀鎖數量是否超過最大值。(最大數量爲2的16次方-1) * 三、嘗試修改同步變量的值 */
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        // 讀鎖數量爲0時,就將當前線程設置爲firstReader,firstReaderHoldCount=1
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            // 讀鎖數量不爲0且firstReader(第一次獲取讀的線程)爲當前線程,就將firstReaderHoldCount累加
            firstReaderHoldCount++;
        } else {
            // 讀鎖數量不爲0,且第一個獲取到讀鎖的線程不是當前線程
            // 下面這一段邏輯就是保存當前線程獲取讀鎖的次數,如何保存的呢?
            // 經過ThreadLocal來實現的,readHolds就是一個ThreadLocal的實例
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        // 返回1表示獲取讀鎖成功
        return 1;
    }
    // 當if中的三個判斷均不知足時,就會執行到這兒,調用fullTryAcquireShared()方法嘗試獲取鎖
    return fullTryAcquireShared(current);
}
複製代碼
  • 在tryAcquireShared()方法中,會先經過exclusiveCount()方法來計算寫鎖的數量,若是寫鎖存在,再判斷持有寫鎖的線程是否是當前線程,若是不是當前線程,就表示寫鎖被其餘線程給佔用,此時當前線程不能獲取讀鎖。tryAcquireShared()方法返回-1,表示獲取讀鎖失敗。若是寫鎖不存在或者持有寫鎖的線程是當前線程,那麼就表示當前線程有機會獲取到讀鎖。
  • 接下里會判斷當前線程獲取讀鎖是否不須要排隊,讀鎖數量是否會超過最大值,以及經過CAS修改讀鎖的狀態是否成功(將state的值加 1<<16)。若是這三個條件成立,就進入if語句塊中,這一塊的代碼比較繁瑣,可是功能比較單一,就是統計讀鎖的數量以及當前線程對讀鎖的重入次數,底層原理就是ThreadLocal。由於在讀寫鎖中提供了getReadLockCount()、getReadHoldCount()等方法,這幾個方法的數據就來自這兒。
  • 若是上面的三個條件有一個不成立,就不會進入if語句塊,那麼就會調用fullTryAcquireShared()方法。該方法的做用就是讓線程不停的獲取鎖,其源碼以下。
final int fullTryAcquireShared(Thread current) {
    /* * This code is in part redundant with that in * tryAcquireShared but is simpler overall by not * complicating tryAcquireShared with interactions between * retries and lazily reading hold counts. */
    HoldCounter rh = null;
    // for死循環,直到知足相應的條件纔會return退出,不然一直循環
    for (;;) {
        int c = getState();
        // 鎖的狀態爲寫鎖時,持有鎖的線程不等於當期那線程,就說明當前線程獲取鎖失敗,返回-1
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 嘗試設置同步變量的值,只要設置成功了,就表示當前線程獲取到了鎖,而後就設置鎖的獲取次數等相關信息
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}
複製代碼
  • 當獲取到讀鎖成功之後,tryAcquireShared()方法會返回1,這樣當回到AQS的acquireShared()方法時,就會直接結束了。若是獲取鎖失敗,tryAcquireShared()方法會返回-1,那麼在AQS中,就會接着執行doAcquireShared()方法。doAcquireShared()方法的做用就是將本身加入到同步隊列中,等待獲取鎖,直到獲取鎖成功。該方法不響應中斷。

讀鎖釋放

  • 當調用readLock.unlock()方法時,會先調用到AQS的releaseShared()方法,在releaseShared()方法中會先調用子類的tryReleaseShared()方法。在這裏會調用的是ReentrantReadWriteLock的內部類Sync的tryReleaseShared()方法。該方法的源碼以下。
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
        int c = getState();
        // 將修改同步變量的值(讀鎖狀態減去1<<16)
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}
複製代碼
  • 在tryReleaseShared()方法中,會先修改和讀鎖計數有關的數據,而後在for的死循環中,經過CAS操做將state的值減去1<<16。若是CAS操做成功,纔會從for循環中退出。當讀鎖數量爲0時,tryReleaseShared()返回true,表示鎖被徹底釋放。
  • 當tryReleaseShared()方法返回後,接下來的步驟和共享鎖的釋放邏輯徹底同樣的。

注意事項

  • 讀寫鎖的使用十分簡單,可是在讀寫鎖的使用過程當中,須要注意如下兩點。
  • 1. 讀寫鎖不支持鎖升級支持鎖降級。鎖升級指的是線程獲取到了讀鎖,在沒有釋放讀鎖的前提下,又獲取寫鎖。鎖降級指的是線程獲取到了寫鎖,在沒有釋放寫鎖的狀況下,又獲取讀鎖。爲何不支持鎖升級呢?能夠參考以下示例代碼。
public void lockUpgrade(){
    ReadWriteLock lock = new ReentrantReadWriteLock();
    // 建立讀鎖
    Lock readLock = lock.readLock();
    // 建立寫鎖
    Lock writeLock = lock.writeLock();
    readLock.lock();
    try{
        // ...處理業務邏輯
        writeLock.lock();   // 代碼①
    }finally {
        readLock.unlock();
    }
}
複製代碼
  • 在上面的示例代碼中,假如T1線程先獲取到了讀鎖,而後執行後面的代碼,在執行到代碼①的上一行時,T2線程也去獲取讀鎖,因爲讀鎖是共享鎖,且此時寫鎖尚未被獲取,因此此時T2線程能夠獲取到讀鎖,當T1執行到代碼①時,嘗試去獲取寫鎖,因爲有T2線程佔用了讀鎖,因此T1線程是沒法獲取到寫鎖的,只能等待,當T2也執行到代碼①時,因爲T1佔有了讀鎖,致使T2沒法獲取到寫鎖,這樣兩個線程就一直等待,即獲取不到寫鎖,也釋放不掉讀鎖。所以鎖是不支持鎖升級的。
  • 讀寫鎖支持鎖的降級,鎖的降級是爲了保證可見性。讓T1線程對數據的修改對其餘線程可見。
  • 2. 讀鎖不支持條件等待隊列。當調用ReadLock類的newCondition()方法時,會直接拋出異常。
public Condition newCondition() {
    throw new UnsupportedOperationException();
}
複製代碼
  • 由於讀鎖是共享鎖,最大獲取次數爲2^{16}-1),同一時刻能夠被多個線程持有,對於讀鎖而言,其餘線程沒有必要等待獲取讀鎖,Condition的等待喚醒毫無心義。

總結

  • 本文先簡單介紹了讀寫鎖的功能,它由兩把鎖組成:讀鎖和寫鎖。而後介紹了讀寫鎖的一些特性。接着分析瞭如何經過state這一個變量來表示讀寫鎖的狀態,state的高16位表示讀鎖,低16位表示寫鎖,將state和0x0000FFFF進行與運算,獲得的就是寫鎖的數量。
  • 最後分別經過源碼分析了寫鎖的釋放與獲取過程,讀鎖的釋放與獲取過程。其中寫鎖是排他鎖,所以它的釋放和獲取調用的是AQS中的獨佔式釋放鎖和獲取鎖的方法,讀鎖是共享鎖,所以它的釋放與獲取調用的是AQS中的共享式釋放鎖和獲取鎖的方法。

相關推薦

微信公衆號
相關文章
相關標籤/搜索