在Java多線程應用中,隊列的使用率很高,多數生產消費模型的首選數據結構就是隊列。Java提供的線程安全的Queue能夠分爲阻塞隊列和非阻塞隊列,其中阻塞隊列的典型例子是BlockingQueue,非阻塞隊列的典型例子是ConcurrentLinkedQueue,在實際應用中要根據實際須要選用阻塞隊列或者非阻塞隊列。
注:什麼叫線程安全?這個首先要明確。線程安全的類 ,指的是類內共享的全局變量的訪問必須保證是不受多線程形式影響的。若是因爲多線程的訪問(好比修改、遍歷、查看)而使這些變量結構被破壞或者針對這些變量操做的原子性被破壞,則這個類就不是線程安全的。
今天就聊聊這兩種Queue,本文分爲如下兩個部分,用分割線分開:
- BlockingQueue 阻塞算法
- ConcurrentLinkedQueue,非阻塞算法
首先來看看BlockingQueue:
Queue是什麼就不須要多說了吧,一句話:隊列是先進先出。相對的,棧是後進先出。若是不熟悉的話先找本基礎的數據結構的書看看吧。
BlockingQueue,顧名思義,「阻塞隊列」:能夠提供阻塞功能的隊列。
首先,看看BlockingQueue提供的經常使用方法:
|
可能報異常 |
返回布爾值 |
可能阻塞 |
設定等待時間 |
入隊 |
add(e) |
offer(e) |
put(e) |
offer(e, timeout, unit) |
出隊 |
remove() |
poll() |
take() |
poll(timeout, unit) |
查看 |
element() |
peek() |
無 |
無 |
從上表能夠很明顯看出每一個方法的做用,這個不用多說。我想說的是:
- add(e) remove() element() 方法不會阻塞線程。當不知足約束條件時,會拋出IllegalStateException 異常。例如:當隊列被元素填滿後,再調用add(e),則會拋出異常。
- offer(e) poll() peek() 方法即不會阻塞線程,也不會拋出異常。例如:當隊列被元素填滿後,再調用offer(e),則不會插入元素,函數返回false。
- 要想要實現阻塞功能,須要調用put(e) take() 方法。當不知足約束條件時,會阻塞線程。
好,上點源碼你就更明白了。以ArrayBlockingQueue類爲例:
對於第一類方法,很明顯若是操做不成功就拋異常。並且能夠看到其實調用的是第二類的方法,爲何?由於第二類方法返回boolean啊。
Java代碼
- public boolean add(E e) {
- if (offer(e))
- return true;
- else
- throw new IllegalStateException("Queue full");//隊列已滿,拋異常
- }
-
- public E remove() {
- E x = poll();
- if (x != null)
- return x;
- else
- throw new NoSuchElementException();//隊列爲空,拋異常
- }
對於第二類方法,很標準的ReentrantLock使用方式(不熟悉的朋友看一下我上一篇帖子吧http://hellosure.iteye.com/blog/1121157),另外對於insert和extract的實現沒啥好說的。
注:先不看阻塞與否,這ReentrantLock的使用方式就能說明這個類是線程安全類。
Java代碼
- public boolean offer(E e) {
- if (e == null)throw new NullPointerException();
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- if (count == items.length)//隊列已滿,返回false
- return false;
- else {
- insert(e);//insert方法中發出了notEmpty.signal();
- return true;
- }
- } finally {
- lock.unlock();
- }
- }
-
- public E poll() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- if (count == 0)//隊列爲空,返回false
- return null;
- E x = extract();//extract方法中發出了notFull.signal();
- return x;
- } finally {
- lock.unlock();
- }
- }
對於第三類方法,這裏面涉及到Condition類,簡要提一下,
await方法指:形成當前線程在接到信號或被中斷以前一直處於等待狀態。
signal方法指:喚醒一個等待線程。
Java代碼
- public void put(E e)throws InterruptedException {
- if (e == null)throw new NullPointerException();
- final E[] items = this.items;
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- try {
- while (count == items.length)//若是隊列已滿,等待notFull這個條件,這時當前線程被阻塞
- notFull.await();
- } catch (InterruptedException ie) {
- notFull.signal(); //喚醒受notFull阻塞的當前線程
- throw ie;
- }
- insert(e);
- } finally {
- lock.unlock();
- }
- }
-
- public E take() throws InterruptedException {
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- try {
- while (count == 0)//若是隊列爲空,等待notEmpty這個條件,這時當前線程被阻塞
- notEmpty.await();
- } catch (InterruptedException ie) {
- notEmpty.signal();//喚醒受notEmpty阻塞的當前線程
- throw ie;
- }
- E x = extract();
- return x;
- } finally {
- lock.unlock();
- }
- }
-
第四類方法就是指在有必要時等待指定時間,就不詳細說了。
再來看看BlockingQueue接口的具體實現類吧:
- ArrayBlockingQueue,其構造函數必須帶一個int參數來指明其大小
- LinkedBlockingQueue,若其構造函數帶一個規定大小的參數,生成的BlockingQueue有大小限制,若不帶大小參數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定
- PriorityBlockingQueue,其所含對象的排序不是FIFO,而是依據對象的天然排序順序或者是構造函數的Comparator決定的順序
上面是用ArrayBlockingQueue舉得例子,下面看看LinkedBlockingQueue:
首先,既然是鏈表,就應該有Node節點,它是一個內部靜態類:
Java代碼
- static class Node<E> {
- /** The item, volatile to ensure barrier separating write and read */
- volatile E item;
- Node<E> next;
- Node(E x) { item = x; }
- }
而後,對於鏈表來講,確定須要兩個變量來標示頭和尾:
Java代碼
- /** 頭指針 */
- private transient Node<E> head;//head.next是隊列的頭元素
- /** 尾指針 */
- private transient Node<E> last;//last.next是null
那麼,對於入隊和出隊就很天然能理解了:
Java代碼
- private void enqueue(E x) {
- last = last.next = new Node<E>(x);//入隊是爲last再找個下家
- }
-
- private E dequeue() {
- Node<E> first = head.next; //出隊是把head.next取出來,而後將head向後移一位
- head = first;
- E x = first.item;
- first.item = null;
- return x;
- }
另外,LinkedBlockingQueue相對於ArrayBlockingQueue還有不一樣是,有兩個ReentrantLock,且隊列現有元素的大小由一個AtomicInteger對象標示。
注:AtomicInteger類是以原子的方式操做整型變量。
Java代碼
- private final AtomicInteger count =new AtomicInteger(0);
- /** 用於讀取的獨佔鎖*/
- private final ReentrantLock takeLock =new ReentrantLock();
- /** 隊列是否爲空的條件 */
- private final Condition notEmpty = takeLock.newCondition();
- /** 用於寫入的獨佔鎖 */
- private final ReentrantLock putLock =new ReentrantLock();
- /** 隊列是否已滿的條件 */
- private final Condition notFull = putLock.newCondition();
有兩個Condition很好理解,在ArrayBlockingQueue也是這樣作的。可是爲何須要兩個ReentrantLock呢?下面會慢慢道來。
讓咱們來看看offer和poll方法的代碼:
Java代碼
- public boolean offer(E e) {
- if (e == null)throw new NullPointerException();
- final AtomicInteger count = this.count;
- if (count.get() == capacity)
- return false;
- int c = -1;
- final ReentrantLock putLock =this.putLock;//入隊固然用putLock
- putLock.lock();
- try {
- if (count.get() < capacity) {
- enqueue(e); //入隊
- c = count.getAndIncrement(); //隊長度+1
- if (c + 1 < capacity)
- notFull.signal(); //隊列沒滿,固然能夠解鎖了
- }
- } finally {
- putLock.unlock();
- }
- if (c == 0)
- signalNotEmpty();//這個方法裏發出了notEmpty.signal();
- return c >= 0;
- }
-
- public E poll() {
- final AtomicInteger count = this.count;
- if (count.get() == 0)
- return null;
- E x = null;
- int c = -1;
- final ReentrantLock takeLock =this.takeLock;出隊固然用takeLock
- takeLock.lock();
- try {
- if (count.get() > 0) {
- x = dequeue();//出隊
- c = count.getAndDecrement();//隊長度-1
- if (c > 1)
- notEmpty.signal();//隊列沒空,解鎖
- }
- } finally {
- takeLock.unlock();
- }
- if (c == capacity)
- signalNotFull();//這個方法裏發出了notFull.signal();
- return x;
- }
看看源代碼發現和上面ArrayBlockingQueue的很相似,關鍵的問題在於:爲何要用兩個ReentrantLockputLock和takeLock?
咱們仔細想一下,入隊操做其實操做的只有隊尾引用last,而且沒有牽涉到head。而出隊操做其實只針對head,和last沒有關係。那麼就是說入隊和出隊的操做徹底不須要公用一把鎖,因此就設計了兩個鎖,這樣就實現了多個不一樣任務的線程入隊的同時能夠進行出隊的操做,另外一方面因爲兩個操做所共同使用的count是AtomicInteger類型的,因此徹底不用考慮計數器遞增遞減的問題。
另外,還有一點須要說明一下:await()和singal()這兩個方法執行時都會檢查當前線程是不是獨佔鎖的當前線程,若是不是則拋出java.lang.IllegalMonitorStateException異常。因此能夠看到在源碼中這兩個方法都出如今Lock的保護塊中。
-------------------------------我是分割線--------------------------------------
下面再來講說ConcurrentLinkedQueue,它是一個無鎖的併發線程安全的隊列。
如下部分的內容參照了這個帖子http://yanxuxin.iteye.com/blog/586943
對比鎖機制的實現,使用無鎖機制的難點在於要充分考慮線程間的協調。簡單的說就是多個線程對內部數據結構進行訪問時,若是其中一個線程執行的中途由於一些緣由出現故障,其餘的線程可以檢測並幫助完成剩下的操做。這就須要把對數據結構的操做過程精細的劃分紅多個狀態或階段,考慮每一個階段或狀態多線程訪問會出現的狀況。
ConcurrentLinkedQueue有兩個volatile的線程共享變量:head,tail。要保證這個隊列的線程安全就是保證對這兩個Node的引用的訪問(更新,查看)的原子性和可見性,因爲volatile自己可以保證可見性,因此就是對其修改的原子性要被保證。
下面經過offer方法的實現來看看在無鎖狀況下如何保證原子性:
Java代碼
- public boolean offer(E e) {
- if (e == null)throw new NullPointerException();
- Node<E> n = new Node<E>(e, null);
- for (;;) {
- Node<E> t = tail;
- Node<E> s = t.getNext();
- if (t == tail) { //------------------------------a
- if (s == null) {//---------------------------b
- if (t.casNext(s, n)) { //-------------------c
- casTail(t, n); //------------------------d
- return true;
- }
- } else {
- casTail(t, s); //----------------------------e
- }
- }
- }
- }
此方法的循環內首先得到尾指針和其next指向的對象,因爲tail和Node的next均是volatile的,因此保證了得到的分別都是最新的值。
代碼a:t==tail是最上層的協調,若是其餘線程改變了tail的引用,則說明如今得到不是最新的尾指針須要從新循環得到最新的值。
代碼b:s==null的判斷。靜止狀態下tail的next必定是指向null的,可是多線程下的另外一個狀態就是中間態:tail的指向沒有改變,可是其next已經指向新的結點,即完成tail引用改變前的狀態,這時候s!=null。這裏就是協調的典型應用,直接進入代碼e去協調參與中間態的線程去完成最後的更新,而後從新循環得到新的tail開始本身的新一次的入隊嘗試。另外值得注意的是a,b之間,其餘的線程可能會改變tail的指向,使得協調的操做失敗。從這個步驟能夠看到無鎖實現的複雜性。
代碼c:t.casNext(s, n)是入隊的第一步,由於入隊須要兩步:更新Node的next,改變tail的指向。代碼c以前可能發生tail引用指向的改變或者進入更新的中間態,這兩種狀況均會使得t指向的元素的next屬性被原子的改變,再也不指向null。這時代碼c操做失敗,從新進入循環。
代碼d:這是完成更新的最後一步了,就是更新tail的指向,最有意思的協調在這兒又有了體現。從代碼看casTail(t, n)不論是否成功都會接着返回true標誌着更新的成功。首先若是成功則代表本線程完成了兩步的更新,返回true是理所固然的;若是 casTail(t, n)不成功呢?要清楚的是完成代碼c則表明着更新進入了中間態,代碼d不成功則是tail的指向被其餘線程改變。意味着對於其餘的線程而言:它們獲得的是中間態的更新,s!=null,進入代碼e幫助本線程執行最後一步而且先於本線程成功。這樣本線程雖然代碼d失敗了,可是是因爲別的線程的協助先完成了,因此返回true也就理所固然了。
經過分析這個入隊的操做,能夠清晰的看到無鎖實現的每一個步驟和狀態下多線程之間的協調和工做。
注:上面這大段文字看起來很累,先能看懂多少看懂多少,如今看不懂先不急,下面還會提到這個算法,而且用示意圖說明,就易懂不少了。
在使用ConcurrentLinkedQueue時要注意,若是直接使用它提供的函數,好比add或者poll方法,這樣咱們本身不須要作任何同步。
但若是是非原子操做,好比:
Java代碼
- if(!queue.isEmpty()) {
- queue.poll(obj);
- }
咱們很難保證,在調用了isEmpty()以後,poll()以前,這個queue沒有被其餘線程修改。因此對於這種狀況,咱們仍是須要本身同步:
Java代碼
- synchronized(queue) {
- if(!queue.isEmpty()) {
- queue.poll(obj);
- }
- }
注:這種須要進行本身同步的狀況要視狀況而定,不是任何狀況下都須要這樣作。
另外還說一下,ConcurrentLinkedQueue的size()是要遍歷一遍集合的,因此儘可能要避免用size而改用isEmpty(),以避免性能過慢。
好,最後想說點什麼呢,阻塞算法其實很好理解,簡單點理解就是加鎖,好比在BlockingQueue中看到的那樣,再往前推點,那就是synchronized。相比而言,非阻塞算法的設計和實現都很困難,要經過低級的原子性來支持併發。下面就簡要的介紹一下非阻塞算法,如下部分的內容參照了一篇很經典的文章http://www.ibm.com/developerworks/cn/java/j-jtp04186/
注:我以爲能夠這樣理解,阻塞對應同步,非阻塞對應併發。也能夠說:同步是阻塞模式,異步是非阻塞模式
舉個例子來講明什麼是非阻塞算法:非阻塞的計數器
首先,使用同步的線程安全的計數器代碼以下
Java代碼
- public finalclass Counter {
- private long value =0;
- public synchronizedlong getValue() {
- return value;
- }
- public synchronizedlong increment() {
- return ++value;
- }
- }
下面的代碼顯示了一種最簡單的非阻塞算法:使用 AtomicInteger的compareAndSet()(CAS方法)的計數器。compareAndSet()方法規定「將這個變量更新爲新值,可是若是從我上次看到這個變量以後其餘線程修改了它的值,那麼更新就失敗」
Java代碼
- public class NonblockingCounter {
- private AtomicInteger value;//前面提到過,AtomicInteger類是以原子的方式操做整型變量。
- public int getValue() {
- return value.get();
- }
- public int increment() {
- int v;
- do {
- v = value.get();
- while (!value.compareAndSet(v, v +1));
- return v + 1;
- }
- }
非阻塞版本相對於基於鎖的版本有幾個性能優點。首先,它用硬件的原生形態代替 JVM 的鎖定代碼路徑,從而在更細的粒度層次上(獨立的內存位置)進行同步,失敗的線程也能夠當即重試,而不會被掛起後從新調度。更細的粒度下降了爭用的機會,不用從新調度就能重試的能力也下降了爭用的成本。即便有少許失敗的 CAS 操做,這種方法仍然會比因爲鎖爭用形成的從新調度快得多。
NonblockingCounter 這個示例可能簡單了些,可是它演示了全部非阻塞算法的一個基本特徵——有些算法步驟的執行是要冒險的,由於知道若是 CAS 不成功可能不得不重作。非阻塞算法一般叫做樂觀算法,由於它們繼續操做的假設是不會有干擾。若是發現干擾,就會回退並重試。在計數器的示例中,冒險的步驟是遞增——它檢索舊值並在舊值上加一,但願在計算更新期間值不會變化。若是它的但願落空,就會再次檢索值,並重作遞增計算。
再來一個例子,Michael-Scott 非阻塞隊列算法的插入操做,ConcurrentLinkedQueue 就是用這個算法實現的,如今來結合示意圖分析一下,很明朗:
Java代碼
- public class LinkedQueue <E> {
- private staticclass Node <E> {
- final E item;
- final AtomicReference<Node<E>> next;
- Node(E item, Node<E> next) {
- this.item = item;
- this.next = new AtomicReference<Node<E>>(next);
- }
- }
- private AtomicReference<Node<E>> head
- = new AtomicReference<Node<E>>(new Node<E>(null,null));
- private AtomicReference<Node<E>> tail = head;
- public boolean put(E item) {
- Node<E> newNode = new Node<E>(item,null);
- while (true) {
- Node<E> curTail = tail.get();
- Node<E> residue = curTail.next.get();
- if (curTail == tail.get()) {
- if (residue == null)/* A */ {
- if (curTail.next.compareAndSet(null, newNode))/* C */ {
- tail.compareAndSet(curTail, newNode) /* D */ ;
- return true;
- }
- } else {
- tail.compareAndSet(curTail, residue) /* B */;
- }
- }
- }
- }
- }
看看這代碼徹底就是ConcurrentLinkedQueue 源碼啊。
插入一個元素涉及頭指針和尾指針兩個指針更新,這兩個更新都是經過 CAS 進行的:從隊列當前的最後節點(C)連接到新節點,並把尾指針移動到新的最後一個節點(D)。若是第一步失敗,那麼隊列的狀態不變,插入線程會繼續重試,直到成功。一旦操做成功,插入被當成生效,其餘線程就能夠看到修改。還須要把尾指針移動到新節點的位置上,可是這項工做能夠當作是 「清理工做」,由於任何處在這種狀況下的線程均可以判斷出是否須要這種清理,也知道如何進行清理。
隊列老是處於兩種狀態之一:正常狀態(或稱靜止狀態,圖 1 和 圖 3)或中間狀態(圖 2)。在插入操做以前和第二個 CAS(D)成功以後,隊列處在靜止狀態;在第一個 CAS(C)成功以後,隊列處在中間狀態。在靜止狀態時,尾指針指向的連接節點的 next 字段總爲 null,而在中間狀態時,這個字段爲非 null。任何線程經過比較 tail.next 是否爲 null,就能夠判斷出隊列的狀態,這是讓線程能夠幫助其餘線程 「完成」 操做的關鍵。
上圖顯示的是:有兩個元素,處在靜止狀態的隊列
插入操做在插入新元素(A)以前,先檢查隊列是否處在中間狀態。若是是在中間狀態,那麼確定有其餘線程已經處在元素插入的中途,在步驟(C)和(D)之間。沒必要等候其餘線程完成,當前線程就能夠 「幫助」 它完成操做,把尾指針向前移動(B)。若是有必要,它還會繼續檢查尾指針並向前移動指針,直到隊列處於靜止狀態,這時它就能夠開始本身的插入了。 第一個 CAS(C)可能由於兩個線程競爭訪問隊列當前的最後一個元素而失敗;在這種狀況下,沒有發生修改,失去 CAS 的線程會從新裝入尾指針並再次嘗試。若是第二個 CAS(D)失敗,插入線程不須要重試 —— 由於其餘線程已經在步驟(B)中替它完成了這個操做!
上圖顯示的是:處在插入中間狀態的隊列,在新元素插入以後,尾指針更新以前
上圖顯示的是:在尾指針更新後,隊列從新處在靜止狀態