Java併發之LinkedBlockingQueue

上一篇咱們已經學習過了 ArrayBlockingQueue的知識及相關方法的使用,這一篇咱們就來再學習一下ArrayBlockingQueue的親戚 LinkedBlockingQueue。在集合類中 ArrayList與 LinkedList會經常拿來比較,ArrayList內部實現是基於數組的,而 LinkedList內部實現是基於鏈表,因此他們之間會有不少不一樣,可是本文不會去重點討論,感興趣的朋友能夠參考我以前發過的幾篇文章,那麼有請本節的主角 LinkedBlockingQueue!node

        LinkedBlockingQueue數組

        LinkedBlockingQueue是一個一個基於已連接節點的、範圍任意(相對而論)的 blocking queue。此隊列按 FIFO(先進先出)排序元素。隊列的頭部 是在隊列中時間最長的元素。隊列的尾部 是在隊列中時間最短的元素。新元素插入到隊列的尾部,而且隊列獲取操做會得到位於隊列頭部的元素。連接隊列的吞吐量一般要高於基於數組的隊列,可是在大多數併發應用程序中,其可預知的性能要低。 併發

        可選的容量範圍構造方法參數做爲防止隊列過分擴展的一種方法。若是未指定容量,則它等於 Integer.MAX_VALUE。除非插入節點會使隊列超出容量,不然每次插入後會動態地建立連接節點。 性能

        LinkedBlockingQueue及其迭代器實現了 Collection 和 Iterator 接口的全部可選 方法。 學習

        咱們已經學習過了 ArrayBlockingQueue,因此學習 LinkedBlockingQueue就天然比較輕鬆,因此本文對於已經明確的相關概念就不作過多介紹了,而是重點放在二者的區別之上。this

 

        1.成員變量spa

        與ArrayBlockingQueue不一樣 LinkedBlockingQueue的成員變量有些變化,如下是 LinkedBlockingQueue的成員變量:線程

Java代碼   收藏代碼
  1. /** 容量範圍,默認值爲 Integer.MAX_VALUE */  
  2. private final int capacity;  
  3.   
  4. /** 當前隊列中元素數量 */  
  5. private final AtomicInteger count = new AtomicInteger(0);  
  6.   
  7. /** 頭節點 */  
  8. private transient Node<E> head;  
  9.   
  10. /** 尾節點 */  
  11. private transient Node<E> last;  
  12.   
  13. /** take, poll等方法的鎖 */  
  14. private final ReentrantLock takeLock = new ReentrantLock();  
  15.   
  16. /** 獲取隊列的 Condition(條件)實例 */  
  17. private final Condition notEmpty = takeLock.newCondition();  
  18.   
  19. /** put, offer等方法的鎖 */  
  20. private final ReentrantLock putLock = new ReentrantLock();  
  21.   
  22. /** 插入隊列的 Condition(條件)實例 */  
  23. private final Condition notFull = putLock.newCondition();  

        1)首先 LinkedBlockingQueue明確了容量變量,當爲指定容量時,默認容量爲Int的最大值Integer.MAX_VALUE。對象

        2)隊列元素數量變量 count採用的是 AtomicInteger ,而不是普通的Int型。CAS相關可參考http://286.iteye.com/blog/2295165blog

        3)LinkedBlockingQueue內部隊列實現使用的是 Node節點類,這與 LinkedList相似。

        4)最後也是最重要的一點,那就是獲取與插入操做分紅了兩個鎖:takeLock與 putLock來處理,這點下面還會重點分析。

 

        2.構造方法

        有三個構造方法,分別爲默認,指定容量,指定容量和初始元素。

Java代碼   收藏代碼
  1. /** 
  2.  * 建立一個容量爲 Integer.MAX_VALUE 的 LinkedBlockingQueue 
  3.  */  
  4. public LinkedBlockingQueue() {  
  5.     this(Integer.MAX_VALUE);  
  6. }  
  7.   
  8. /** 
  9.  * 建立一個具備給定(固定)容量的 LinkedBlockingQueue 
  10.  */  
  11. public LinkedBlockingQueue(int capacity) {  
  12.     if (capacity <= 0)  
  13.         throw new IllegalArgumentException();  
  14.     this.capacity = capacity;  
  15.     last = head = new Node<E>(null);  
  16. }  
  17.   
  18. /** 
  19.  * 建立一個容量爲 Integer.MAX_VALUE 的 LinkedBlockingQueue, 
  20.  * 最初包含給定 collection 的元素,元素按該 collection 迭代器的遍歷順序添加。 
  21.  */  
  22. public LinkedBlockingQueue(Collection<? extends E> c) {  
  23.     this(Integer.MAX_VALUE);  
  24.     for (E e : c)  
  25.         add(e);  
  26. }  

        默認構造方法建立一個容量爲 Integer.MAX_VALUE的 LinkedBlockingQueue實例。

        第二種構造方法,指定了隊列容量,首先判斷指定容量是否大於零,不然拋出異常。而後爲 capacity 賦值,最後建立空節點,並指向 head與 last,二者的 item與 next此時均爲 null。

 

 

        最後一種,利用循環向隊列中添加指定集合中的元素。

 

        3.Node類

        LinkedBlockingQueue內部列表實現是使用的 Node內部類,Node類也並不複雜,如下是其源代碼:

Java代碼   收藏代碼
  1. /** 
  2.  * 節點類 
  3.  */  
  4. static class Node<E> {  
  5.     /** volatile保障讀寫分離 */  
  6.     volatile E item;  
  7.     Node<E> next;  
  8.   
  9.     Node(E x) {  
  10.         item = x;  
  11.     }  
  12. }  

        item用於表示元素對象,next指向鏈表的下一個節點。

 



        LinkedBlockingQueue的大部分方法實際上是與  ArrayBlockingQueue相似的,因此本文就只介紹不一樣於ArrayBlockingQueue的相關方法。

 

        4.添加元素

        1)add方法

        add方法相同就不介紹了,一樣調用的是offer方法。

        2)offer方法

        將指定元素插入到此隊列的尾部(若是當即可行且不會超出此隊列的容量),在成功時返回 true,若是此隊列已滿,則返回 false。當使用有容量限制的隊列時,此方法一般要優於 add 方法,後者可能沒法插入元素,而只是拋出一個異常。 

        與ArrayBlockingQueue不一樣,LinkedBlockingQueue多了一些容量方面的判斷。

Java代碼   收藏代碼
  1. /** 
  2.  * 將指定元素插入到此隊列的尾部(若是當即可行且不會超出此隊列的容量) 
  3.  * 在成功時返回 true,若是此隊列已滿,則返回 false。 
  4.  * 當使用有容量限制的隊列時,此方法一般要優於 add 方法, 
  5.  * 後者可能沒法插入元素,而只是拋出一個異常。  
  6.  */  
  7. public boolean offer(E e) {  
  8.     //判斷添加元素是否爲null  
  9.     if (e == null)  
  10.         throw new NullPointerException();  
  11.     //第一點不一樣,使用原子類操做count,由於有兩個鎖  
  12.     final AtomicInteger count = this.count;  
  13.     //判斷容量,隊列是否已滿  
  14.     if (count.get() == capacity)  
  15.         return false;  
  16.     int c = -1;  
  17.     final ReentrantLock putLock = this.putLock;  
  18.     //獲取添加鎖  
  19.     putLock.lock();  
  20.     try {  
  21.         //再次判斷,若是隊列未滿  
  22.         if (count.get() < capacity) {  
  23.             //插入元素  
  24.             insert(e);  
  25.             //增長元素數count  
  26.             c = count.getAndIncrement();  
  27.             if (c + 1 < capacity)  
  28.                 //未滿則喚醒添加線程  
  29.                 notFull.signal();  
  30.         }  
  31.     } finally {  
  32.         //釋放鎖  
  33.         putLock.unlock();  
  34.     }  
  35.     //c等於0說明添加成功  
  36.     if (c == 0)  
  37.         //喚醒讀取線程  
  38.         signalNotEmpty();  
  39.     return c >= 0;  
  40. }  

        能夠看到offer方法的關鍵在於 insert方法。

        3)insert方法

         insert方法很是簡單,可是卻不要小看。

Java代碼   收藏代碼
  1. /** 
  2.  * 再隊尾添加元素 
  3.  */  
  4. private void insert(E x) {  
  5.     last = last.next = new Node<E>(x);  
  6. }  

        首先,根據指定參數x建立一個Node實例。

        而後,將原尾節點的next指向此節點。

        最後,將尾節點設置尾此節點。

        這樣新添加的節點就成爲了新的尾節點。



 

        當向鏈表中添加第一個節點時,由於在初始化時

Java代碼   收藏代碼
  1. last = head = new Node<E>(null);  

        因此此時 head與 last指向的是同一個對象new Node<E>(null)。

        以後將last.next指向x。

Java代碼   收藏代碼
  1. last.next = new Node<E>(x);  

        由於此時 head與 last是同一個對象,因此 head.next也指向x。

        最後將 last指向x。

Java代碼   收藏代碼
  1. last =  new Node<E>(x);  

        這樣 head的next就指向了 last。此時head中的 item仍爲 null。

 

        4)put方法

        將指定元素插入到此隊列的尾部,若有必要,則等待空間變得可用。

Java代碼   收藏代碼
  1. /** 
  2.  * 將指定元素插入到此隊列的尾部,若有必要,則等待空間變得可用 
  3.  */  
  4. public void put(E e) throws InterruptedException {  
  5.     //判斷添加元素是否爲null  
  6.     if (e == null)  
  7.         throw new NullPointerException();  
  8.     int c = -1;  
  9.     final ReentrantLock putLock = this.putLock;  
  10.     final AtomicInteger count = this.count;  
  11.     //獲取插入的可中斷鎖  
  12.     putLock.lockInterruptibly();  
  13.     try {  
  14.         try {  
  15.             //判斷隊列是否已滿  
  16.             while (count.get() == capacity)  
  17.                 //若是已滿則阻塞添加線程  
  18.                 notFull.await();  
  19.         } catch (InterruptedException ie) {  
  20.             //失敗就喚醒添加線程  
  21.             notFull.signal();   
  22.             throw ie;  
  23.         }  
  24.         //添加元素  
  25.         insert(e);  
  26.         //修改c值  
  27.         c = count.getAndIncrement();  
  28.         //根據c值判斷隊列是否已滿  
  29.         if (c + 1 < capacity)  
  30.             //未滿則喚醒添加線程  
  31.             notFull.signal();  
  32.     } finally {  
  33.         //釋放鎖  
  34.         putLock.unlock();  
  35.     }  
  36.     //c等於0表明添加成功  
  37.     if (c == 0)  
  38.         signalNotEmpty();  
  39. }  

 

        5.獲取元素

        1)peek方法

        peek方法獲取但不移除此隊列的頭;若是此隊列爲空,則返回 null。

Java代碼   收藏代碼
  1. /** 
  2.  * 獲取但不移除此隊列的頭;若是此隊列爲空,則返回 null 
  3.  */  
  4. public E peek() {  
  5.     //判斷元素數是否爲0  
  6.     if (count.get() == 0)  
  7.         return null;  
  8.     final ReentrantLock takeLock = this.takeLock;  
  9.     //獲取獲取鎖  
  10.     takeLock.lock();  
  11.     try {  
  12.         //頭節點的 next節點即爲添加的第一個節點  
  13.         Node<E> first = head.next;  
  14.         //若是不爲空則返回該節點  
  15.         if (first == null)  
  16.             return null;  
  17.         else  
  18.             return first.item;  
  19.     } finally {  
  20.         //釋放鎖  
  21.         takeLock.unlock();  
  22.     }  
  23. }  

        peek方法從頭節點直接就能夠獲取到第一個添加的元素,因此效率是比較高的。若是不存在則返回null。

        2)poll方法

        poll方法獲取並移除此隊列的頭,若是此隊列爲空,則返回 null。

Java代碼   收藏代碼
  1. /** 
  2.  * 獲取並移除此隊列的頭,若是此隊列爲空,則返回 null 
  3.  */  
  4. public E poll() {  
  5.     final AtomicInteger count = this.count;  
  6.     //判斷元素數量  
  7.     if (count.get() == 0)  
  8.         return null;  
  9.     E x = null;  
  10.     int c = -1;  
  11.     final ReentrantLock takeLock = this.takeLock;  
  12.     //獲取獲取鎖  
  13.     takeLock.lock();  
  14.     try {  
  15.         //再次判斷元素數量  
  16.         if (count.get() > 0) {  
  17.             //調用extract方法獲取第一個元素  
  18.             x = extract();  
  19.             //c=count++  
  20.             c = count.getAndDecrement();  
  21.             //若是隊列中含有元素  
  22.             if (c > 1)  
  23.                 //喚醒讀取線程  
  24.                 notEmpty.signal();  
  25.         }  
  26.     } finally {  
  27.         //釋放鎖  
  28.         takeLock.unlock();  
  29.     }  
  30.     //若是隊列已滿  
  31.     if (c == capacity)  
  32.         //喚醒等待中的添加線程  
  33.         signalNotFull();  
  34.     return x;  
  35. }  

        poll與 peek方法不一樣在於poll獲取完元素後移除這個元素,獲取與移除是經過 extract()方法實現的。

        注意:其中須要注意的是最後部分代碼:

Java代碼   收藏代碼
  1. //若是隊列已滿  
  2. if (c == capacity)  
  3.     //喚醒等待中的添加線程  
  4.     signalNotFull();  

        確定會有朋友有如下疑問:

        1)隊列都已經滿了,還須要喚醒添加線程幹什麼?

        2)線程滿了就不該該再向裏面添加元素了啊?

        3)signalNotFull方法是幹什麼的?

    signalNotFull方法的做用是喚醒等待中的put線程,signalNotFull只能被 take/poll方法調用,如下是 signalNotFull方法的源代碼:

Java代碼   收藏代碼
  1. /** 
  2.  * 喚醒等待中的put線程,只能被 take/poll方法調用 
  3.  */  
  4. private void signalNotFull() {  
  5.     final ReentrantLock putLock = this.putLock;  
  6.     //獲取鎖  
  7.     putLock.lock();  
  8.     try {  
  9.         //喚醒添加線程  
  10.         notFull.signal();  
  11.     } finally {  
  12.         //釋放鎖  
  13.         putLock.unlock();  
  14.     }  
  15. }  

      前兩點問題其實轉換一下角度就能很好的理解了,雖然隊列已經滿了,可是此時本線程已經完成了添加,可是其餘線程還在等待獲取條件進行添加,若是不去主動喚醒的話,那麼這些添加操做就只能無限期的等待下去,因此這些等待的添加操做就會失效。因此此時須要喚醒已經排隊的添加線程,雖然他們已經沒法添加元素至隊列。

 

        3)extract方法

        extract方法用於獲取並移除頭節點。

Java代碼   收藏代碼
  1. /** 
  2.  * 獲取並移除頭節點 
  3.  */  
  4. private E extract() {  
  5.     //獲取第一個節點,即 head的下一個元素  
  6.     Node<E> first = head.next;  
  7.     //將head指向此元素  
  8.     head = first;  
  9.     //獲取元素值  
  10.     E x = first.item;  
  11.     //清除first的item元素爲空,即head元素的item爲空  
  12.     first.item = null;  
  13.     //返回  
  14.     return x;  
  15. }  

        這裏須要注意的是這裏指的頭節點並非 head,而是 head的 next所指 Node的 item元素。由於 head的 item永遠爲 null。last的 next永遠爲 null。

        4)take方法

        獲取並移除此隊列的頭部,在元素變得可用以前一直等待(若是有必要)。

Java代碼   收藏代碼
  1. /** 
  2.  * 獲取並移除此隊列的頭部,在元素變得可用以前一直等待(若是有必要) 
  3.  */  
  4. public E take() throws InterruptedException {  
  5.     E x;  
  6.     int c = -1;  
  7.     final AtomicInteger count = this.count;  
  8.     final ReentrantLock takeLock = this.takeLock;  
  9.     //獲取可中斷鎖  
  10.     takeLock.lockInterruptibly();  
  11.     try {  
  12.         try {  
  13.             //判斷隊列是否含有元素  
  14.             while (count.get() == 0)  
  15.                 //沒有元素就阻塞獲取線程,由於沒有元素因此獲取線程也就沒有必要運行  
  16.                 notEmpty.await();  
  17.         } catch (InterruptedException ie) {  
  18.             //失敗就喚醒獲取線程  
  19.             notEmpty.signal();   
  20.             throw ie;  
  21.         }  
  22.         //調用 extract方法獲取元素  
  23.         x = extract();  
  24.         //計數c的新值  
  25.         c = count.getAndDecrement();  
  26.         //若是元素數大於1  
  27.         if (c > 1)  
  28.             //喚醒獲取線程  
  29.             notEmpty.signal();  
  30.     } finally {  
  31.         //釋放鎖  
  32.         takeLock.unlock();  
  33.     }  
  34.     //若是隊列已滿  
  35.     if (c == capacity)  
  36.         //喚醒還在等待的put線程  
  37.         signalNotFull();  
  38.     return x;  
  39. }  

        與 poll方法相似,只是take方法採用阻塞的方式來獲取元素。

 

        7.其餘方法

        1)remainingCapacity方法

Java代碼   收藏代碼
  1. /** 
  2.  * 返回理想狀況下(沒有內存和資源約束)此隊列可接受而且不會被阻塞的附加元素數量 
  3.  */  
  4. public int remainingCapacity() {  
  5.     return capacity - count.get();  
  6. }  

        也就是返回能夠當即添加元素的數量。

 

        2)iterator方法

        iterator方法返回在隊列中的元素上按適當順序進行迭代的迭代器。返回的 Iterator 是一個「弱一致」的迭代器,從不拋出 ConcurrentModificationException,而且確保可遍歷迭代器構造後所存在的全部元素,而且可能(但並不保證)反映構造後的全部修改。 

Java代碼   收藏代碼
  1. /** 
  2.  * 返回Itr實例 
  3.  */  
  4. public Iterator<E> iterator() {  
  5.     return new Itr();  
  6. }  

        iterator方法返回的是一個Itr內部類的實例,經過這個實例能夠遍歷整個隊列。如下是Itr內部類的源代碼:

