在上一篇《你真的懂ReentrantReadWriteLock嗎?》中我給你們留了一個引子,一個更高效同時能夠避免寫飢餓的讀寫鎖---StampedLock。StampedLock實現了不只多個讀不互相阻塞,同時在讀操做時不會阻塞寫操做。java
爲何StampedLock這麼神奇?可以達到這種效果,它的核心思想在於,在讀的時候若是發生了寫,應該經過重試的方式來獲取新的值,而不該該阻塞寫操做。這種模式也就是典型的無鎖編程思想,和CAS自旋的思想同樣。這種操做方式決定了StampedLock在讀線程很是多而寫線程很是少的場景下很是適用,同時還避免了寫飢餓狀況的發生。這篇文章將經過如下幾點來分析StampedLock。編程
先來看一個官方給出的StampedLock使用案例:緩存
public class Point { private double x, y; private final StampedLock stampedLock = new StampedLock(); //寫鎖的使用 void move(double deltaX, double deltaY){ long stamp = stampedLock.writeLock(); //獲取寫鎖 try { x += deltaX; y += deltaY; } finally { stampedLock.unlockWrite(stamp); //釋放寫鎖 } } //樂觀讀鎖的使用 double distanceFromOrigin() { long stamp = stampedLock.tryOptimisticRead(); //得到一個樂觀讀鎖 double currentX = x; double currentY = y; if (!stampedLock.validate(stamp)) { //檢查樂觀讀鎖後是否有其餘寫鎖發生,有則返回false stamp = stampedLock.readLock(); //獲取一個悲觀讀鎖 try { currentX = x; } finally { stampedLock.unlockRead(stamp); //釋放悲觀讀鎖 } } return Math.sqrt(currentX*currentX + currentY*currentY); } //悲觀讀鎖以及讀鎖升級寫鎖的使用 void moveIfAtOrigin(double newX,double newY) { long stamp = stampedLock.readLock(); //悲觀讀鎖 try { while (x == 0.0 && y == 0.0) { long ws = stampedLock.tryConvertToWriteLock(stamp); //讀鎖轉換爲寫鎖 if (ws != 0L) { //轉換成功 stamp = ws; //票據更新 x = newX; y = newY; break; } else { stampedLock.unlockRead(stamp); //轉換失敗釋放讀鎖 stamp = stampedLock.writeLock(); //強制獲取寫鎖 } } } finally { stampedLock.unlock(stamp); //釋放全部鎖 } } }
首先看看第一個方法move,能夠看到它和ReentrantReadWriteLock寫鎖的使用基本同樣,都是簡單的獲取釋放,能夠猜想這裏也是一個獨佔鎖的實現。須要注意的是 在獲取寫鎖是會返回個只long類型的stamp,而後在釋放寫鎖時會將stamp傳入進去。這個stamp是作什麼用的呢?若是咱們在中間改變了這個值又會發生什麼呢?這裏先暫時不作解釋,後面分析源碼時會解答這個問題。安全
第二個方法distanceFromOrigin就比較特別了,它調用了tryOptimisticRead,根據名字判斷這是一個樂觀讀鎖。首先什麼是樂觀鎖?樂觀鎖的意思就是先假定在樂觀鎖獲取期間,共享變量不會被改變,既然假定不會被改變,那就不須要上鎖。在獲取樂觀讀鎖以後進行了一些操做,而後又調用了validate方法,這個方法就是用來驗證tryOptimisticRead以後,是否有寫操做執行過,若是有,則獲取一個讀鎖,這裏的讀鎖和ReentrantReadWriteLock中的讀鎖相似,猜想也是個共享鎖。源碼分析
第三個方法moveIfAtOrigin,它作了一個鎖升級的操做,經過調用tryConvertToWriteLock嘗試將讀鎖轉換爲寫鎖,轉換成功後至關於獲取了寫鎖,轉換失敗至關於有寫鎖被佔用,這時經過調用writeLock來獲取寫鎖進行操做。性能
看過了上面的三個方法,估計你們對怎麼使用StampedLock有了一個初步的印象。下面就經過對StampedLock源碼的分析來一步步瞭解它背後是怎麼解決鎖飢餓問題的。測試
從上面的使用示例中咱們看到,在StampedLock中,除了提供了相似ReentrantReadWriteLock讀寫鎖的獲取釋放方法,還提供了一個樂觀讀鎖的獲取方式。那麼這三種方式是如何交互的呢?根據AQS的經驗,StampedLock中應該也是使用了一個狀態量來標誌鎖的狀態。經過下面的源碼能夠證實這點:優化
// 用於操做state後獲取stamp的值 private static final int LG_READERS = 7; private static final long RUNIT = 1L; //0000 0000 0001 private static final long WBIT = 1L << LG_READERS; //0000 1000 0000 private static final long RBITS = WBIT - 1L; //0000 0111 1111 private static final long RFULL = RBITS - 1L; //0000 0111 1110 private static final long ABITS = RBITS | WBIT; //0000 1111 1111 private static final long SBITS = ~RBITS; //1111 1000 0000 //初始化時state的值 private static final long ORIGIN = WBIT << 1; //0001 0000 0000 //鎖共享變量state private transient volatile long state; //讀鎖溢出時用來存儲多出的毒素哦 private transient int readerOverflow;
上面的源碼中除了定義state變量外,還提供了一系列變量用來操做state,用來表示讀鎖和寫鎖的各類狀態。爲了方便理解,我將他們都表示成二進制的值,長度有限,這裏用低12位來表示64的long,高位自動用0補齊。要理解這些狀態的做用,就須要具體分析三種鎖操做方式是怎麼經過state這一個變量來表示的,首先來看看獲取寫鎖和釋放寫鎖。ui
public StampedLock() { state = ORIGIN; //初始化state爲 0001 0000 0000 } public long writeLock() { long s, next; return ((((s = state) & ABITS) == 0L && //沒有讀寫鎖 U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ? //cas操做嘗試獲取寫鎖 next : acquireWrite(false, 0L)); //獲取成功後返回next,失敗則進行後續處理,排隊也在後續處理中 } public void unlockWrite(long stamp) { WNode h; if (state != stamp || (stamp & WBIT) == 0L) //stamp值被修改,或者寫鎖已經被釋放,拋出錯誤 throw new IllegalMonitorStateException(); state = (stamp += WBIT) == 0L ? ORIGIN : stamp; //加0000 1000 0000來記錄寫鎖的變化,同時改變寫鎖狀態 if ((h = whead) != null && h.status != 0) release(h); }
這裏先說明兩點結論:讀鎖經過前7位來表示,每獲取一個讀鎖,則加1。寫鎖經過除前7位後剩下的位來表示,每獲取一次寫鎖,則加1000 0000,這兩點在後面的源碼中均可以得倒證實。
初始化時將state變量設置爲0001 0000 0000。寫鎖獲取經過((s = state) & ABITS)
操做等於0時默認沒有讀鎖和寫鎖。寫鎖獲取分三種狀況:this
沒有讀鎖和寫鎖時,state爲0001 0000 0000
0001 0000 0000 & 0000 1111 1111 = 0000 0000 0000 // 等於0L,能夠嘗試獲取寫鎖
有一個讀鎖時,state爲0001 0000 0001
0001 0000 0001 & 0000 1111 1111 = 0000 0000 0001 // 不等於0L
有一個寫鎖,state爲0001 1000 0000
0001 1000 0000 & 0000 1111 1111 = 0000 1000 0000 // 不等於0L
獲取到寫鎖,須要將s + WBIT設置到state,也就是說每次獲取寫鎖,都須要加0000 1000 0000。同時返回s + WBIT的值
0001 0000 0000 + 0000 1000 0000 = 0001 1000 0000
釋放寫鎖首先判斷stamp的值有沒有被修改過或者屢次釋放,以後經過state = (stamp += WBIT) == 0L ? ORIGIN : stamp
來釋放寫鎖,位操做表示以下:
stamp += WBIT
0010 0000 0000 = 0001 1000 0000 + 0000 1000 0000
這一步操做是重點!!!寫鎖的釋放並非像ReentrantReadWriteLock同樣+1而後-1,而是經過再次加0000 1000 0000來使高位每次都產生變化,爲何要這樣作?直接減掉0000 1000 0000不就能夠了嗎?這就是爲了後面樂觀鎖作鋪墊,讓每次寫鎖都留下痕跡。
你們能夠想象這樣一個場景,字母A變化爲B能看到變化,若是在一段時間內從A變到B而後又變到A,在內存中自會顯示A,而不能記錄變化的過程,這也就是CAS中的ABA問題。在StampedLock中就是經過每次對高位加0000 1000 0000來達到記錄寫鎖操做的過程,能夠經過下面的步驟理解:
第一次獲取寫鎖:
0001 0000 0000 + 0000 1000 0000 = 0001 1000 0000
第一次釋放寫鎖:
0001 1000 0000 + 0000 1000 0000 = 0010 0000 0000
第二次獲取寫鎖:
0010 0000 0000 + 0000 1000 0000 = 0010 1000 0000
第二次釋放寫鎖:
0010 1000 0000 + 0000 1000 0000 = 0011 0000 0000
第n次獲取寫鎖:
1110 0000 0000 + 0000 1000 0000 = 1110 1000 0000
第n次釋放寫鎖:
1110 1000 0000 + 0000 1000 0000 = 1111 0000 0000
能夠看到第8位在獲取和釋放寫鎖時會產生變化,也就是說第8位是用來表示寫鎖狀態的,前7位是用來表示讀鎖狀態的,8位以後是用來表示寫鎖的獲取次數的。這樣就有效的解決了ABA問題,留下了每次寫鎖的記錄,也爲後面樂觀鎖檢查變化提供了基礎。
關於acquireWrite
方法這裏不作具體分析,方法很是複雜,感興趣的同窗能夠網上搜索相關資料。這裏只對該方法作下簡單總結,該方法分兩步來進行線程排隊,首先經過隨機探測的方式屢次自旋嘗試獲取鎖,而後自旋必定次數失敗後再初始化節點進行插入。
public long readLock() { long s = state, next; return ((whead == wtail && (s & ABITS) < RFULL && //隊列爲空,無寫鎖,同時讀鎖未溢出,嘗試獲取讀鎖 U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ? //cas嘗試獲取讀鎖+1 next : acquireRead(false, 0L)); //獲取讀鎖成功,返回s + RUNIT,失敗進入後續處理,相似acquireWrite } public void unlockRead(long stamp) { long s, m; WNode h; for (;;) { if (((s = state) & SBITS) != (stamp & SBITS) || (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT) throw new IllegalMonitorStateException(); if (m < RFULL) { //小於最大記錄值(最大記錄值127超事後放在readerOverflow變量中) if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) { //cas嘗試釋放讀鎖-1 if (m == RUNIT && (h = whead) != null && h.status != 0) release(h); break; } } else if (tryDecReaderOverflow(s) != 0L) //readerOverflow - 1 break; } }
悲觀讀鎖的獲取和ReentrantReadWriteLock相似,不一樣在於StampedLock的讀鎖很容易溢出,最大隻有127,超事後經過一個額外的變量readerOverflow來存儲,這是爲了給寫鎖留下更大的空間,由於寫鎖是在不停增長的。悲觀讀鎖獲取分下面四種狀況:
沒有讀鎖和寫鎖時,state爲0001 0000 0000
// 小於 0000 0111 1110,能夠嘗試獲取讀鎖
0001 0000 0000 & 0000 1111 1111 = 0000 0000 0000
有一個讀鎖時,state爲0001 0000 0001
// 小於 0000 0111 1110,能夠嘗試獲取讀鎖
0001 0000 0001 & 0000 1111 1111 = 0000 0000 0001
有一個寫鎖,state爲0001 1000 0000
// 大於 0000 0111 1110,不能夠獲取讀鎖
0001 1000 0000 & 0000 1111 1111 = 0000 1000 0000
讀鎖溢出,state爲0001 0111 1110
// 等於 0000 0111 1110,不能夠獲取讀鎖
0001 0111 1110 & 0000 1111 1111 = 0000 0111 1110
讀鎖的釋放過程在沒有溢出的狀況下是經過s - RUNIT
操做也就是-1來釋放的,當溢出後則將readerOverflow變量-1。
樂觀讀鎖由於實際上沒有獲取過鎖,因此也就沒有釋放鎖的過程,只是在操做後經過驗證檢查和獲取前的變化。源碼以下:
//嘗試獲取樂觀鎖 public long tryOptimisticRead() { long s; return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L; } //驗證樂觀鎖獲取以後是否有過寫操做 public boolean validate(long stamp) { //該方法以前的全部load操做在內存屏障以前完成,對應的還有storeFence()及fullFence() U.loadFence(); return (stamp & SBITS) == (state & SBITS); //比較是否有過寫操做 }
樂觀鎖基本原理就時獲取鎖時記錄state的寫狀態,而後在操做完成以後檢查寫狀態是否有變化,由於寫狀態每次都會在高位留下記錄,這樣就避免了寫鎖獲取又釋放後得不到準確數據。獲取寫鎖記錄有三種狀況:
沒有讀鎖和寫鎖時,state爲0001 0000 0000
//((s = state) & WBIT) == 0L) true
0001 0000 0000 & 0000 1000 0000 = 0000 0000 0000
//(s & SBITS)
0001 0000 0000 & 1111 1000 0000 = 0001 0000 0000
有一個讀鎖時,state爲0001 0000 0001
//((s = state) & WBIT) == 0L) true
0001 0000 0001 & 0000 1000 0000 = 0000 0000 0000
//(s & SBITS)
0001 0000 0001 & 1111 1000 0000 = 0001 0000 0000
有一個寫鎖,state爲0001 1000 0000
//((s = state) & WBIT) == 0L) false
0001 1000 0000 & 0000 1000 0000 = 0000 1000 0000
//0L
0000 0000 0000
驗證過程當中是否有過寫操做,分四種狀況
寫過一次
0001 0000 0000 & 1111 1000 0000 = 0001 0000 0000
0010 0000 0000 & 1111 1000 0000 = 0010 0000 0000 //false
未寫過,但讀過
0001 0000 0000 & 1111 1000 0000 = 0001 0000 0000
0001 0000 1111 & 1111 1000 0000 = 0001 0000 0000 //true
正在寫
0001 0000 0000 & 1111 1000 0000 = 0001 0000 0000
0001 1000 0000 & 1111 1000 0000 = 0001 1000 0000 //false
以前正在寫,不管是否寫完都不會爲0L
0000 0000 0000 & 1111 1000 0000 = 0000 0000 0000 //false
分析完了StampedLock的實現原理,這裏對StampedLock、ReentrantReadWriteLock以及Synchronized分別在各類場景下進行性能測試,測試的基準代碼採用https://blog.takipi.com/java-8-stampedlocks-vs-readwritelocks-and-synchronized/ 文章中的代碼,首先貼出上述博客中的測試結果,文章中的OPTIMISTIC模式因爲採用了「髒讀」模式,這裏不採用OPTIMISTIC的測試結果,只比較StampedLock、ReentrantReadWriteLock以及Synchronized。
5個讀線程和5個寫線程場景:表現最好的是StampedLock的正常模式以及ReentrantReadWriteLock。
10個讀線程和10個寫線程場景:表現最好的是StampedLock的正常模式以及Synchronized。
16個讀線程和4個寫線程場景:表現最好的是StampedLock的正常模式以及Synchronized。
19個讀線程和1個寫線程場景:表現最好的是Synchronized。
博客評論中還有一種測試場景2000讀線程和1個寫線程,測試結果以下:
StampedLock ... 12814.2 ReentrantReadWriteLock ... 18882.8 Synchronized ... 22696.4
表現最好的是StampedLock。
看完了上面的測試,前面3種場景表現最好的都爲StampedLock,但第4種狀況下StampedLock表現不好,因而我本身對代碼又進行了一遍測試,同時鑑於讀寫鎖的大量應用在緩存場景下,讀寫差距極大,我增長了100個讀和1個寫的場景。
測試機器:MAC OS(10.12.6),CPU : 2.4 GHz Intel Core i5,內存:8G 軟件版本:JDK1.8
測試結果以下:
19個讀線程和1個寫線程場景:表現最好的是StampedLock以及Synchronized。
讀線程: 19. 寫線程: 1. 循環次數: 5. 計算總和: 1000000
100個讀線程和1個寫線程場景:表現最好的是StampedLock以及Synchronized。
讀線程: 100. 寫線程: 1. 循環次數: 5. 計算總和: 100000
經過上述測試,能夠發現總體性能平均而言StampedLock和Synchronized相差不大,StampedLock在讀寫差距加大時稍微有點優點。而ReentrantReadWriteLock性能之差有點出乎意料,基本能夠達到拋棄使用的地步了,不知道你們對ReentrantReadWriteLock的使用場景有什麼建議?
同時鑑於原生的Synchronized後期可優化空間比較大,並且在代碼複雜性以及安全性上面都具備必定優點,所以在絕大多數場景可使用Synchronized來進行同步,對性能有必定要求的在某些特定場景下可使用StampedLock。測試所用代碼在我所引用的博客中均可以找到,你們能夠自行嘗試測試,若是對結果有什麼疑問,歡迎在評論中提出。
參考資料:
https://blog.takipi.com/java-8-stampedlocks-vs-readwritelocks-and-synchronized/