Java中的ReadWriteLock是什麼?java
通常而言,讀寫鎖是用來提高併發程序性能的鎖分離技術的成果,Java中的ReadWriteLock是Java5中新增的一個接口,提供了readLock和writeLock兩種鎖機制。數據庫
一個ReadWriteLock維護一對關聯的鎖,一個用於只讀操做,一個用於寫;在沒有寫線程的狀況下,一個讀鎖可能會同時被多個讀線程持有,寫鎖是獨佔的。緩存
咱們來看一下ReadWriteLock的源碼:數據結構
public interface ReadWriteLock{ Lock readLock(); Lock writeLock(); }
解讀:多線程
從源碼上面咱們能夠看出來ReadWriteLock並非Lock的子接口,只不過ReadWriteLock藉助Lock來實現讀寫兩個鎖並存、互斥的操做機制。併發
在ReadWriteLock中每次讀取共享數據時須要讀取鎖,當修改共享數據時須要寫入鎖,這看起來好像是兩個鎖,可是並不是如此。app
ReentrantReadWriteLock是ReadWriteLock在java.util裏面惟一的實現類,主要使用場景是當有不少線程都從某個數據結構中讀取數據,而不多有線程對其進行修改。ide
ReadLock和WriteLock單獨使用的狀況函數
package demo.thread; import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.ReentrantReadWriteLock; public class ReadWriteLockDemo { public static void main(String[] args) { final Count ct = new Count(); for (int i = 0; i < 2; i++) { new Thread() { @Override public void run() { ct.get(); } }.start(); } for (int i = 0; i < 2; i++) { new Thread() { @Override public void run() { ct.put(); } }.start(); } } } class Count { private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); public void get() { rwl.readLock().lock();// 上讀鎖,其餘線程只能讀不能寫,具備高併發性 try { System.out.println(Thread.currentThread().getName() + " read start."); Thread.sleep(1000L);// 模擬幹活 System.out.println(Thread.currentThread().getName() + "read end."); } catch (InterruptedException e) { e.printStackTrace(); } finally { rwl.readLock().unlock(); // 釋放寫鎖,最好放在finnaly裏面 } } public void put() { rwl.writeLock().lock();// 上寫鎖,具備阻塞性 try { System.out.println(Thread.currentThread().getName() + " write start."); Thread.sleep(1000L);// 模擬幹活 System.out.println(Thread.currentThread().getName() + "write end."); } catch (InterruptedException e) { e.printStackTrace(); } finally { rwl.writeLock().unlock(); // 釋放寫鎖,最好放在finnaly裏面 } } }
運行結果以下:高併發
Thread-1 read start. Thread-0 read start. Thread-1read end. Thread-0read end. Thread-3 write start. Thread-3write end. Thread-2 write start. Thread-2write end.
從結果上面能夠看的出來,讀的時候是併發的,寫的時候是有順序的帶阻塞機制的
ReadLock和WriteLock的複雜使用狀況,模擬一個有讀寫數據的場景
private final Map<String, Object> map = new HashMap<String, Object>();// 假設這裏面存了數據緩存 private final ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock(); public Object readWrite(String id) { Object value = null; rwlock.readLock().lock();// 首先開啓讀鎖,從緩存中去取 try { value = map.get(id); if (value == null) { // 若是緩存中沒有數據,釋放讀鎖,上寫鎖 rwlock.readLock().unlock(); rwlock.writeLock().lock(); try { if (value == null) { value = "aaa"; // 此時能夠去數據庫中查找,這裏簡單的模擬一下 } } finally { rwlock.writeLock().unlock(); // 釋放寫鎖 } rwlock.readLock().lock(); // 而後再上讀鎖 } } finally { rwlock.readLock().unlock(); // 最後釋放讀鎖 } return value; }
解讀:
請必定要注意讀寫鎖的獲取與釋放順序。
類圖以下:
其構造函數以下:
/** * Creates a new {@code ReentrantReadWriteLock} with * default (nonfair) ordering properties. */ public ReentrantReadWriteLock() { this(false); } /** * Creates a new {@code ReentrantReadWriteLock} with * the given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */ public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }
解讀:
讀寫鎖的內部維護了一個 ReadLock 和一個 WriteLock,它們依賴 Sync 實現具體功能;Sync繼承自 AQS,提供了公平和非公平的實現。
重點說明:
AQS 中只維護了一個 state 狀態,而 ReentrantReadWriteLock 須要維護讀狀態和寫狀態,一個 state 怎麼表示寫和讀兩種狀態呢?
ReentrantReadWriteLock 巧妙地使用 state 的高 16 位表示讀狀態,也即獲取到讀鎖的次數;使用低 16 位表示獲取到寫鎖的線程的可重入次數。
寫鎖使用 WriteLock 來實現
對應代碼以下:
/** * Acquires the write lock. * * <p>Acquires the write lock if neither the read nor write lock * are held by another thread * and returns immediately, setting the write lock hold count to * one. * * <p>If the current thread already holds the write lock then the * hold count is incremented by one and the method returns * immediately. * * <p>If the lock is held by another thread then the current * thread becomes disabled for thread scheduling purposes and * lies dormant until the write lock has been acquired, at which * time the write lock hold count is set to one. */ public void lock() { sync.acquire(1); }
解讀:
相似於ReentrantLock的實現,其實是調用了AbstractQueuedSynchronizer的acquire方法,代碼以下:
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
重點關注子類中tryAcquire的實現
從上圖可知,tryAcquire方法定義在Sync中,具體代碼以下:
@ReservedStackAccess protected final boolean tryAcquire(int acquires) { /* * Walkthrough: * 1. If read count nonzero or write count nonzero * and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. */ Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
解讀:
寫鎖是個獨佔鎖,某個時刻只有一個線程能夠獲取該鎖。
若是當前沒有線程獲取到讀鎖和寫鎖,則當前線程能夠獲取到寫鎖而後返回;若是當前己經有線程獲取到讀鎖或者寫鎖,則當前請求寫鎖的線程會被阻塞掛起。
對應上述代碼分支:
(1)AQS狀態值不爲0
(2)AQS狀態值爲0
說明目前沒有線程獲取到讀鎖和寫鎖,對於 writerShouldBlock 方法,由Sync子類實現,公平鎖和非公平鎖的具體實現不同。
/** * Nonfair version of Sync */ static final class NonfairSync extends Sync { private static final long serialVersionUID = -8159625535654395037L; final boolean writerShouldBlock() { return false; // writers can always barge } final boolean readerShouldBlock() { /* As a heuristic to avoid indefinite writer starvation, * block if the thread that momentarily appears to be head * of queue, if one exists, is a waiting writer. This is * only a probabilistic effect since a new reader will not * block if there is a waiting writer behind other enabled * readers that have not yet drained from the queue. */ return apparentlyFirstQueuedIsExclusive(); } }
解讀:
對於非公平鎖來講老是返回false,故若是搶佔式執行CAS嘗試獲取寫鎖成功則設置當前鎖的持有者爲當前線程並返回 true,不然返回 false。
/** * Fair version of Sync */ static final class FairSync extends Sync { private static final long serialVersionUID = -2274990926593161451L; final boolean writerShouldBlock() { return hasQueuedPredecessors(); } final boolean readerShouldBlock() { return hasQueuedPredecessors(); } }
解讀:
對於公平鎖,其使用 hasQueuedPredecessors來判斷當前線程節點是否有前驅節點,若是有則當前線程放棄獲取寫鎖的權限,直接返回 false。
Note:
寫鎖是可重入鎖,若是當前線程己經獲取了該鎖,再次獲取時則只是簡單地把可重入次數加 1 後直接返回。
/** * Attempts to release this lock. * * <p>If the current thread is the holder of this lock then * the hold count is decremented. If the hold count is now * zero then the lock is released. If the current thread is * not the holder of this lock then {@link * IllegalMonitorStateException} is thrown. * * @throws IllegalMonitorStateException if the current thread does not * hold this lock */ public void unlock() { sync.release(1); }
解讀:
相似於ReentrantLock的實現,其實是調用了AbstractQueuedSynchronizer的release方法,而子類Sync只須要實現tryRelease方法便可,對應方法以下:
/* * Note that tryRelease and tryAcquire can be called by * Conditions. So it is possible that their arguments contain * both read and write holds that are all released during a * condition wait and re-established in tryAcquire. */ @ReservedStackAccess protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
解讀:
若是當前線程持有鎖,調用unlock方法會讓持有的 AQS 狀態值減 1,若減 1 後當前狀態值爲 0,則當前線程會釋放鎖,不然僅僅執行減 1。
若是當前線程沒有持有鎖,則調用該方法會拋出 IllegalMonitorStateException異常。
讀鎖是使用ReadLock來實現的。
對應代碼以下:
/** * Acquires the read lock. * * <p>Acquires the read lock if the write lock is not held by * another thread and returns immediately. * * <p>If the write lock is held by another thread then * the current thread becomes disabled for thread scheduling * purposes and lies dormant until the read lock has been acquired. */ public void lock() { sync.acquireShared(1); }
解讀:
相似於ReentrantLock的實現,其實是調用了AbstractQueuedSynchronizer的acquireShared方法,代碼以下:
/** * Acquires in shared mode, ignoring interrupts. Implemented by * first invoking at least once {@link #tryAcquireShared}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquireShared} until success. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquireShared} but is otherwise uninterpreted * and can represent anything you like. */ public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
重點關注子類中tryAcquireShared的實現,即查看Sync中tryAcquireShared方法:
@ReservedStackAccess protected final int tryAcquireShared(int unused) { /* * Walkthrough: * 1. If write lock held by another thread, fail. * 2. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * 3. If step 2 fails either because thread * apparently not eligible or CAS fails or count * saturated, chain to version with full retry loop. */ Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != LockSupport.getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }
解讀:
若是當前沒有其餘線程持有寫鎖,則當前線程能夠獲取讀鎖,AQS 的狀態值 state的高16位的值會增長 1,而後方法返回;
若是某個線程持有寫鎖,則當前線程會被阻塞。
對應上述代碼分支:
(1)判斷寫鎖是否被佔用,若是是則直接返回 -1,然後調用 AQS 的 doAcquireShared方法把當前線程放入 AQS 阻塞隊列。
(2)變量 r 是獲取到讀鎖的個數
(3)readerShouldBlock方法,由Sync子類實現,公平鎖和非公平鎖的具體實現不同;此處不展開描述,能夠查看本文前面NonfairSync 和 FairSync的實現代碼。
Note:
若是當前要獲取讀鎖的線程己經持有了寫鎖,則也能夠獲取讀鎖;當其處理完事情,須要把讀鎖和寫鎖都釋放掉,不能只釋放寫鎖。
對應代碼以下:
/** * Attempts to release this lock. * * <p>If the number of readers is now zero then the lock * is made available for write lock attempts. If the current * thread does not hold this lock then {@link * IllegalMonitorStateException} is thrown. * * @throws IllegalMonitorStateException if the current thread * does not hold this lock */ public void unlock() { sync.releaseShared(1); }
解讀:
相似於ReentrantLock的實現,其實是調用了AbstractQueuedSynchronizer的releaseShared方法
/** * Releases in shared mode. Implemented by unblocking one or more * threads if {@link #tryReleaseShared} returns true. * * @param arg the release argument. This value is conveyed to * {@link #tryReleaseShared} but is otherwise uninterpreted * and can represent anything you like. * @return the value returned from {@link #tryReleaseShared} */ public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
子類Sync只須要實現tryReleaseShared方法便可,對應方法以下:
@ReservedStackAccess protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != LockSupport.getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }
解讀:
在無限循環裏面,首先獲取當前 AQS 狀態值並將其保存到變量 c,而後變量 c 減去一個讀計數單位後使用 CAS 操做更新 AQS 狀態值。
若是更新成功則查看當前 AQS 狀態值
Note:
關於無限循環的解釋——若是 CAS 更新 AQS 狀態值失敗,則自旋重試直到成功。
ReentrantReadWriteLock與ReentrantLock的比較:
(1)相同點:都是一種顯式鎖,手動加鎖和解鎖,都很適合高併發場景
(2)不一樣點:ReentrantLock 是獨佔鎖,ReentrantReadWriteLock是對ReentrantLock的複雜擴展,能適合更復雜的業務場景,ReentrantReadWriteLock能夠實現一個方法中讀寫分離的鎖機制。