阻塞隊列

連接java

 

引言:數組

  使用位置:自定義線程池時,線程池構造器中有個參數就是阻塞隊列。阻塞隊列也經常使用於生產者和消費者的場景緩存

 

1.什麼是阻塞隊列(BlockingQueue)

  阻塞隊列經常使用於生產者和消費者的場景,生產者是往在容器隊列裏添加數據的線程,而消費者則從從容器隊列中獲取數據的線程,該容器隊列就稱之爲阻塞隊列多線程

 阻塞隊列使用的場景

    • 當隊列中沒有數據時,消費者端的所有線程掛起。
    • 當隊列中數據填滿時,生產者端的所有線程掛起。

 

 阻塞線程(BlockingQueue)的核心方法

  放入數據:併發

    • offer(anObject):表示若是可能的話,將anObject加到BlockingQueue裏,即若是BlockingQueue能夠容納, 
      則返回true,不然返回false.(本方法不阻塞當前執行方法的線程)  
    • offer(E o, long timeout, TimeUnit unit),能夠設定等待的時間,若是在指定的時間內,還不能往隊列中 
      加入BlockingQueue,則返回失敗。
    • put(anObject):把anObject加到BlockingQueue裏,若是BlockQueue沒有空間,則調用此方法的線程被阻斷直到BlockingQueue裏面有空間再繼續.

  獲取數據:ide

    • poll(time):取走BlockingQueue裏排在首位的對象,若不能當即取出,則能夠等time參數規定的時間, 
      取不到時返回null;
    • poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的對象,若是在指定時間內, 
      隊列一旦有數據可取,則當即返回隊列中的數據。不然知道時間超時尚未數據可取,返回失敗。
    • take():取走BlockingQueue裏排在首位的對象,若BlockingQueue爲空,阻斷進入等待狀態直到 
      BlockingQueue有新的數據被加入;
    • drainTo():一次性從BlockingQueue獲取全部可用的數據對象(還能夠指定獲取數據的個數), 
      經過該方法,能夠提高獲取數據效率;不須要屢次分批加鎖或釋放鎖。

  

  插入和移除操做的4種處理方式

    • 拋出異常:是指當阻塞隊列滿時候,再往隊列裏插入元素,會拋出IllegalStateException(「Queue 
      full」)異常。當隊列爲空時,從隊列裏獲取元素時會拋出NoSuchElementException異常 。函數

    • 返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從隊列裏拿出一個元素,若是沒有則返回null高併發

    • 一直阻塞:當阻塞隊列滿時,若是生產者線程往隊列裏put元素,隊列會一直阻塞生產者線程,直到拿到數據,或者響應中斷退出。當隊列空時,消費者線程試圖從隊列裏take元素,隊列也會阻塞消費者線程,直到隊列可用。性能

    • 超時退出:當阻塞隊列滿時,隊列會阻塞生產者線程一段時間,若是超過必定的時間,生產者線程就會退出this

  總結

   1.add,offer,put三種添加線程到隊列的方法只在隊列滿的時候有區別,add爲拋異常,offer返回boolean值,put直到添加成功爲止。

  2.同理remove,poll, take三種移除隊列中線程的方法只在隊列爲空的時候有區別, remove爲拋異常,poll爲返回boolean值, take等待直到有線程能夠被移除
  

 

 

 

2.Java中的阻塞隊列

 

  JDK7提供了7個阻塞隊列,分別是:

    • ArrayBlockingQueue :由數組結構組成的有界阻塞隊列。
    • LinkedBlockingQueue :由鏈表結構組成的有界阻塞隊列。
    • PriorityBlockingQueue :支持優先級排序的無界阻塞隊列。
    • DelayQueue:使用優先級隊列實現的無界阻塞隊列。
    • SynchronousQueue:不存儲元素的阻塞隊列。
    • LinkedTransferQueue:由鏈表結構組成的無界阻塞隊列。
    • LinkedBlockingDeque:由鏈表結構組成的雙向阻塞隊列。

 

  ArrayBlockingQueue

    用數組實現的有界阻塞隊列。此隊列按照先進先出(FIFO)的原則對元素進行排序。

    默認狀況下不保證訪問者公平的訪問隊列,所謂公平訪問隊列是指阻塞的全部生產者線程或消費者線程,當隊列可用時,能夠按照阻塞的前後順序訪問隊列,即先阻塞的生產者線程,能夠先往隊列裏插入元素,先阻塞的消費者線程,能夠先從隊列裏獲取元素。

    一般狀況下爲了保證公平性會下降吞吐量。咱們可使用如下代碼建立一個公平的阻塞隊列:

    

