本文首發於一世流雲的專欄: https://segmentfault.com/blog...
StampedLock類,在JDK1.8時引入,是對讀寫鎖ReentrantReadWriteLock的加強,該類提供了一些功能,優化了讀鎖、寫鎖的訪問,同時使讀寫鎖之間能夠互相轉換,更細粒度控制併發。java
首先明確下,該類的設計初衷是做爲一個內部工具類,用於輔助開發其它線程安全組件,用得好,該類能夠提高系統性能,用很差,容易產生死鎖和其它莫名其妙的問題。node
先來看下,爲何有了ReentrantReadWriteLock,還要引入StampedLock?
ReentrantReadWriteLock使得多個讀線程同時持有讀鎖(只要寫鎖未被佔用),而寫鎖是獨佔的。segmentfault
可是,讀寫鎖若是使用不當,很容易產生「飢餓」問題:api
好比在讀線程很是多,寫線程不多的狀況下,很容易致使寫線程「飢餓」,雖然使用「公平」策略能夠必定程度上緩解這個問題,可是「公平」策略是以犧牲系統吞吐量爲代價的。(在ReentrantLock類的介紹章節中,介紹過這種狀況)安全
StampedLock的主要特色歸納一下,有如下幾點:併發
咱們知道,在ReentrantReadWriteLock中,當讀鎖被使用時,若是有線程嘗試獲取寫鎖,該寫線程會阻塞。
可是,在Optimistic reading中,即便讀線程獲取到了讀鎖,寫線程嘗試獲取寫鎖也不會阻塞,這至關於對讀模式的優化,可是可能會致使數據不一致的問題。因此,當使用Optimistic reading獲取到讀鎖時,必須對獲取結果進行校驗。
先來看一個Oracle官方的例子:oracle
class Point { private double x, y; private final StampedLock sl = new StampedLock(); void move(double deltaX, double deltaY) { long stamp = sl.writeLock(); //涉及對共享資源的修改,使用寫鎖-獨佔操做 try { x += deltaX; y += deltaY; } finally { sl.unlockWrite(stamp); } } /** * 使用樂觀讀鎖訪問共享資源 * 注意:樂觀讀鎖在保證數據一致性上須要拷貝一份要操做的變量到方法棧,而且在操做數據時候可能其餘寫線程已經修改了數據, * 而咱們操做的是方法棧裏面的數據,也就是一個快照,因此最多返回的不是最新的數據,可是一致性仍是獲得保障的。 * * @return */ double distanceFromOrigin() { long stamp = sl.tryOptimisticRead(); // 使用樂觀讀鎖 double currentX = x, currentY = y; // 拷貝共享資源到本地方法棧中 if (!sl.validate(stamp)) { // 若是有寫鎖被佔用,可能形成數據不一致,因此要切換到普通讀鎖模式 stamp = sl.readLock(); try { currentX = x; currentY = y; } finally { sl.unlockRead(stamp); } } return Math.sqrt(currentX * currentX + currentY * currentY); } void moveIfAtOrigin(double newX, double newY) { // upgrade // Could instead start with optimistic, not read mode long stamp = sl.readLock(); try { while (x == 0.0 && y == 0.0) { long ws = sl.tryConvertToWriteLock(stamp); //讀鎖轉換爲寫鎖 if (ws != 0L) { stamp = ws; x = newX; y = newY; break; } else { sl.unlockRead(stamp); stamp = sl.writeLock(); } } } finally { sl.unlock(stamp); } } }
能夠看到,上述示例最特殊的實際上是distanceFromOrigin方法,這個方法中使用了「Optimistic reading」樂觀讀鎖,使得讀寫能夠併發執行,可是「Optimistic reading」的使用必須遵循如下模式:框架
long stamp = lock.tryOptimisticRead(); // 非阻塞獲取版本信息 copyVaraibale2ThreadMemory(); // 拷貝變量到線程本地堆棧 if(!lock.validate(stamp)){ // 校驗 long stamp = lock.readLock(); // 獲取讀鎖 try { copyVaraibale2ThreadMemory(); // 拷貝變量到線程本地堆棧 } finally { lock.unlock(stamp); // 釋放悲觀鎖 } } useThreadMemoryVarables(); // 使用線程本地堆棧裏面的數據進行操做
StampedLock雖然不像其它鎖同樣定義了內部類來實現AQS框架,可是StampedLock的基本實現思路仍是利用CLH隊列進行線程的管理,經過同步狀態值來表示鎖的狀態和類型。工具
StampedLock內部定義了不少常量,定義這些常量的根本目的仍是和ReentrantReadWriteLock同樣,對同步狀態值按位切分,以經過位運算對State進行操做:性能
對於StampedLock來講,寫鎖被佔用的標誌是第8位爲1,讀鎖使用0-7位,正常狀況下讀鎖數目爲1-126,超過126時,使用一個名爲
readerOverflow
的int整型保存超出數。
部分常量的比特位表示以下:
另外,StampedLock相比ReentrantReadWriteLock,對多核CPU進行了優化,能夠看到,當CPU核數超過1時,會有一些自旋操做:
假設如今有三個線程:ThreadA、ThreadB、ThreadC、ThreadD。操做以下:
//ThreadA調用writeLock, 獲取寫鎖
//ThreadB調用readLock, 獲取讀鎖
//ThreadC調用readLock, 獲取讀鎖
//ThreadD調用writeLock, 獲取寫鎖
//ThreadE調用readLock, 獲取讀鎖
StampedLock的構造器很簡單,構造時設置下同步狀態值:
另外,StamedLock提供了三類視圖:
這些視圖實際上是對StamedLock方法的封裝,便於習慣了ReentrantReadWriteLock的用戶使用:
例如,ReadLockView其實至關於ReentrantReadWriteLock.readLock()
返回的讀鎖;
來看下writeLock方法:
StampedLock中大量運用了位運算,這裏(s = state) & ABITS == 0L
表示讀鎖和寫鎖都未被使用,這裏寫鎖能夠當即獲取成功,而後CAS操做更新同步狀態值State。
操做完成後,等待隊列的結構以下:
注意:StampedLock中,等待隊列的結點要比AQS中簡單些,僅僅三種狀態。
0:初始狀態
-1:等待中
1:取消
另外,結點的定義中有個cowait
字段,該字段指向一個棧,用於保存讀線程,這個後續會講到。
來看下readLock方法:
因爲ThreadA此時持有寫鎖,因此ThreadB獲取讀鎖失敗,將調用acquireRead方法,加入等待隊列:
acquireRead方法很是複雜,用到了大量自旋操做:
/** * 嘗試自旋的獲取讀鎖, 獲取不到則加入等待隊列, 並阻塞線程 * * @param interruptible true 表示檢測中斷, 若是線程被中斷過, 則最終返回INTERRUPTED * @param deadline 若是非0, 則表示限時獲取 * @return 非0表示獲取成功, INTERRUPTED表示中途被中斷過 */ private long acquireRead(boolean interruptible, long deadline) { WNode node = null, p; // node指向入隊結點, p指向入隊前的隊尾結點 /** * 自旋入隊操做 * 若是寫鎖未被佔用, 則當即嘗試獲取讀鎖, 獲取成功則返回. * 若是寫鎖被佔用, 則將當前讀線程包裝成結點, 並插入等待隊列(若是隊尾是寫結點,直接連接到隊尾;不然,連接到隊尾讀結點的棧中) */ for (int spins = -1; ; ) { WNode h; if ((h = whead) == (p = wtail)) { // 若是隊列爲空或只有頭結點, 則會當即嘗試獲取讀鎖 for (long m, s, ns; ; ) { if ((m = (s = state) & ABITS) < RFULL ? // 判斷寫鎖是否被佔用 U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : //寫鎖未佔用,且讀鎖數量未超限, 則更新同步狀態 (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) //寫鎖未佔用,但讀鎖數量超限, 超出部分放到readerOverflow字段中 return ns; // 獲取成功後, 直接返回 else if (m >= WBIT) { // 寫鎖被佔用,以隨機方式探測是否要退出自旋 if (spins > 0) { if (LockSupport.nextSecondarySeed() >= 0) --spins; } else { if (spins == 0) { WNode nh = whead, np = wtail; if ((nh == h && np == p) || (h = nh) != (p = np)) break; } spins = SPINS; } } } } if (p == null) { // p == null表示隊列爲空, 則初始化隊列(構造頭結點) WNode hd = new WNode(WMODE, null); if (U.compareAndSwapObject(this, WHEAD, null, hd)) wtail = hd; } else if (node == null) { // 將當前線程包裝成讀結點 node = new WNode(RMODE, p); } else if (h == p || p.mode != RMODE) { // 若是隊列只有一個頭結點, 或隊尾結點不是讀結點, 則直接將結點連接到隊尾, 連接完成後退出自旋 if (node.prev != p) node.prev = p; else if (U.compareAndSwapObject(this, WTAIL, p, node)) { p.next = node; break; } } // 隊列不爲空, 且隊尾是讀結點, 則將添加當前結點連接到隊尾結點的cowait鏈中(實際上構成一個棧, p是棧頂指針 ) else if (!U.compareAndSwapObject(p, WCOWAIT, node.cowait = p.cowait, node)) { // CAS操做隊尾結點p的cowait字段,實際上就是頭插法插入結點 node.cowait = null; } else { for (; ; ) { WNode pp, c; Thread w; // 嘗試喚醒頭結點的cowait中的第一個元素, 假如是讀鎖會經過循環釋放cowait鏈 if ((h = whead) != null && (c = h.cowait) != null && U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null) // help release U.unpark(w); if (h == (pp = p.prev) || h == p || pp == null) { long m, s, ns; do { if ((m = (s = state) & ABITS) < RFULL ? U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) return ns; } while (m < WBIT); } if (whead == h && p.prev == pp) { long time; if (pp == null || h == p || p.status > 0) { node = null; // throw away break; } if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, p, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); node.thread = wt; if ((h != pp || (state & ABITS) == WBIT) && whead == h && p.prev == pp) { // 寫鎖被佔用, 且當前結點不是隊首結點, 則阻塞當前線程 U.park(false, time); } node.thread = null; U.putObject(wt, PARKBLOCKER, null); if (interruptible && Thread.interrupted()) return cancelWaiter(node, p, true); } } } } for (int spins = -1; ; ) { WNode h, np, pp; int ps; if ((h = whead) == p) { // 若是當前線程是隊首結點, 則嘗試獲取讀鎖 if (spins < 0) spins = HEAD_SPINS; else if (spins < MAX_HEAD_SPINS) spins <<= 1; for (int k = spins; ; ) { // spin at head long m, s, ns; if ((m = (s = state) & ABITS) < RFULL ? // 判斷寫鎖是否被佔用 U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : //寫鎖未佔用,且讀鎖數量未超限, 則更新同步狀態 (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) { //寫鎖未佔用,但讀鎖數量超限, 超出部分放到readerOverflow字段中 // 獲取讀鎖成功, 釋放cowait鏈中的全部讀結點 WNode c; Thread w; // 釋放頭結點, 當前隊首結點成爲新的頭結點 whead = node; node.prev = null; // 從棧頂開始(node.cowait指向的結點), 依次喚醒全部讀結點, 最終node.cowait==null, node成爲新的頭結點 while ((c = node.cowait) != null) { if (U.compareAndSwapObject(node, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); } return ns; } else if (m >= WBIT && LockSupport.nextSecondarySeed() >= 0 && --k <= 0) break; } } else if (h != null) { // 若是頭結點存在cowait鏈, 則喚醒鏈中全部讀線程 WNode c; Thread w; while ((c = h.cowait) != null) { if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); } } if (whead == h) { if ((np = node.prev) != p) { if (np != null) (p = np).next = node; // stale } else if ((ps = p.status) == 0) // 將前驅結點的等待狀態置爲WAITING, 表示以後將喚醒當前結點 U.compareAndSwapInt(p, WSTATUS, 0, WAITING); else if (ps == CANCELLED) { if ((pp = p.prev) != null) { node.prev = pp; pp.next = node; } } else { // 阻塞當前讀線程 long time; if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) //限時等待超時, 取消等待 return cancelWaiter(node, node, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); node.thread = wt; if (p.status < 0 && (p != h || (state & ABITS) == WBIT) && whead == h && node.prev == p) { // 若是前驅的等待狀態爲WAITING, 且寫鎖被佔用, 則阻塞當前調用線程 U.park(false, time); } node.thread = null; U.putObject(wt, PARKBLOCKER, null); if (interruptible && Thread.interrupted()) return cancelWaiter(node, node, true); } } } }
咱們來分析下這個方法。
該方法會首先自旋的嘗試獲取讀鎖,獲取成功後,就直接返回;不然,會將當前線程包裝成一個讀結點,插入到等待隊列。
因爲,目前等待隊列仍是空,因此ThreadB會初始化隊列,而後將自身包裝成一個讀結點,插入隊尾,而後在下面這個地方跳出自旋:
此時,等待隊列的結構以下:
跳出自旋後,ThreadB會繼續向下執行,進入下一個自旋,在下一個自旋中,依然會再次嘗試獲取讀鎖,若是此次再獲取不到,就會將前驅的等待狀態置爲WAITING, 表示我(當前線程)要去睡了(阻塞),到時記得叫醒我:
最終, ThreadB進入阻塞狀態:
最終,等待隊列的結構以下:
這個過程和ThreadB獲取讀鎖同樣,區別在於ThreadC被包裝成結點加入等待隊列後,是連接到ThreadB結點的棧指針中的。調用完下面這段代碼後,ThreadC會連接到以Thread B爲棧頂指針的棧中:
注意:讀結點的cowait字段其實構成了一個棧,入棧的過程實際上是個「頭插法」插入單鏈表的過程。好比,再來個ThreadX讀結點,則cowait鏈表結構爲:
ThreadB - > ThreadX -> ThreadC
。最終喚醒讀結點時,將從棧頂開始。
而後會在下一次自旋中,阻塞當前讀線程:
最終,等待隊列的結構以下:
能夠看到,此時ThreadC結點並無把它的前驅的等待狀態置爲-1,由於ThreadC是連接到棧中的,當寫鎖釋放的時候,會從棧底元素開始,喚醒棧中全部讀結點。
ThreadD調用writeLock方法獲取寫鎖失敗後(ThreadA依然佔用着寫鎖),會調用acquireWrite方法,該方法總體邏輯和acquireRead差很少,首先自旋的嘗試獲取寫鎖,獲取成功後,就直接返回;不然,會將當前線程包裝成一個寫結點,插入到等待隊列。
acquireWrite源碼:
/** * 嘗試自旋的獲取寫鎖, 獲取不到則阻塞線程 * * @param interruptible true 表示檢測中斷, 若是線程被中斷過, 則最終返回INTERRUPTED * @param deadline 若是非0, 則表示限時獲取 * @return 非0表示獲取成功, INTERRUPTED表示中途被中斷過 */ private long acquireWrite(boolean interruptible, long deadline) { WNode node = null, p; /** * 自旋入隊操做 * 若是沒有任何鎖被佔用, 則當即嘗試獲取寫鎖, 獲取成功則返回. * 若是存在鎖被使用, 則將當前線程包裝成獨佔結點, 並插入等待隊列尾部 */ for (int spins = -1; ; ) { long m, s, ns; if ((m = (s = state) & ABITS) == 0L) { // 沒有任何鎖被佔用 if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) // 嘗試當即獲取寫鎖 return ns; // 獲取成功直接返回 } else if (spins < 0) spins = (m == WBIT && wtail == whead) ? SPINS : 0; else if (spins > 0) { if (LockSupport.nextSecondarySeed() >= 0) --spins; } else if ((p = wtail) == null) { // 隊列爲空, 則初始化隊列, 構造隊列的頭結點 WNode hd = new WNode(WMODE, null); if (U.compareAndSwapObject(this, WHEAD, null, hd)) wtail = hd; } else if (node == null) // 將當前線程包裝成寫結點 node = new WNode(WMODE, p); else if (node.prev != p) node.prev = p; else if (U.compareAndSwapObject(this, WTAIL, p, node)) { // 連接結點至隊尾 p.next = node; break; } } for (int spins = -1; ; ) { WNode h, np, pp; int ps; if ((h = whead) == p) { // 若是當前結點是隊首結點, 則當即嘗試獲取寫鎖 if (spins < 0) spins = HEAD_SPINS; else if (spins < MAX_HEAD_SPINS) spins <<= 1; for (int k = spins; ; ) { // spin at head long s, ns; if (((s = state) & ABITS) == 0L) { // 寫鎖未被佔用 if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) { // CAS修改State: 佔用寫鎖 // 將隊首結點從隊列移除 whead = node; node.prev = null; return ns; } } else if (LockSupport.nextSecondarySeed() >= 0 && --k <= 0) break; } } else if (h != null) { // 喚醒頭結點的棧中的全部讀線程 WNode c; Thread w; while ((c = h.cowait) != null) { if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); } } if (whead == h) { if ((np = node.prev) != p) { if (np != null) (p = np).next = node; // stale } else if ((ps = p.status) == 0) // 將當前結點的前驅置爲WAITING, 表示當前結點會進入阻塞, 前驅未來須要喚醒我 U.compareAndSwapInt(p, WSTATUS, 0, WAITING); else if (ps == CANCELLED) { if ((pp = p.prev) != null) { node.prev = pp; pp.next = node; } } else { // 阻塞當前調用線程 long time; // 0 argument to park means no timeout if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, node, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); node.thread = wt; if (p.status < 0 && (p != h || (state & ABITS) != 0L) && whead == h && node.prev == p) U.park(false, time); // emulate LockSupport.park node.thread = null; U.putObject(wt, PARKBLOCKER, null); if (interruptible && Thread.interrupted()) return cancelWaiter(node, node, true); } } } }
acquireWrite中的下面這個自旋操做,用於將線程包裝成寫結點,插入隊尾:
插入完成後,隊列結構以下:
而後,進入下一個自旋,並在下一個自旋中阻塞ThreadD,最終隊列結構以下:
一樣,因爲寫鎖被ThreadA佔用着,因此最終會調用acquireRead方法,在該方法的第一個自旋中,會將ThreadE加入等待隊列:
注意,因爲隊尾結點是寫結點,因此當前讀結點會直接連接到隊尾;若是隊尾是讀結點,則會連接到隊尾讀結點的cowait鏈中。
而後進入第二個自旋,阻塞ThreadE,最終隊列結構以下:
經過CAS操做,修改State成功後,會調用release方法喚醒等待隊列的隊首結點:
release方法很是簡單,先將頭結點的等待狀態置爲0,表示即將喚醒後繼結點,而後當即喚醒隊首結點:
此時,等待隊列的結構以下:
ThreadB被喚醒後,會從原阻塞處繼續向下執行,而後開始下一次自旋:
第二次自旋時,ThreadB發現寫鎖未被佔用,則成功獲取到讀鎖,而後從棧頂(ThreadB的cowait指針指向的結點)開始喚醒棧中全部線程,
最後返回:
最終,等待隊列的結構以下:
ThreadC被喚醒後,繼續執行,並進入下一次自旋,下一次自旋時,會成功獲取到讀鎖。
注意,此時ThreadB和ThreadC已經拿到了讀鎖,ThreadD(寫線程)和ThreadE(讀線程)依然阻塞中,原來ThreadC對應的結點是個孤立結點,會被GC回收。
最終,等待隊列的結構以下:
ThreadB和ThreadC調用unlockRead方法釋放讀鎖,CAS操做State將讀鎖數量減1:
注意,當讀鎖的數量變爲0時纔會調用release方法,喚醒隊首結點:
隊首結點(ThreadD寫結點被喚醒),最終等待隊列的結構以下:
ThreadD會從原阻塞處繼續向下執行,並在下一次自旋中獲取到寫鎖,而後返回:
最終,等待隊列的結構以下:
ThreadD釋放寫鎖的過程和步驟7徹底相同,會調用unlockWrite喚醒隊首結點(ThreadE)。
ThreadE被喚醒後會從原阻塞處繼續向下執行,但因爲ThreadE是個讀結點,因此同時會喚醒cowait棧中的全部讀結點,過程和步驟8徹底同樣。最終,等待隊列的結構以下:
至此,所有執行完成。
參考Oracle官方文檔:https://docs.oracle.com/javas...
類聲明:
方法聲明:
StampedLock的等待隊列與RRW的CLH隊列相比,有如下特色:
另外,StampedLock使用時要特別當心,避免鎖重入的操做,在使用樂觀讀鎖時也須要遵循相應的調用模板,防止出現數據不一致的問題。