JAVA隊列詳細分析和ArrayBlockingQueue源碼剖析(JDK1.8)

Queue: 基本上,一個隊列就是一個先入先出(FIFO)的數據結構java

Queue接口與List、Set同一級別,都是繼承了Collection接口。LinkedList實現了Deque接 口。數組

Queue的實現安全


一、沒有實現的阻塞接口的LinkedList: 實現了java.util.Queue接口和java.util.AbstractQueue接口
  內置的不阻塞隊列: PriorityQueue 和 ConcurrentLinkedQueue
  PriorityQueue 和 ConcurrentLinkedQueue 類在 Collection Framework 中加入兩個具體集合實現。 
  PriorityQueue 類實質上維護了一個有序列表。加入到 Queue 中的元素根據它們的自然排序(經過其 java.util.Comparable 實現)或者根據傳遞給構造函數的 java.util.Comparator 實現來定位。
  ConcurrentLinkedQueue 是基於連接節點的、線程安全的隊列。併發訪問不須要同步。由於它在隊列的尾部添加元素並從頭部刪除它們,因此只要不須要知道隊列的大 小,          ConcurrentLinkedQueue 對公共集合的共享訪問就能夠工做得很好。收集關於隊列大小的信息會很慢,須要遍歷隊列。數據結構


二、實現阻塞接口的:
  java.util.concurrent 中加入了 BlockingQueue 接口和五個阻塞隊列類。它實質上就是一種帶有一點扭曲的 FIFO 數據結構。不是當即從隊列中添加或者刪除元素,線程執行操做阻塞,直到有空間或者元素可用。
五個隊列所提供的各有不一樣:
  * ArrayBlockingQueue :一個由數組支持的有界隊列。
  * LinkedBlockingQueue :一個由連接節點支持的可選有界隊列。
  * PriorityBlockingQueue :一個由優先級堆支持的無界優先級隊列。
  * DelayQueue :一個由優先級堆支持的、基於時間的調度隊列。
  * SynchronousQueue :一個利用 BlockingQueue 接口的簡單彙集(rendezvous)機制。多線程

下表顯示了jdk1.5中的阻塞隊列的操做:併發

  add        增長一個元索                     若是隊列已滿,則拋出一個IIIegaISlabEepeplian異常
  remove   移除並返回隊列頭部的元素    若是隊列爲空,則拋出一個NoSuchElementException異常
  element  返回隊列頭部的元素             若是隊列爲空,則拋出一個NoSuchElementException異常
  offer       添加一個元素並返回true       若是隊列已滿,則返回false
  poll         移除並返問隊列頭部的元素    若是隊列爲空,則返回null
  peek       返回隊列頭部的元素             若是隊列爲空,則返回null
  put         添加一個元素                      若是隊列滿,則阻塞
  take        移除並返回隊列頭部的元素     若是隊列爲空,則阻塞函數

remove、element、offer 、poll、peek 實際上是屬於Queue接口。 性能

阻塞隊列的操做能夠根據它們的響應方式分爲如下三類:aad、removee和element操做在你試圖爲一個已滿的隊列增長元素或從空隊列取得元素時 拋出異常。固然,在多線程程序中,隊列在任什麼時候間均可能變成滿的或空的,因此你可能想使用offer、poll、peek方法。這些方法在沒法完成任務時 只是給出一個出錯示而不會拋出異常。this

注意:poll和peek方法出錯進返回null。所以,向隊列中插入null值是不合法的spa

最後,咱們有阻塞操做put和take。put方法在隊列滿時阻塞,take方法在隊列空時阻塞。

LinkedBlockingQueue的容量是沒有上限的(說的不許確,在不指定時容量爲Integer.MAX_VALUE,不要然的話在put時怎麼會受阻呢),可是也能夠選擇指定其最大容量,它是基於鏈表的隊列,此隊列按 FIFO(先進先出)排序元素。


ArrayBlockingQueue在構造時須要指定容量, 並能夠選擇是否須要公平性,若是公平參數被設置true,等待時間最長的線程會優先獲得處理(其實就是經過將ReentrantLock設置爲true來 達到這種公平性的:即等待時間最長的線程會先操做)。一般,公平性會使你在性能上付出代價,只有在的確很是須要的時候再使用它。它是基於數組的阻塞循環隊 列,此隊列按 FIFO(先進先出)原則對元素進行排序。


PriorityBlockingQueue是一個帶優先級的 隊列,而不是先進先出隊列。元素按優先級順序被移除,該隊列也沒有上限(看了一下源碼,PriorityBlockingQueue是對 PriorityQueue的再次包裝,是基於堆數據結構的,而PriorityQueue是沒有容量限制的,與ArrayList同樣,因此在優先阻塞 隊列上put時是不會受阻的。雖然此隊列邏輯上是無界的,可是因爲資源被耗盡,因此試圖執行添加操做可能會致使 OutOfMemoryError),可是若是隊列爲空,那麼取元素的操做take就會阻塞,因此它的檢索操做take是受阻的。另外,往入該隊列中的元 素要具備比較能力。


