Java併發包源碼學習系列:詳解Condition條件隊列、signal和await

系列傳送門:java

Condition接口

Contition是一種廣義上的條件隊列,它利用await()和signal()爲線程提供了一種更爲靈活的等待/通知模式node

圖源:《Java併發編程的藝術》編程

Condition必需要配合Lock一塊兒使用,由於對共享狀態變量的訪問發生在多線程環境下。c#

一個Condition的實例必須與一個Lock綁定,所以await和signal的調用必須在lock和unlock之間有鎖以後,才能使用condition嘛。以ReentrantLock爲例,簡單使用以下:多線程

public class ConditionTest {

    public static void main(String[] args) {
        final ReentrantLock lock = new ReentrantLock();
        final Condition condition = lock.newCondition();

        Thread thread1 = new Thread(() -> {
            String name = Thread.currentThread().getName();

            lock.lock();
            System.out.println(name + " <==成功獲取到鎖" + lock);
            try {
                System.out.println(name + " <==進入條件隊列等待");
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(name + " <==醒了");
            lock.unlock();
            System.out.println(name + " <==釋放鎖");
        }, "等待線程");

        thread1.start();

        Thread thread2 = new Thread(() -> {
            String name = Thread.currentThread().getName();

            lock.lock();
            System.out.println(name + " ==>成功獲取到鎖" + lock);
            try {
                System.out.println("========== 這裏演示await中的線程沒有被signal的時候會一直等着 ===========");
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(name + " ==>通知等待隊列的線程");
            condition.signal();
            lock.unlock();
            System.out.println(name + " ==>釋放鎖");
        }, "通知線程");

        thread2.start();
    }
}
等待線程 <==成功獲取到鎖java.util.concurrent.locks.ReentrantLock@3642cea8[Locked by thread 等待線程]
等待線程 <==進入條件隊列等待
通知線程 ==>成功獲取到鎖java.util.concurrent.locks.ReentrantLock@3642cea8[Locked by thread 通知線程]
========== 這裏演示await中的線程沒有被signal的時候會一直等着 ===========
通知線程 ==>通知等待隊列的線程
通知線程 ==>釋放鎖
等待線程 <==醒了
等待線程  <==釋放鎖

接下來咱們將從源碼的角度分析上面這個流程,理解所謂條件隊列的內涵。併發

AQS條件變量的支持之ConditionObject內部類

AQS,Lock,Condition,ConditionObject之間的關係:less

ConditionObject是AQS的內部類,實現了Condition接口,Lock中提供newCondition()方法,委託給內部AQS的實現Sync來建立ConditionObject對象,享受AQS對Condition的支持。oop

// ReentrantLock#newCondition
	public Condition newCondition() {
        return sync.newCondition();
    }
	// Sync#newCondition
    final ConditionObject newCondition() {
        // 返回Contition的實現,定義在AQS中
        return new ConditionObject();
    }

ConditionObject用來結合鎖實現線程同步,ConditionObject能夠直接訪問AQS對象內部的變量,好比state狀態值和AQS隊列源碼分析

ConditionObject是條件變量,每一個條件變量對應一個條件隊列(單向鏈表隊列),其用來存放調用條件變量的await方法後被阻塞的線程,ConditionObject維護了首尾節點,沒錯這裏的Node就是咱們以前在學習AQS的時候見到的那個Node,咱們會在下面回顧:post

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** 條件隊列的第一個節點. */
    private transient Node firstWaiter;
    /** 條件隊列的最後一個節點. */
    private transient Node lastWaiter;
}

看到這裏咱們須要明確這裏的條件隊列和咱們以前說的AQS同步隊列是不同的:

  • AQS維護的是當前在等待資源的隊列,Condition維護的是在等待signal信號的隊列。
  • 每一個線程會存在上述兩個隊列中的一個,lock與unlock對應在AQS隊列,signal與await對應條件隊列,線程節點在他們之間反覆橫跳。

這裏着重說明一下,接下來的源碼學習部分,咱們會將兩個隊列進行區分,涉及到同步隊列和阻塞隊列的描述,意味着是AQS的同步隊列,而條件隊列指的是Condition隊列,望讀者知曉。

這裏咱們針對上面的demo來分析一下會更好理解一些:

爲了簡化,接下來我將用D表示等待線程,用T表示通知線程

