ReentrantReadWriteLock 內部維護了 ReadLock 跟 WriteLock類,state狀態高16bit表明讀鎖狀態 低16bit表明寫鎖狀態,Sync內部封裝了,WriteLock -> acquire(1) -> tryAcquire() ; ReadLock -> acquireShared -> tryAcqurieShare()方法。node
同事也支持公平鎖/非公平鎖 分別控制讀寫鎖不一樣的操做。若是熟悉AQS源碼,這個類其實仍是蠻簡單的。ui
WriteLock: 關鍵方法spa
// writeLock: 嘗試獲取鎖 protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); // 獲取獨佔線程數 int w = exclusiveCount(c); if (c != 0) { // 存在讀鎖 或者寫鎖 // (Note: if c != 0 and w == 0 then shared count != 0) // 若是 寫鎖爲空 讀鎖確定不爲空 此時不容許獲取寫鎖 若是w!=0 有寫鎖獲取到鎖 此時須要等待 if (w == 0 || current != getExclusiveOwnerThread()) return false; // 判斷是否超過寫鎖最大數 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire // 設置狀態+acquires (寫鎖是低16位 直接+acquires) setState(c + acquires); return true; } // 公平鎖: writerShouldBlock() 若是隊列有node 直接加入隊列 沒有直接獲取鎖 // 非公平鎖: writerShouldBlock() 返回false 競爭鎖失敗加入隊列 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
//writeLock: 嘗試釋放鎖 protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
WriteLock:關鍵代碼線程
//readLock 共享鎖 嘗試獲取鎖 protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); // 存在寫鎖 切不是重入鎖 return -1 加入隊列 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //沒有寫鎖 獲取讀鎖 int r = sharedCount(c); // 公平鎖 判斷是否隊列中有前置節點 非公平鎖 判斷next節點是不是獨佔節點 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // 成功獲取到讀鎖 增長 c+SHARED_UNIT if (r == 0) { // 沒有讀鎖 firstReader = current; firstReaderHoldCount = 1;// 計數 } else if (firstReader == current) { // 記錄重入鎖number firstReaderHoldCount++ firstReaderHoldCount++; } else {// 初始化count rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } //獲取讀鎖失敗,放到循環裏重試 return fullTryAcquireShared(current); }
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; // 開始循環 for (;;) { int c = getState(); // 存在讀鎖 切 不是重入鎖 寫鎖會降級成讀鎖 直接返回-1 加入隊列 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } else if (readerShouldBlock()) { //公平鎖: 判斷是否有next節點 非公平鎖 判斷next節點是不是獨佔節點 // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // 是不是第一個讀線程 // assert firstReaderHoldCount > 0; } else { // 更新 readHolds中 HoldCounter 計數 if (rh == null) { rh = cachedHoldCounter; // 獲取線程計數器 if (rh == null || rh.tid != getThreadId(current)) { // 比較線程id rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) // 是否超過讀最大限制 throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { // state + 1<<16 if (sharedCount(c) == 0) { // 讀鎖爲0 firstReader = current; // 設置第一個讀 避免查找 readHolds firstReaderHoldCount = 1; // 設置計數 } else if (firstReader == current) { // 是第一個 增長計數 firstReaderHoldCount++; } else { // 更新 readHolds中 HoldCounter 計數 if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // 是不是第一個讀鎖 // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) // 計數是否爲1 是的話設置爲空 gc firstReader = null; else firstReaderHoldCount--; // 減小計數 } else { // 判斷是夠計數爲0 爲0從 threadLocal中移除 不然 --rh.count; HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { // 設置state int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }
對於ReadLock 跟 WriteLock 支持的 tryLock() 分別調用 sync 中的 tryWriteLock() 跟 tryReadLock();code
final boolean tryWriteLock() { Thread current = Thread.currentThread(); int c = getState(); // 存在讀寫鎖 if (c != 0) { // 獲取寫鎖 int w = exclusiveCount(c); // case 1:存在讀鎖 case 2: 存在寫鎖 切不是重入鎖 if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w == MAX_COUNT) throw new Error("Maximum lock count exceeded"); } // 競爭鎖 if (!compareAndSetState(c, c + 1)) return false; setExclusiveOwnerThread(current); return true;
final boolean tryReadLock() { Thread current = Thread.currentThread(); for (;;) { int c = getState(); // 存在寫鎖 且 不是重入鎖 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return false; // 獲取獲取讀鎖線程數量 int r = sharedCount(c); if (r == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 競爭鎖 if (compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return true; } } }