有了以上兩篇文章的鋪墊,來理解本文要介紹的既有獨佔式,又有共享式獲取同步狀態的 ReadWriteLock
,就很是輕鬆了html
ReadWriteLock
直譯過來爲【讀寫鎖】。現實中,讀多寫少的業務場景是很是廣泛的,好比應用緩存java
一個線程將數據寫入緩存,其餘線程能夠直接讀取緩存中的數據,提升數據查詢效率
以前提到的互斥鎖都是排他鎖,也就是說同一時刻只容許一個線程進行訪問,當面對可共享讀的業務場景,互斥鎖顯然是比較低效的一種處理方式。爲了提升效率,讀寫鎖模型就誕生了編程
效率提高是一方面,但併發編程更重要的是在保證準確性的前提下提升效率api
一個寫線程改變了緩存中的值,其餘讀線程必定是能夠 「感知」 到的,不然可能致使查詢到的值不許確
因此關於讀寫鎖模型就了下面這 3 條規定:緩存
ReadWriteLock
是一個接口,其內部只有兩個方法:安全
public interface ReadWriteLock { // 返回用於讀的鎖 Lock readLock(); // 返回用於寫的鎖 Lock writeLock(); }
因此要了解整個讀/寫鎖的整個應用過程,須要從它的實現類 ReentrantReadWriteLock
提及多線程
直接對比ReentrantReadWriteLock 與 ReentrantLock的類結構併發
他們又很類似吧,根據類名稱以及類結構,按照我們前序文章的分析,你也就能看出 ReentrantReadWriteLock 的基本特性:oracle
其中黃顏色標記的的 鎖降級 是看不出來的, 這裏先有個印象,下面會單獨說明app
另外,不知道你是否還記得,Java AQS隊列同步器以及ReentrantLock的應用 說過,Lock 和 AQS 同步器是一種組合形式的存在,既然這裏是讀/寫兩種鎖,他們的組合模式也就分紅了兩種:
public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }
這裏只是提醒你們,模式沒有變,不要被讀/寫兩種鎖迷惑
說了這麼多,若是你忘了前序知識,總體理解感受應該是有斷檔的,因此先來看個示例(模擬使用緩存)讓你們對 ReentrantReadWriteLock 有個直觀的使用印象
public class ReentrantReadWriteLockCache { // 定義一個非線程安全的 HashMap 用於緩存對象 static Map<String, Object> map = new HashMap<String, Object>(); // 建立讀寫鎖對象 static ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); // 構建讀鎖 static Lock rl = readWriteLock.readLock(); // 構建寫鎖 static Lock wl = readWriteLock.writeLock(); public static final Object get(String key) { rl.lock(); try{ return map.get(key); }finally { rl.unlock(); } } public static final Object put(String key, Object value){ wl.lock(); try{ return map.put(key, value); }finally { wl.unlock(); } } }
你瞧,使用就是這麼簡單。可是你知道的,AQS 的核心是鎖的實現,即控制同步狀態 state 的值,ReentrantReadWriteLock 也是應用AQS的 state 來控制同步狀態的,那麼問題來了:
一個 int 類型的 state 怎麼既控制讀的同步狀態,又能夠控制寫的同步狀態呢?
顯然須要一點設計了
若是要在一個 int 類型變量上維護多個狀態,那確定就須要拆分了。咱們知道 int 類型數據佔32位,因此咱們就有機會按位切割使用state了。咱們將其切割成兩部分:
因此,要想準確的計算讀/寫各自的狀態值,確定就要應用位運算了,下面代碼是 JDK1.8,ReentrantReadWriteLock 自定義同步器 Sync 的位操做
abstract static class Sync extends AbstractQueuedSynchronizer { static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; static int sharedCount(int c) { return c >>> SHARED_SHIFT; } static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } }
乍一看真是有些複雜的可怕,別慌,我們經過幾道小小數學題就能夠搞定整個位運算過程
整個 ReentrantReadWriteLock 中 讀/寫狀態的計算就是反覆應用這幾道數學題,因此,在閱讀下面內容以前,但願你搞懂這簡單的運算
基礎鋪墊足夠了,咱們進入源碼分析吧
因爲寫鎖是排他的,因此確定是要重寫 AQS 中 tryAcquire
方法
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); // 獲取 state 總體的值 int c = getState(); // 獲取寫狀態的值 int w = exclusiveCount(c); if (c != 0) { // w=0: 根據推理二,總體狀態不等於零,寫狀態等於零,因此,讀狀態大於0,即存在讀鎖 // 或者當前線程不是已獲取寫鎖的線程 // 兩者之一條件成真,則獲取寫狀態失敗 if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 根據推理一第 1 條,更新寫狀態值 setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
上述代碼 第 19 行 writerShouldBlock 也並無什麼神祕的,只不過是公平/非公平獲取鎖方式的判斷(是否有前驅節點來判斷)
你瞧,寫鎖獲取方式就是這麼簡單
因爲讀鎖是共享式的,因此確定是要重寫 AQS 中 tryAcquireShared
方法
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); // 寫狀態不等於0,而且鎖的持有者不是當前線程,根據約定 3,則獲取讀鎖失敗 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; // 獲取讀狀態值 int r = sharedCount(c); // 這個地方有點不同,咱們單獨說明 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } // 若是獲取讀鎖失敗則進入自旋獲取 return fullTryAcquireShared(current); }
readerShouldBlock
和 writerShouldBlock
在公平鎖的實現上都是判斷是否有前驅節點,可是在非公平鎖的實現上,前者是這樣的:
final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive(); } final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && // 等待隊列頭節點的下一個節點 (s = h.next) != null && // 若是是排他式的節點 !s.isShared() && s.thread != null; }
簡單來講,若是請求讀鎖的當前線程發現同步隊列的 head 節點的下一個節點爲排他式節點,那麼就說明有一個線程在等待獲取寫鎖(爭搶寫鎖失敗,被放入到同步隊列中),那麼請求讀鎖的線程就要阻塞,畢竟讀多寫少,若是尚未這點判斷機制,寫鎖可能會發生【飢餓】
上述條件都知足了,也就會進入
tryAcquireShared
代碼的第 14 行到第 25 行,這段代碼主要是爲了記錄線程持有鎖的次數。讀鎖是共享式的,還想記錄每一個線程持有讀鎖的次數,就要用到 ThreadLocal 了,由於這不影響同步狀態 state 的值,因此就不分析了, 只把關係放在這吧
到這裏讀鎖的獲取也就結束了,比寫鎖稍稍複雜那麼一丟丟,接下來就說明一下那個可能讓你迷惑的鎖升級/降級問題吧
我的理解:讀鎖是能夠被多線程共享的,寫鎖是單線程獨佔的,也就是說寫鎖的併發限制比讀鎖高,因此
在真正瞭解讀寫鎖的升級與降級以前,咱們須要完善一下本文開頭 ReentrantReadWriteLock 的例子
public static final Object get(String key) { Object obj = null; rl.lock(); try{ // 獲取緩存中的值 obj = map.get(key); }finally { rl.unlock(); } // 緩存中值不爲空,直接返回 if (obj!= null) { return obj; } // 緩存中值爲空,則經過寫鎖查詢DB,並將其寫入到緩存中 wl.lock(); try{ // 再次嘗試獲取緩存中的值 obj = map.get(key); // 再次獲取緩存中值仍是爲空 if (obj == null) { // 查詢DB obj = getDataFromDB(key); // 僞代碼:getDataFromDB // 將其放入到緩存中 map.put(key, obj); } }finally { wl.unlock(); } return obj; }
有童鞋可能會有疑問
在寫鎖裏面,爲何代碼第19行還要再次獲取緩存中的值呢?不是畫蛇添足嗎?
其實這裏再次嘗試獲取緩存中的值是頗有必要的,由於可能存在多個線程同時執行 get 方法,而且參數 key 也是相同的,執行到代碼第 16 行 wl.lock()
,好比這樣:
線程 A,B,C 同時執行到臨界區 wl.lock(), 只有線程 A 獲取寫鎖成功,線程B,C只能阻塞,直到線程A 釋放寫鎖。這時,當線程B 或者 C 再次進入臨界區時,線程 A 已經將值更新到緩存中了,因此線程B,C不必再查詢一次DB,而是再次嘗試查詢緩存中的值
既然再次獲取緩存頗有必要,我可否在讀鎖裏直接判斷,若是緩存中沒有值,那就再次獲取寫鎖來查詢DB不就能夠了嘛,就像這樣:
public static final Object getLockUpgrade(String key) { Object obj = null; rl.lock(); try{ obj = map.get(key); if (obj == null){ wl.lock(); try{ obj = map.get(key); if (obj == null) { obj = getDataFromDB(key); // 僞代碼:getDataFromDB map.put(key, obj); } }finally { wl.unlock(); } } }finally { rl.unlock(); } return obj; }
這還真是不能夠的,由於獲取一個寫入鎖須要先釋放全部的讀取鎖,若是有兩個讀取鎖試圖獲取寫入鎖,且都不釋放讀取鎖時,就會發生死鎖,因此在這裏,鎖的升級是不被容許的
讀寫鎖的升級是不能夠的,那麼鎖的降級是能夠的嘛?這個是 Oracle 官網關於鎖降級的示例 ,我將代碼粘貼在此處,你們有興趣能夠點進去鏈接看更多內容
class CachedData { Object data; volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() { rwl.readLock().lock(); if (!cacheValid) { // 必須在獲取寫鎖以前釋放讀鎖,由於鎖的升級是不被容許的 rwl.readLock().unlock(); rwl.writeLock().lock(); try { // 再次檢查,緣由多是其餘線程已經更新過緩存 if (!cacheValid) { data = ... cacheValid = true; } //在釋放寫鎖前,降級爲讀鎖 rwl.readLock().lock(); } finally { //釋放寫鎖,此時持有讀鎖 rwl.writeLock().unlock(); } } try { use(data); } finally { rwl.readLock().unlock(); } } }
代碼中聲明瞭一個 volatile 類型的 cacheValid 變量,保證其可見性。
這個過程就是一個完整的鎖降級的過程,目的是保證數據可見性,聽起來頗有道理的樣子,那麼問題來了:
上述代碼爲何在釋放寫鎖以前要獲取讀鎖呢?
若是當前的線程A在修改完cache中的數據後,沒有獲取讀鎖而是直接釋放了寫鎖;假設此時另外一個線程B 獲取了寫鎖並修改了數據,那麼線程A沒法感知到數據已被修改,但線程A還應用了緩存數據,因此就可能出現數據錯誤
若是遵循鎖降級的步驟,線程A 在釋放寫鎖以前獲取讀鎖,那麼線程B在獲取寫鎖時將被阻塞,直到線程A完成數據處理過程,釋放讀鎖,從而保證數據的可見性
那問題又來了:
使用寫鎖必定要降級嗎?
若是你理解了上面的問題,相信這個問題已經有了答案。假如線程A修改完數據以後, 通過耗時操做後想要再使用數據時,但願使用的是本身修改後的數據,而不是其餘線程修改後的數據,這樣的話確實是須要鎖降級;若是隻是但願最後使用數據的時候,拿到的是最新的數據,而不必定是本身剛修改過的數據,那麼先釋放寫鎖,再獲取讀鎖,而後使用數據也無妨
在這裏我要額外說明一下你可能存在的誤解:
相信你到這裏也理解了鎖的升級與降級過程,以及他們被容許或被禁止的緣由了
本文主要說明了 ReentrantReadWriteLock 是如何應用 state 作位拆分實現讀/寫兩種同步狀態的,另外也經過源碼分析了讀/寫鎖獲取同步狀態的過程,最後又瞭解了讀寫鎖的升級/降級機制,相信到這裏你對讀寫鎖已經有了必定的理解。若是你對文中的哪些地方以爲理解有些困難,強烈建議你回看本文開頭的兩篇文章,那裏鋪墊了很是多的內容。接下來咱們就看看在應用AQS的最後一個併發工具類 CountDownLatch 吧
// WriteLock public Condition newCondition() { return sync.newCondition(); } // ReadLock public Condition newCondition() { throw new UnsupportedOperationException(); }
日拱一兵 | 原創