AQS之Condition

1、引言

       通常咱們在使用鎖的Condition時,咱們通常都是這麼使用,以ReentrantLock爲例,java

ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();

lock.lock();
try{
   condition.await();
	
}finally{
	
   lock.unlock();
}

lock.lock();
try{
   condition.signal();
	
}finally{
	
   lock.unlock();
}

  從上面能夠知道,咱們調用Condition的await和signal方法必須是在獲取獲得鎖的狀況下,首先咱們以這個爲基礎,先不論是如何獲取獲得鎖的,那麼上面的程序在condition.await()時阻塞當前調用的線程,而調用 condition.signal()方法的時候可能喚起一個正在await阻塞的線程,我這裏說的是可能不是必定。爲何這麼說,咱們來看下await()方法主要作了什麼事情。node

2、分析

下面是await 方法的在jdk8的源碼,less

下面是await 方法的在jdk8的源碼,
  public final void await() throws InterruptedException {
            if (Thread.interrupted())  // ①
                throw new InterruptedException();
            Node node = addConditionWaiter(); //②
            int savedState = fullyRelease(node); //③
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) { //④
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // ⑤
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //⑥
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled ⑦
                unlinkCancelledWaiters();
            if (interruptMode != 0) //  ⑧
                reportInterruptAfterWait(interruptMode);
  }

執行過程以下,
① ,判斷當前顯示是否被interrupt? 是的話,會拋出中斷異常InterruptedException,
② ,調用addConditionWaiter方法,主要是new 一個以當前線程爲數據的節點Node,而後添加到condition條件隊列裏。
③ ,調用fullyRelease方法,該方法內部調用release方法,而release方法主要是在AQS隊列裏面喚起第一個節點(即head的next節點)的線程(若是head後繼節點存在的話)。這時,在ASQ同步器內至少有2個活動的線程(一個是當前線程(多是頭節點),另外一個是喚起的線程(若是存在))。若是喚起失敗,會拋異常IllegalMonitorStateException。注:當存在喚起的線程的時候,這個線程就能夠去爭取獲取鎖。
④ ,經過isOnSyncQueue方法判斷該節點是否在AQS隊列中,當調用await時候,確定不在AQS上,由於addConditionWaiter方法是new 一個新的Node.接着會進入while循環裏面。調用 LockSupport.park(this);阻塞當前線程,至關於釋放鎖。
⑤ ,噹噹前線程被喚起的時候(多是 另外一個await線程喚起的第一個節點多是這個線程,或者在signal下,前驅節點已經cancel時,第一個firstWaiter節點是該當前節點),須要判斷是否被中斷,存儲在interruptMode,若是被中斷則break,不然checkInterruptWhileWaiting返回0,那麼會接着判斷node節點是否在AQS中,若是仍是不在的話,park當前線程,不然跳出while循環。那麼node節點是何時被加入到AQS上的,答案是在signal方法上。
⑥ ,當node節點在AQS隊列時,咱們須要獲取鎖,只有當前線程的節點Node在AQS隊列上,才能去爭取鎖。爭取鎖就是經過調用acquireQueued方法。等下來分析下acquireQueued方法。
⑦ , 若是當前節點的node.nextWaiter不爲空,說明還有其餘線程在該condition上,而且當前的線程已經獲取鎖,接着清除條件隊列上的cancel類型節點
⑧ , 若是interruptMode 是InterruptedException類型或者REINTERRUPT類型。則進行相應的拋中斷異常或者線程自我中斷標誌位設置。優化


接着,來分析下signal方法ui

public final void signal() {
            if (!isHeldExclusively()) // ①
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first); // ②
   }

 

執行過程以下,
① ,若是當前線程不是持有該condition的鎖,那麼執行拋IllegalMonitorStateException異常。
② ,調用doSignal方法,而且條件隊列的首節點傳入。this

    private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null) // ①
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) && (first = firstWaiter) != null); // ②
    } 

 

① ,這種條件隊列firstWaiter指針爲next節點,由於當前的節點須要被移除條件隊列,而且next節點爲空,那麼lastWaiter置爲null,說明是空條件隊列,接着把first.nextWaiter=null,說明移除了條件隊列
② ,在這裏有2步操做,一是transferForSignal,二是 first = firstWaiter,若是咱們當前first節點入AQS隊列成功,那麼transferForSignal返回true,則doSignal的while循環結束,
若是當前的first節點入AQS返回失敗,則須要next的節點從新signal,保證有一個成功的firstWaiter節點入AQS隊列。接着來看下transferForSignal 方法主要作了什麼事情。線程

 

final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) // ①
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node); // ②
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))  // ③
            LockSupport.unpark(node.thread);
        return true;
    }

① ,首先傳入的節點node是條件隊列的第一個節點(在外部已經移除),改變其狀態CONDITION,爲初始狀態0,若是改變失敗,說明該節點已經不是條件節點,直接返回false,doSignal方法從新調用新firstWaiter節點入AQS隊列,
② ,把首節點入AQS節點,enq()方法返回的是入節點的前驅節點。從這裏核心方法能夠知道,signal() 方法的做用其實只是把等待隊列中第一個非取消節點轉移到AQS的同步隊列尾部。轉移後的節點極可能正在在同步隊列阻塞着,何時喚醒,取決於它的前驅節點是不是頭節點。
③ ,若是當前前驅節點的waitStatus>0(說明是CANCELLED狀態),前驅節點已經Canncel(說明前驅節點已經中斷等狀況),則能夠調用LockSupport.unpark(node.thread)喚起線程,則await方法的park返回能夠當即返回,預先將AQS同步隊列中取消的節點移除掉,而不用等到獲取同步狀態失敗的時候再去判斷了,起到必定的優化做用。指針


最後來分析下獲取鎖方法 acquireQueued blog

執行以下:隊列

final boolean acquireQueued(final Node node, int arg) { // ①
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) { 
                final Node p = node.predecessor();  
                if (p == head && tryAcquire(arg)) { //②
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // ③
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

① ,傳入node 爲須要獲取鎖的節點,arg爲以前state狀態.
② ,若是傳入的節點的前驅節點是head節點,說明當前節點是AQS隊列的首節點,能夠嘗試去獲取鎖,即咱們須要是要實現的同步語義方法tryAcquire,若是同步語義獲取鎖成功,則設置當前頭節點爲頭節點。
這裏注意返回的值得語義是是否發生中斷,而不是獲取鎖是否成功。
③ ,調用 shouldParkAfterFailedAcquire方法,該方法用來判斷獲取鎖失敗後是否須要park當前線程,若是須要park線程,則接着判斷該線程是否有中斷標誌。


接着咱們來看下shouldParkAfterFailedAcquire 方法。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)  // ①
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {  // ②
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL); //③
        }
        return false;
    }

① 前驅節點pred的waitStatus爲SIGNAL,說明當前節點有效,返回true,表明須要park當前線程,② 前驅節點已經取消,則刪除去取消掉的前驅節點,返回false,外面繼續for循環獲取鎖③ 處在該條件語句的前驅節點的waitStatus一定是 0,或者是傳播PROPAGATE,則設置傳播節點爲SIGNAL,而後返回false,則接着去for循環獲取鎖,而且失敗的時候,調用shouldParkAfterFailedAcquire時知道前驅爲SIGNAL(以前由③設置),則須要park線程。從這裏能夠知道transferForSignal方法中,!compareAndSetWaitStatus(p, ws, Node.SIGNAL)語句,若是對其前驅節點設置Node.SIGNAL失敗,則不須要等到acquireQueued去判斷是否須要park線程,直接unpark線程便可

相關文章
相關標籤/搜索