Java併發之Condition的實現分析

1、Condition的概念

介紹

回憶 synchronized 關鍵字,它配合 Object 的 wait()、notify() 系列方法能夠實現等待/通知模式。html

對於 Lock,經過 Condition 也能夠實現等待/通知模式。java

Condition 是一個接口。
Condition 接口的實現類是 Lock(AQS)中的 ConditionObject。
Lock 接口中有個 newCondition() 方法,經過這個方法能夠得到 Condition 對象(其實就是 ConditionObject)。
所以,經過 Lock 對象能夠得到 Condition 對象。node

1架構

2分佈式

3源碼分析

Lock lock  = new ReentrantLock();性能

Condition c1 = lock.newCondition();學習

Condition c2 = lock.newCondition();大數據

2、Condition的實現分析

實現

ConditionObject 類是 AQS 的內部類,實現了 Condition 接口。ui

1

2

3

4

public class ConditionObject implements Condition, java.io.Serializable {

        private transient Node firstWaiter;

        private transient Node lastWaiter;

        ...

能夠看到,等待隊列和同步隊列同樣,使用的都是同步器 AQS 中的節點類 Node。
一樣擁有首節點和尾節點,
每一個 Condition 對象都包含着一個 FIFO 隊列。
結構圖:

等待

調用 Condition 的 await() 方法會使線程進入等待隊列,並釋放鎖,線程狀態變爲等待狀態。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

public final void await() throws InterruptedException {

    if (Thread.interrupted())

        throw new InterruptedException();

    Node node = addConditionWaiter();

    //釋放同步狀態(鎖)

    int savedState = fullyRelease(node);

    int interruptMode = 0;

    //判斷節點是否放入同步對列

    while (!isOnSyncQueue(node)) {

        //阻塞

        LockSupport.park(this);

        //若是已經中斷了,則退出

        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

            break;

    }

    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

        interruptMode = REINTERRUPT;

            if (node.nextWaiter != null) // clean up if cancelled

            unlinkCancelledWaiters();

            if (interruptMode != 0)

                reportInterruptAfterWait(interruptMode);

}

分析上述方法的大概過程:

  1. 將當前線程建立爲節點,加入等待隊列;
  2. 釋放鎖,喚醒同步隊列中的後繼節點;
  3. while循環判斷節點是否放入同步隊列:
  • 沒有放入,則阻塞,繼續 while 循環(若是已經中斷了,則退出)
  • 放入,則退出 while 循環,執行後面的判斷
  1. 退出 while 說明節點已經在同步隊列中,調用 acquireQueued() 方法加入同步狀態競爭。
  2. 競爭到鎖後從 await() 方法返回,即退出該方法。

addConditionWaiter() 方法:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

private Node addConditionWaiter() {

    Node t = lastWaiter;

    if (t != null && t.waitStatus != Node.CONDITION) {

        //清除條件隊列中全部狀態不爲Condition的節點

        unlinkCancelledWaiters();

        t = lastWaiter;

    }

    //將該線程建立節點,放入等待隊列

    Node node = new Node(Thread.currentThread(), Node.CONDITION);

    if (t == null)

        firstWaiter = node;

    else

        t.nextWaiter = node;

    lastWaiter = node;

    return node;

}

過程分析:同步隊列的首節點移動到等待隊列。加入尾節點以前會清除全部狀態不爲 Condition 的節點。

通知

調用 Condition 的 signal() 方法,能夠喚醒等待隊列的首節點(等待時間最長),喚醒以前會將該節點移動到同步隊列中。

1

2

3

4

5

6

7

8

public final void signal() {

    //判斷是否獲取了鎖

    if (!isHeldExclusively())

        throw new IllegalMonitorStateException();

    Node first = firstWaiter;

    if (first != null)

        doSignal(first);

}

過程:

  1. 先判斷當前線程是否獲取了鎖;
  2. 而後對首節點調用 doSignal() 方法。

1

2

3

4

5

6

7

8

private void doSignal(Node first) {

    do {

        if ( (firstWaiter = first.nextWaiter) == null)

            lastWaiter = null;

        first.nextWaiter = null;

    } while (!transferForSignal(first) &&

       (first = firstWaiter) != null);

}

過程:

  1. 修改首節點;
  2. 調用 transferForSignal() 方法將節點移動到同步隊列。

1

2

3

4

5

6

7

8

9

10

11

12

final boolean transferForSignal(Node node) {

    //將節點狀態變爲0  

    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))

        return false;

    //將該節點加入同步隊列

    Node p = enq(node);

    int ws = p.waitStatus;

    //若是結點p的狀態爲cancel 或者修改waitStatus失敗,則直接喚醒

    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))

        LockSupport.unpark(node.thread);

    return true;

}

調用同步器的 enq 方法,將節點移動到同步隊列,
知足條件後使用 LockSupport 喚醒該線程。

當 Condition 調用 signalAll() 方法:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

public final void signalAll() {

    if (!isHeldExclusively())

        throw new IllegalMonitorStateException();

    Node first = firstWaiter;

    if (first != null)

        doSignalAll(first);

}

private void doSignalAll(Node first) {

    lastWaiter = firstWaiter = null;

    do {

        Node next = first.nextWaiter;

        first.nextWaiter = null;

        transferForSignal(first);

        first = next;

    } while (first != null);

}

能夠看到 doSignalAll() 方法使用了 do-while 循環來喚醒每個等待隊列中的節點,直到 first 爲 null 時,中止循環。

一句話總結 signalAll() 的做用:將等待隊列中的所有節點移動到同步隊列中,並喚醒每一個節點的線程。

總結

整個過程能夠分爲三步:

第一步:一個線程獲取鎖後,經過調用 Condition 的 await() 方法,會將當前線程先加入到等待隊列中,並釋放鎖。而後就在 await() 中的一個 while 循環中判斷節點是否已經在同步隊列,是則嘗試獲取鎖,不然一直阻塞。

第二步:當線程調用 signal() 方法後,程序首先檢查當前線程是否獲取了鎖,而後經過 doSignal(Node first) 方法將節點移動到同步隊列,並喚醒節點中的線程。

第三步:被喚醒的線程,將從 await() 中的 while 循環中退出來,而後調用 acquireQueued() 方法競爭同步狀態。競爭成功則退出 await() 方法,繼續執行。

歡迎學Java和大數據的朋友們加入java架構交流: 855835163 羣內提供免費的架構資料還有:Java工程化、高性能及分佈式、高性能、深刻淺出。高架構。性能調優、Spring,MyBatis,Netty源碼分析和大數據等多個知識點高級進階乾貨的免費直播講解  能夠進來一塊兒學習交流哦

相關文章
相關標籤/搜索