AbstractQueuedSynchronizer理解之四(Condition)

什麼是Condition

Condition必需要和獨佔鎖一塊兒使用,獨佔鎖代替了原來的synchronized,Condition代替了原來的Object中的監視器方法(wait, notify and notifyAll);一個Lock能夠對應多個Condition,這樣線程之間能夠按照條件喚醒指定的線程,而不是簡單的notifyAll多有的線程,使得咱們多線程編程的時候能夠靈活的控制線程。java

獨佔鎖和Condition最經典的配合使用就是ArrayBlockingQueue.java,典型的生產者消費者問題:node

/*
 * Concurrency control uses the classic two-condition algorithm
 * found in any textbook.
 */

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

這是在許多教科書中能找到的經典的雙Condition算法的併發控制,須要有一個獨佔鎖ReentrantLock,而後再定義兩個Condition,notEmpty(隊列不是空的)表示能夠從隊列中消費元素的信號條件,notFull(隊列不是滿的)表示能夠向隊列生產元素的信號條件。這兩個Condition都是調用了lock.newCondition()方法實例化的。算法

當消費者線程調用消費方法take時:編程

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        //當隊列的元素數量爲0時,調用notEmpty.await,阻塞當前的消費線程
        while (count == 0)
            notEmpty.await();
        //dequeue中調用了notFull.signal(),通知生產者隊列還沒滿,能夠生產
        return dequeue();
    } finally {
        lock.unlock();
    }
}

當生產者線程調用生產方法put時:安全

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        //當隊列滿時,調用notFull.await(),阻塞當前生產線程,中止生產
        while (count == items.length)
            notFull.await();
        //enqueue中調用了notEmpty.signal(),通知消費者隊列裏有元素,能夠消費
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

Condition的await

在AQS中有一個ConditionObject內部類實現了Condition接口,其中有兩個成員變量:多線程

/** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;

Condition也有一個node隊列,firstWaiter、lastWaiter分別表示第一個和最後一個node。併發

先看await方法:less

public final void await() throws InterruptedException {
        //若是線程設置中斷標誌,拋出中斷異常
        if (Thread.interrupted())
            throw new InterruptedException();
        //往隊列添加node
        Node node = addConditionWaiter();
        //徹底釋放鎖,head的後繼節點將被喚醒,而後被移出sync隊列
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        //判斷當前節點是否在sync隊列中(當condition調用signal是會將該節點放入Sync隊列),若是不在就park當前線程,線程在這裏開始等待被signal
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            //發送中斷時(喚醒了線程)break;checkInterruptWhileWaiting中調用了transferAfterCancelledWait(貼在下面),這個方法時檢測中斷是發生在signal以前仍是以後
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        //當前線程被signal後,調用acquireQueued搶佔鎖,若是interruptMode不爲拋出異常,設置爲REINTERRUPT
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            //從頭至尾移除取消的節點
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            //繼續中斷仍是拋出異常
            reportInterruptAfterWait(interruptMode);
    }
        
final boolean transferAfterCancelledWait(Node node) {
    //首先CAS設置node狀態爲0,若是成功說明中斷髮生在signal以前(由於signal會將node狀態設置爲0)
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        //將node入sync隊列
        enq(node);
        return true;
    }
    /*
     * If we lost out to a signal(), then we can't proceed
     * until it finishes its enq().  Cancelling during an
     * incomplete transfer is both rare and transient, so just
     * spin.
     */
    //若是node不在sync隊列中,yield,讓出cpu
    while (!isOnSyncQueue(node))
        Thread.yield();
    //中斷髮生在signal後
    return false;
}

分析一下addConditionWaiter:oop

private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        //若是最後一個node被取消,清除node
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        //新建一個node,持有當前線程,狀態爲CONDITION
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            //若是尾節點爲null,說明condition隊列仍是空的,將新建的node做爲頭節點
            firstWaiter = node;
        else
            //若是condition隊列已經存在,將新建的node做爲尾節點的next
            t.nextWaiter = node;
        //將新建node設置爲尾節點
        lastWaiter = node;
        //返回新建的node
        return node;
    }

