今天來介紹Java併發編程中最受歡迎的同步類——堪稱併發一枝花之BlockingQueue。java
JDK版本:oracle java 1.8.0_102node
繼續閱讀以前,需確保你對鎖和條件隊列的使用方法爛熟於心,特別是條件隊列,不然你可能沒法理解如下源碼的精妙之處,甚至基本的正確性。本篇暫不涉及此部份內容,需讀者自行準備。git
BlockingQueue繼承自Queue,增長了阻塞的入隊、出隊等特性:github
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
void put(E e) throws InterruptedException;
// can extends from Queue. i don't know why overriding here
boolean offer(E e);
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
// extends from Queue
// E poll();
E poll(long timeout, TimeUnit unit) throws InterruptedException;
int remainingCapacity();
boolean remove(Object o);
public boolean contains(Object o);
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
}複製代碼
爲了方便講解,我調整了部分方法的順序,還增長了註釋輔助說明。編程
須要關注的是兩對方法:數組
BlockingQueue有不少實現類。根據github的code results排名,最經常使用的是LinkedBlockingQueue(253k)和ArrayBlockingQueue(95k)。LinkedBlockingQueue的性能在大部分狀況下優於ArrayBlockingQueue,本文主要介紹LinkedBlockingQueue,文末會簡要說起兩者的對比。安全
兩個阻塞方法相對簡單,有助於理解LinkedBlockingQueue的核心思想:在隊頭和隊尾各持有一把鎖,入隊和出隊之間不存在競爭。數據結構
前面在Java實現生產者-消費者模型中按部就班的引出了BlockingQueue#put()和BlockingQueue#take()的實現,能夠先去複習一下,瞭解爲何LinkedBlockingQueue要如此設計。如下是更細緻的講解。併發
在隊尾入隊。putLock和notFull配合完成同步。oracle
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}複製代碼
如今觸發一個入隊操做,分狀況討論。
入隊前需獲得鎖putLock。檢查隊列非滿,無需等待條件notFull,直接入隊。入隊後,檢查隊列非滿(精確說是入隊前「將滿」,但不影響理解),隨機通知一個生產者條件notFull知足。最後,檢查入隊前隊列非空,則無需通知條件notEmpty。
注意點:
「單次通知」
,目的是減小無效競爭。但這不會產生「信號劫持」的問題,由於只有生產者在等待該條件。入隊前需獲得鎖putLock。檢查隊列滿,則等待條件notFull。條件notFull可能由出隊成功觸發(必要的),也可能由入隊成功觸發(也是必要的,避免「信號不足」的問題)。條件notFull知足後,入隊。入隊後,假設檢查隊列滿(隊列非滿的狀況同case1),則無需通知條件notFull。最後,檢查入隊前隊列非空,則無需通知條件notEmpty。
注意點:
補充signalNotEmpty()、signalNotFull()的實現:
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}複製代碼
入隊前需獲得鎖putLock。檢查隊列空,則無需等待條件notFull,直接入隊。入隊後,若是隊列非滿,則同case1;若是隊列滿,則同case2。最後,假設檢查入隊前隊列空(隊列非空的狀況同case1),則隨機通知一個消費者條件notEmpty知足。
注意點:
「條件通知」
,是一種減小無效通知的措施。由於若是隊列非空,則出隊操做不會阻塞在條件notEmpty上。另外一方面,雖然已經有生產者完成了入隊,但可能有消費者在生產者釋放鎖putLock後、通知條件notEmpty知足前,使隊列變空;不過這沒有影響,take()方法的while循環可以在線程競爭到鎖以後再次確認。case4是一個特殊狀況,分析方法相似於case1,但可能入隊與出隊之間存在競爭,咱們稍後分析。
在隊頭入隊。takeLock和notEmpty配合完成同步。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}複製代碼
依舊是四種case,put()和take()是對偶的,很容易分析,不贅述。
隊列長度爲1時,到底入隊和出隊之間存在競爭嗎?這取決於LinkedBlockingQueue的底層數據結構。
最簡單的是使用樸素鏈表,能夠本身實現,也可使用JDK提供的非線程安全集合類,如LinkedList等。可是,隊列長度爲1時,樸素鏈表中的head、tail指向同一個節點,從而入隊、出隊更新同一個節點時存在競爭。
樸素鏈表:一個節點保存一個元素,不加任何控制和trick。典型如LinkedList。
增長dummy node可解決該問題(或者叫哨兵節點什麼的)。定義Node(item, next),描述以下:
在新的數據結構中,更新操做發生在dummy和tail上,head僅僅做爲示意存在,跟隨dummy節點更新。隊列長度爲1時,雖然head、tail仍指向同一個節點,但dummy、tail指向不一樣的節點,從而更新dummy和tail時不存在競爭。
源碼中的head即爲dummy
,first即爲head
:
...
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
...
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
...
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
...複製代碼
以put()爲例,count自增必定要晚於enqueue執行,不然take()方法的while循環檢查會失效。
用一個最簡單的場景來分析,只有一個生產者線程T1,一個消費者線程T2。
假設目前隊列長度0,則事件發生順序:
很明顯,在事件1發生後事件4發生前,雖然count>0,但隊列中實際是沒有元素的。所以,事件3 dequeue會執行失敗(預計拋出NullPointerException)。事件4也就不會發生了。
若是先enqueue再count自增,就不會存在該問題。
仍假設目前隊列長度0,則事件發生順序:
換個方法,用狀態機來描述:
狀態S1
狀態S2
狀態S2
狀態S1
狀態S1
不少讀者可能第一次從狀態機的角度來理解併發程序設計,因此猴子選擇先寫出狀態遷移序列,若是能理解上述序列,咱們再進行進一步的抽象。實際的狀態機定義比下面要嚴謹的多,不過這裏的描述已經足夠了。
如今補充定義以下,不考慮入隊和出隊的區別:
狀態S1
狀態S2
LinkedBlockingQueue中的同步機制保證了不會有其餘線程看到狀態S2,即,S1->S2->S1兩個狀態轉換隻能由線程T1連續完成,其餘線程沒法在中間插入狀態轉換。
在猴子的理解中,併發程序設計的本質是狀態機,即維護合法的狀態和狀態轉換。以上是一個極其簡單的場景,用狀態機舉例子就能夠描述;然而,複雜場景須要用狀態機作數學證實,這使得用狀態機描述併發程序設計不太受歡迎(雖然口頭描述也不能算嚴格證實)。不過,理解實現中的各類代碼順序、猛不丁蹦出的trick,這些只是「知其因此然」;經過簡單的例子來掌握其狀態機本質,才能讓咱們瞭解其如何保證線程安全性,本身也能寫出相似的實現,作到「知其然而知其因此然」。後面會繼續用狀態機分析ConcurrentLinkedQueue的源碼,敬請期待。
分析了兩個阻塞方法put()、take()後,非阻塞方法就簡單了。
以offer爲例,poll()同理。假設此時隊列非空。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}複製代碼
入隊前需獲得鎖putLock。檢查隊列非滿(隱含代表「無需等待條件notFull」),直接入隊。入隊後,檢查隊列非滿,隨機通知一個生產者(包括使用put()方法的生產者,下同)條件notFull知足。最後,檢查入隊前隊列非空,則無需通知條件notEmpty。
能夠看到,瞬時版offer()在隊列非滿時的行爲與put()相同。
入隊前需獲得鎖putLock。檢查隊列滿,直接退出try-block。後同case1。
隊列滿時,offer()與put()的區別就顯現出來了。put()經過while循環阻塞,一直等到條件notFull獲得知足;而offer()卻直接返回。
一個小point:
c在申請鎖putLock前被賦值爲-1。接下來,若是入隊成功,會執行
c = count.getAndIncrement();
一句,則釋放鎖後,c的值將大於等於0。因而,這裏直接用c是否大於等於0來判斷是否入隊成功。這種實現犧牲了可讀性,只換來了無足輕重的性能或代碼量的優化。本身在開發時,不要編寫這種代碼。
同上,以offer()爲例。假設此時隊列非空。
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}複製代碼
該方法同put()很像,12-13行判斷nanos超時的狀況(吞掉了timeout參數非法的異常狀況),因此區別只有14行:將阻塞的notFull.await()
換成非阻塞的超時版notFull.awaitNanos(nanos)
。
awaitNanos()的實現有點意思,這裏不表。其實現類中的Javadoc描述很是幹練:「Block until signalled, interrupted, or timed out.」,返回值爲剩餘時間。剩餘時間小於等於參數nanos,表示:
nanos首先被初始化爲timeout;接下來,消費者線程可能阻塞、收到信號屢次,每次收到信號被喚醒,返回的剩餘時間都大於0並小於等於參數nanos,再用剩餘時間做爲下次等待的參數nanos,直到剩餘時間小於等於0。以此實現總時長不超過timeout的超時檢測。
其餘同put()方法。
12-13行判斷nanos參數非法後,直接返回了false。實現有問題,有可能違反接口聲明。
根據Javadoc的返回值聲明,返回值true表示入隊成功,false表示入隊失敗。但若是傳進來的timeout是一個負數,那麼5行初始化的nanos也將是一個負數;進而一進入while循環,就在13行返回了false。然而,這是一種參數非法的狀況,返回false讓人誤覺得參數正常,只是入隊失敗。這違反了接口聲明,而且很是難以發現。
應該在函數頭部就將參數非法的狀況檢查出來,相應拋出IllegalArgumentException。
github上LinkedBlockingQueue和ArrayBlockingQueue的使用頻率都很高。大部分狀況下均可以也建議使用LinkedBlockingQueue,但清楚兩者的異同點,方能對症下藥,在針對不一樣的優化場景選擇最合適的方案。
相同點:
不一樣點
能夠看到,LinkedBlockingQueue總體上是優於ArrayBlockingQueue的。因此,除非某些特殊緣由,不然應優先使用LinkedBlockingQueue。
可能不全,歡迎評論,隨時增改。
沒有。
本文連接:源碼|併發一枝花之BlockingQueue
做者:猴子007
出處:monkeysayhi.github.io
本文基於 知識共享署名-相同方式共享 4.0 國際許可協議發佈,歡迎轉載,演繹或用於商業目的,可是必須保留本文的署名及連接。