Inside AbstractQueuedSynchronizer (4)

3.6 ConditionObject java

    AbstractQueuedSynchronizer的內部類ConditionObject實現了Condition接口。Condition接口提供了跟Java語言內置的monitor機制相似的接口:await()/signal()/signalAll(),以及一些支持超時和回退的await版本。能夠將任意個數的ConcitionObject關聯到對應的synchronizer,例如經過調用ReentrantLock.newCondition()方法便可構造一個ConditionObject實例。每一個ConditionObject實例內部都維護一個ConditionQueue,該隊列的元素跟AbstractQueuedSynchronizer的WaitQueue同樣,都是Node對象。 node

 

    ConditionObject的await()代碼以下: 併發

Java代碼   收藏代碼
  1. public final void await() throws InterruptedException {  
  2.     if (Thread.interrupted())  
  3.         throw new InterruptedException();  
  4.     Node node = addConditionWaiter();  
  5.     int savedState = fullyRelease(node);  
  6.     int interruptMode = 0;  
  7.     while (!isOnSyncQueue(node)) {  
  8.         LockSupport.park(this);  
  9.         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)  
  10.             break;  
  11.     }  
  12.     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)  
  13.         interruptMode = REINTERRUPT;  
  14.     if (node.nextWaiter != null// clean up if cancelled  
  15.         unlinkCancelledWaiters();  
  16.     if (interruptMode != 0)  
  17.         reportInterruptAfterWait(interruptMode);  
  18. }  

    以上代碼中,能夠看出ConditionObject的await語義跟Java語言內置的monitor機制是很是類似的(詳見:http://whitesock.iteye.com/blog/162344 )。首先addConditionWaiter()將當前線程加入到ConditionQueue中,而後fullyRelease(node)釋放掉跟ConditionObject關聯的synchronizer鎖。若是某個線程在沒有持有對應的synchronizer鎖的狀況下調用某個ConditionObject對象的await()方法,那麼跟Object.wait()同樣會拋出IllegalMonitorStateException。接下來while (!isOnSyncQueue(node)) {...}會保證在其它線程調用了該ConditionObject的signal()/siangalAll()以前,當前線程一直被阻塞(signal()/siangalAll()的行爲稍後會介紹)。在被signal()/siangalAll()喚醒以後,await()經過acquireQueued(node, savedState)確保再次得到synchronizer的鎖。 app

 

    ConditionObject的signal()代碼以下: 函數

Java代碼   收藏代碼
  1. public final void signal() {  
  2.     if (!isHeldExclusively())  
  3.         throw new IllegalMonitorStateException();  
  4.     Node first = firstWaiter;  
  5.     if (first != null)  
  6.         doSignal(first);  
  7. }  
  8.   
  9. private void doSignal(Node first) {  
  10.     do {  
  11.         if ( (firstWaiter = first.nextWaiter) == null)  
  12.             lastWaiter = null;  
  13.         first.nextWaiter = null;  
  14.     } while (!transferForSignal(first) &&  
  15.              (first = firstWaiter) != null);  
  16. }  

   那麼跟await()同樣,若是某個線程在沒有持有對應的synchronizer鎖的狀況下調用某個ConditionObject對象的signal()/siangalAll()方法,會拋出IllegalMonitorStateException。signal()主要的行爲就是將ConditionQueue中對應的Node實例transfer到AbstractQueuedSynchronizer的WaitQueue中,以便在synchronizer release的過程當中,該Node對應的線程可能被喚醒。 ui

 

3.7 Timeout & Cancellation this

    AbstractQueuedSynchronizer的acquireQueued()和doAcquire***()系列方法在acquire失敗(超時或者中斷)後,都會調用cancelAcquire(Node node)方法進行清理,其代碼以下: atom

Java代碼   收藏代碼
  1. private void cancelAcquire(Node node) {  
  2.     // Ignore if node doesn't exist  
  3.     if (node == null)  
  4.         return;  
  5.   
  6.     node.thread = null;  
  7.   
  8.     // Skip cancelled predecessors  
  9.     Node pred = node.prev;  
  10.     while (pred.waitStatus > 0)  
  11.         node.prev = pred = pred.prev;  
  12.   
  13.     // predNext is the apparent node to unsplice. CASes below will  
  14.     // fail if not, in which case, we lost race vs another cancel  
  15.     // or signal, so no further action is necessary.  
  16.     Node predNext = pred.next;  
  17.   
  18.     // Can use unconditional write instead of CAS here.  
  19.     // After this atomic step, other Nodes can skip past us.  
  20.     // Before, we are free of interference from other threads.  
  21.     node.waitStatus = Node.CANCELLED;  
  22.   
  23.     // If we are the tail, remove ourselves.  
  24.     if (node == tail && compareAndSetTail(node, pred)) {  
  25.         compareAndSetNext(pred, predNext, null);  
  26.     } else {  
  27.         // If successor needs signal, try to set pred's next-link  
  28.         // so it will get one. Otherwise wake it up to propagate.  
  29.         int ws;  
  30.         if (pred != head &&  
  31.             ((ws = pred.waitStatus) == Node.SIGNAL ||  
  32.              (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&  
  33.             pred.thread != null) {  
  34.             Node next = node.next;  
  35.             if (next != null && next.waitStatus <= 0)  
  36.                 compareAndSetNext(pred, predNext, next);  
  37.         } else {  
  38.             unparkSuccessor(node);  
  39.         }  
  40.   
  41.         node.next = node; // help GC  
  42.     }  
  43. }  

    須要注意的是, cancelAcquire(Node node)方法是可能會被併發調用。while (pred.waitStatus > 0) {...}這段循環的做用就是清除當前Node以前的已經被標記爲取消的節點,可是head節點除外(由於head節點保證不會被標記爲Node.CANCELLED)。這段循環初看起來有併發問題,可是推敲一下以後發現:循環過程當中函數參數node的waitStatus不會大於0,所以即便是多個線程併發執行這個循環,那麼這些線程處理的都只是鏈表中互不重疊的一部分。接下來在node.waitStatus = Node.CANCELLED執行完畢以後,後續的操做都必需要避免併發問題。 spa

 

    關於處理線程中斷, ConditionObject的await()/signal()/signalAll()等方法符合JSR 133: Java Memory Model and Thread Specification Revision中規定的語義:若是中斷在signal以前發生,那麼await必須在從新得到synchronizer的鎖以後,拋出InterruptedException;若是中斷髮生在signal以後發生,那麼await必需要設定當前線程的中斷狀態,而且不能拋出InterruptedException。 .net

 

4 Reference

The java.util.concurrent Synchronizer Framework

The Art of Multiprocessor Programming

相關文章
相關標籤/搜索