併發數據結構與線程(ArrayBlockingQueue)

今天在QQ羣上拋出來一個問題,以下
clipboard.pnghtml

我以Java自帶的數據結構爲例,用源碼的形式說明,如何阻塞線程、通知線程的。node

1、Lock & Condition
ArrayBlockingQueue以可重入鎖和兩個Condition對象來控制併發。數據結構

/*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    private final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;

構造函數中初始化了notEmpty和notFull.併發

/**
     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity and the specified access policy.
     * @param capacity the capacity of this queue
     * @param fair if <tt>true</tt> then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if <tt>false</tt> the access order is unspecified.
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

2、線程阻塞
當ArrayBlockingQueue存儲的元素是0個的時候,take()方法會阻塞.less

public Object take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            Object x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    }

這裏take方法首先得到可重入鎖lock,而後判斷若是元素爲空就執行notEmpty.await(); 這個時候線程掛起。函數

3、通知線程
好比使用put放入一個新元素,ui

/**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

enqueue方法中,this

/**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

對剛纔的notEmptyCondition進行通知。
4、ReentrantLock vs AbstractQueuedSynchronizer
ArrayBlockingQueue使用ReentrantLock來控制併發,同時也使用ArrayBlockingQueue的Condition對象來與線程交互。notEmptynotFull都是由
ReentrantLock的成員變量sync生成的,spa

public Condition newCondition() {
        return sync.newCondition();
    }

sync能夠認爲是一個抽象類類型,Sync,它是在ReentrantLock內部定義的靜態抽象類,抽象類實現了newCondition方法,.net

final ConditionObject newCondition() {
            return new ConditionObject();
        }

返回的類型是實現了Condition接口的ConditionObject類,這是在AbstractQueuedSynchronizer內部定義的類。在ArrayBlockingQueue中的notEmpty就是ConditionObject實例。

阻塞:
ArrayBlockingQueue爲空時,notEmpty.await()將本身掛起,如ConditionObject的await方法,

/**
         * Implements interruptible condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled or interrupted.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         */
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

addConditionWaiter是將當前線程做爲一個node加入到ConditionObject的隊列中,隊列是用鏈表實現的。
若是是初次加入隊列的狀況,node.waitStatus == Node.CONDITION成立,方法isOnSyncQueue返回false,那麼就將線程park。

while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                ....
}

至此線程被掛起,LockSupport.park(this);這裏this是指ConditionObject,是notEmpty.
通知:
當新的元素put進入ArrayBlockingQueue後,notEmpty.signal()通知在這上面等待的線程,如ConditionObject的signal方法,

/**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

doSignal方法,

/**
         * Removes and transfers nodes until hit non-cancelled one or
         * null. Split out from signal in part to encourage compilers
         * to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

doSignal一開始接收到的參數就是firstWaiter這個參數,在內部實現中用了do..while的形式,首先將first的的nextWaiter找出來保存到firstWaiter此時(first和firstWaiter不是一回事),在while的比較條件中可調用了transferForSignal方法,
整個while比較條件能夠看着短路邏輯,若是transferForSignal結果爲true,後面的first = firstWaiter就不執行了,整個while循環就結束了。

參照註釋,看

transferForSignal方法,

/**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * @param node the node
     * @return true if successfully transferred (else the node was
     * cancelled before signal)
     */
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

首先確保想要被signal的等待node仍是處於Node.CONDITION狀態,而後調整狀態爲Node.SIGNAL,這兩個都是採用CAS方法,最後調用的是

LockSupport.unpark(node.thread);

5、LockSupport
至此,咱們已經知道了線程的掛起和通知都是使用LockSupport來完成的,併發數據結構與線程直接的交互最終也是須要LockSupport。那麼關於LockSupport,咱們又能夠了解多少呢?

Ref:
Java中的ReentrantLock和synchronized兩種鎖定機制的對比
Java的LockSupport.park()實現分析

相關文章
相關標籤/搜索