咱們在介紹AbstractQueuedSynchronizer的時候介紹過,AQS支持獨佔式同步狀態獲取/釋放、共享式同步狀態獲取/釋放兩種模式,對應的典型應用分別是ReentrantLock和Semaphore,AQS還能夠混合兩種模式使用,讀寫鎖ReentrantReadWriteLock就是如此。java
設想如下情景:咱們在系統中有一個多線程訪問的緩存,多個線程均可以對緩存進行讀或寫操做,可是讀操做遠遠多於寫操做,要求寫操做要線程安全,且寫操做執行完成要求對當前的全部讀操做立刻可見。緩存
分析上面的需求:由於有多個線程可能會執行寫操做,所以多個線程的寫操做必須同步串行執行;而寫操做執行完成要求對當前的全部讀操做立刻可見,這就意味着當有線程正在讀的時候,要阻塞寫操做,當正在執行寫操做時,要阻塞讀操做。一個簡單的實現就是將數據直接加上互斥鎖,同一時刻不論是讀仍是寫線程,都只能有一個線程操做數據。可是這樣的問題就是若是當前只有N個讀線程,沒有寫線程,這N個讀線程也要傻呵呵的排隊讀,儘管實際上是能夠安全併發提升效率的。所以理想的實現是:安全
當有寫線程時,則寫線程獨佔同步狀態。多線程
當沒有寫線程時只有讀線程時,則多個讀線程能夠共享同步狀態。併發
讀寫鎖就是爲了實現這種效果而生。函數
咱們先來看一下讀寫鎖怎麼使用,這裏咱們基於hashmap(自己線程不安全)作一個多線程併發安全的緩存:源碼分析
public class ReadWriteCache { private static Map<String, Object> data = new HashMap<>(); private static ReadWriteLock lock = new ReentrantReadWriteLock(false); private static Lock rlock = lock.readLock(); private static Lock wlock = lock.writeLock(); public static Object get(String key) { rlock.lock(); try { return data.get(key); } finally { rlock.unlock(); } } public static Object put(String key, Object value) { wlock.lock(); try { return data.put(key, value); } finally { wlock.unlock(); } } }
限於篇幅咱們只實現2個方法,get和put。從代碼能夠看出,咱們先建立一個 ReentrantReadWriteLock 對象,構造函數 false 表明是非公平的(非公平的含義和ReentrantLock相同)。而後經過readLock、writeLock方法分別獲取讀鎖和寫鎖。在作讀操做的時候,也就是get方法,咱們要先獲取讀鎖;在作寫操做的時候,即put方法,咱們要先獲取寫鎖。ui
經過以上代碼,咱們就構造了一個線程安全的緩存,達到咱們以前說的:寫線程獨佔同步狀態,多個讀線程能夠共享同步狀態。this
咱們先來看下 ReentrantReadWriteLock 類的總體結構:spa
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { private final ReentrantReadWriteLock.ReadLock readerLock; /** Inner class providing writelock */ private final ReentrantReadWriteLock.WriteLock writerLock; /** Performs all synchronization mechanics */ final Sync sync; public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; } abstract static class Sync extends AbstractQueuedSynchronizer {}
static final class NonfairSync extends Sync {}
static final class FairSync extends Sync {}
public static class ReadLock implements Lock, java.io.Serializable {}
public static class WriteLock implements Lock, java.io.Serializable {} }
能夠看到,在公平鎖與非公平鎖的實現上,與ReentrantLock同樣,也是有一個繼承AQS的內部類Sync,而後NonfairSync和FairSync都繼承Sync,經過構造函數傳入的布爾值決定要構造哪種Sync實例。
讀寫鎖比ReentrantLock多出了兩個內部類:ReadLock和WriteLock, 用來定義讀鎖和寫鎖,而後在構造函數中,會構造一個讀鎖和一個寫鎖實例保存到成員變量 readerLock 和 writerLock。咱們在上面的示例中使用到的 readLock() 和 writeLock() 方法就是返回這兩個成員變量保存的鎖實例。
咱們在Sync類中能夠看到下列代碼:
static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); //每次要讓共享鎖+1,就應該讓state加 1<<16 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; //每種鎖的最大重入數量 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** Returns the number of shared holds represented in count */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** Returns the number of exclusive holds represented in count */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
能夠看到主要是幾個位移操做,經過上面的總體結構,咱們知道了在讀寫鎖內保存了讀鎖和寫鎖的兩個實例。以前在ReentrantLock中,咱們知道鎖的狀態是保存在Sync實例的state字段中的(繼承自父類AQS),如今有了讀寫兩把鎖,然而能夠看到仍是隻有一個Sync實例,那麼一個Sync實例的state是如何同時保存兩把鎖的狀態的呢?答案就是用了位分隔:
state字段是32位的int,讀寫鎖用state的低16位保存寫鎖(獨佔鎖)的狀態;高16位保存讀鎖(共享鎖)的狀態。
所以要獲取獨佔鎖當前的重入數量,就是 state & ((1 << 16) -1) (即 exclusiveCount 方法)
要獲取共享鎖當前的重入數量,就是 state >>> 16 (即 sharedCount 方法)
下面咱們具體看寫鎖和讀鎖的實現。
看下WriteLock類中的lock和unlock方法:
public void lock() { sync.acquire(1); } public void unlock() { sync.release(1); }
能夠看到就是調用的獨佔式同步狀態的獲取與釋放,所以真實的實現就是Sync的 tryAcquire和 tryRelease。
看下tryAcquire:
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); //獲取獨佔鎖的重入數 if (c != 0) { // 當前state不爲0,此時:若是寫鎖狀態爲0說明讀鎖此時被佔用返回false;若是寫鎖狀態不爲0且寫鎖沒有被當前線程持有返回false if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); //寫鎖重入數溢出 // Reentrant acquire setState(c + acquires); return true; }
//到這裏了說明state爲0,嘗試直接cas。writerShouldBlock是爲了實現公平或非公平策略的 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
邏輯很簡單,直接看註釋就能理解。
看下tryRelease:
protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //非獨佔模式直接拋異常 int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); //若是獨佔模式重入數爲0了,說明獨佔模式被釋放 setState(nextc); //無論獨佔模式是否被釋放,更新獨佔重入數 return free; }
邏輯很簡單,直接看註釋就能理解。
相似於寫鎖,讀鎖的lock和unlock的實際實現對應Sync的 tryAcquireShared 和 tryReleaseShared方法。
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //若是獨佔模式被佔且不是當前線程持有,則獲取失敗 int r = sharedCount(c);
//若是公平策略沒有要求阻塞且重入數沒有到達最大值,則直接嘗試CAS更新state if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
//更新成功後會在firstReaderHoldCount中或readHolds(ThreadLocal類型的)的本線程副本中記錄當前線程重入數(淺藍色代碼),這是爲了實現jdk1.6中加入的getReadHoldCount()方法的,這個方法能獲取當前線程重入共享鎖的次數(state中記錄的是多個線程的總重入次數),加入了這個方法讓代碼複雜了很多,可是其原理仍是很簡單的:若是當前只有一個線程的話,還不須要動用ThreadLocal,直接往firstReaderHoldCount這個成員變量裏存重入數,當有第二個線程來的時候,就要動用ThreadLocal變量readHolds了,每一個線程擁有本身的副本,用來保存本身的重入數。 if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); //用來處理CAS沒成功的狀況,邏輯和上面的邏輯是相似的,就是加了無限循環 }
下面這個方法就不用細說了,和上面的處理邏輯相似,加了無限循環用來處理CAS失敗的狀況。
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } else if (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread();
//淺藍色代碼也是爲了實現jdk1.6中加入的getReadHoldCount()方法,在更新當前線程的重入數。 if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; }
//這裏是真正的釋放同步狀態的邏輯,就是直接同步狀態-SHARED_UNIT,而後CAS更新,沒啥好說的 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }
經過上面的源碼分析,咱們能夠發現一個現象:
在線程持有讀鎖的狀況下,該線程不能取得寫鎖(由於獲取寫鎖的時候,若是發現當前的讀鎖被佔用,就立刻獲取失敗,無論讀鎖是否是被當前線程持有)
在線程持有寫鎖的狀況下,該線程能夠繼續獲取讀鎖(獲取讀鎖時若是發現寫鎖被佔用,只有寫鎖沒有被當前線程佔用的狀況纔會獲取失敗)
仔細想一想,這個設計是合理的:由於當線程獲取讀鎖的時候,可能有其餘線程同時也在持有讀鎖,所以不能把獲取讀鎖的線程「升級」爲寫鎖;而對於得到寫鎖的線程,它必定獨佔了讀寫鎖,所以能夠繼續讓它獲取讀鎖,當它同時獲取了寫鎖和讀鎖後,還能夠先釋放寫鎖繼續持有讀鎖,這樣一個寫鎖就「降級」爲了讀鎖。
綜上:
一個線程要想同時持有寫鎖和讀鎖,必須先獲取寫鎖再獲取讀鎖;
寫鎖能夠「降級」爲讀鎖;
讀鎖不能「升級」爲寫鎖。
讀寫鎖仍是很實用的,由於通常場景下,數據的併發操做都是讀多於寫,在這種狀況下,讀寫鎖可以提供比排它鎖更好的併發性。
在讀寫鎖的實現方面,原本覺得會比較複雜,結果看完源碼的感覺也是快刀切西瓜,看來AQS的設計真的很棒,在AQS的基礎上構建的組件實現都很簡單。