  1. 【D】先調用lock.lock()方法,此時無競爭,【D】被加入到AQS同步隊列中。
  2. 【D】調用condition.await()方法,此時【D】被構建爲等待節點並加入到condition對應的條件等待隊列中,並從AQS同步隊列中移除。
  3. 【D】陷入等待以後,【T】啓動,因爲AQS隊列中的【D】已經被移除,此時【T】也很快獲取到鎖,相應的,【T】也被加入到AQS同步隊列中。
  4. 【T】接着調用condition.signal()方法,這時condition對應的條件隊列中只有一個節點【D】,因而【D】被取出,並被再次加入AQS的等待隊列中。此時【D】並無被喚醒,只是單純換了個位置。
  5. 接着【T】執行lock.unlock(),釋放鎖鎖以後,會喚醒AQS隊列中的【D】,此時【D】真正被喚醒且執行。

OK,lock -> await -> signal -> unlock這一套流程相信已經大概可以理解,接下來咱們試着看看源碼吧。

回顧AQS中的Node

咱們這裏再簡單回顧一下AQS中Node類與Condition相關的字段:

// 記錄當前線程的等待狀態,
        volatile int waitStatus;

        // 前驅節點
        volatile Node prev;

        // 後繼節點
        volatile Node next;

        // node存儲的線程
        volatile Thread thread;
		
        // 當前節點在Condition中等待隊列上的下一個節點
        Node nextWaiter;

waitStatus能夠取五種狀態:

  1. 初始化爲0,啥也不表示,以後會被置signal。
  2. 1表示cancelled,取消當前線程對鎖的爭奪。
  3. -1表示signal,表示當前節點釋放鎖後須要喚醒後面可被喚醒的節點。
  4. -2表示condition,咱們這篇的重點,表示當前節點在條件隊列中
  5. -3表示propagate,表示釋放共享資源的時候會向後傳播釋放其餘共享節點。

固然,除了-2這個condition狀態,其餘的等待狀態咱們以前都或多或少分析過,今天着重學習condition這個狀態的意義。

咱們還能夠看到一個Node類型的nextWaiter,它表示條件隊列中當前節點的下一個節點,能夠看出用以實現條件隊列的單向鏈表。

void await()

調用Condition的await()方法,會使當前線程進入等待隊列並釋放鎖,同時線程狀態變爲等待狀態。

其實就是從AQS同步隊列的首節點,注意不是head,而是獲取了鎖的節點,移動到Condition的等待隊列中。

瞭解這些以後,咱們直接來看看具體方法的源碼:

public final void await() throws InterruptedException {
            // 這個方法是響應中斷的
            if (Thread.interrupted())
                throw new InterruptedException();
            // 添加到條件隊列中
            Node node = addConditionWaiter();
            // 釋放同步資源,也就是釋放鎖
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 若是這個節點的線程不在同步隊列中,說明該線程還不具有競爭鎖的資格
            while (!isOnSyncQueue(node)) {
                // 掛起線程
                LockSupport.park(this);
                // 若是線程中斷,退出
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 上面的循環退出有兩種狀況:
            // 1. isOnSyncQueue(node) 爲true,即當前的node已經轉移到阻塞隊列了
            // 2. checkInterruptWhileWaiting != 0, 表示線程中斷
            
            // 退出循環,被喚醒以後,進入阻塞隊列,等待獲取鎖 acquireQueued
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

添加到條件隊列

Node addConditionWaiter()

addConditionWaiter() 是將當前節點加入到條件隊列中:

private Node addConditionWaiter() {
            Node t = lastWaiter;
            // 若是lastWaiter被取消了,將其清除
            if (t != null && t.waitStatus != Node.CONDITION) {
                // 遍歷整個條件隊列,將已取消的全部節點清除出列
                unlinkCancelledWaiters();
           		// t從新賦值一下,由於last可能改變了
                t = lastWaiter;
            }
            //注意這裏,node在初始化的時候,會指定ws爲CONDITION
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            // t == null 表示隊列此時爲空,初始化firstWaiter
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;// 入隊尾
            lastWaiter = node;// 將尾指針指向新建的node
            return node;
        }

void unlinkCancelledWaiters()

unlinkCancelledWaiters用於清除隊列中已經取消等待的節點。

private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            // trail這裏表示取消節點的前驅節點
            Node trail = null;
            // t會從頭至尾遍歷這個單鏈表
            while (t != null) {
                // next用於保存下一個
                Node next = t.nextWaiter;
                // 若是發現當前這個節點 不是 condition了, 那麼考慮移除它
                // 下面是單鏈表的移除節點操做 簡單來講就是 trail.next = t.next
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    // 說明first就是否是condition了
                    if (trail == null)
                        firstWaiter = next;
                    else
                        //trail.next = t.next
                        trail.nextWaiter = next;
                    // trail後面沒東西,天然trail就是lastWaiter了
                    if (next == null)
                        lastWaiter = trail;
                }
                // 當前節點是一直跟到不是condition節點的上一個
                else
                    trail = t;
                // 向後遍歷 t = t.next
                t = next;
            }
        }

總結一下addConditionWaiter的過程:

