Java 併發編程-再談 AbstractQueuedSynchronizer 3 :基於 AbstractQueuedSynchronizer 的併發類實現

公平模式ReentrantLock實現原理

前面的文章研究了AbstractQueuedSynchronizer的獨佔鎖和共享鎖,有了前兩篇文章的基礎,就能夠乘勝追擊,看一下基於AbstractQueuedSynchronizer的併發類是如何實現的。java

ReentrantLock顯然是一種獨佔鎖,首先是公平模式的ReentrantLock,Sync是ReentractLock中的基礎類,繼承自AbstractQueuedSynchronizer,看一下代碼實現:算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
abstract static class Sync extends AbstractQueuedSynchronizer {
     private static final long serialVersionUID = -5179523762034025860L;
 
     /**
      * Performs {@link Lock#lock}. The main reason for subclassing
      * is to allow fast path for nonfair version.
      */
     abstract void lock();
 
     /**
      * Performs non-fair tryLock.  tryAcquire is
      * implemented in subclasses, but both need nonfair
      * try for trylock method.
      */
     final boolean nonfairTryAcquire( int acquires) {
         final Thread current = Thread.currentThread();
         int c = getState();
         if (c == 0 ) {
             if (compareAndSetState( 0 , acquires)) {
                 setExclusiveOwnerThread(current);
                 return true ;
             }
         }
         else if (current == getExclusiveOwnerThread()) {
             int nextc = c + acquires;
             if (nextc < 0 ) // overflow
                 throw new Error( "Maximum lock count exceeded" );
             setState(nextc);
             return true ;
         }
         return false ;
     }
 
     protected final boolean tryRelease( int releases) {
         int c = getState() - releases;
         if (Thread.currentThread() != getExclusiveOwnerThread())
             throw new IllegalMonitorStateException();
         boolean free = false ;
         if (c == 0 ) {
             free = true ;
             setExclusiveOwnerThread( null );
         }
         setState(c);
         return free;
     }
 
     protected final boolean isHeldExclusively() {
         // While we must in general read state before owner,
         // we don't need to do so to check if current thread is owner
         return getExclusiveOwnerThread() == Thread.currentThread();
     }
 
     final ConditionObject newCondition() {
         return new ConditionObject();
     }
 
     // Methods relayed from outer class
 
     final Thread getOwner() {
         return getState() == 0 ? null : getExclusiveOwnerThread();
     }
 
     final int getHoldCount() {
         return isHeldExclusively() ? getState() : 0 ;
     }
 
     final boolean isLocked() {
         return getState() != 0 ;
     }
 
     /**
      * Reconstitutes this lock instance from a stream.
      * @param s the stream
      */
     private void readObject(java.io.ObjectInputStream s)
         throws java.io.IOException, ClassNotFoundException {
         s.defaultReadObject();
         setState( 0 ); // reset to unlocked state
     }
}

Sync屬於一個公共類,它是抽象的說明Sync會被繼承,簡單整理一下Sync主要作了哪些事(由於Sync不是ReentrantLock公平鎖的關鍵):多線程

  1. 定義了一個lock方法讓子類去實現,咱們平時之因此能調用ReentrantLock的lock()方法,就是由於Sync定義了它
  2. 實現了非公平鎖tryAcquira的方法
  3. 實現了tryRelease方法,比較簡單,狀態-1,獨佔鎖的線程置空
  4. 實現了isHeldExclusively方法
  5. 定義了newCondition方法,讓開發者能夠利用Condition實現通知/等待

接着,看一下公平鎖的實現,FairSync類,它繼承自Sync:併發

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
static final class FairSync extends Sync {
     private static final long serialVersionUID = -3000897897090466540L;
 
     final void lock() {
         acquire( 1 );
     }
 
     /**
      * Fair version of tryAcquire.  Don't grant access unless
      * recursive call or no waiters or is first.
      */
     protected final boolean tryAcquire( int acquires) {
         final Thread current = Thread.currentThread();
         int c = getState();
         if (c == 0 ) {
             if (!hasQueuedPredecessors() &&
                 compareAndSetState( 0 , acquires)) {
                 setExclusiveOwnerThread(current);
                 return true ;
             }
         }
         else if (current == getExclusiveOwnerThread()) {
             int nextc = c + acquires;
             if (nextc < 0 )
                 throw new Error( "Maximum lock count exceeded" );
             setState(nextc);
             return true ;
         }
         return false ;
     }
}

