做者:碼哥字節 公衆號:碼哥字節如需轉載請聯繫我(微信ID):MageByte1024java
在JDK 1.8 引入 StampedLock
,能夠理解爲對 ReentrantReadWriteLock
在某些方面的加強,在原先讀寫鎖的基礎上新增了一種叫樂觀讀(Optimistic Reading)的模式。該模式並不會加鎖,因此不會阻塞線程,會有更高的吞吐量和更高的性能。mysql
它的設計初衷是做爲一個內部工具類,用於開發其餘線程安全的組件,提高系統性能,而且編程模型也比ReentrantReadWriteLock
複雜,因此用很差就很容易出現死鎖或者線程安全等莫名其妙的問題。sql
跟着「碼哥字節」帶着問題一塊兒學習StampedLock
給咱們帶來了什麼…數據庫
ReentrantReadWriteLock
,爲什麼還要引入StampedLock
?StampedLock
如何解決寫線程難以獲取鎖的線程「飢餓」問題?三種訪問數據模式:編程
writeLock
方法會使線程阻塞等待獨佔訪問,可類比ReentrantReadWriteLock
的寫鎖模式,同一時刻有且只有一個寫線程獲取鎖資源;readLock
方法,容許多個線程同時獲取悲觀讀鎖,悲觀讀鎖與獨佔寫鎖互斥,與樂觀讀共享。tryOptimisticRead
纔會返回非 0 的郵戳(Stamp),若是在獲取樂觀讀以後沒有出現寫模式線程獲取鎖,則在方法validate
返回 true ,容許多個線程獲取樂觀讀以及讀鎖。同時容許一個寫線程獲取寫鎖。支持讀寫鎖相互轉換segmentfault
ReentrantReadWriteLock
當線程獲取寫鎖後能夠降級成讀鎖,可是反過來則不行。緩存
StampedLock
提供了讀鎖和寫鎖相互轉換的功能,使得該類支持更多的應用場景。安全
注意事項微信
StampedLock
是不可重入鎖,若是當前線程已經獲取了寫鎖,再次重複獲取的話就會死鎖;Conditon
條件將線程等待;StampedLock
裏的寫鎖和悲觀讀鎖加鎖成功以後,都會返回一個 stamp;而後解鎖的時候,須要傳入這個 stamp。那爲什麼 StampedLock
性能比 ReentrantReadWriteLock
好?多線程
關鍵在於StampedLock
提供的樂觀讀,咱們知道ReentrantReadWriteLock
支持多個線程同時獲取讀鎖,可是當多個線程同時讀的時候,全部的寫線程都是阻塞的。
StampedLock
的樂觀讀容許一個寫線程獲取寫鎖,因此不會致使全部寫線程阻塞,也就是當讀多寫少的時候,寫線程有機會獲取寫鎖,減小了線程飢餓的問題,吞吐量大大提升。
這裏可能你就會有疑問,居然同時容許多個樂觀讀和一個先線程同時進入臨界資源操做,那讀取的數據多是錯的怎麼辦?
是的,樂觀讀不能保證讀取到的數據是最新的,因此將數據讀取到局部變量的時候須要經過 lock.validate(stamp)
椒鹽蝦是否被寫線程修改過,如果修改過則須要上悲觀讀鎖,再從新讀取數據到局部變量。
同時因爲樂觀讀並非鎖,因此沒有線程喚醒與阻塞致使的上下文切換,性能更好。
其實跟數據庫的「樂觀鎖」有殊途同歸之妙,它的實現思想很簡單。咱們舉個數據庫的例子。
在生產訂單的表 product_doc 裏增長了一個數值型版本號字段 version,每次更新 product_doc 這個表的時候,都將 version 字段加 1。
select id,... ,version from product_doc where id = 123
在更新的時候匹配 version 才執行更新。
update product_doc set version = version + 1,... where id = 123 and version = 5
數據庫的樂觀鎖就是查詢的時候將 version 查出來,更新的時候利用 version 字段驗證,如果相等說明數據沒有被修改,讀取的數據是安全的。
這裏的 version 就相似於 StampedLock
的 Stamp。
模仿寫一個將用戶id與用戶名數據保存在 共享變量 idMap 中,而且提供 put 方法添加數據、get 方法獲取數據、以及 putIfNotExist 先從 map 中獲取數據,若沒有則模擬從數據庫查詢數據並放到 map 中。
public class CacheStampedLock { /** * 共享變量數據 */ private final Map<Integer, String> idMap = new HashMap<>(); private final StampedLock lock = new StampedLock(); /** * 添加數據,獨佔模式 */ public void put(Integer key, String value) { long stamp = lock.writeLock(); try { idMap.put(key, value); } finally { lock.unlockWrite(stamp); } } /** * 讀取數據,只讀方法 */ public String get(Integer key) { // 1. 嘗試經過樂觀讀模式讀取數據,非阻塞 long stamp = lock.tryOptimisticRead(); // 2. 讀取數據到當前線程棧 String currentValue = idMap.get(key); // 3. 校驗是否被其餘線程修改過,true 表示未修改,不然須要加悲觀讀鎖 if (!lock.validate(stamp)) { // 4. 上悲觀讀鎖,並從新讀取數據到當前線程局部變量 stamp = lock.readLock(); try { currentValue = idMap.get(key); } finally { lock.unlockRead(stamp); } } // 5. 若校驗經過,則直接返回數據 return currentValue; } /** * 若是數據不存在則從數據庫讀取添加到 map 中,鎖升級運用 * @param key * @param value 能夠理解成從數據庫讀取的數據,假設不會爲 null * @return */ public String putIfNotExist(Integer key, String value) { // 獲取讀鎖,也能夠直接調用 get 方法使用樂觀讀 long stamp = lock.readLock(); String currentValue = idMap.get(key); // 緩存爲空則嘗試上寫鎖從數據庫讀取數據並寫入緩存 try { while (Objects.isNull(currentValue)) { // 嘗試升級寫鎖 long wl = lock.tryConvertToWriteLock(stamp); // 不爲 0 升級寫鎖成功 if (wl != 0L) { // 模擬從數據庫讀取數據, 寫入緩存中 stamp = wl; currentValue = value; idMap.put(key, currentValue); break; } else { // 升級失敗,釋放以前加的讀鎖並上寫鎖,經過循環再試 lock.unlockRead(stamp); stamp = lock.writeLock(); } } } finally { // 釋放最後加的鎖 lock.unlock(stamp); } return currentValue; } }
上面的使用例子中,須要引發注意的是 get()
和 putIfNotExist()
方法,第一個使用了樂觀讀,使得讀寫能夠併發執行,第二個則是使用了讀鎖轉換成寫鎖的編程模型,先查詢緩存,當不存在的時候從數據庫讀取數據並添加到緩存中。
在使用樂觀讀的時候必定要按照固定模板編寫,不然很容易出 bug,咱們總結下樂觀讀編程模型的模板:
public void optimisticRead() { // 1. 非阻塞樂觀讀模式獲取版本信息 long stamp = lock.tryOptimisticRead(); // 2. 拷貝共享數據到線程本地棧中 copyVaraibale2ThreadMemory(); // 3. 校驗樂觀讀模式讀取的數據是否被修改過 if (!lock.validate(stamp)) { // 3.1 校驗未經過,上讀鎖 stamp = lock.readLock(); try { // 3.2 拷貝共享變量數據到局部變量 copyVaraibale2ThreadMemory(); } finally { // 釋放讀鎖 lock.unlockRead(stamp); } } // 3.3 校驗經過,使用線程本地棧的數據進行邏輯操做 useThreadMemoryVarables(); }
對於讀多寫少的高併發場景 StampedLock
的性能很好,經過樂觀讀模式很好的解決了寫線程「飢餓」的問題,咱們可使用StampedLock
來代替ReentrantReadWriteLock
,可是須要注意的是 StampedLock 的功能僅僅是 ReadWriteLock 的子集,在使用的時候,仍是有幾個地方須要注意一下。
StampedLock
是不可重入鎖,使用過程當中必定要注意;Conditon
,當須要這個特性的時候須要注意;
咱們發現它並不像其餘鎖同樣經過定義內部類繼承 AbstractQueuedSynchronizer
抽象類而後子類實現模板方法實現同步邏輯。可是實現思路仍是有相似,依然使用了 CLH 隊列來管理線程,經過同步狀態值 state 來標識鎖的狀態。
其內部定義了不少變量,這些變量的目的仍是跟 ReentrantReadWriteLock
同樣,將狀態爲按位切分,經過位運算對 state 變量操做用來區分同步狀態。
好比寫鎖使用的是第八位爲 1 則表示寫鎖,讀鎖使用 0-7 位,因此通常狀況下獲取讀鎖的線程數量爲 1-126,超過之後,會使用 readerOverflow int 變量保存超出的線程數。
自旋優化
對多核 CPU 也進行必定優化,NCPU 獲取核數,當核數目超過 1 的時候,線程獲取鎖的重試、入隊錢的重試都有自旋操做。主要就是經過內部定義的一些變量來判斷,如圖所示。
隊列的節點經過 WNode 定義,如上圖所示。等待隊列的節點相比 AQS 更簡單,只有三種狀態分別是:
另外還有一個字段 cowait ,經過該字段指向一個棧,保存讀線程。結構如圖所示
同時定義了兩個變量分別指向頭結點與尾節點。
/** Head of CLH queue */ private transient volatile WNode whead; /** Tail (last) of CLH queue */ private transient volatile WNode wtail;
另外有一個須要注意點就是 cowait, 保存全部的讀節點數據,使用的是頭插法。
當讀寫線程競爭造成等待隊列的數據以下圖所示:
public long writeLock() { long s, next; // bypass acquireWrite in fully unlocked case only return ((((s = state) & ABITS) == 0L && U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ? next : acquireWrite(false, 0L)); }
獲取寫鎖,若是獲取失敗則構建節點放入隊列,同時阻塞線程,須要注意的時候該方法不響應中斷,如需中斷須要調用 writeLockInterruptibly()
。不然會形成高 CPU 佔用的問題。
(s = state) & ABITS
標識讀鎖和寫鎖未被使用,那麼久直接執行 U.compareAndSwapLong(this, STATE, s, next = s + WBIT))
CAS 操做將第八位設置 1,標識寫鎖佔用成功。CAS失敗的話則調用 acquireWrite(false, 0L)
加入等待隊列,同時將線程阻塞。
另外acquireWrite(false, 0L)
方法很複雜,運用大量自旋操做,好比自旋入隊列。
public long readLock() { long s = state, next; // bypass acquireRead on common uncontended case return ((whead == wtail && (s & ABITS) < RFULL && U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ? next : acquireRead(false, 0L)); }
獲取讀鎖關鍵步驟
(whead == wtail && (s & ABITS) < RFULL
若是隊列爲空而且讀鎖線程數未超過限制,則經過 U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
CAS 方式修改 state 標識獲取讀鎖成功。
不然調用 acquireRead(false, 0L)
嘗試使用自旋獲取讀鎖,獲取不到則進入等待隊列。
acquireRead
當 A 線程獲取了寫鎖,B 線程去獲取讀鎖的時候,調用 acquireRead 方法,則會加入阻塞隊列,並阻塞 B 線程。方法內部依然很複雜,大體流程梳理後以下:
不管是 unlockRead
釋放讀鎖仍是 unlockWrite
釋放寫鎖,整體流程基本都是經過 CAS 操做,修改 state 成功後調用 release 方法喚醒等待隊列的頭結點的後繼節點線程。
釋放讀鎖
unlockRead(long stamp)
若是傳入的 stamp 與鎖持有的 stamp 一致,則釋放非排它鎖,內部主要是經過自旋 + CAS 修改 state 成功,在修改 state 以前作了判斷是否超過讀線程數限制,如果小於限制才經過CAS 修改 state 同步狀態,接着調用 release 方法喚醒 whead 的後繼節點。
釋放寫鎖
unlockWrite(long stamp)
若是傳入的 stamp 與鎖持有的 stamp 一致,則釋放寫鎖,whead 不爲空,且當前節點狀態 status != 0 則調用 release 方法喚醒頭結點的後繼節點線程。
StampedLock 並不能徹底代替ReentrantReadWriteLock
,在讀多寫少的場景下由於樂觀讀的模式,容許一個寫線程獲取寫鎖,解決了寫線程飢餓問題,大大提升吞吐量。
在使用樂觀讀的時候須要注意按照編程模型模板方式去編寫,不然很容易形成死鎖或者意想不到的線程安全問題。
它不是可重入鎖,且不支持條件變量 Conditon
。而且線程阻塞在 readLock() 或者 writeLock() 上時,此時調用該阻塞線程的 interrupt() 方法,會致使 CPU 飆升。若是須要中斷線程的場景,必定要注意調用悲觀讀鎖 readLockInterruptibly() 和寫鎖 writeLockInterruptibly()。
另外喚醒線程的規則和 AQS 相似,先喚醒頭結點,不一樣的是 StampedLock
喚醒的節點是讀節點的時候,會喚醒此讀節點的 cowait 鎖指向的棧的全部讀節點,可是喚醒與插入的順序相反。
推薦閱讀
如下幾篇文章閱讀量與讀者反饋都很好,推薦你們閱讀:
參考內容