在這裏咱們能夠看到Condition的隊列是一個單鏈表。
看一下unlinkCancelledWaiters,Condition全部操做都是在獲取鎖以後執行的,因此不用考慮線程安全問題:ui

private void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != Node.CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            }
            else
                trail = t;
            t = next;
        }
    }

該方法從隊列頭開始日後遍歷全部node,移除已經取消的node;

在新建了node後,調用了fullyRelease:

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        //保存當前的state
        int savedState = getState();
        //release(savedState)嘗試釋放鎖,這也是爲何叫fullyRelease
        if (release(savedState)) {
            failed = false;
            //返回以前保存的state
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            //若是失敗,將當前node設置爲取消狀態
            node.waitStatus = Node.CANCELLED;
    }
}

看一下release:

public final boolean release(int arg) {
    //嘗試釋放鎖,這裏調用的是ReentrantLock實現的tryRelease,傳入的arg是當前的state,因此會釋放成功,即state爲0
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            //喚醒後繼節點
            unparkSuccessor(h);
        return true;
    }
    return false;
}

下面的方法是判斷當前節點是否在Sync隊列中

final boolean isOnSyncQueue(Node node) {
    //若是當前節點狀態爲CONDITION或者節點前驅爲null,說明該節點已經在CONDITION隊列中,不在Syc隊列裏
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    //若是節點後繼不是null,那該節點必定在Syc隊列中
    if (node.next != null) // If has successor, it must be on queue
        return true;
    /*
     * node.prev can be non-null, but not yet on queue because
     * the CAS to place it on queue can fail. So we have to
     * traverse from tail to make sure it actually made it.  It
     * will always be near the tail in calls to this method, and
     * unless the CAS failed (which is unlikely), it will be
     * there, so we hardly ever traverse much.
     */
    //此時節點入列的CAS動做可能失敗,因此要從尾部往前查找該節點再次確認
    return findNodeFromTail(node);
}

Condition的signal

public final void signal() {
        //若是當前線程不是當前的獨佔線程,拋出異常
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            //signal Condition隊列的第一個節點
            doSignal(first);
    }
    
    private void doSignal(Node first) {
        //若是transferForSignal失敗(即當前節點取消)且下一個節點存在,while繼續loop
        do {
            //設置第一個節點的next爲firstWaiter,此時若是firstWaiter爲null,說明隊列空了,將lastWaiter也設置爲null
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            //設置第一個節點next爲null,help GC
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }
    
    final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
     //若是爲node設置狀態失敗,說明node被取消,返回false
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    //將當前node入列sync隊列,返回node的前繼
    Node p = enq(node);
    int ws = p.waitStatus;
    //若是前繼的狀態爲取消或者設置前繼狀態爲SIGNAL失敗,當前node線程unpark
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

signal後,Condition第一個節點將入列sync的隊列,等待搶佔到鎖繼續執行。

總結

在一開是的例子中,假設有兩個線程P,C分別表明生產者和消費者線程,生產消費元素E的隊列Q容量爲1。

C無限loop調用take,當C搶佔到獨佔鎖,發現Q時空的,調用notEmpty.await(),線程C釋放鎖而且入列notEmpty隊列park,等待別的線程調用notEmpty.signal();

P無限loop調用put,當P搶佔到獨佔鎖生產了一個E,調用notEmpty.signal()通知C,而後釋放了鎖;

C收到signal信號,入列SYC隊列,而且unpark,嘗試搶佔獨佔鎖,成功得到獨佔鎖後,消費了一個E,而後調用notFull.signal();

P生產E時發現Q已滿(C還沒來得及消費),調用notFull.await()線程P釋放鎖而且入列notFull隊列park,等待notFull.signal()通知本身unpark併入列AQS隊列去搶佔獨佔鎖進行生產;

相關文章
相關標籤/搜索