  1. 首先判斷條件隊列的尾節點是否被取消了,這裏用last.ws != CONDITION來判斷,若是是的話,就須要從頭至尾遍歷,消除被不是condition的節點。
  2. 接着將當前線程包裝爲Node,指定ws爲CONDITION。

徹底釋放獨佔鎖

將節點加入等待隊列中後,就須要徹底釋放線程擁有的獨佔鎖了,徹底釋放針對重入鎖的狀況。咱們能夠拉到await()方法中看看,將會調用:int savedState = fullyRelease(node);,這句話有什麼內涵呢?

咱們看到這個方法返回了一個savedState變量,簡單的理解就是保存狀態。咱們知道重入鎖的state由重入的次數,若是一個state爲N,咱們能夠認爲它持有N把鎖。

await()方法必須將state置0,也就是徹底釋放鎖,後面的線程才能獲取到這把鎖,置0以後,咱們須要用個變量標記一下,也就是這裏的savedState。

這樣它被從新喚醒的時候,咱們就知道,他須要獲取savedState把鎖。

int fullyRelease(Node node)

final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            // 獲取當前的state值,重入次數
            int savedState = getState();
            // 釋放N = savedState資源
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            // 若是獲取失敗,將會將節點設置爲取消狀態,並拋出異常
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

這裏其實咱們就會明白開頭說的:若是某個線程沒有獲取lock,就直接調用condition的await()方法,結果是什麼呢,在release的時候拋出異常,而後節點被取消,以後節點進來的時候,將它清理掉。

等待進入阻塞隊列

ok,徹底釋放鎖以後,將會來到這幾步,若是這個節點的線程不在同步隊列中,說明該線程還不具有競爭鎖的資格,將被一直掛起,這裏的同步隊列指的是AQS的阻塞隊列。

int interruptMode = 0;
            // 若是這個節點的線程不在同步隊列中,說明該線程還不具有競爭鎖的資格,會一直掛起
            while (!isOnSyncQueue(node)) {
                // 掛起線程
                LockSupport.park(this);
                // 若是線程中斷,退出
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }

boolean isOnSyncQueue(Node node)

下面這個方法會判斷節點是否是已經到阻塞隊列中了,若是是的話,就直接返回true,這個方法的必要性是什麼呢?

其實啊,這裏須要提早說一下signal()方法,signal的做用和await()方法,將在等待隊列中阻塞的節點移動到AQS同步隊列中,這個方法就是說判斷一下這個節點是否是移過去了。

final boolean isOnSyncQueue(Node node) {
        // 1. 節點的等待狀態仍是condition表示還在等待隊列中
        // 2. node.prev == null 表示還沒移到阻塞隊列中[prev和next都是阻塞隊列中用的]

        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
     
        // 若是node已經有了後繼節點,表示已經在阻塞隊列中了
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        // 來到這裏的狀況:ws != condition && node.prev != null && node.next == null
        
		// 想一想:爲何node.prev != null不能做爲判斷不在阻塞隊列的依據呢?
        // CAS首先設置node.prev 指向tail,這個時候node.prev 是不爲null的,但CAS可能會失敗
        return findNodeFromTail(node);
    }

爲何node.prev != null不能做爲判斷不在阻塞隊列的依據呢?

官方給出瞭解答: 由於CAS的入隊操做中,首先設置node.prev 指向tail,這個時候node.prev 是不爲null的。你可以說他入隊成功必定成功嗎?不必定,由於CAS可能會失敗,因此要findNodeFromTail(node)。

boolean findNodeFromTail(Node node)

從阻塞隊列的尾部向前遍歷,若是找到這個node,表示它已經在了,那就返回true。

private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            // 已經有了
            if (t == node)
                return true;
            // 尾都沒有,找啥呢,返回false
            if (t == null)
                return false;
            // 一直往前找
            t = t.prev;
        }
    }

void signal()

因爲以前節點被加入等待隊列將會一直阻塞,爲了連貫性,咱們來看看喚醒它的signal方法吧:

以前說到,若是這個線程會在等待隊列中等待,那麼喚醒它的signal方法的流程是怎麼樣的呢,前面其實已經說了一丟丟了,咱們猜想,signal會將isOnSyncQueue方法的循環打破,接下來看看是否是這樣子的。