ArrayBlockingQueue fairQueue = new  ArrayBlockingQueue(1000,true);

  

  LinkedBlockingQueue

  基於鏈表的阻塞隊列,同ArrayListBlockingQueue相似,此隊列按照先進先出(FIFO)的原則對元素進行排序,其內部也維持着一個數據緩衝隊列(該隊列由一個鏈表構成),當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,並緩存在隊列內部,而生產者當即返回;只有當隊列緩衝區達到最大值緩存容量時(LinkedBlockingQueue能夠經過構造函數指定該值),纔會阻塞生產者隊列,直到消費者從隊列中消費掉一份數據,生產者線程會被喚醒,反之對於消費者這端的處理也基於一樣的原理。而LinkedBlockingQueue之因此可以高效的處理併發數據,還由於其對於生產者端和消費者端分別採用了獨立的鎖來控制數據同步,這也意味着在高併發的狀況下生產者和消費者能夠並行地操做隊列中的數據,以此來提升整個隊列的併發性能。 

  做爲開發者,咱們須要注意的是,若是構造一個LinkedBlockingQueue對象,而沒有指定其容量大小,LinkedBlockingQueue會默認一個相似無限大小的容量(Integer.MAX_VALUE),這樣的話,若是生產者的速度一旦大於消費者的速度,也許尚未等到隊列滿阻塞產生,系統內存就有可能已被消耗殆盡了。 


  ArrayBlockingQueue和LinkedBlockingQueue是兩個最普通也是最經常使用的阻塞隊列,通常狀況下,在處理多線程間的生產者消費者問題,使用這兩個類足以。

  

  PriorityBlockingQueue

  是一個支持優先級的無界隊列。默認狀況下元素採起天然順序升序排列。能夠自定義實現compareTo()方法來指定元素進行排序規則,或者初始化PriorityBlockingQueue時,指定構造參數Comparator來對元素進行排序。須要注意哦的是不能保證同優先級元素的順序。

  

  DelayQueue

  是一個支持延時獲取元素的無界阻塞隊列。隊列使用PriorityQueue來實現。隊列中的元素必須實現Delayed接口,在建立元素時能夠指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從隊列中提取元素。咱們能夠將DelayQueue運用在如下應用場景:

  • 緩存系統的設計:能夠用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。
  • 定時任務調度:使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從好比TimerQueue就是使用DelayQueue實現的。

  

  SynchronousQueue

  是一個不存儲元素的阻塞隊列。每個put操做必須等待一個take操做,不然不能繼續添加元素。SynchronousQueue能夠當作是一個傳球手,負責把生產者線程處理的數據直接傳遞給消費者線程。隊列自己並不存儲任何元素,很是適合於傳遞性場景,好比在一個線程中使用的數據,傳遞給另一個線程使用,SynchronousQueue的吞吐量高於LinkedBlockingQueue 和 ArrayBlockingQueue。

  

  LinkedTransferQueue

  是一個由鏈表結構組成的無界阻塞TransferQueue隊列。相對於其餘阻塞隊列,LinkedTransferQueue多了tryTransfer和transfer方法。 
transfer方法。若是當前有消費者正在等待接收元素(消費者使用take()方法或帶時間限制的poll()方法時),transfer方法能夠把生產者傳入的元素馬上transfer(傳輸)給消費者。若是沒有消費者在等待接收元素,transfer方法會將元素存放在隊列的tail節點,並等到該元素被消費者消費了才返回。transfer方法的關鍵代碼以下:

  

Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);

  

  第一行代碼是試圖把存放當前元素的s節點做爲tail節點。第二行代碼是讓CPU自旋等待消費者消費元素。由於自旋會消耗CPU,因此自旋必定的次數後使用Thread.yield()方法來暫停當前正在執行的線程,並執行其餘線程。

  tryTransfer方法。則是用來試探下生產者傳入的元素是否能直接傳給消費者。若是沒有消費者等待接收元素,則返回false。和transfer方法的區別是tryTransfer方法不管消費者是否接收,方法當即返回。而transfer方法是必須等到消費者消費了才返回。

  對於帶有時間限制的tryTransfer(E e, long timeout, TimeUnit unit)方法,則是試圖把生產者傳入的元素直接傳給消費者,可是若是沒有消費者消費該元素則等待指定的時間再返回,若是超時還沒消費元素,則返回false,若是在超時時間內消費了元素,則返回true。

  

  LinkedBlockingDeque

  是一個由鏈表結構組成的雙向阻塞隊列。所謂雙向隊列指的你能夠從隊列的兩端插入和移出元素。雙端隊列由於多了一個操做隊列的入口,在多線程同時入隊時,也就減小了一半的競爭。相比其餘的阻塞隊列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法,以First單詞結尾的方法,表示插入,獲取(peek)或移除雙端隊列的第一個元素。以Last單詞結尾的方法,表示插入,獲取或移除雙端隊列的最後一個元素。另外插入方法add等同於addLast,移除方法remove等效於removeFirst。可是take方法卻等同於takeFirst,不知道是否是Jdk的bug,使用時仍是用帶有First和Last後綴的方法更清楚。

