Condition 接口

Condition 接口

任意一個java對象,都擁有一組監視器方法,主要包括 wait(),wait(long timeout)、notify 以及 notifyAll 方法,這些方法與synchronized 同步關鍵字配合,能夠實現等待/通知模式。Condition 接口也提供了類型Object的監視器方法,與Lock配合能夠實現等待/通知模式。java

condition 接口與示例

Condition 定義了等待/通知兩種類型的方法,當前線程調用這些方法時,須要提早獲取到Condition對象關聯的鎖。Condition對象由Lock對象(調用Lock對象的 newCondition方法)建立出來的。Condition是依賴Lock對象的。node

package com.lock;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created by cxx on 2018/1/18.
 */
public class ConditionOption {
    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();
    
    public void conditonWait() throws InterruptedException{
        lock.lock();
        
        try {
            condition.await();
        }finally {
            lock.unlock();
        }
        
    }
    
    public void conditonSignal() throws InterruptedException{
        lock.lock();
        try {
            condition.signal();
        }finally {
            lock.unlock();
        }
        
        
        
    }
            
}

獲取一個Condition 必須經過Lock的newCondition() 方法。有界隊列是一種特殊的隊列,當隊列爲空時,隊列的獲取操做將會阻塞獲取線程,直到隊列中有新增元素,當隊列已滿時,隊列的插入操做將會阻塞插入線程,知道隊列出現「空位。」數組

package com.lock;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created by cxx on 2018/1/18.
 */
public class BoundedQueue <T> {
    private Object[] items;

    private int addIndex,removeIndex,count;

    private Lock lock = new ReentrantLock();
    private Condition notEmpty = lock.newCondition();
    private Condition notFull = lock.newCondition();

    public BoundedQueue(int size){
        items = new Object[size];
    }

    //添加一個元素,若是數組滿,則添加線程進入等待狀態,直到有空位
    public void add(T t) throws InterruptedException{
        lock.lock();
        try {
            while (count == items.length){
                notFull.await();
            }
            items[addIndex] = t;

            if (++addIndex == items.length){
                addIndex = 0;
            }
            ++count;
            notEmpty.signal();

        }finally {
            lock.unlock();
        }

    }

    //由頭部刪除一個元素,若是數組空,則刪除線程進入等待狀態,直到有添加元素。
    public T remove() throws InterruptedException{
        lock.lock();

        try {
            while (count == 0){
                notEmpty.await();
            }
            Object x = items[removeIndex];
            if (++removeIndex == items.length){
                removeIndex = 0;
            }
            --count;
            notFull.signal();
            return (T) x;

        }finally {
            lock.unlock();
        }
    }
}

在添加和刪除方法中使用 while 循環而非 if判斷,目的是防止過早或意外的通知,只有條件符合纔可以退出循環。安全

Condition 的實現分析

ConditionObject 是同步器 AbstractQueuedSynchronizer的內部類,由於Condition的操做須要獲取相關聯的鎖。每一個Condition對象都包含着一個隊列,該隊列是Condition對象實現 等待/通知功能的關鍵。併發

等待隊列

等待隊列是一個FIFO的隊列,在隊列中的每一個節點都包含了一個線程引用,該線程就是在Condition對象上等待的線程,若是一個線程調用了Condition.await()方法,那麼該線程將會釋放鎖、構形成節點加入等待隊列並進入等待狀態。ui

一個Condition包含一個等待隊列,Condition擁有首節點(firstWaiter)和尾節點(lastWaiter)。當前線程調用Condition.await()方法,將會以當前線程構造節點,並將節點從尾部加入等待隊列。 Condition擁有首尾節點的引用,而新增節點只須要將原有的尾節點 nextWaiter 指向它,而且更新尾節點便可。上述節點應用更新的過程並無使用CAS保證,緣由在於調用 await()方法的線程一定是獲取了鎖的線程,這個過程是由鎖來保證線程安全的。this

在Object的監視器模型上,一個對象擁有一個同步隊列和等待隊列,而併發包中的Lock擁有一個同步隊列和多個等待隊列。線程

等待

調用Condition的await()方法,會使當前線程進入等待隊列並釋放鎖,同時線程狀態變爲等待狀態。當從await()方法返回時,當前線程必定獲取了Condition相關聯的鎖。code

/**
         * Implements interruptible condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled or interrupted.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         */
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

調用該方法的線程成功獲取了鎖的線程,也就是同步隊列中的首節點,該方法將會當前線程構形成節點並加入等待隊列中,而後釋放同步狀態,喚醒同步隊列中的後繼節點,而後當前線程會進入等待狀態。對象

通知

調用 Condition的signal() 方法,將會喚醒在等待隊列中等待時間最長的節點,在喚醒節點以前,會將節點移到同步隊列中。

/**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

調用該方法的前置條件是當前線程必須獲取了鎖,能夠看到signal()方法進行了IsHeldExclusively()檢查,也就是當前線程必須是獲取了鎖的線程,接着獲取等待隊列的首節點,將其移動到同步隊列並使用LockSupport喚醒節點中的線程。

調用同步器的enq(Node node)方法,等待隊列中的頭節點線程安全地移動到同步隊列。當節點移動到同步隊列後,當前線程在使用LockSupport喚醒該節點的線程。

被喚醒後的線程,將從await()方法中的while循環中退出,進而調用同步器的acquireQueued()方法加入到獲取同步狀態的競爭中。

成功獲取同步狀態以後,被喚醒的線程就愛哪根蔥先前調用的await()方法返回,此時該線程已經成功地獲取了鎖。

Condition的signalAll()方法,至關於對等待隊列中的每一個節點均執行一次signal()方法,效果就是將等待隊列中全部節點所有移動到同步隊列中,並喚醒每一個節點的線程。

相關文章
相關標籤/搜索