任意一個java對象,都擁有一組監視器方法,主要包括 wait(),wait(long timeout)、notify 以及 notifyAll 方法,這些方法與synchronized 同步關鍵字配合,能夠實現等待/通知模式。Condition 接口也提供了類型Object的監視器方法,與Lock配合能夠實現等待/通知模式。java
Condition 定義了等待/通知兩種類型的方法,當前線程調用這些方法時,須要提早獲取到Condition對象關聯的鎖。Condition對象由Lock對象(調用Lock對象的 newCondition方法)建立出來的。Condition是依賴Lock對象的。node
package com.lock; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * Created by cxx on 2018/1/18. */ public class ConditionOption { Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); public void conditonWait() throws InterruptedException{ lock.lock(); try { condition.await(); }finally { lock.unlock(); } } public void conditonSignal() throws InterruptedException{ lock.lock(); try { condition.signal(); }finally { lock.unlock(); } } }
獲取一個Condition 必須經過Lock的newCondition() 方法。有界隊列是一種特殊的隊列,當隊列爲空時,隊列的獲取操做將會阻塞獲取線程,直到隊列中有新增元素,當隊列已滿時,隊列的插入操做將會阻塞插入線程,知道隊列出現「空位。」數組
package com.lock; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * Created by cxx on 2018/1/18. */ public class BoundedQueue <T> { private Object[] items; private int addIndex,removeIndex,count; private Lock lock = new ReentrantLock(); private Condition notEmpty = lock.newCondition(); private Condition notFull = lock.newCondition(); public BoundedQueue(int size){ items = new Object[size]; } //添加一個元素,若是數組滿,則添加線程進入等待狀態,直到有空位 public void add(T t) throws InterruptedException{ lock.lock(); try { while (count == items.length){ notFull.await(); } items[addIndex] = t; if (++addIndex == items.length){ addIndex = 0; } ++count; notEmpty.signal(); }finally { lock.unlock(); } } //由頭部刪除一個元素,若是數組空,則刪除線程進入等待狀態,直到有添加元素。 public T remove() throws InterruptedException{ lock.lock(); try { while (count == 0){ notEmpty.await(); } Object x = items[removeIndex]; if (++removeIndex == items.length){ removeIndex = 0; } --count; notFull.signal(); return (T) x; }finally { lock.unlock(); } } }
在添加和刪除方法中使用 while 循環而非 if判斷,目的是防止過早或意外的通知,只有條件符合纔可以退出循環。安全
ConditionObject 是同步器 AbstractQueuedSynchronizer的內部類,由於Condition的操做須要獲取相關聯的鎖。每一個Condition對象都包含着一個隊列,該隊列是Condition對象實現 等待/通知功能的關鍵。併發
等待隊列是一個FIFO的隊列,在隊列中的每一個節點都包含了一個線程引用,該線程就是在Condition對象上等待的線程,若是一個線程調用了Condition.await()方法,那麼該線程將會釋放鎖、構形成節點加入等待隊列並進入等待狀態。ui
一個Condition包含一個等待隊列,Condition擁有首節點(firstWaiter)和尾節點(lastWaiter)。當前線程調用Condition.await()方法,將會以當前線程構造節點,並將節點從尾部加入等待隊列。 Condition擁有首尾節點的引用,而新增節點只須要將原有的尾節點 nextWaiter 指向它,而且更新尾節點便可。上述節點應用更新的過程並無使用CAS保證,緣由在於調用 await()方法的線程一定是獲取了鎖的線程,這個過程是由鎖來保證線程安全的。this
在Object的監視器模型上,一個對象擁有一個同步隊列和等待隊列,而併發包中的Lock擁有一個同步隊列和多個等待隊列。線程
調用Condition的await()方法,會使當前線程進入等待隊列並釋放鎖,同時線程狀態變爲等待狀態。當從await()方法返回時,當前線程必定獲取了Condition相關聯的鎖。code
/** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
調用該方法的線程成功獲取了鎖的線程,也就是同步隊列中的首節點,該方法將會當前線程構形成節點並加入等待隊列中,而後釋放同步狀態,喚醒同步隊列中的後繼節點,而後當前線程會進入等待狀態。對象
調用 Condition的signal() 方法,將會喚醒在等待隊列中等待時間最長的節點,在喚醒節點以前,會將節點移到同步隊列中。
/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
調用該方法的前置條件是當前線程必須獲取了鎖,能夠看到signal()方法進行了IsHeldExclusively()檢查,也就是當前線程必須是獲取了鎖的線程,接着獲取等待隊列的首節點,將其移動到同步隊列並使用LockSupport喚醒節點中的線程。
調用同步器的enq(Node node)方法,等待隊列中的頭節點線程安全地移動到同步隊列。當節點移動到同步隊列後,當前線程在使用LockSupport喚醒該節點的線程。
被喚醒後的線程,將從await()方法中的while循環中退出,進而調用同步器的acquireQueued()方法加入到獲取同步狀態的競爭中。
成功獲取同步狀態以後,被喚醒的線程就愛哪根蔥先前調用的await()方法返回,此時該線程已經成功地獲取了鎖。
Condition的signalAll()方法,至關於對等待隊列中的每一個節點均執行一次signal()方法,效果就是將等待隊列中全部節點所有移動到同步隊列中,並喚醒每一個節點的線程。