Condition接口實現的功能與Object類中的wait/notify(等待/通知機制)相似,Object的wait和notify/notify是與對象監視器(java線程狀態操做和鎖與監視器)配合完成線程間的等待/通知機制,而Condition與Lock配合完成等待通知機制,前者是java底層級別的,後者是語言級別的,具備更高的可控制性和擴展性java
// 與Object wait()同樣,調用await()方法的線程必須先獲取鎖 public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 將線程封裝成Node,加入到Condition等待隊列的尾部 Node node = addConditionWaiter(); // 釋放當前線程所佔用的lock,在釋放的過程當中會喚醒同步隊列中的下一個節點 // 與Object wait()同樣,await也會釋放當前獲取的鎖 int savedState = fullyRelease(node); int interruptMode = 0; // 判斷當前node是否在AQS同步隊列中,若是不在就阻塞等待加入AQS同步隊列後喚醒 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 從AQS中獲取鎖後繼續執行 // acquireQueued方法參考這篇博客 // [AQS](https://my.oschina.net/kdy1994/blog/3022593 "AQS") if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } // 將線程封裝成Node對象,加入到Condition等待隊列的尾部 // firstWaiter 表示Conditon等待隊列的頭結點,lastWaiter是尾結點 private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } // Node.CONDITION = -2 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } public final void signal() { // 判斷是否獲取了鎖 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // firstWaiter Condition等待隊列的頭結點 Node first = firstWaiter; if (first != null) doSignal(first); } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 將頭結點從等待隊列中移除 first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { //將node waitStatus設置爲0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 將node加入AQS同步隊列尾部,加入後await()的while循環條件不成立了 Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
AQS同步隊列的尾(tail)變成了Condition等待隊列的頭(firstWaiter)node
Condition等待隊列的firstWaiter變成了以前firstWaiter的nextWaiter(doSigna()方法的if判斷)ui
if ( (firstWaiter = first.nextWaiter) == null)
this
Condition等待隊列的firstWaiter與next的聯繫被中斷了(doSignal()方法中).net
first.nextWaiter = null
線程
while (!isOnSyncQueue(node))
接下來執行acquireQueued(node, savedState)
,從AQS同步隊列中獲取鎖後結束退出import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[5]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); //獲取鎖 try { // 若是「緩衝已滿」,則等待;直到「緩衝」不是滿的,纔將x添加到緩衝中。 while (count == items.length) notFull.await(); // 將x添加到緩衝中 items[putptr] = x; // 將「put統計數putptr+1」;若是「緩衝已滿」,則設putptr爲0。 if (++putptr == items.length) putptr = 0; // 將「緩衝」數量+1 ++count; // 喚醒take線程,由於take線程經過notEmpty.await()等待 notEmpty.signal(); // 打印寫入的數據 System.out.println(Thread.currentThread().getName() + " put " + (Integer) x); } finally { lock.unlock(); // 釋放鎖 } } public Object take() throws InterruptedException { lock.lock(); //獲取鎖 try { // 若是「緩衝爲空」,則等待;直到「緩衝」不爲空,纔將x從緩衝中取出。 while (count == 0) notEmpty.await(); // 將x從緩衝中取出 Object x = items[takeptr]; // 將「take統計數takeptr+1」;若是「緩衝爲空」,則設takeptr爲0。 if (++takeptr == items.length) takeptr = 0; // 將「緩衝」數量-1 --count; // 喚醒put線程,由於put線程經過notFull.await()等待 notFull.signal(); // 打印取出的數據 System.out.println(Thread.currentThread().getName() + " take " + (Integer) x); return x; } finally { lock.unlock(); // 釋放鎖 } } } public class ConditionTest { private static BoundedBuffer bb = new BoundedBuffer(); public static void main(String[] args) { // 啓動10個「寫線程」,向BoundedBuffer中不斷的寫數據(寫入0-9); // 啓動10個「讀線程」,從BoundedBuffer中不斷的讀數據。 for (int i = 0; i < 10; i++) { new PutThread("p" + i, i).start(); new TakeThread("t" + i).start(); } } static class PutThread extends Thread { private int num; public PutThread(String name, int num) { super(name); this.num = num; } public void run() { try { Thread.sleep(1); // 線程休眠1ms bb.put(num); // 向BoundedBuffer中寫入數據 } catch (InterruptedException e) { } } } static class TakeThread extends Thread { public TakeThread(String name) { super(name); } public void run() { try { Thread.sleep(10); // 線程休眠1ms Integer num = (Integer) bb.take(); // 從BoundedBuffer中取出數據 } catch (InterruptedException e) { } } }