Java 併發編程-再談 AbstractQueuedSynchronizer 2:共享模式與基於 Condition 的等待 / 通知機制實現

共享模式acquire實現流程

上文咱們講解了AbstractQueuedSynchronizer獨佔模式的acquire實現流程,本文趁熱打鐵繼續看一下AbstractQueuedSynchronizer共享模式acquire的實現流程。連續兩篇文章的學習,也能夠對比獨佔模式acquire和共享模式acquire的區別,加深對於AbstractQueuedSynchronizer的理解。html

先看一下共享模式acquire的實現,方法爲acquireShared和acquireSharedInterruptibly,二者差異不大,區別就在於後者有中斷處理,以acquireShared爲例:java

1
2
3
4
public final void acquireShared( int arg) {
       if (tryAcquireShared(arg) < 0 )
             doAcquireShared(arg);
  }

這裏就能看出第一個差異來了:獨佔模式acquire的時候子類重寫的方法tryAcquire返回的是boolean,便是否tryAcquire成功;共享模式acquire的時候,返回的是一個int型變量,判斷是否<0。doAcquireShared方法的實現爲:node

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void doAcquireShared( int arg) {
     final Node node = addWaiter(Node.SHARED);
     boolean failed = true ;
     try {
         boolean interrupted = false ;
         for (;;) {
             final Node p = node.predecessor();
             if (p == head) {
                 int r = tryAcquireShared(arg);
                 if (r >= 0 ) {
                     setHeadAndPropagate(node, r);
                     p.next = null ; // help GC
                     if (interrupted)
                         selfInterrupt();
                     failed = false ;
                     return ;
                 }
             }
             if (shouldParkAfterFailedAcquire(p, node) &&
                 parkAndCheckInterrupt())
                 interrupted = true ;
         }
     } finally {
         if (failed)
             cancelAcquire(node);
     }
}

咱們來分析一下這段代碼作了什麼:算法

  1. addWaiter,把全部tryAcquireShared<0的線程實例化出一個Node,構建爲一個FIFO隊列,這和獨佔鎖是同樣的
  2. 拿當前節點的前驅節點,只有前驅節點是head的節點才能tryAcquireShared,這和獨佔鎖也是同樣的
  3. 前驅節點不是head的,執行」shouldParkAfterFailedAcquire() && parkAndCheckInterrupt()」,for(;;)循環,」shouldParkAfterFailedAcquire()」方法執行2次,當前線程阻塞,這和獨佔鎖也是同樣的

確實,共享模式下的acquire和獨佔模式下的acquire大部分邏輯差很少,最大的差異在於tryAcquireShared成功以後,獨佔模式的acquire是直接將當前節點設置爲head節點便可,共享模式會執行setHeadAndPropagate方法,顧名思義,即在設置head以後多執行了一步propagate操做。setHeadAndPropagate方法源碼爲:編程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void setHeadAndPropagate(Node node, int propagate) {
     Node h = head; // Record old head for check below
     setHead(node);
     /*
      * Try to signal next queued node if:
      *   Propagation was indicated by caller,
      *     or was recorded (as h.waitStatus) by a previous operation
      *     (note: this uses sign-check of waitStatus because
      *      PROPAGATE status may transition to SIGNAL.)
      * and
      *   The next node is waiting in shared mode,
      *     or we don't know, because it appears null
      *
      * The conservatism in both of these checks may cause
      * unnecessary wake-ups, but only when there are multiple
      * racing acquires/releases, so most need signals now or soon
      * anyway.
      */
     if (propagate > 0 || h == null || h.waitStatus < 0 ) {
         Node s = node.next;
         if (s == null || s.isShared())
             doReleaseShared();
     }
}

第3行的代碼設置重設head,第2行的代碼因爲第3行的代碼要重設head,所以先定義一個Node型變量h得到原head的地址,這兩行代碼很簡單。數據結構

