在 JUC 包中,共享鎖包括 CountDownLatch、CyclicBarrier、Semaphore、ReentrantReadWriteLock、JDK1.8 新增的 StampedLock 等。java
ReentrantReadWriteLock 中定義了兩個鎖:共享鎖 readLock 和獨佔鎖 writeLock。
共享鎖 readLock 用於讀操做,能同時被多個線程獲取;獨佔鎖 writeLock 用於寫入操做,只能被一個線程持有。
讀鎖、寫鎖均具備公平模式、非公平模式兩種獲取鎖的方式。c#
因爲 ReentrantReadWriteLock 是基於 AQS(AbstractQueuedSynchronizer)框架實現的鎖工具,在閱讀 ReentrantReadWriteLock 源碼以前,須要先了解 AQS 中的獨佔模式、共享模式、Condition 機制的實現原理,相關內容可翻閱個人前幾篇文章,文末已給出連接。segmentfault
本文基於 jdk1.8.0_91
@[toc]緩存
ReentrantReadWriteLock 具備三個重要的內部類:ReadLock、WriteLock、Sync併發
讀寫鎖是一種特殊的鎖,它把對共享資源的訪問分爲讀訪問和寫訪問,多個線程能夠同時對共享資源進行讀訪問,可是同一時間只能有一個線程對共享資源進行寫訪問,使用讀寫鎖能夠極大地提升吞吐量。app
讀與寫之間是否互斥:框架
讀 | 寫 | |
---|---|---|
讀 | 否 | 是 |
寫 | 是 | 是 |
能夠看到,讀寫鎖除了讀讀不互斥,讀寫、寫讀、寫寫都是互斥的。ide
經常使用業務場景:讀多寫少,好比服務緩存等。高併發
public interface ReadWriteLock { /** * Returns the lock used for reading. * * @return the lock used for reading */ Lock readLock(); /** * Returns the lock used for writing. * * @return the lock used for writing */ Lock writeLock(); }
ReentrantReadWriteLock 是接口 ReadWriteLock 的實現。工具
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { private static final long serialVersionUID = -6992448646407690164L; /** Inner class providing readlock */ private final ReentrantReadWriteLock.ReadLock readerLock; /** Inner class providing writelock */ private final ReentrantReadWriteLock.WriteLock writerLock; public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; } }
ReadLock、WriteLock 均實現了接口 Lock。
/** * The lock returned by method {@link ReentrantReadWriteLock#readLock}. */ public static class ReadLock implements Lock, java.io.Serializable {...} /** * The lock returned by method {@link ReentrantReadWriteLock#writeLock}. */ public static class WriteLock implements Lock, java.io.Serializable {...}
ReentrantReadWriteLock 具備內部類 Sync。跟 ReentrantLock 同樣,Sync 繼承自 AQS。
ReentrantReadWriteLock 中的 ReadLock、WriteLock 對 Lock 的實現都是經過 Sync 來作到的。
Lock 接口 ReadLock 實現 WriteLock 實現 lock() sync.acquireShared(1) sync.acquire(1) lockInterruptibly() sync.acquireSharedInterruptibly(1) sync.acquireInterruptibly(1) tryLock() sync.tryReadLock() sync.tryWriteLock() tryLock(long time, TimeUnit unit) sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)) sync.tryAcquireNanos(1, unit.toNanos(timeout)) unlock() sync.releaseShared(1) sync.release(1) newCondition() UnsupportedOperationException sync.newCondition()
能夠看到,ReadLock 是共享鎖,WriteLock 是獨佔鎖。ReadLock 不支持使用 Condition。
AQS 中提供了一系列的模板方法,在 Sync 中均獲得實現。
java.util.concurrent.locks.AbstractQueuedSynchronizer
// 獨佔獲取(資源數) protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } // 獨佔釋放(資源數) protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } // 共享獲取(資源數) protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } // 共享獲取(資源數) protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } // 是否排它狀態 protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); }
Sync 實現了 AQS 中所有的模板方法,所以能夠同時支持獨佔鎖、共享鎖兩種實現。
此外,還提供了兩個抽象方法,用於制定公平性策略,強制子類實現。
abstract static class Sync extends AbstractQueuedSynchronizer { /** * Returns true if the current thread, when trying to acquire * the read lock, and otherwise eligible to do so, should block * because of policy for overtaking other waiting threads. */ abstract boolean readerShouldBlock(); /** * Returns true if the current thread, when trying to acquire * the write lock, and otherwise eligible to do so, should block * because of policy for overtaking other waiting threads. */ abstract boolean writerShouldBlock(); }
Sync 具備兩個子類 FailSync 和 NonfairSync。對應的是 ReentrantReadWriteLock 的兩種實現:公平鎖(fair lock)、非公平鎖(non-fair lock)。
這兩個子類只須要實現 Sync 中的 readerShouldBlock、writerShouldBlock 方法。
公平鎖和非公平鎖的區別,主要體如今獲取讀寫鎖的機制不一樣:
若是當前線程沒法獲取鎖,會拋出異常,或進入同步隊列進行排隊等待。
公平模式:無論獲取讀鎖仍是寫鎖,都要嚴格按照前後順序排隊獲取。
java.util.concurrent.locks.ReentrantReadWriteLock.FairSync
/** * Fair version of Sync */ static final class FairSync extends Sync { private static final long serialVersionUID = -2274990926593161451L; final boolean writerShouldBlock() { return hasQueuedPredecessors(); // 若是同步隊列中等待時間最長的節點不是當前線程,返回true;不然返回false } final boolean readerShouldBlock() { return hasQueuedPredecessors(); } }
java.util.concurrent.locks.AbstractQueuedSynchronizer#hasQueuedPredecessors
public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && // 頭節點的下一個節點,不是當前線程的節點,說明當前線程等待鎖時間不是最長的 ((s = h.next) == null || s.thread != Thread.currentThread()); }
非公平模式:獲取寫鎖能夠當即獲取,無需排隊;獲取讀鎖以前,若判斷等待時間最長的是寫線程,則讀線程需讓步(重入讀除外),進入阻塞。
java.util.concurrent.locks.ReentrantReadWriteLock.NonfairSync
/** * Nonfair version of Sync */ static final class NonfairSync extends Sync { private static final long serialVersionUID = -8159625535654395037L; final boolean writerShouldBlock() { return false; // writers can always barge // 獲取寫鎖無需阻塞 } final boolean readerShouldBlock() { /* As a heuristic to avoid indefinite writer starvation, * block if the thread that momentarily appears to be head * of queue, if one exists, is a waiting writer. This is * only a probabilistic effect since a new reader will not * block if there is a waiting writer behind other enabled * readers that have not yet drained from the queue. */ // 爲了不寫飢餓,若是同步隊列中等待時間最長的節點是互斥節點,則獲取讀鎖須要阻塞,返回true。 return apparentlyFirstQueuedIsExclusive(); } }
java.util.concurrent.locks.AbstractQueuedSynchronizer#apparentlyFirstQueuedIsExclusive
final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && // 頭節點h不爲空 (s = h.next) != null && // 存在等待中的節點s !s.isShared() && // 節點s不是共享模式,即互斥 s.thread != null; // 節點s不是無效節點 }
構造 ReentrantReadWriteLock 的時候,會依據公平或非公平模式實例化 Sync,再使用 Sync 來構造 ReadLock、WriteLock 實例。
java.util.concurrent.locks.ReentrantReadWriteLock
/** Performs all synchronization mechanics */ final Sync sync; /** * Creates a new {@code ReentrantReadWriteLock} with * default (nonfair) ordering properties. */ public ReentrantReadWriteLock() { this(false); } /** * Creates a new {@code ReentrantReadWriteLock} with * the given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */ public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }
默認使用非公平鎖。
將 Sync 實例傳遞給 ReadLock、WriteLock 實例。
java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock
public static class ReadLock implements Lock, java.io.Serializable { private final Sync sync; protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; } }
java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock
public static class WriteLock implements Lock, java.io.Serializable { private final Sync sync; protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync; } }
使用 AQS 的屬性 state 來表示資源/鎖。
/** * The synchronization state. */ private volatile int state;
ReentrantLock 使用 state 表示當前共享資源是否被其餘線程鎖佔用。若是爲 0 則表示未被佔用,其餘值表示該鎖被重入的次數。
ReentrantReadWriteLock 使用 state 的高 16 位表示讀狀態,也就是獲取到讀鎖的次數;使用低 16 位表示寫狀態,也就是獲取到寫鎖的次數。
/* * Read vs write count extraction constants and functions. * Lock state is logically divided into two unsigned shorts: * The lower one representing the exclusive (writer) lock hold count, * and the upper the shared (reader) hold count. */ static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; // 十進制爲:65535 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 二進制爲:1111 1111 1111 1111 /** Returns the number of shared holds represented in count */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } // 共享鎖(讀)的數量,取高16位 /** Returns the number of exclusive holds represented in count */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } // 互斥鎖(寫)的數量,取低16位
對於指定的 state 值,使用 sharedCount()
計算獲得共享鎖(讀鎖)的數量,使用 exclusiveCount()
計算獲得互斥鎖(寫鎖)的數量。
可知,ReentrantReadWriteLock 最多支持 65535 個遞歸寫入鎖和 65535 個讀取鎖。
Sync 中定義了兩個內部類:
/** * A counter for per-thread read hold counts. * Maintained as a ThreadLocal; cached in cachedHoldCounter */ static final class HoldCounter { // 用來保存線程id、該線程持有共享鎖的數量 int count = 0; // Use id, not reference, to avoid garbage retention // 記錄線程id,不記錄線程引用,防止沒法GC final long tid = getThreadId(Thread.currentThread()); } /** * ThreadLocal subclass. Easiest to explicitly define for sake * of deserialization mechanics. */ static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } /** * The number of reentrant read locks held by current thread. * Initialized only in constructor and readObject. * Removed whenever a thread's read hold count drops to 0. */ // 存儲全部線程的id,以及各自持有鎖的數量 private transient ThreadLocalHoldCounter readHolds; // 緩存,存儲最後一個獲取鎖的 HoldCounter:線程id,該線程持有鎖的數量 private transient HoldCounter cachedHoldCounter; // 緩存,存儲第一個獲取鎖的線程 private transient Thread firstReader = null; // 緩存,記錄第一個獲取鎖的線程持有鎖的數量 private transient int firstReaderHoldCount; Sync() { readHolds = new ThreadLocalHoldCounter(); // 建立 ThreadLocalHoldCounter 實例 setState(getState()); // ensures visibility of readHolds }
方法摘要:
// 獲取讀取鎖。 void lock() // 獲取讀取鎖,除非當前線程被中斷。 void lockInterruptibly() // 由於 ReadLocks 不支持條件,因此將拋出 UnsupportedOperationException。 Condition newCondition() // 返回標識此鎖及其鎖狀態的字符串。 String toString() // 僅當寫入鎖在調用期間未被另外一個線程保持時獲取讀取鎖。 boolean tryLock() // 若是另外一個線程在給定的等待時間內沒有保持寫入鎖,而且當前線程未被中斷,則獲取讀取鎖。 boolean tryLock(long timeout, TimeUnit unit) // 試圖釋放此鎖。 void unlock()
獲取讀鎖。
即,跟寫鎖互斥。
java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#lock
/** * Acquires the read lock. * * <p>Acquires the read lock if the write lock is not held by * another thread and returns immediately. * * <p>If the write lock is held by another thread then * the current thread becomes disabled for thread scheduling * purposes and lies dormant until the read lock has been acquired. */ public void lock() { sync.acquireShared(1); }
共享模式下獲取鎖/資源,無視中斷。
代碼流程:
java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireShared
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
其中,AQS#tryAcquireShared 返回值:
在 ReentrantReadWriteLock.ReadLock 中,tryAcquireShared 只會返回 1 和 -1.
當讀鎖獲取失敗時(其餘線程正在寫,或者不符合公平性策略)返回 -1,此時纔會執行 doAcquireShared 進入排隊等待。
注意,當讀鎖的計數達到最大值,後續再獲取讀鎖會拋出異常 throw new Error("Maximum lock count exceeded")
,而不是排隊等待。
代碼流程:
公平/非公平規則:
獲取鎖成功後,須要記錄當前線程持有數的數量:
java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryAcquireShared
protected final int tryAcquireShared(int unused) { /* * Walkthrough: * 1. If write lock held by another thread, fail. * 2. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * 3. If step 2 fails either because thread * apparently not eligible or CAS fails or count * saturated, chain to version with full retry loop. */ Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) // 若是其餘線程持有寫鎖,則當前線程進入阻塞(寫讀互斥) // 若是當前線程持有寫鎖,則容許再次獲取讀鎖(支持鎖降級) return -1; int r = sharedCount(c); // 讀鎖被獲取的次數 if (!readerShouldBlock() && // 公平性策略校驗,若不須要阻塞則進入下一步 r < MAX_COUNT && // 讀鎖數量沒有超限制 compareAndSetState(c, c + SHARED_UNIT)) { // CAS 獲取讀鎖(高16位加1) if (r == 0) { // 校驗當前線程是不是 firstReader,並累計 firstReaderHoldCount firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { // 進入這裏,說明當前不是首個獲取鎖的線程,使用 HoldCounter 記錄當前線程id和持有鎖的數量 HoldCounter rh = cachedHoldCounter; // 先查緩存 cachedHoldCounter if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); // 緩存沒有命中,從 ThreadLocal 中獲取,並更新緩存 else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; // 獲取鎖成功,持有讀鎖數量1 } return fullTryAcquireShared(current); // 一次獲取讀鎖失敗後,嘗試循環獲取 }
在 tryAcquireShared 中經行了一次快速獲取讀鎖,可是 CAS 只能容許一個線程獲取鎖成功,而讀鎖是共享的,能夠同時容許多個線程獲取。所以須要調用 fullTryAcquireShared 執行完整版的獲取鎖的邏輯。
關注不一樣的地方:
也就是說,重入讀的狀況下,若是鎖是可獲取狀態,不會讓出鎖給寫線程。
java.util.concurrent.locks.ReentrantReadWriteLock.Sync#fullTryAcquireShared
/** * Full version of acquire for reads, that handles CAS misses * and reentrant reads not dealt with in tryAcquireShared. */ final int fullTryAcquireShared(Thread current) { /* * This code is in part redundant with that in * tryAcquireShared but is simpler overall by not * complicating tryAcquireShared with interactions between * retries and lazily reading hold counts. */ HoldCounter rh = null; for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { /** * 若是是其餘線程獲取了寫鎖,那麼把當前線程阻塞; * 若是是當前線程獲取了寫鎖,不阻塞,不然會形成死鎖。 * 從這裏能夠看到 ReentrantReadWriteLock 容許鎖降級。 */ if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } else if (readerShouldBlock()) { /** * 非公平模式下,進入這裏說明,同步隊列的頭結點的後繼有一個競爭寫鎖的線程。 * 因此這裏有一個鎖讓步的操做,即讓寫鎖先獲取。 * 1. 若是知足 firstReader == current 或者 rh.count > 0 說明是重入的讀。 * 不須要讓步給寫線程,不然會致使死鎖。 * 2. 若是 rh.count == 0 就說明,這個線程是第一次獲取讀鎖。 * 爲了防止寫飢餓,直接將當前線程放入同步隊列排隊等待。 */ // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // 說明不是第一次讀(而是重入的讀),不須要讓步給寫線程 // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); // 當前線程不持有讀鎖,移除計數 } } if (rh.count == 0) // 說明是第一次讀,須要讓步給寫線程 return -1; // 返回-1,後續進入同步隊列中排隊等待鎖 } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { // CAS 獲取讀鎖(高16位加1) if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
當前線程若是持有寫鎖,容許獲取讀鎖,即支持鎖降級。
爲何要支持鎖降級呢?主要是爲了防止出現死鎖。
假設不支持鎖降級,當持有寫鎖的線程須要獲取讀鎖時,只能進入阻塞等待鎖變爲可讀。
可是隻有當沒有線程持有寫鎖時,讀鎖纔可以被獲取(寫讀互斥),致使讀鎖一直沒法獲取,此時出現死鎖。
爲何重入讀鎖時,不須要讓出鎖呢?一樣是爲了防止出現死鎖。
假設重入讀鎖須要讓給等待寫鎖的線程,則重入讀的線程進入阻塞。
可是隻有當沒有線程持有讀鎖時,寫鎖纔可以被獲取(讀寫互斥),致使寫鎖一直沒法被獲取,此時出現死鎖。
僅當寫鎖在調用期間未被另外一個線程保持時,獲取讀鎖。
java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#tryLock()
/** * Acquires the read lock only if the write lock is not held by * another thread at the time of invocation. */ public boolean tryLock() { return sync.tryReadLock(); }
代碼流程:
跟 tryAcquireShared 最大的區別沒有調用 readerShouldBlock,所以可能會破壞公平規則,或形成寫飢餓。
注意:
即便已將此鎖設置爲使用公平排序策略,可是調用 tryLock() 仍將當即獲取讀鎖(若是有可用的),無論其餘線程當前是否正在等待該讀鎖。在某些狀況下,此「闖入」行爲可能頗有用,即便它會打破公平性也如此。若是但願遵照此鎖的公平設置,則使用 tryLock(0, TimeUnit.SECONDS)
,它幾乎是等效的(它也檢測中斷)。
java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryReadLock
/** * Performs tryLock for read, enabling barging in both modes. * This is identical in effect to tryAcquireShared except for * lack of calls to readerShouldBlock. */ final boolean tryReadLock() { Thread current = Thread.currentThread(); for (;;) { int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) // 其餘線程持有寫鎖 return false; int r = sharedCount(c); if (r == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { // 自旋中 CAS 獲取讀鎖(高16位加1) if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return true; } } }
釋放讀鎖。
java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#unlock
/** * Attempts to release this lock. * * <p>If the number of readers is now zero then the lock * is made available for write lock attempts. */ public void unlock() { sync.releaseShared(1); }
共享模式下釋放鎖/資源。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
在 ReentrantReadWriteLock 中,只有當所有讀鎖都釋放以後,state == 0 時纔會執行 doReleaseShared,喚醒同步隊列中等待着的寫線程。
代碼流程:
能夠看到,釋放讀鎖不會與其餘行爲互斥。
java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryReleaseShared
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // 對當前線程持有鎖的數量進行自減 // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); // 當前線程沒有持有鎖,報錯 } --rh.count; } for (;;) { // 自旋 CAS 更新 state(確保多個線程可以併發釋放鎖) int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; // 返回true,說明當前沒有線程持有鎖,對於等待寫入的線程來講,能夠發起鎖請求 } }
因爲 AQS 中的 Condition 實現不支持共享鎖,所以 ReentrantReadWriteLock 中的讀鎖不支持使用 Condition。
爲何要設計成共享鎖不支持使用 Condition 呢?
Java 官方的解釋:
Read locks are held independently of write locks, so are not checked or affected. However it is essentially always an error to invoke a condition waiting method when the current thread has also acquired read locks, since other threads that could unblock it will not be able to acquire the write lock.
java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#newCondition
/** * Throws {@code UnsupportedOperationException} because * {@code ReadLocks} do not support conditions. * * @throws UnsupportedOperationException always */ public Condition newCondition() { throw new UnsupportedOperationException(); }
方法摘要:
// 查詢當前線程保持寫入鎖的數量。 int getHoldCount() // 查詢此寫入鎖是否由當前線程保持。 boolean isHeldByCurrentThread() // 獲取寫入鎖。 void lock() // 獲取寫入鎖,除非當前線程被中斷。 void lockInterruptibly() // 返回一個用來與此 Lock 實例一塊兒使用的 Condition 實例。 Condition newCondition() // 返回標識此鎖及其鎖狀態的字符串。 String toString() // 僅當寫入鎖在調用期間未被另外一個線程保持時獲取該鎖。 boolean tryLock() // 若是另外一個線程在給定的等待時間內沒有保持寫入鎖,而且當前線程未被中斷,則獲取寫入鎖。 boolean tryLock(long timeout, TimeUnit unit) // 試圖釋放此鎖。 void unlock()
獲取寫鎖。
即,跟讀鎖、寫鎖都互斥。
java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#lock
/** * Acquires the write lock. * * <p>Acquires the write lock if neither the read nor write lock * are held by another thread * and returns immediately, setting the write lock hold count to * one. * * <p>If the current thread already holds the write lock then the * hold count is incremented by one and the method returns * immediately. * * <p>If the lock is held by another thread then the current * thread becomes disabled for thread scheduling purposes and * lies dormant until the write lock has been acquired, at which * time the write lock hold count is set to one. */ public void lock() { sync.acquire(1); }
AQS 中的 acquire 方法中,只有 tryAcquire 方法須要子類來實現。
代碼流程:
java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
注意,當寫鎖的計數達到最大值,後續再獲取寫鎖會拋出異常 throw new Error("Maximum lock count exceeded")
,而不是排隊等待。
代碼流程:
注意,當前線程持有讀鎖後,沒法再獲取寫鎖,即不支持鎖升級。
爲何不支持鎖升級?
java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryAcquire
protected final boolean tryAcquire(int acquires) { /* * Walkthrough: * 1. If read count nonzero or write count nonzero * and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. */ Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // 鎖已經被線程持有,須要進一步區分讀、寫鎖,是否當前線程持有 // (Note: if c != 0 and w == 0 then shared count != 0) // 只有讀鎖(讀寫互斥。注意當前線程持有讀鎖以後,也沒法獲取寫鎖,不支持鎖升級) // 或者持有寫鎖的不是當前線程,則沒法獲取寫鎖 if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire // 進入這裏,說明當前線程已持有寫鎖,是重入獲取 setState(c + acquires); // 更新 state,無需使用 CAS return true; } // 進入這裏,說明鎖未被持有(可是同步隊列中可能有線程在等待) if (writerShouldBlock() || // 校驗公平性策略看可否獲取寫鎖 !compareAndSetState(c, c + acquires)) // 可以獲取寫鎖,則 CAS 獲取(低16位加1) return false; setExclusiveOwnerThread(current); // 獲取寫鎖成功,記錄持有寫鎖的是當前線程 return true; }
僅當寫鎖在調用期間未被另外一個線程保持時,獲取寫鎖。
java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#tryLock()
public boolean tryLock( ) { return sync.tryWriteLock(); }
代碼流程:
跟 Sync#tryAcquire 最大的區別沒有調用 writerShouldBlock,所以可能會破壞公平規則。
若是但願遵照此鎖的公平設置,則使用 tryLock(0, TimeUnit.SECONDS)
,它幾乎是等效的(它也檢測中斷)。
因爲讀鎖是共享的,tryReadLock 中須要自旋進行 CAS 獲取鎖,而 tryWriteLock 中不用進行自旋,只 CAS 獲取一次。
java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryWriteLock
/** * Performs tryLock for write, enabling barging in both modes. * This is identical in effect to tryAcquire except for lack * of calls to writerShouldBlock. */ final boolean tryWriteLock() { // 僅當寫入鎖在調用期間未被另外一個線程保持時獲取該鎖 Thread current = Thread.currentThread(); int c = getState(); if (c != 0) { int w = exclusiveCount(c); if (w == 0 || current != getExclusiveOwnerThread()) // 只有讀鎖(讀寫互斥);或者持有寫鎖的不是當前線程 return false; if (w == MAX_COUNT) throw new Error("Maximum lock count exceeded"); } if (!compareAndSetState(c, c + 1)) return false; setExclusiveOwnerThread(current); return true; }
試圖釋放此鎖。
java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#unlock
public void unlock() { sync.release(1); }
獨佔模式下釋放鎖。
java.util.concurrent.locks.AbstractQueuedSynchronizer#release
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
在 ReentrantReadWriteLock 中,只有當所有寫鎖都釋放以後,exclusiveCount == 0 時纔會執行 unparkSuccessor,喚醒同步隊列中等待着的讀、寫線程。
代碼流程:
java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryRelease
protected final boolean tryRelease(int releases) { // 釋放獨佔鎖-寫鎖 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; // 更新 state,無需 CAS boolean free = exclusiveCount(nextc) == 0; // 檢查鎖是否已所有釋放 if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
返回一個用來與此 Lock 實例一塊兒使用的 Condition 實例。
java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#newCondition
public Condition newCondition() { return sync.newCondition(); }
在使用內置監視器鎖時,返回的 Condition 實例支持與 Object 的監視器方法(wait、notify 和 notifyAll)相同的用法。
JDK 官方示例,展現瞭如何利用重入來執行升級緩存後的鎖降級(爲簡單起見,省略了異常處理):
class CachedData { Object data; volatile boolean cacheValid; ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() { rwl.readLock().lock(); if (!cacheValid) { // Must release read lock before acquiring write lock rwl.readLock().unlock(); rwl.writeLock().lock(); // Recheck state because another thread might have acquired // write lock and changed state before we did. if (!cacheValid) { data = ... cacheValid = true; } // Downgrade by acquiring read lock before releasing write lock rwl.readLock().lock(); // Unlock write, still hold read rwl.writeLock().unlock(); } use(data); rwl.readLock().unlock(); } }
在使用某些種類的 Collection 時,可使用 ReentrantReadWriteLock 來提升併發性。
一般,在預期 collection 很大,讀取者線程訪問它的次數多於寫入者線程,而且 entail 操做的開銷高於同步開銷時,這很值得一試。
例如,如下是一個使用 TreeMap 的類,預期它很大,而且能被同時訪問。
class RWDictionary { private final Map<String, Data> m = new TreeMap<String, Data>(); private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private final Lock r = rwl.readLock(); private final Lock w = rwl.writeLock(); public Data get(String key) { r.lock(); try { return m.get(key); } finally { r.unlock(); } } public String[] allKeys() { r.lock(); try { return m.keySet().toArray(); } finally { r.unlock(); } } public Data put(String key, Data value) { w.lock(); try { return m.put(key, value); } finally { w.unlock(); } } public void clear() { w.lock(); try { m.clear(); } finally { w.unlock(); } } }
讀讀不互斥,讀寫、寫讀、寫寫都是互斥的。
ReentrantReadWriteLock 不會按讀取者優先或寫入者優先的順序,來對鎖的訪問進行排序。可是,它確實支持可選的公平性策略。
非公平模式(默認):
獲取寫鎖能夠當即獲取,無需排隊;獲取讀鎖以前,若判斷等待時間最長的是寫線程,則非重入的讀線程需進入阻塞。
連續競爭的非公平鎖可能無限期地推遲一個或多個 reader 或 writer 線程,但吞吐量一般要高於公平鎖。
公平模式:
無論獲取讀鎖仍是寫鎖,都要嚴格按照前後順序排隊獲取。
注意,讀鎖是共享的,只要同步隊列中沒有等待的線程,讀鎖能夠被同時獲取,一旦同步隊列中具備線程在等待,後續的非重入的讀線程只能入隊等待。
此外,ReentrantReadWriteLock.ReadLock.tryLock() 和 ReentrantReadWriteLock.WriteLock.tryLock() 方法不會遵照此公平設置。
重入規則能夠看做是對公平性策略的一種修正,即便同步隊列中存在等待線程時,已持有鎖的線程能夠重入獲取鎖,無需讓步。
讀鎖的最大可重入次數是 65535,寫鎖的最大可重入次數一樣是 65535。
超過最大可重入次數,直接拋異常。
重入還容許從寫鎖降級爲讀鎖,其實現方式是:先獲取寫鎖,而後獲取讀鎖,最後釋放寫鎖、讀鎖。
可是,從讀鎖升級到寫鎖是不可能的。
寫鎖提供了一個 Condition 實現,讀鎖不支持 Condition。
讀鎖釋放:當鎖被多個讀線程持有時,只有所有讀鎖都釋放了,纔會喚醒同步隊列中等待着的節點,此時的等待節點是寫線程。
寫鎖釋放:當鎖被單個寫線程持有時,只有所有寫鎖都釋放了,纔會喚醒同步隊列中等待着的節點,該節點多是寫線程或讀線程。
讀鎖獲取:當同步隊列中的讀線程成功獲取鎖後,會喚醒隊列中的下一個共享節點(讀線程),再由下一個共享節點獲取鎖後喚醒下下個共享節點(見 AQS#setHeadAndPropagate)。
相關閱讀:
閱讀 JDK 源碼:AQS 中的獨佔模式
閱讀 JDK 源碼:AQS 中的共享模式
閱讀 JDK 源碼:AQS 對 Condition 的實現
閱讀 JDK 源碼:可重入鎖 ReentrantLock