整理一下要點:less

1. 每次acquire的時候,state+1,若是當前線程lock()以後又lock()了,state不斷+1,相應的unlock()的時候state-1,直到將state減到0爲之,說明當前線程釋放完全部的狀態,其它線程能夠競爭性能

2. state=0的時候,經過hasQueuedPredecessors方法作一次判斷,hasQueuedPredecessors的實現爲」h != t && ((s = h.next) == null || s.thread != Thread.currentThread());」,其中h是head、t是tail,因爲代碼中對結果取反,所以取反以後的判斷爲」h == t || ((s = h.next) != null && s.thread == Thread.currentThread());」,總結起來有兩種狀況能夠經過!hasQueuedPredecessors()這個判斷:ui

  • h==t,h==t的狀況爲要麼當前FIFO隊列中沒有任何數據,要麼只構建出了一個head還沒日後面連過任何一個Node,所以head就是tail
  • (s = h.next) != null && s.thread == Thread.currentThread(),當前線程爲正在等待的第一個Node中的線程

3. 若是沒有線程比當前線程等待更久去執行acquire操做,那麼經過CAS操做將state從0變爲1的線程tryAcquire成功this

4. 沒有tryAcquire成功的線程,按照tryAcquire的前後順序,構建爲一個FIFO隊列,即第一個tryAcquire失敗的排在head的後一位,第二個tryAcquire失敗的排在head的後二位spa

5. 當tryAcquire成功的線程release完畢,第一個tryAcquire失敗的線程第一個嘗試tryAcquire,這就是先到先得,典型的公平鎖線程

非公平模式ReentrantLock實現原理

看完了公平模式ReentrantLock,接着咱們看一下非公平模式ReentrantLock是如何實現的。NonfairSync類,一樣是繼承自Sync類,實現爲:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static final class NonfairSync extends Sync {
     private static final long serialVersionUID = 7316153563782823691L;
 
     /**
      * Performs lock.  Try immediate barge, backing up to normal
      * acquire on failure.
      */
     final void lock() {
         if (compareAndSetState( 0 , 1 ))
             setExclusiveOwnerThread(Thread.currentThread());
         else
             acquire( 1 );
     }
 
     protected final boolean tryAcquire( int acquires) {
         return nonfairTryAcquire(acquires);
     }
}

結合nonfairTryAcquire方法一塊兒講解,nonfairTryAcquire方法的實現爲:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final boolean nonfairTryAcquire( int acquires) {
     final Thread current = Thread.currentThread();
     int c = getState();
     if (c == 0 ) {
         if (compareAndSetState( 0 , acquires)) {
             setExclusiveOwnerThread(current);
             return true ;
         }
     }
     else if (current == getExclusiveOwnerThread()) {
         int nextc = c + acquires;
         if (nextc < 0 ) // overflow
             throw new Error( "Maximum lock count exceeded" );
         setState(nextc);
         return true ;
     }
     return false ;
}

看到差異就在於非公平鎖lock()的時候會先嚐試經過CAS看看能不能把state從0變爲1(即獲取鎖),若是能夠的話,直接獲取鎖而不須要排隊。舉個實際例子就很好理解了:

  1. 線程一、線程二、線程3競爭鎖,線程1競爭成功獲取鎖,線程二、線程3依次排隊
  2. 線程1執行完畢,釋放鎖,state變爲0,喚醒了第一個排隊的線程2
  3. 此時線程4來嘗試獲取鎖了,因爲線程2被喚醒了,所以線程2與線程4競爭鎖
  4. 線程4成功將state從0變爲1,線程2競爭鎖失敗,繼續park

看到整個過程當中,後來的線程4反而比先來的線程2先獲取鎖,至關因而一種非公平的模式,

那爲何非公平鎖效率會比公平鎖效率高?上面第(3)步若是線程2和線程4不競爭鎖就是答案。爲何這麼說,後面的解釋很重要,但願你們能夠理解:

線程1是先將state設爲0,再去喚醒線程2,這兩個過程之間是有時間差的。

