Java併發之BlockingQueue

  1、Queue編程

        Queue是隊列接口是 Collection的子接口。除了基本的 Collection操做外,隊列還提供其餘的插入、提取和檢查操做。每一個方法都存在兩種形式:一種拋出異常(操做失敗時),另外一種返回一個特殊值(null 或 false,具體取決於操做)。插入操做的後一種形式是用於專門爲有容量限制的 Queue 實現設計的;在大多數實現中,插入操做不會失敗。緩存

 

  拋出異常 返回特殊值
插入 add(e) offer(e)
移除 remove(e) poll(e)
檢查 element() peek()

 

        隊列一般(但並不是必定)以 FIFO(先進先出)的方式排序各個元素。不過優先級隊列和 LIFO 隊列(或堆棧)例外,前者根據提供的比較器或元素的天然順序對元素進行排序,後者按 LIFO(後進先出)的方式對元素進行排序。不管使用哪一種排序方式,隊列的頭 都是調用 remove() 或 poll() 所移除的元素。在 FIFO 隊列中,全部的新元素都插入隊列的末尾。其餘種類的隊列可能使用不一樣的元素放置規則。每一個 Queue 實現必須指定其順序屬性。 安全

        若是可能,offer 方法可插入一個元素,失敗則返回 false。這與 Collection.add 方法不一樣,該方法只能經過拋出未經檢查的異常使添加元素失敗。offer 方法設計用於正常的失敗狀況,而不是出現異常的狀況,例如在容量固定(有界)的隊列中。 併發

        remove() 和 poll() 方法可移除和返回隊列的頭。到底從隊列中移除哪一個元素是隊列排序策略的功能,而該策略在各類實現中是不一樣的。remove() 和 poll() 方法僅在隊列爲空時其行爲有所不一樣:remove() 方法拋出一個異常,而 poll() 方法則返回 null。 學習

        element() 和 peek() 獲取但不移除隊列的頭,element與 peek 惟一的不一樣在於:此隊列爲空時將拋出一個異常。this

        Queue 接口並未定義阻塞隊列的方法,而這在併發編程中是很常見的。BlockingQueue 接口則定義了那些等待元素出現或等待隊列中有可用空間的方法,這些方法擴展了此接口。 spa

        Queue 實現一般不容許插入 null 元素,儘管某些實現(如 LinkedList)並不由止插入 null。即便在容許 null 的實現中,也不該該將 null 插入到 Queue 中,由於 null 也用做 poll 方法的一個特殊返回值,代表隊列不包含元素。 線程

        Queue 實現一般未定義 equals 和 hashCode 方法的基於元素的版本,而是從 Object 類繼承了基於身份的版本,由於對於具備相同元素但有不一樣排序屬性的隊列而言,基於元素的相等性並不是老是定義良好的。 設計

        Queue 做爲隊列能夠實現一個按固定順序訪問其內部元素的結構,與 LinkedList等實現不一樣,Queue並不能獲取指定位置的元素。code

        在 ThreadPoolExecutor類中建立線程池時使用的是 BlockingQueue。BlockingQueue是 Queue的子接口,BlockingQueue的實現類有不少:ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue。

        Deque與 Queue不一樣在於,Deque是一個雙端隊列,支持在兩端插入和移除元素。名稱 deque 是「double ended queue(雙端隊列)」的縮寫,一般讀爲「deck」。大多數 Deque 實現對於它們可以包含的元素數沒有固定限制,但此接口既支持有容量限制的雙端隊列,也支持沒有固定大小限制的雙端隊列。 Deque不是咱們要學習的重點,下面就不提了。

        咱們要用到的實現爲 ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue,DelayedWorkQueue.其中 DelayedWorkQueue是 ScheduledThreadPoolExecutor的內部類實現。

        頂層接口爲 Queue,而後是 Queue的抽象實現類 AbstractQueue和子接口 BlockingQueue。圖中四個類均繼承於 AbstractQueue並實現 BlockingQueue接口,DelayedWorkQueue一樣實現了 BlockingQueue,但DelayedWorkQueue繼承自 AbstractCollection。

        如下是Queue的源代碼:

