閱讀建議:雖然我這裏會介紹一些 AQS 的知識,不過若是你徹底不瞭解 AQS,看本文就有點吃力了。java
目錄緩存
使用示例下面這個例子很是實用,我是 javadoc 的搬運工:安全
// 這是一個關於緩存操做的故事
class CachedData {
Object data;
volatile boolean cacheValid;
// 讀寫鎖實例
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
// 獲取讀鎖
rwl.readLock().lock();
if (!cacheValid) {
// 若是緩存過時了,或者爲 null
// 釋放掉讀鎖,而後獲取寫鎖 (後面會看到,沒釋放掉讀鎖就獲取寫鎖,會發生死鎖狀況)
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
if (!cacheValid) {
// 從新判斷,由於在等待寫鎖的過程當中,可能前面有其餘寫線程執行過了
data = ...
cacheValid = true;
}
// 獲取讀鎖 (持有寫鎖的狀況下,是容許獲取讀鎖的,稱爲 「鎖降級」,反之不行。)
rwl.readLock().lock();
} finally {
// 釋放寫鎖,此時還剩一個讀鎖
rwl.writeLock().unlock();
// Unlock write, still hold read
}
}
try {
use(data);
}
finally {
// 釋放讀鎖
rwl.readLock().unlock();
}
}
}複製代碼
ReentrantReadWriteLock 分爲讀鎖和寫鎖兩個實例,讀鎖是共享鎖,可被多個線程同時使用,寫鎖是獨佔鎖。持有寫鎖的線程能夠繼續獲取讀鎖,反之不行。bash
這一節比較重要,咱們要先看清楚 ReentrantReadWriteLock 的大框架,而後再到源碼細節。app
首先,咱們來看下 ReentrantReadWriteLock 的結構,它有好些嵌套類:框架
你們先仔細看看這張圖中的信息。而後咱們把 ReadLock 和 WriteLock 的代碼提出來一塊兒看,清晰一些:源碼分析
很清楚了,ReadLock 和 WriteLock 中的方法都是經過 Sync 這個類來實現的。Sync 是 AQS 的子類,而後再派生了公平模式和不公平模式。性能
從它們調用的 Sync 方法,咱們能夠看到: ReadLock 使用了共享模式,WriteLock 使用了獨佔模式。ui
等等,同一個 AQS 實例怎麼能夠同時使用共享模式和獨佔模式???spa
這裏給你們回顧下 AQS,咱們橫向對比下 AQS 的共享模式和獨佔模式:
AQS 的精髓在於內部的屬性 state:
對於獨佔模式來講,一般就是 0 表明可獲取鎖,1 表明鎖被別人獲取了,重入例外
而共享模式下,每一個線程均可以對 state 進行加減操做
也就是說,獨佔模式和共享模式對於 state 的操做徹底不同,那讀寫鎖 ReentrantReadWriteLock 中是怎麼使用 state 的呢?答案是將 state 這個 32 位的 int 值分爲高 16 位和低 16位,分別用於共享模式和獨佔模式。
有了前面的概念,你們內心應該都有數了吧,下面就再也不那麼囉嗦了,直接代碼分析。
源代碼加註釋 1500 行,並不算難,咱們要看的代碼量不大。若是你前面一節都理解了,那麼直接從頭開始一行一行往下看就是了,仍是比較簡單的。
ReentrantReadWriteLock 的前面幾行很簡單,咱們往下滑到 Sync 類,先來看下它的全部的屬性:
abstract static class Sync extends AbstractQueuedSynchronizer {
// 下面這塊說的就是將 state 一分爲二,高 16 位用於共享模式,低16位用於獨佔模式
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 取 c 的高 16 位值,表明讀鎖的獲取次數(包括重入)
static int sharedCount(int c) {
return c >>> SHARED_SHIFT;
}
// 取 c 的低 16 位值,表明寫鎖的重入次數,由於寫鎖是獨佔模式
static int exclusiveCount(int c) {return c & EXCLUSIVE_MASK;}
// 這個嵌套類的實例用來記錄每一個線程持有的讀鎖數量(讀鎖重入)
static final class HoldCounter {
// 持有的讀鎖數
int count = 0;
// 線程 id
final long tid = getThreadId(Thread.currentThread());
}
// ThreadLocal 的子類
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
/** * 組合使用上面兩個類,用一個 ThreadLocal 來記錄當前線程持有的讀鎖數量 */
private transient ThreadLocalHoldCounter readHolds;
// 用於緩存,記錄"最後一個獲取讀鎖的線程"的讀鎖重入次數,
// 因此無論哪一個線程獲取到讀鎖後,就把這個值佔爲已用,這樣就不用到 ThreadLocal 中查詢 map 了
// 算不上理論的依據:一般讀鎖的獲取很快就會伴隨着釋放,
// 顯然,在 獲取->釋放 讀鎖這段時間,若是沒有其餘線程獲取讀鎖的話,此緩存就能幫助提升性能
private transient HoldCounter cachedHoldCounter;
// 第一個獲取讀鎖的線程(而且其未釋放讀鎖),以及它持有的讀鎖數量
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
Sync() {
// 初始化 readHolds 這個 ThreadLocal 屬性
readHolds = new ThreadLocalHoldCounter();
// 爲了保證 readHolds 的內存可見性
setState(getState());
// ensures visibility of readHolds }
...
}複製代碼
state 的高 16 位表明讀鎖的獲取次數,包括重入次數,獲取到讀鎖一次加 1,釋放掉讀鎖一次減 1
state 的低 16 位表明寫鎖的獲取次數,由於寫鎖是獨佔鎖,同時只能被一個線程得到,因此它表明重入次數
每一個線程都須要維護本身的 HoldCounter,記錄該線程獲取的讀鎖次數,這樣才能知道究竟是不是讀鎖重入,用 ThreadLocal 屬性 readHolds 維護
cachedHoldCounter 有什麼用?其實沒什麼用,但能提示性能。將最後一次獲取讀鎖的線程的 HoldCounter 緩存到這裏,這樣比使用 ThreadLocal 性能要好一些,由於 ThreadLocal 內部是基於 map 來查詢的。可是 cachedHoldCounter 這一個屬性畢竟只能緩存一個線程,因此它要起提高性能做用的依據就是:一般讀鎖的獲取緊隨着就是該讀鎖的釋放。我這裏可能表達不太好,可是你們應該是懂的吧。
firstReader 和 firstReaderHoldCount 有什麼用?其實也沒什麼用,可是它也能提示性能。將"第一個"獲取讀鎖的線程記錄在 firstReader 屬性中,這裏的第一個不是全局的概念,等這個 firstReader 當前表明的線程釋放掉讀鎖之後,會有後來的線程佔用這個屬性的。firstReader 和 firstReaderHoldCount 使得在讀鎖不產生競爭的狀況下,記錄讀鎖重入次數很是方便快速
若是一個線程使用了 firstReader,那麼它就不須要佔用 cachedHoldCounter
我的認爲,讀寫鎖源碼中最讓初學者頭疼的就是這幾個用於提高性能的屬性了,使得你們看得雲裏霧裏的。主要是由於 ThreadLocal 內部是經過一個 ThreadLocalMap 來操做的,會增長檢索時間。而不少場景下,執行 unlock 的線程每每就是剛剛最後一次執行 lock 的線程,中間可能沒有其餘線程進行 lock。還有就是不少不怎麼會發生讀鎖競爭的場景。
上面說了這麼多,是但願能幫你們下降後面閱讀源碼的壓力,你們也能夠先看看後面的,而後再慢慢體會。
前面咱們好像都只說讀鎖,徹底沒提到寫鎖,主要是由於寫鎖真的是簡單不少,我也特意將寫鎖的源碼放到了後面,咱們先啃下最難的讀鎖先。
下面我就不一行一行按源碼順序說了,咱們按照使用來講。
咱們來看下讀鎖 ReadLock 的 lock 流程:
// ReadLock
public void lock() {
sync.acquireShared(1);
}
// AQS
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}複製代碼
而後咱們就會進到 Sync 類的 tryAcquireShared 方法:
在 AQS 中,若是 tryAcquireShared(arg) 方法返回值小於 0 表明沒有獲取到共享鎖(讀鎖),大於 0 表明獲取到
回顧 AQS 共享模式:tryAcquireShared 方法不只僅在 acquireShared 的最開始被使用,這裏是 try,也就可能會失敗,若是失敗的話,執行後面的 doAcquireShared,進入到阻塞隊列,而後等待前驅節點喚醒。喚醒之後,仍是會調用 tryAcquireShared 進行獲取共享鎖的。固然,喚醒之後再 try 是很容易得到鎖的,由於這個節點已經排了好久的隊了,組織是會照顧它的。
因此,你在看下面這段代碼的時候,要想象到兩種獲取讀鎖的場景,一種是新來的,一種是排隊排到它的。
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread(); int c = getState();
// exclusiveCount(c) 不等於 0,說明有線程持有寫鎖,
// 並且不是當前線程持有寫鎖,那麼當前線程獲取讀鎖失敗
// (另,若是持有寫鎖的是當前線程,是能夠繼續獲取讀鎖的)
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
return -1;
// 讀鎖的獲取次數
int r = sharedCount(c);
// 讀鎖獲取是否須要被阻塞,稍後細說。爲了進去下面的分支,假設這裏不阻塞就行了
if (!readerShouldBlock() &&
// 判斷是否會溢出 (2^16-1,沒那麼容易溢出的)
r < MAX_COUNT &&
// 下面這行 CAS 是將 state 屬性的高 16 位加 1,低 16 位不變,若是成功就表明獲取到了讀鎖
compareAndSetState(c, c + SHARED_UNIT)) {
// =======================
// 進到這裏就是獲取到了讀鎖
// =======================
if (r == 0) {
// r == 0 說明此線程是第一個獲取讀鎖的,或者說在它前面獲取讀鎖的都走光光了,它也算是第一個吧
// 記錄 firstReader 爲當前線程,及其持有的讀鎖數量:1
firstReader = current; firstReaderHoldCount = 1;
}
else if (firstReader == current) {
// 進來這裏,說明是 firstReader 重入獲取讀鎖(這很是簡單,count 加 1 結束)
firstReaderHoldCount++;
}
else {
// 前面咱們說了 cachedHoldCounter 用於緩存最後一個獲取讀鎖的線程
// 若是 cachedHoldCounter 緩存的不是當前線程,設置爲緩存當前線程的 HoldCounter
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
// 到這裏,那麼就是 cachedHoldCounter 緩存的是當前線程,可是 count 爲 0,
// 你們能夠思考一下:這裏爲何要 set ThreadLocal 呢?(固然,答案確定不在這塊代碼中)
// 既然 cachedHoldCounter 緩存的是當前線程,
// 當前線程確定調用過 readHolds.get() 進行初始化 ThreadLocal
readHolds.set(rh);
// count 加 1
rh.count++;
}
// return 大於 0 的數,表明獲取到了共享鎖
return 1; }
// 往下看
return fullTryAcquireShared(current);}複製代碼
上面的代碼中,要進入 if 分支,須要知足:readerShouldBlock() 返回 false,而且 CAS 要成功(咱們先不要糾結 MAX_COUNT 溢出)。
那咱們反向推,怎麼樣進入到最後的 fullTryAcquireShared:
readerShouldBlock() 返回 true,2 種狀況:
在 FairSync 中說的是 hasQueuedPredecessors(),即阻塞隊列中有其餘元素在等待鎖。
也就是說,公平模式下,有人在排隊呢,你新來的不能直接獲取鎖
在 NonFairSync 中說的是 apparentlyFirstQueuedIsExclusive(),即判斷阻塞隊列中 head 的第一個後繼節點是不是來獲取寫鎖的,若是是的話,讓這個寫鎖先來,避免寫鎖飢餓。
做者給寫鎖定義了更高的優先級,因此若是碰上獲取寫鎖的線程立刻就要獲取到鎖了,獲取讀鎖的線程不該該和它搶。
若是 head.next 不是來獲取寫鎖的,那麼能夠隨便搶,由於是非公平模式,你們比比 CAS 速度
compareAndSetState(c, c + SHARED_UNIT) 這裏 CAS 失敗,存在競爭。多是和另外一個讀鎖獲取競爭,固然也多是和另外一個寫鎖獲取操做競爭。
而後就會來到 fullTryAcquireShared 中再次嘗試:
/** * 1\. 剛剛咱們說了多是由於 CAS 失敗,若是就此返回,那麼就要進入到阻塞隊列了,
* 想一想有點不甘心,由於都已經知足了 !readerShouldBlock(),也就是說原本能夠不用到阻塞隊列的,
* 因此進到這個方法實際上是增長 CAS 成功的機會 * 2\. 在 NonFairSync 狀況下,雖然 head.next 是獲取寫鎖的,我知道它等待好久了,我沒想和它搶,
* 但是若是我是來重入讀鎖的,那麼只能表示對不起了
*/
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
// 別忘了這外層有個 for 循環
for (;;) {
int c = getState();
// 若是其餘線程持有了寫鎖,天然此次是獲取不到讀鎖了,乖乖到阻塞隊列排隊吧
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock. }
else if (readerShouldBlock()) {
/** * 進來這裏,說明:
* 1\. exclusiveCount(c) == 0:寫鎖沒有被佔用
* 2\. readerShouldBlock() 爲 true,說明阻塞隊列中有其餘線程在等待 * * 既然 should block,那進來這裏是幹什麼的呢? * 答案:是進來處理讀鎖重入的! * */
// firstReader 線程重入讀鎖,直接到下面的 CAS
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
}
else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
// cachedHoldCounter 緩存的不是當前線程
// 那麼到 ThreadLocal 中獲取當前線程的 HoldCounter // 若是當前線程歷來沒有初始化過 ThreadLocal 中的值,get() 會執行初始化 rh = readHolds.get(); // 若是發現 count == 0,也就是說,純屬上一行代碼初始化的,那麼執行 remove // 而後往下兩三行,乖乖排隊去 if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) // 排隊去。 return -1; } /** * 這塊代碼我看了蠻久才把握好它是幹嗎的,原來只須要知道,它是處理重入的就能夠了。 * 就是爲了確保讀鎖重入操做能成功,而不是被塞到阻塞隊列中等待 * * 另外一個信息就是,這裏對於 ThreadLocal 變量 readHolds 的處理: * 若是 get() 後發現 count == 0,竟然會作 remove() 操做, * 這行代碼對於理解其餘代碼是有幫助的 */ }
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 這裏 CAS 成功,那麼就意味着成功獲取讀鎖了
// 下面須要作的是設置 firstReader 或 cachedHoldCounter
if (sharedCount(c) == 0) {
// 若是發現 sharedCount(c) 等於 0,就將當前線程設置爲 firstReader firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { // 下面這幾行,就是將 cachedHoldCounter 設置爲當前線程 if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; } // 返回大於 0 的數,表明獲取到了讀鎖 return 1; } }}複製代碼
firstReader 是每次將讀鎖獲取次數從 0 變爲 1 的那個線程。
能緩存到 firstReader 中就不要緩存到 cachedHoldCounter 中。
上面的源碼分析應該說得很是詳細了,若是到這裏你不太能看懂上面的有些地方的註釋,那麼能夠先日後看,而後再多看幾遍。
下面咱們看看讀鎖釋放的流程:
// ReadLock
public void unlock() {
sync.releaseShared(1);
}複製代碼
// Sync
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
// 這句代碼其實喚醒 獲取寫鎖的線程,往下看就知道了
return true; }
return false;}
// Sync
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
if (firstReaderHoldCount == 1)
// 若是等於 1,那麼此次解鎖後就再也不持有鎖了,把 firstReader 置爲 null,給後來的線程用
// 爲何不順便設置 firstReaderHoldCount = 0?由於不必,其餘線程使用的時候本身會設值
firstReader = null;
else
firstReaderHoldCount--;
} else {
// 判斷 cachedHoldCounter 是否緩存的是當前線程,不是的話要到 ThreadLocal 中取
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
// 這一步將 ThreadLocal remove 掉,防止內存泄漏。由於已經再也不持有讀鎖了 readHolds.remove();
if (count <= 0)
// 就是那種,lock() 一次,unlock() 好幾回的逗比
throw unmatchedUnlockException();
}
// count 減 1
--rh.count;
}
for (;;) {
int c = getState();
// nextc 是 state 高 16 位減 1 後的值
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// 若是 nextc == 0,那就是 state 所有 32 位都爲 0,也就是讀鎖和寫鎖都空了 // 此時這裏返回 true 的話,實際上是幫助喚醒後繼節點中的獲取寫鎖的線程 return nextc == 0; }}複製代碼
讀鎖釋放的過程仍是比較簡單的,主要就是將 hold count 減 1,若是減到 0 的話,還要將 ThreadLocal 中的 remove 掉。
而後是在 for 循環中將 state 的高 16 位減 1,若是發現讀鎖和寫鎖都釋放光了,那麼喚醒後繼的獲取寫鎖的線程。
寫鎖是獨佔鎖。
若是有讀鎖被佔用,寫鎖獲取是要進入到阻塞隊列中等待的。
// WriteLock
public void lock() {
sync.acquire(1);}
// AQS
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
// 若是 tryAcquire 失敗,那麼進入到阻塞隊列等待
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// Sync
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// 看下這裏返回 false 的狀況:
// c != 0 && w == 0: 寫鎖可用,可是有線程持有讀鎖(也多是本身持有)
// c != 0 && w !=0 && current != getExclusiveOwnerThread(): 其餘線程持有寫鎖
// 也就是說,只要有讀鎖或寫鎖被佔用,此次就不能獲取到寫鎖
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 這裏不須要 CAS,仔細看就知道了,能到這裏的,只多是寫鎖重入,否則在上面的 if 就攔截了
setState(c + acquires); return true;
}
// 若是寫鎖獲取不須要 block,那麼進行 CAS,成功就表明獲取到了寫鎖
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}複製代碼
下面看一眼 writerShouldBlock() 的斷定,而後你再回去看一篇寫鎖獲取過程。
static final class NonfairSync extends Sync {
// 若是是非公平模式,那麼 lock 的時候就能夠直接用 CAS 去搶鎖,搶不到再排隊
final boolean writerShouldBlock() {
return false; // writers can always barge
}
...}
static final class FairSync extends Sync {
final boolean writerShouldBlock() {
// 若是是公平模式,那麼若是阻塞隊列有線程等待的話,就乖乖去排隊
return hasQueuedPredecessors();
}
...}複製代碼
// WriteLock
public void unlock() {
sync.release(1);
}
// AQS
public final boolean release(int arg) {
// 1\. 釋放鎖
if (tryRelease(arg)) {
// 2\. 若是獨佔鎖釋放"徹底",喚醒後繼節點
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// Sync
// 釋放鎖,是線程安全的,由於寫鎖是獨佔鎖,具備排他性
// 實現很簡單,state 減 1 就是了
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively()
)
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
// 若是 exclusiveCount(nextc) == 0,也就是說包括重入的,全部的寫鎖都釋放了,
// 那麼返回 true,這樣會進行喚醒後繼節點的操做。
return free;
}複製代碼
看到這裏,是否是發現寫鎖相對於讀鎖來講要簡單不少。
Doug Lea 沒有說寫鎖更高級,若是有線程持有讀鎖,那麼寫鎖獲取也須要等待。
不過從源碼中也能夠看出,確實會給寫鎖一些特殊照顧,如非公平模式下,爲了提升吞吐量,lock 的時候會先 CAS 競爭一下,能成功就表明讀鎖獲取成功了,可是若是發現 head.next 是獲取寫鎖的線程,就不會去作 CAS 操做。
Doug Lea 將持有寫鎖的線程,去獲取讀鎖的過程稱爲鎖降級(Lock downgrading)。這樣,此線程就既持有寫鎖又持有讀鎖。
可是,鎖升級是不能夠的。線程持有讀鎖的話,在沒釋放的狀況下不能去獲取寫鎖,由於會發生死鎖。
回去看下寫鎖獲取的源碼:
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// 看下這裏返回 false 的狀況:
// c != 0 && w == 0: 寫鎖可用,可是有線程持有讀鎖(也多是本身持有)
// c != 0 && w !=0 && current != getExclusiveOwnerThread(): 其餘線程持有寫鎖
// 也就是說,只要有讀鎖或寫鎖被佔用,此次就不能獲取到寫鎖
if (w == 0 || current != getExclusiveOwnerThread())
return false;
...
}
...
}複製代碼
仔細想一想,若是線程 a 先獲取了讀鎖,而後獲取寫鎖,那麼線程 a 就到阻塞隊列休眠了,本身把本身弄休眠了,並且可能以後就沒人去喚醒它了。
(全文完)