回憶 synchronized 關鍵字,它配合 Object 的 wait()、notify() 系列方法能夠實現等待/通知模式。java
對於 Lock,經過 Condition 也能夠實現等待/通知模式。node
Condition 是一個接口。sql
Condition 接口的實現類是 Lock(AQS)中的 ConditionObject。架構
Lock 接口中有個 newCondition() 方法,經過這個方法能夠得到 Condition 對象(其實就是 ConditionObject)。併發
所以,經過 Lock 對象能夠得到 Condition 對象。分佈式
Lock lock = new ReentrantLock(); Condition c1 = lock.newCondition(); Condition c2 = lock.newCondition();
ConditionObject 類是 AQS 的內部類,實現了 Condition 接口。高併發
public class ConditionObject implements Condition, java.io.Serializable { private transient Node firstWaiter; private transient Node lastWaiter; ...
能夠看到,等待隊列和同步隊列同樣,使用的都是同步器 AQS 中的節點類 Node。性能
一樣擁有首節點和尾節點,學習
每一個 Condition 對象都包含着一個 FIFO 隊列。ui
結構圖:
調用 Condition 的 await() 方法會使線程進入等待隊列,並釋放鎖,線程狀態變爲等待狀態。
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); //釋放同步狀態(鎖) int savedState = fullyRelease(node); int interruptMode = 0; //判斷節點是否放入同步對列 while (!isOnSyncQueue(node)) { //阻塞 LockSupport.park(this); //若是已經中斷了,則退出 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
分析上述方法的大概過程:
addConditionWaiter() 方法:
private Node addConditionWaiter() { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { //清除條件隊列中全部狀態不爲Condition的節點 unlinkCancelledWaiters(); t = lastWaiter; } //將該線程建立節點,放入等待隊列 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
過程分析:同步隊列的首節點移動到等待隊列。加入尾節點以前會清除全部狀態不爲 Condition 的節點。
調用 Condition 的 signal() 方法,能夠喚醒等待隊列的首節點(等待時間最長),喚醒以前會將該節點移動到同步隊列中。
public final void signal() { //判斷是否獲取了鎖 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
過程:
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
過程:
final boolean transferForSignal(Node node) { //將節點狀態變爲0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //將該節點加入同步隊列 Node p = enq(node); int ws = p.waitStatus; //若是結點p的狀態爲cancel 或者修改waitStatus失敗,則直接喚醒 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
調用同步器的 enq 方法,將節點移動到同步隊列,
知足條件後使用 LockSupport 喚醒該線程。
當 Condition 調用 signalAll() 方法:
public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }
能夠看到 doSignalAll() 方法使用了 do-while 循環來喚醒每個等待隊列中的節點,直到 first 爲 null 時,中止循環。
一句話總結 signalAll() 的做用:將等待隊列中的所有節點移動到同步隊列中,並喚醒每一個節點的線程。
整個過程能夠分爲三步:
第一步:一個線程獲取鎖後,經過調用 Condition 的 await() 方法,會將當前線程先加入到等待隊列中,並釋放鎖。而後就在 await() 中的一個 while 循環中判斷節點是否已經在同步隊列,是則嘗試獲取鎖,不然一直阻塞。
第二步:當線程調用 signal() 方法後,程序首先檢查當前線程是否獲取了鎖,而後經過 doSignal(Node first) 方法將節點移動到同步隊列,並喚醒節點中的線程。
第三步:被喚醒的線程,將從 await() 中的 while 循環中退出來,而後調用 acquireQueued() 方法競爭同步狀態。競爭成功則退出 await() 方法,繼續執行。
歡迎工做一到五年的Java工程師朋友們加入Java架構開發: 855835163 羣內提供免費的Java架構學習資料(裏面有高可用、高併發、高性能及分佈式、Jvm性能調優、Spring源碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個知識點的架構資料)合理利用本身每一分每一秒的時間來學習提高本身,不要再用"沒有時間「來掩飾本身思想上的懶惰!趁年輕,使勁拼,給將來的本身一個交代!