生產者/消費者模型相信各位都不陌生,是一種很常見的分佈式資源調度模型。在這個模型中,至少有兩個對象:生產者和消費者。生產者只負責建立資源,消費者只負責使用資源。若是本身實現一個簡單的生產者/消費者模型也很容易,無非就是經過一個隊列來作,可是這種方式有不少隱藏的缺陷:java
因此Java已經提早爲咱們封裝好了接口和實現,接下來咱們就要針對BlockingQueue接口和它的經常使用的實現類LinkedBlockingQueue進行簡要的分析node
BlockingQueue,含義爲阻塞隊列,咱們能夠從類定義看出,其繼承了Queue接口,因此能夠看成隊列來使用:編程
既然叫作阻塞隊列,也就是說這個隊列的操做是以阻塞方式進行的,體如今以下兩個方面:安全
經過這種方式,能夠方便地協調生產者和消費者之間的關係併發
在BlockingQueue中,定義瞭如下6個接口:分佈式
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
int remainingCapacity();
boolean remove(Object o);
public boolean contains(Object o);
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
}
複製代碼
這些接口方法按功能能夠分爲三類:post
通常地,咱們也將添加元素叫作put
操做(即便使用的是offer
方法而不是put
方法),移除元素的叫作take
操做this
對於前兩類,能夠按照異常處理方式再次分爲如下幾類:spa
這幾種處理方式我就很少解釋了,字面意義已經很顯然了線程
JDK8提供瞭如下BlockQueue的實現類:
咱們經常使用的基本有如下幾種:
其他的實現感興趣的能夠自行了解,咱們這裏就以LinkedBlockingQueue爲例,介紹一下Java是如何實現阻塞隊列的
除了BlockingQueue提供的接口方法以外,LinkedBlockingQueue還提供了一個方法peek
,用於獲取隊首節點
至此,咱們經常使用的阻塞隊列方法都已說明完畢,這裏用一張表來總結一下[1]:
方法/處理方式 | 拋出異常 | 返回特殊值 | 阻塞 | 超時退出 |
---|---|---|---|---|
插入元素 | add(e) | offer(e) | put(e) | offer(e, timeout, unit) |
移除元素 | remove() | poll() | take() | poll(timeout, unit) |
獲取元素 | element() | peek() | / | / |
其中element
方法和peek
方法功能是相同的
BlockingQueue僅僅定義了接口規範,真正的實現仍是由具體的實現類來完成,咱們暫且略過中間的AbstractQueue,直接來研究LinkedBlockingQueue,其中定義了幾個重要的域對象:
/** 元素個數 */
private final AtomicInteger count = new AtomicInteger();
/** 隊首節點 */
transient Node<E> head;
/** 隊尾節點 */
private transient Node<E> last;
/** take、poll等方法持有的鎖,這裏叫作take鎖或出鎖 */
private final ReentrantLock takeLock = new ReentrantLock();
/** take方法的等待隊列 */
private final Condition notEmpty = takeLock.newCondition();
/** put、offer等方法持有的鎖,這裏叫作put鎖或入鎖 */
private final ReentrantLock putLock = new ReentrantLock();
/** put方法的等待隊列 */
private final Condition notFull = putLock.newCondition();
複製代碼
Node節點就是普通的隊列節點,和LinkedList同樣,咱們主要關注後面的4個域對象,能夠分爲兩類:用於插入元素的,和用於移除元素的。其中每類都有兩個屬性:ReentranLock
和Condition
。其中ReentranLock
是基於AQS[2]實現的一個可重入鎖(不理解可重入概念的能夠看成普通的鎖),Condition
是一個等待/通知模式的具體實現(能夠理解爲一種提供了功能更強大的wait
和notify
的類)
count
屬性天然不用說,head
和last
很顯然是用於維護存儲元素的隊列,相信也不用細說了。阻塞隊列和普通隊列的區分點是在於後面的ReentrantLock
和Condition
類型的四個屬性,關於這四個屬性的意義,在接下來的幾個模塊會進行深刻的分析
不過咱們爲了接下來說解方便,先來簡單介紹一下Condition
這個類。實際上,Condition
是一個接口,具體的實現類是在AQS中。對於本篇文章來講,你只須要清楚3個方法:await()
、signal()
,還有singalAll()
。這三個方法徹底就能夠類比wait()
、notify()
和notifyAll()
,它們之間的區別能夠模糊地理解爲,wait/notify
這些方法管理的是對象鎖和類鎖,它們操控的是等待這些鎖的線程隊列;而await/signal
這些方法管理的是基於AQS的鎖,操控的天然也是AQS中的線程等待隊列
因此這裏的notEmpty
維護了等待take鎖
的線程隊列,notFull
維護了等待put鎖
的線程隊列。從字面意義上也很好理解,notEmpty
表示「隊列還沒空」,因此能夠取元素,同理,notFull
就表示「隊列還沒滿」,能夠往裏插入元素
先來看offer(e)
方法,源碼以下:
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
// 若是容量達到上限會返回false
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
// 獲取put鎖
putLock.lock();
try {
if (count.get() < capacity) {
// 入隊並自增元素個數
enqueue(node);
// 注意,這裏c返回的是增長前的值
c = count.getAndIncrement();
// 若是容量沒到上限,就喚醒一個put操做
if (c + 1 < capacity)
notFull.signal();
}
} finally {
// 解鎖
putLock.unlock();
}
if (c == 0)
// 若是隊列以前爲空,會喚醒一個take操做
signalNotEmpty();
return c >= 0;
}
複製代碼
這個方法大部分操做都很好理解,當添加元素的操做不容許時,offer
方法會給用戶返回false
,相似於非阻塞的通訊方式。offer
方法的線程安全性是經過put鎖
來保證的
這裏有一個頗有意思的地方,咱們看最後判斷若是c == 0
,那麼就會喚醒一個take
操做。可能不少人疑惑這裏爲何要加一條判斷,是這樣的,整個方法中,c
的初值是-1
,修改其值的惟一地方就是c = count.getAndIncrement()
這條語句。也就是說,若是斷定c == 0
,那麼這條語句的返回值就是0
,即在插入元素以前,隊列是空的。因此,若是一開始隊列爲空,當插入第一個元素以後,會馬上喚醒一個take
操做[3]
至此,整個方法流程能夠概括爲:
put鎖
count
值put
操做take
操做趁熱打鐵,咱們接着來看帶有超時機制的offer
方法:
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 可被中斷地獲取put鎖
putLock.lockInterruptibly();
try {
// 重複執行while循環體,直到隊列不滿,或到了超時時間
while (count.get() == capacity) {
// 到了超時時間後就返回false
if (nanos <= 0)
return false;
// 會將當前線程添加到notFull等待隊列中,
// 返回的是剩餘可用的等待時間
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
複製代碼
整個方法大致上和offer(e)
方法相同,不一樣的點有兩處:
putLock.lockInterruptibly()
notFull.awaitNanos(nanos)
操做來將當前線程添加到notFull
等待隊列中(等待put
操做的執行)其他部分和offer(e)
徹底一致,在這裏就不贅述了
add
方法與offer
方法相比,當操做不容許時,會拋出異常而不是返回一個特殊值,以下:
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
複製代碼
單純地就是對offer(e)
作了二次封裝,沒什麼好說的,須要提一點的就是這個方法的實現是在AbstractQueue
中
put(e)
方法當操做不容許時會阻塞線程,咱們來看其是如何實現的:
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 以可中斷的形式獲取put鎖
putLock.lockInterruptibly();
try {
// 與offer(e, timeout, unit)相比,採用了無限等待的方式
while (count.get() == capacity) {
// 當執行了移除元素操做後,會經過signal操做來喚醒notFull隊列中的一個線程
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
複製代碼
果真方法之間都是大同小異的,put(e)
操做能夠類比咱們以前講的offer(e, timeout, unit)
,只有一個不一樣的地方,就是當隊列滿時,await
操做再也不有超時時間,也就是說,只能等待take
操做[4]來調用signal
方法喚醒該線程
poll()
方法用於移除並返回隊首節點,下面是方法的具體實現:
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
// 獲取take鎖
takeLock.lock();
try {
if (count.get() > 0) {
// 出隊,並自減
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
// 只要隊列還有元素,就喚醒一個take操做
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
// 若是在隊列滿的狀況下移除一個元素,會喚醒一個put操做
if (c == capacity)
signalNotFull();
return x;
}
複製代碼
若是你認真看了offer(e)
方法以後,poll()
方法就沒什麼好講的了,徹底就是offer(e)
的翻版(我也想講點東西,可是poll()
方法徹底和offer(e)
流程如出一轍...)
poll(timeout, unit)/take()/remove()
方法分別是offer(e, timeout, unit)/put()/add()
方法的翻版,沒有什麼特殊的地方,這裏就一筆略過了
peek()
方法是用於獲取隊首元素,其實現以下:
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
// 獲取take鎖
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
複製代碼
流程沒什麼好說的,須要注意的是該方法須要獲取take鎖
,也就是說在peek()
方法執行時,是不能執行移除元素的操做的
element()
方法的實現是在AbstractQueue
中:
public E element() {
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
複製代碼
仍是一樣的二次封裝操做
原本說的是BlockingQueue
,結果說了半天LinkedBlockingQueue
。不過做爲阻塞隊列的一種經典實現,LinkedBlockingQueue
中的方法實現思路也是對於理解阻塞隊列來講也是很重要的。想要理解阻塞隊列的理念,最重要的就是理解鎖的概念,好比LinkedBlockingQueue
經過生產者鎖/put鎖
和消費者鎖/take鎖
,以及鎖對應的Condition
對象來實現線程安全。理解了這一點,才能理解整個生產者/消費者模型
這裏參考了《Java併發編程的藝術》 ↩︎
參見淺談AQS(抽象隊列同步器)一文 ↩︎
這裏描述爲「喚醒一個take
操做」有些不許確,實際應描述爲「喚醒一個等待take鎖
的線程」,不過我認爲前者更有助於讀者理解,因此沿用前者的描述方式 ↩︎
指的是和take
功能相似的一組方法,包含有take/poll/remove
,put
操做同理 ↩︎