BlockingQueue支持兩個附加操做的Queue:1)當Queue爲空時,獲取元素線程被阻塞直到Queue變爲非空;2)當Queue滿時,添加元素線程被阻塞直到Queue不滿。BlockingQueue不容許元素爲null,若是入隊一個null元素,會拋NullPointerException。經常使用於生產者消費者模式。node
BlockingQueue對於不能知足條件的操做,提供了四種處理方式:編程
1)直接拋異常,拋出異常。若是隊列已滿,添加元素會拋出IllegalStateException異常;若是隊列爲空,獲取元素會拋出NoSuchElementException異常;數組
2)返回一個特殊值(null或false);安全
3)在知足條件以前,無限期的阻塞當前線程,當隊列知足條件或響應中斷退出;併發
4)在有限時間內阻塞當前線程,超時後返回失敗。app
拋出異常 | 返回特殊值 | 阻塞 | 超時 | |
入隊 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
出隊 | remove() | poll() | take() | poll(time, unit) |
檢查 | element() | peek() |
內存一致性效果:當存在其餘併發 collection 時,將對象放入 BlockingQueue 以前的線程中的操做 happen-before 隨後經過另外一線程從 BlockingQueue 中訪問或移除該元素的操做。less
JDK提供的阻塞隊列:函數
ArrayBlockingQueue:一個由數組結構組成的有界阻塞隊列,遵循FIFO原則。高併發
LinkedBlockingQueue:一個由鏈表結構組成的有界阻塞隊列,遵循FIFO原則,默認和最大長度爲Integer.MAX_VALUE。源碼分析
PriorityBlockingQueue:一個支持優先級排序的無界阻塞隊列。
DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。
SynchronousQueue:一個不存儲元素的阻塞隊列。
LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。
示例:生產者-消費者,BlockingQueue 能夠安全地與多個生產者和多個使用者一塊兒使用。
class Producer implements Runnable { private final BlockingQueue queue; Producer(BlockingQueue q) { queue = q; } public void run() { try { while(true) { queue.put(produce()); } //當隊列滿時,生產者阻塞等待 } catch (InterruptedException ex) { ... handle ...} } Object produce() { ... } } //消費者 class Consumer implements Runnable { private final BlockingQueue queue; Consumer(BlockingQueue q) { queue = q; } public void run() { try { while(true) { consume(queue.take()); } //當隊列空時,消費者阻塞等待 } catch (InterruptedException ex) { ... handle ...} } void consume(Object x) { ... } } class Setup { void main() { BlockingQueue q = new SomeQueueImplementation(); Producer p = new Producer(q); Consumer c1 = new Consumer(q); Consumer c2 = new Consumer(q); new Thread(p).start(); new Thread(c1).start(); new Thread(c2).start(); } }
當隊列滿時,生產者會一直阻塞,當消費者從隊列中取出元素時,如何通知生產者隊列能夠繼續,以ArrayBlockingQueue和LinkedBlockingQueue爲例,分析源代碼如何實現阻塞隊列。它們的阻塞機制都是基於Lock和Condition實現,其中LinkedBlockingQueue還用到了原子變量類。
1 /** The queued items */ 2 final Object[] items; 3 /** items index for next take, poll, peek or remove */ 4 int takeIndex; 5 /** items index for next put, offer, or add */ 6 int putIndex; 7 /** Number of elements in the queue */ 8 int count; 9 /* 10 * Concurrency control uses the classic two-condition algorithm 11 * found in any textbook. 12 */ 13 14 /** Main lock guarding all access */ 15 final ReentrantLock lock; 16 /** Condition for waiting takes */ 17 private final Condition notEmpty; 18 /** Condition for waiting puts */ 19 private final Condition notFull;
由ArrayBlockingQueue的域能夠看出,使用循環數組存儲隊列中的元素,兩個索引takeIndex和putIndex分別指向下一個要出隊和入隊的數組位置,線程間的通訊是使用ReentrantLock和兩個Condition實現的。
當不知足入隊或出隊條件時,當前線程阻塞等待。即當隊列滿時,生產者會一直阻塞直到被喚醒,當隊列空時,消費者會一直阻塞直到被喚醒。
入隊(put)
1 //在隊列的尾部(當前putIndex指定的位置)插入指定的元素 2 public void put(E e) throws InterruptedException { 3 checkNotNull(e); 4 final ReentrantLock lock = this.lock; 5 lock.lockInterruptibly(); //可響應中斷獲取鎖 6 try { 7 while (count == items.length) //若是隊列滿,在入隊條件notFull的等待隊列上等待。 8 //這裏使用While循環而非if判斷,目的是防止過早或意外的通知,只有條件符合才能推出循環 9 notFull.await(); 10 insert(e); 11 } finally { 12 lock.unlock(); //釋放鎖,喚醒同步隊列中的後繼節點 13 } 14 } 15 //爲保證操做線程安全,此方法必須在獲取鎖的前提下才能被調用 16 private void insert(E x) { 17 items[putIndex] = x; 18 putIndex = inc(putIndex); 19 ++count; //元素數量+1 20 notEmpty.signal(); //喚醒出隊條件的等待隊列上的線程 21 } 22 //將i增1,當++i等於數組的最大容量時,將i置爲0。即經過循環數組的方式 23 final int inc(int i) { 24 return (++i == items.length) ? 0 : i; 25 }
從源碼能夠看出,入隊的大體步驟以下:
1)首先獲取鎖,若是獲取鎖失敗,當前線程可能自旋獲取鎖或被阻塞直到獲取到鎖,不然執行2);
2)循環判斷隊列是否滿,若是滿,那麼當前線程被阻塞到notFull條件的等待隊列中,並釋放鎖,等待被喚醒;
3)當隊列非滿或從await方法中返回(此時當前線程從等待隊列中被喚醒並從新獲取到鎖)時,執行插入元素操做。
4)入隊完成後,釋放鎖,喚醒同步隊列中的後繼節點。
出隊(take)
1 public E take() throws InterruptedException { 2 final ReentrantLock lock = this.lock; 3 lock.lockInterruptibly(); //可響應中斷獲取鎖 4 try { 5 while (count == 0) //若是隊列爲空,在出隊條件notEmpty的等待隊列中等待 6 notEmpty.await(); 7 return extract(); 8 } finally { 9 lock.unlock(); //釋放鎖 10 } 11 } 12 //在當前takeIndex指定的位置取出元素,此方法必須在獲取鎖的前提下才能被調用 13 private E extract() { 14 final Object[] items = this.items; 15 E x = this.<E>cast(items[takeIndex]); //強制類型轉換 16 items[takeIndex] = null; 17 takeIndex = inc(takeIndex); //出隊索引一樣採用循環的方式增1 18 --count; 19 notFull.signal(); //喚醒入隊條件的等待隊列中的線程 20 return x; 21 }
從源碼能夠看出,出隊的大體步驟以下:
1)首先獲取鎖,若是獲取鎖失敗,當前線程可能自旋獲取鎖或被阻塞直到獲取到鎖成功。
2)獲取鎖成功,循環判斷隊列是否爲空,若是爲空,那麼當前線程被阻塞到 notEmpty 條件的等待隊列中,並釋放鎖,等待被喚醒;
3)當隊列非空或從await方法中返回(此時當前線程從等待隊列中被喚醒並從新獲取到鎖)時,執行取出元素操做。
4)出隊完成後,釋放鎖,喚醒同步隊列的後繼節點,
當不能知足入隊或出隊條件時,返回特殊值。當隊列滿時,入隊會失敗,offer方法直接返回false,反之入隊成功,返回true;當隊列空時,poll方法返回null。
入隊(offer)
1 public boolean offer(E e) { 2 checkNotNull(e); 3 final ReentrantLock lock = this.lock; 4 lock.lock(); //獲取鎖 5 try { 6 if (count == items.length) //若是隊列滿,與put阻塞當前線程不一樣的是,offer方法直接返回false 7 return false; 8 else { 9 insert(e); 10 return true; 11 } 12 } finally { 13 lock.unlock(); //釋放鎖 14 } 15 }
出隊(poll)
1 //出隊 2 public E poll() { 3 final ReentrantLock lock = this.lock; 4 lock.lock(); //獲取鎖 5 try { 6 return (count == 0) ? null : extract(); //若是隊列空,與take阻塞當前線程不一樣的是,poll方法返回null 7 } finally { 8 lock.unlock(); //釋放鎖 9 } 10 }
當不能知足入隊或出隊條件時,直接拋出異常。當隊列滿時,入隊失敗,拋IllegalStateException("Queue full");當隊列空時,remove方法拋NoSuchElementException()異常。
入隊(add)
1 public boolean add(E e) { 2 return super.add(e); 3 } 4 5 //抽象類AbstractQueue提供的方法 6 public boolean add(E e) { 7 //若是offer返回true,那麼add方法返回true;若是offer返回false,那麼add方法拋IllegalStateException("Queue full")異常 8 if (offer(e)) 9 return true; 10 else 11 throw new IllegalStateException("Queue full"); 12 }
出隊(remove)
1 //抽象類AbstractQueue提供的方法 2 public E remove() { 3 E x = poll(); 4 if (x != null) 5 return x; 6 else 7 throw new NoSuchElementException(); 8 }
使用Condition的超時等待機制實現,當不知足條件時,只在有限的時間內阻塞,超過超時時間仍然不知足條件才返回false或null。
入隊(offer(E e, long timeout, TimeUnit unit))
1 public boolean offer(E e, long timeout, TimeUnit unit) 2 throws InterruptedException { 3 4 checkNotNull(e); 5 long nanos = unit.toNanos(timeout); //轉換爲納秒 6 final ReentrantLock lock = this.lock; 7 lock.lockInterruptibly(); 8 try { 9 while (count == items.length) { 10 if (nanos <= 0) 11 return false; 12 nanos = notFull.awaitNanos(nanos); //與offer直接返回false不一樣,此處使用Condition的超時等待機制實現,超過等待時間若是仍然不知足條件才返回false 13 } 14 insert(e); 15 return true; 16 } finally { 17 lock.unlock(); 18 } 19 }
出隊(poll(long timeout, TimeUnit unit))
1 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 2 long nanos = unit.toNanos(timeout); 3 final ReentrantLock lock = this.lock; 4 lock.lockInterruptibly(); 5 try { 6 while (count == 0) { 7 if (nanos <= 0) 8 return null; 9 nanos = notEmpty.awaitNanos(nanos); 10 } 11 return extract(); 12 } finally { 13 lock.unlock(); 14 } 15 }
Executors建立固定大小線程池的代碼,就使用了LinkedBlockingQueue來做爲任務隊列。
1 /** The capacity bound, or Integer.MAX_VALUE if none */ 2 private final int capacity; //隊列最大容量,默認爲Integer.MAX_VALUE 3 /** Current number of elements */ 4 private final AtomicInteger count = new AtomicInteger(0); //當前元素數量,原子類保證線程安全 5 /** 6 * Head of linked list. 7 * Invariant: head.item == null 8 */ 9 private transient Node<E> head; //隊列的首節點,head節點是個空節點,head.item == null,實際存儲元素的第一個節點是head.next 10 /** 11 * Tail of linked list. 12 * Invariant: last.next == null 13 */ 14 private transient Node<E> last; //隊列的尾節點 15 /** Lock held by take, poll, etc */ 16 private final ReentrantLock takeLock = new ReentrantLock(); //出隊鎖 17 /** Wait queue for waiting takes */ 18 private final Condition notEmpty = takeLock.newCondition(); //出隊條件 19 /** Lock held by put, offer, etc */ 20 private final ReentrantLock putLock = new ReentrantLock(); //入隊鎖 21 /** Wait queue for waiting puts */ 22 private final Condition notFull = putLock.newCondition(); //入隊條件
Node類:
1 static class Node<E> { 2 E item; //元素 3 /** 4 * One of: 5 * - the real successor Node 6 * - this Node, meaning the successor is head.next 7 * - null, meaning there is no successor (this is the last node) 8 */ 9 Node<E> next; //後繼節點,LinkedBlockingQueue使用的是單向鏈表 10 Node(E x) { item = x; } 11 }
由LinkedBlockingQueue的域能夠看出,它使用鏈表存儲元素。線程間的通訊也是使用ReentrantLock和Condition實現的,與ArrayBlockingQueue不一樣的是,LinkedBlockingQueue在入隊和出隊操做時分別使用兩個鎖putLock和takeLock。
思考問題一:爲何使用兩把鎖?
爲了提升併發度和吞吐量,使用兩把鎖,takeLock只負責出隊,putLock只負責入隊,入隊和出隊能夠同時進行,提升入隊和出隊操做的效率,增大隊列的吞吐量。LinkedBlockingQueue隊列的吞吐量一般要高於ArrayBlockingQueue隊列,可是在高併發條件下可預測性下降。
思考問題二:ArrayBlockingQueue中的count是一個普通的int型變量,LinkedBlockingQueue的count爲何是AtomicInteger類型的?
由於ArrayBlockingQueue的入隊和出隊操做使用同一把鎖,對count的修改都是在處於線程獲取鎖的狀況下進行操做,所以不會有線程安全問題。而LinkedBlockingQueue的入隊和出隊操做使用的是不一樣的鎖,會有對count變量併發修改的狀況,因此使用原子變量保證線程安全。
思考問題三:像notEmpty、takeLock、count域等都聲明爲final型,final成員變量有什麼特色?
1)對於一個final變量,若是是基本數據類型的變量,則其數值一旦在初始化以後便不能更改;若是是引用類型的變量,則在對其初始化以後便不能再讓其指向另外一個對象。
2)對於一個final成員變量,必須在定義時或者構造器中進行初始化賦值,並且final變量一旦被初始化賦值以後,就不能再被賦值了。只要對象是正確構造的(被構造對象的引用在構造函數中沒有「逸出」),那麼不須要使用同步(指lock和volatile的使用)就能夠保證任意線程都能看到這個final域在構造函數中被初始化以後的值。
1 //指定容量,默認爲Integer.MAX_VALUE 2 public LinkedBlockingQueue(int capacity) { 3 if (capacity <= 0) throw new IllegalArgumentException(); 4 this.capacity = capacity; 5 last = head = new Node<E>(null); //構造元素爲null的head節點,並將last指向head節點 6 }
阻塞式入隊(put)
1 public void put(E e) throws InterruptedException { 2 if (e == null) throw new NullPointerException(); 3 // Note: convention in all put/take/etc is to preset local var 預置本地變量,例如入隊鎖賦給局部變量putLock 4 // holding count negative to indicate failure unless set. 5 int c = -1; 6 Node<E> node = new Node(e); //構造新節點 7 //預置本地變量putLock和count 8 final ReentrantLock putLock = this.putLock; 9 final AtomicInteger count = this.count; 10 putLock.lockInterruptibly(); //可中斷獲取入隊鎖 11 try { 12 /* 13 * Note that count is used in wait guard even though it is 14 * not protected by lock. This works because count can 15 * only decrease at this point (all other puts are shut 16 * out by lock), and we (or some other waiting put) are 17 * signalled if it ever changes from capacity. Similarly 18 * for all other uses of count in other wait guards. 19 */ 20 while (count.get() == capacity) { 21 notFull.await(); 22 } 23 enqueue(node); //在隊尾插入node 24 c = count.getAndIncrement(); //count原子方式增1,返回值c爲count增加以前的值 25 if (c + 1 < capacity) //若是隊列未滿,通知入隊線程(notFull條件等待隊列中的線程) 26 notFull.signal(); 27 } finally { 28 putLock.unlock(); //釋放入隊鎖 29 } 30 //若是入隊該元素以前隊列中元素數量爲0,那麼通知出隊線程(notEmpty條件等待隊列中的線程) 31 if (c == 0) 32 signalNotEmpty(); 33 } 34 35 //通知出隊線程(notEmpty條件等待隊列中的線程) 36 private void signalNotEmpty() { 37 final ReentrantLock takeLock = this.takeLock; 38 takeLock.lock(); //獲取出隊鎖,調用notEmpty條件的方法的前提 39 try { 40 notEmpty.signal(); //喚醒一個等待出隊的線程 41 } finally { 42 takeLock.unlock(); //釋放出隊鎖 43 } 44 } 45 46 private void enqueue(Node<E> node) { 47 // assert putLock.isHeldByCurrentThread(); 48 // assert last.next == null; 49 last = last.next = node; 50 }
思考問題一:爲何要再聲明一個final局部變量指向putLock和count,直接使用成員變量不行嗎?
直接使用成員變量:每次調用putLock的方法,都須要先經過this指針找到Heap中的Queue實例,而後在根據Queue實例的putLock域引用找到Lock實例,最後才能調用Lock的方法(即將相應的方法信息組裝成棧幀壓入棧頂)。聲明一個final局部變量指向putLock:先經過this指針找到Heap中的Queue實例,將Queue實例的putLock域存儲的Lock實例的地址賦給局部變量putLock,之後須要調用putLock的方法時,直接使用局部變量putLock引用就能夠找到Lock實例。簡化了查找Lock實例的過程。count變量也是一樣的道理。我的理解應該是爲了提高效率。
思考問題二:使用兩把鎖怎麼保證元素的可見性?
例如:入隊線程使用put方法在隊列尾部插入一個元素,怎麼保證出隊線程能看到這個元素?ArrayBlockingQueue的入隊和出隊使用同一個鎖,因此沒有可見性問題。
在LinkedBlockingQueue中,每次一個元素入隊, 都須要獲取putLock和更新count,而出隊線程爲了保證可見性,須要獲取fullyLock(fullyLock方法用於一些批量操做,對全局加鎖)或者獲取takeLock,而後讀取count.get()。由於volatile對象的寫操做happen-before讀操做,也就是寫線程先寫的操做對隨後的讀線程是可見的,volatile至關於一個內存屏障,volatile後面的指令不容許重排序到它以前,而count是原子整型類,是基於volatile變量和CAS機制實現。因此就保證了可見性,寫線程修改count-->讀線程讀取count-->讀線程。
思考問題三:在put方法中,爲何喚醒出隊線程的方法signalNotEmpty()要放在釋放putLock鎖(putLock.unlock())以後?一樣,take也有一樣的疑問?
避免死鎖的發生,由於signalNotEmpty()方法中要獲取takeLock鎖。若是放在釋放putLock以前,至關於在入隊線程須要先獲取putLock鎖,再獲取takeLock鎖。例如:當入隊線程先獲取到putLock鎖,並嘗試獲取takeLock鎖,出隊線程獲取到takeLock鎖,並嘗試獲取putLock鎖時,就會產生死鎖。
思考問題四:什麼是級聯通知?
好比put操做會調用notEmpty的notify,只會喚醒一個等待的讀線程來take,take以後若是發現還有剩餘的元素,會繼續調用notify,通知下一個線程來獲取。
阻塞式出隊(take)
1 public E take() throws InterruptedException { 2 E x; 3 int c = -1; 4 final AtomicInteger count = this.count; 5 final ReentrantLock takeLock = this.takeLock; 6 takeLock.lockInterruptibly(); //可中斷獲取出隊鎖 7 try { 8 while (count.get() == 0) { //若是隊列爲空,阻塞線程同時釋放鎖 9 notEmpty.await(); 10 } 11 x = dequeue(); //從隊列頭彈出元素 12 c = count.getAndDecrement(); //count原子式遞減 13 //c>1說明本次出隊後,隊列中還有元素 14 if (c > 1) 15 notEmpty.signal(); //喚醒一個等待出隊的線程 16 } finally { 17 takeLock.unlock(); //釋放出隊鎖 18 } 19 //c == capacity說明本次出隊以前是滿隊列,喚醒一個等待NotFull的線程 20 if (c == capacity) 21 signalNotFull(); 22 return x; 23 } 24 25 //喚醒一個等待NotFull條件的線程 26 private void signalNotFull() { 27 final ReentrantLock putLock = this.putLock; 28 putLock.lock(); 29 try { 30 notFull.signal(); 31 } finally { 32 putLock.unlock(); 33 } 34 } 35 36 //從隊列頭彈出元素 37 private E dequeue() { 38 // assert takeLock.isHeldByCurrentThread(); 39 // assert head.item == null; 40 Node<E> h = head; 41 Node<E> first = h.next; 42 h.next = h; // help GC 43 head = first; 44 E x = first.item; 45 first.item = null; 46 return x; 47 }
1)remove
刪除指定元素 全局加鎖
1 public boolean remove(Object o) { //正經常使用不到remove方法,queue正常只使用入隊出隊操做. 2 if (o == null) return false; 3 fullyLock(); // 兩個鎖都鎖上了,禁止進行入隊出隊操做. 4 try { 5 for (Node<E> trail = head, p = trail.next; 6 p != null; // 按順序一個一個檢索,直到p==null 7 trail = p, p = p.next) { 8 if (o.equals(p.item)) { //找到了,就刪除掉 9 unlink(p, trail); //刪除操做是一個unlink方法,意思是p從LinkedList鏈路中解除. 10 return true; //返回刪除成功 11 } 12 } 13 return false; 14 } finally { 15 fullyUnlock(); 16 } 17 }
2)contains
判斷是否包含指定的元素 全局加鎖
1 public boolean contains(Object o) { //這種須要檢索的操做都是對全局加鎖的,很影響性能,要當心使用! 2 if (o == null) return false; 3 fullyLock(); 4 try { 5 for (Node<E> p = head.next; p != null; p = p.next) 6 if (o.equals(p.item)) 7 return true; 8 return false; 9 } finally { 10 fullyUnlock(); 11 } 12 }
全局加鎖和解鎖的方法做爲公共的方法供其餘須要全局鎖的方法調用,避免因爲獲取鎖的順序不一致致使死鎖。另外fullyLock和fullyUnlock兩個方法對鎖的操做要相反。
1 /** 2 * Lock to prevent both puts and takes. 3 */ 4 void fullyLock() { 5 putLock.lock(); 6 takeLock.lock(); 7 } 8 9 /** 10 * Unlock to allow both puts and takes. 11 */ 12 void fullyUnlock() { 13 takeLock.unlock(); 14 putLock.unlock(); 15 }
3)迭代器Iterator
弱一致性,不會不會拋出ConcurrentModificationException異常,不會阻止遍歷的時候對queue進行修改操做,可能會遍歷到修改操做的結果.
ArrayBlockingQueue因爲其底層基於數組,而且在建立時指定存儲的大小,在完成後就會當即在內存分配固定大小容量的數組元素,所以其存儲一般有限,故其是一個「有界「的阻塞隊列;而LinkedBlockingQueue能夠由用戶指定最大存儲容量,也能夠無需指定,若是不指定則最大存儲容量將是Integer.MAX_VALUE,便可以看做是一個「無界」的阻塞隊列,因爲其節點的建立都是動態建立,而且在節點出隊列後能夠被GC所回收,所以其具備靈活的伸縮性。可是因爲ArrayBlockingQueue的有界性,所以其可以更好的對於性能進行預測,而LinkedBlockingQueue因爲沒有限制大小,當任務很是多的時候,不停地向隊列中存儲,就有可能致使內存溢出的狀況發生。
其次,ArrayBlockingQueue中在入隊列和出隊列操做過程當中,使用的是同一個lock,因此即便在多核CPU的狀況下,其讀取和操做的都沒法作到並行,而LinkedBlockingQueue的讀取和插入操做所使用的鎖是兩個不一樣的lock,它們之間的操做互相不受干擾,所以兩種操做能夠並行完成,故LinkedBlockingQueue的吞吐量要高於ArrayBlockingQueue。
(LinkedBlockingQueue源碼分析)http://www.jianshu.com/p/cc2281b1a6bc
(ArrayBlockingQueue源碼分析)http://www.jianshu.com/p/9a652250e0d1
(源碼分析-LinkedBlockingQueue) http://blog.csdn.net/u011518120/article/details/53886256
(阻塞隊列LinkedBlockingQueue源碼分析)http://blog.csdn.net/levena/article/details/78322573
《Java併發編程的藝術》