在初始化LinkedBlockingDeque時能夠設置容量防止其過渡膨脹。另外雙向阻塞隊列能夠運用在「工做竊取」模式中。

 

 

 

3.阻塞隊列的實現原理(JDK1.7)

以ArrayBlockingQueue爲例,咱們先來看看代碼:

  

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    private static final long serialVersionUID = -817911632652898426L;
    /** The queued items */
    final Object[] items;
    /** items index for next take, poll, peek or remove */
    int takeIndex;
    /** items index for next put, offer, or add */
    int putIndex;
    /** Number of elements in the queue */
    int count;
    final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;
 ...省略
 }

  從上面代碼能夠看出ArrayBlockingQueue是維護一個Object類型的數組,takeIndex和putIndex分別表示隊首元素和隊尾元素的下標,count表示隊列中元素的個數,lock則是一個可重入鎖,notEmpty和notFull是等待條件。接下來咱們看看關鍵方法put:

  

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

  從put方法的實現能夠看出,它先獲取了鎖,而且獲取的是可中斷鎖,而後判斷當前元素個數是否等於數組的長度,若是相等,則調用notFull.await()進行等待,當被其餘線程喚醒時,經過enqueue(e)方法插入元素,最後解鎖。

  

/**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length) putIndex = 0;
        count++;
        notEmpty.signal();
    }

  

插入成功後,經過notEmpty喚醒正在等待取元素的線程。再來看看take方法:

  

   public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

  跟put方法實現相似,put方法等待的是notFull信號,而take方法等待的是notEmpty信號。在take方法中,若是能夠取元素,則經過dequeue方法取得元素,下面是dequeue方法的實現:

  

 private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length) takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

  

4.阻塞隊列的使用場景

 

除了線程池的實現使用阻塞隊列以外,咱們能夠在生產者-消費者模式來使用阻塞隊列,

 

首先使用Object.wait()、Object.notify()和非阻塞隊列實現生產者-消費者模式:

  
public class Test {
    private int queueSize = 10;
    private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);  
    public static void main(String[] args)  {
        Test test = new Test();
        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();       
        producer.start();
        consumer.start();
    }

    class Consumer extends Thread{         
        @Override
        public void run() {
            while(true){
                synchronized (queue) {
                    while(queue.size() == 0){
                        try {
                            System.out.println("隊列空,等待數據");
                            queue.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            queue.notify();
                        }
                    }
                    queue.poll();          //每次移走隊首元素
                    queue.notify();
                }
            }
        }
    }

    class Producer extends Thread{       
        @Override
        public void run() {
            while(true){
                synchronized (queue) {
                    while(queue.size() == queueSize){
                        try {
                            System.out.println("隊列滿,等待有空餘空間");
                            queue.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            queue.notify();
                        }
                    }
                    queue.offer(1);        //每次插入一個元素
                    queue.notify();
                }
            }
        }
    }       
}

  

下面是使用阻塞隊列實現的生產者-消費者模式:

public class Test {
    private int queueSize = 10;
    private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize); 
    public static void main(String[] args)  {
        Test test = new Test();
        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();         
        producer.start();
        consumer.start();
    }

    class Consumer extends Thread{  
        @Override
        public void run() {
            while(true){
                try {
                    queue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }   
    }

    class Producer extends Thread{    
        @Override
        public void run() {         
            while(true){
                try {
                    queue.put(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }     
    }
}

  

很顯然使用阻塞隊列實現不須要單獨考慮同步和線程間通訊的問題,實現起來很簡單。

相關文章
相關標籤/搜索