第19行~第23行的代碼是獨佔鎖和共享鎖最不同的一個地方,咱們再看獨佔鎖acquireQueued的代碼:app

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean acquireQueued( final Node node, int arg) {
     boolean failed = true ;
     try {
         boolean interrupted = false ;
         for (;;) {
             final Node p = node.predecessor();
             if (p == head && tryAcquire(arg)) {
                 setHead(node);
                 p.next = null ; // help GC
                 failed = false ;
                 return interrupted;
             }
             if (shouldParkAfterFailedAcquire(p, node) &&
                 parkAndCheckInterrupt())
                 interrupted = true ;
         }
     } finally {
         if (failed)
             cancelAcquire(node);
     }
}

這意味着獨佔鎖某個節點被喚醒以後,它只須要將這個節點設置成head就完事了,而共享鎖不同,某個節點被設置爲head以後,若是它的後繼節點是SHARED狀態的,那麼將繼續經過doReleaseShared方法嘗試日後喚醒節點,實現了共享狀態的向後傳播。less

共享模式release實現流程

上面講了共享模式下acquire是如何實現的,下面再看一下release的實現流程,方法爲releaseShared:ide

1
2
3
4
5
6
7
public final boolean releaseShared( int arg) {
     if (tryReleaseShared(arg)) {
         doReleaseShared();
         return true ;
     }
     return false ;
}

tryReleaseShared方法是子類實現的,若是tryReleaseShared成功,那麼執行doReleaseShared()方法:oop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void doReleaseShared() {
     /*
      * Ensure that a release propagates, even if there are other
      * in-progress acquires/releases.  This proceeds in the usual
      * way of trying to unparkSuccessor of head if it needs
      * signal. But if it does not, status is set to PROPAGATE to
      * ensure that upon release, propagation continues.
      * Additionally, we must loop in case a new node is added
      * while we are doing this. Also, unlike other uses of
      * unparkSuccessor, we need to know if CAS to reset status
      * fails, if so rechecking.
      */
     for (;;) {
         Node h = head;
         if (h != null && h != tail) {
             int ws = h.waitStatus;
             if (ws == Node.SIGNAL) {
                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 ))
                     continue ;            // loop to recheck cases
                 unparkSuccessor(h);
             }
             else if (ws == 0 &&
                      !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE))
                 continue ;                // loop on failed CAS
         }
         if (h == head)                   // loop if head changed
             break ;
     }
}

主要是兩層邏輯:

  1. 頭結點自己的waitStatus是SIGNAL且能經過CAS算法將頭結點的waitStatus從SIGNAL設置爲0,喚醒頭結點的後繼節點
  2. 頭結點自己的waitStatus是0的話,嘗試將其設置爲PROPAGATE狀態的,意味着共享狀態能夠向後傳播

Condition的await()方法實現原理—-構建等待隊列

咱們知道,Condition是用於實現通知/等待機制的,和Object的wait()/notify()同樣,因爲本文以前描述AbstractQueuedSynchronizer的共享模式的篇幅不是很長,加之Condition也是AbstractQueuedSynchronizer的一部分,所以將Condition也放在這裏寫了。

Condition分爲await()和signal()兩部分,前者用於等待、後者用於喚醒,首先看一下await()是如何實現的。Condition自己是一個接口,其在AbstractQueuedSynchronizer中的實現爲ConditionObject:

1
2
3
4
5
6
7
8
9
public class ConditionObject implements Condition, java.io.Serializable {
         private static final long serialVersionUID = 1173984872572414699L;
         /** First node of condition queue. */
         private transient Node firstWaiter;
         /** Last node of condition queue. */
         private transient Node lastWaiter;
         
         ...
}

這裏貼了一些字段定義,後面都是方法就不貼了,會對重點方法進行分析的。從字段定義咱們能夠看到,ConditionObject全局性地記錄了第一個等待的節點與最後一個等待的節點。

像ReentrantLock每次要使用ConditionObject,直接new一個ConditionObject出來便可。咱們關注一下await()方法的實現:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public final void await() throws InterruptedException {
     if (Thread.interrupted())
         throw new InterruptedException();
     Node node = addConditionWaiter();
     int savedState = fullyRelease(node);
     int interruptMode = 0 ;
     while (!isOnSyncQueue(node)) {
         LockSupport.park( this );
         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 )
             break ;
     }
     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
         interruptMode = REINTERRUPT;
     if (node.nextWaiter != null ) // clean up if cancelled
         unlinkCancelledWaiters();
     if (interruptMode != 0 )
         reportInterruptAfterWait(interruptMode);
}

