目錄java
在使用內置鎖synchronized時,經過調用java.lang.Objec中定義的監視器方法,主要有wait()、wait(long timeout)、notify()和notifyAll()方法,能夠實現等待/通知模式。Codition接口中也定義了相似的監視器方法,與顯示鎖Lock配合使用也能夠實現等待/通知模式。
當線程須要利用Condition對象進行等待時,須要提早獲取到Condition對象關聯的顯示鎖Lock對象,使用案例以下:node
Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); //等待 public void coditionWait() throws InterruptedException { lock.lock(); try { condition.await(); }finally { lock.unlock(); } } //通知 public void coditionSignal() throws InterruptedException { lock.lock(); try { condition.signal(); }finally { lock.unlock(); } }
Condition接口由同步器AbstractQueuedSynchronizer內部類ConditionObject提供實現,而顯示鎖Lock對象實現時內部類Sync會繼承AQS,從而把Condition對象與Lock對象關聯起來。併發
在上一篇博客中介紹到爲了處理多個線程競爭同一把鎖,同步器AQS中維護了一個先入先出的雙向同步隊列,讓競爭失敗的線程進入同步隊列等待。一樣,AQS在實現Condition接口也維護了一個先入先出的單向等待隊列,當一個與Lock對象關聯的Condition對象調用await方法,得到鎖的線程就要釋放鎖,並推出同步隊列head頭節點,進入condition等待隊列。condition隊列規定了頭節點firstWaiter和尾節點lastWaiter。less
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter; }
AQS中構建等待隊列複用了內部類Node結點類源碼分析
static final class Node { //等待狀態 volatile int waitStatus; //前驅結點 volatile Node prev; //後繼節點 volatile Node next; //等待獲取鎖的線程 volatile Thread thread; //condition隊列的後繼節點 Node nextWaiter; }
從上圖能夠發現,Condition等待隊列是一個先入先出的單向鏈表,從鏈表尾部加入元素,頭部移出鏈表。使用nextWaiter指向下一個等待節點,構成鏈表的基本元素是節點Node,複用了AQS中的Node類,nextWaiter並不僅僅在Condition鏈表指向下一個等待節點。這是Node類定義nextWaiter的註釋:ui
Link to next node waiting on condition, or the special value SHARED. Because condition queues are accessed only when holding in exclusive mode, we just need a simple linked queue to hold nodes while they are waiting on conditions. They are then transferred to the queue to re-acquire. And because conditions can only be exclusive,we save a field by using special value to indicate sharedmode.this
大意是隻有獨佔鎖纔會關聯Condition隊列,經過nextWaiter變量在構成同步隊列節點標識同步鎖是獨佔鎖仍是共享鎖,從如下方法能夠看出AQS使用nextWaiter來表示鎖:線程
/** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; //判斷是不是共享鎖 final boolean isShared() { return nextWaiter == SHARED; } //構建同步隊列節點,nextWaiter標識同步鎖是獨佔鎖仍是共享鎖 Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } //構建等待隊列節點,nextWaiter指向單向鏈表下一個節點 Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; }
從以上分析能夠看出:AQS複用了Node類來構建同步隊列和等待隊列,Node用來構建同步隊列節點,nextWaiter標識同步鎖是獨佔鎖仍是共享鎖;Node用來構建等待隊列節點,nextWaiter指向單向鏈表下一個節點。剛開始看這一部分時,對我形成了很大的困擾,因此特意寫出來。code
await實現等待考慮到了中斷,若當前線程等待期間發生中斷,拋出InterruptedException異常。線程在等待期間會被阻塞,直到發生中斷或者Condition對象調用signal方法。基本流程:首先將node加入condition隊列,而後釋放鎖,掛起當前線程等待喚醒,喚醒後線程從新進入同步隊列並調用acquireQueued獲取鎖。流程圖以下:
對象
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //將當前線程加入Condition等待隊列 Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; //判斷當前線程是否在同步隊列中 while (!isOnSyncQueue(node)) { //阻塞當前線程 LockSupport.park(this); //在阻塞的過程當中發生中斷 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //被其餘線程喚醒,退出Condition等待隊列加入同步隊列 //獲取鎖 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. //若是尾節點lastWaiter等待狀態是CANCELLED,將隊列全部CANCELLED節點清除 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } //以當前線程構成節點 Node node = new Node(Thread.currentThread(), Node.CONDITION); //尾節點爲空,等待隊列爲空,進行初始化,當前節點是等待隊列的頭節點 if (t == null) firstWaiter = node; //不然添加到等待隊列的尾部,當前節點是等待隊列新的lastWaiter else t.nextWaiter = node; lastWaiter = node; return node; } //unlinkCancelledWaiters方法遍歷CONDITION隊列,刪除狀態爲CANCELLED的節點。 private void unlinkCancelledWaiters() { //首節點 Node t = firstWaiter; //保存遍歷節點前驅節點的引用 Node trail = null; //單向鏈表從前日後遍歷 while (t != null) { //下一個節點 Node next = t.nextWaiter; //節點t的waitStatus爲CANCELLED if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
final int fullyRelease(Node node) { boolean failed = true; try { //獲取同步狀態 int savedState = getState(); //若是是重入鎖,要屢次釋放 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
final boolean isOnSyncQueue(Node node) { //轉移到同步隊列,CONDITION狀態會被清除 //同步隊列prev表示前驅結點,不爲null if (node.waitStatus == Node.CONDITION || node.prev == null) return false; //同步隊列next表示後繼節點,不爲null if (node.next != null) // If has successor, it must be on queue return true; /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */ //遍歷同步隊列,一個一個找 return findNodeFromTail(node); }
//表示從等待狀態退出時會從新產生一箇中斷,但不會拋出異常 private static final int REINTERRUPT = 1; //從等待狀態退出時拋出InterruptedException異常 private static final int THROW_IE = -1; /** * Checks for interrupt, returning THROW_IE if interrupted * before signalled, REINTERRUPT if after signalled, or * 0 if not interrupted. */ private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { //產生異常 if (interruptMode == THROW_IE) throw new InterruptedException(); //產生中斷 else if (interruptMode == REINTERRUPT) selfInterrupt(); }
檢查當前線程是否佔據獨佔鎖,喚醒等待在當前Condition對象等待最久的線程(等待隊列的頭節點)
public final void signal() { //檢查當前線程是否佔據獨佔鎖,若是不是沒有權限喚醒等待線程,拋出異常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ //若是CAS設置失敗,說明節點在signal以前被取消了,返回false if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //CAS設置成功,入隊 //插入節點的前驅節點 Node p = enq(node); //前驅節點的等待狀態 int ws = p.waitStatus; //若是p等待狀態爲CANECLLED或對p進行CAS設置失敗,則喚醒線程,讓node中線程進入acquireQueued方法。不然 //因爲前驅節點等待狀態爲signal,由同步器喚醒線程 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
將等待隊列全部節點依次轉移到同步隊列末尾。
public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { //first節點從condition隊列移出 Node next = first.nextWaiter; first.nextWaiter = null; //first節點加入同步隊列 transferForSignal(first); //更新first節點指向 first = next; } while (first != null); }
以上是對AQS中內部類ConditionObject對Condition接口實現的簡單分析。