Java中的阻塞隊列-ArrayBlockingQueue(一)

最近在看一些java基礎的東西,看到了隊列這章,打算對複習的一些知識點作一個筆記,也算是對本身思路的一個整理,本章先聊聊java中的阻塞隊列java

參考文章:數組

http://ifeve.com/java-blocking-queue/app

https://blog.csdn.net/u014082714/article/details/52215130函數

由上圖能夠用看出java中的阻塞隊列都實現了 BlockingQueue接口,BlockingQueue又繼承自Queue性能

一、什麼是阻塞隊列?

阻塞隊列(BlockingQueue)是一個支持兩個附加操做的隊列。這兩個附加的操做是:在隊列爲空時,獲取元素的線程會等待隊列變爲非空。當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列經常使用於生產者和消費者的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器裏拿元素。this

阻塞隊列提供了四種處理方法:spa

  • 拋出異常:是指當阻塞隊列滿時候,再往隊列裏插入元素,會拋出IllegalStateException(「Queue full」)異常。當隊列爲空時,從隊列裏獲取元素時會拋出NoSuchElementException異常 。
  • 返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從隊列裏拿出一個元素,若是沒有則返回null
  • 一直阻塞:當阻塞隊列滿時,若是生產者線程往隊列裏put元素,隊列會一直阻塞生產者線程,直到拿到數據,或者響應中斷退出。當隊列空時,消費者線程試圖從隊列裏take元素,隊列也會阻塞消費者線程,直到隊列可用。
  • 超時退出:當阻塞隊列滿時,隊列會阻塞生產者線程一段時間,若是超過必定的時間,生產者線程就會退出。

2.、Java裏的阻塞隊列

JDK7提供了7個阻塞隊列。分別是操作系統

  • ArrayBlockingQueue :一個由數組結構組成的有界阻塞隊列。
  • LinkedBlockingQueue :一個由鏈表結構組成的有界阻塞隊列。
  • PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列。
  • DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。
  • SynchronousQueue:一個不存儲元素的阻塞隊列。
  • LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
  • LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。

 ArrayBlockingQueue.net

ArrayBlockingQueue是一個用數組實現有界阻塞隊列。此隊列按照先進先出(FIFO)的原則對元素進行排序。默認狀況下不保證訪問者公平的訪問隊列,所謂公平訪問隊列是指阻塞的全部生產者線程或消費者線程,當隊列可用時,能夠按照阻塞的前後順序訪問隊列,即先阻塞的生產者線程,能夠先往隊列裏插入元素,先阻塞的消費者線程,能夠先從隊列裏獲取元素。一般狀況下爲了保證公平性會下降吞吐量。咱們能夠使用如下代碼建立一個公平的阻塞隊列線程

ArrayBlockingQueue fairQueue = new  ArrayBlockingQueue(1000,true);

public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
   throw new IllegalArgumentException();
   this.items = new Object[capacity];
   lock = new ReentrantLock(fair);
   notEmpty = lock.newCondition();
   notFull = lock.newCondition();
}

經過源碼咱們能夠看到,構造器第一個參數是指定有界隊列的大小(及數組的大小),第二個參數指定是否使用公平鎖,這裏能夠看到阻塞隊列的公平訪問隊列是經過重入鎖來實現的(關於重入鎖咱們在別的章節介紹)

下邊咱們結合源碼對其提供的方法作一個簡單分析

關於構造器相關說明

/**
* * 構造函數,設置隊列的初始容量 */ public ArrayBlockingQueue(int capacity) { this(capacity, false); } /** * 構造函數。capacity設置數組大小 ,fair設置是否爲公平鎖 * capacity and the specified access policy. */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair);//是否爲公平鎖,若是是的話,那麼先到的線程先得到鎖對象。 //不然,由操做系統調度由哪一個線程得到鎖,通常爲false,性能會比較高 notEmpty = lock.newCondition(); notFull = lock.newCondition(); } /** *構造函數,帶有初始內容的隊列 */ public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); //要給數組設置內容,先上鎖 try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e;//依次拷貝內容 } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i;//若是putIndex大於數組大小 ,那麼從0從新開始 } finally { lock.unlock();//最後必定要釋放鎖 } }
關於方法的說明

/**
       * 添加一個元素,其實super.add裏面調用了offer方法       */       public boolean add(E e) {           return super.add(e);       }    
