Java併發編程之鎖機制之ReentrantReadWriteLock(讀寫鎖)

前言

在前面的文章中,咱們講到了ReentrantLock(重入鎖),接下來咱們講ReentrantReadWriteLock(讀寫鎖),該鎖具有重入鎖的可重入性可中斷獲取鎖等特徵,可是與ReentrantLock不同的是,在ReentrantReadWriteLock中,維護了一對鎖,一個讀鎖一個寫鎖,而讀寫鎖在同一時刻容許多個線程訪問。可是在寫線程訪問時,全部的讀線程和其餘的寫線程均被阻塞。在閱讀本片文章以前,但願你已閱讀過如下幾篇文章:編程

基本結構

在具體瞭解ReentrantReadWriteLock以前,咱們先看一下其總體結構,具體結構以下圖所示: 緩存

ReentrantReadWriteLock.png

從總體圖上來看,ReentrantReadWriteLock實現了ReadWriteLock接口,其中在ReentrantReadWriteLock中分別聲明瞭如下幾個靜態內部類:bash

  • WriteLockReadLock(維護的一對讀寫鎖):單從類名咱們能夠看出這兩個類的做用,就是控制讀寫線程的鎖
  • Sync及其子類NofairSyncFairSync:若是你閱讀過 Java併發編程之鎖機制之重入鎖中公平鎖與非公平鎖的介紹,那麼咱們也能夠猜想出ReentrantReadWriteLock(讀寫鎖)是支持公平鎖與非公平鎖的。
  • ThreadLoclHoldCounterHoldCounter:涉及到鎖的重進入,在下文中咱們會具體進行描述。

基本使用

在使用某些種類的Collection時,可使用ReentrantReadWriteLock 來提升併發性。一般,在預期Collection 很大,且讀取線程訪問它的次數多於寫入線程的狀況下,且所承擔的操做開銷高於同步開銷時,這很值得一試。例如,如下是一個使用 TreeMap(咱們假設預期它很大,而且能被同時訪問) 的字典類。數據結構

class RWDictionary {
    private final Map<String, Data> m = new TreeMap<String, Data>();
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final Lock r = rwl.readLock();//獲取讀鎖
    private final Lock w = rwl.writeLock();//獲取寫鎖
    
	//讀取Map中的對應key的數據
    public Data get(String key) {
        r.lock();
        try { return m.get(key); }
        finally { r.unlock(); }
    }
    //讀取Map中全部的key
    public String[] allKeys() {
        r.lock();
        try { return m.keySet().toArray(); }
        finally { r.unlock(); }
    }
    //往Map中寫數據
    public Data put(String key, Data value) {
        w.lock();
        try { return m.put(key, value); }
        finally { w.unlock(); }
    }
    //清空數據
    public void clear() {
        w.lock();
        try { m.clear(); }
        finally { w.unlock(); }
    }
 }
複製代碼

在上述例子中,咱們分別對TreeMap中的讀取操做進行了加鎖的操做。當咱們調用get(String key)方法,去獲取TreeMap中對應key值的數據時,須要先獲取讀鎖。那麼其餘線程對於寫鎖的獲取將會被阻塞,而對獲取讀鎖的線程不會阻塞。同理,當咱們調用put(String key, Data value)方法,去更新數據時,咱們須要獲取寫鎖。那麼其餘線程對於寫鎖與讀鎖的獲取都將會被阻塞。只有當獲取寫鎖的線程釋放了鎖以後。其餘讀寫操做才能進行。併發

這裏可能會有小夥伴會有疑問,爲何當獲取寫鎖成功後,會阻塞其餘的讀寫操做?,這裏實際上是爲了保證數據可見性。若是不阻塞其餘讀寫操做,假如讀操做優先與寫操做,那麼在數據更新以前,讀操做獲取的數據與寫操做更新後的數據就會產生不一致的狀況。app

須要注意的是:ReentrantReadWriteLock最多支持 65535 個遞歸寫入鎖和65535個讀取鎖。試圖超出這些限制將致使鎖方法拋出 Error。具體緣由會在下文進行描述。函數

實現原理

到如今爲止,咱們已經基本瞭解了ReentrantReadWriteLock的基本結構與基本使用。我相信你們確定對其內部原理感到好奇,下面我會帶着你們一塊兒去了解其內部實現。這裏我會對總體的一個原理進行分析,內部更深的細節會在下文進行描述。由於我以爲只有理解總體原理後,再去理解其中的細節。那麼對整個ReentrantReadWriteLock(讀寫鎖)的學習來講,要容易一點。高併發

總體原理

在前文中,咱們介紹了ReentrantReadWriteLock的基本使用,咱們發現整個讀寫鎖對線程的控制是交給了WriteLockReadLock。當咱們調用讀寫鎖的lock()方法去獲取相應的鎖時,咱們會執行如下代碼:工具