那麼若是線程1將state設置爲0的時候,線程4就經過CAS算法獲取到了鎖,且在線程1喚醒線程2以前就已經使用完畢鎖,那麼至關於線程2獲取鎖的時間並無推遲,在線程1將state設置爲0到線程1喚醒線程2的這段時間裏,反而有線程4獲取了鎖執行了任務,這就增長了系統的吞吐量,至關於單位時間處理了更多的任務。

從這段解釋咱們也應該能看出來了,非公平鎖比較適合加鎖時間比較短的任務。這是由於加鎖時間長,至關於線程2將state設爲0並去喚醒線程2的這段時間,線程4沒法完成釋放鎖,那麼線程2被喚醒因爲無法獲取到鎖,又被阻塞了,這種喚醒-阻塞的操做會引發線程的上下文切換,繼而影響系統的性能。

Semaphore實現原理

Semaphore即信號量,用於控制代碼塊的併發數,將Semaphore的permits設置爲1至關於就是synchronized或者ReentrantLock,Semaphore具體用法可見Java多線程19:多線程下的其餘組件之CountDownLatch、Semaphore、Exchanger。信號量容許多條線程獲取鎖,顯然它的鎖是一種共享鎖,信號量也有公平模式與非公平模式,相信看懂了上面ReentrantLock的公平模式與非公平模式的朋友應該對Semaphore的公平模式與非公平模式理解起來會更快,這裏就放在一塊兒寫了。

首先仍是看一下Semaphore的基礎設施,它和ReentrantLock同樣,也有一個Sync:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
abstract static class Sync extends AbstractQueuedSynchronizer {
     private static final long serialVersionUID = 1192457210091910933L;
 
     Sync( int permits) {
         setState(permits);
     }
 
     final int getPermits() {
         return getState();
     }
 
     final int nonfairTryAcquireShared( int acquires) {
         for (;;) {
             int available = getState();
             int remaining = available - acquires;
             if (remaining < 0 ||
                 compareAndSetState(available, remaining))
                 return remaining;
         }
     }
 
     protected final boolean tryReleaseShared( int releases) {
         for (;;) {
             int current = getState();
             int next = current + releases;
             if (next < current) // overflow
                 throw new Error( "Maximum permit count exceeded" );
             if (compareAndSetState(current, next))
                 return true ;
         }
     }
 
     final void reducePermits( int reductions) {
         for (;;) {
             int current = getState();
             int next = current - reductions;
             if (next > current) // underflow
                 throw new Error( "Permit count underflow" );
             if (compareAndSetState(current, next))
                 return ;
         }
     }
 
     final int drainPermits() {
         for (;;) {
             int current = getState();
             if (current == 0 || compareAndSetState(current, 0 ))
                 return current;
         }
     }
}

和ReentrantLock的Sync差很少,Semaphore的Sync定義瞭如下的一些主要內容:

  1. getPermits方法獲取當前的許可剩餘量還剩多少,即還有多少線程能夠同時得到信號量
  2. 定義了非公平信號量獲取共享鎖的邏輯nonfairTryAcquireShared
  3. 定義了公平模式釋放信號量的邏輯tryReleaseShared,至關於釋放一次信號量,state就向上+1(信號量每次的獲取與釋放都是以1爲單位的)

再看下公平信號量的實現,一樣的FairSync,繼承自Sync,代碼爲:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static final class FairSync extends Sync {
     private static final long serialVersionUID = 2014338818796000944L;
 
     FairSync( int permits) {
         super (permits);
     }
 
     protected int tryAcquireShared( int acquires) {
         for (;;) {
             if (hasQueuedPredecessors())
                 return - 1 ;
             int available = getState();
             int remaining = available - acquires;
             if (remaining < 0 ||
                 compareAndSetState(available, remaining))
                 return remaining;
         }
     }
}

首先第10行的hasQueuedPredecessors方法,前面已經說過了,若是已經有了FIFO隊列或者當前線程不是FIFO隊列中在等待的第一條線程,返回-1,表示沒法獲取共享鎖成功。

接着獲取available,available就是state,用volatile修飾,因此線程中能夠看到最新的state,信號量的acquires是1,每次獲取信號量都對state-1,兩種狀況直接返回:

  1. remaining減完<0
  2. 經過cas設置成功

以後就是和以前說過的共享鎖的邏輯了,若是返回的是一個<0的數字,那麼構建FIFO隊列,線程阻塞,直到前面的執行完才能喚醒後面的。

