Java併發編程之Condition源碼分析

Condition介紹

上篇文章講了ReentrantLock的加鎖和釋放鎖的使用,這篇文章是對ReentrantLock的補充。ReentrantLock#newCondition()能夠建立Condition,在ReentrantLock加鎖過程當中能夠利用Condition阻塞當前線程並臨時釋放鎖,待另外線程獲取到鎖並在邏輯後通知阻塞線程"激活"。Condition經常使用在基於異步通訊的同步機制實現中,好比dubbo中的請求和獲取應答結果的實現。java

經常使用方法

Condition中主要的方法有2個node

  • (1)await()方法能夠阻塞當前線程,並釋放鎖。
  • (2)在獲取鎖後能夠調用signal()通知被await()阻塞的線程"激活"。

這裏的await(),signal()必須在ReentrantLock#lock()和ReentrantLock#unlock()之間調用。異步

Condition實現分析

Condition的實現也是利用AbstractQueuedSynchronizer隊列來實現,await()在被調用後先將當前線程加入到等待隊列中,而後釋放鎖,最後阻塞當前線程。signal()在被調用後會先獲取等待隊列中第一個節點,並將這個節點轉化成ReentrantLock中的節點並加入到同步阻塞隊列的結尾,這樣此節點的上個節點線程釋放鎖後會激活此節點線程取來獲取鎖。源碼分析

await()方法源碼分析

await()源碼以下ui

