併發容器與框架——併發容器(二)

1.何爲阻塞隊列

    阻塞隊列(BlockingQueue)是一個支持兩個附加操做的隊列。這兩個附加的操做支持阻塞的插入和移除方法。java

  1. 支持阻塞的插入方法:意思是當隊列滿(無界隊列除外)時,隊列會阻塞插入元素的線程,直到隊列不滿。
  2. 支持阻塞的移除方法:意思是在隊列爲空時,獲取元素的線程會等待隊列變爲非空。

    阻塞隊列經常使用於生產者和消費者的場景,生產者是向隊列裏添加元素的線程,消費者是從隊列裏取元素的線程。阻塞隊列就是生產者用來存放元素、消費者用來獲取元素的容器。阻塞隊列對插入和移除兩個附加操做提供了4種處理方式。數組

  1. 拋出異常:當隊列滿時,若是再往隊列裏插入元素,會拋出IllegalStateException("Queuefull")異常。當隊列空時,從隊列裏獲取元素會拋出NoSuchElementException異常。
  2. ·返回特殊值:當往隊列插入元素時,會返回元素是否插入成功,成功返回true。若是是移除方法,則是從隊列裏取出一個元素,若是沒有則返回null。
  3. ·一直阻塞:當阻塞隊列滿時,若是生產者線程往隊列裏put元素,隊列會一直阻塞生產者線程,直到隊列可用或者響應中斷退出。當隊列空時,若是消費者線程從隊列裏take元素,隊列會阻塞住消費者線程,直到隊列不爲空。
  4. ·超時退出:當阻塞隊列滿時,若是生產者線程往隊列裏插入元素,隊列會阻塞生產者線程一段時間,若是超過了指定的時間,生產者線程就會退出。

    注意:若是是無界阻塞隊列,隊列不可能會出現滿的狀況,因此使用put或offer方法永 遠不會被阻塞,並且使用offer方法時,該方法永遠返回true。緩存

2.阻塞隊列

    2.1 分類

  1. ArrayBlockingQueue:一個由數組結構組成的有界阻塞隊列。
  2. LinkedBlockingQueue:一個由鏈表結構組成的有界阻塞隊列。
  3. PriorityBlockingQueue:一個支持優先級排序的無界阻塞隊列。
  4. DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。
  5. SynchronousQueue:一個不存儲元素的阻塞隊列。
  6. LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
  7. LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。

    2.2 七種阻塞隊列使用詳解

  1. ArrayBlockingQueue:是一個用數組實現的有界阻塞隊列。此隊列按照先進先出的原則對元素進行排序。默認狀況下不保證線程公平的訪問隊列,所謂公平訪問隊列是指阻塞的線程,能夠按照阻塞的前後順序訪問隊列,即先阻塞線程先訪問隊列。非公平性是對先等待的線程是非公平的,當隊列可用時,阻塞的線程均可以爭奪訪問隊列的資格,有可能先阻塞的線程最後才訪問隊列。爲了保證公平性,一般會下降吞吐量。訪問者的公平性是使用可重入鎖實現的。
    public class TestBlockingQueue {
    	public static void main(String[] args) {
    		int capacity=1000;//有界隊列元素容量,必須>=1
    		ArrayBlockingQueue<String> arr1=new ArrayBlockingQueue<String>(capacity);
    		ArrayBlockingQueue<String> arr2=new ArrayBlockingQueue<String>(capacity,true);//boolean值表示是否採用公平性原則
    		
    	}
    }

     

  2. LinkedBlockingQueue:LinkedBlockingQueue是一個用鏈表實現的有界阻塞隊列。此隊列的默認和最大長度爲 Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序。
  3. PriorityBlockingQueue:是一個支持優先級的無界阻塞隊列。默認狀況下元素採起天然順序升序排列。也能夠自定義類實現compareTo()方法來指定元素排序規則,或者初始化 PriorityBlockingQueue時,指定構造參數Comparator來對元素進行排序。須要注意的是不能保證 同優先級元素的順序。
  4. DelayQueue:DelayQueue是一個支持延時獲取元素的無界阻塞隊列。隊列使用PriorityQueue來實現。隊 列中的元素必須實現Delayed接口(能夠參考ScheduledThreadPoolExecutor 裏ScheduledFutureTask類的實現,在建立元素時能夠指定多久才能從隊列中獲取當前元素。 只有在延遲期滿時才能從隊列中提取元素。該阻塞隊列很是有用,好比能夠設計緩存系統,使用一個線程循環查詢 DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了;還能夠作定時任務調度,使用DelayQueue保存當天將會執行的任務和執行時間,一旦從 DelayQueue中獲取到任務就開始執行,好比TimerQueue就是使用DelayQueue實現的。
  5. SynchronousQueue:SynchronousQueue是一個不存儲元素的阻塞隊列。每個put操做必須等待一個take操做,不然不能繼續添加元素。它支持公平訪問隊列。默認狀況下線程採用非公平性策略訪問隊列。使用如下構造方法能夠建立公平性訪問的SynchronousQueue,若是設置爲true,則等待的線程會採用先進先出的順序訪問隊列。能夠將該隊列理解爲一個傳球手,負責把生產者線程處理的數據直接傳遞給消費者線程。隊列自己並不存儲任何元素,很是適合傳遞性場景。
  6. LinkedTransferQueue:LinkedTransferQueue是一個由鏈表結構組成的無界阻塞TransferQueue隊列。相對於其餘阻塞隊列,LinkedTransferQueue多了tryTransfer和transfer方法。
    1. 使用transfer方法:若是當前有消費者正在等待接收元素(消費者使用take()方法或帶時間限制的poll()方法 時),transfer方法能夠把生產者傳入的元素立transfer(傳輸)給消費者。若是沒有消費者在等 待接收元素,transfer方法會將元素存放在隊列的tail節點,並等到該元素被消費者消費了才返 回。
    2. tryTransfer方法:tryTransfer方法是用來試探生產者傳入的元素是否能直接傳給消費者。若是沒有消費者等 待接收元素,則返回false。和transfer方法的區別tryTransfer方法不管消費者是否接收,方法 當即返回,而transfer方法是必須等到消費者消費了才返回。 對於帶有時間限制的tryTransfer(E e,long timeout,TimeUnit unit)方法,試圖把生產者傳入 的元素直接傳給消費者,可是若是沒有消費者消費該元素則等待指定的時間再返回,若是超 時還沒消費元素,則返回false,若是在超時時間內消費了元素,則返回true。
  7. LinkedBlockingDeque:LinkedBlockingDeque是一個由鏈表結構組成的雙向阻塞隊列。雙向隊列由於多了一個操做隊列的入口,在多線程同時入隊 時,也就減小了一半的競爭。相比其餘的阻塞隊列,LinkedBlockingDeque多了addFirst、 addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First單詞結尾的方法,表示插入、 獲取(peek)或移除雙端隊列的第一個元素。以Last單詞結尾的方法,表示插入、獲取或移除雙 端隊列的最後一個元素。另外,插入方法add等同於addLast,移除方法remove等效於 removeFirst。可是take方法卻等同takeFirst

3.阻塞隊列實現原理

    JDK經過通知模式實現。所謂通知模式,就是當生產者往滿的隊列裏添加元素時會阻塞住生產者,當消費者消費了一個隊列中的元素後,會通知生產者當前隊列可用。經過查看JDK源碼 發現ArrayBlockingQueue使用了Condition來實現。也就是經過await/signal來實現。多線程

final Object[] items;
 final ReentrantLock lock;
 private final Condition notEmpty;
 private final Condition notFull;
 int count;
 public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
 }
 public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
 private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }
 private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }
相關文章
相關標籤/搜索