Condition的做用是對鎖進行更精確的控制。Condition中的await()方法至關於Object的wait()方法,Condition中的signal()方法至關於Object的notify()方法,Condition中的signalAll()至關於Object的notifyAll()方法。不一樣的是,Object中的wait(),notify(),notifyAll()方法是和"同步鎖"(synchronized關鍵字)捆綁使用的;而Condition是須要與"互斥鎖"/"共享鎖"捆綁使用的。html
public interface Condition { // 形成當前線程在接到信號或被中斷以前一直處於等待狀態。會釋放鎖 void await() // 形成當前線程在接到信號、被中斷或到達指定等待時間以前一直處於等待狀態。 boolean await(long time, TimeUnit unit) // 形成當前線程在接到信號、被中斷或到達指定等待時間以前一直處於等待狀態。 long awaitNanos(long nanosTimeout) // 形成當前線程在接到信號以前一直處於等待狀態。 void awaitUninterruptibly() // 形成當前線程在接到信號、被中斷或到達指定最後期限以前一直處於等待狀態。 boolean awaitUntil(Date deadline) // 喚醒一個等待線程。 void signal() // 喚醒全部等待線程。 void signalAll()
public class BoundedQueue<T> { public List<T> q; //這個列表用來存隊列的元素 private int maxSize; //隊列的最大長度 private Lock lock = new ReentrantLock(); private Condition addConditoin = lock.newCondition(); private Condition removeConditoin = lock.newCondition(); public BoundedQueue(int size) { q = new ArrayList<>(size); maxSize = size; } public void add(T e) { lock.lock(); try { while (q.size() == maxSize) { addConditoin.await(); } q.add(e); removeConditoin.signal(); //執行了添加操做後喚醒因隊列空被阻塞的刪除操做 } catch (InterruptedException e1) { Thread.currentThread().interrupt(); } finally { lock.unlock(); } } public T remove() { lock.lock(); try { while (q.size() == 0) { removeConditoin.await(); } T e = q.remove(0); addConditoin.signal(); //執行刪除操做後喚醒因隊列滿而被阻塞的添加操做 return e; } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; } finally { lock.unlock(); } } }
ConditionObject是Condition在java併發中的具體的實現,它是AQS的內部類。由於Condition相關操做都須要獲取鎖,因此做爲AQS的內部類也比較合理。接下來就以ConditionObject的等待隊列、等待、通知爲切入點分析ConditionObject的具體實現。java
有關AQS中的內容能夠參考https://my.oschina.net/cqqcqqok/blog/1931790 在此咱們在來看看AQS中Node節點node
static final class Node { //標識節點當前在共享模式下 static final Node SHARED = new Node(); //標識節點當前在獨佔模式下 static final Node EXCLUSIVE = null; /* 這幾個int常量表示當前節點的等待狀態(waitStatus) */ //結束狀態,線程取消鎖的爭搶,通常在超時或被中斷設置爲CANCELLED狀態而該狀態表示的節點會被踢出隊列 static final int CANCELLED = 1; //其表示當前node的後繼節點對應的線程須要被喚醒(unpark)。若是當前線程的後繼線程處於阻塞狀態,而當前線程被release或cancel掉,所以須要喚醒當前線程的後繼線程. static final int SIGNAL = -1; //與Condition相關,該標識的結點處於等待隊列中,結點的線程等待在Condition上,當其餘線程調用了Condition的signal()方法後,CONDITION狀態的結點將從等待隊列轉移到同步隊列中,等待獲取同步鎖。 static final int CONDITION = -2; //與共享模式相關,在共享模式中,該狀態標識結點的線程處於可運行狀態。 static final int PROPAGATE = -3; //節點的等待狀態,新節點的waitStatus=0 volatile int waitStatus; volatile Node prev;//當前節點的前一個節點。 volatile Node next;//當前節點的後一個節點 volatile Thread thread;//當前節點對應的線程 Node nextWaiter;//存儲condition隊列中的後繼節點 }
Condition實現等待的時候內部也有一個等待隊列,等待隊列是一個隱式的單向隊列,等待隊列中的每個節點也是一個AbstractQueuedSynchronizer.Node實例。安全
每一個Condition對象中保存了firstWaiter和lastWaiter做爲隊列首節點和尾節點,每一個節點使用Node.nextWaiter保存下一個節點的引用,所以等待隊列是一個單向隊列。併發
每當一個線程調用Condition.await()方法,那麼該線程會釋放鎖,構形成一個Node節點加入到等待隊列的隊尾。源碼分析
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter; .................. }
public final void await() throws InterruptedException { if (Thread.interrupted())throw new InterruptedException(); Node node = addConditionWaiter();//構造一個新的等待隊列Node加入到隊尾 long savedState = fullyRelease(node);//釋放當前線程的獨佔鎖,無論重入幾回,都把state釋放爲0 int interruptMode = 0; //若是當前節點沒有在同步隊列上,即尚未被signal,則將當前線程阻塞 while (!isOnSyncQueue(node)) { LockSupport.park(this); //被中斷則直接退出自旋 //注意區分兩種中斷:是在被signal前中斷仍是在被signal後中斷, //若是是被signal前就被中斷則拋出 InterruptedException, //不然執行 Thread.currentThread().interrupt(); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //退出了上面自旋說明當前節點已經在同步隊列上,可是當前節點不必定在同步隊列隊首。 //acquireQueued將阻塞直到當前節點成爲隊首,即當前線程得到了鎖。 //acquireQueued源碼介紹可參考AQS源碼分析https://my.oschina.net/cqqcqqok/blog/1931790 //而後await()方法就能夠退出了,讓線程繼續執行await()後的代碼。 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
final long fullyRelease(Node node) { boolean failed = true; try { long savedState = getState(); if (release(savedState)) {//真正的釋放邏輯在AQS的子類中 failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
final boolean isOnSyncQueue(Node node) { //若是當前節點狀態是CONDITION或node.prev是null,則證實當前節點在等待隊列上而不是同步隊列上。 //之因此能夠用node.prev來判斷,是由於一個節點若是要加入同步隊列,在加入前就會設置好prev字段。 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) //若是node.next不爲null,則必定在同步隊列上,由於node.next是在節點加入同步隊列後設置的 return true; //前面的兩個判斷沒有返回的話,就從同步隊列隊尾遍歷一個一個看是否是當前節點。 return findNodeFromTail(node); } private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
public final void signal() { //若是同步狀態不是被當前線程獨佔,直接拋出異常。從這裏也能看出來,Condition只能配合獨佔類同步組件使用。 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first);//通知等待隊列隊首的節點。 }
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } //transferForSignal方法嘗試喚醒當前節點,若是喚醒失敗,則繼續嘗試喚醒當前節點的後繼節點。 while (!transferForSignal(first) && (first = firstWaiter) != null); }
final boolean transferForSignal(Node node) { //若是當前節點狀態爲CONDITION,則將狀態改成0準備加入同步隊列;若是當前狀態不爲CONDITION,說明該節點等待已被中斷,則該方法返回false,doSignal()方法會繼續嘗試喚醒當前節點的後繼節點 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //將節點加入同步隊列,返回的p是節點在同步隊列中的先驅節點 Node p = enq(node); int ws = p.waitStatus; //若是先驅節點的狀態爲CANCELLED(>0) 或設置先驅節點的狀態爲SIGNAL失敗,那麼就當即喚醒當前節點對應的線程,線程被喚醒後會執行acquireQueued方法,該方法會從新嘗試將節點的先驅狀態設爲SIGNAL並再次park線程;若是當前設置前驅節點狀態爲SIGNAL成功,那麼就不須要立刻喚醒線程了,當它的前驅節點成爲同步隊列的首節點且釋放同步狀態後,會自動喚醒它。 //其實筆者認爲這裏不加這個判斷條件應該也是能夠的。只是對於CAS修改前驅節點狀態爲SIGNAL成功這種狀況來講,若是不加這個判斷條件,提早喚醒了線程,等進入acquireQueued方法了節點發現本身的前驅不是首節點,還要再阻塞,等到其前驅節點成爲首節點並釋放鎖時再喚醒一次;而若是加了這個條件,線程被喚醒的時候它的前驅節點確定是首節點了,線程就有機會直接獲取同步狀態從而避免二次阻塞,節省了硬件資源。 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
總的來講,Condition的本質就是等待隊列和同步隊列的交互:ui
當一個持有鎖的線程調用Condition.await()時,它會執行如下步驟:this
當一個持有鎖的線程調用Condition.signal()時,它會執行如下操做:.net
從等待隊列的隊首開始,嘗試對隊首節點執行喚醒操做;若是節點CANCELLED,就嘗試喚醒下一個節點;若是再CANCELLED則繼續迭代。線程
對每一個節點執行喚醒操做時,首先將節點加入同步隊列,此時await()操做的步驟3的解鎖條件就已經開啓了。而後分兩種狀況討論:
若是先驅節點的狀態爲CANCELLED(>0) 或設置先驅節點的狀態爲SIGNAL失敗,那麼就當即喚醒當前節點對應的線程,此時await()方法就會完成步驟3,進入步驟4.
若是成功把先驅節點的狀態設置爲了SIGNAL,那麼就不當即喚醒了。等到先驅節點成爲同步隊列首節點並釋放了同步狀態後,會自動喚醒當前節點對應線程的,這時候await()的步驟3才執行完成,進入步驟4
對Condition的源碼理解,主要就是理解等待隊列,等待隊列能夠類比同步隊列,並且等待隊列比同步隊列要簡單,由於等待隊列是單向隊列,同步隊列是雙向隊列。
如下是筆者對等待隊列是單向隊列、同步隊列是雙向隊列的一些思考,歡迎提出不一樣意見:
之因此同步隊列要設計成雙向的,是由於在同步隊列中,節點喚醒是接力式的,由每個節點喚醒它的下一個節點,若是是由next指針獲取下一個節點,是有可能獲取失敗的,由於虛擬隊列每添加一個節點,是先用CAS把tail設置爲新節點,而後才修改原tail的next指針到新節點的。所以用next向後遍歷是不安全的,可是若是在設置新節點爲tail前,爲新節點設置prev,則能夠保證從tail往前遍歷是安全的。所以要安全的獲取一個節點Node的下一個節點,先要看next是否是null,若是是null,還要從tail往前遍歷看看能不能遍歷到Node。
而等待隊列就簡單多了,等待的線程就是等待者,只負責等待,喚醒的線程就是喚醒者,只負責喚醒,所以每次要執行喚醒操做的時候,直接喚醒等待隊列的首節點就好了。等待隊列的實現中不須要遍歷隊列,所以也不須要prev指針。
參考地址