public final void await() throws InterruptedException {
		//判斷是否當前線程是否被中斷中斷則拋出中斷異常
            if (Thread.interrupted())
                throw new InterruptedException();
		//加入等待隊列
            Node node = addConditionWaiter();
		//釋放當前線程鎖
            int savedState = fullyRelease(node);
            int interruptMode = 0;
		//判斷是否在同步阻塞隊列,若是不在一直循環到被加入
            while (!isOnSyncQueue(node)) {
		//阻塞當前線程
                LockSupport.park(this);
		//判斷是否被中斷
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
		//獲取鎖,若是獲取中被中斷則設置中斷狀態
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
		//清除等待隊列中被"激活"的節點
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
		//若是當前線程被中斷,處理中斷邏輯
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

主要分如下幾步this

  • (1)先判斷是否當前線程是否被中斷中斷則拋出中斷異常若是未中斷調用addConditionWaiter()加入等待隊列線程

  • (2)調用fullyRelease(node)釋放鎖使同步阻塞隊列的下個節點線程能獲取鎖。code

  • (3)調用isOnSyncQueue(node)判斷是否在同步阻塞隊列,這裏的加入同步阻塞隊列操做是在另外一個線程調用signal()後加入,若是不在同步阻塞隊列會進行阻塞直到被激活。隊列

  • (4)若是被激活而後調用checkInterruptWhileWaiting(node)判斷是否被中斷並獲取中斷模式。get

  • (5)繼續調用isOnSyncQueue(node)判斷是否在同步阻塞隊列。

  • (6)是則調用acquireQueued(node, savedState) 獲取鎖,這裏若是獲取不到也會被阻塞,獲取不到緣由是在第一次調用isOnSyncQueue(node)前,可能另外一個線程已經調用signal()後加入到同步阻塞隊列,而後調用acquireQueued(node, savedState) 獲取不到鎖並阻塞。acquireQueued(node, savedState)也會返回當前線程是否被中斷,若是被中斷設置中斷模式。

  • (7)在激活後調用unlinkCancelledWaiters()清理等待隊列的已經被激活的節點。

  • (8)最後判斷當前線程是否被中斷,若是被中斷則對中斷線程作處理。

下面來看下addConditionWaiter()實現

private Node addConditionWaiter() {
 		//獲取等待隊列尾部節點
            Node t = lastWaiter;
            //若是尾部狀態不爲CONDITION,若是已經被"激活",清理之,而後從新獲取尾部節點
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
		//建立以當前線程爲基礎的節點,並將節點模式設置成CONDITION
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
		//若是尾節點不存在,說明隊列爲空,將頭節點設置成當前節點
            if (t == null)
                firstWaiter = node;
		//若是尾節點存在,將此節點設置成尾節點的下個節點
            else
                t.nextWaiter = node;
		//將尾節點設置成當前節點
            lastWaiter = node;
            return node;
        }

addConditionWaiter()的邏輯很簡單,就是建立以當前線程爲基礎的節點並把節點加入等待隊列的尾部待其餘線程處理。

下面來看下fullyRelease(Node node)實現

final int fullyRelease(Node node) {
        boolean failed = true;
        try {
		//獲取阻塞隊列中當前線程節點的鎖狀態值
            int savedState = getState();
		//釋放當前線程節點鎖
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
		//釋放失敗講節點等待狀態設置成關閉
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

調用getState()先獲取阻塞隊列中當前線程節點的鎖狀態值,這個值可能大於1表示屢次重入,而後調用release(savedState)釋放全部鎖,若是釋放成功返回鎖狀態值。

下面來看下isOnSyncQueue(Node node)實現

final boolean isOnSyncQueue(Node node) {
		//判斷當前節點是不是CONDITION或者前置節點是否爲空若是爲空直接返回false
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
		//若是下個節點存在,則在同步阻塞隊列中返回true
        if (node.next != null) // If has successor, it must be on queue
            return true;
		//遍歷查找當前節點是否在同步阻塞隊列中
        return findNodeFromTail(node);
    }
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

此方法的功能是查找當前節點是否在同步阻塞隊列中,方法先是快速判斷,判斷不了再進行遍歷查找。

  • (1)第一步先判斷次節點是否CONDITION狀態或者前置節點是否存在,若是是代表不在隊列中返回false,阻塞隊列中的狀態通常是0或者SIGNAL狀態並且若是當前若是當前節點在隊列阻塞中且未被激活前置節點必定不爲空。
  • (2)第二步判斷節點的下個節點是否存在,若是存在則代表當前當前節點已加入到阻塞隊列中。
  • (3)若是以上2點都無法判斷,也有可能剛剛加入到同步阻塞隊列中,因此調用findNodeFromTail(Node node)作最後的遍歷查找。查找從隊列尾部開始查,從尾部開始查的緣由是可能剛剛加入到同步阻塞隊列中,從尾部能快速定位。

下面看下checkInterruptWhileWaiting(Node node)實現

private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }

final boolean transferAfterCancelledWait(Node node) {
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

此方法在線程被激活後被調用,主要功能就是判斷被激活的線程是否被中斷。此方法會返回2種中斷狀態THROW_IE和REINTERRUPT,THROW_IE是調用signal()前被中斷返回,REINTERRUPT在調用signal()後被中斷返回。 此方法先判斷是否被標記中斷,是的話再調用transferAfterCancelledWait(node)取判斷是那種中斷狀態,transferAfterCancelledWait(node)方法分2步

  • (1)用CAS方式將節點狀態改錯等待狀態改爲CONDITION,並加入到同步阻塞隊列中返回true
  • (2)若是不能加入到同步阻塞隊列就自旋一直等待加入

若是使用await()方法上面2步實際上是沒什麼做用其最後必定會返回false,由於await()被激活只能調用 signal()方法,而signal()方法確定已經將節點加入到同步阻塞隊列中。因此以上邏輯是給await(long time, TimeUnit unit)等帶超時激活方法用的。

acquireQueued(node, savedState)方法再上一章節已經講過這邊就不重複了,下面分析下unlinkCancelledWaiters()方法

private void unlinkCancelledWaiters() {
		//獲取等待隊列頭節點
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
		//獲取下個節點
                Node next = t.nextWaiter;
		//若是狀態不爲CONDITION說明已經加入阻塞隊列須要清理掉
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
			//獲取下個節點
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

此方法就是從頭開始查找狀態不爲CONDITION的節點並清理,狀態不爲CONDITION節點說明此節點已經加入到阻塞隊列,已經不須要維護。

下面來看下reportInterruptAfterWait(interruptMode)方法

private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
		//若是是THROW_IE模式直接拋出異常
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
		//若是是REINTERRUPT模式標記線程中斷由上層處理中斷
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }

此方法處理中斷邏輯。若是是THROW_IE模式直接拋出異常,若是是REINTERRUPT模式標記線程中斷由上層處理中斷。

signal()方法源碼分析

signal()源碼以下

public final void signal() {
		//是否當前線程持有鎖
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
		//通知"激活"頭節點線程
            if (first != null)
                doSignal(first);
        }

先調用isHeldExclusively()判斷鎖是否被當前線程持有,而後檢查等待隊列是否爲空,不爲空就是能夠取第一個節點調用doSignal(first)去"激活",這裏激活不是真正的激活而只是將節點加入到同步阻塞隊列尾部,因此上下文中帶""的激活都是這種解釋。

下面看下isHeldExclusively()實現

protected final boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

實現就是比較下當前線程和持有鎖的線程是否同一個

下面看下doSignal(first)的實現

private void doSignal(Node first) {
            do {
		//頭指頭後移一位,若是後面的節點爲空,則將尾指頭也指向空,說明隊列爲空了
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
		//清空頭節點的下個節點
                first.nextWaiter = null;
		//若是"激活"失敗者取下個繼續,直到成功或者遍歷完
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

此方法就是取當前頭節點一直去嘗試"激活",直到成功或者遍歷完。

下面來看下transferForSignal(first)方法

final boolean transferForSignal(Node node) {
		//將CONDITION狀態設置成0
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
		//加入到同步阻塞隊列
        Node p = enq(node);
        int ws = p.waitStatus;
		//狀態異常直接激活
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
  • (1)此方法先先將CONDITION狀態設置成0,由於若是是CONDITION狀態加入到同步阻塞隊列,激活的時候是不識別的。
  • (2)加入到同步阻塞隊列的尾部。因此同步阻塞隊列中前面若是有多個在排隊,調用unlock()不會立刻激活此節點。
  • (3)狀態異常直接調用unpark激活,這邊按理說若是狀態異常狀況下激活,await()在調用unlock()被激活後會進行相應的異常處理,但看await()代碼沒有處理則是正常執行。

這個方法主要就是把節點加入到同步阻塞隊列的,真正的激活則是調用unlock()去處理。

相關文章
相關標籤/搜索