public void lock() { sync.acquireShared(1);}
複製代碼

也就是會調用sync.acquireShared(1),而sync又是什麼呢?從其構造函數中咱們也能夠看出:post

public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
複製代碼

其中關於FairSyncNonfairSync的聲明以下所示:

//同步隊列
abstract static class Sync extends AbstractQueuedSynchronizer {省略部分代碼...}
//非公平鎖
static final class NonfairSync extends Sync{省略部分代碼...}
//公平鎖
static final class FairSync extends Sync {省略部分代碼...}
複製代碼

這裏咱們又看到了咱們熟悉的AQS,也就是說WriteLockReadLock這兩個鎖,實際上是經過AQS中的同步隊列來對線程的進行控制的。那麼結合咱們以前的AQS的知識,咱們能夠獲得下圖:

(若是你對AQS不熟,那麼你能夠閱讀該篇文章---->Java併發編程之鎖機制之AQS(AbstractQueuedSynchronizer)

讀寫鎖狀態關係圖.png
這裏我省略了 爲何維護的是同一個同步隊列的緣由,這個問題留給你們。

讀寫狀態設計

雖然如今咱們已經知道了,WriteLockReadLock這兩個鎖維護了同一個同步隊列,可是我相信你們都會有個疑問,同步隊列中只有一個int類型的state變量來表示當前的同步狀態。那麼其內部是怎麼將兩個讀寫狀態分開,而且達到控制線程的目的的呢?

ReentrantReadWriteLock中的同步隊列,實際上是將同步狀態分爲了兩個部分,其中高16位表示讀狀態低16位表示寫狀態,具體狀況以下圖所示:

讀寫鎖狀態劃分.png

在上圖中,咱們能得知,讀寫狀態能表示的最大值爲65535(排除負數),也就是說容許鎖重進入的次數爲65535次。

接下來 咱們單看高16位,這裏表示當前線程已經獲取了寫鎖,且重進入了七次。一樣的這裏若是咱們也只但看低16位,那麼就表示當前線程獲取了讀鎖,且重進入了七次。這裏你們須要注意的是,在實際的狀況中,讀狀態與寫狀態是不能被不一樣線程同時賦值的。由於根據ReentrantReadWriteLock的設計來講,讀寫操做線程是互斥的。上圖中這樣表示,只是爲了幫助你們理解同步狀態的劃分

到如今爲止咱們已經知道同步狀態的劃分,那接下來又有新的問題了。如何快速的區分及獲取讀寫狀態呢?其實也很是簡單。

  • 讀狀態:想要獲取讀狀態,只須要將當前同步變量無符號右移16位
  • 寫狀態:咱們只須要將當前同步狀態(這裏用S表示)進行這樣的操做S&0x0000FFFF),也就是S&(1<<16-1)

也就是以下圖所示(可能圖片不是很清楚,建議在pc端上觀看):

讀寫鎖狀態原理.png

細節分析

在瞭解了ReentrantReadWriteLock的總體原理及讀寫狀態的劃分後,咱們再來理解其內部的讀寫線程控制就容易的多了,下面的文章中,我會對讀鎖與寫鎖的獲取分別進行討論。

讀鎖的獲取

由於當調用ReentrantReadWriteLock中的ReadLock的lock()方法時,最終會走Sync中的tryAcquireShared(int unused)方法,來判斷可否獲取寫鎖。那如今咱們就來看看該方法的具體實現。具體代碼以下所示:

protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            //(1)判斷當前是否有寫鎖,有直接返回
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
               
            int r = sharedCount(c);
             //(2)獲取當前讀鎖的狀態,判斷是否小於最大值,
             //同時根據公平鎖,仍是非公平鎖的模式,判斷當前線程是否須要阻塞,
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
	                compareAndSetState(c, c + SHARED_UNIT)) {
                //(3)若是是不要阻塞,且寫狀態小於最大值,則設置當前線程重進入的次數
                if (r == 0) {
			        //若是當前讀狀態爲0,則設置當前讀線程爲,當前線程爲第一個讀線程。
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
		            //計算第一個讀線程,重進入的次數
                    firstReaderHoldCount++;
                } else {
	                //經過ThreadLocl獲取讀線程中進入的鎖
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;//獲取共享同步狀態成功
            }
            //(4)當獲取讀狀態失敗後,繼續嘗試獲取讀鎖,
            return fullTryAcquireShared(current);
        }