public final void signal() {
            // 同樣的,必須佔有當前這個鎖才能用signal方法
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

喚醒節點

該方法會從頭至尾遍歷條件隊列,找到須要移到同步隊列的節點。

void doSignal(Node first)

private void doSignal(Node first) {
            do {
                // firstWaiter 指向first的下一個
                if ( (firstWaiter = first.nextWaiter) == null)
                    // 若是first是最後一個且要被移除了,就將last置null
                    lastWaiter = null;
                // first斷絕與條件隊列的鏈接
                first.nextWaiter = null;
                // fisrt轉移失敗,就看看後面是否是須要的
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

這裏的while循環表示,若是first沒有轉移成功,就接着判斷first後面的節點是否是須要轉移。

boolean transferForSignal(Node node)

該方法將節點從條件隊列轉移到阻塞隊列。

final boolean transferForSignal(Node node) {
        /*
         * CAS操做嘗試將Condition的節點的ws改成0
         * 若是失敗,意味着:節點的ws已經不是CONDITION,說明節點已經被取消了
         * 若是成功,則該節點的狀態ws被改成0了
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * 經過enq方法將node自旋的方式加入同步隊列隊尾
         * 這裏放回的p是node在同步隊列的前驅節點
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        // ws大於0 的狀況只有 cancenlled,表示node的前驅節點取消了爭取鎖,那直接喚醒node線程
        // ws <= 0 會使用cas操做將前驅節點的ws置爲signal,若是cas失敗也會喚醒node
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
	// 自旋的方式入隊
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    // 返回的是node的前驅節點
                    return t;
                }
            }
        }
    }

檢查中斷狀態

ok,一旦signal以後,節點被成功轉移到同步隊列後,這時下面這個循環就會退出了,繼續回到這裏:

int interruptMode = 0;
            // 若是這個節點的線程不在同步隊列中,說明該線程還不具有競爭鎖的資格,會一直掛起
            while (!isOnSyncQueue(node)) {
                // 掛起線程
                LockSupport.park(this);
                // 若是線程中斷,退出
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }

interruptMode能夠有如下幾種取值:

/** await 返回的時候,須要從新設置中斷狀態 */
        private static final int REINTERRUPT =  1;
        /** await 返回的時候,須要拋出 InterruptedException 異常 */
        private static final int THROW_IE    = -1;
		
		/** interruptMode取0的時候表示在await()期間,沒有發生中斷 */

說到這裏咱們須要明白,LockSupport.park(this)掛起的線程是何時喚醒的:

  1. signal方法將節點轉移到同步隊列中,且獲取到了鎖或者對前驅節點的cas操做失敗,調用了LockSupport.unpark(node.thread);方法。
  2. 在park的時候,另一個線程對掛起的線程進行了中斷。

喚醒以後,咱們能夠看到調用checkInterruptWhileWaiting方法檢查等待期間是否發生了中斷,若是不爲0表示確實在等待期間發生了中斷。

但其實這個方法的返回結果用interruptMode變量接收,擁有更加豐富的內涵,它還可以判斷中斷的時機是否在signal以前。

int checkInterruptWhileWaiting(Node node)

該方法用於判斷該線程是否在掛起期間發生了中斷。

private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?// 若是處於中斷狀態,返回true,且將重置中斷狀態
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :// 若是中斷了,判斷什麼時候中斷
                0; // 沒有中斷, 返回0
        }

boolean transferAfterCancelledWait(Node node)

該方法判斷什麼時候中斷,是否在signal以前。

final boolean transferAfterCancelledWait(Node node) {
        // 嘗試使用CAS操做將node 的ws設置爲0
        // 若是成功,說明在signal方法以前中斷就已經發生:
        // 緣由在於:signal若是在此以前發生,必然已經cas操做將ws設置爲0了,這裏不可能設置成功
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            // 就算中斷了,也將節點入隊
            enq(node);
            return true;
        }
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq().  Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         * 這裏就是signal以後發生的中斷
         * 可是signal可能還在進行轉移中,這邊自旋等一下它完成
         */
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

這裏的話,咱們仍是稍微總結一下:

  1. await()中的節點中斷以後,被喚醒有多種狀況:
    • 無中斷的狀況:signal方法成功將節點移入同步隊列且節點成功獲取資源,喚醒該線程,此時退出的時候interruptMode爲0。
    • 有中斷的狀況:
      • signal以前中斷,interruptMode設置爲THROW_IE。
      • signal以後中斷,interruptMode設置爲REINTERRUPT。
  2. 中斷時,不管signal以前或以後,節點不管如何都會進入阻塞隊列。

處理中斷狀態

接下來三個部分我將一一說明:

// 第一部分
			if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
			// 第二部分
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters(); // 清除取消的節點
			// 第三部分
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);

第一部分

signal喚醒的線程並不會當即獲取到資源,從while循環退出後,會經過acquireQueued方法加入獲取同步狀態的競爭中。

// 第一部分
			if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;

acquireQueued(node, savedState)中node此時已經被加入同步隊列了,savedState是以前存儲的state。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted; // 
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

acquireQueued方法返回時,表示已經獲取到了鎖,且返回的是interrupted值,若是返回true,表示已經被中斷。

接着判斷interruptMode != THROW_IE表示是在signal以後發生的中斷,須要從新中斷當前線程,將interruptMode設置爲REINTERRUPT。

第二部分

// 第二部分
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters(); // 清除取消的節點

前面說了,signal會將節點移到同步隊列中,最後一步須要和條件隊列斷開關係,也就是:node.nextWaiter = null,但這是想象中比較正常的狀況,若是在signal以前被中斷,節點也會被加入同步隊列中,這時實際上是沒有調用這個斷開關係的。

所以這邊作一點處理, unlinkCancelledWaiters()邏輯上面也說過了,能夠回過頭去看看,主要是清除隊列中已經取消等待的節點。

第三部分

最後一個部分,就是對兩種interruptMode的狀況進行處理,看看代碼就知道了:

void reportInterruptAfterWait(interruptMode)

private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            // signal 以前的中斷, 須要拋出異常
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            // signal 以後發生的中斷, 須要從新中斷
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }

帶超機制的void await()

帶超時機制的await()方法有如下幾個,簡單看下便可:

  • long awaitNanos(long nanosTimeout)
  • boolean awaitUntil(Date deadline)
  • boolean await(long time, TimeUnit unit)

咱們選最後一個來看看,主要看看和以前await()方法不同的地方:

public final boolean await(long time, TimeUnit unit)
                throws InterruptedException {
            // 計算等待的時間
            long nanosTimeout = unit.toNanos(time);
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            // 截止時間
            final long deadline = System.nanoTime() + nanosTimeout;
            // 表示是否超時
            boolean timedout = false;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                // 等待時間到了
                if (nanosTimeout <= 0L) {
                    // 這個方法返回true表示在這個方法內,已經將節點轉移到阻塞隊列中
                    // 返回false,表示signal已經發生,表示沒有超時
                    timedout = transferAfterCancelledWait(node);
                    break;
                }
                //spinForTimeoutThreshold 是AQS中的一個字段,若是超過1ms,使用parkNonos
                if (nanosTimeout >= spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                // 更新一下還須要等待多久
                nanosTimeout = deadline - System.nanoTime();
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        }

不拋出InterruptedException的await

public final void awaitUninterruptibly() {
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean interrupted = false;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if (Thread.interrupted())
                    interrupted = true;
            }
            // 相比await() 針對中斷少了拋出異常的操做,而是直接進行中斷
            if (acquireQueued(node, savedState) || interrupted)
                selfInterrupt();
        }

Condition的使用

最後以一個Java doc給的例子結尾吧:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    // condition 依賴於 lock 來產生
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    // 生產
    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();  // 隊列已滿,等待,直到 not full 才能繼續生產
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            notEmpty.signal(); // 生產成功,隊列已經 not empty 了,發個通知出去
        } finally {
            lock.unlock();
        }
    }

    // 消費
    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await(); // 隊列爲空,等待,直到隊列 not empty,才能繼續消費
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notFull.signal(); // 被我消費掉一個,隊列 not full 了,發個通知出去
            return x;
        } finally {
            lock.unlock();
        }
    }
}

其實這個以前也說過,ArrayBlockingQueue就是採用了這種方式實現的生產者-消費者模式,若是你感興趣,能夠看看具體的實現細節哦。

總結

  • Condition的await()和signal()基於Lock,相比於基於Object的wait()和notify()方法,它提供更加靈活的等待通知的機制。
  • 支持豐富的功能如:帶超時機制的await(),不響應中斷的await(),以及多個等待的條件隊列。
  • Condition的await()方法會將線程包裝爲等待節點,加入等待隊列中,並將AQS同步隊列中的節點移除,接着不斷檢查isOnSyncQueue(Node node),若是在等待隊列中,就一直等着,若是signal將它移到AQS隊列中,則退出循環。
  • Condition的signal()方法則是先檢查當前線程是否獲取了鎖,接着將等待隊列中的節點經過Node的操做直接加入AQS隊列。線程並不會當即獲取到資源,從while循環退出後,會經過acquireQueued方法加入獲取同步狀態的競爭中。

參考閱讀

相關文章
相關標籤/搜索