目錄html
在前一篇博客多線程學習筆記三之ReentrantLock與AQS實現分析分析了基於同步器AQS實現的獨佔鎖ReentrantLock,AQS同步器做爲JUC組件實現鎖的框架,基於AQS除了能夠實現獨佔鎖,還能夠實現共享鎖。
ReentrantReadWriteLock是基於AQS實現的讀寫鎖,內部維護了一個讀鎖(共享鎖)和寫鎖(獨佔鎖)。若是咱們要在程序中提供共享的緩存數據結構,緩存確定是讀操做(數據查詢)多而寫操做(數據更新)少,只要保證寫操做對後續的讀操做是可見的就好了,這種狀況下使用獨佔鎖就不如讀寫鎖的吞吐量大,讀寫鎖中的讀鎖容許多個線程得到讀鎖對資源進行讀操做,寫鎖是傳統的獨佔鎖,只容許單個線程得到寫鎖對資源進行更新。如下是JDK提供基於ReentrantReadWriteLock簡單實現緩存結構的Demo:java
class CachedData { Object data; volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() { rwl.readLock().lock(); if (!cacheValid) { // 必須先釋放讀鎖再獲取寫鎖 rwl.readLock().unlock(); rwl.writeLock().lock(); try { //再次檢查cacheValid防止其餘線程得到寫鎖改變cacheValid值 if (!cacheValid) { data = ... cacheValid = true; } // 寫鎖降級爲讀鎖 rwl.readLock().lock(); } finally { //釋放寫鎖 rwl.writeLock().unlock(); } } try { use(data); } finally { rwl.readLock().unlock(); } } }
ReentranReadWriteLock的關係圖:
node
ReentrantReadWriteLock沒有實現Lock接口,實現了ReadWriteLock接口。內部類ReadLock和WriteLock實現Lock接口,ReadLock和WriteLock包含了繼承了AQS的Sync對象,從而提供了共享鎖和獨佔鎖特性的實現。讀寫鎖ReentrantReadWriteLock具備如下特性:緩存
在實現ReentrantLock時,當一個線程去嘗試獲取鎖時,線程會去檢查同步器AQS中維護的int型變量state是否爲0,同步狀態加一表示當前線程成功獲取鎖。而讀寫鎖ReentrantReadWriteLock維護了讀鎖和寫鎖,那麼一個線程得到了鎖,怎麼經過state代表究竟是讀鎖仍是寫鎖呢?答案是把int型變量切位兩部分,高16位表示讀狀態,低16位表示寫狀態。ReentrantReadWriteLock在內部類Sync定義瞭如下常量用以區分讀寫狀態:數據結構
//偏移量 static final int SHARED_SHIFT = 16; //線程得到讀鎖,state加SHARED_UNIT,state高16位SHARED_UNIT個數表明了有多少個共享鎖 static final int SHARED_UNIT = (1 << SHARED_SHIFT); //讀寫鎖重入最多不超過65535 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** Returns the number of shared holds represented in count */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** Returns the number of exclusive holds represented in count */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
經過把32位int型變量state按位切割成兩部分維護讀寫兩種狀態,具體劃分如圖:
多線程
從圖中能夠看到,當前線程獲取了寫鎖,重進入了3次,連續得到了兩次讀鎖,每次得到寫鎖,就把state加1,而低16位總共最大是65535,就是MAX_COUNT的值。每得到一次讀鎖,就把state加SHARED_COUNT。那麼如何獲取讀寫狀態呢?只要經過位運算取出高16位或低16位就好了,對於讀狀態,state>>>SHARED_SHIFT(無符號補0右移16位)就能夠獲得加了多少次SHARED_UNIT從而得到讀狀態;對於寫狀態,state & EXCLUSIVE_MASK(0X0000FFFF,高16位都變爲0,低16位不變)就能夠得到寫狀態。併發
因爲ReentrantReadWriteLock支持讀寫鎖的重入,而寫鎖是獨佔鎖,只要取出同步狀態state低16位對應的數值就是得到寫鎖的重入次數;而讀鎖是共享鎖,每一個線程得到讀鎖就會把state加上SHARED_UNIT(包括讀鎖重入),取出state高16位的對應的數值表示是全部線程得到讀鎖的次數,可是如何得到單個線程得到共享鎖的次數呢?內部類Sync爲同步器維護了一個讀鎖計數器,專門統計每一個線程得到讀鎖的次數。Sync內部有兩個內部類分別爲HoldCounter和ThreadLocalHoldCounter:框架
abstract static class Sync extends AbstractQueuedSynchronizer { static final class HoldCounter { //計數器,用於統計線程重入讀鎖次數 int count = 0; // Use id, not reference, to avoid garbage retention //線程TID,區分線程,能夠惟一標識一個線程 final long tid = getThreadId(Thread.currentThread()); } static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { //重寫初始化方法,在沒有進行set的狀況下,獲取的都是該HoldCounter值 public HoldCounter initialValue() { return new HoldCounter(); } } private transient ThreadLocalHoldCounter readHolds; private transient HoldCounter cachedHoldCounter; private transient Thread firstReader = null; private transient int firstReaderHoldCount; Sync() { //本地線程讀鎖計數器 readHolds = new ThreadLocalHoldCounter(); setState(getState()); // ensures visibility of readHolds } }
獲取讀鎖,由內部類ReadLock提供lock方法,調用了Sync父類AQS的方法:oop
//獲取讀鎖 public void lock() { sync.acquireShared(1); } //獲取共享鎖 public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
嘗試獲取共享鎖:學習
protected final int tryAcquireShared(int unused) { //當前線程 Thread current = Thread.currentThread(); //同步狀態state int c = getState(); //檢查獨佔鎖是否被佔據,若是被佔據,是不是當前線程獲取了獨佔鎖 //若是是當前線程獲取了寫鎖,能夠繼續獲取讀鎖,若是都不是返回-1表示獲取失敗 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //讀鎖數量 int r = sharedCount(c); //!readerShouldBlock() 根據公平與否策略和隊列是否含有等待節點決定當前線程是否繼續獲取鎖 //不能大於65535且CAS修改爲功 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { //若是沒有線程獲取過讀鎖 if (r == 0) { //將當前線程設置爲第一個讀鎖線程 firstReader = current; // 計數器爲一 firstReaderHoldCount = 1; //讀鎖重入 } else if (firstReader == current) { //計數器加一 firstReaderHoldCount++; } else { // 若是不是第一個線程,獲取鎖成功 // cachedHoldCounter 表明的是最後一個獲取讀鎖的線程的計數器 HoldCounter rh = cachedHoldCounter; // 若是計數器是 null 或者不指向當前線程,那麼就新建一個 HoldCounter 對象 if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); //計數器爲0,保存到readHolds中 else if (rh.count == 0) readHolds.set(rh); //計數器加一 rh.count++; } return 1; } return fullTryAcquireShared(current); }
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; //死循環 for (;;) { //同步狀態 int c = getState(); //檢查寫鎖獲取狀況 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; //進入到這裏,說明沒有其餘線程獲取寫鎖 //公平鎖策略檢查 } else if (readerShouldBlock()) { //readerShouldBlock()返回true,應該堵塞,檢查是否獲取過讀鎖 // 第一個獲取讀鎖線程是當前線程,重入 if (firstReader == current) { } else { //循環中,若計數器爲null if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } //須要阻塞且是非重入(還未獲取讀鎖的),獲取失敗。 if (rh.count == 0) return -1; } } //檢查讀鎖總數量是否超過最大值 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //CAS設置同步狀態state if (compareAndSetState(c, c + SHARED_UNIT)) { //當前線程得到第一個讀鎖 if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; //讀鎖重入 } else if (firstReader == current) { firstReaderHoldCount++; } else { //從緩存讀入計數器,提升效率 if (rh == null) rh = cachedHoldCounter; //計數器爲空或不是指向當前線程 if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
當tryAcquireShared嘗試獲取共享鎖失敗,返回-1,進入AQS同步隊列等待獲取共享鎖
private void doAcquireShared(int arg) { //將當前節點以共享型類型加入同步隊列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //前驅節點獲取到鎖,可能佔據鎖,也可能已經釋放鎖,調用tryAcquireShared嘗試獲取鎖 if (p == head) { int r = tryAcquireShared(arg); //獲取成功 if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } //與獨佔鎖ReentrantLock堵塞邏輯一致 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { //因中斷/超時,取消獲取鎖 if (failed) cancelAcquire(node); } }
釋放讀鎖,由內部類ReadLock提供unlock方法,調用了Sync父類AQS的方法:
public void unlock() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
tryReleaseShared返回true,即同步狀態爲0,不存在線程佔據讀鎖或寫鎖。
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); //當前線程是第一個得到讀鎖的線程 if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; //不是firstReader,更新計數器 } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; //徹底釋放鎖 if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } //重入鎖退出 --rh.count; } //CAS更新同步狀態, for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; } }
tryReleaseShared方法成功釋放鎖,調用doReleaseShared喚醒後繼節點。
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; //若是節點狀態爲 Node.SIGNAL,將狀態設置爲0,設置成功,喚醒線程。 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } //若是自己頭結點的waitStatus是出於重置狀態(waitStatus==0)的, //將其設置爲「傳播」狀態。意味着須要將狀態向後一個節點傳播。 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } }
獲取寫鎖,由內部類WriteLock提供lock方法,調用了Sync父類AQS的方法,重點解析一下tryAcquire實現:
public void lock() { sync.acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
內部類Sync重寫的tryAcquire方法:
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); //同步狀態不爲0 if (c != 0) { //其餘線程得到寫鎖,獲取失敗;w爲0而同步狀態不爲0,沒有線程佔據寫鎖,有線程佔據讀鎖 //注意:不存在讀鎖與寫鎖同時被多個線程獲取的狀況。 if (w == 0 || current != getExclusiveOwnerThread()) return false; //當前線程已經得到寫鎖,重入次數超過MAX_COUNT,失敗 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 鎖重入 setState(c + acquires); return true; } //公平策略檢查 //CAS設置同步狀態成功則得到寫鎖 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
當同步狀態state爲0時,tryRelease方法返回true。
protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //獨佔鎖,只有當前線程釋放同步狀態,不須要考慮併發 int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
讀寫鎖ReentrantReadWriteLock支持寫鎖降級,從下面能夠看到線程得到寫鎖後,在沒有釋放寫鎖的狀況下得到了讀鎖(鎖降級),而後在手動釋放寫鎖。這更像是一種特殊的鎖重入,因爲得到寫鎖有繼續得到讀鎖的須要,相對於釋放寫鎖再獲取讀鎖,直接去獲取讀鎖沒有其餘線程競爭,免去了因爲其餘線程得到寫鎖進入等待狀態的可能,效率更高。注意:鎖降級後須要手動釋放寫鎖,不然線程會一直持有獨佔鎖
讀寫鎖ReentrantReadWriteLock是不支持鎖升級的,若是一個得到了讀鎖的線程在持有讀鎖的狀況下嘗試獲取寫鎖,是不可能成功得到讀鎖的,由於得到寫鎖會判斷當前有沒有線程持有讀鎖,而嘗試鎖升級的線程自己讀鎖沒有釋放,因此會進入同步隊列等待同步狀態爲0獲取寫鎖,因爲讀鎖一直不釋放會致使其餘線程沒法獲取寫鎖(獲取寫鎖條件不能有其餘線程佔據讀鎖或寫鎖),只能獲取共享鎖讀鎖。所以ReentrantReadWriteLock是不支持讀寫鎖的。
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() { rwl.readLock().lock(); if (!cacheValid) { // 必須先釋放讀鎖再獲取寫鎖 rwl.readLock().unlock(); rwl.writeLock().lock(); try { if (!cacheValid) { data = ... cacheValid = true; } // 寫鎖未釋放得到讀鎖 rwl.readLock().lock(); } finally { //釋放寫鎖,降級爲讀鎖 rwl.writeLock().unlock(); } } try { use(data); } finally { rwl.readLock().unlock(); } }
讀寫鎖內部維護了共享鎖讀鎖和獨佔鎖寫鎖,讀鎖和寫鎖都支持重進入,當讀鎖已經被獲取(state高16位不爲0)或寫鎖已被其餘線程獲取,獲取寫鎖的線程進入等待狀態;當寫鎖已經被其餘線程獲取,獲取讀鎖的線程進入等待狀態。讀寫鎖支持由獨佔鎖(寫鎖)降級到(讀鎖),但不支持讀鎖升級到寫鎖,在使用時要考慮手動釋放好讀鎖與寫鎖的釋放,不然程序可能會出現意想不到的問題。