/**
* 當調用offer方法返回false時,直接拋出異常
*/
     public boolean add(E e) {
       if (offer(e))
           return true;
else
throw new IllegalStateException("Queue full");
}
}
     /**       *加入成功返回true,不然返回false       *        */       public boolean offer(E e) {           checkNotNull(e);           final ReentrantLock lock = this.lock;           lock.lock();//上鎖  
        try {               if (count == items.length) //超過數組的容量  
                return false;               else {                   enqueue(e); //放入元素  
                return true;               }           } finally {               lock.unlock();           }       }          /**       * 若是隊列已滿的話,就會等待       */       public void put(E e) throws InterruptedException {           checkNotNull(e);           final ReentrantLock lock = this.lock;           lock.lockInterruptibly();//和lock()方法的區別是讓它在阻塞時也可拋出異常跳出  
        try {               while (count == items.length)                   notFull.await(); //這裏就是阻塞了,要注意。若是運行到這裏,那麼它會釋放上面的鎖,一直等到notify  
            enqueue(e);           } finally {               lock.unlock();           }       }          /**       * 帶有超時時間的插入方法,unit表示是按秒、分、時哪種       */       public boolean offer(E e, long timeout, TimeUnit unit)           throws InterruptedException {              checkNotNull(e);           long nanos = unit.toNanos(timeout);           final ReentrantLock lock = this.lock;           lock.lockInterruptibly();           try {               while (count == items.length) {                   if (nanos <= 0)                       return false;                   nanos = notFull.awaitNanos(nanos);//帶有超時等待的阻塞方法  
            }               enqueue(e);//入隊  
            return true;           } finally {               lock.unlock();           }       }          //實現的方法,若是當前隊列爲空,返回null  
    public E poll() {           final ReentrantLock lock = this.lock;           lock.lock();           try {               return (count == 0) ? null : dequeue();           } finally {               lock.unlock();           }       }        //實現的方法,若是當前隊列爲空,一直阻塞  
    public E take() throws InterruptedException {           final ReentrantLock lock = this.lock;           lock.lockInterruptibly();           try {               while (count == 0)                   notEmpty.await();//隊列爲空,阻塞方法  
            return dequeue();           } finally {               lock.unlock();           }       }       //帶有超時時間的取元素方法,不然返回Null  
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {           long nanos = unit.toNanos(timeout);           final ReentrantLock lock = this.lock;           lock.lockInterruptibly();           try {               while (count == 0) {                   if (nanos <= 0)                       return null;                   nanos = notEmpty.awaitNanos(nanos);//超時等待  
            }               return dequeue();//取得元素  
        } finally {               lock.unlock();           }       }       //只是看一個隊列最前面的元素,取出是不刪除隊列中的原來元素。隊列爲空時返回null  
    public E peek() {           final ReentrantLock lock = this.lock;           lock.lock();           try {               return itemAt(takeIndex); // 隊列爲空時返回null  
        } finally {               lock.unlock();           }       }          /**       * 返回隊列當前元素個數       *       */       public int size() {           final ReentrantLock lock = this.lock;           lock.lock();           try {               return count;           } finally {               lock.unlock();           }       }          /**       * 返回當前隊列再放入多少個元素就滿隊       */       public int remainingCapacity() {           final ReentrantLock lock = this.lock;           lock.lock();           try {               return items.length - count;           } finally {               lock.unlock();           }       }          /**       *  從隊列中刪除一個元素的方法。刪除成功返回true,不然返回false       */       public boolean remove(Object o) {           if (o == nullreturn false;           final Object[] items = this.items;           final ReentrantLock lock = this.lock;           lock.lock();           try {               if (count > 0) {                   final int putIndex = this.putIndex;                   int i = takeIndex;                   do {                       if (o.equals(items[i])) {                           removeAt(i); //真正刪除的方法  
                        return true;                       }                       if (++i == items.length)                           i = 0;                   } while (i != putIndex);//一直不斷的循環取出來作判斷  
            }               return false;           } finally {               lock.unlock();           }       }          /**       * 是否包含一個元素       */       public boolean contains(Object o) {           if (o == nullreturn false;           final Object[] items = this.items;           final ReentrantLock lock = this.lock;           lock.lock();           try {               if (count > 0) {                   final int putIndex = this.putIndex;                   int i = takeIndex;                   do {                       if (o.equals(items[i]))                           return true;                       if (++i == items.length)                           i = 0;                   } while (i != putIndex);               }               return false;           } finally {               lock.unlock();           }       }          /**       * 清空隊列       *       */       public void clear() {           final Object[] items = this.items;           final ReentrantLock lock = this.lock;           lock.lock();           try {               int k = count;               if (k > 0) {                   final int putIndex = this.putIndex;                   int i = takeIndex;                   do {                       items[i] null;                       if (++i == items.length)                           i = 0;                   } while (i != putIndex);                   takeIndex = putIndex;                   count = 0;                   if (itrs != null)                       itrs.queueIsEmpty();                   for (; k > 0 && lock.hasWaiters(notFull); k--)                       notFull.signal();               }           } finally {               lock.unlock();           }       }          /**       * 取出全部元素到集合       */       public int drainTo(Collection<? super E> c) {           return drainTo(c, Integer.MAX_VALUE);       }          /**       * 取出全部元素到集合       */       public int drainTo(Collection<? super E> c, int maxElements) {           checkNotNull(c);           if (c == this)               throw new IllegalArgumentException();           if (maxElements <= 0)               return 0;           final Object[] items = this.items;           final ReentrantLock lock = this.lock;           lock.lock();           try {               int n = Math.min(maxElements, count);               int take = takeIndex;               int i = 0;               try {                   while (i < n) {                       @SuppressWarnings("unchecked")                       E x = (E) items[take];                       c.add(x);                       items[take] null;                       if (++take == items.length)                           take = 0;                       i++;                   }                   return n;               } finally {                   // Restore invariants even if c.add() threw  
                if (i > 0) {                       count -= i;                       takeIndex = take;                       if (itrs != null) {                           if (count == 0)                               itrs.queueIsEmpty();                           else if (i > take)                               itrs.takeIndexWrapped();                       }                       for (; i > 0 && lock.hasWaiters(notFull); i--)                           notFull.signal();                   }               }           } finally {               lock.unlock();           }       }  
相關文章
相關標籤/搜索