Java代碼   收藏代碼
  1. private class Itr implements Iterator<E> {  
  2.     //當前節點  
  3.     private Node<E> current;  
  4.     private Node<E> lastRet;  
  5.     //當前元素  
  6.     private E currentElement;  
  7.   
  8.     Itr() {  
  9.         final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;  
  10.         final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;  
  11.         //獲取獲取與添加鎖  
  12.         putLock.lock();  
  13.         takeLock.lock();  
  14.         try {  
  15.             current = head.next;  
  16.             if (current != null)  
  17.                 currentElement = current.item;  
  18.         } finally {  
  19.             takeLock.unlock();  
  20.             putLock.unlock();  
  21.         }  
  22.     }  
  23.   
  24.     public boolean hasNext() {  
  25.         return current != null;  
  26.     }  
  27.   
  28.     public E next() {  
  29.         final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;  
  30.         final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;  
  31.         putLock.lock();  
  32.         takeLock.lock();  
  33.         try {  
  34.             if (current == null)  
  35.                 throw new NoSuchElementException();  
  36.             E x = currentElement;  
  37.             lastRet = current;  
  38.             current = current.next;  
  39.             if (current != null)  
  40.                 currentElement = current.item;  
  41.             return x;  
  42.         } finally {  
  43.             takeLock.unlock();  
  44.             putLock.unlock();  
  45.         }  
  46.     }  
  47.   
  48.     public void remove() {  
  49.         if (lastRet == null)  
  50.             throw new IllegalStateException();  
  51.         final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;  
  52.         final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;  
  53.         putLock.lock();  
  54.         takeLock.lock();  
  55.         try {  
  56.             Node<E> node = lastRet;  
  57.             lastRet = null;  
  58.             Node<E> trail = head;  
  59.             Node<E> p = head.next;  
  60.             while (p != null && p != node) {  
  61.                 trail = p;  
  62.                 p = p.next;  
  63.             }  
  64.             if (p == node) {  
  65.                 p.item = null;  
  66.                 trail.next = p.next;  
  67.                 if (last == p)  
  68.                     last = trail;  
  69.                 int c = count.getAndDecrement();  
  70.                 if (c == capacity)  
  71.                     notFull.signalAll();  
  72.             }  
  73.         } finally {  
  74.             takeLock.unlock();  
  75.             putLock.unlock();  
  76.         }  
  77.     }  
  78. }  

         Itr類不復雜,我就不詳細解釋了。

        3)清除方法

        clear,drainTo等方法與 ArrayBlockingQueue相似,這裏就不說了。

 

        8,.LinkedBlockingQueue與 ArrayBlockingQueue

        1)內部實現不一樣

        ArrayBlockingQueue內部隊列存儲使用的是數組:

Java代碼   收藏代碼
  1. private final E[] items;  

        而 LinkedBlockingQueue內部隊列存儲使用的是Node節點內部類:

Java代碼   收藏代碼
  1. static class Node<E> {  
  2.     /** The item, volatile to ensure barrier separating write and read */  
  3.     volatile E item;  
  4.     Node<E> next;  
  5.     Node(E x) { item = x; }  
  6. }  

 

        2)隊列中鎖的實現不一樣

Java代碼   收藏代碼
  1. /** LinkedBlockingQueue的獲取鎖 */  
  2. private final ReentrantLock takeLock = new ReentrantLock();  
  3.   
  4. /** LinkedBlockingQueue的添加鎖 */  
  5. private final ReentrantLock putLock = new ReentrantLock();  
  6.   
  7.   
  8. /** ArrayBlockingQueue的惟一鎖 */  
  9. private final ReentrantLock lock;  

        從源代碼就能夠看出 ArrayBlockingQueue實現的隊列中的鎖是沒有分離的,即添加與獲取使用的是同一個鎖;而 LinkedBlockingQueue實現的隊列中的鎖是分離的,即添加用的是 putLock,獲取是 takeLock。

        3)初始化條件不一樣

        ArrayBlockingQueue實現的隊列中必須指定隊列的大小。

        LinkedBlockingQueue實現的隊列中能夠不指定隊列的大小,默認容量爲Integer.MAX_VALUE。

        4)操做不一樣

        ArrayBlockingQueue不管是添加仍是獲取使用的是同一個鎖,因此添加的同時就不能讀取,讀取的同時就不能添加,因此鎖方面性能不如 LinkedBlockingQueue。

        LinkedBlockingQueue讀取與添加操做使用不一樣的鎖,由於其內部實現的特殊性,添加的時候只須要修改 last便可,而不會影響 head節點。而獲取時也只須要修改 head節點便可,一樣不會影響 last節點。因此在添加獲取方面理論上性能會高於 ArrayBlockingQueue。

        因此 LinkedBlockingQueue更適合實現生產者-消費者隊列。

相關文章
相關標籤/搜索