Java代碼   收藏代碼
  1. public interface Queue<E> extends Collection<E> {  
  2.     /** 
  3.      * 將指定的元素插入此隊列(若是當即可行且不會違反容量限制),在成功時返回 true,若是當前沒有可用的空間,則拋出IllegalStateException 
  4.      */  
  5.     boolean add(E e);  
  6.   
  7.     /** 
  8.      * 將指定的元素插入此隊列(若是當即可行且不會違反容量限制),當使用有容量限制的隊列時,此方法一般要優於add(E),後者可能沒法插入元素,而只是拋出一個異常 
  9.      */  
  10.     boolean offer(E e);  
  11.   
  12.     /** 
  13.      * 獲取並移除此隊列的頭。此方法與 poll 惟一的不一樣在於:此隊列爲空時將拋出一個異常 
  14.      */  
  15.     E remove();  
  16.   
  17.     /** 
  18.      * 獲取並移除此隊列的頭,若是此隊列爲空,則返回 null 
  19.      */  
  20.     E poll();  
  21.   
  22.     /** 
  23.      * 獲取,可是不移除此隊列的頭。此方法與 peek 惟一的不一樣在於:此隊列爲空時將拋出一個異常 
  24.      */  
  25.     E element();  
  26.   
  27.     /** 
  28.      * 獲取但不移除此隊列的頭;若是此隊列爲空,則返回 null 
  29.      */  
  30.     E peek();  
  31. }  

 

        2、AbstractQueue

        AbstractQueue提供某些 Queue 操做的主要實現。此類中的實現適用於基本實現不 容許包含 null 元素時。add、remove 和 element 方法分別基於 offer、poll 和 peek 方法,可是它們經過拋出異常而不是返回 false 或 null 來指示失敗。 

        擴展此類的 Queue 實現至少必須定義一個不容許插入 null 元素的 Queue.offer(E) 方法,該方法以及 Queue.peek()、Queue.poll()、Collection.size() 和 Collection.iterator() 都支持 Iterator.remove() 方法。一般還要重寫其餘方法。若是沒法知足這些要求,那麼能夠轉而考慮爲 AbstractCollection 建立子類。 

        如下是 AbstractQueue的源代碼:

Java代碼   收藏代碼
  1. public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E> {  
  2.   
  3.     /** 
  4.      * 子類使用的構造方法 
  5.      */  
  6.     protected AbstractQueue() {  
  7.     }  
  8.   
  9.     /** 
  10.      * 將指定的元素插入到此隊列中(若是當即可行且不會違反容量限制),在成功時返回 true,若是當前沒有可用空間,則拋出 IllegalStateException。 
  11.      */  
  12.     public boolean add(E e) {  
  13.         if (offer(e))  
  14.             return true;  
  15.         else  
  16.             throw new IllegalStateException("Queue full");  
  17.     }  
  18.   
  19.     /** 
  20.      * 獲取並移除此隊列的頭。此方法與 poll 惟一的不一樣在於:此隊列爲空時將拋出一個異常。 
  21.      * 除非隊列爲空,不然此實現返回 poll 的結果。  
  22.      */  
  23.     public E remove() {  
  24.         E x = poll();  
  25.         if (x != null)  
  26.             return x;  
  27.         else  
  28.             throw new NoSuchElementException();  
  29.     }  
  30.   
  31.     /** 
  32.      * 獲取但不移除此隊列的頭。此方法與 peek 惟一的不一樣在於:此隊列爲空時將拋出一個異常。 
  33.      * 除非隊列爲空,不然此實現返回 peek 的結果。 
  34.      */  
  35.     public E element() {  
  36.         E x = peek();  
  37.         if (x != null)  
  38.             return x;  
  39.         else  
  40.             throw new NoSuchElementException();  
  41.     }  
  42.   
  43.     /** 
  44.      * 移除此隊列中的全部元素。此調用返回後,隊列將爲空。  
  45.      * 此實現重複調用 poll,直到它返回 null 爲止。  
  46.      */  
  47.     public void clear() {  
  48.         while (poll() != null)  
  49.             ;  
  50.     }  
  51.   
  52.     /** 
  53.      * 將指定 collection 中的全部元素都添加到此隊列中。 
  54.      * 若是試圖將某一隊列 addAll 到該隊列自己中,則會致使 IllegalArgumentException。 
  55.      * 此外,若是正在進行此操做時修改指定的 collection,則此操做的行爲是不肯定的。 
  56.      * 此實如今指定的 collection 上進行迭代,並依次將迭代器返回的每個元素添加到此隊列中。 
  57.      * 在試圖添加某一元素(尤爲是 null 元素)時若是遇到了運行時異常,則可能致使在拋出相關異常時只成功地添加了某些元素。 
  58.      */  
  59.     public boolean addAll(Collection<? extends E> c) {  
  60.         if (c == null)  
  61.             throw new NullPointerException();  
  62.         if (c == this)  
  63.             throw new IllegalArgumentException();  
  64.         boolean modified = false;  
  65.         Iterator<? extends E> e = c.iterator();  
  66.         while (e.hasNext()) {  
  67.             if (add(e.next()))  
  68.                 modified = true;  
  69.         }  
  70.         return modified;  
  71.     }  
  72. }  

 

        3、BlockingQueue

        阻塞隊列(BlockingQueue)是一個支持兩個附加操做的隊列。這兩個附加的操做是:在隊列爲空時,獲取元素的線程會等待隊列變爲非空。當隊列滿時,存儲元素的線程會等待隊列可用,從而產生阻塞。阻塞隊列經常使用於生產者和消費者的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是生產者存放元素的緩存容器,而消費者也只從容器裏拿元素。

        BlockingQueue 的方法以四種形式出現,這四種形式的處理方式不一樣:第一種是拋出一個異常,第二種是返回一個特殊值(null 或 false,具體取決於操做),第三種是在操做能夠成功前,無限期地阻塞當前線程,第四種是在放棄前只在給定的最大時間限制內阻塞。下表中總結了這些方法:

 

     拋出異常      返回特殊值       阻塞    超時
   插入    add(e) offer(e) put(e) offer(e, time, unit)
   移除    remove() poll() take() poll(time, unit)
   檢查    element() peek() - -

 

        • 拋出異常:是指當阻塞隊列滿時候,再往隊列裏插入元素,會拋出IllegalStateException("Queue full")異常。當隊列爲空時,從隊列裏獲取元素時會拋出NoSuchElementException異常 。 

        • 返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從隊列裏拿出一個元素,若是沒有則返回null 

        • 阻塞:當阻塞隊列滿時,若是生產者線程往隊列裏put元素,隊列會一直阻塞生產者線程,直到拿到數據,或者響應中斷退出。當隊列空時,消費者線程試圖從隊列裏take元素,隊列也會阻塞消費者線程,直到隊列可用。 

        • 超時:當阻塞隊列滿時,隊列會阻塞生產者線程一段時間,若是超過必定的時間,生產者線程就會退出。 

        BlockingQueue 不接受 null 元素。試圖 add、put 或 offer 一個 null 元素時,某些實現會拋出 NullPointerException。null 被用做指示 poll 操做失敗的警惕值。 

        BlockingQueue 能夠是限定容量的。它在任意給定時間均可以有一個 remainingCapacity,超出此容量,便沒法無阻塞地 put 附加元素。沒有任何內部容量約束的 BlockingQueue 老是報告 Integer.MAX_VALUE 的剩餘容量。 

        BlockingQueue 實現主要用於生產者-消費者隊列,但它另外還支持 Collection 接口。所以,舉例來講,使用 remove(x) 從隊列中移除任意一個元素是有可能的。然而,這種操做一般不 會有效執行,只能有計劃地偶爾使用,好比在取消排隊信息時。 

        題外話:所謂生產者消費者模式,這裏簡單介紹一下。好比咱們在餐廳吃飯,咱們就是消費者,餐廳的廚師就是生產者,而餐廳的服務員就是一個緩衝環節。當生產者製做好菜品(生產產品),交由服務員(緩衝區),由服務員將菜品送至顧客(消費者)品用。


       

    生產者-消費者模式最重要的做用就是解耦,利用緩衝區將二者分離。若是每個廚師作完了菜都須要親自送到顧客桌上,那麼這樣就是將廚師與顧客綁定到了一塊兒。加入中間緩衝環節,也就是服務員,將送菜的任務交由服務員(緩衝區)去處理,這樣生產者與消費者就能夠各自作本身的事情了。在此模式下二者間支持併發操做,由於飯店的廚師確定不止一個,顧客也是如此。再有就是支持二者間不一樣步,由於二者間的數量與效率是不一樣步的,這就會致使生產與消費的速度不一樣。

         BlockingQueue 實現是線程安全的。全部排隊方法均可以使用內部鎖或其餘形式的併發控制來自動達到它們的目的。然而,大量的 Collection 操做(addAll、containsAll、retainAll 和 removeAll)沒有 必要自動執行,除非在實現中特別說明。所以,舉例來講,在只添加了 c 中的一些元素後,addAll(c) 有可能失敗(拋出一個異常)。 

        BlockingQueue 實質上不 支持使用任何一種「close」或「shutdown」操做來指示再也不添加任何項。這種功能的需求和使用有依賴於實現的傾向。例如,一種經常使用的策略是:對於生產者,插入特殊的 end-of-stream 或 poison 對象,並根據使用者獲取這些對象的時間來對它們進行解釋。 

       注意,BlockingQueue 能夠安全地與多個生產者和多個使用者一塊兒使用。 

       如下是基於典型的生產者-消費者場景的一個用例:

