解決線程安全問題使用ReentrantLock就能夠了,可是ReentrantLock是獨佔鎖,某一時刻只有一個線程能夠獲取該鎖,而實際中會有寫少讀多的場景,顯然ReentrantLock知足不了這個需求,因此ReentrantReadWriteLock應運而生。ReentrantReadWriteLock採用讀寫分離的策略,容許多個線程能夠同時獲取讀鎖。html
由類圖可知,讀寫鎖內部維護了一個ReadLock和一個WriteLock,他們依賴Sync實現具體功能,而Sync繼承自AQS,而且提供了公平和非公平的實現。java
咱們先看下ReentrantReadWriteLock類的總體結構編程
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { private final ReentrantReadWriteLock.ReadLock readerLock;//讀鎖🔒 private final ReentrantReadWriteLock.WriteLock writerLock;//寫鎖🔒 final Sync sync; public ReentrantReadWriteLock() {//使用默認(非公平)的排序屬性建立一個新的ReentrantReadWriteLock this(false); } public ReentrantReadWriteLock(boolean fair) {//使用給定的公平策略建立一個新的ReentrantReadWriteLock sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }//返回用於寫入操做的鎖🔒 public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }//返回用於讀取操做的鎖🔒 abstract static class Sync extends AbstractQueuedSynchronizer { .... static final class NonfairSync extends Sync {....}//非公平策略 static final class FairSync extends Sync {.... }//公平策略 public static class ReadLock implements Lock, java.io.Serializable {}//讀鎖🔒 .... public static class WriteLock implements Lock, java.io.Serializable {}//寫鎖🔒 .... }
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {}緩存
public interface ReadWriteLock { Lock readLock(); Lock writeLock(); }
說明:能夠看到ReentrantReadWriteLock實現了ReadWriteLock接口,ReadWriteLock接口規範了讀寫鎖方法,具體操做由子類去實現,同時還實現了Serializable接口,表示能夠進行序列化操做。安全
abstract static class Sync extends AbstractQueuedSynchronizer {}
說明:Sync抽象類繼承自AQS抽象類,Sync類提供了對ReentrantReadWriteLock的支持。併發
Sync類的內部類存在兩個,分別爲HoldCounter和ThreadLocalHoldCounter,其中HoldCounter主要與讀鎖配套使用,HoldCounter源碼以下函數
static final class HoldCounter {//計數器 int count = 0;//計數 // Use id, not reference, to avoid garbage retention final long tid = getThreadId(Thread.currentThread());//線程id }
說明:HoldCounter有兩個屬性,count和tid,其中count表示某個讀線程重入次數,tid表示該線程的tid字段的值,該字段能夠用來惟一標識一個線程。源碼分析
ThreadLocalHoldCounter源碼以下:ui
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {//本地線程計數器 public HoldCounter initialValue() {//重寫初始化方法,在沒有進行set的狀況下,獲取的都是該HoldCounter的值 return new HoldCounter(); } }
說明:ThreadLocalHoldCounter重寫了ThreadLocal的initialValue方法,ThreadLocal類能夠將線程與對象相關聯。在沒用進行set的狀況下,get到的均是initialValue方法裏面生成的那個HoldCounter對象。this
abstract static class Sync extends AbstractQueuedSynchronizer { //版本序號 private static final long serialVersionUID = 6317671515068378041L; //高16位爲讀鎖🔒,低16位爲寫鎖🔒 static final int SHARED_SHIFT = 16; //讀鎖🔒單位 static final int SHARED_UNIT = (1 << SHARED_SHIFT); //讀鎖🔒的最大數量 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; //寫鎖🔒的最大數量 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; //本地線程計數器 private transient ThreadLocalHoldCounter readHolds; //緩存計數器 private transient HoldCounter cachedHoldCounter; //第一個讀線程 private transient Thread firstReader = null; //第一個讀線程的計數 private transient int firstReaderHoldCount;
說明:該屬性中包括了讀鎖,寫鎖線程的最大量。本地線程計數器等。
//構造函數 Sync() { //本地線程計數器 readHolds = new ThreadLocalHoldCounter(); //設置AQS的狀態 setState(getState()); // ensures visibility of readHolds }
說明:在Sync的構造函數中設置了本地線程計數器和AQS的狀態state。
讀寫鎖須要在同步狀態(一個整形變量)上維護多個讀線程和一個寫線程的狀態。
讀寫鎖對於同步狀態的實現是在一個整形變量上經過「按位切割使用」:將變量切割成兩部分,高16位表示讀狀態,也就是獲取到讀鎖的次數,低16位表示獲取到寫線程的可重入次數。
假設當前同步狀態值爲S,get和set的操做以下:
(1)獲取寫狀態:
S&0x0000FFFF:將高16位所有抹去
(2)獲取讀狀態:
S>>>16:無符號補0,右移16位
(3)寫狀態加1:
S+1
(4)讀狀態加1:
S+(1<<16)即S + 0x00010000
在代碼層的判斷中,若是S不等於0,當寫狀態(S&0x0000FFFF),而讀狀態(S>>>16)大於0,則表示該讀寫鎖的讀鎖已被獲取。
看下WriteLock類中的lock和unlock方法:
public void lock() {
sync.acquire(1);
}public void unlock() {
sync.release(1);
}
說明:能夠看到就是調用獨佔式同步狀態的獲取與釋放,所以真實的實現就是Sync的tryAcquire和tryRelease。
protected final boolean tryAcquire(int acquires) { //當前線程 Thread current = Thread.currentThread(); //獲取狀態 int c = getState(); //寫線程數量(即獲取獨佔鎖的重入數) int w = exclusiveCount(c); //c!=0說明讀鎖或者寫鎖已經被某線程獲取 if (c != 0) { //w=0說明已經有線程獲取了讀鎖返回false,w!=0而且當前線程不是寫鎖的擁有者,則返回false if (w == 0 || current != getExclusiveOwnerThread()) return false; //說明當前線程獲取了寫鎖,判斷可重入次數(最大次數65535) if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); //次數當前線程已經持有寫鎖,設置可重入次數 setState(c + acquires); return true; } //到這裏說明此時c=0,讀鎖和寫鎖都沒有被獲取,writerShouldBlock表示是否阻塞 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; //設置鎖位當前線程所持有 setExclusiveOwnerThread(current); return true; }
其中exclusiveCount方法表示佔有寫鎖的線程數量,源碼以下:
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
static final int SHARED_SHIFT = 16;
說明:直接將狀態state和(2^16 - 1)作與運算,其等效於將state模上2^16。寫鎖數量由state的低十六位表示。
writerShouldBlock()判斷是否須要阻塞,公平和非公平方式實現不一樣。
非公平實現方式:
在非公平策略方式下老是不會被阻塞
final boolean writerShouldBlock() { return false; // writers can always barge }
公平實現方式,在公平策略下會進行判斷,判斷同步隊列中是否有等待時間更長的線程,若存在,則須要被阻塞,不然無需阻塞。
final boolean writerShouldBlock() { return hasQueuedPredecessors(); }
public final boolean hasQueuedPredecessors() { Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
說明:若是不須要阻塞,則CAS更新同步狀態,若CAS成功則返回true,失敗則說明當前鎖🔒被別的線程搶去了,返回false,若是須要阻塞也返回false。
成功獲取寫鎖🔒後,將當前線程設置位佔有寫鎖的線程,返回true。
方法流程圖以下:
1.4.2 寫鎖的釋放,tryRelease方法:
protected final boolean tryRelease(int releases) { //若鎖的持有則不是當前線程,拋出異常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //寫鎖的新線程數 int nextc = getState() - releases; //若是獨佔模式重入數爲0了,說明獨佔模式被釋放 boolean free = exclusiveCount(nextc) == 0; if (free) //若寫鎖的新線程數爲0,則將鎖的持有則設置爲null setExclusiveOwnerThread(null); //設置寫鎖的新線程數,無論獨佔模式是否被釋放,更新獨佔重入數 setState(nextc); return free; }
說明: 寫鎖的釋放過程仍是相對而言比較簡單的:首先查看當前線程是否爲寫鎖的持有者,若是不是拋出異常。而後檢查釋放後寫鎖的線程數是否爲0,若是爲0則表示寫鎖空閒了,
釋放鎖資源將鎖的持有線程設置爲null,不然釋放僅僅只是一次重入鎖而已,並不能將寫鎖的線程清空。 此方法用於釋放寫鎖資源,首先會判斷該線程是否爲獨佔線程,若不爲獨佔線程,
則拋出異常,不然,計算釋放資源後的寫鎖的數量,若爲0,表示成功釋放,資源不將被佔用,不然,表示資源還被佔用。其方法流程圖以下。
相似於寫鎖,讀鎖的lock和unlock的實際實現對應Sync的 tryAcquireShared 和 tryReleaseShared方法。
protected final int tryAcquireShared(int unused) { //獲取當前線程 Thread current = Thread.currentThread(); //獲取狀態 int c = getState(); //若是寫鎖線程數!=0,且獨佔鎖不是當前線程則返回失敗,由於存在鎖🔒降級 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //讀鎖數量 int r = sharedCount(c); if (!readerShouldBlock() &&//讀鎖是否須要等待(公平鎖原則) r < MAX_COUNT &&//持有線程小於最大數(65535) compareAndSetState(c, c + SHARED_UNIT)) {//設置讀取鎖狀態 //r=0,表示第一個讀鎖線程,第一個讀鎖firstReader是不會加入到readHolds中 if (r == 0) { firstReader = current;//設置第一個讀線程 firstReaderHoldCount = 1;//讀線程佔用的資源數爲1 } else if (firstReader == current) {//當前線程爲第一個讀線程,表示第一個讀鎖線程重入 firstReaderHoldCount++;//佔用資源數+1 } else {//讀鎖數量不爲0,而且不爲當前線程 HoldCounter rh = cachedHoldCounter;//獲取計數器 if (rh == null || rh.tid != getThreadId(current))//計數器爲空,或者計數器的tid不爲當前正在運行的線程的tid cachedHoldCounter = rh = readHolds.get();//獲取當前線程的計數器 else if (rh.count == 0)//計數器爲0 readHolds.set(rh);//加入到readHolds中 rh.count++;//計數器+1 } return 1; } return fullTryAcquireShared(current); }
其中sharedCount方法表示佔有讀鎖的線程數量,源碼以下:
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
說明:直接將state右移16位,就能夠獲得讀鎖的線程數量,由於state的高16位表示讀鎖,對應的第十六位表示寫鎖數量。
讀鎖獲取鎖的過程比寫鎖稍微複雜些,首先判斷寫鎖是否爲0而且當前線程不佔有獨佔鎖,直接返回;不然,判斷讀線程是否須要被阻塞而且讀鎖數量是否小於最大值而且比較設置狀態成功,
若當前沒有讀鎖,則設置第一個讀線程firstReader和firstReaderHoldCount;若當前線程線程爲第一個讀線程,則增長firstReaderHoldCount;不然,將設置當前線程對應的HoldCounter對象的值。流程圖以下。
注意:更新成功後會在firstReaderHoldCount中或readHolds(ThreadLocal類型的)本地線程副本中記錄當前線程的重入數,若是當前只有一個線程的話,還不須要動用ThreadLocal,直接往firstReaderHoldCount這個成員變量裏存重入數,
當第二個線程來的時候,就要動用ThreadLocal變量的readHolds了,每一個線程擁有本身的副本,用來保存本身的重入數。
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) { // 無限循環
// 獲取狀態
int c = getState();
if (exclusiveCount(c) != 0) { // 寫線程數量不爲0
if (getExclusiveOwnerThread() != current) // 不爲當前線程
return -1;
} else if (readerShouldBlock()) { // 寫線程數量爲0而且讀線程被阻塞
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) { // 當前線程爲第一個讀線程
// assert firstReaderHoldCount > 0;
} else { // 當前線程不爲第一個讀線程
if (rh == null) { // 計數器不爲空
//
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) { // 計數器爲空或者計數器的tid不爲當前正在運行的線程的tid
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT) // 讀鎖數量爲最大值,拋出異常
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) { // 比較而且設置成功
if (sharedCount(c) == 0) { // 讀線程數量爲0
// 設置第一個讀線程
firstReader = current;
//
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
說明:在tryAcquireShared函數中,若是下列三個條件不知足(讀線程是否應該被阻塞、小於最大值、比較設置成功)則會進行fullTryAcquireShared函數中,它用來保證相關操做能夠成功。其邏輯與tryAcquireShared邏輯相似,再也不累贅。
protected final boolean tryReleaseShared(int unused) {
// 獲取當前線程
Thread current = Thread.currentThread();
if (firstReader == current) { // 當前線程爲第一個讀線程
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1) // 讀線程佔用的資源數爲1
firstReader = null;
else // 減小佔用的資源
firstReaderHoldCount--;
} else { // 當前線程不爲第一個讀線程
// 獲取緩存的計數器
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) // 計數器爲空或者計數器的tid不爲當前正在運行的線程的tid
// 獲取當前線程對應的計數器
rh = readHolds.get();
// 獲取計數
int count = rh.count;
if (count <= 1) { // 計數小於等於1
// 移除
readHolds.remove();
if (count <= 0) // 計數小於等於0,拋出異常
throw unmatchedUnlockException();
}
// 減小計數
--rh.count;
}
for (;;) { // 無限循環
// 獲取狀態
int c = getState();
// 獲取狀態
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc)) // 比較並進行設置
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
說明:此方法表示讀鎖線程釋放鎖。首先判斷當前線程是否爲第一個讀線程firstReader,如果,則判斷第一個讀線程佔有的資源數firstReaderHoldCount是否爲1,如果,則設置第一個讀線程firstReader爲空,不然,將第一個讀線程佔有的資源數firstReaderHoldCount減1;若當前線程不是第一個讀線程,那麼首先會獲取緩存計數器(上一個讀鎖線程對應的計數器 ),若計數器爲空或者tid不等於當前線程的tid值,則獲取當前線程的計數器,若是計數器的計數count小於等於1,則移除當前線程對應的計數器,若是計數器的計數count小於等於0,則拋出異常,以後再減小計數便可。不管何種狀況,都會進入無限循環,該循環能夠確保成功設置狀態state。其流程圖以下。
在讀鎖的獲取、釋放過程當中,老是會有一個對象存在着,同時該對象在獲取線程獲取讀鎖是+1,釋放讀鎖時-1,該對象就是HoldCounter。
要明白HoldCounter就要先明白讀鎖。前面提過讀鎖的內在實現機制就是共享鎖,對於共享鎖其實咱們能夠稍微的認爲它不是一個鎖的概念,它更加像一個計數器的概念。
一次共享鎖操做就至關於一次計數器的操做,獲取共享鎖計數器+1,釋放共享鎖計數器-1。只有當線程獲取共享鎖後才能對共享鎖進行釋放、重入操做。因此HoldCounter的做用就是當前線程持有共享鎖的數量,這個數量必需要與線程綁定在一塊兒,不然操做其餘線程鎖就會拋出異常。
先看讀鎖獲取鎖的部分:
if (r == 0) {//r == 0,表示第一個讀鎖線程,第一個讀鎖firstRead是不會加入到readHolds中
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {//第一個讀鎖線程重入
firstReaderHoldCount++;
} else { //非firstReader計數
HoldCounter rh = cachedHoldCounter;//readHoldCounter緩存
//rh == null 或者 rh.tid != current.getId(),須要獲取rh
if (rh == null || rh.tid != current.getId())
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh); //加入到readHolds中
rh.count++; //計數+1
}
這裏爲何要搞一個firstRead、firstReaderHoldCount呢?而不是直接使用else那段代碼?這是爲了一個效率問題,firstReader是不會放入到readHolds中的,
若是讀鎖僅有一個的狀況下就會避免查找readHolds。可能就看這個代碼還不是很理解HoldCounter。咱們先看firstReader、firstReaderHoldCount的定義:
private transient Thread firstReader = null; private transient int firstReaderHoldCount;
這兩個變量比較簡單,一個表示線程,固然該線程是一個特殊的線程,一個是firstReader的重入計數。
HoldCounter的定義:
static final class HoldCounter { int count = 0; final long tid = Thread.currentThread().getId(); }
在HoldCounter中僅有count和tid兩個變量,其中count表明着計數器,tid是線程的id。可是若是要將一個對象和線程綁定起來僅記錄tid確定不夠的,並且HoldCounter根本不能起到綁定對象的做用,只是記錄線程tid而已。
誠然,在java中,咱們知道若是要將一個線程和對象綁定在一塊兒只有ThreadLocal才能實現。因此以下:
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } }
ThreadLocalHoldCounter繼承ThreadLocal,而且重寫了initialValue方法。
故而,HoldCounter應該就是綁定線程上的一個計數器,而ThradLocalHoldCounter則是線程綁定的ThreadLocal。從上面咱們能夠看到ThreadLocal將HoldCounter綁定到當前線程上,同時HoldCounter也持有線程Id,這樣在釋放鎖的時候才能知道ReadWriteLock裏面緩存的上一個讀取線程(cachedHoldCounter)是不是當前線程。這樣作的好處是能夠減小ThreadLocal.get()的次數,由於這也是一個耗時操做。須要說明的是這樣HoldCounter綁定線程id而不綁定線程對象的緣由是避免HoldCounter和ThreadLocal互相綁定而GC難以釋放它們(儘管GC可以智能的發現這種引用而回收它們,可是這須要必定的代價),因此其實這樣作只是爲了幫助GC快速回收對象而已。
經過上面的源碼分析,咱們能夠發現一個現象:
在線程持有讀鎖的狀況下,該線程不能取得寫鎖(由於獲取寫鎖的時候,若是發現當前的讀鎖被佔用,就立刻獲取失敗,無論讀鎖是否是被當前線程持有)。
在線程持有寫鎖的狀況下,該線程能夠繼續獲取讀鎖(獲取讀鎖時若是發現寫鎖被佔用,只有寫鎖沒有被當前線程佔用的狀況纔會獲取失敗)。
仔細想一想,這個設計是合理的:由於當線程獲取讀鎖的時候,可能有其餘線程同時也在持有讀鎖,所以不能把獲取讀鎖的線程「升級」爲寫鎖;而對於得到寫鎖的線程,它必定獨佔了讀寫鎖,所以能夠繼續讓它獲取讀鎖,當它同時獲取了寫鎖和讀鎖後,還能夠先釋放寫鎖繼續持有讀鎖,這樣一個寫鎖就「降級」爲了讀鎖.
一個線程要想同時持有寫鎖和讀鎖,必須先獲取寫鎖再獲取讀鎖;寫鎖能夠「降級」爲讀鎖;讀鎖不能「升級」爲寫鎖。
Java併發編程之美