第2行~第3行的代碼用於處理中斷,第4行代碼比較關鍵,添加Condition的等待者,看一下實現:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node addConditionWaiter() {
     Node t = lastWaiter;
     // If lastWaiter is cancelled, clean out.
     if (t != null && t.waitStatus != Node.CONDITION) {
         unlinkCancelledWaiters();
         t = lastWaiter;
     }
     Node node = new Node(Thread.currentThread(), Node.CONDITION);
     if (t == null )
         firstWaiter = node;
     else
         t.nextWaiter = node;
     lastWaiter = node;
     return node;
}

首先拿到隊列(注意數據結構,Condition構建出來的也是一個隊列)中最後一個等待者,緊接着第4行的的判斷,判斷最後一個等待者的waitStatus不是CONDITION的話,執行第5行的代碼,解綁取消的等待者,由於經過第8行的代碼,咱們看到,new出來的Node的狀態都是CONDITION的。

那麼unlinkCancelledWaiters作了什麼?裏面的流程就不看了,就是一些指針遍歷並判斷狀態的操做,總結一下就是:從頭至尾遍歷每個Node,遇到Node的waitStatus不是CONDITION的就從隊列中踢掉,該節點的先後節點相連。

接着第8行的代碼前面說過了,new出來了一個Node,存儲了當前線程,waitStatus是CONDITION,接着第9行~第13行的操做很好理解:

  1. 若是lastWaiter是null,說明FIFO隊列中沒有任何Node,firstWaiter=Node
  2. 若是lastWaiter不是null,說明FIFO隊列中有Node,原lastWaiter的next指向Node
  3. 不管如何,新加入的Node編程lastWaiter,即新加入的Node必定是在最後面

用一張圖表示一下構建的數據結構就是:

對比學習,咱們總結一下Condition構建出來的隊列和AbstractQueuedSynchronizer構建出來的隊列的差異,主要體如今2點上:

  1. AbstractQueuedSynchronizer構建出來的隊列,頭節點是一個沒有Thread的空節點,其標識做用,而Condition構建出來的隊列,頭節點就是真正等待的節點
  2. AbstractQueuedSynchronizer構建出來的隊列,節點之間有next與pred相互標識該節點的前一個節點與後一個節點的地址,而Condition構建出來的隊列,只使用了nextWaiter標識下一個等待節點的地址

整個過程當中,咱們看到沒有使用任何CAS操做,firstWaiter和lastWaiter也沒有用volatile修飾,其實緣由很簡單:要await()必然要先lock(),既然lock()了就表示沒有競爭,沒有競爭天然也不必使用volatile+CAS的機制去保證什麼。

Condition的await()方法實現原理—-線程等待

前面咱們看了Condition構建等待隊列的過程,接下來咱們看一下等待的過程,await()方法的代碼比較短,再貼一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public final void await() throws InterruptedException {
     if (Thread.interrupted())
         throw new InterruptedException();
     Node node = addConditionWaiter();
     int savedState = fullyRelease(node);
     int interruptMode = 0 ;
     while (!isOnSyncQueue(node)) {
         LockSupport.park( this );
         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 )
             break ;
     }
     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
         interruptMode = REINTERRUPT;
     if (node.nextWaiter != null ) // clean up if cancelled
         unlinkCancelledWaiters();
     if (interruptMode != 0 )
         reportInterruptAfterWait(interruptMode);
}

構建完畢隊列以後,執行第5行的fullyRelease方法,顧名思義:fullyRelease方法的做用是徹底釋放Node的狀態。方法實現爲:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final int fullyRelease(Node node) {
     boolean failed = true ;
     try {
         int savedState = getState();
         if (release(savedState)) {
             failed = false ;
             return savedState;
         } else {
             throw new IllegalMonitorStateException();
         }
     } finally {
         if (failed)
             node.waitStatus = Node.CANCELLED;
     }
}

