AQS系列(三)- ReentrantReadWriteLock讀寫鎖的加鎖

前言node

    前兩篇咱們講述了ReentrantLock的加鎖釋放鎖過程,相對而言比較簡單,本篇進入深水區,看看ReentrantReadWriteLock-讀寫鎖的加鎖過程是如何實現的,繼續拜讀老Lea凌厲的代碼風。緩存

1、讀寫鎖的類圖app

    讀鎖就是共享鎖,而寫鎖是獨佔鎖。讀鎖與寫鎖之間的互斥關係爲:讀讀可同時執行(有條件的);讀寫與寫寫均互斥執行。注意此處讀讀可並行我用了有條件的並行,後文會對此作介紹。ui

    繼續奉上一張醜陋的類圖:spa

     能夠看到ReentrantReadWriteLock維護了五個內部類,ReentrantReadWriteLock中存放了Sync、ReadLock、WriteLock三個成員變量,以下截圖所示:線程

     而ReadLock和WriteLock中又存放了Sync變量,截圖以下所示,這樣一組合,有了四種鎖,公平讀鎖、公平寫鎖、非公平讀鎖、非公平寫鎖。對於公平與非公平的實現區別,咱們上一篇已經作過講解,本文將着重關注讀鎖和寫鎖的實現區別。設計

 

2、加鎖源碼code

    在前文中咱們知道,ReentrantLock中用state來判斷當前鎖是否被佔用,而讀寫鎖ReentrantReadWriteLock中因爲同時存在兩種鎖,因此老Lea用state的高16位來存放讀鎖的佔用狀態以及重入次數,低16位存放寫鎖的佔用狀態和重入次數。blog

一、讀鎖加鎖,即共享鎖加鎖隊列

1 public void lock() {
2             sync.acquireShared(1); // 獲取共享鎖方法
3         }

    上述lock方法中調用的獲取共享鎖方法是在AbstractQueuedSynchronizer中實現的,代碼以下:

1 public final void acquireShared(int arg) {
2         if (tryAcquireShared(arg) < 0)
3             doAcquireShared(arg);
4     }

    能夠看到獲取共享鎖分紅了兩步,第一步是嘗試獲取,若是獲取不到再進入if裏面執行doAcquireShared方法,下面分別追蹤。

1)、tryAcquireShared方法

 1 protected final int tryAcquireShared(int unused) {
 2             Thread current = Thread.currentThread();
 3             int c = getState();
 4             // 1.有寫鎖佔用而且不是當前線程,則直接返回獲取失敗
 5             if (exclusiveCount(c) != 0 &&
 6                 getExclusiveOwnerThread() != current)
 7                 return -1;
 8             // 執行到這裏,有兩種狀況 沒有寫鎖佔用或者是當前線程
 9             int r = sharedCount(c); // 獲取讀鎖次數
10             // 二、不該該阻塞則獲取鎖  @此方法有點意思,需着重講解,做用:判斷讀鎖是否須要阻塞
11             if (!readerShouldBlock() &&
12                 r < MAX_COUNT &&
13                 compareAndSetState(c, c + SHARED_UNIT)) {
14                 // 若是CAS成功,則將當前線程對應的計數+1
15                 if (r == 0) { // 若是讀鎖持有數爲0,則說明當前線程是第一個reader,分別給firstReader和firstReaderHoldCount初始化
16                     firstReader = current;
17                     firstReaderHoldCount = 1;
18                 } else if (firstReader == current) { // 若是讀鎖持有數不爲0且當前線程就是firstReader,那麼直接給firstReaderHoldCount+1,表示讀鎖重入
19                     firstReaderHoldCount++;
20                 } else { // 其餘狀況,即當前線程不是firstReader且還有其餘線程持有讀鎖,則要獲取到當前線程對應的HoldCounter,而後給裏面的計數+1
21                     HoldCounter rh = cachedHoldCounter;
22                     if (rh == null || rh.tid != getThreadId(current))
23                         cachedHoldCounter = rh = readHolds.get();
24                     else if (rh.count == 0)
25                         readHolds.set(rh);
26                     rh.count++;
27                 }
28                 return 1;
29             }
30             // 三、應該阻塞或者CAS失敗則進入此方法獲取鎖
31             return fullTryAcquireShared(current);
32         }

     結合上述代碼中的註釋,將邏輯分三部分,咱們一步步分析此方法的邏輯。

    首先第一步,判斷若是有寫鎖而且當前線程不是寫鎖的線程,則直接退出獲取讀鎖的嘗試,由於讀寫是互斥的,退出此方法後就會進入doAcquireShared方法,後續邏輯見下面的2)。但此處仍是要看一下寫鎖狀態統計方法exclusiveCount和讀鎖狀態統計方法sharedCount,方法源碼以下截圖所示:

    能夠看到,exclusiveCount方法是將c和獨佔掩碼進行與操做,獨佔掩碼EXCLUSIVE_MASK高16位均爲0,低16位均爲1,按位與計算以後就剩下c的低16位,這就是第二部分一開始說的低16位存放寫鎖重入次數;同理看sharedCount方法,將c有符號右移16位,這樣移位以後低16位就是原來的高16位,即讀鎖的加鎖次數。老Lea經過這兩個方法實現了用一個int類型的state存放寫鎖讀鎖兩個加鎖次數的結果,是否是看起來就很高端!

    而後看第二步,判斷讀不該該阻塞(即readerShouldBlock方法返回false)且讀鎖持有次數小於最大值且CAS成功,則進入方法中嘗試獲取讀鎖。先看看重點方法readerShouldBlock何時會返回false(不阻塞)何時返回true(阻塞)。此方法在非公平模式和公平模式中有不一樣的實現,公平模式代碼:

1 final boolean readerShouldBlock() {
2             return hasQueuedPredecessors();
3         }

    看到了一個熟悉的身影,hashQueuedPredecessors方法,這不就是在ReentrantLock中公平鎖加鎖時的方法麼?詳細可看個人AQS系列(一)中的講解,總結一下就是該方法判斷隊列前面是否有在排隊的非當前線程,意思就是按排隊順序獲取鎖,不要爭搶。

    非公平模式代碼:

1 final boolean readerShouldBlock() {
2             return apparentlyFirstQueuedIsExclusive();
3         }
1 final boolean apparentlyFirstQueuedIsExclusive() {
2         Node h, s;
3         return (h = head) != null &&
4             (s = h.next)  != null &&
5             !s.isShared()         &&
6             s.thread != null;
7     }

    在後面的方法中,返回了一個四個條件組成的布爾值,邏輯爲頭節點不爲空而且頭節點後的第一個節點不爲空而且這個節點是獨佔的而且線程不爲空,此時返回true即當前這個讀操做應該阻塞,不讓它獲取到鎖。那麼問題來了,爲何要有這個邏輯?此處是爲了不一種異常狀況的發生,若是後面有一個排隊的寫鎖在等待獲取鎖,而這時有一個讀鎖正在執行中,若在讀鎖執行完以前又來了一個讀鎖,由於讀鎖與讀鎖不阻塞因此後來的的讀鎖又獲取到了鎖,這時在隊列第一個位置排隊的寫鎖仍然在傻傻的等着,沒辦法,誰讓你不要緊。就這樣,若是一直有讀鎖在當前正在執行的讀鎖執行完以前進來獲取讀鎖,那麼後面的寫鎖就會一直傻等在那,永遠都無法獲取鎖。因此Lea就設計了這個方法來避免這種狀況的發生,即若是判斷隊列第一位排隊的是寫鎖,那麼後面的讀鎖就先等一等,等這個寫鎖執行完了大家再執行。這也就是我在文章的開始講的-讀讀同時執行是有條件的,這個條件就是指這裏。

    看第二步以前要先說說讀鎖的處理邏輯,由於是可重入的讀鎖,因此須要記錄每一個獲取讀鎖線程的重入次數,即每一個讀的線程都有一個與其對應的重入次數。而後繼續看第二步中讀鎖獲取鎖成功(即CAS成功)以後的邏輯:若是讀鎖持有數爲0,則說明當前線程是第一個reader,分別給firstReader和firstReaderHoldCount初始化;若是讀鎖持有數不爲0且當前線程就是firstReader,那麼直接給firstReaderHoldCount+1,表示讀鎖重入;不然,即當前線程不是firstReader且還有其餘線程持有讀鎖,則要獲取到當前線程對應的HoldCounter,而後給裏面的計數+1。

    下面再一塊兒看看【不然】中的邏輯,粘貼一下Sync中的部分代碼

 1 abstract static class Sync extends AbstractQueuedSynchronizer {
 2        // ...
 3        static final class HoldCounter {
 4             int count = 0;
 5             // Use id, not reference, to avoid garbage retention
 6             final long tid = getThreadId(Thread.currentThread());
 7         }
 8 
 9         static final class ThreadLocalHoldCounter
10             extends ThreadLocal<HoldCounter> {
11             public HoldCounter initialValue() {
12                 return new HoldCounter();
13             }
14         }
15 
16         private transient ThreadLocalHoldCounter readHolds;
17 
18         private transient HoldCounter cachedHoldCounter;
19 
20         private transient Thread firstReader = null;
21         private transient int firstReaderHoldCount;
22 
23         Sync() {
24             readHolds = new ThreadLocalHoldCounter();
25             setState(getState()); // ensures visibility of readHolds
26         }
27         // ...
28 }

    能夠看到,Sync中緩存了一個HoldCounter,存放的是最近一次讀鎖記錄。而若是當前線程不是最近一次記錄的HoldCounter,則去readHolds中取,readHolds是ThreadLocalHoldCounter類型,在Sync的無參構造器中初始化,它與HoldCounter都是Sync的內部類,ThreadLocalHoldCounter就是一個ThreadLocal,內部維護了一個線程與HoldCounter的鍵值對map,一個線程對應一個HoldCounter。因此【不然】中的邏輯加註釋以下所示:

1                     HoldCounter rh = cachedHoldCounter; // 獲取最近一次記錄的HoldCounter,此緩存是爲了提升效率,不用每次都去ThreadLocal中取
2                     if (rh == null || rh.tid != getThreadId(current)) // 判斷當前線程是否是最近一次記錄的HoldCounter
3                         cachedHoldCounter = rh = readHolds.get(); // 若是不是,則去Sync中的ThreadLocal中獲取,而後再放在緩存中
4                     else if (rh.count == 0) // 若是count計數爲0,說明是第一次重入,則將HoldCounter加入ThreadLocal中
5                         readHolds.set(rh);
6                     rh.count++; // 當前線程重入次數+1

 

    下面進入第三步,fullTryAcquireShared方法,進入此方法的前提條件是沒有寫鎖且 (讀應該阻塞或者讀鎖CAS失敗)。看這個full方法的邏輯:

 1 final int fullTryAcquireShared(Thread current) {
 2             
 3             HoldCounter rh = null;
 4             for (;;) { // 無限循環直到有肯定的結果返回
 5                 int c = getState();
 6                 if (exclusiveCount(c) != 0) { // 一、有獨佔鎖且不是當前線程,直接返回讀鎖加鎖失敗
 7                     if (getExclusiveOwnerThread() != current)
 8                         return -1;
 9                     // else we hold the exclusive lock; blocking here
10                     // would cause deadlock.
11                 } else if (readerShouldBlock()) { // 二、判斷讀是否應該阻塞
12                     // Make sure we're not acquiring read lock reentrantly
13                     if (firstReader == current) { // 判斷若是當前線程就是firstReader,那麼什麼都不作,進入3中嘗試獲取鎖,why? 由於這說明當前線程以前就持有了鎖還沒釋放,因此能夠繼續獲取
14                         // assert firstReaderHoldCount > 0;
15                     } else { // 2.5 此處邏輯須要仔細研讀,乍看時看的一頭霧水
16                         if (rh == null) { // 第一次進來時rh確定==null
17                             rh = cachedHoldCounter;
18                             if (rh == null || rh.tid != getThreadId(current)) {
19                                 rh = readHolds.get();
20                                 if (rh.count == 0) // 若是當前線程沒獲取到過讀鎖,則從本地線程變量中移除HoldCounter,由於下一步就要斷定它獲取鎖失敗先不讓它獲取了
21                                     readHolds.remove();
22                             }
23                         }// 能走到這裏,說明當前讀鎖應該阻塞且不是firstReader
24                         if (rh.count == 0) // 再加上當前線程沒獲取到過讀鎖,則先不讓它嘗試獲取鎖了,直接返回獲取失敗
25                             return -1;
26                     }
27                 }
28                 if (sharedCount(c) == MAX_COUNT)
29                     throw new Error("Maximum lock count exceeded");
30                 // 三、再次嘗試獲取鎖
31                 if (compareAndSetState(c, c + SHARED_UNIT)) {
32                     if (sharedCount(c) == 0) {
33                         firstReader = current;
34                         firstReaderHoldCount = 1;
35                     } else if (firstReader == current) {
36                         firstReaderHoldCount++;
37                     } else {
38                         if (rh == null)
39                             rh = cachedHoldCounter;
40                         if (rh == null || rh.tid != getThreadId(current))
41                             rh = readHolds.get();
42                         else if (rh.count == 0)
43                             readHolds.set(rh);
44                         rh.count++;
45                         cachedHoldCounter = rh; // cache for release
46                     }
47                     return 1;
48                 }
49             }
50         }

    詳細看看註解以及源代碼註釋、代碼邏輯,相信能理解這個過程。

 2)、doAcquireShared方法

 1 private void doAcquireShared(int arg) {
 2         // 將當前讀鎖加到隊列後面
 3         final Node node = addWaiter(Node.SHARED);
 4         boolean failed = true;
 5         try {
 6             boolean interrupted = false;
 7             for (;;) {
 8                 // 獲得前一個節點
 9                 final Node p = node.predecessor();
10                 if (p == head) { // 若是前一個節點是頭節點,則嘗試獲取鎖
11                     int r = tryAcquireShared(arg);
12                     if (r >= 0) { // 設置頭節點而且激活後續的節點
13                         setHeadAndPropagate(node, r);
14                         p.next = null; // help GC
15                         if (interrupted)
16                             selfInterrupt();
17                         failed = false;
18                         return;
19                     }
20                 }// 判斷應該掛起則掛起線程
21                 if (shouldParkAfterFailedAcquire(p, node) &&
22                     parkAndCheckInterrupt())
23                     interrupted = true;
24             }
25         } finally {
26             if (failed)
27                 cancelAcquire(node);
28         }
29     }

    該方法跟以前系列中ReentrantLock的加鎖過程相似,在此就不作過多的解釋了,總之仍是經過park來掛起。

 二、寫鎖加鎖,即獨佔鎖加鎖

    進入lock方法:

1 public void lock() {
2             sync.acquire(1);
3         }

    熟悉的樣子,繼續 點進去:

1 public final void acquire(int arg) {
2         if (!tryAcquire(arg) &&
3             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4             selfInterrupt();
5     }

    仍是原先的方法,可是各個方法的實現有區別了。先看第一個tryAcquire:

 1 protected final boolean tryAcquire(int acquires) {
 2             Thread current = Thread.currentThread();
 3             int c = getState();
 4             int w = exclusiveCount(c);
 5             if (c != 0) { // 若是排它鎖存在,則判斷是否是當前線程,若是也不是當前線程,則直接返回獲取失敗
 6                 // (Note: if c != 0 and w == 0 then shared count != 0)
 7                 if (w == 0 || current != getExclusiveOwnerThread())
 8                     return false;
 9                 if (w + exclusiveCount(acquires) > MAX_COUNT)
10                     throw new Error("Maximum lock count exceeded");
11                 // Reentrant acquire
12                 setState(c + acquires);
13                 return true;
14             } // 判斷讀鎖要不要阻塞,此處針對公平鎖和非公平鎖有不一樣的實現,對於非公平鎖統一返回false表示不要阻塞,而公平鎖則會查看前面還有沒有鎖來判斷要不要阻塞
15             if (writerShouldBlock() ||
16                 !compareAndSetState(c, c + acquires))
17                 return false;
18             setExclusiveOwnerThread(current);
19             return true;
20         }

    而後是addWaiter在隊列末尾添加node節點排隊,這個方法在AbstractQueuedSynchronizer中,一樣是熟悉的方法了,此處略過不提。

    最後是acquireQueued方法,以下所示,又是熟悉的代碼,跟ReentrantLock中的加鎖方法一毛同樣,惟一的不一樣點是第7行調用的tryAcquire方法的實現,此處調的是ReentrantReadWriteLock類中Sync的方法,也就是上面的第一個方法。

 1 final boolean acquireQueued(final Node node, int arg) {
 2         boolean failed = true;
 3         try {
 4             boolean interrupted = false;
 5             for (;;) { 
 6                 final Node p = node.predecessor();
 7                 if (p == head && tryAcquire(arg)) {
 8                     setHead(node);
 9                     p.next = null; // help GC
10                     failed = false;
11                     return interrupted;
12                 }
13                 if (shouldParkAfterFailedAcquire(p, node) &&
14                     parkAndCheckInterrupt())
15                     interrupted = true;
16             }
17         } finally {
18             if (failed)
19                 cancelAcquire(node);
20         }
21     }

    寫鎖的加鎖過程基本就這些了,相對來講比讀鎖加鎖容易了不少,由於大多都跟ReentrantLock中的實現相仿。

後記

    讀寫鎖的加鎖過程到此爲止,最近每晚下班回來讀一會,斷斷續續的四晚上才搞定,難受 ><

相關文章
相關標籤/搜索