Java代碼   收藏代碼
  1. class Producer implements Runnable {  
  2.     private final BlockingQueue queue;  
  3.     private int i;  
  4.   
  5.     Producer(BlockingQueue q) {  
  6.         queue = q;  
  7.     }  
  8.   
  9.     public void run() {  
  10.         try {  
  11.             while (true) {  
  12.                 queue.put(produce());// 將產品放入緩衝隊列  
  13.             }  
  14.         } catch (InterruptedException e) {  
  15.             e.printStackTrace();  
  16.         }  
  17.     }  
  18.   
  19.     int produce() {  
  20.         return i++;// 生產產品  
  21.     }  
  22. }  
  23.   
  24. class Consumer implements Runnable {  
  25.     private final BlockingQueue queue;  
  26.   
  27.     Consumer(BlockingQueue q) {  
  28.         queue = q;  
  29.     }  
  30.   
  31.     public void run() {  
  32.         try {  
  33.             while (true) {  
  34.                 consume(queue.take());// 從緩衝隊列取出產品  
  35.             }  
  36.         } catch (InterruptedException e) {  
  37.             e.printStackTrace();  
  38.         }  
  39.     }  
  40.   
  41.     void consume(Object x) {  
  42.         System.out.println("消費:"+x);// 消費產品  
  43.     }  
  44. }  
  45.   
  46. public class Runner {  
  47.     public static void main(String[] args) {  
  48.         BlockingQueue q = new LinkedBlockingQueue<Integer>(10);// 或其餘實現  
  49.         Producer p = new Producer(q);  
  50.         Consumer c1 = new Consumer(q);  
  51.         Consumer c2 = new Consumer(q);  
  52.         new Thread(p).start();  
  53.         new Thread(c1).start();  
  54.         new Thread(c2).start();  
  55.     }  
  56. }  
  57. //結果:  
  58. ...  
  59. 消費:160607  
  60. 消費:160608  
  61. 消費:160609  
  62. 消費:160610  
  63. 消費:160611  
  64. ...  

        當生產者與消費者線程啓動後,首先生產者會不斷往隊列中添加產品,一旦隊列填滿則生產中止,而後消費者從隊列中取出產品使用,顯然過程當中使用了相似於 wait與 notify的流程,後面會詳細分析。

        如下是 BlockingQueue的源代碼:

