讀寫鎖實現邏輯相對比較複雜,可是倒是一個常用到的功能,但願將我對
ReentrantReadWriteLock
的源碼的理解記錄下來,能夠對你們有幫助html
在理解ReentrantReadWriteLock
時須要具有一些基本的知識java
以前有寫過一篇《深刻淺出AQS源碼解析》關於AQS的文章,對AQS原理不瞭解的同窗能夠先看一下緩存
ReentrantLock
的實現原理能夠參考《深刻淺出ReentrantLock源碼解析》函數
對於資源的訪問就兩種形式:要麼是讀操做,要麼是寫操做。讀寫鎖是將被鎖保護的臨界資源的讀操做和寫操做分開,容許同時有多個線程同時對臨界資源進行讀操做,任意時刻只容許一個線程對資源進行寫操做。簡單的說,對與讀操做採用的是共享鎖,對於寫操做採用的是排他鎖。優化
ReentrantReadWriteLock
是用state
字段來表示讀寫鎖重複獲取資源的次數,高16位用來標記讀鎖的同步狀態,低16位用來標記寫鎖的同步狀態ui
// 劃分的邊界線,用16位來劃分 static final int SHARED_SHIFT = 16; // 讀鎖的基本單位,也就是讀鎖加1或者減1的基本單位(1左移16位後的值) static final int SHARED_UNIT = (1 << SHARED_SHIFT); // 讀寫鎖的最大值(在計算讀鎖的時候須要先右移16位) static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; // 寫鎖的掩碼,state值與掩碼作與運算後獲得寫鎖的真實值 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 獲取資源被讀鎖佔用的次數 static int sharedCount(int c){ return c >>> SHARED_SHIFT; } // 獲取資源被寫鎖佔用的次數 static int exclusiveCount(int c){ return c & EXCLUSIVE_MASK; }
在統計讀鎖被每一個線程持有的次數時,ReentrantReadWriteLock
採用的是HoldCounter
來實現的,具體以下:線程
// 持有讀鎖的線程重入的次數 static final class HoldCounter { // 重入的次數 int count = 0; // 持有讀鎖線程的線程id final long tid = getThreadId(Thread.currentThread()); } /** * 採用ThreadLocal機制,作到線程之間的隔離 */ static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } /** * 線程持有可重入讀鎖的次數 */ private transient ThreadLocalHoldCounter readHolds; /** * 緩存最後一個成功獲取讀鎖的線程的重入次數,有兩方面的好處: * 一、避免了經過訪問ThreadLocal來獲取讀鎖的信息,這個優化的前提是 * 假設多數狀況下,一個獲取讀鎖的線程,使用完之後就會釋放讀鎖, * 也就是說最後獲取讀鎖的線程和最早釋放讀鎖的線程大多數狀況下是同一個線程 * 二、由於ThreadLocal中的key是一個弱引用類型,當有一個變量持有HoldCounter對象時, * ThreadLocalHolderCounter中最後一個獲取鎖的線程信息不會被GC回收掉 */ private transient HoldCounter cachedHoldCounter; /** * 第一個獲取讀鎖的線程,有兩方面的考慮: * 一、記錄將共享數量從0變成1的線程 * 二、對於無競爭的讀鎖來講進行線程重入次數數據的追蹤的成本是比較低的 */ private transient Thread firstReader = null; /** * 第一個獲取讀鎖線程的重入次數,能夠參考firstReader的解析 */ private transient int firstReaderHoldCount;
ReentrantReadWriteLock
一共有5個內部類,具體以下:設計
Sync
:公平鎖和非公平鎖的抽象類NonfairSync
:非公平鎖的具體實現FairSync
:公平鎖的具體實現ReadLock
:讀鎖的具體實現WriteLock
:寫鎖的具體實現咱們從讀鎖ReadLock
和寫鎖WriteLock
的源碼開始分析,而後順着這個思路將整個ReentrantReadWriteLock
中全部的核心源碼(全部的包括內部類)進行分析。code
public static class ReadLock implements Lock, java.io.Serializable { private final Sync sync; /** * 經過ReentrantReadWriteLock中的公平鎖或非公平鎖來初始化sync變量 */ protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; } /** * 阻塞的方式獲取鎖,由於讀鎖是共享鎖,因此調用acquireShared方法 */ public void lock() { sync.acquireShared(1); } /** * 可中斷且阻塞的方式獲取鎖 */ public void lockInterruptibly() throws InterruptedException { sync.acquireSharedInterruptibly(1); } /** * 超時嘗試獲取鎖,非阻塞的方式 */ public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } /** * 嘗試獲取寫鎖,非阻塞的方式 */ public boolean tryLock() { return sync.tryReadLock(); } /** * 釋放鎖 */ public void unlock() { sync.releaseShared(1); } }
接下來,咱們重點看一下在公平鎖和非公平鎖下Sync.acquireShared
、Sync.releaseShared
和Sync.tryLock
這3個方法的實現(acquireSharedInterruptibly
和tryAcquireSharedNanos
是AQS中的方法,這裏就不在討論了,具體能夠參考《深刻淺出AQS源碼解析》),其中Sync.acquireShared
中核心調用的方法是Sync.tryAcquireShared
,Sync. releaseShared
中核心調用的方法是Sync.tryReleaseShared
,Sync.tryLock
中核心調用的方法是Sync.tryReadLock
,因此咱們重點分析Sync.tryAcquireShared
方法、Sync.tryReleaseShared
方法和sync.tryReadLock
方法htm
protected final int tryAcquireShared(int unused) { /** * 以共享鎖的方式嘗試獲取讀鎖,步驟以下: * 一、若是資源已經被寫鎖獲取了,直接返回失敗 * 二、若是讀鎖不須要等待(公平鎖和非公平鎖的具體實現有區別)、 * 而且讀鎖未超過上限、同時設置讀鎖的state值成功,則返回成功 * 三、若是步驟2失敗了,須要進入fullTryAcquireShared函數再次嘗試獲取讀鎖 */ Thread current = Thread.currentThread(); int c = getState(); /** * 資源已經被寫鎖獨佔,直接返回false */ if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); /** * 一、讀鎖不須要等待 * 二、讀鎖未超過上限 * 三、設置讀鎖的state值成功 * 則返回成功 */ if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { // 記錄第一個獲取讀鎖的線程信息 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { // 第一個獲取讀鎖的線程再次獲取鎖(重入) firstReaderHoldCount++; } else { // 修改獲取鎖的線程的重入的次數 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != LockSupport.getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } /** * 若是CAS失敗再次獲取讀鎖 */ return fullTryAcquireShared(current); }
接下來看一下fullTryAcquireShared方法:
final int fullTryAcquireShared(Thread current) { /** * 調用該方法的線程都是但願獲取讀鎖的線程,有3種狀況: * 一、在嘗試經過CAS操做修改state時因爲有多個競爭讀鎖的線程致使CAS操做失敗 * 二、須要排隊等待獲取讀鎖的線程(公平鎖) * 三、超過讀鎖限制的最大申請次數的線程 */ HoldCounter rh = null; for (;;) { // 無限循環獲取鎖 int c = getState(); // 已經被寫線程獲取鎖了,直接返回 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; // 須要被block的讀線程(公平鎖) } else if (readerShouldBlock()) { // 若是時當前線程 if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { // 清理當前線程中重入次數爲0的數據 if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } // 當前線程獲取鎖失敗 if (rh.count == 0) return -1; } } // 判斷是否超過讀鎖的最大值 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 修改讀鎖的state值 if (compareAndSetState(c, c + SHARED_UNIT)) { // 最新獲取到讀鎖的線程設置相關的信息 if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; // 當前線程重複獲取鎖(重入) } else { // 在readHolds中記錄獲取鎖的線程的信息 if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
tryReleaseShared
方法的實現邏輯比較簡單,咱們直接看代碼中的註釋
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); /** * 若是當前線程是第一個獲取讀鎖的線程,有兩種狀況: * 一、若是持有鎖的次數爲1,直接釋放成功 * 二、若是持有鎖的次數大於1,說明有重入的狀況,須要次數減1 */ if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { /** * 若是當前線程不是第一個獲取讀鎖的線程 * 須要更新線程持有鎖的重入次數 * 若是次數小於等於0說明有異常,由於只有當前線程纔會出現持有鎖的重入次數等於0或者1 */ HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } // 修改state的值 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // 若是是最後一個釋放讀鎖的線程nextc爲0,不然不是 return nextc == 0; } }
tryReadLock
的代碼比較簡單,就直接在將解析過程在註釋中描述
final boolean tryReadLock() { Thread current = Thread.currentThread(); for (;;) { // 無限循環獲取讀鎖 int c = getState(); // 當前線程不是讀線程,直接返回失敗 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return false; int r = sharedCount(c); // 讀鎖的總重入次數是否超過最大次數限制 if (r == MAX_COUNT) throw new Error("Maximum lock count exceeded"); /** * 經過CAS操做設置state的值,若是成功表示嘗試獲取讀鎖成功,須要作如下幾件事情: * 一、若是是第一獲取讀鎖要記錄第一個獲取讀鎖的線程信息 * 二、若是是當前獲取鎖的線程和第一次獲取鎖的線程相同,須要更新第一獲取線程的重入次數 * 三、更新獲取讀鎖線程相關的信息 */ if (compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return true; } } }
public static class WriteLock implements Lock, java.io.Serializable { private final Sync sync; /** * 經過ReentrantReadWriteLock中的公平鎖或非公平鎖來初始化sync變量 */ protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync; } /** * 阻塞的方式獲取寫鎖 */ public void lock() { sync.acquire(1); } /** * 中斷的方式獲取寫鎖 */ public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } /** * 嘗試獲取寫鎖 */ public boolean tryLock( ) { return sync.tryWriteLock(); } /** * 超時嘗試獲取寫鎖 */ public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } /** * 釋放寫鎖 */ public void unlock() { sync.release(1); } }
接下來,咱們重點看一下在公平鎖和非公平鎖下Sync.tryAcquire
、Sync.tryRelease
和Sync.tryWriteLock
這幾個核心方法是如何實現寫鎖的功能
Sync.tryAcquire
方法的邏輯比較簡單,就直接在代碼中註釋,具體以下:
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); // 讀寫鎖的次數 int c = getState(); // 寫鎖的次數 int w = exclusiveCount(c); /* * 若是讀寫鎖的次數不爲0,說明鎖可能有如下3中狀況: * 一、所有是讀線程佔用資源 * 2. 所有是寫線程佔用資源 * 3. 讀寫線程都佔用了資源(鎖降級:持有寫鎖的線程能夠去持有讀鎖),可是讀寫線程都是同一個線程 */ if (c != 0) { // 寫線程不佔用資源,第一個獲取鎖的線程也不是當前線程,直接獲取失敗 if (w == 0 || current != getExclusiveOwnerThread()) return false; // 檢查獲取寫鎖的線程是否超過了最大的重入次數 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 修改state的狀態,之因此沒有用CAS操做來修改,是由於寫線程只有一個,是獨佔的 setState(c + acquires); return true; } /* * 寫線程是第一個競爭鎖資源的線程 * 若是寫線程須要等待(公平鎖的狀況),或者 * 寫線程的state設置失敗,直接返回false */ if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; // 設置當前線程爲owner setExclusiveOwnerThread(current); return true; }
Sync.tryRelease
方便的代碼很簡單,直接看代碼中的註釋
protected final boolean tryRelease(int releases) { // 若是釋放鎖的線程不持有鎖,返回失敗 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 獲取寫鎖的重入的次數 int nextc = getState() - releases; // 若是次數爲0,須要釋放鎖的owner boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); // 釋放鎖的owner setState(nextc); return free; }
Sync.tryWriteLock
這個方法也比較簡單,就直接上代碼了
final boolean tryWriteLock() { Thread current = Thread.currentThread(); int c = getState(); // 讀鎖或者寫鎖已經被線程持有 if (c != 0) { int w = exclusiveCount(c); // 寫鎖第一次獲取鎖或者當前線程不是第一次獲取寫鎖的線程(也就是否是owner),直接失敗 if (w == 0 || current != getExclusiveOwnerThread()) return false; // 超出寫鎖的最大次數,直接失敗 if (w == MAX_COUNT) throw new Error("Maximum lock count exceeded"); } // 競爭寫鎖的線程修改state, // 若是成功將本身設置成鎖的owner, // 若是失敗直接返回 if (!compareAndSetState(c, c + 1)) return false; setExclusiveOwnerThread(current); // 設置當前線程持有鎖 return true; }