1.讀寫鎖的介紹java
在併發場景中用於解決線程安全的問題,咱們幾乎會高頻率的使用到獨佔式鎖,一般使用java提供的關鍵字synchronized或者concurrents包中實現了Lock接口的。它們都是獨佔式獲取鎖,也就是在同一時刻只有一個線程可以獲取鎖。而在一些業務場景中,大部分只是讀數據,寫數據不多,若是僅僅是讀數據的話並不會影響數據正確性(出現髒讀),而若是在這種業務場景下,依然使用獨佔鎖的話,很顯然這將是出現性能瓶頸的地方。安全
針對這種讀多寫少的狀況,java還提供了另一個實現Lock接口的ReentrantReadWriteLock(讀寫鎖)。讀寫所容許同一時刻被多個讀線程訪問,可是在寫線程訪問時,全部的讀線程和其餘的寫線程都會被阻塞。在分析WirteLock和ReadLock的互斥性時能夠按照WriteLock與WriteLock之間,WriteLock與ReadLock之間以及ReadLock與ReadLock之間進行分析。更多關於讀寫鎖特性介紹你們能夠看源碼上的介紹(閱讀源碼時最好的一種學習方式,我也正在學習中,與你們共勉),這裏作一個概括總結:併發
公平性選擇:支持非公平性(默認)和公平的鎖獲取方式,吞吐量仍是非公平優於公平;app
重入性:支持重入,讀鎖獲取後能再次獲取,寫鎖獲取以後可以再次獲取寫鎖,同時也可以獲取讀鎖;分佈式
鎖降級:遵循獲取寫鎖,獲取讀鎖再釋放寫鎖的次序,寫鎖可以降級成爲讀鎖ide
要想可以完全的理解讀寫鎖必須可以理解這樣幾個問題:1. 讀寫鎖是怎樣實現分別記錄讀寫狀態的?2. 寫鎖是怎樣獲取和釋放的?3.讀鎖是怎樣獲取和釋放的?咱們帶着這樣的三個問題,再去了解下讀寫鎖。高併發
同步組件的實現聚合了同步器(AQS),並經過重寫重寫同步器(AQS)中的方法實現同步組件的同步語義,AQS的底層實現分析能夠。所以,寫鎖的實現依然也是採用這種方式。在同一時刻寫鎖是不能被多個線程所獲取,很顯然寫鎖是獨佔式鎖,而實現寫鎖的同步語義是經過重寫AQS中的tryAcquire方法實現的。源碼爲:oop
protected final boolean tryAcquire(int acquires) { /* * Walkthrough: * 1\. If read count nonzero or write count nonzero * and owner is a different thread, fail. * 2\. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3\. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. */ Thread current = Thread.currentThread(); // 1\. 獲取寫鎖當前的同步狀態 int c = getState(); // 2\. 獲取寫鎖獲取的次數 int w = exclusiveCount(c); if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) // 3.1 當讀鎖已被讀線程獲取或者當前線程不是已經獲取寫鎖的線程的話 // 當前線程獲取寫鎖失敗 if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire // 3.2 當前線程獲取寫鎖,支持可重複加鎖 setState(c + acquires); return true; } // 3.3 寫鎖未被任何線程獲取,當前線程可獲取寫鎖 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
這段代碼的邏輯請看註釋,這裏有一個地方須要重點關注,exclusiveCount(c)方法,該方法源碼爲:post
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
其中EXCLUSIVE_MASK爲: static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
EXCLUSIVE _MASK爲1左移16位而後減1,即爲0x0000FFFF。而exclusiveCount方法是將同步狀態(state爲int類型)與0x0000FFFF相與,即取同步狀態的低16位。那麼低16位表明什麼呢?根據exclusiveCount方法的註釋爲獨佔式獲取的次數即寫鎖被獲取的次數,如今就能夠得出來一個結論同步狀態的低16位用來表示寫鎖的獲取次數。同時還有一個方法值得咱們注意:性能
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
該方法是獲取讀鎖被獲取的次數,是將同步狀態(int c)右移16次,即取同步狀態的高16位,如今咱們能夠得出另一個結論同步狀態的高16位用來表示讀鎖被獲取的次數。如今還記得咱們開篇說的須要弄懂的第一個問題嗎?讀寫鎖是怎樣實現分別記錄讀鎖和寫鎖的狀態的,如今這個問題的答案就已經被咱們弄清楚了,其示意圖以下圖所示:
如今咱們回過頭來看寫鎖獲取方法tryAcquire,其主要邏輯爲:當讀鎖已經被讀線程獲取或者寫鎖已經被其餘寫線程獲取,則寫鎖獲取失敗;不然,獲取成功並支持重入,增長寫狀態。
寫鎖釋放經過重寫AQS的tryRelease方法,源碼爲:
protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //1\. 同步狀態減去寫狀態 int nextc = getState() - releases; //2\. 當前寫狀態是否爲0,爲0則釋放寫鎖 boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); //3\. 不爲0則更新同步狀態 setState(nextc); return free; }
源碼的實現邏輯請看註釋,不難理解與ReentrantLock基本一致,這裏須要注意的是,減小寫狀態int nextc = getState() - releases;
只須要用當前同步狀態直接減去寫狀態的緣由正是咱們剛纔所說的寫狀態是由同步狀態的低16位表示的。
看完了寫鎖,如今來看看讀鎖,讀鎖不是獨佔式鎖,即同一時刻該鎖能夠被多個讀線程獲取也就是一種共享式鎖。按照以前對AQS介紹,實現共享式同步組件的同步語義須要經過重寫AQS的tryAcquireShared方法和tryReleaseShared方法。讀鎖的獲取實現方法爲:
protected final int tryAcquireShared(int unused) { /* * Walkthrough: * 1\. If write lock held by another thread, fail. * 2\. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * 3\. If step 2 fails either because thread * apparently not eligible or CAS fails or count * saturated, chain to version with full retry loop. */ Thread current = Thread.currentThread(); int c = getState(); //1\. 若是寫鎖已經被獲取而且獲取寫鎖的線程不是當前線程的話,當前 // 線程獲取讀鎖失敗返回-1 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && //2\. 當前線程獲取讀鎖 compareAndSetState(c, c + SHARED_UNIT)) { //3\. 下面的代碼主要是新增的一些功能,好比getReadHoldCount()方法 //返回當前獲取讀鎖的次數 if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } //4\. 處理在第二步中CAS操做失敗的自旋已經實現重入性 return fullTryAcquireShared(current); }
代碼的邏輯請看註釋,須要注意的是 當寫鎖被其餘線程獲取後,讀鎖獲取失敗,不然獲取成功利用CAS更新同步狀態。另外,當前同步狀態須要加上SHARED_UNIT((1 << SHARED_SHIFT)
即0x00010000)的緣由這是咱們在上面所說的同步狀態的高16位用來表示讀鎖被獲取的次數。若是CAS失敗或者已經獲取讀鎖的線程再次獲取讀鎖時,是靠fullTryAcquireShared方法實現的,這段代碼就不展開說了,有興趣能夠看看。
讀鎖釋放的實現主要經過方法tryReleaseShared,源碼以下,主要邏輯請看註釋:
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); // 前面仍是爲了實現getReadHoldCount等新功能 if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); // 讀鎖釋放 將同步狀態減去讀狀態便可 int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }
讀寫鎖支持鎖降級,遵循按照獲取寫鎖,獲取讀鎖再釋放寫鎖的次序,寫鎖可以降級成爲讀鎖,不支持鎖升級,關於鎖降級下面的示例代碼摘自ReentrantWriteReadLock源碼中:
void processCachedData() { rwl.readLock().lock(); if (!cacheValid) { // Must release read lock before acquiring write lock rwl.readLock().unlock(); rwl.writeLock().lock(); try { // Recheck state because another thread might have // acquired write lock and changed state before we did. if (!cacheValid) { data = ... cacheValid = true; } // Downgrade by acquiring read lock before releasing write lock rwl.readLock().lock(); } finally { rwl.writeLock().unlock(); // Unlock write, still hold read } } try { use(data); } finally { rwl.readLock().unlock(); } } }