ReadWriteLock管理一組鎖,一個是隻讀的鎖,一個是寫鎖。讀鎖能夠在沒有寫鎖的時候被多個線程同時持有,寫鎖是獨佔的。相對於互斥鎖而言,ReadWriteLoc容許更高的併發量。java
全部讀寫鎖的實現必須確保寫操做對讀操做的內存影響。換句話說,一個得到了讀鎖的線程必須能看到前一個釋放的寫鎖所更新的內容。緩存
public interface ReadWriteLock { Lock readLock();//獲取讀鎖 Lock writeLock();//獲取寫鎖 }
ReetrantReadWriteLock實現了ReadWriteLock接口並添加了可重入的特性。併發
class ReadWriteLockTest { String data;//緩存中的對象 volatile boolean cacheValid;//緩存是否還有效 final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() throws InterruptedException { rwl.readLock().lock(); if (!cacheValid) {//緩存失效須要再次讀取緩存 //在讀取緩存前,必須釋放讀鎖 rwl.readLock().unlock(); rwl.writeLock().lock(); try { //再次檢查是否須要再次讀取緩存,若是須要則 if (!cacheValid) { data = Thread.currentThread().getName()+":緩存數據測試,實際開發多是一個對象!"; cacheValid = true; } //在釋放以前,經過獲取讀鎖降級寫鎖 rwl.readLock().lock(); } finally { rwl.writeLock().unlock(); //釋放寫鎖,持有讀鎖 } } try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName()+"【"+data+"】"); } finally { rwl.readLock().unlock(); } } public static void main(String[] args) throws InterruptedException { ReadWriteLockTest readWriteLockTest = new ReadWriteLockTest(); for(int i=0;i<10;i++){ Thread thread = new Thread(()->{ try { readWriteLockTest.processCachedData(); } catch (InterruptedException e) { e.printStackTrace(); } }); thread.start(); } } }
public ReentrantReadWriteLock() { this(false);//默認爲非公平模式 } public ReentrantReadWriteLock(boolean fair) { //決定了Sync是FairSync仍是NonfairSync。Sync繼承了AbstractQueuedSynchronizer,而Sync是一個抽象類,NonfairSync和FairSync繼承了Sync,並重寫了其中的抽象方法。 sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
static final class FairSync extends Sync { private static final long serialVersionUID = -2274990926593161451L; final boolean writerShouldBlock() { return hasQueuedPredecessors(); } final boolean readerShouldBlock() { return hasQueuedPredecessors(); } }
writerShouldBlock和readerShouldBlock方法都表示當有別的線程也在嘗試獲取鎖時,是否應該阻塞。app
對於公平模式,hasQueuedPredecessors()方法表示前面是否有等待線程。一旦前面有等待線程,那麼爲了遵循公平,當前線程也就應該被掛起。工具
static final class NonfairSync extends Sync { private static final long serialVersionUID = -8159625535654395037L; final boolean writerShouldBlock() { return false; // 寫線程無需阻塞 } final boolean readerShouldBlock() { //apparentlyFirstQueuedIsExclusive在當前線程是寫鎖佔用的線程時,返回true;不然返回false。也就說明,若是當前有一個寫線程正在寫,那麼該讀線程應該阻塞。 return apparentlyFirstQueuedIsExclusive(); } }
繼承AQS的類都須要使用state變量表明某種資源,ReentrantReadWriteLock中的state表明了讀鎖的數量和寫鎖的持有與否,整個結構以下: 能夠看到state的高16位表明讀鎖的個數;低16位表明寫鎖的狀態。源碼分析
public void lock() { sync.acquireShared(1); }
讀鎖使用的是AQS的共享模式,AQS的acquireShared方法以下:測試
if (tryAcquireShared(arg) < 0) doAcquireShared(arg);
當tryAcquireShared()方法小於0時,那麼會執行doAcquireShared方法將該線程加入到等待隊列中。ui
Sync實現了tryAcquireShared方法,以下:this
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); //若是當前有寫線程而且本線程不是寫線程,不符合重入,失敗. //在獲取讀鎖時,若是有寫線程,則獲取失敗 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //獲得讀鎖的個數 int r = sharedCount(c); //若是讀不該該阻塞而且讀鎖的個數小於最大值65535,而且能夠成功更新狀態值,成功 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) {//若是當前讀鎖爲0 firstReader = current;//第一個讀線程就是當前線程 firstReaderHoldCount = 1;//第一個線程持有讀鎖的個數 } //若是當前線程重入了,記錄firstReaderHoldCount else if (firstReader == current) { firstReaderHoldCount++; } //當前讀線程和第一個讀線程不一樣,記錄每個線程讀的次數 else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } //不然,循環嘗試 return fullTryAcquireShared(current); }
從上面的代碼以及註釋能夠看到,分爲三步:線程
fullTryAcquiredShared方法
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); //一旦有別的線程得到了寫鎖,而且得到寫鎖的線程不是本線程,返回-1,失敗 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; } //若是讀線程須要阻塞 else if (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; } //說明有別的讀線程佔有了鎖 else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } //若是讀鎖達到了最大值,拋出異常 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //若是成功更改狀態,成功返回 if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
public void lock() { sync.acquire(1); }
AQS的acquire方法以下:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
從上面能夠看到,寫鎖使用的是AQS的獨佔模式。首先嚐試獲取鎖,若是獲取失敗,那麼將會把該線程加入到等待隊列中。
Sync實現了tryAcquire方法用於嘗試獲取一把鎖,以下:
protected final boolean tryAcquire(int acquires) { //獲得調用lock方法的當前線程 Thread current = Thread.currentThread(); int c = getState(); //獲得寫鎖的個數 int w = exclusiveCount(c); //若是當前有寫鎖或者讀鎖.(對於讀鎖而言,若是當前寫線程能夠進行寫操做,那麼讀線程讀到的數據可能有誤) if (c != 0) { // 若是寫鎖爲0或者當前線程不是獨佔線程(不符合重入),返回false if (w == 0 || current != getExclusiveOwnerThread()) return false; //若是寫鎖的個數超過了最大值(65535),拋出異常 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 寫鎖重入,返回true setState(c + acquires); return true; } //若是當前沒有寫鎖或者讀鎖,若是寫線程應該阻塞或者CAS失敗,返回false if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; //不然將當前線程置爲得到寫鎖的線程,返回true setExclusiveOwnerThread(current); return true; }
ReadLock的unlock方法以下:
public void unlock() { sync.releaseShared(1); }
調用了Sync的releaseShared方法,該方法在AQS中提供,以下:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
調用tryReleaseShared方法嘗試釋放鎖,若是釋放成功,調用doReleaseShared嘗試喚醒下一個節點。
AQS的子類須要實現tryReleaseShared方法,Sync中的實現以下:
protected final boolean tryReleaseShared(int unused) { //獲得調用unlock的線程 Thread current = Thread.currentThread(); //若是是第一個得到讀鎖的線程 if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } //不然,是HoldCounter中計數-1 else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } //死循環 for (;;) { int c = getState(); //釋放一把讀鎖 int nextc = c - SHARED_UNIT; //若是CAS更新狀態成功,返回讀鎖是否等於0;失敗的話,則重試 if (compareAndSetState(c, nextc)) //釋放讀鎖對讀線程沒有影響,可是可能會使等待的寫線程解除掛起開始運行。因此,一旦沒有鎖了,就返回true,不然false;返回true後,那麼則須要釋放等待隊列中的線程,這時讀線程和寫線程都有可能再得到鎖。 return nextc == 0; } }
WriteLock的unlock方法以下:
public void unlock() { sync.release(1); }
Sync的release方法使用的AQS中的,以下:
public final boolean release(int arg) { if (tryRelease(arg)) {//嘗試釋放鎖 Node h = head; if (h != null && h.waitStatus != 0)//若是等待隊列中有線程再等待 unparkSuccessor(h);//將下一個線程解除掛起。 return true; } return false; }
Sync須要實現tryRelease方法,以下:
protected final boolean tryRelease(int releases) { //若是沒有線程持有寫鎖,可是仍要釋放,拋出異常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; //若是沒有寫鎖了,那麼將AQS的線程置爲null if (free) setExclusiveOwnerThread(null); //更新狀態 setState(nextc); return free;//此處返回當且僅當free爲0時返回,若是當前是寫鎖被佔有了,只有當寫鎖的數據降爲0時才認爲釋放成功;不然失敗。由於只要有寫鎖,那麼除了佔有寫鎖的那個線程,其餘線程即不能夠得到讀鎖,也不能得到寫鎖 }
getOwner方法用於返回當前得到寫鎖的線程,若是沒有線程佔有寫鎖,那麼返回null。實現以下:
protected Thread getOwner() { return sync.getOwner(); }
能夠看到直接調用了Sync的getOwner方法,下面是Sync的getOwner方法:
final Thread getOwner() { // Must read state before owner to ensure memory consistency //若是獨佔鎖的個數爲0,說明沒有線程佔有寫鎖,那麼返回null;不然返回佔有寫鎖的線程。 return ((exclusiveCount(getState()) == 0) ?null :getExclusiveOwnerThread()); }
getReadLockCount()方法用於返回讀鎖的個數,實現以下:
public int getReadLockCount() { return sync.getReadLockCount(); }
Sync的實現以下:
final int getReadLockCount() { return sharedCount(getState()); } //要想獲得讀鎖的個數,就是看AQS的state的高16位。這和前面講過的同樣,高16位表示讀鎖的個數,低16位表示寫鎖的個數。 static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
getReadHoldCount()方法用於返回當前線程所持有的讀鎖的個數,若是當前線程沒有持有讀鎖,則返回0。直接看Sync的實現便可:
final int getReadHoldCount() { //若是沒有讀鎖,天然每一個線程都是返回0 if (getReadLockCount() == 0) return 0; //獲得當前線程 Thread current = Thread.currentThread(); //若是當前線程是第一個讀線程,返回firstReaderHoldCount參數 if (firstReader == current) return firstReaderHoldCount; //若是當前線程不是第一個讀線程,獲得HoldCounter,返回其中的count HoldCounter rh = cachedHoldCounter; //若是緩存的HoldCounter不爲null而且是當前線程的HoldCounter,直接返回count if (rh != null && rh.tid == getThreadId(current)) return rh.count; //若是緩存的HoldCounter不是當前線程的HoldCounter,那麼從ThreadLocal中獲得本線程的HoldCounter,返回計數 int count = readHolds.get().count; //若是本線程持有的讀鎖爲0,從ThreadLocal中移除 if (count == 0) readHolds.remove(); return count; }
從上面的代碼中,能夠看到兩個熟悉的變量,firstReader和HoldCounter類型。這兩個變量在讀鎖的獲取中接觸過,前面沒有細說,這裏細說一下。HoldCounter類的實現以下:
static final class HoldCounter { int count = 0; // Use id, not reference, to avoid garbage retention final long tid = getThreadId(Thread.currentThread()); }
readHolds是ThreadLocalHoldCounter類,定義以下:
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } }
能夠看到,readHolds存儲了每個線程的HoldCounter,而HoldCounter中的count變量就是用來記錄線程得到的寫鎖的個數。因此能夠得出結論:Sync維持總的讀鎖的個數,在state的高16位;因爲讀線程能夠同時存在,因此每一個線程還保存了得到的讀鎖的個數,這個是經過HoldCounter來保存的。 除此以外,對於第一個讀線程有特殊的處理,Sync中有以下兩個變量:
private transient Thread firstReader = null;//第一個獲得讀鎖的線程 private transient int firstReaderHoldCount;//第一個線程得到的寫鎖
其他獲取到讀鎖的線程的信息保存在HoldCounter中。
看完了HoldCounter和firstReader,再來看一下getReadLockCount的實現,主要有三步:
getWriteLockCount()方法返回寫鎖的個數,Sync的實現以下:
final int getWriteHoldCount() { return isHeldExclusively() ? exclusiveCount(getState()) : 0; }
能夠看到若是沒有線程持有寫鎖,那麼返回0;不然返回AQS的state的低16位。
當分析ReentranctReadWriteLock時,或者說分析內部使用AQS實現的工具類時,須要明白的就是AQS的state表明的是什麼。ReentrantLockReadWriteLock中的state同時表示寫鎖和讀鎖的個數。爲了實現這種功能,state的高16位表示讀鎖的個數,低16位表示寫鎖的個數。AQS有兩種模式:共享模式和獨佔模式,讀寫鎖的實現中,讀鎖使用共享模式;寫鎖使用獨佔模式;另一點須要記住的即便,當有讀鎖時,寫鎖就不能得到;而當有寫鎖時,除了得到寫鎖的這個線程能夠得到讀鎖外,其餘線程不能得到讀鎖。