class CachedData { Object data; volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() { rwl.readLock().lock(); if (!cacheValid) { // 必須先釋放讀鎖再獲取寫鎖 rwl.readLock().unlock(); rwl.writeLock().lock(); try { //再次檢查cacheValid防止其餘線程得到寫鎖改變cacheValid值 if (!cacheValid) { data = ... cacheValid = true; } // 寫鎖降級爲讀鎖 rwl.readLock().lock(); } finally { //釋放寫鎖 rwl.writeLock().unlock(); } } try { use(data); } finally { rwl.readLock().unlock(); } } }
//偏移量 static final int SHARED_SHIFT = 16; //線程得到讀鎖,state加SHARED_UNIT,state高16位SHARED_UNIT個數表明了有多少個共享鎖 static final int SHARED_UNIT = (1 << SHARED_SHIFT); //讀寫鎖重入最多不超過65535 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** Returns the number of shared holds represented in count */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** Returns the number of exclusive holds represented in count */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
從圖中能夠看到,當前線程獲取了寫鎖,重進入了3次,連續得到了兩次讀鎖,每次得到寫鎖,就把state加1,而低16位總共最大是65535,就是MAX_COUNT的值。每得到一次讀鎖,就把state加SHARED_COUNT。那麼如何獲取讀寫狀態呢?只要經過位運算取出高16位或低16位就好了,對於讀狀態,state>>>SHARED_SHIFT(無符號補0右移16位)就能夠獲得加了多少次SHARED_UNIT從而得到讀狀態;對於寫狀態,state & EXCLUSIVE_MASK(0X0000FFFF,高16位都變爲0,低16位不變)就能夠得到寫狀態。併發
abstract static class Sync extends AbstractQueuedSynchronizer { static final class HoldCounter { //計數器,用於統計線程重入讀鎖次數 int count = 0; // Use id, not reference, to avoid garbage retention //線程TID,區分線程,能夠惟一標識一個線程 final long tid = getThreadId(Thread.currentThread()); } static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { //重寫初始化方法,在沒有進行set的狀況下,獲取的都是該HoldCounter值 public HoldCounter initialValue() { return new HoldCounter(); } } private transient ThreadLocalHoldCounter readHolds; private transient HoldCounter cachedHoldCounter; private transient Thread firstReader = null; private transient int firstReaderHoldCount; Sync() { //本地線程讀鎖計數器 readHolds = new ThreadLocalHoldCounter(); setState(getState()); // ensures visibility of readHolds } }
//獲取讀鎖 public void lock() { sync.acquireShared(1); } //獲取共享鎖 public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
protected final int tryAcquireShared(int unused) { //當前線程 Thread current = Thread.currentThread(); //同步狀態state int c = getState(); //檢查獨佔鎖是否被佔據,若是被佔據,是不是當前線程獲取了獨佔鎖 //若是是當前線程獲取了寫鎖,能夠繼續獲取讀鎖,若是都不是返回-1表示獲取失敗 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //讀鎖數量 int r = sharedCount(c); //!readerShouldBlock() 根據公平與否策略和隊列是否含有等待節點決定當前線程是否繼續獲取鎖 //不能大於65535且CAS修改爲功 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { //若是沒有線程獲取過讀鎖 if (r == 0) { //將當前線程設置爲第一個讀鎖線程 firstReader = current; // 計數器爲一 firstReaderHoldCount = 1; //讀鎖重入 } else if (firstReader == current) { //計數器加一 firstReaderHoldCount++; } else { // 若是不是第一個線程,獲取鎖成功 // cachedHoldCounter 表明的是最後一個獲取讀鎖的線程的計數器 HoldCounter rh = cachedHoldCounter; // 若是計數器是 null 或者不指向當前線程,那麼就新建一個 HoldCounter 對象 if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); //計數器爲0,保存到readHolds中 else if (rh.count == 0) readHolds.set(rh); //計數器加一 rh.count++; } return 1; } return fullTryAcquireShared(current); }
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; //死循環 for (;;) { //同步狀態 int c = getState(); //檢查寫鎖獲取狀況 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; //進入到這裏,說明沒有其餘線程獲取寫鎖 //公平鎖策略檢查 } else if (readerShouldBlock()) { //readerShouldBlock()返回true,應該堵塞,檢查是否獲取過讀鎖 // 第一個獲取讀鎖線程是當前線程,重入 if (firstReader == current) { } else { //循環中,若計數器爲null if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } //須要阻塞且是非重入(還未獲取讀鎖的),獲取失敗。 if (rh.count == 0) return -1; } } //檢查讀鎖總數量是否超過最大值 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //CAS設置同步狀態state if (compareAndSetState(c, c + SHARED_UNIT)) { //當前線程得到第一個讀鎖 if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; //讀鎖重入 } else if (firstReader == current) { firstReaderHoldCount++; } else { //從緩存讀入計數器,提升效率 if (rh == null) rh = cachedHoldCounter; //計數器爲空或不是指向當前線程 if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
private void doAcquireShared(int arg) { //將當前節點以共享型類型加入同步隊列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //前驅節點獲取到鎖,可能佔據鎖,也可能已經釋放鎖,調用tryAcquireShared嘗試獲取鎖 if (p == head) { int r = tryAcquireShared(arg); //獲取成功 if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } //與獨佔鎖ReentrantLock堵塞邏輯一致 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { //因中斷/超時,取消獲取鎖 if (failed) cancelAcquire(node); } }
public void unlock() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); //當前線程是第一個得到讀鎖的線程 if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; //不是firstReader,更新計數器 } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; //徹底釋放鎖 if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } //重入鎖退出 --rh.count; } //CAS更新同步狀態, for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; } }
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; //若是節點狀態爲 Node.SIGNAL,將狀態設置爲0,設置成功,喚醒線程。 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } //若是自己頭結點的waitStatus是出於重置狀態(waitStatus==0)的, //將其設置爲「傳播」狀態。意味着須要將狀態向後一個節點傳播。 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } }
public void lock() { sync.acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); //同步狀態不爲0 if (c != 0) { //其餘線程得到寫鎖,獲取失敗;w爲0而同步狀態不爲0,沒有線程佔據寫鎖,有線程佔據讀鎖 //注意:不存在讀鎖與寫鎖同時被多個線程獲取的狀況。 if (w == 0 || current != getExclusiveOwnerThread()) return false; //當前線程已經得到寫鎖,重入次數超過MAX_COUNT,失敗 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 鎖重入 setState(c + acquires); return true; } //公平策略檢查 //CAS設置同步狀態成功則得到寫鎖 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //獨佔鎖,只有當前線程釋放同步狀態,不須要考慮併發 int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() { rwl.readLock().lock(); if (!cacheValid) { // 必須先釋放讀鎖再獲取寫鎖 rwl.readLock().unlock(); rwl.writeLock().lock(); try { if (!cacheValid) { data = ... cacheValid = true; } // 寫鎖未釋放得到讀鎖 rwl.readLock().lock(); } finally { //釋放寫鎖,降級爲讀鎖 rwl.writeLock().unlock(); } } try { use(data); } finally { rwl.readLock().unlock(); } }