以前ReentrantReadWriteLock講了讀寫鎖的場景,這邊來說他的源碼,以非公平鎖爲例,其實和公平鎖主要代碼是一致的。segmentfault
static final int SHARED_SHIFT = 16;//高16位是共享,用於讀,低16位是獨佔,用於寫,用一個字段保證原子性 static final int SHARED_UNIT = (1 << SHARED_SHIFT);//左移16位,也就是高位的最後一個是0000 0000 0000 0001 0000 0000 0000 0000 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;//最大讀的數量,正常不會這麼多 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;//左移16位,再減1,也就是0000 0000 0000 0000 1111 1111 1111 1111 static int sharedCount(int c) { return c >>> SHARED_SHIFT; }//無符號右移16位 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }//返回的不爲1,說明有寫鎖,由於低16位都是1,1與1爲1,若是有些,確定有個爲1 static final class HoldCounter {//每一個線程持有的鎖的數量 int count = 0; // Use id, not reference, to avoid garbage retention final long tid = getThreadId(Thread.currentThread()); } static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {//本地線程 public HoldCounter initialValue() { return new HoldCounter(); } } private transient ThreadLocalHoldCounter readHolds;//本地線程,記錄持有的鎖的數量信息 private transient HoldCounter cachedHoldCounter;//緩存HoldCounter 的數據 private transient Thread firstReader = null;//第一個獲取讀鎖的線程 private transient int firstReaderHoldCount;//第一個獲取讀鎖的線程持有的數量
public void lock() { sync.acquireShared(1); } public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0)//小於0沒獲取到鎖 doAcquireShared(arg); } protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread();//獲取本地線程 int c = getState();//獲取state的值 if (exclusiveCount(c) != 0 &&//不爲0說明有寫鎖,緣由上面分析了 getExclusiveOwnerThread() != current)//不是當前線程,說明不是重入 return -1; int r = sharedCount(c);//獲取讀鎖的個數 if (!readerShouldBlock() &&//讀鎖無堵塞 r < MAX_COUNT &&//讀鎖沒到最大值 compareAndSetState(c, c + SHARED_UNIT)) {//cas操做,高位加1成功說明獲取到了讀鎖 if (r == 0) {//等於0說明第一個獲取讀鎖 firstReader = current;//當前線程就是第一個 firstReaderHoldCount = 1;//數量爲1 } else if (firstReader == current) {//若是不是第一個,可是是當前線程 firstReaderHoldCount++;//數量加1 } else {//既不是第一個,也不是當前線程 HoldCounter rh = cachedHoldCounter;//獲取緩存HoldCounter if (rh == null || rh.tid != getThreadId(current))//若是不爲空,或者經過線程id對比不是當前線程 cachedHoldCounter = rh = readHolds.get();//緩存設置爲當前線程 else if (rh.count == 0)//緩存的是當前線程,並且鎖的數量爲0,加入到本地緩存,若是數量不爲0,說明已經在本地緩存了 readHolds.set(rh); rh.count++;//鎖的數量加1 } return 1; } return fullTryAcquireShared(current); } //若是阻塞或者cas失敗的狀況,再重試獲取鎖 final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) {// int c = getState(); if (exclusiveCount(c) != 0) {//上面分析了,若是是寫鎖,而且不是當前線程,放棄 if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } else if (readerShouldBlock()) {//阻塞的狀況 // Make sure we're not acquiring read lock reentrantly if (firstReader == current) {//當前線程是第一個不處理 // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove();//若是是0,移除本地緩存 } } if (rh.count == 0) return -1;// } } if (sharedCount(c) == MAX_COUNT)//讀鎖數量太大,拋異常 throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) {//沒有讀鎖 firstReader = current;//當前線程就是第一個 firstReaderHoldCount = 1;//數量爲1 } else if (firstReader == current) {//若是不是第一個,可是是當前線程 firstReaderHoldCount++;//數量加1 } else {//既不是第一個,也不是當前線程 if (rh == null) rh = cachedHoldCounter;//獲取緩存HoldCounter if (rh == null || rh.tid != getThreadId(current))//若是不爲空,或者經過線程id對比不是當前線程 rh = readHolds.get();//緩存設置爲當前線程 else if (rh.count == 0)//緩存的是當前線程,並且鎖的數量爲0,加入到本地緩存,若是數量不爲0,說明已經在本地緩存了 readHolds.set(rh); rh.count++;//鎖的數量加1 cachedHoldCounter = rh; // cache for release } return 1; } } }
public void unlock() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) {//若是第一個是當前線程 // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1)//若是數量爲1,就是直接設爲空 firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) {//數量小於等於1,移除 readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0;//寫鎖和讀鎖爲0,無鎖 } }
public void lock() { sync.acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//失敗了就進入阻塞隊列 selfInterrupt(); } protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread();//獲取當前線程 int c = getState();//獲取state int w = exclusiveCount(c);//不爲0說明有寫鎖 if (c != 0) {//有讀或者寫鎖 // 無寫鎖或者讀鎖被佔 if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); return true; } if (writerShouldBlock() ||//無阻塞 !compareAndSetState(c, c + acquires))//設置成功 return false; setExclusiveOwnerThread(current); return true; }
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h);//喚醒 return true; } return false; } protected final boolean tryRelease(int releases) { if (!isHeldExclusively())//不是獨佔鎖,拋異常 throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0;//寫鎖都釋放了 if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }