ReentrantReadWriteLock支持讀寫鎖,StampedLock支持寫鎖、悲觀鎖讀和樂觀讀(無鎖)。其中寫鎖、悲觀讀鎖的語義和ReentrantReadWriteLock中的寫鎖、讀鎖語義同樣,都是容許多個線程同時獲取悲觀鎖讀,可是隻容許一個線程獲取寫鎖,寫鎖和悲觀讀鎖是互斥的。java
不一樣的是:StampedLock 裏的寫鎖和悲觀讀鎖加鎖成功以後,都會返回一個 stamp;而後解鎖的時候,須要傳入這個 stamp。node
如下爲官方使用例子緩存
public class Point { private final StampedLock sl = new StampedLock(); private double x, y; void move(double deltaX, double deltaY) { long stamp = sl.writeLock(); try { x += deltaX; y += deltaY; } finally { sl.unlockWrite(stamp); } } double distanceFromOrigin() { long stamp = sl.tryOptimisticRead(); double currentX = x, currentY = y; if (!sl.validate(stamp)) { stamp = sl.readLock(); try { currentX = x; currentY = y; } finally { sl.unlockRead(stamp); } } // 若是處理業務須要保持互斥,那麼就用互斥鎖,若是不須要保持互斥才能夠 // 用讀寫鎖。通常來說緩存是不須要保持互斥性的,能接受瞬間的不一致 return Math.sqrt(currentX * currentX + currentY * currentY); } // StampedLock 支持鎖的降級(經過 tryConvertToReadLock() 方法)和升級(經過tryConvertToWriteLock() 方法) void moveIfAtOrigin(double newX, double newY) { long stamp = sl.readLock(); try { while (x == 0.0 && y == 0.0) { long ws = sl.tryConvertToWriteLock(stamp); if (ws != 0L) { stamp = ws; x = newX; y = newY; break; } else { sl.unlockRead(stamp); stamp = sl.writeLock(); } } } finally { sl.unlock(stamp); } } }
雖然StampedLock沒有使用AQS,不過它的數據結構中仍是用到了CLH隊列。數據結構
WNode是CLH隊列的節點,其源碼以下併發
static final class WNode { // 前驅節點 volatile WNode prev; // 後繼節點 volatile WNode next; // 獲取讀鎖的列表 volatile WNode cowait; // 線程 volatile Thread thread; // 節點狀態。0, WAITING, or CANCELLED volatile int status; // 讀模式或者寫模式。RMODE or WMODE final int mode; WNode(int m, WNode p) { mode = m; prev = p; } }
StampedLock中其中重要屬性以下ui
//CLH隊列頭結點 private transient volatile WNode whead; // CLH隊列尾節點 private transient volatile WNode wtail; // 鎖狀態 private transient volatile long state; // 讀鎖次數的額外計數器 private transient int readerOverflow; // 讀鎖的位數 private static final int LG_READERS = 7; // 計算state值常量 private static final long RUNIT = 1L; // 寫鎖標誌位,十進制:128 二進制:1000 0000 private static final long WBIT = 1L << LG_READERS; // 讀狀態標誌位。 十進制:127 二進制: 0111 1111 private static final long RBITS = WBIT - 1L; // state狀態中記錄讀鎖快滿了的值,126 private static final long RFULL = RBITS - 1L; // 用來獲取讀寫狀態。 十進制:255 二進制:1111 1111 private static final long ABITS = RBITS | WBIT; // -128 (1....1 1000 0000) private static final long SBITS = ~RBITS; // 狀態state的初始值 256 二進制: 00001 0000 0000 private static final long ORIGIN = WBIT << 1; // 鎖的狀態。使用state來控制當前是讀鎖,仍是寫鎖 private transient volatile long state; // 額外的讀鎖計數器 private transient int readerOverflow; // state初始值爲256 public StampedLock() { state = ORIGIN; }
看過我以前分析AQS源碼的應該對這個CLH很熟悉了。一樣的StampedLock也是經過這個stae變量來進行讀寫鎖判斷的。這個state承載了三個部分的內容this
StampedLock中state是long類型,佔64位,它被劃爲了三個部分來使用。低7位做爲讀鎖標誌位,能夠由多個線程共享,每有一個線程申請讀鎖成功,低7位就加1。第8位是寫鎖位,由線程獨佔。
其他位是stamp位,用來記錄寫鎖狀態的變化(版本號),每使用一次寫鎖,stamp位就會加1。spa
同時若是讀鎖數量超過了126以後,超出的次數使用readerOverflow來進行計數。線程
當出現併發的狀況的時候,CLH隊列的排隊狀況是怎樣的呢?code
好比,線程w1獲取了寫鎖,一直未釋放。此時有4個線程分別獲取讀鎖(獲取順序是R0-->R1-->R2-->R3),又有線程W2獲取寫鎖,最後還有R4,R5,R6三個線程獲取讀鎖,那麼此時隊列的排隊狀況以下
由於讀鎖是能夠被多個線程獲取的,若是同一時間有多個線程來獲取讀鎖卻獲取不到時,這個時候第一個獲取讀鎖的線程會被加入到鏈表中,然後面的的讀線程會被加入到cowait棧中,
能夠認爲cowait是一條副鏈。
這裏的cowait能夠理解爲協同等待,表示將這些獲取讀鎖的線程做爲一個總體來獲取鎖。
public long tryOptimisticRead() { long s; return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L; }
樂觀讀鎖邏輯比較簡單,就是判斷寫鎖是否被佔用(判斷state第8位的值是否爲1),若是寫鎖被佔用則返回0,不然返回stamp位。
state初始值爲256(1 0000 0000) 01000 0000(WBIT) & 10000 0000(state) ============== 00000 0000 1 0000 0000(state) 1 1000 0000(SBITS) ============== 1 0000 0000
public boolean validate(long stamp) { // 經過UNSafe插入內存屏障,禁止重排序 U.loadFence(); return (stamp & SBITS) == (state & SBITS); }
返回true: 表示期間沒有寫鎖,不關心讀鎖。
返回false: 表示期間有寫鎖發生
SBITS爲-128,用二進制表示是:1111 1111 1111 1000 0000
x xxxx xxxx(stamp) 1 1000 0000 1 yyyy yyyy(state) 1 1000 0000
SBITS後7位都是0,也就是不關心讀鎖,咱們只關心stamp位和寫位。
當咱們獲取樂觀讀時,若是此時已經有了寫鎖,那麼返回stamp值爲0,此時進行驗證確定爲false。
public long readLock() { long s = state, next; // whead == wtail時,隊列爲空,表示當前沒有線程在排隊 return ((whead == wtail && (s & ABITS) < RFULL && U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ? // acquireRead的第一個參數爲false,標識不處理線程中斷 next : acquireRead(false, 0L)); }
當獲取讀鎖成功時,會將state值加1。當條件不知足時(隊列不爲空,CAS失敗,或者讀鎖的個數已經大於等於126),都會進入到acquireRead()
方法,這個方法主要分爲兩個大的for循環,代碼比較長我就不貼出來了。
它的主要邏輯以下:
private long tryIncReaderOverflow(long s) { // 若是讀鎖的記錄數等於126, if ((s & ABITS) == RFULL) { // 將state的值加1 // CAS以後state的後七位的值是127,state的整個值是383(1 0111 1111)。 if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) { // 將readOverflow的值加1 ++readerOverflow; state = s; return s; } } else if ((LockSupport.nextSecondarySeed() & OVERFLOW_YIELD_RATE) == 0) Thread.yield(); return 0L; }
head --> node <-- tail
這樣的節點關係
若是還未等到鎖釋放,就阻塞當前線程,等待被喚醒,直到得到鎖或者超時中斷被取消。
第1-4步屬於一個大的循環,第5步驟屬於另一個大的循環。
同時當獲取讀鎖線程被喚醒獲取到鎖後,它同時也會喚醒掛在它身上的cowait棧中的線程。
從分析中能夠看到StampedLock中經過大量的自旋操做能夠必定程度避免線程阻塞,只要線程執行操做夠快,釋放鎖比較及時,能夠說幾乎不會存在阻塞。
釋放讀鎖的代碼比較簡單,主要操做以下
IllegalMonitorStateException
異常public void unlockRead(long stamp) { long s, m; WNode h; // 自旋 for (;;) { // 判斷stamp是否合法 if (((s = state) & SBITS) != (stamp & SBITS) || (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT) throw new IllegalMonitorStateException(); // 讀鎖個數小於126 if (m < RFULL) { if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) { // 釋放最後一個讀鎖時,須要喚醒下一個節點 if (m == RUNIT && (h = whead) != null && h.status != 0) release(h); break; } } // 讀鎖個數飽和溢出,須要減小readerOverflow else if (tryDecReaderOverflow(s) != 0L) break; } }
public long writeLock() { long s, next; return ((((s = state) & ABITS) == 0L && U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ? next : acquireWrite(false, 0L)); }
若是state中的第8位等於0而且CAS設置state值成功,則獲取鎖成功,不然進入到acquireWrite
方法。
acquireWrite的邏輯也分爲兩部分,分別是兩個for循環。
第一個for循環邏輯以下
for (int spins = -1;;) { // spin while enqueuing long m, s, ns; // 若是當前寫鎖標誌位仍然爲0,即沒有其餘線程獲取到寫鎖 if ((m = (s = state) & ABITS) == 0L) { // CAS將state的值加128,實質是將寫鎖狀態位加1 if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) return ns; } // 其餘線程獲取到了寫鎖,則設定自旋次數爲64次 else if (spins < 0) spins = (m == WBIT && wtail == whead) ? SPINS : 0; else if (spins > 0) { // 隨機減小自旋次數 if (LockSupport.nextSecondarySeed() >= 0) --spins; } // 自旋仍未獲取到寫鎖則執行下面的邏輯 // 初始化CLH隊列(主要是whead,wtail節點) // p指向了wtail節點 else if ((p = wtail) == null) { WNode hd = new WNode(WMODE, null); if (U.compareAndSwapObject(this, WHEAD, null, hd)) wtail = hd; } // 初始化節點 else if (node == null) node = new WNode(WMODE, p); // 最終造成 whead --> node <-- wtail else if (node.prev != p) node.prev = p; else if (U.compareAndSwapObject(this, WTAIL, p, node)) { p.next = node; // 造成鏈表關係以後自旋結束 break; } }
第二個for循環代碼也比較長,我就不貼出來了。它的核心邏輯以下
public void unlockWrite(long stamp) { WNode h; // 驗證stamp是否合法 if (state != stamp || (stamp & WBIT) == 0L) throw new IllegalMonitorStateException(); // 釋放寫鎖,stamp位(版本號)加1 // stamp+WBIT會將state的第8位置爲0,就至關於釋放了寫鎖 state = (stamp += WBIT) == 0L ? ORIGIN : stamp; // 若是頭結點不爲空,而且狀態不爲0,,則調用release方法喚醒下一個節點 if ((h = whead) != null && h.status != 0) release(h); } private void release(WNode h) { if (h != null) { WNode q; Thread w; U.compareAndSwapInt(h, WSTATUS, WAITING, 0); // 若是頭結點的下一個節點爲空或者其狀態爲已取消 if ((q = h.next) == null || q.status == CANCELLED) { // 從尾結點向前遍歷找到可用節點 for (WNode t = wtail; t != null && t != h; t = t.prev) if (t.status <= 0) q = t; } // 喚醒q節點所在的線程 if (q != null && (w = q.thread) != null) U.unpark(w); } }
釋放寫鎖過程總結以下
tryConvertToReadLock方法能夠用於鎖的降級。不過並非只有再獲取讀鎖時才能調用該方法。
public long tryConvertToReadLock(long stamp) { // a爲鎖標識,m則是最新的鎖標識 long a = stamp & ABITS, m, s, next; WNode h; // state的寫鎖標誌位和版本號一致(有可能寫鎖標誌位是0,便可能是讀鎖) while (((s = state) & SBITS) == (stamp & SBITS)) { // 還未添加任何鎖標識 if ((m = s & ABITS) == 0L) { if (a != 0L) break; // 讀鎖次數小於126 else if (m < RFULL) { // 將state的值加1 if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) return next; } // 讀鎖次數已經超出了,使用額外字段記錄 else if ((next = tryIncReaderOverflow(s)) != 0L) return next; } // 當前是寫鎖狀態 else if (m == WBIT) { if (a != m) break; // 將讀鎖次數加1,寫鎖標誌置爲0,stamp位加1。即加129 state = next = s + (WBIT + RUNIT); // 釋放鎖 if ((h = whead) != null && h.status != 0) release(h); return next; } // 已是讀鎖了直接返回 else if (a != 0L && a < WBIT) return stamp; else break; } // 轉換失敗 return 0L; }
它的主要邏輯以下,若是當前線程還未加任何鎖,則加上寫鎖並返回最新的stamp值。若是當前線程已是寫鎖,則釋放寫鎖,並更新state的值(讀鎖加1,寫鎖狀態爲置爲0,version加1)。
若是當前線程是讀鎖則直接返回stamp值。若是上面條件都不知足,則轉換失敗。
public long tryConvertToWriteLock(long stamp) { long a = stamp & ABITS, m, s, next; // state的寫鎖標誌位和版本號一致(有可能寫鎖標誌位是0,便可能是讀鎖) while (((s = state) & SBITS) == (stamp & SBITS)) { // 還未添加任何鎖標識 if ((m = s & ABITS) == 0L) { if (a != 0L) break; // 將寫鎖狀態位置爲0 if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) return next; } // 若是當前已是寫鎖狀態,則直接返回 else if (m == WBIT) { if (a != m) break; return stamp; } // 若是當前是惟一讀鎖,則轉換爲寫鎖 else if (m == RUNIT && a != 0L) { if (U.compareAndSwapLong(this, STATE, s, next = s - RUNIT + WBIT)) return next; } else break; } return 0L; }
鎖的升級和鎖的降級的邏輯相似,這裏就再也不讀過介紹了。