先來回顧一下java中的等待/通知機制java
咱們有時會遇到這樣的場景:線程A執行到某個點的時候,由於某個條件condition不知足,須要線程A暫停;等到線程B修改了條件condition,使condition知足了線程A的要求時,A再繼續執行。node
最簡單的實現方法就是將condition設爲一個volatile的變量,當A線程檢測到條件不知足時就自旋,相似下面:安全
public class Test { private static volatile int condition = 0; public static void main(String[] args) throws InterruptedException { Thread A = new Thread(new Runnable() { @Override public void run() { while (!(condition == 1)) { // 條件不知足,自旋 } System.out.println("a executed"); } }); A.start(); Thread.sleep(2000); condition = 1; } }
這種方式的問題在於自旋很是耗費CPU資源,固然若是在自旋的代碼塊里加入Thread.sleep(time)將會減輕CPU資源的消耗,可是若是time設的太大,A線程就不能及時響應condition的變化,若是設的過小,依然會形成CPU的消耗。less
所以,java在Object類裏提供了wait()和notify()方法,使用方法以下:ide
class Test1 { private static volatile int condition = 0; private static final Object lock = new Object(); public static void main(String[] args) throws InterruptedException { Thread A = new Thread(new Runnable() { @Override public void run() { synchronized (lock) { while (!(condition == 1)) { try { lock.wait(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } System.out.println("a executed by notify"); } } }); A.start(); Thread.sleep(2000); condition = 1; synchronized (lock) { lock.notify(); } } }
經過代碼能夠看出,在使用一個對象的wait()、notify()方法前必需要獲取這個對象的鎖。源碼分析
當線程A調用了lock對象的wait()方法後,線程A將釋放持有的lock對象的鎖,而後將本身掛起,直到有其餘線程調用notify()/notifyAll()方法或被中斷。能夠看到在lock.wait()前面檢測condition條件的時候使用了一個while循環而不是if,那是由於當有其餘線程把condition修改成知足A線程的要求並調用notify()後,A線程會從新等待獲取鎖,獲取到鎖後才從lock.wait()方法返回,而在A線程等待鎖的過程當中,condition是有可能再次變化的。ui
由於wait()、notify()是和synchronized配合使用的,所以若是使用了顯示鎖Lock,就不能用了。因此顯示鎖要提供本身的等待/通知機制,Condition應運而生。this
咱們用Condition實現上面的例子:spa
class Test2 { private static volatile int condition = 0; private static Lock lock = new ReentrantLock(); private static Condition lockCondition = lock.newCondition(); public static void main(String[] args) throws InterruptedException { Thread A = new Thread(new Runnable() { @Override public void run() { lock.lock(); try { while (!(condition == 1)) { lockCondition.await(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { lock.unlock(); } System.out.println("a executed by condition"); } }); A.start(); Thread.sleep(2000); condition = 1; lock.lock(); try { lockCondition.signal(); } finally { lock.unlock(); } } }
能夠看到經過 lock.newCondition() 能夠得到到 lock 對應的一個Condition對象lockCondition ,lockCondition的await()、signal()方法分別對應以前的Object的wait()和notify()方法。總體上和Object的等待通知是相似的。線程
上面咱們看到了Condition實現的等待通知和Object的等待通知是很是相似的,而Condition提供的等待通知功能更強大,最重要的一點是,一個lock對象能夠經過屢次調用 lock.newCondition() 獲取多個Condition對象,也就是說,在一個lock對象上,能夠有多個等待隊列,而Object的等待通知在一個Object上,只能有一個等待隊列。用下面的例子說明,下面的代碼實現了一個阻塞隊列,當隊列已滿時,add操做被阻塞有其餘線程經過remove方法刪除元素;當隊列已空時,remove操做被阻塞直到有其餘線程經過add方法添加元素。
public class BoundedQueue1<T> { public List<T> q; //這個列表用來存隊列的元素 private int maxSize; //隊列的最大長度 private Lock lock = new ReentrantLock(); private Condition addConditoin = lock.newCondition(); private Condition removeConditoin = lock.newCondition(); public BoundedQueue1(int size) { q = new ArrayList<>(size); maxSize = size; } public void add(T e) { lock.lock(); try { while (q.size() == maxSize) { addConditoin.await(); } q.add(e); removeConditoin.signal(); //執行了添加操做後喚醒因隊列空被阻塞的刪除操做 } catch (InterruptedException e1) { Thread.currentThread().interrupt(); } finally { lock.unlock(); } } public T remove() { lock.lock(); try { while (q.size() == 0) { removeConditoin.await(); } T e = q.remove(0); addConditoin.signal(); //執行刪除操做後喚醒因隊列滿而被阻塞的添加操做 return e; } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; } finally { lock.unlock(); } } }
下面來分析Condition源碼
以前咱們介紹AQS的時候說過,AQS的同步排隊用了一個隱式的雙向隊列,同步隊列的每一個節點是一個AbstractQueuedSynchronizer.Node實例。
Node的主要字段有:
狀態 | 值 | 含義 |
CANCELLED | 1 | 當前節點由於超時或中斷被取消同步狀態獲取,該節點進入該狀態不會再變化 |
SIGNAL | -1 | 標識後繼的節點處於阻塞狀態,當前節點在釋放同步狀態或被取消時,須要通知後繼節點繼續運行。每一個節點在阻塞前,須要標記其前驅節點的狀態爲SIGNAL。 |
CONDITION | -2 | 標識當前節點是做爲等待隊列節點使用的。 |
PROPAGATE | -3 | |
0 | 0 | 初始狀態 |
Condition實現等待的時候內部也有一個等待隊列,等待隊列是一個隱式的單向隊列,等待隊列中的每個節點也是一個AbstractQueuedSynchronizer.Node實例。
每一個Condition對象中保存了firstWaiter和lastWaiter做爲隊列首節點和尾節點,每一個節點使用Node.nextWaiter保存下一個節點的引用,所以等待隊列是一個單向隊列。
每當一個線程調用Condition.await()方法,那麼該線程會釋放鎖,構形成一個Node節點加入到等待隊列的隊尾。
Condition.await()方法的源碼以下:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); //構造一個新的等待隊列Node加入到隊尾 int savedState = fullyRelease(node); //釋放當前線程的獨佔鎖,無論重入幾回,都把state釋放爲0 int interruptMode = 0;
//若是當前節點沒有在同步隊列上,即尚未被signal,則將當前線程阻塞 while (!isOnSyncQueue(node)) { LockSupport.park(this);
//後面的藍色代碼都是和中斷相關的,主要是區分兩種中斷:是在被signal前中斷仍是在被signal後中斷,若是是被signal前就被中斷則拋出 InterruptedException,不然執行 Thread.currentThread().interrupt(); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //被中斷則直接退出自旋 break; }
//退出了上面自旋說明當前節點已經在同步隊列上,可是當前節點不必定在同步隊列隊首。acquireQueued將阻塞直到當前節點成爲隊首,即當前線程得到了鎖。而後await()方法就能夠退出了,讓線程繼續執行await()後的代碼。 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } final boolean isOnSyncQueue(Node node) {
//若是當前節點狀態是CONDITION或node.prev是null,則證實當前節點在等待隊列上而不是同步隊列上。之因此能夠用node.prev來判斷,是由於一個節點若是要加入同步隊列,在加入前就會設置好prev字段。 if (node.waitStatus == Node.CONDITION || node.prev == null) return false;
//若是node.next不爲null,則必定在同步隊列上,由於node.next是在節點加入同步隊列後設置的 if (node.next != null) // If has successor, it must be on queue return true; return findNodeFromTail(node); //前面的兩個判斷沒有返回的話,就從同步隊列隊尾遍歷一個一個看是否是當前節點。 } private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
Condition.signal() 方法的源碼以下:
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //若是同步狀態不是被當前線程獨佔,直接拋出異常。從這裏也能看出來,Condition只能配合獨佔類同步組件使用。 Node first = firstWaiter; if (first != null) doSignal(first); //通知等待隊列隊首的節點。 }
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && //transferForSignal方法嘗試喚醒當前節點,若是喚醒失敗,則繼續嘗試喚醒當前節點的後繼節點。 (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { //若是當前節點狀態爲CONDITION,則將狀態改成0準備加入同步隊列;若是當前狀態不爲CONDITION,說明該節點等待已被中斷,則該方法返回false,doSignal()方法會繼續嘗試喚醒當前節點的後繼節點 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); //將節點加入同步隊列,返回的p是節點在同步隊列中的先驅節點 int ws = p.waitStatus;
//若是先驅節點的狀態爲CANCELLED(>0) 或設置先驅節點的狀態爲SIGNAL失敗,那麼就當即喚醒當前節點對應的線程,線程被喚醒後會執行acquireQueued方法,該方法會從新嘗試將節點的先驅狀態設爲SIGNAL並再次park線程;若是當前設置前驅節點狀態爲SIGNAL成功,那麼就不須要立刻喚醒線程了,當它的前驅節點成爲同步隊列的首節點且釋放同步狀態後,會自動喚醒它。
//其實筆者認爲這裏不加這個判斷條件應該也是能夠的。只是對於CAS修改前驅節點狀態爲SIGNAL成功這種狀況來講,若是不加這個判斷條件,提早喚醒了線程,等進入acquireQueued方法了節點發現本身的前驅不是首節點,還要再阻塞,等到其前驅節點成爲首節點並釋放鎖時再喚醒一次;而若是加了這個條件,線程被喚醒的時候它的前驅節點確定是首節點了,線程就有機會直接獲取同步狀態從而避免二次阻塞,節省了硬件資源。 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
總的來講,Condition的本質就是等待隊列和同步隊列的交互:
當一個持有鎖的線程調用Condition.await()時,它會執行如下步驟:
當一個持有鎖的線程調用Condition.signal()時,它會執行如下操做:
從等待隊列的隊首開始,嘗試對隊首節點執行喚醒操做;若是節點CANCELLED,就嘗試喚醒下一個節點;若是再CANCELLED則繼續迭代。
對每一個節點執行喚醒操做時,首先將節點加入同步隊列,此時await()操做的步驟3的解鎖條件就已經開啓了。而後分兩種狀況討論:
若是知道Object的等待通知機制,Condition的使用是比較容易掌握的,由於和Object等待通知的使用基本一致。
對Condition的源碼理解,主要就是理解等待隊列,等待隊列能夠類比同步隊列,並且等待隊列比同步隊列要簡單,由於等待隊列是單向隊列,同步隊列是雙向隊列。
如下是筆者對等待隊列是單向隊列、同步隊列是雙向隊列的一些思考,歡迎提出不一樣意見:
之因此同步隊列要設計成雙向的,是由於在同步隊列中,節點喚醒是接力式的,由每個節點喚醒它的下一個節點,若是是由next指針獲取下一個節點,是有可能獲取失敗的,由於虛擬隊列每添加一個節點,是先用CAS把tail設置爲新節點,而後才修改原tail的next指針到新節點的。所以用next向後遍歷是不安全的,可是若是在設置新節點爲tail前,爲新節點設置prev,則能夠保證從tail往前遍歷是安全的。所以要安全的獲取一個節點Node的下一個節點,先要看next是否是null,若是是null,還要從tail往前遍歷看看能不能遍歷到Node。
而等待隊列就簡單多了,等待的線程就是等待者,只負責等待,喚醒的線程就是喚醒者,只負責喚醒,所以每次要執行喚醒操做的時候,直接喚醒等待隊列的首節點就好了。等待隊列的實現中不須要遍歷隊列,所以也不須要prev指針。