Android併發學習之阻塞隊列

簡介

多線程環境中,經過隊列能夠很容易實現數據共享,好比經典的「生產者」和「消費者」模型中,經過隊列能夠很便利地實現二者之間的數據共享。假設咱們有若干生產者線程,另外又有若干個消費者線程。若是生產者線程須要把準備好的數據共享給消費者線程,利用隊列的方式來傳遞數據,就能夠很方便地解決他們之間的數據共享問題。但若是生產者和消費者在某個時間段內,萬一發生數據處理速度不匹配的狀況呢?理想狀況下,若是生產者產出數據的速度大於消費者消費的速度,而且當生產出來的數據累積到必定程度的時候,那麼生產者必須暫停等待一下(阻塞生產者線程),以便等待消費者線程把累積的數據處理完畢,反之亦然。html

BlockingQueue很好地解決了上述問題,BlockingQueue即阻塞隊列,它是一個接口,它的實現類有ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等,它們的區別主要體如今存儲結構上或對元素操做上的不一樣。java

經常使用方法

public interface BlockingQueue<E> extends Queue<E> {

    //往隊列尾部添加元素,若是BlockingQueue能夠容納,則返回true,不然拋出異常
    boolean add(E e);
    
   //移除元素,若是有這個元素則就回true,不然拋出異常 
    boolean remove(Object o);
     
    //往隊列尾部添加元素,若是BlockingQueue能夠容納則返回true,不然返回false.
    //若是是往限定了長度的隊列中設置值,推薦使用offer()方法。
    boolean offer(E e);
    
   //和上面的方法差很少,不過若是隊列滿了能夠阻塞等待一段時間
    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
    
    //取出頭部對象,若不能當即取出,則能夠等time參數規定的時間,取不到時返回null
    E poll(long timeout, TimeUnit unit) throws InterruptedException; 
    
     //往隊列尾部添加元素,若是沒有空間,則調用此方法的線程被阻塞直到有空間再繼續. 
    void put(E e) throws InterruptedException;
    
    //取出頭部對象,若BlockingQueue爲空,阻斷進入等待狀態直到Blocking有新的對象被加入爲止 
    E take() throws InterruptedException;
    
    //剩餘容量,超出此容量,便沒法無阻塞地添加元素
    int remainingCapacity();
    
    //判斷隊列中是否擁有該值。
    boolean contains(Object o);
    
    //一次性從BlockingQueue獲取全部可用的數據對象,能夠提高獲取數據效率
    int drainTo(Collection<? super E> c);
    
     //和上面的方法差很少,不過限制了最大取出數量
    int drainTo(Collection<? super E> c, int maxElements);
    
}
複製代碼

源碼簡析

咱們以ArrayBlockingQueue爲例分析下上述方法:數組

offer(E e)安全

public boolean offer(E e) {
        Objects.requireNonNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
    
   private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length) putIndex = 0;
        count++;
        notEmpty.signal();
    }
複製代碼

offer操做如上,代碼比較簡單,可見阻塞隊列是經過可重入保證線程安全。enqueue方法也說明了ArrayBlockingQueue是經過數組的形式存儲數據的。若是隊列滿了直接會返回false,不會阻塞線程。bash

put(E e)多線程

public void put(E e) throws InterruptedException {
        Objects.requireNonNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)//隊列滿了,一直阻塞在這裏
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

複製代碼

由於put方法在隊列已滿的狀況下會阻塞線程,take、poll等方法會調用dequeue方法出列,從而調用notFull.signal(),從而喚醒阻塞在put方法中線程去繼續進行入列操做:ui

take()this

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
  private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length) takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }
複製代碼

poll(long timeout, TimeUnit unit)spa

從對頭取出一個元素:若是數組不空,出隊;若是數組已空且已經超時,返回null;若是數組已空則進入等待,直到被喚醒或超時:.net

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);////將時間轉換爲納秒
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {//隊列爲空
                if (nanos <= 0L)
                    return null;
                //阻塞指定時間,enqueue()方法會調用notEmpty.signal()喚醒進行poll操做的線程
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
複製代碼

參考文章和擴展閱讀

雖然只講了阻塞隊列,但涉及了ReentrantLock、中斷、Condition等知識點,若是不清楚的話能夠看下下面的幾篇文章:

blog.csdn.net/vernonzheng…

wsmajunfeng.iteye.com/blog/162935…

Lock鎖和Condition條件

ava中Lock,tryLock,lockInterruptibly有什麼區別?

相關文章
相關標籤/搜索