StampedLock是JUC併發包裏面JDK1.8版本新增的一個鎖,該鎖提供了三種模式的讀寫控制,當調用獲取鎖的系列函數的時候,會返回一個long 型的變量,該變量被稱爲戳記(stamp),這個戳記表明了鎖的狀態。java
try系列獲取鎖的函數,當獲取鎖失敗後會返回爲0的stamp值。當調用釋放鎖和轉換鎖的方法時候須要傳入獲取鎖時候返回的stamp值。node
StampedLockd的內部實現是基於CLH鎖的,CLH鎖原理:鎖維護着一個等待線程隊列,全部申請鎖且失敗的線程都記錄在隊列。一個節點表明一個線程,保存着一個標記位locked,用以判斷當前線程是否已經釋放鎖。當一個線程試圖獲取鎖時,從隊列尾節點做爲前序節點,循環判斷全部的前序節點是否已經成功釋放鎖。服務器
以下圖所示:多線程
咱們首先看Stampedlock有哪些屬性先,源碼以下:併發
private static final long serialVersionUID = -6001602636862214147L; /** 獲取服務器CPU核數 */ private static final int NCPU = Runtime.getRuntime().availableProcessors(); /** 線程入隊列前自旋次數 */ private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0; /** 隊列頭結點自旋獲取鎖最大失敗次數後再次進入隊列 */ private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 0; /** 從新阻塞前的最大重試次數 */ private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 0; /** The period for yielding when waiting for overflow spinlock */ private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1 /** 溢出以前用於閱讀器計數的位數 */ private static final int LG_READERS = 7; // 鎖定狀態和stamp操做的值 private static final long RUNIT = 1L; private static final long WBIT = 1L << LG_READERS; private static final long RBITS = WBIT - 1L; private static final long RFULL = RBITS - 1L; private static final long ABITS = RBITS | WBIT; //前8位都爲1 private static final long SBITS = ~RBITS; // 1 1000 0000 //鎖state初始值,第9位爲1,避免算術時和0衝突 private static final long ORIGIN = WBIT << 1; // 來自取消獲取方法的特殊值,所以調用者能夠拋出IE private static final long INTERRUPTED = 1L; // WNode節點的status值 private static final int WAITING = -1; private static final int CANCELLED = 1; // WNode節點的讀寫模式 private static final int RMODE = 0; private static final int WMODE = 1; /** Wait nodes */ static final class WNode { volatile WNode prev; volatile WNode next; volatile WNode cowait; // 讀模式使用該節點造成棧 volatile Thread thread; // non-null while possibly parked volatile int status; // 0, WAITING, or CANCELLED final int mode; // RMODE or WMODE WNode(int m, WNode p) { mode = m; prev = p; } } /** CLH隊頭節點 */ private transient volatile WNode whead; /** CLH隊尾節點 */ private transient volatile WNode wtail; // views transient ReadLockView readLockView; transient WriteLockView writeLockView; transient ReadWriteLockView readWriteLockView; /** 鎖隊列狀態, 當處於寫模式時第8位爲1,讀模式時前7爲爲1-126(附加的readerOverflow用於當讀者超過126時) */ private transient volatile long state; /** 將state超過 RFULL=126的值放到readerOverflow字段中 */ private transient int readerOverflow;
StampedLockd源碼中的WNote就是等待鏈表隊列,每個WNode標識一個等待線程,whead爲CLH隊列頭,wtail爲CLH隊列尾,state爲鎖的狀態。long型即64位,倒數第八位標識寫鎖狀態,若是爲1,標識寫鎖佔用!下面圍繞這個state來說述鎖操做。函數
首先是常量標識:性能
WBIT=1000 0000(即-128)測試
RBIT =0111 1111(即127) ui
SBIT =1000 0000(後7位表示當前正在讀取的線程數量,清0)this
StampedLock 給咱們提供了3種讀寫模式的鎖,以下:
1.寫鎖writeLock是一個獨佔鎖,同時只有一個線程能夠獲取該鎖,當一個線程獲取該鎖後,其餘請求讀鎖和寫鎖的線程必須等待,這跟ReentrantReadWriteLock 的寫鎖很類似,不過要注意的是StampedLock的寫鎖是不可重入鎖,
當目前沒有線程持有讀鎖或者寫鎖的時候才能夠獲取到該鎖,請求該鎖成功後會返回一個stamp 票據變量來表示該鎖的版本,以下源碼所示:
/** * *獲取寫鎖,獲取失敗會一直阻塞,直到得到鎖成功 * @return 能夠用來解鎖或轉換模式的戳記(128的整數) */ public long writeLock() { long s, next; return ((((s = state) & ABITS) == 0L && // 徹底沒有任何鎖(沒有讀鎖和寫鎖)的時候能夠經過 U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ? //第8位置爲1 next : acquireWrite(false, 0L)); }
writeLock():典型的cas操做,若是STATE等於s,設置寫鎖位爲1(s+WBIT)。acquireWrite跟acquireRead邏輯相似,先自旋嘗試、加入等待隊列、直至最終Unsafe.park()掛起線程。
private long acquireWrite(boolean interruptible, long deadline) { WNode node = null, p; for (int spins = -1;;) { // 入隊時自旋 long m, s, ns; //無鎖 if ((m = (s = state) & ABITS) == 0L) { if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) return ns; } else if (spins < 0) //持有寫鎖,而且隊列爲空 spins = (m == WBIT && wtail == whead) ? SPINS : 0; else if (spins > 0) { //恆成立 if (LockSupport.nextSecondarySeed() >= 0) --spins; } else if ((p = wtail) == null) { //初始化隊列,寫鎖入隊列 WNode hd = new WNode(WMODE, null); if (U.compareAndSwapObject(this, WHEAD, null, hd)) wtail = hd; } else if (node == null) //不爲空,寫鎖入隊列 node = new WNode(WMODE, p); else if (node.prev != p) node.prev = p; else if (U.compareAndSwapObject(this, WTAIL, p, node)) { p.next = node; break;//入隊列成功退出循環 } } for (int spins = -1;;) { WNode h, np, pp; int ps; //前驅節點爲頭節點 if ((h = whead) == p) { if (spins < 0) spins = HEAD_SPINS; else if (spins < MAX_HEAD_SPINS) spins <<= 1; for (int k = spins;;) { // spin at head long s, ns; //無鎖 if (((s = state) & ABITS) == 0L) { if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) { //當前節點設置爲頭結點 whead = node; node.prev = null; return ns; } } else if (LockSupport.nextSecondarySeed() >= 0 && --k <= 0) break; } } else if (h != null) { // help release stale waiters WNode c; Thread w; //頭結點爲讀鎖將棧中全部讀鎖線程喚醒 while ((c = h.cowait) != null) { if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); } } // if (whead == h) { if ((np = node.prev) != p) { if (np != null) (p = np).next = node; // stale } else if ((ps = p.status) == 0) //前驅節點置爲等待狀態 U.compareAndSwapInt(p, WSTATUS, 0, WAITING); else if (ps == CANCELLED) { if ((pp = p.prev) != null) { node.prev = pp; pp.next = node; } } else { long time; // 0 argument to park means no timeout if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, node, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); node.thread = wt; if (p.status < 0 && (p != h || (state & ABITS) != 0L) && whead == h && node.prev == p) U.park(false, time); // emulate LockSupport.park node.thread = null; U.putObject(wt, PARKBLOCKER, null); if (interruptible && Thread.interrupted()) return cancelWaiter(node, node, true); } } } }
而且StampedLock還提供了非阻塞tryWriteLock方法,源碼以下:
/** * 沒有任何鎖時則獲取寫鎖,不然返回0 * * @return 能夠用來解鎖或轉換模式的戳記(128的整數),獲取失敗返回0 */ public long tryWriteLock() { long s, next; return ((((s = state) & ABITS) == 0L && U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ? next : 0L); }
/** * unit時間內得到寫鎖成功返回狀態值,失敗返回0,或拋出InterruptedException * @return 0:得到鎖失敗 * @throws InterruptedException 線程得到鎖以前調用interrupt()方法拋出的異常 */ public long tryWriteLock(long time, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(time); if (!Thread.interrupted()) { long next, deadline; if ((next = tryWriteLock()) != 0L) //得到鎖成功 return next; if (nanos <= 0L) //超時返回0 return 0L; if ((deadline = System.nanoTime() + nanos) == 0L) deadline = 1L; if ((next = acquireWrite(true, deadline)) != INTERRUPTED) //規定時間內得到鎖結果 return next; } throw new InterruptedException(); }
當釋放該鎖的時候須要調用unlockWrite方法並傳遞獲取鎖的時候的stamp參數。源碼以下:
/** * state匹配stamp則釋放寫鎖, * @throws IllegalMonitorStateException 不匹配則拋出異常 */ public void unlockWrite(long stamp) { WNode h; //state不匹配stamp 或者 沒有寫鎖 if (state != stamp || (stamp & WBIT) == 0L) throw new IllegalMonitorStateException(); //state += WBIT, 第8位置爲0,但state & SBITS 會循環,一共有4個值 state = (stamp += WBIT) == 0L ? ORIGIN : stamp; if ((h = whead) != null && h.status != 0) //喚醒繼承者節點線程 release(h); }
unlockWrite():釋放鎖與加鎖動做相反。將寫標記位清零,若是state溢出,則退回到初始值;
2.悲觀鎖readLock,是個共享鎖,在沒有線程獲取獨佔寫鎖的狀況下,同時多個線程能夠獲取該鎖;若是已經有線程持有寫鎖,其餘線程請求獲取該鎖會被阻塞,這相似ReentrantReadWriteLock 的讀鎖(不一樣在於這裏的讀鎖是不可重入鎖)。
這裏說的悲觀是指在具體操做數據前,悲觀的認爲其餘線程可能要對本身操做的數據進行修改,因此須要先對數據加鎖,這是在讀少寫多的狀況下的一種考慮,請求該鎖成功後會返回一個stamp票據變量來表示該鎖的版本,源碼以下:
/** * 悲觀讀鎖,非獨佔鎖,爲得到鎖一直處於阻塞狀態,直到得到鎖爲止 */ public long readLock() { long s = state, next; // 隊列爲空 && 沒有寫鎖同時讀鎖數小於126 && CAS修改狀態成功 則狀態加1並返回,不然自旋獲取讀鎖 return ((whead == wtail && (s & ABITS) < RFULL && U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ? next : acquireRead(false, 0L)); }
樂觀鎖失敗後鎖升級爲readLock():嘗試state+1,用於統計讀線程的數量,若是失敗,進入acquireRead()進行自旋,經過CAS獲取鎖。
若是自旋失敗,入CLH隊列,而後再自旋,若是成功得到讀鎖,則激活cowait隊列中的讀線程Unsafe.unpark(),若是最終依然失敗,則Unsafe().park()掛起當前線程。
/** * @param interruptible 是否容許中斷 * @param 標識超時限時(0表明不限時),而後進入循環。 * @return next state, or INTERRUPTED */ private long acquireRead(boolean interruptible, long deadline) { WNode node = null, p; //自旋 for (int spins = -1;;) { WNode h; //判斷隊列爲空 if ((h = whead) == (p = wtail)) { //定義 long m,s,ns,並循環 for (long m, s, ns;;) { //將state超過 RFULL=126的值放到readerOverflow字段中 if ((m = (s = state) & ABITS) < RFULL ? U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) //獲取鎖成功返回 return ns; //state高8位大於0,那麼說明當前鎖已經被寫鎖獨佔,那麼咱們嘗試自旋 + 隨機的方式來探測狀態 else if (m >= WBIT) { if (spins > 0) { if (LockSupport.nextSecondarySeed() >= 0) --spins; } else { if (spins == 0) { WNode nh = whead, np = wtail; //一直獲取鎖失敗,或者有線程入隊列了退出內循環自旋,後續進入隊列 if ((nh == h && np == p) || (h = nh) != (p = np)) break; } //自旋 SPINS 次 spins = SPINS; } } } } if (p == null) { //初始隊列 WNode hd = new WNode(WMODE, null); if (U.compareAndSwapObject(this, WHEAD, null, hd)) wtail = hd; } //當前節點爲空則構建當前節點,模式爲RMODE,前驅節點爲p即尾節點。 else if (node == null) node = new WNode(RMODE, p); //當前隊列爲空即只有一個節點(whead=wtail)或者當前尾節點的模式不是RMODE,那麼咱們會嘗試在尾節點後面添加該節點做爲尾節點,而後跳出外層循環 else if (h == p || p.mode != RMODE) { if (node.prev != p) node.prev = p; else if (U.compareAndSwapObject(this, WTAIL, p, node)) { p.next = node; //入隊列成功,退出自旋 break; } } //隊列不爲空而且是RMODE模式, 添加該節點到尾節點的cowait鏈(實際上構成一個讀線程stack)中 else if (!U.compareAndSwapObject(p, WCOWAIT, node.cowait = p.cowait, node)) //失敗處理 node.cowait = null; else { //經過CAS方法將該節點node添加至尾節點的cowait鏈中,node成爲cowait中的頂元素,cowait構成了一個LIFO隊列。 //循環 for (;;) { WNode pp, c; Thread w; //嘗試unpark頭元素(whead)的cowait中的第一個元素,假如是讀鎖會經過循環釋放cowait鏈 if ((h = whead) != null && (c = h.cowait) != null && U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); //node所在的根節點p的前驅就是whead或者p已是whead或者p的前驅爲null if (h == (pp = p.prev) || h == p || pp == null) { long m, s, ns; do { //根據state再次積極的嘗試獲取鎖 if ((m = (s = state) & ABITS) < RFULL ? U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) return ns; } while (m < WBIT);//條件爲讀模式 } if (whead == h && p.prev == pp) { long time; if (pp == null || h == p || p.status > 0) { //這樣作的緣由是被其餘線程闖入奪取了鎖,或者p已經被取消 node = null; // throw away break; } if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, p, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); node.thread = wt; if ((h != pp || (state & ABITS) == WBIT) && whead == h && p.prev == pp) U.park(false, time); node.thread = null; U.putObject(wt, PARKBLOCKER, null); //出現的中斷狀況下取消當前節點的cancelWaiter操做 if (interruptible && Thread.interrupted()) return cancelWaiter(node, p, true); } } } } for (int spins = -1;;) { WNode h, np, pp; int ps; if ((h = whead) == p) { if (spins < 0) spins = HEAD_SPINS; else if (spins < MAX_HEAD_SPINS) spins <<= 1; for (int k = spins;;) { // spin at head long m, s, ns; if ((m = (s = state) & ABITS) < RFULL ? U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) { WNode c; Thread w; whead = node; node.prev = null; while ((c = node.cowait) != null) { if (U.compareAndSwapObject(node, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); } return ns; } else if (m >= WBIT && LockSupport.nextSecondarySeed() >= 0 && --k <= 0) break; } } else if (h != null) { WNode c; Thread w; while ((c = h.cowait) != null) { if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); } } if (whead == h) { if ((np = node.prev) != p) { if (np != null) (p = np).next = node; // stale } else if ((ps = p.status) == 0) U.compareAndSwapInt(p, WSTATUS, 0, WAITING); else if (ps == CANCELLED) { if ((pp = p.prev) != null) { node.prev = pp; pp.next = node; } } else { long time; if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, node, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); node.thread = wt; if (p.status < 0 && (p != h || (state & ABITS) == WBIT) && whead == h && node.prev == p) U.park(false, time); node.thread = null; U.putObject(wt, PARKBLOCKER, null); if (interruptible && Thread.interrupted()) return cancelWaiter(node, node, true); } } } }
而且StampedLock還提供了非阻塞tryReadLock方法,源碼以下:
/** * 能夠當即得到鎖,則獲取讀鎖,不然返回0 */ public long tryReadLock() { for (;;) { long s, m, next; //持有寫鎖返回0 if ((m = (s = state) & ABITS) == WBIT) return 0L; //讀線程數 < RFULL,CAS變動狀態 else if (m < RFULL) { if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) return next; } //將state超過 RFULL的值放到readerOverflow字段 else if ((next = tryIncReaderOverflow(s)) != 0L) return next; } }
/** * unit時間內得到讀鎖成功返回狀態值,失敗返回0,或拋出InterruptedException */ public long tryReadLock(long time, TimeUnit unit) throws InterruptedException { long s, m, next, deadline; long nanos = unit.toNanos(time); if (!Thread.interrupted()) { if ((m = (s = state) & ABITS) != WBIT) { if (m < RFULL) { if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) return next; } else if ((next = tryIncReaderOverflow(s)) != 0L) return next; } if (nanos <= 0L) return 0L; if ((deadline = System.nanoTime() + nanos) == 0L) deadline = 1L; if ((next = acquireRead(true, deadline)) != INTERRUPTED) return next; } throw new InterruptedException(); }
StampedLock的悲觀讀鎖readLock 當釋放該鎖時候須要 unlockRead 並傳遞參數 stamp。源碼以下:
/** * state匹配stamp則釋放讀鎖, */ public void unlockRead(long stamp) { long s, m; WNode h; for (;;) { //不匹配拋出異常 if (((s = state) & SBITS) != (stamp & SBITS) || (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT) throw new IllegalMonitorStateException(); //小於最大記錄數值 if (m < RFULL) { if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) { if (m == RUNIT && (h = whead) != null && h.status != 0) release(h); break; } } //不然readerOverflow減一 else if (tryDecReaderOverflow(s) != 0L) break; } }
3.樂觀讀鎖 tryOptimisticRead,是相對於悲觀鎖來講的,在操做數據前並無經過 CAS 設置鎖的狀態,僅僅是經過位運算測試;若是當前沒有線程持有寫鎖,則簡單的返回一個非 0 的 stamp 版本信息,
獲取該 stamp 後在具體操做數據前還須要調用 validate 驗證下該 stamp 是否已經不可用,也就是看當調用 tryOptimisticRead 返回 stamp 後,到當前時間是否有其它線程持有了寫鎖,若是是那麼 validate 會返回 0,
否者就可使用該 stamp 版本的鎖對數據進行操做。因爲 tryOptimisticRead 並無使用 CAS 設置鎖狀態,因此不須要顯示的釋放該鎖。
該鎖的一個特色是適用於讀多寫少的場景,由於獲取讀鎖只是使用位操做進行檢驗,不涉及 CAS 操做,因此效率會高不少,可是同時因爲沒有使用真正的鎖,在保證數據一致性上須要拷貝一份要操做的變量到方法棧,而且在操做數據時候可能其它寫線程已經修改了數據,
而咱們操做的是方法棧裏面的數據,也就是一個快照,因此最多返回的不是最新的數據,可是一致性仍是獲得保障的。源碼以下:
/** * 獲取樂觀讀鎖,返回郵票stamp */ public long tryOptimisticRead() { long s; //有寫鎖返回0. 不然返回256 return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L; }
tryOptimisticRead():若是當前沒有寫鎖佔用,返回state(後7位清0,即清0讀線程數),若是有寫鎖,返回0,即失敗。
/** * 驗證從調用tryOptimisticRead開始到如今這段時間內有無寫鎖佔用過鎖資源,有寫鎖得到過鎖資源則返回false. stamp爲0返回false. * @return 從返回stamp開始,沒有寫鎖得到過鎖資源返回true,不然返回false */ public boolean validate(long stamp) { //強制讀取操做和驗證操做在一些狀況下的內存排序問題 U.loadFence(); //當持有寫鎖後再釋放寫鎖,該校驗也不成立,返回false return (stamp & SBITS) == (state & SBITS); }
StamedLock還支持這三種鎖在必定條件下進行相互轉換,例如long tryConvertToWriteLock(long stamp)指望把stamp標示的鎖升級爲寫鎖,這個函數會在下面幾種狀況下返回一個有效的 stamp(也就是晉升寫鎖成功):
1.當前鎖已是寫鎖模式了。
2.當前鎖處於讀鎖模式,而且沒有其餘線程是讀鎖模式
3.當前處於樂觀讀模式,而且當前寫鎖可用。
源碼以下:
/** * state匹配stamp時, 執行下列操做之一. * 一、stamp 已經持有寫鎖,直接返回. * 二、讀模式,可是沒有更多的讀取者,並返回一個寫鎖stamp. * 三、有一個樂觀讀鎖,只在即時可用的前提下返回一個寫鎖stamp * 四、其餘狀況都返回0 */ public long tryConvertToWriteLock(long stamp) { long a = stamp & ABITS, m, s, next; //state匹配stamp while (((s = state) & SBITS) == (stamp & SBITS)) { //沒有鎖 if ((m = s & ABITS) == 0L) { if (a != 0L) break; //CAS修改狀態爲持有寫鎖,並返回 if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) return next; } //持有寫鎖 else if (m == WBIT) { if (a != m) //其餘線程持有寫鎖 break; //當前線程已經持有寫鎖 return stamp; } //有一個讀鎖 else if (m == RUNIT && a != 0L) { //釋放讀鎖,並嘗試持有寫鎖 if (U.compareAndSwapLong(this, STATE, s, next = s - RUNIT + WBIT)) return next; } else break; } return 0L; }
/** * state匹配stamp時, 執行下列操做之一. 一、stamp 表示持有寫鎖,釋放寫鎖,並持有讀鎖 2 stamp 表示持有讀鎖 ,返回該讀鎖 3 有一個樂觀讀鎖,只在即時可用的前提下返回一個讀鎖stamp 四、其餘狀況都返回0,表示失敗 * */ public long tryConvertToReadLock(long stamp) { long a = stamp & ABITS, m, s, next; WNode h; //state匹配stamp while (((s = state) & SBITS) == (stamp & SBITS)) { //沒有鎖 if ((m = s & ABITS) == 0L) { if (a != 0L) break; else if (m < RFULL) { if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) return next; } else if ((next = tryIncReaderOverflow(s)) != 0L) return next; } //寫鎖 else if (m == WBIT) { //非當前線程持有寫鎖 if (a != m) break; //釋放寫鎖持有讀鎖 state = next = s + (WBIT + RUNIT); if ((h = whead) != null && h.status != 0) release(h); return next; } //持有讀鎖 else if (a != 0L && a < WBIT) return stamp; else break; } return 0L; }
校驗這個戳是否有效validate():比較當前stamp和發生樂觀鎖獲得的stamp比較,不一致則失敗。
還有一個轉換成樂觀鎖tryConvertToOptimisticRead(long stamp) ,這裏就不講了,道理都差很少。
另外 StampedLock 的讀寫鎖都是不可重入鎖,因此當獲取鎖後釋放鎖前,不該該再調用會獲取鎖的操做,以免產生死鎖。
當多個線程同時嘗試獲取讀鎖和寫鎖的時候,誰先獲取鎖沒有必定的規則,徹底都是盡力而爲,是隨機的,而且該鎖不是直接實現 Lock 或 ReadWriteLock 接口,而是內部本身維護了一個雙向阻塞隊列。
下面經過 JDK8 裏面提供的一個管理二維點的例子講解來加深對上面講解的理解。代碼以下所示:
package com.hjc; import java.util.concurrent.locks.StampedLock; /** * Created by cong on 2018/6/16. */ public class Point { // 成員變量 private double x, y; // 鎖實例 private final StampedLock sl = new StampedLock(); // 排它鎖-寫鎖(writeLock) void move(double deltaX, double deltaY) { long stamp = sl.writeLock(); try { x += deltaX; y += deltaY; } finally { sl.unlockWrite(stamp); } } // 樂觀讀鎖(tryOptimisticRead) double distanceFromOrigin() { // 嘗試獲取樂觀讀鎖(1) long stamp = sl.tryOptimisticRead(); // 將所有變量拷貝到方法體棧內(2) double currentX = x, currentY = y; // 檢查在(1)獲取到讀鎖票據後,鎖有沒被其它寫線程排它性搶佔(3) if (!sl.validate(stamp)) { // 若是被搶佔則獲取一個共享讀鎖(悲觀獲取)(4) stamp = sl.readLock(); try { // 將所有變量拷貝到方法體棧內(5) currentX = x; currentY = y; } finally { // 釋放共享讀鎖(6) sl.unlockRead(stamp); } } // 返回計算結果(7) return Math.sqrt(currentX * currentX + currentY * currentY); } // 使用悲觀鎖獲取讀鎖,並嘗試轉換爲寫鎖 void moveIfAtOrigin(double newX, double newY) { // 這裏可使用樂觀讀鎖替換(1) long stamp = sl.readLock(); try { // 若是當前點在原點則移動(2) while (x == 0.0 && y == 0.0) { // 嘗試將獲取的讀鎖升級爲寫鎖(3) long ws = sl.tryConvertToWriteLock(stamp); // 升級成功,則更新票據,並設置座標值,而後退出循環(4) if (ws != 0L) { stamp = ws; x = newX; y = newY; break; } else { // 讀鎖升級寫鎖失敗則釋放讀鎖,顯示獲取獨佔寫鎖,而後循環重試(5) sl.unlockRead(stamp); stamp = sl.writeLock(); } } } finally { // 釋放鎖(6) sl.unlock(stamp); } } }
如上代碼 Point 類裏面有兩個成員變量(x,y) 來標示一個點的二維座標,和三個操做座標變量的方法,另外實例化了一個 StampedLock 對象用來保證操做的原子性。
首先分析下 move 方法,該函數做用是使用參數的增量值,改變當前 point 座標的位置;代碼先獲取到了寫鎖,而後對 point 座標進行修改,而後釋放鎖。該鎖是排它鎖,這保證了其它線程調用 move 函數時候會被阻塞,也保證了其它線程不能獲取讀鎖,讀取座標的值,直到當前線程顯示釋放了寫鎖,
也就是保證了對變量 x,y 操做的原子性和數據一致性。
接下來再看 distanceFromOrigin 方法,該方法做用是計算當前位置到原點(座標爲 0,0)的距離,代碼(1)首先嚐試獲取樂觀讀鎖,若是當前沒有其它線程獲取到了寫鎖,那麼(1)會返回一個非 0 的 stamp 用來表示版本信息,代碼(2)拷貝座標變量到本地方法棧裏面。
代碼(3)檢查在(1)獲取到的 stamp 值是否還有效,之因此還要在此校驗是由於代碼(1)獲取讀鎖時候並無經過 CAS 操做修改鎖的狀態,而是簡單的經過與或操做返回了一個版本信息,這裏校驗是看在在獲取版本信息到如今的時間段裏面是否有其它線程持有了寫鎖,若是有則以前獲取的版本信息就無效了。
這裏若是校驗成功則執行(7)使用本地方法棧裏面的值進行計算而後返回。須要注意的是在代碼(3) 校驗成功後,代碼(7)計算期間,其它線程可能獲取到了寫鎖而且修改了 x,y 的值,而當前線程執行代碼(7)進行計算時候採用的仍是修改前值的拷貝,也就是操做的值是對以前值的一個拷貝,一個快照,並非最新的值。
也許咱們會想,代碼(2) 和(3)可否互換?。
答案是明顯不能的,若是位置換了,那麼首先執行validate ,假設驗證經過了,要拷貝x,y 值到本地方法棧,而在拷貝的過程當中頗有可能其餘線程已經修改過了 x,y 中的一個,這就形成了數據的不一致性了。
那麼你可能還會這樣會想,即便不交換代碼 (2) 和(3),在拷貝 x,y 值到本地方法棧裏面時,也會存在其餘線程修改了x,y中的一個值,這不也會存在問題嗎?
這個確實會存在,可是別忘記了拷貝後還有一道validate,若是這時候有線程修改了x,y 中的值,那麼確定是有線程在調用 validate 前,調用 sl.tryOptimisticRead 後獲取了寫鎖,那麼進行 validate 時候就會失敗。
好了知道這麼多原理後,咱們就會驚歎這也是樂觀讀設計的精妙之處也是使用時候容易出問題的地方。下面繼續分析 validate 失敗後會執行代碼(4)獲取悲觀讀鎖,若是這時候其餘線程持有寫鎖則代碼(4)會致使的當前線程阻塞直到其它線程釋放了寫鎖。
若是這時候沒有其餘線程獲取到寫鎖,那麼當前線程就能夠獲取到讀鎖,而後執行代碼(5)從新拷貝新的座標值到本地方法棧,而後就是代碼(6)釋放了鎖,拷貝的時候因爲加了讀鎖,因此拷貝期間其它線程獲取寫鎖時候會被阻塞,
這保證了數據的一致性,另外這裏 x,y 沒有被聲明爲 volatie,會不會存在內存不可見性問題那?答案是不會,由於加鎖的語義保證了內存可見性,
最後代碼(7)使用方法棧裏面數據計算返回,同理這裏在計算時候使用的數據也可能不是最新的,其它寫線程可能已經修改過原來的 x,y 值了。
最後一個方法 moveIfAtOrigin 做用是若是當前座標爲原點則移動到指定的位置。代碼(1)獲取悲觀讀鎖,保證其它線程不能獲取寫鎖修改 x,y 值,而後代碼(2)判斷若是當前點在原點則更新座標,
代碼(3) 嘗試升級讀鎖爲寫鎖,這裏升級不必定成功,由於多個線程均可以同時獲取悲觀讀鎖,當多個線程都執行到(3)時候只有一個能夠升級成功,升級成功則返回非 0 的 stamp,否非返回 0。
這裏假設當前線程升級成功,而後執行步驟(4)更新 stamp 值和座標值,而後退出循環。若是升級失敗則執行步驟(5)首先釋放讀鎖而後申請寫鎖,獲取到寫鎖後在循環從新設置座標值。最後步驟(6) 釋放鎖。
使用樂觀讀鎖仍是很容易犯錯誤的,必需要嚴謹,必需要保證以下的使用順序,用僞代碼做爲講解,以下:
long stamp = lock.tryOptimisticRead(); //非阻塞獲取版本信息 copyVaraibale2ThreadMemory();//拷貝變量到線程本地堆棧 if(!lock.validate(stamp)){ // 校驗 long stamp = lock.readLock();//獲取讀鎖 try { copyVaraibale2ThreadMemory();//拷貝變量到線程本地堆棧 } finally { lock.unlock(stamp);//釋放悲觀鎖 } } useThreadMemoryVarables();//使用線程本地堆棧裏面的數據進行操做
總結:StampedLock 提供的讀寫鎖與 ReentrantReadWriteLock 相似,只是前者的都是不可重入鎖。可是前者經過提供樂觀讀鎖在多線程多讀的狀況下提供更好的性能,這是由於獲取樂觀讀鎖時候不須要進行 CAS 操做設置鎖的狀態,而只是簡單的測試狀態。