接着看一下非公平信號量的實現,NonfairSync繼承Sync:

1
2
3
4
5
6
7
8
9
10
11
static final class NonfairSync extends Sync {
     private static final long serialVersionUID = -2694183684443567898L;
 
     NonfairSync( int permits) {
         super (permits);
     }
 
     protected int tryAcquireShared( int acquires) {
         return nonfairTryAcquireShared(acquires);
     }
}

nonfairTryAcquireShared在父類已經實現了,再貼一下代碼:

1
2
3
4
5
6
7
8
9
final int nonfairTryAcquireShared( int acquires) {
     for (;;) {
         int available = getState();
         int remaining = available - acquires;
         if (remaining < 0 ||
             compareAndSetState(available, remaining))
             return remaining;
     }
}

看到這裏和公平Semaphore只有一點差異:不會前置進行一次hasQueuedPredecessors()判斷。即當前有沒有構建爲一個FIFO隊列,隊列裏面第一個等待的線程是否是自身都無所謂,對於非公平Semaphore都同樣,反正線程調用Semaphore的acquire方法就將當前state-1,若是獲得的remaining設置成功或者CAS操做成功就返回,這種操做沒有遵循先到先得的原則,即非公平信號量。

至於非公平信號量對比公平信號量的優勢,和ReentrantLock的非公平鎖對比ReentrantLock的公平鎖同樣,就不說了。

CountDownLatch實現原理

CountDownLatch即計數器自減的一種閉鎖,某線程阻塞,對一個計數器自減到0,此線程被喚醒,CountDownLatch具體用法可見Java多線程19:多線程下的其餘組件之CountDownLatch、Semaphore、Exchanger。

CountDownLatch是一種共享鎖,經過await()方法與countDown()兩個方法實現自身的功能,首先看一下await()方法的實現:

1
2
3
public void await() throws InterruptedException {
       sync.acquireSharedInterruptibly( 1 );
  }

acquireSharedInterruptibly最終又回到tryAcquireShared方法上,直接貼整個Sync的代碼實現:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static final class Sync extends AbstractQueuedSynchronizer {
     private static final long serialVersionUID = 4982264981922014374L;
 
     Sync( int count) {
         setState(count);
     }
 
     int getCount() {
         return getState();
     }
 
     protected int tryAcquireShared( int acquires) {
         return (getState() == 0 ) ? 1 : - 1 ;
     }
 
     protected boolean tryReleaseShared( int releases) {
         // Decrement count; signal when transition to zero
         for (;;) {
             int c = getState();
             if (c == 0 )
                 return false ;
             int nextc = c- 1 ;
             if (compareAndSetState(c, nextc))
                 return nextc == 0 ;
         }
     }
}

其實看到tryAcquireShared方法,理解AbstractQueuedSynchronizer共享鎖原理的,不用看countDown方法應該都能猜countDown方法是如何實現的。我這裏總結一下:

  1. 傳入一個count,state就等於count,await的時候判斷是否是0,是0返回1表示成功,不是0返回-1表示失敗,構建FIFO隊列,head頭只鏈接一個Node,Node中的線程就是調用CountDownLatch的await()方法的線程
  2. 每次countDown的時候對state-1,直到state減到0的時候纔算tryReleaseShared成功,tryReleaseShared成功,喚醒被掛起的線程

爲了驗證(2),看一下上面Sync的tryReleaseShared方法就能夠了,確實是這麼實現的。

再理解獨佔鎖與共享鎖的區別

本文詳細分析了ReentrantLock、Semaphore、CountDownLatch的實現原理,第一個是基於獨佔鎖的實現,後兩個是基於共享鎖的實現,從這三個類咱們能夠再總結一下獨佔鎖與共享鎖的區別,主要在兩點上:

  1. 獨佔鎖同時只有一條線程能夠acquire成功,獨佔鎖同時可能有多條線程能夠acquire成功,Semaphore是典型例子;
  2. 獨佔鎖每次只能喚醒一個Node,共享鎖每次喚醒的時候能夠將狀態向後傳播,便可能喚醒多個Node,CountDownLatch是典型例子。

帶着這兩個結論再看ReentrantLock、Semaphore、CountDownLatch,你必定會對獨佔鎖與共享鎖理解更深。 

相關文章
相關標籤/搜索