DelayQueue(基於PriorityQueue來實現的)是一個存放Delayed 元素的無界阻塞隊列,只有在延遲期滿時才能從中提取元素。該隊列的頭部是延遲期滿後保存時間最長的 Delayed 元素。若是延遲都尚未期滿,則隊列沒有頭部,而且poll將返回null。當一個元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一個小於或等於零的值時,則出現期滿,poll就以移除這個元素了。此隊列不容許使用 null 元素。

ArrayBlockingQueue 的源碼解析

ArrayBlockingQueue類的結構以下:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
        private static final long serialVersionUID = -817911632652898426L;
        final Object[] items;  //用數據來存儲元素的容器
        int takeIndex;  //下一次讀取或移除的位置(remove、poll、take )
        int putIndex;  //下一次存放元素的位置(add、offer、put)
        int count;  //隊列中元素的總數
        final ReentrantLock lock;  //全部訪問的保護鎖
        private final Condition notEmpty;  //等待獲取元素的條件
        private final Condition notFull;  //等待存放元素的條件
        略...

能夠看出ArrayBlockingQueue內部使用final修飾的對象數組來存儲元素,一旦初始化數組,數組的大小就不可改變。使用ReentrantLock鎖來保證鎖競爭,使用Condition來控制插入或獲取元素時,線程是否阻塞。

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        //得到支持響應中斷的鎖
        lock.lockInterruptibly();
        try {
            //使用while循環來判斷隊列是否已滿,防止假喚醒
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

首先得到鎖,而後判斷隊列是否已滿,若是已滿則阻塞當前生產線程,直到隊列中空閒時,被喚醒操做。隊列空閒則調用enqueue 插入元素。

private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        //把當前元素插入到數組中去
        items[putIndex] = x;
        //這裏能夠看出這個數組是個環形數組
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        // 喚醒在notEmpty條件上等待的線程 
        notEmpty.signal();
    }

把元素插入到隊列中去,能夠看出這個隊列中的數組是環形數組結構,這樣每次插入、移除的時候不須要複製移動數組中的元素。

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        //得到可響應中斷鎖
        lock.lockInterruptibly();
        try {
            //使用while循環來判斷隊列是否已滿,防止假喚醒
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

消費者線程從阻塞隊列中獲取元素,若是隊列中元素爲空,則阻塞當前的消費者線程直到有數據時才調用dequeue方法獲取元素。不然直接調用dequeue方法獲取元素

private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        //獲取元素
        E x = (E) items[takeIndex];
        //將當前位置的元素設置爲null
        items[takeIndex] = null;
        //這裏能夠看出這個數組是個環形數組
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            //修改迭代器參數
            itrs.elementDequeued();
        // 喚醒在notFull條件上等待的線程 
        notFull.signal();
        return x;
    }

直接從數據中獲取items[takeIndex]的元素,並設置當前位置的元素爲null,並設置下一次takeIndex的座標(++takeIndex),隊列元素總數-1等操做。

public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        //得到不可響應中斷的鎖
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                //
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

首先判斷隊列中的元素是否已滿,若是已滿則直接返回false,不然調用enqueue方法向隊列中插入元素,插入成功返回true。

public E poll() {
        final ReentrantLock lock = this.lock;
        //得到不可響應中斷的鎖
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

判斷隊列是否爲空,若是爲空返回null,不然調用dequeue方法返回元素。

public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

首先調用offer方法插入元素,插入成功返回true,不然拋出IllegalStateException異常。

public E remove() {
        E x = poll();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }

首先調用poll方法獲取元素,若是不爲空則直接返回,不然拋出NoSuchElementException異常。

public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        //獲得超時的時間
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        //得到可響應中斷的鎖
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

首先判斷隊列是否已滿,若是已滿再循環判斷超時時間是否超時,超時則直接返回false,不然阻塞該生產線程nanos時間,若是nanos時間以內喚醒則調用enqueue方法插入元素。若是隊列不滿則直接調用enqueue方法插入元素,並返回true。

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        //獲得超時的時間
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        //得到可響應中斷的鎖
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

首先循環判斷隊列是否爲空,若是爲空再判斷是否超時,超時則返回null。不超時則等待,在nanos時間喚醒則調用dequeue方法獲取元素。

public E element() {
        E x = peek();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }

調用peek方法獲取元素,元素不爲空則返回,不然拋出NoSuchElementException異常。

public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex); // null when queue is empty
        } finally {
            lock.unlock();
        }
    }
    
    final E itemAt(int i) {
        return (E) items[i];
    }

調用itemAt方法獲取元素。

** 其它的阻塞隊列實現原理都相似,都是使用ReentrantLock和Condition來完成併發控制、阻塞的。 **

相關文章
相關標籤/搜索