複製代碼
  • (1)根據當前的同步狀態,判斷是否存在寫鎖,且當前擁有寫鎖的線程不是當前線程,那麼直接返回-1,須要注意的是若是該方法返回值爲負數,那麼會將該請求線程加入到AQS的同步隊列中。(對該方法不是很熟的小夥伴,建議查看 Java併發編程之鎖機制之AQS(AbstractQueuedSynchronizer)
  • (2)獲取當前讀鎖的狀態,判斷是否小於最大值,同時根據公平鎖,仍是非公平鎖的模式,判斷當前線程是否須要阻塞
  • (3)若是條件(2)知足,則設置分別第一個讀取線程重進入的次數後續線程重進入的次數
  • (4)若是條件(2)不知足,在再次嘗試獲取讀鎖。

在讀鎖的獲取中,涉及到的方法較爲複雜,因此下面會對每一個步驟中涉及到的方法,進行介紹。

步驟(1)中如何判斷是否有寫鎖?

在讀鎖的獲取中的步驟(1)中,代碼中會調用exclusiveCount(int c)方法來判當前是否存在寫鎖。而該方法是屬於Sync中的方法,具體代碼以下所示:

abstract static class Sync extends AbstractQueuedSynchronizer {

        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;//最大狀態數爲2的16次方-1
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        /*返回當前的讀狀態*/
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        /*返回當前的寫狀態 */
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
       }
複製代碼

從代碼中咱們能夠看出,只是簡單的執行了c & EXCLUSIVE_MASK,也就是S&0x0000FFFF,結合咱們上文中咱們所講的讀寫狀態的區分,我相信exclusiveCount(int c)sharedCount(int c)方法是不難理解的。

步驟(2)中如何判斷是公平鎖與非公平鎖。

在步驟(2)中,咱們發現調用了readerShouldBlock()方法,而該方法是Sync類中的抽象方法。在ReentrantReadWriteLock類中,公平鎖與非公平鎖進行了相應的實現,具體代碼以下圖所示:

//公平鎖
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451L;
        final boolean writerShouldBlock(){return hasQueuedPredecessors();}
        final boolean readerShouldBlock(){return hasQueuedPredecessors();
        }
    }
    //非公平鎖
    static final class NonfairSync extends Sync {
        final boolean writerShouldBlock() { return false;}
        final boolean readerShouldBlock() {return apparentlyFirstQueuedIsExclusive();}
    }
複製代碼

這裏就再也不對公平鎖與非公平鎖進行分析了。在文章 Java併發編程之鎖機制之重入鎖中已經對這個知識點進行了分析。有興趣的小夥伴能夠參考該文章。

步驟(3)中爲毛要記錄第一個獲取寫鎖的線程?線程的重進入是如何實現的?

在ReentrantReadWriteLock類中分別定義了Thread firstReaderint firstReaderHoldCount變量來記錄當前第一個獲取寫鎖的線程以及其重進入的次數。官方的給的解釋是便於跟蹤與記錄線程且這種記錄是很是廉價的。也就是說,之因此單獨定義一個變量來記錄第一個獲取獲取寫鎖的線程,是爲了在衆多的讀線程中區分線程,也是爲了之後的調試與跟蹤。

當咱們解決了第一個問題後,如今咱們來解決第二個問題。這裏我就不在對第一個線程如何記錄重進入次數進行分析了。咱們直接看其餘讀線程的重進入次數設置。這裏由於篇幅的限制,我就直接講原理,其餘線程的重進入的次數判斷是經過ThreadLocal來實現的。經過在每一個線程中的內存空間保存HodlerCount類(用於記錄當前線程獲取鎖的次數),來獲取相應的次數。具體代碼以下所示:

static final class HoldCounter {
            int count;//記錄當前線程進入的次數
            final long tid = getThreadId(Thread.currentThread());
        }
    
   static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }
     
   private transient ThreadLocalHoldCounter readHolds;
複製代碼

若是有小夥伴不熟悉ThreadLocal,能夠參看該篇文章《Android Handler機制之ThreadLocal》

步驟(4)中繼續嘗試獲取讀鎖?

當第一次獲取讀鎖失敗的時候,會調用fullTryAcquireShared(Thread current)方法會繼續嘗試獲取鎖。該函數返回的三個條件爲:

  • 當前已經存在寫鎖了。直接加入AQS同步隊列中。
  • 當前寫鎖的次數超過最大值,直接拋出異常
  • 獲取讀鎖成功。直接返回

具體代碼以下所示:

final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) {//注意這裏的for循環
                int c = getState();
                if (exclusiveCount(c) != 0) {//(1)存在寫鎖直接返回
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                } else if (readerShouldBlock()) {
                    // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT)//(2)鎖迭代次數超過最大值。拋出異常 throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) {//(3)獲取鎖成功,記錄次數 if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } } 複製代碼

由於該方法和上文提到的tryAcquireShared(int unused)方法較爲相似。因此這裏就再也不對其中的邏輯再次講解。你們須要注意的是該方法會自旋式的獲取鎖

寫鎖的獲取

瞭解了讀鎖的獲取,再來了解寫鎖的獲取就很是簡單了。寫鎖的獲取最終會走Sync中的tryAcquire(int acquires)方法。具體代碼以下所示:

protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            //(1)獲取同步狀態 = 寫狀態+讀狀態,單獨獲取寫狀態
            int c = getState();
            int w = exclusiveCount(c);
            //(2)若是c!=0則表示有線程操做
            if (c != 0) {
                // (2.1)沒有寫鎖線程,則表示有讀線程,則直接獲取失敗,並返回
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                    
                 //(2.2)若是w>0則,表示當前線程爲寫線程,則計算當前重進入的次數,若是已經飽和,則拋出異常
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                    
                // (2.3)獲取成功,直接記錄當前寫狀態
                setState(c + acquires);
                return true;
            }
            //(3)沒有線程獲取讀寫鎖,根據當前鎖的模式與設置寫狀態是否成功,判斷是否須要阻塞線程
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            //(4)第一次進入,獲取成功   
            setExclusiveOwnerThread(current);
            return true;
        }
複製代碼

爲了幫助你們理解,我這裏將該方法分爲了一下幾個步驟:

  • (1)獲取同步狀態 c(寫狀態+讀狀態),並單獨獲取寫狀態w
  • (2)若是c!=0則表示有線程操做。
  • (2.1)沒有寫鎖線程,則表示有讀線程,則直接獲取失敗,並返回。
  • (2.2)若是w>0則,表示當前線程爲寫線程,則計算當前重進入的次數,若是已經飽和,則拋出異常
  • (2.3)獲取成功,直接記錄當前寫狀態。
  • (3)在(2)條件不知足的條件下,沒有線程獲取讀寫鎖,根據當前鎖的模式與設置寫狀態是否成功,判斷是否須要阻塞線程
  • (4)在(2)(3)條件都不知足的狀況下,則爲第一次進入,那麼就獲取成功 。

相信結合以上步驟。再來理解代碼就很是容易了。

鎖降級

讀寫鎖除了保證寫操做對讀操做的可見性以及併發性的提高以外,讀寫鎖也能簡化讀寫交互的編程方式,試想一種狀況,在程序中咱們須要定義一個共享的用做緩存數據結構,而且其大部分時間提供讀服務(例如查詢和搜索),而寫操做佔有的時間不多,可是咱們又但願寫操做完成以後的更新須要對後續的讀操做可見。那麼該怎麼實現呢?參看以下例子:

public class CachedData {
    Object data;
    volatile boolean cacheValid;
    final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    
    void processCachedData() {
        rwl.readLock().lock();
        if (!cacheValid) {
            //若是緩存過時,釋放讀鎖,並獲取寫鎖
            rwl.readLock().unlock();
            rwl.writeLock().lock();(1)
            try {
                //從新檢查緩存是否過時,由於有可能在當前線程操做以前,其餘寫線程有可能改變緩存狀態
                if (!cacheValid) {
                    data = ...//從新寫入數據
                    cacheValid = true;
                }
                // 獲取讀鎖
                rwl.readLock().lock();(2)
            } finally {
	            //釋放寫鎖
                rwl.writeLock().unlock(); (3)
            }
        }

        try {
            use(data);//操做使用數據
        } finally {
            rwl.readLock().unlock();//最後釋放讀鎖
        }
    }
}
複製代碼

在上述例子中,若是數據緩存過時,也就是cacheValid變量(volatile 修飾的布爾類型)被設置爲false,那麼全部調用processCachedData()方法的線程都能感知到變化,可是隻有一個線程能過獲取到寫鎖。其餘線程會被阻塞在讀鎖和寫鎖的lock()方法上。當前線程獲取寫鎖完成數據準備以後,再獲取讀鎖,隨後釋放寫鎖(上述代碼的(1)(2)(3)三個步驟),這種在擁有寫鎖的狀況下,在獲取讀鎖。隨後釋放寫鎖的過程,稱之爲鎖降級(在讀寫鎖內部實現中,是支持鎖鎖降級的)

那接下來,我個問題想問你們,爲何當線程獲取寫鎖,修改數據完成後,要先獲取讀鎖呢,而不直接釋放寫鎖呢?,其實緣由很簡單,若是當前線程直接釋放寫鎖,那麼這個時候若是有其餘線程獲取了寫鎖,並修改了數據。那麼對於當前釋放寫鎖的線程是沒法感知數據變化的。先獲取讀鎖的目的,就是保證沒有其餘線程來修改數據啦。

總結

  • ReentrantReadWriteLock最多支持 65535 個遞歸寫入鎖和65535個讀取鎖。
  • ReentrantReadWriteLock中用同一int變量的高16位表示讀狀態低16位表示寫狀態
  • ReentrantReadWriteLock支持公平鎖與非公平鎖模式。
  • ReentrantReadWriteLock支持鎖的降級。
相關文章
相關標籤/搜索