Java併發——Condition

簡介

和基類Object的監視器方法同樣,Condition接口也提供了相似方法,配合Lock能夠實現等待/通知模式
java

Object的監視器方法與Condition接口對比:node

對比項 Object監視器方法 Condition
前置條件 獲取對象的鎖 調用Lock.lock()獲取鎖
調用Lock.newCondition()獲取Condition對象
調用方式 直接調用
如:object.wait()
直接調用
如:condition.wait()
等待隊列個數 一個 多個
當前線程釋放鎖並進入等待狀態 支持 支持
當前線程釋放鎖並進入等待狀態,在等待狀態中不響應中斷 不支持 支持
當前線程釋放鎖並進入超時等待狀態 支持 支持
當前線程釋放鎖並進入等待狀態到未來的某個時間 不支持 支持
喚醒等待隊列中的一個線程 支持 支持
喚醒等待隊列中的所有線程 支持 支持

Condition實現

在介紹AQS同步器時知道其維護了一個同步隊列,其實還維護了多個等待隊列,兩隊列均爲FIFO隊列,F4看Condition實現類,能夠發現其只有這兩個類(這兩個類區別在於state同步狀態一個int,一個long) 編程

等待隊列

具體來看下Condition實現類AQS中內部類ConditionObject安全

public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        // 頭節點
        private transient Node firstWaiter;
        // 尾結點
        private transient Node lastWaiter;

        public ConditionObject() { }
        ...
    }    
複製代碼

結構如圖(單向隊列):併發

await()

await()方法過程至關於同步隊列的首節點(獲取了鎖的節點)移動到Condition的等待隊列中post

public final void await() throws InterruptedException {
            // 判斷當前線程是否中斷
            if (Thread.interrupted())
                throw new InterruptedException();
            // 當前線程加入等待隊列    
            Node node = addConditionWaiter();
            // 釋放同步狀態
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 判斷此節點是否在同步隊列中,若不在直至在同步隊列爲止
            while (!isOnSyncQueue(node)) {
                // 阻塞當前線程
                LockSupport.park(this);
                // 若線程已經中斷則退出
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 競爭同步狀態
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
複製代碼

先是判斷線程是否中斷,若中斷直接拋出異常,不然調用addConditionWaiter()將線程包裝成節點加入等待隊列ui

private Node addConditionWaiter() {
            // 獲取等待隊列的尾節點
            Node t = lastWaiter;
            // 若尾節點狀態不爲CONDITION,清除節點
            if (t != null && t.waitStatus != Node.CONDITION) {
                // 清除等待隊列中全部狀態不爲CONDITION的節點
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            // 將當前線程包裝成Node節點
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            // 尾插節點
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            // 將節點置爲尾節點    
            lastWaiter = node;
            return node;
        }
複製代碼

從源碼來看將節點加入等待隊列並無使用CAS,由於調用await()方法的線程一定是獲取了鎖的線程,即此過程是由鎖來保證線程安全,成功加入等待隊列後,調用fullyRelease()釋放同步狀態this

final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            // 獲取同步狀態
            int savedState = getState();
            // 釋放鎖
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
複製代碼

調用AQS的模板方法release()方法釋放同步狀態而且喚醒在同步隊列中頭結點的後繼節點引用的線程,若是釋放成功則正常返回,不然拋出異常。隨後調用isOnSyncQueue()判斷節點是否在同步隊列spa

final boolean isOnSyncQueue(Node node) {
        // 若狀態爲CONDITION或者前驅節點爲null
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        // 若後繼節點不爲null,代表節點確定在同步隊列中    
        if (node.next != null) // If has successor, it must be on queue
            return true;
        // 從同步隊列尾節點找節點    
        return findNodeFromTail(node);
    }
複製代碼

若節點不在同步隊列會一直在while循環體中,當此線程被中斷或者線程關聯的節點被移動到了同步隊列中(即另外線程調用的condition的signal或者signalAll方法)會結束循環調用acquireQueued()方法獲取,不然會在循環體中經過LockSupport.park()方法阻塞線程線程

await()方法示意:

signal()/signalAll()

  • signal()
  • public final void signal() {
            // 判斷當前線程是否爲獲取鎖的線程
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            // 喚醒條件隊列中的第一個節點
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
    複製代碼
    isHeldExclusively()方法須要子類重寫,其目的在於判斷當前線程是否爲獲取鎖的線程
    protected boolean isHeldExclusively() {
            throw new UnsupportedOperationException();
        }
    複製代碼
    doSignal
    private void doSignal(Node first) {
                do {
                    // 將頭節點從等待隊列中移除
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null);
            }
    複製代碼
    doSignal()將頭節點移出等待隊列,再調用transferForSignal()方法將節點添加到同步隊列中
    final boolean transferForSignal(Node node) {
            // 將node節點狀態由CONDITION  CAS置爲初始狀態0
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
            // 將node節點尾插同步隊列,返回插入後同步隊列中node節點的前驅節點
            Node p = enq(node);
            int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }
    複製代碼

    signal()方法示意:

  • signalAll()
  • private void doSignalAll(Node first) {
                lastWaiter = firstWaiter = null;
                // 將等待隊列中節點從頭節點開始逐個移出等待隊列,添加到同步隊列
                do {
                    Node next = first.nextWaiter;
                    first.nextWaiter = null;
                    transferForSignal(first);
                    first = next;
                } while (first != null);
            }
    複製代碼
    signalAll()方法至關於對等待隊列中的每一個節點均執行一次signal()方法,將等待隊列中全部節點所有移動到同步隊列中,並喚醒每一個節點的線程

    感謝

    《java併發編程的藝術》

    相關文章
    相關標籤/搜索