這裏第4行獲取state,第5行release的時候將整個state傳過去,理由是某線程可能屢次調用了lock()方法,好比調用了10次lock,那麼此線程就將state加到了10,因此這裏要將10傳過去,將狀態所有釋放,這樣後面的線程才能從新從state=0開始競爭鎖,這也是方法被命名爲fullyRelease的緣由,由於要徹底釋放鎖,釋放鎖以後,若是有競爭鎖的線程,那麼就喚醒第一個,這都是release方法的邏輯了,前面的文章詳細講解過。

接着看await()方法的第7行判斷」while(!isOnSyncQueue(node))」:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final boolean isOnSyncQueue(Node node) {
     if (node.waitStatus == Node.CONDITION || node.prev == null )
         return false ;
     if (node.next != null ) // If has successor, it must be on queue
         return true ;
     /*
      * node.prev can be non-null, but not yet on queue because
      * the CAS to place it on queue can fail. So we have to
      * traverse from tail to make sure it actually made it.  It
      * will always be near the tail in calls to this method, and
      * unless the CAS failed (which is unlikely), it will be
      * there, so we hardly ever traverse much.
      */
     return findNodeFromTail(node);
}

注意這裏的判斷是Node是否在AbstractQueuedSynchronizer構建的隊列中而不是Node是否在Condition構建的隊列中,若是Node不在AbstractQueuedSynchronizer構建的隊列中,那麼調用LockSupport的park方法阻塞。

至此調用await()方法的線程構建Condition等待隊列–釋放鎖–等待的過程已經所有分析完畢。 

Condition的signal()實現原理

上面的代碼分析了構建Condition等待隊列–釋放鎖–等待的過程,接着看一下signal()方法通知是如何實現的:

1
2
3
4
5
6
7
public final void signal() {
     if (!isHeldExclusively())
         throw new IllegalMonitorStateException();
     Node first = firstWaiter;
     if (first != null )
         doSignal(first);
}

首先從第2行的代碼咱們看到,要能signal(),當前線程必須持有獨佔鎖,不然拋出異常IllegalMonitorStateException。

那麼真正操做的時候,獲取第一個waiter,若是有waiter,調用doSignal方法:

1
2
3
4
5
6
7
8
private void doSignal(Node first) {
     do {
         if ( (firstWaiter = first.nextWaiter) == null )
             lastWaiter = null ;
         first.nextWaiter = null ;
     } while (!transferForSignal(first) &&
              (first = firstWaiter) != null );
}

第3行~第5行的代碼很好理解:

  1. 從新設置firstWaiter,指向第一個waiter的nextWaiter
  2. 若是第一個waiter的nextWaiter爲null,說明當前隊列中只有一個waiter,lastWaiter置空
  3. 由於firstWaiter是要被signal的,所以它沒什麼用了,nextWaiter置空

接着執行第6行和第7行的代碼,這裏重點就是第6行的transferForSignal方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
final boolean transferForSignal(Node node) {
     /*
      * If cannot change waitStatus, the node has been cancelled.
      */
     if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
         return false;
 
     /*
      * Splice onto queue and try to set waitStatus of predecessor to
      * indicate that thread is (probably) waiting. If cancelled or
      * attempt to set waitStatus fails, wake up to resync (in which
      * case the waitStatus can be transiently and harmlessly wrong).
      */
     Node p = enq(node);
     int ws = p.waitStatus;
     if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
         LockSupport.unpark(node.thread);
     return true ;
}

方法本意是將一個節點從Condition隊列轉換爲AbstractQueuedSynchronizer隊列,總結一下方法的實現:

  1. 嘗試將Node的waitStatus從CONDITION置爲0,這一步失敗直接返回false
  2. 當前節點進入調用enq方法進入AbstractQueuedSynchronizer隊列
  3. 當前節點經過CAS機制將waitStatus置爲SIGNAL

最後上面的步驟所有成功,返回true,返回true喚醒等待節點成功。從喚醒的代碼咱們能夠得出一個重要結論:某個await()的節點被喚醒以後並不意味着它後面的代碼會當即執行,它會被加入到AbstractQueuedSynchronizer隊列的尾部,只有前面等待的節點獲取鎖所有完畢才能輪到它。

代碼分析到這裏,我想相似的signalAll方法也沒有必要再分析了,顯然signalAll方法的做用就是將全部Condition隊列中等待的節點逐一隊列中從移除,由CONDITION狀態變爲SIGNAL狀態並加入AbstractQueuedSynchronizer隊列的尾部。

代碼示例

可能你們看了我分析半天代碼會有點迷糊,這裏最後我貼一段我用於驗證上面Condition結論的示例代碼,首先創建一個Thread,我將之命名爲ConditionThread:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
  * @author 五月的倉頡http://www.cnblogs.com/xrq730/p/7067904.html
  */
public class ConditionThread implements Runnable {
 
     private Lock lock;
     
     private Condition condition;
     
     public ConditionThread(Lock lock, Condition condition) {
         this .lock = lock;
         this .condition = condition;
     }
     
     @Override
     public void run() {
         
         if ( "線程0" .equals(JdkUtil.getThreadName())) {
             thread0Process();
         } else if ( "線程1" .equals(JdkUtil.getThreadName())) {
             thread1Process();
         } else if ( "線程2" .equals(JdkUtil.getThreadName())) {
             thread2Process();
         }
         
     }
     
     private void thread0Process() {
         try {
             lock.lock();
             System.out.println( "線程0休息5秒" );
             JdkUtil.sleep( 5000 );
             condition.signal();
             System.out.println( "線程0喚醒等待線程" );
         } finally {
             lock.unlock();
         }
     }
     
     private void thread1Process() {
         try {
             lock.lock();
             System.out.println( "線程1阻塞" );
             condition.await();
             System.out.println( "線程1被喚醒" );
         } catch (InterruptedException e) {
             
         } finally {
             lock.unlock();
         }
     }
     
     private void thread2Process() {
         try {
             System.out.println( "線程2想要獲取鎖" );
             lock.lock();
             System.out.println( "線程2獲取鎖成功" );
         } finally {
             lock.unlock();
         }
     }
     
}

這個類裏面的方法就不解釋了,反正就三個方法片斷,根據線程名判斷,每一個線層執行的是其中的一個代碼片斷。寫一段測試代碼:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
  * @author 五月的倉頡http://www.cnblogs.com/xrq730/p/7067904.html
  */
@Test
public void testCondition() throws Exception {
     Lock lock = new ReentrantLock();
     Condition condition = lock.newCondition();
         
     // 線程0的做用是signal
     Runnable runnable0 = new ConditionThread(lock, condition);
     Thread thread0 = new Thread(runnable0);
     thread0.setName( "線程0" );
     // 線程1的做用是await
     Runnable runnable1 = new ConditionThread(lock, condition);
     Thread thread1 = new Thread(runnable1);
     thread1.setName( "線程1" );
     // 線程2的做用是lock
     Runnable runnable2 = new ConditionThread(lock, condition);
     Thread thread2 = new Thread(runnable2);
     thread2.setName( "線程2" );
         
     thread1.start();
     Thread.sleep( 1000 );
     thread0.start();
     Thread.sleep( 1000 );
     thread2.start();
         
     thread1.join();
}

測試代碼的意思是:

  1. 線程1先啓動,獲取鎖,調用await()方法等待
  2. 線程0後啓動,獲取鎖,休眠5秒準備signal()
  3. 線程2最後啓動,獲取鎖,因爲線程0未使用完畢鎖,所以線程2排隊,能夠此時因爲線程0還未signal(),所以線程1在線程0執行signal()後,在AbstractQueuedSynchronizer隊列中的順序是在線程2後面的

代碼執行結果爲:

1
2
3
4
5
6
1 線程 1 阻塞
2 線程 0 休息 5
3 線程 2 想要獲取鎖
4 線程 0 喚醒等待線程
5 線程 2 獲取鎖成功
6 線程 1 被喚醒

符合咱們的結論:signal()並不意味着被喚醒的線程當即執行。因爲線程2先於線程0排隊,所以看到第5行打印的內容,線程2先獲取鎖。

相關文章
相關標籤/搜索