前言:java
上一篇咱們瞭解了Lock接口與Condition接口。本篇來看看J.U.C中的ReadWriteLock,再次膜拜一下Doug Lea大神的傑做。面試
面試問題
Q :談談ReadWriteLock的好處?數據庫
ReadWriteLock接口是在JDK5提供的,具體的實現類爲ReentrantReadWriteLock,還有一個實現類ReadWriteLockView,是StampedLock的內部類。後邊會有講到。編程
public interface ReadWriteLock { Lock readLock(); Lock writeLock(); }
ReadWriteLock直譯爲讀寫鎖,從接口命名上就能夠看出該工具類用於特定的場景下,前面講過的ReentrantLock和synchronized基本能夠用來解決一切併發問題,但在特定的場景下可能表現的效果不那麼使人滿意,如在讀多寫少的時,大部分線程都在進行讀操做,不多有線程會修改共享數據。但因爲加鎖的特性,致使大量的讀操做進行了沒必要要的鎖競爭,若是能將讀寫的鎖分離,有寫操做的時候,進行讀操做須要加鎖;沒有寫操做的時候,能夠多個線程同時進行讀操做。這樣勢必會提高性能。segmentfault
讀寫鎖即是解決這種場景問題的。讀寫鎖有三個基本原則:緩存
ReadWriteLock readWriteLock=new ReentrantReadWriteLock(); Lock readLock = readWriteLock.readLock() Lock writeLock = readWriteLock.writeLock()
能夠看出不管是讀鎖仍是寫鎖都是Lock接口的實現類,那麼上一篇中提到Lock接口的三種加鎖方式均可以使用。併發
//支持中斷的加鎖 void lockInterruptibly() throws InterruptedException; //支持超時的加鎖 boolean tryLock(long time, TimeUnit unit) throws InterruptedException; //支持非阻塞獲取鎖 boolean tryLock();
Reentrant表明可重入的,ReentrantReadWriteLock支持重入鎖,並且也支持公平鎖和非公平鎖。框架
前面簡單的介紹的讀寫鎖的使用,這裏有一個須要注意的點,就是讀寫鎖的升級和降級。工具
ReentrantReadWriteLock不支持鎖的升級
,可是支持鎖的降級
。鎖降級就是持有寫鎖去申請讀鎖;鎖升級是持有讀鎖去申請寫鎖,若是出現相似鎖升級的代碼,則會致使線程阻塞,且沒法被喚醒。這點須要注意。post
鎖的升級:
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); @Test //沒法進行鎖的升級,從讀變成寫 public void test2() throws InterruptedException { Thread thread = new Thread(() -> { readWriteLock.readLock().lock(); System.out.println("獲取讀鎖"); readWriteLock.writeLock().lock(); System.out.println("獲取寫鎖"); readWriteLock.writeLock().unlock(); System.out.println("釋放寫鎖"); readWriteLock.readLock().unlock(); System.out.println("釋放讀鎖"); }); thread.start(); thread.join(); } /*Output 獲取讀鎖 ----- 發生阻塞----- 必須先釋放讀鎖才能去申請寫鎖,否則會阻塞
鎖的降級:
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); @Test //進行鎖的降級,從寫變成讀 public void test() throws InterruptedException { Thread thread = new Thread(() -> { readWriteLock.writeLock().lock(); System.out.println("獲取寫鎖"); readWriteLock.readLock().lock(); System.out.println("獲取讀鎖"); readWriteLock.readLock().unlock(); System.out.println("釋放寫鎖"); readWriteLock.writeLock().unlock(); System.out.println("釋放讀鎖"); }); thread.start(); thread.join(); } /*Output 獲取寫鎖 獲取讀鎖 釋放寫鎖 釋放讀鎖 */
鎖降級仍是有不少應用場景的。好比有業務須要先查緩存,發現緩存失效須要從新去數據庫查詢數據並修改緩存,完成修改操做後應該儘快釋放寫鎖,減少鎖的粒度。這樣能讓更多的讀線程儘快訪問到修改後的數據。否則業務邏輯半天執行不完,這期間儘管緩存數據是最新的,可是因爲寫鎖未釋放,其餘線程也沒法進行讀操做。極大的下降了併發性。
所以在完成修改緩存後去拿讀鎖,而後釋放寫鎖,這樣既能保證其餘線程讀取到最新的數據,又能保證當前線程的後續操做使用的數據是最新的。
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); final Lock r = readWriteLock.readLock(); final Lock w = readWriteLock.writeLock(); volatile boolean cacheValid; @Test public void processCachedData() { r.lock(); //獲取讀鎖 if (!cacheValid){ //緩存失效 r.unlock(); //先釋放讀鎖,不容許鎖升級 w.lock(); try { if (!cacheValid){ //再次檢查緩存狀態, // cache= ... cacheValid =true; //完成緩存更新 } r.lock(); //釋放寫鎖前,降級爲讀鎖 }finally { w.unlock(); //釋放寫鎖 } } try{ // ... //執行業務邏輯 }finally { r.unlock(); } }
讀鎖不支持條件變量
,若是讀鎖調用newCondition()會拋出UnsupportedOperationException異常;
寫鎖支持條件變量
,鎖與條件變量的搭配使用能夠參考上一篇 Lock & Condition。
ReentrantReadWriteLock內部仍是使用的AQS框架,經過前面的學習咱們知道,在AQS中,經過volatile int state
來表示線程鎖的狀態,ReentrantReadWriteLock有兩把鎖:讀鎖和寫鎖,它們保護的都是同一個資源,如何用一個共享變量來區分寫鎖和讀鎖的狀態呢?答案就是按位拆分。
因爲state是int類型的變量,在內存中佔用4個字節,也就是32位。將其拆分爲兩部分:高16位和低16位,其中高16位用來表示讀鎖狀態,低16位用來表示寫鎖狀態。這樣就能夠用一個int變量來表示兩種鎖的狀態,低16位寫鎖的加鎖和釋放鎖操做不會發生變化,還是state+1/state-1;但高16位的加鎖和釋放鎖就變成了state + (1<<16)/ state-(1<<16)。
一樣獲取讀鎖和寫鎖的狀態也有所不一樣:
獲取讀鎖:state >>> 16
無符號右移16位,空的地方補0。
獲取寫鎖:state & [(1 << 16) - 1]
至關於state & 0x0000FFFF 至關於把高16位置空,只保留低16位。
因爲讀鎖和寫鎖的狀態值都只佔用16位,因此讀鎖和寫鎖各自可重入鎖的最大數量爲2^16-1。
前面說了持有鎖狀態表示的問題,如今來看看其具體的實現。在此以前先了解一下ReentrantReadWriteLock的類結構。
ReentrantReadWriteLock中有兩個內部類,ReadLock和WriteLock,這兩個類在具體實現Lock接口時,分別調用ReentrantReadWriteLock中實現AQS類的同步組件Sync的共享和獨佔兩種加鎖釋放鎖方式來實現各自的功能。
Sync中實現AQS中獨佔鎖加鎖tryAcquire()和獨佔鎖釋放鎖tryRelease(),以及共享鎖的加鎖tryAcquireShared()和共享鎖的釋放鎖tryReleaseShared()。
下面的內容也是圍繞這四個方法展開。
protected final boolean tryAcquire(int acquires) { /* * Walkthrough * 1. 若是讀鎖或者寫鎖其中有一個不爲0,並且鎖的持有者不是線程, * 嘗試獲取鎖失敗,該方法返回false。 * 2. 若是加鎖的數量滿了,返回false。 * 3. 另外,重複獲取或者入口等待隊列容許的線程纔有資格加鎖, * 修改state和鎖的持有者 */ Thread current = Thread.currentThread(); //返回state,高16表明讀鎖的數量,低16表明寫鎖的數量。 int c = getState(); //返回獨佔鎖(也就是寫鎖)的數量 int w = exclusiveCount(c); //只有讀鎖和寫鎖有一個數量不爲0,state就不爲0,若是state爲0, //那麼沒有一個線程當前在使用讀寫鎖,能夠直接讓當前線程去拿寫鎖 if (c != 0) { /* 1.寫鎖數量爲0,且獨佔鎖不是被當前線程佔用,那麼必定是讀鎖的數量不爲0, 說明讀鎖在使用,嘗試獲取鎖失敗,對應前邊提到讀寫鎖的基本原則: 若是一個線程在讀,一樣也禁止寫共享變量。 2.寫鎖不爲0,可是獨佔鎖的持有線程不是當前線程,說明其餘線程在使用寫鎖 獨佔鎖是互斥的,因此也會返回false。 */
if (w == 0 || current != getExclusiveOwnerThread()) return false; //判斷一下重入鎖的次數會不會超過2^16-1 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); /* 這裏設置state時沒有使用CAS,緣由很簡單,能走到這裏的必然是 已經持有獨佔鎖的線程來重入鎖,其餘狀況沒法經過前邊的狀態判斷。 */ setState(c + acquires); return true; } /*state=0才能走到此處,writerShouldBlock()是用來判斷是否須要排隊 非公平鎖: 直接返回false,而後使用CAS來修改state,若是修改爲功則說明拿到鎖了 那麼能夠繼續將獨佔鎖的持有線程設置爲當前線程。 公平鎖: 公平鎖會調用hasQueuedPredecessors(),這個方法是判斷入口等待隊列 中是否有線程在等待鎖。若是沒有線程等待或者等待隊列中排在最前邊的是當前 線程,那麼能夠繼續進行後邊的CAS操做,不然返回false。 */ if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; //返回false的後續操做在下邊 setExclusiveOwnerThread(current); return true; }
下面附上tryAcquire()的流程圖:
tryAcquire返回false的後續操做。
public final void acquire(int arg) { //若是tryAcquire()返回false,會繼續往下執行。 if (!tryAcquire(arg) && //這行代碼是在AQS中實現的,具體步驟爲把當前線程封裝爲一個獨佔鎖節點, //並加入到等待隊列中,而後將當前線程阻塞 //在阻塞前還會當前線程還會再嘗試一次,是否能獲取到鎖, //此次嘗試時,當前線程已經在等待隊列中了,這個是acquireQueued()的內容。 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
protected final boolean tryRelease(int releases) {//realeases=1 //判斷當前線程是否持有鎖 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //realease是此次調用要釋放持有的鎖的個數,好比以前重入了3次, //getState()返回3,releases=1,則本次釋放1次,後邊還須要再釋放2次 //只有state爲0時纔算完全釋放鎖,獨佔鎖的全部線程纔會置爲null int nextc = getState() - releases; //檢查是不是完全釋放鎖 boolean free = exclusiveCount(nextc) == 0; //是完全釋放鎖 if (free) //獨佔鎖的全部線程纔會置爲null setExclusiveOwnerThread(null); //一樣沒用CAS,由於 if (!isHeldExclusively()),只有當前線程能夠經過 setState(nextc); return free; }
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); //若是不是當前線程佔用寫鎖則會返回-1,表示共享鎖加鎖失敗 //若是是當前線程佔用寫鎖,再申請讀鎖,則是被容許的,這是鎖降級的過程。 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //獲取讀鎖的數量 int r = sharedCount(c); //這裏又是公平鎖和非公平的一個區別。 if (!readerShouldBlock() && //判斷是否須要阻塞 r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { //讀鎖+1 //r=0,表明讀鎖沒被佔用,下邊的操做對應寫(獨佔)鎖的話則是 //設置寫鎖的持有者爲當前線程,可是讀鎖是共享的,因此不能這樣設置 if (r == 0) { /* 共享鎖會爲每一個獲取ReadLock的線程建立一個HoldCounter來記錄該線程 的線程ID和獲取ReadLock的次數(包括重入)。並將這個HoldCounter對象 保存在線程本身的ThreadLocal中。 ThreadLocalHoldCounter readHolds; HoldCounter cachedHoldCounter; */
/* static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } static final class HoldCounter { int count = 0; // Use id, not reference, to avoid garbage retention final long tid = getThreadId(Thread.currentThread()); } 設計者考慮到有些場景只有一個線程獲取讀鎖,那麼使用ThreadLocal 反而會下降性能,因此在ReentrantReadWriteLock中定義了: private transient Thread firstReader = null; private transient int firstReaderHoldCount; 來提供只有一個線程獲取讀鎖的性能保障。 */ firstReader = current; firstReaderHoldCount = 1;
} else if (firstReader == current) { //當前線程重入讀鎖 firstReaderHoldCount++; } else { //多個線程來申請讀鎖時會到這一步,默認會從cachedHoldCounter中拿 HoldCounter rh = cachedHoldCounter; //若是rh.tid == getThreadId(current),說明這個線程連續兩次來拿讀鎖 //若是不等的話,則說明緩存失效了,須要從新從ThreadLocal取出HoldCounter //順便修改緩存 if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); //上一次拿讀鎖的是別的線程,這個線程是第一次來拿讀鎖 else if (rh.count == 0) readHolds.set(rh); rh.count++; } //獲取讀鎖成功 return 1; } //在readerShouldBlock()返回true時,或者CAS修改失敗時走到這裏 //在這個方法中會用自旋的方式一直獲取讀鎖,中途寫鎖被其餘線程持有會返回-1 return fullTryAcquireShared(current); }
下面附上tryAcquireShared()的流程圖:
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); /* 前邊介紹了firstReader、firstReaderHoldCount 和cachedHoldCounter、readHolds,這裏就很容易理解了 減小線程持有讀鎖個數,若是完全釋放讀鎖,那麼還會將 firstReader或readHolds置空。 */ if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else {
HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } //這裏是自旋CAS釋放讀鎖的操做, 由於可能其餘的線程此刻也在進行 release操做 for (;;) { int c = getState(); //讀鎖是高16位,因此這裏經過c-(1<<16)來釋放讀鎖 int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) /* 這裏是判斷讀鎖有沒有完全被釋放,若是徹底釋放,後邊則會以傳播的方式 喚醒後繼節點中共享類型的節點,成功獲取讀鎖時也會執行喚醒後續共享類型節點 */ return nextc == 0; } }
前面提到的四個方法,是Doug Lea大神留給咱們的主要的發揮空間,AQS中其餘核心方法都沒法重寫。若是須要實現自定義的同步組件,那麼對這四個方法必然要深刻理解,而後根據組件特性來實現獨佔鎖或共享鎖。如ReentrantLock是獨佔鎖,因此其內部只實現了tryAcquire()和tryRelease()。所以本篇着重介紹了ReentrantReadWriteLock實現AQS的具體細節,沒有從AQS框架總體上展開,有些地方可能不太好理解,還但願你們多多諒解。
在ReentrantReadWriteLock中實現了獨佔鎖和共享鎖兩種方式,讀鎖是共享的,能夠在沒有寫鎖的時候被多個線程同時持有,併發地讀數據;寫鎖是獨佔的,每次只能被一個線程持有,其餘線程要想修改共享數據,則須要排隊等待。得到了讀鎖的線程可以看到前一個釋放的寫鎖所更新的內容。
理論上,讀寫鎖比互斥鎖容許對於共享數據更大程度的併發。與互斥鎖相比,讀寫鎖是否可以提升性能取決於讀寫數據的頻率、讀取和寫入操做的持續時間以及讀線程和寫線程之間的競爭。
ReentrantReadWriteLock不支持鎖的升級
,可是支持鎖的降級
。
ReentrantReadWriteLock的讀鎖不支持條件變量
,但寫鎖支持條件變量
。
《Java 併發編程實戰》
《Java 編程思想(第4版)》
https://juejin.im/post/5dc229...
http://www.tianxiaobo.com
感謝閱讀!