Java代碼   收藏代碼
  1. public interface BlockingQueue<E> extends Queue<E> {  
  2.     /** 
  3.      * 將指定元素插入此隊列中(若是當即可行且不會違反容量限制),成功時返回 true, 
  4.      * 若是當前沒有可用的空間,則拋出 IllegalStateException。 
  5.      * 當使用有容量限制的隊列時,一般首選 offer 
  6.      */  
  7.     boolean add(E e);  
  8.   
  9.     /** 
  10.      * 將指定元素插入此隊列中(若是當即可行且不會違反容量限制),成功時返回 true, 
  11.      * 若是當前沒有可用的空間,則返回 false。 
  12.      * 當使用有容量限制的隊列時,此方法一般要優於 add(E),後者可能沒法插入元素,而只是拋出一個異常 
  13.      */  
  14.     boolean offer(E e);  
  15.   
  16.     /** 
  17.      * 將指定元素插入此隊列中,將等待可用的空間(即阻塞) 
  18.      */  
  19.     void put(E e) throws InterruptedException;  
  20.   
  21.     /** 
  22.      * 將指定元素插入此隊列中,在到達指定的等待時間前等待可用的空間 
  23.      */  
  24.     boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;  
  25.   
  26.     /** 
  27.      * 獲取並移除此隊列的頭部,在元素變得可用以前一直等待 
  28.      */  
  29.     E take() throws InterruptedException;  
  30.   
  31.     /** 
  32.      * 獲取並移除此隊列的頭部,在指定的等待時間前等待可用的元素 
  33.      */  
  34.     E poll(long timeout, TimeUnit unit) throws InterruptedException;  
  35.   
  36.     /** 
  37.      * 返回在無阻塞的理想狀況下(不存在內存或資源約束)此隊列能接受的附加元素數量; 
  38.      * 若是沒有內部限制,則返回 Integer.MAX_VALUE 
  39.      */  
  40.     int remainingCapacity();  
  41.   
  42.     /** 
  43.      * 今後隊列中移除指定元素的單個實例(若是存在)。 
  44.      * 更確切地講,若是此隊列包含一個或多個知足 o.equals(e) 的元素 e,則移除該元素。 
  45.      * 若是此隊列包含指定元素(或者此隊列因爲調用而發生更改),則返回 true 
  46.      */  
  47.     boolean remove(Object o);  
  48.   
  49.     /** 
  50.      * 若是此隊列包含指定元素,則返回 true。更確切地講,當且僅當此隊列至少包含一個知足 o.equals(e) 的元素 e時,返回 true 
  51.      */  
  52.     public boolean contains(Object o);  
  53.   
  54.     /** 
  55.      * 移除此隊列中全部可用的元素,並將它們添加到給定 collection 中。此操做可能比反覆輪詢此隊列更有效。 
  56.      * 在試圖向 collection c 中添加元素沒有成功時,可能致使在拋出相關異常時, 
  57.      * 元素會同時在兩個 collection 中出現,或者在其中一個 collection中出現,也可能在兩個 collection 中都不出現。 
  58.      * 若是試圖將一個隊列放入自身隊列中,則會致使 IllegalArgumentException 異常。 
  59.      * 此外,若是正在進行此操做時修改指定的 collection,則此操做行爲是不肯定的。 
  60.      */  
  61.     int drainTo(Collection<? super E> c);  
  62.   
  63.     /** 
  64.      * 最多今後隊列中移除給定數量的可用元素,並將這些元素添加到給定 collection 中。 
  65.      * 在試圖向 collection c中添加元素沒有成功時,可能致使在拋出相關異常時, 
  66.      * 元素會同時在兩個 collection 中出現,或者在其中一個 collection中出現,也可能在兩個 collection 中都不出現。 
  67.      * 若是試圖將一個隊列放入自身隊列中,則會致使 IllegalArgumentException 異常。 
  68.      * 此外,若是正在進行此操做時修改指定的 collection,則此操做行爲是不肯定的。 
  69.      */  
  70.     int drainTo(Collection<? super E> c, int maxElements);  
  71. }  

        後續幾篇介紹阻塞隊列的相關實現。

相關文章
相關標籤/搜索