(1)讀寫鎖是什麼?java
(2)讀寫鎖具備哪些特性?node
(3)ReentrantReadWriteLock是怎麼實現讀寫鎖的?緩存
(4)如何使用ReentrantReadWriteLock實現高效安全的TreeMap?安全
讀寫鎖是一種特殊的鎖,它把對共享資源的訪問分爲讀訪問和寫訪問,多個線程能夠同時對共享資源進行讀訪問,可是同一時間只能有一個線程對共享資源進行寫訪問,使用讀寫鎖能夠極大地提升併發量。併發
讀寫鎖具備如下特性:高併發
是否互斥 | 讀 | 寫 |
---|---|---|
讀 | 否 | 是 |
寫 | 是 | 是 |
能夠看到,讀寫鎖除了讀讀不互斥,讀寫、寫讀、寫寫都是互斥的。oop
那麼,ReentrantReadWriteLock是怎麼實現讀寫鎖的呢?源碼分析
在看源碼以前,咱們仍是先來看一下ReentrantReadWriteLock這個類的主要結構。ui
ReentrantReadWriteLock中的類分紅三個部分:this
(1)ReentrantReadWriteLock自己實現了ReadWriteLock接口,這個接口只提供了兩個方法readLock()
和writeLock()
;
(2)同步器,包含一個繼承了AQS的Sync內部類,以及其兩個子類FairSync和NonfairSync;
(3)ReadLock和WriteLock兩個內部類實現了Lock接口,它們具備鎖的一些特性。
// 讀鎖 private final ReentrantReadWriteLock.ReadLock readerLock; // 寫鎖 private final ReentrantReadWriteLock.WriteLock writerLock; // 同步器 final Sync sync;
維護了讀鎖、寫鎖和同步器。
// 默認構造方法 public ReentrantReadWriteLock() { this(false); } // 是否使用公平鎖的構造方法 public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }
它提供了兩個構造方法,默認構造方法使用的是非公平鎖模式,在構造方法中初始化了讀鎖和寫鎖。
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
屬性中的讀鎖和寫鎖是私有屬性,經過這兩個方法暴露出去。
下面咱們主要分析讀鎖和寫鎖的加鎖、解鎖方法,且都是基於非公平模式的。
// ReentrantReadWriteLock.ReadLock.lock() public void lock() { sync.acquireShared(1); } // AbstractQueuedSynchronizer.acquireShared() public final void acquireShared(int arg) { // 嘗試獲取共享鎖(返回1表示成功,返回-1表示失敗) if (tryAcquireShared(arg) < 0) // 失敗了就可能要排隊 doAcquireShared(arg); } // ReentrantReadWriteLock.Sync.tryAcquireShared() protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); // 狀態變量的值 // 在讀寫鎖模式下,高16位存儲的是共享鎖(讀鎖)被獲取的次數,低16位存儲的是互斥鎖(寫鎖)被獲取的次數 int c = getState(); // 互斥鎖的次數 // 若是其它線程得到了寫鎖,直接返回-1 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; // 讀鎖被獲取的次數 int r = sharedCount(c); // 下面說明此時尚未寫鎖,嘗試去更新state的值獲取讀鎖 // 讀者是否須要排隊(是不是公平模式) if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // 獲取讀鎖成功 if (r == 0) { // 若是以前尚未線程獲取讀鎖 // 記錄第一個讀者爲當前線程 firstReader = current; // 第一個讀者重入的次數爲1 firstReaderHoldCount = 1; } else if (firstReader == current) { // 若是有線程獲取了讀鎖且是當前線程是第一個讀者 // 則把其重入次數加1 firstReaderHoldCount++; } else { // 若是有線程獲取了讀鎖且當前線程不是第一個讀者 // 則從緩存中獲取重入次數保存器 HoldCounter rh = cachedHoldCounter; // 若是緩存不屬性當前線程 // 再從ThreadLocal中獲取 // readHolds自己是一個ThreadLocal,裏面存儲的是HoldCounter if (rh == null || rh.tid != getThreadId(current)) // get()的時候會初始化rh cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) // 若是rh的次數爲0,把它放到ThreadLocal中去 readHolds.set(rh); // 重入的次數加1(初始次數爲0) rh.count++; } // 獲取讀鎖成功,返回1 return 1; } // 經過這個方法再去嘗試獲取讀鎖(若是以前其它線程獲取了寫鎖,同樣返回-1表示失敗) return fullTryAcquireShared(current); } // AbstractQueuedSynchronizer.doAcquireShared() private void doAcquireShared(int arg) { // 進入AQS的隊列中 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { // 當前節點的前一個節點 final Node p = node.predecessor(); // 若是前一個節點是頭節點(說明是第一個排隊的節點) if (p == head) { // 再次嘗試獲取讀鎖 int r = tryAcquireShared(arg); // 若是成功了 if (r >= 0) { // 頭節點後移並傳播 // 傳播即喚醒後面連續的讀節點 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // 沒獲取到讀鎖,阻塞並等待被喚醒 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } // AbstractQueuedSynchronizer.setHeadAndPropagate() private void setHeadAndPropagate(Node node, int propagate) { // h爲舊的頭節點 Node h = head; // 設置當前節點爲新頭節點 setHead(node); // 若是舊的頭節點或新的頭節點爲空或者其等待狀態小於0(表示狀態爲SIGNAL/PROPAGATE) if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { // 須要傳播 // 取下一個節點 Node s = node.next; // 若是下一個節點爲空,或者是須要獲取讀鎖的節點 if (s == null || s.isShared()) // 喚醒下一個節點 doReleaseShared(); } } // AbstractQueuedSynchronizer.doReleaseShared() // 這個方法只會喚醒一個節點 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // 若是頭節點狀態爲SIGNAL,說明要喚醒下一個節點 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 喚醒下一個節點 unparkSuccessor(h); } else if (ws == 0 && // 把頭節點的狀態改成PROPAGATE成功纔會跳到下面的if !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 若是喚醒後head沒變,則跳出循環 if (h == head) // loop if head changed break; } }
看完【死磕 java同步系列之ReentrantLock源碼解析(一)——公平鎖、非公平鎖】的分析再看這章的內容應該會比較簡單,中間同樣的方法咱們這裏直接跳過了。
咱們來看看大體的邏輯:
(1)先嚐試獲取讀鎖;
(2)若是成功了直接結束;
(3)若是失敗了,進入doAcquireShared()方法;
(4)doAcquireShared()方法中首先會生成一個新節點並進入AQS隊列中;
(5)若是頭節點正好是當前節點的上一個節點,再次嘗試獲取鎖;
(6)若是成功了,則設置頭節點爲新節點,並傳播;
(7)傳播即喚醒下一個讀節點(若是下一個節點是讀節點的話);
(8)若是頭節點不是當前節點的上一個節點或者(5)失敗,則阻塞當前線程等待被喚醒;
(9)喚醒以後繼續走(5)的邏輯;
在整個邏輯中是在哪裏連續喚醒讀節點的呢?
答案是在doAcquireShared()方法中,在這裏一個節點A獲取了讀鎖後,會喚醒下一個讀節點B,這時候B也會獲取讀鎖,而後B繼續喚醒C,依次往復,也就是說這裏的節點是一個喚醒一個這樣的形式,而不是一個節點獲取了讀鎖後一次性喚醒後面全部的讀節點。
// java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock.unlock public void unlock() { sync.releaseShared(1); } // java.util.concurrent.locks.AbstractQueuedSynchronizer.releaseShared public final boolean releaseShared(int arg) { // 若是嘗試釋放成功了,就喚醒下一個節點 if (tryReleaseShared(arg)) { // 這個方法實際是喚醒下一個節點 doReleaseShared(); return true; } return false; } // java.util.concurrent.locks.ReentrantReadWriteLock.Sync.tryReleaseShared protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // 若是第一個讀者(讀線程)是當前線程 // 就把它重入的次數減1 // 若是減到0了就把第一個讀者置爲空 if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { // 若是第一個讀者不是當前線程 // 同樣地,把它重入的次數減1 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { // 共享鎖獲取的次數減1 // 若是減爲0了說明徹底釋放了,才返回true int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; } } // java.util.concurrent.locks.AbstractQueuedSynchronizer.doReleaseShared // 行爲跟方法名有點不符,實際是喚醒下一個節點 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // 若是頭節點狀態爲SIGNAL,說明要喚醒下一個節點 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 喚醒下一個節點 unparkSuccessor(h); } else if (ws == 0 && // 把頭節點的狀態改成PROPAGATE成功纔會跳到下面的if !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 若是喚醒後head沒變,則跳出循環 if (h == head) // loop if head changed break; } }
解鎖的大體流程以下:
(1)將當前線程重入的次數減1;
(2)將共享鎖總共被獲取的次數減1;
(3)若是共享鎖獲取的次數減爲0了,說明共享鎖徹底釋放了,那就喚醒下一個節點;
以下圖,ABC三個節點各獲取了一次共享鎖,三者釋放的順序分別爲ACB,那麼最後B釋放共享鎖的時候tryReleaseShared()纔會返回true,進而纔會喚醒下一個節點D。
// java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock.lock() public void lock() { sync.acquire(1); } // java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire() public final void acquire(int arg) { // 先嚐試獲取鎖 // 若是失敗,則會進入隊列中排隊,後面的邏輯跟ReentrantLock如出一轍了 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } // java.util.concurrent.locks.ReentrantReadWriteLock.Sync.tryAcquire() protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); // 狀態變量state的值 int c = getState(); // 互斥鎖被獲取的次數 int w = exclusiveCount(c); if (c != 0) { // 若是c!=0且w==0,說明共享鎖被獲取的次數不爲0 // 這句話整個的意思就是 // 若是共享鎖被獲取的次數不爲0,或者被其它線程獲取了互斥鎖(寫鎖) // 那麼就返回false,獲取寫鎖失敗 if (w == 0 || current != getExclusiveOwnerThread()) return false; // 溢出檢測 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 到這裏說明當前線程已經獲取過寫鎖,這裏是重入了,直接把state加1便可 setState(c + acquires); // 獲取寫鎖成功 return true; } // 若是c等於0,就嘗試更新state的值(非公平模式writerShouldBlock()返回false) // 若是失敗了,說明獲取寫鎖失敗,返回false // 若是成功了,說明獲取寫鎖成功,把本身設置爲佔有者,並返回true if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; } // 獲取寫鎖失敗了後面的邏輯跟ReentrantLock是一致的,進入隊列排隊,這裏就不列源碼了
寫鎖獲取的過程大體以下:
(1)嘗試獲取鎖;
(2)若是有讀者佔有着讀鎖,嘗試獲取寫鎖失敗;
(3)若是有其它線程佔有着寫鎖,嘗試獲取寫鎖失敗;
(4)若是是當前線程佔有着寫鎖,嘗試獲取寫鎖成功,state值加1;
(5)若是沒有線程佔有着鎖(state==0),當前線程嘗試更新state的值,成功了表示嘗試獲取鎖成功,不然失敗;
(6)嘗試獲取鎖失敗之後,進入隊列排隊,等待被喚醒;
(7)後續邏輯跟ReentrantLock是一致;
// java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock.unlock() public void unlock() { sync.release(1); } //java.util.concurrent.locks.AbstractQueuedSynchronizer.release() public final boolean release(int arg) { // 若是嘗試釋放鎖成功(徹底釋放鎖) // 就嘗試喚醒下一個節點 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } // java.util.concurrent.locks.ReentrantReadWriteLock.Sync.tryRelease() protected final boolean tryRelease(int releases) { // 若是寫鎖不是當前線程佔有着,拋出異常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 狀態變量的值減1 int nextc = getState() - releases; // 是否徹底釋放鎖 boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); // 設置狀態變量的值 setState(nextc); // 若是徹底釋放了寫鎖,返回true return free; }
寫鎖釋放的過程大體爲:
(1)先嚐試釋放鎖,即狀態變量state的值減1;
(2)若是減爲0了,說明徹底釋放了鎖;
(3)徹底釋放了鎖才喚醒下一個等待的節點;
(1)ReentrantReadWriteLock採用讀寫鎖的思想,能提升併發的吞吐量;
(2)讀鎖使用的是共享鎖,多個讀鎖能夠一塊兒獲取鎖,互相不會影響,即讀讀不互斥;
(3)讀寫、寫讀和寫寫是會互斥的,前者佔有着鎖,後者須要進入AQS隊列中排隊;
(4)多個連續的讀線程是一個接着一個被喚醒的,而不是一次性喚醒全部讀線程;
(5)只有多個讀鎖都徹底釋放了纔會喚醒下一個寫線程;
(6)只有寫鎖徹底釋放了纔會喚醒下一個等待者,這個等待者有多是讀線程,也多是寫線程;
(1)若是同一個線程先獲取讀鎖,再獲取寫鎖會怎樣?
分析上圖中的代碼,在tryAcquire()方法中,若是讀鎖被獲取的次數不爲0(c != 0 && w == 0),返回false,返回以後外層方法會讓當前線程阻塞。
能夠經過下面的方法驗證:
readLock.lock(); writeLock.lock(); writeLock.unlock(); readLock.unlock();
運行程序後會發現代碼中止在writeLock.lock();
,固然,你也能夠打個斷點跟蹤進去看看。
(2)若是同一個線程先獲取寫鎖,再獲取讀鎖會怎樣?
分析上面的代碼,在tryAcquireShared()方法中,第一個紅框處並不會返回,由於不知足getExclusiveOwnerThread() != current
;第二個紅框處若是原子更新成功就說明獲取了讀鎖,而後就會執行第三個紅框處的代碼把其重入次數更改成1。
能夠經過下面的方法驗證:
writeLock.lock(); readLock.lock(); readLock.unlock(); writeLock.unlock();
你能夠打個斷點跟蹤一下看看。
(3)死鎖了麼?
經過上面的兩個例子,咱們能夠感覺到同一個線程先讀後寫和先寫後讀是徹底不同的,爲何不同呢?
先讀後寫,一個線程佔有讀鎖後,其它線程仍是能夠佔有讀鎖的,這時候若是在其它線程佔有讀鎖以前讓本身佔有了寫鎖,其它線程又不能佔有讀鎖了,這段程序會很是難實現,邏輯也很奇怪,因此,設計成只要一個線程佔有了讀鎖,其它線程包括它本身都不能再獲取寫鎖。
先寫後讀,一個線程佔有寫鎖後,其它線程是不能佔有任何鎖的,這時候,即便本身佔有一個讀鎖,對程序的邏輯也不會有任何影響,因此,一個線程佔有寫鎖後是能夠再佔有讀鎖的,只是這個時候其它線程依然沒法獲取讀鎖。
若是你仔細思考上面的邏輯,你會發現一個線程先佔有讀鎖後佔有寫鎖,會有一個很大的問題——鎖沒法被釋放也沒法被獲取了。這個線程先佔有了讀鎖,而後本身再佔有寫鎖的時候會阻塞,而後它就本身把本身搞死了,進而把其它線程也搞死了,它沒法釋放鎖,其它線程也沒法得到鎖了。
這是死鎖嗎?彷佛不是,死鎖的定義是線程A佔有着線程B須要的資源,線程B佔有着線程A須要的資源,兩個線程相互等待對方釋放資源,經典的死鎖例子以下:
Object a = new Object(); Object b = new Object(); new Thread(()->{ synchronized (a) { LockSupport.parkNanos(1000000); synchronized (b) { } } }).start(); new Thread(()->{ synchronized (b) { synchronized (a) { } } }).start();
簡單的死鎖用jstack是能夠看到的:
"Thread-1": at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest.lambda$main$1(ReentrantReadWriteLockTest.java:40) - waiting to lock <0x000000076baa9068> (a java.lang.Object) - locked <0x000000076baa9078> (a java.lang.Object) at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest$$Lambda$2/1831932724.run(Unknown Source) at java.lang.Thread.run(Thread.java:748) "Thread-0": at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest.lambda$main$0(ReentrantReadWriteLockTest.java:32) - waiting to lock <0x000000076baa9078> (a java.lang.Object) - locked <0x000000076baa9068> (a java.lang.Object) at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest$$Lambda$1/1096979270.run(Unknown Source) at java.lang.Thread.run(Thread.java:748) Found 1 deadlock.
(4)如何使用ReentrantReadWriteLock實現一個高效安全的TreeMap?
class SafeTreeMap { private final Map<String, Object> m = new TreeMap<String, Object>(); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final Lock readLock = lock.readLock(); private final Lock writeLock = lock.writeLock(); public Object get(String key) { readLock.lock(); try { return m.get(key); } finally { readLock.unlock(); } } public Object put(String key, Object value) { writeLock.lock(); try { return m.put(key, value); } finally { writeLock.unlock(); } } }
歡迎關注個人公衆號「彤哥讀源碼」,查看更多源碼系列文章, 與彤哥一塊兒暢遊源碼的海洋。