一、爲何用讀寫鎖 ReentrantReadWriteLock?html
重入鎖ReentrantLock是排他鎖,在同一時刻僅有一個線程能夠進行訪問,可是在大多數場景下,大部分時間都是提供讀服務,而寫服務佔有的時間較少。然而讀服務不存在數據競爭問題,若是一個線程在讀時禁止其餘線程讀勢必會致使性能下降。因此就提供了讀寫鎖。java
讀寫鎖維護着一對鎖,一個讀鎖和一個寫鎖。經過分離讀鎖和寫鎖,使得併發性比通常的排他鎖有了較大的提高:在同一時間能夠容許多個讀線程同時訪問,可是在寫線程訪問時,全部讀線程和寫線程都會被阻塞。緩存
二、讀寫鎖實現原理:併發
(1)每一個ReentrantReadWriteLock對象都對應着讀鎖和寫鎖兩個鎖。oop
(2)ReentrantReadWriteLock經過其屬性sync(繼承了AQS),一個對象實現了讀寫兩個鎖。源碼分析
(3)sync.state(int)分爲高 16 位和低16位,高16位用於共享模式ReadLock,低16位用於獨佔模式WriteLock
post
(4)獲取寫鎖標誌:性能
1.sync.state的低16位(0表明沒有被佔用,大於0表明有線程持有當前鎖(鎖能夠重入,每次重入都+1) 最多2^16-1次重入ui
2.sync.exclusiveOwnerThread == Thread.currentThread()url
(5)獲取讀鎖標誌:
1.state的高16位(0表明沒有被佔用,大於0表明有線程持有當前鎖(鎖能夠重入,每次重入都+1) 最多2^16-1次重入
2.ThreadLocalHoldCounter readHolds; // 記錄線程持有的讀鎖數量(ThreadLocalHoldCounter extends ThreadLocal)
readHolds.threadLocals - Map<ThreadLocal, HoldCounter>
HoldCounter - count tid
(關於ThreadLocal:Java併發(二十):線程本地變量ThreadLocal)
3.sync.cachedHoldCounter 記錄最後一個獲取讀鎖的線程的讀鎖重入次數,用於緩存提升性能
4.sync.firstReader 第一個獲取讀鎖的線程(而且其未釋放讀鎖),以及它持有的讀鎖數量 提升性能
(6)ReentrantReadWriteLock的內部類WriteLock/ReadLock經過操做sync的屬性實現的鎖的操做。
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { // 屬性 private final ReentrantReadWriteLock.ReadLock readerLock; private final ReentrantReadWriteLock.WriteLock writerLock; final Sync sync; // 鎖 // 內部類 abstract static class Sync extends AbstractQueuedSynchronizer {} static final class FairSync extends Sync {} static final class NonfairSync extends Sync {} public static class ReadLock implements Lock, java.io.Serializable {} public static class WriteLock implements Lock, java.io.Serializable {} }
ReadLock 使用了共享模式,WriteLock 使用了獨佔模式。
ReadLock 和 WriteLock都是經過同一個Sync實例實現的。
AQS 將 state(int)分爲高 16 位和低16位,高16位用於共享模式ReadLock,低16位用於獨佔模式WriteLock 。
static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; static int sharedCount(int c) { return c >>> SHARED_SHIFT; } // 無符號補0右移16位 - 讀鎖 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } // 抹掉高16位 - 寫鎖
1.state的低16位(0表明沒有被佔用,大於0表明有線程持有當前鎖(鎖能夠重入,每次重入都+1) 最多2^16-1次重入
2.exclusiveOwnerThread == Thread.currentThread()
1.state的高16位(0表明沒有被佔用,大於0表明有線程持有當前鎖(鎖能夠重入,每次重入都+1) 最多2^16-1次重入
2.ThreadLocalHoldCounter readHolds; // 記錄線程持有的讀鎖數量
readHolds.threadLocals - Map<Thread, HoldCounter>
HoldCounter - count tid
// ReentrantReadWriteLock.WriteLock.lock() public void lock() { sync.acquire(1); } // AbstractQueuedSynchronizer.acquire(int) public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } /** * ReentrantReadWriteLock.Sync.tryAcquire(int) * 能夠獲取寫鎖的兩種狀況: * 1.沒有線程佔用該鎖(讀鎖和寫鎖都沒有被佔用) * 2.當前線程已經拿到過該寫鎖,重入 */ protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); // 寫鎖 if (c != 0) { // 有鎖 if (w == 0 || current != getExclusiveOwnerThread()) // 有鎖且沒有寫鎖(即有讀鎖) || 其餘線程佔用了寫鎖 return false; if (w + exclusiveCount(acquires) > MAX_COUNT) // 重入鎖上限 2^16-1 throw new Error("Maximum lock count exceeded"); setState(c + acquires); return true; } // 沒有線程佔用該鎖,直接獲取鎖 if (writerShouldBlock() || // 若是是公平鎖須要排隊 !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
// ReentrantReadWriteLock.WriteLock.unlock() public void unlock() { sync.release(1); } // AbstractQueuedSynchronizer.release(int) public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } /** * ReentrantReadWriteLock.Sync.tryRelease(int) * 釋放寫鎖:維護state和exclusiveOwnerThread * */ protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
abstract static class Sync extends AbstractQueuedSynchronizer { // 這個嵌套類的實例用來記錄每一個線程持有的讀鎖數量(讀鎖重入) static final class HoldCounter { int count = 0; // 持有的讀鎖數 final long tid = getThreadId(Thread.currentThread()); // 線程 id } static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } /** * 組合使用上面兩個類,用一個 ThreadLocal 來記錄線程持有的讀鎖數量 */ private transient ThreadLocalHoldCounter readHolds; /** * 用於緩存,記錄最後一個獲取讀鎖的線程的讀鎖重入次數 * 無論哪一個線程獲取到讀鎖後,就把這個值佔爲已用,這樣就不用到 ThreadLocal 中查詢 map 了 * 一般讀鎖的獲取很快就會伴隨着釋放,在 獲取->釋放 讀鎖這段時間,若是沒有其餘線程獲取讀鎖的話,此緩存就能幫助提升性能 */ private transient HoldCounter cachedHoldCounter; /** * 第一個獲取讀鎖的線程(而且其未釋放讀鎖),以及它持有的讀鎖數量 * 提升性能用 */ private transient Thread firstReader = null; private transient int firstReaderHoldCount; }
// ReentrantReadWriteLock.ReadLock.lock() public void lock() { sync.acquireShared(1); } // AbstractQueuedSynchronizer.acquireShared(int) public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } /** * ReentrantReadWriteLock.Sync.tryAcquireShared(int) * 能夠獲取讀鎖狀況: * 1.沒有線程佔用該鎖 * 2.只有讀鎖 * 3.有寫鎖,寫鎖被當前線程佔用,鎖降級 * 三種狀況 - 只要沒有其餘線程佔用寫鎖就能夠獲取讀鎖 */ protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) // 其餘線程佔用寫鎖 return -1; int r = sharedCount(c); // 讀鎖 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // 進入此if,表示能夠拿到讀鎖了 if (r == 0) { // 將"第一個"獲取讀鎖的線程記錄在firstReader屬性中 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { // 1.當前線程及對應讀鎖次數存入cachedHoldCounter 2.當前線程及對應讀鎖次數存入readHolds HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get();// readHolds中取當先線程的ThreadLocal(沒有就建立一個) else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current);// 公平鎖排隊非公平鎖下一個是寫鎖/讀鎖重入次數上限/CAS失敗 從新拿讀鎖 } /** * ReentrantReadWriteLock.Sync.fullTryAcquireShared(Thread) */ final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { // 循環CAS拿鎖 int c = getState(); if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) // 其餘線程佔用寫鎖 return -1; } else if (readerShouldBlock()) { // 處理讀鎖重入,將cachedHoldCounter設置爲當前線程 if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) // 不是重入,返回-1 return -1; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { // 正常拿讀鎖 if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
持有寫鎖的線程,去獲取讀鎖的過程稱爲鎖降級
// ReentrantReadWriteLock.ReadLock.unlock() public void unlock() { sync.releaseShared(1); } // AbstractQueuedSynchronizer.releaseShared(int) public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } /** * ReentrantReadWriteLock.Sync.tryReleaseShared(int) * 維護readHolds state */ protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // 第一個獲取讀鎖的線程 if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { // readHolds中次數-1 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { // state int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }