在上一篇文章《從0到1實現本身的阻塞隊列(上)》中,咱們已經實現了一個可使用的阻塞隊列版本。在這篇文章中,咱們能夠繼續咱們的冒險之旅,將咱們的阻塞隊列提高到接近JDK版本的水平上。java
咱們一直使用的都是Object.notifyAll()
或者condition.signalAll()
這樣會喚醒全部線程的方法,那麼若是隻有一個線程可以順利執行,可是其餘線程都要再次回到等待狀態繼續休眠,那不是很是的浪費嗎?好比若是有N個消費者線程在等待隊列中出現元素,那麼當一個元素被插入之後全部N個消費者線程都被所有喚醒,最後也只會有一個消費者線程可以真正拿到元素並執行完成,其餘線程不是都被白白喚醒了嗎?咱們爲何不用只會喚醒一個線程的Object.notify()
和condition.signal()
方法呢?編程
在阻塞隊列中,咱們可使用Object.notify()
或者condition.signal()
這樣只喚醒一個線程的方法,可是會有一些前提條件:數組
因此,咱們只須要知足第一個要求讓不一樣類型的線程在不一樣的條件變量上等待就能夠了。那麼具體要怎麼作呢?安全
首先,咱們天然是要把原來的一個條件變量condition
給拆分紅兩個實例變量notFull
和notEmpty
,這兩個條件變量雖然對應於同一互斥鎖,可是兩個條件變量的等待和喚醒操做是徹底隔離的。這兩個條件變量分別表明隊列未滿和隊列非空兩個條件,消費者線程由於是被隊列爲空的狀況所阻塞的,因此就應該等待隊列非空條件獲得知足;而生產者線程由於是被隊列已滿的狀況所阻塞的,天然就要等待隊列未滿條件的成立。bash
/** 隊列未滿的條件變量 */
private final Condition notFull = lock.newCondition();
/** 隊列非空的條件變量 */
private final Condition notEmpty = lock.newCondition();
複製代碼
因此在put()
和take()
方法中,咱們就須要把take()
方法中原來的condition.await()
修改成等待隊列非空條件,即notEmpty.await()
;而put()
方法中的condition.await()
天然是要修改成等待隊列未滿條件成立,即notFull.await()
。既然咱們把等待條件變量的語句都改了,那麼喚醒的語句也要作一樣的修改,put()
操做要喚醒等待的消費者線程,因此是notEmpty.signal()
;take()
操做要喚醒的生產者線程,因此是notFull.signal()
。修改完成後的代碼以下,你們能夠參考一下:多線程
/**
* 將指定元素插入隊列
*
* @param e 待插入的對象
*/
public void put(Object e) throws InterruptedException {
lock.lockInterruptibly();
try {
while (count == items.length) {
// 隊列已滿時進入休眠
// 等待隊列未滿條件獲得知足
notFull.await();
}
// 執行入隊操做,將對象e實際放入隊列中
enqueue(e);
// 插入元素後喚醒一個等待隊列非空條件成立的線程
notEmpty.signal();
} finally {
lock.unlock();
}
}
/**
* 從隊列中彈出一個元素
*
* @return 被彈出的元素
*/
public Object take() throws InterruptedException {
lock.lockInterruptibly();
try {
while (count == 0) {
// 隊列爲空時進入休眠
// 等待隊列非空條件獲得知足
notEmpty.await();
}
// 執行出隊操做,將隊列中的第一個元素彈出
Object e = dequeue();
// 彈出元素後喚醒一個等待隊列未滿條件成立的線程
notFull.signal();
return e;
} finally {
lock.unlock();
}
}
複製代碼
既然咱們對阻塞隊列作了效率上的改進,那麼就讓咱們來實際檢驗一下吧。咱們仍是以前已經提供的檢驗程序,可是不一樣的是,爲了明顯地看出效率上的變化,咱們須要修改一下程序中的兩個變量。首先,咱們須要把檢驗程序中運行的線程數threads
增長到400,而後咱們須要把每一個線程執行的次數改成100次,就像下面這樣:併發
// 建立400個線程
final int threads = 400;
// 每一個線程執行100次
final int times = 100;
複製代碼
最後咱們分別使用改進前和改進後的版原本執行這個這個阻塞隊列,在個人電腦上,改進前的版本耗時爲7.80s,改進後的版本耗時爲1.35s。看起來咱們對阻塞隊列的效率作了一個很是大的提高,很是好,那咱們還有沒有辦法再加快一點呢?工具
在上面的阻塞隊列實現中,咱們主要使用的就是put()
和take()
兩個操做。而由於有互斥鎖ReentrantLock
的保護,因此這兩個方法在同一時間只能有一個線程調用。也就是說,生產者線程在操做隊列時一樣會阻塞消費者線程。不過從咱們的代碼中看,實際上put()
方法和take()
方法之間須要有互斥鎖保護的共享數據訪問只發生在入隊操做enqueue
方法和出隊操做dequeue
方法之中。在這兩個方法裏,對於putIndex
和takeIndex
的訪問是徹底隔離的,enqueue
只使用putIndex
,而dequeue
只使用takeIndex
,那麼線程間的競爭性數據就只剩下count了。這樣的話,若是咱們能解決count的更新問題是否是就能夠把鎖lock
拆分爲兩個互斥鎖,分別讓生產者線程和消費者線程使用了呢?這樣的話生產者線程在操做時就只會阻塞生產者線程而不會阻塞消費者線程了,消費者線程也是同樣的道理。post
這時候就要請出咱們很熟悉的一種同步工具CAS
了,CAS
是一個原子操做,它會接收兩個參數,一個是當前值,一個是目標值,若是當前值已經發生了改變,那麼就會返回失敗,而若是當前值沒有變化,就會將這個變量修改成目標值。在Java中,咱們通常會經過java.util.concurrent
中的AtomicInteger
來執行CAS操做。在AtomicInteger
類上有原子性的增長與減小方法,每次調用均可以保證對指定的對象進行增長或減小,而且即便有多個線程同時執行這些操做,它們的結果也仍然是正確的。優化
首先,爲了保證入隊和出隊操做之間的互斥特性移除後兩個方法可以併發執行,那麼咱們就要保證對count的更新是線程安全的。所以,咱們首先須要把實例變量count
的類型從int
修改成AtomicInteger
,而AtomicInteger
類就提供了咱們須要的原子性的增長與減小接口。
/** 隊列中的元素總數 */
private AtomicInteger count = new AtomicInteger(0);
複製代碼
而後對應地,咱們須要將入隊方法中的count++
和出隊方法中的count--
分別改成Atomic
原子性的加1方法getAndIncrement
與減1方法getAndDecrement
。
/**
* 入隊操做
*
* @param e 待插入的對象
*/
private void enqueue(Object e) {
// 將對象e放入putIndex指向的位置
items[putIndex] = e;
// putIndex向後移一位,若是已到末尾則返回隊列開頭(位置0)
if (++putIndex == items.length)
putIndex = 0;
// 增長元素總數
count.getAndIncrement();
}
/**
* 出隊操做
*
* @return 被彈出的元素
*/
private Object dequeue() {
// 取出takeIndex指向位置中的元素
// 並將該位置清空
Object e = items[takeIndex];
items[takeIndex] = null;
// takeIndex向後移一位,若是已到末尾則返回隊列開頭(位置0)
if (++takeIndex == items.length)
takeIndex = 0;
// 減小元素總數
count.getAndDecrement();
// 返回以前代碼中取出的元素e
return e;
}
複製代碼
到這裏,咱們就已經解決了put()
和take()
方法之間的數據競爭問題,兩個方法如今就能夠分別用兩個鎖來控制了。雖然相同類型的線程仍然是互斥的,例如生產者和生產者之間同一時間只能有一個生產者線程在操做隊列。可是在生產者線程和消費者線程之間將不用再繼續互斥,一個生產者線程和一個消費者線程能夠在同一時間操做同一阻塞隊列了。因此,咱們在這裏能夠將互斥鎖lock
拆爲兩個,分別保證生產者線程和消費者線程的互斥性,咱們將它們命名爲插入鎖putLock
和彈出鎖takeLock
。同時,原來的條件變量也要分別對應於不一樣的互斥鎖了,notFull
要對應於putLock
,由於插入元素的生產者線程須要等待隊列未滿條件,那麼notEmpyt
天然就要對應於takeLock
了。
/** 插入鎖 */
private final ReentrantLock putLock = new ReentrantLock();
/** 隊列未滿的條件變量 */
private final Condition notFull = putLock.newCondition();
/** 彈出鎖 */
private final ReentrantLock takeLock = new ReentrantLock();
/** 隊列非空的條件變量 */
private final Condition notEmpty = takeLock.newCondition();
複製代碼
最後咱們要對put()
和take()
方法中的signal()
調用作出一些調整。由於在上文中提到的,在使用條件變量時必定要先持有條件變量所對應的互斥鎖,而在put()
和take()
方法中,使用signal()
方法喚醒的都是另外一種類型的線程,例如生產者線程喚醒消費者,消費者線程喚醒生產者。這樣咱們調用signal()
方法的條件變量就和try語句中持有的鎖不一致了,因此咱們必須將直接的xxx.signal()
調用替換爲一個私有方法調用。而在私有方法中,咱們會先獲取與條件變量對應的鎖,而後再調用條件變量的signal()
方法。好比在下面的signalNotEmpty()
方法中,咱們就要先獲取takeLock
才能調用notEmpty.signal()
;而在signalNotFull()
方法中,咱們就要先獲取putLock
才能調用notFull.signal()
。
/**
* 喚醒等待隊列非空條件的線程
*/
private void signalNotEmpty() {
// 爲了喚醒等待隊列非空條件的線程,須要先獲取對應的takeLock
takeLock.lock();
try {
// 喚醒一個等待非空條件的線程
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
/**
* 喚醒等待隊列未滿條件的線程
*/
private void signalNotFull() {
// 爲了喚醒等待隊列未滿條件的線程,須要先獲取對應的putLock
putLock.lock();
try {
// 喚醒一個等待隊列未滿條件的線程
notFull.signal();
} finally {
putLock.unlock();
}
}
複製代碼
但直接把notFull.signal()
換成signalNotFull()
,把notEmpty.signal()
換成signalNotEmpty()
還不夠,由於在咱們的代碼中,原來的notFull.signal()
和notEmpty.signal()
都是在持有鎖的try語句塊當中的。一旦咱們作了調用私有方法的替換,那麼put()
和take()
方法就會以相反的順序同時獲取putLock
和takeLock
兩個鎖。有一些讀者可能已經意識到這樣會產生死鎖問題了,那麼咱們應該怎麼解決它呢?
最好的方法就是不要同時加兩個鎖,咱們徹底能夠在釋放前一個以後再使用signal()
方法來喚醒另外一種類型的線程。就像下面的put()
與take()
方法中所作的同樣,咱們能夠在執行完入隊操做以後就釋放插入鎖putLock
,而後才運行signalNotEmpty()
方法去獲取takeLock
並調用與其對應的條件變量notEmpty
的signal()
方法,在take()
方法中也是同樣的道理。
/**
* 將指定元素插入隊列
*
* @param e 待插入的對象
*/
public void put(Object e) throws InterruptedException {
putLock.lockInterruptibly();
try {
while (count.get() == items.length) {
// 隊列已滿時進入休眠
// 等待隊列未滿條件獲得知足
notFull.await();
}
// 執行入隊操做,將對象e實際放入隊列中
enqueue(e);
} finally {
putLock.unlock();
}
// 喚醒等待隊列非空條件的線程
// 爲了防止死鎖,不能在釋放putLock以前獲取takeLock
signalNotEmpty();
}
/**
* 從隊列中彈出一個元素
*
* @return 被彈出的元素
*/
public Object take() throws InterruptedException {
Object e;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
// 隊列爲空時進入休眠
// 等待隊列非空條件獲得知足
notEmpty.await();
}
// 執行出隊操做,將隊列中的第一個元素彈出
e = dequeue();
} finally {
takeLock.unlock();
}
// 喚醒等待隊列未滿條件的線程
// 爲了防止死鎖,不能在釋放takeLock以前獲取putLock
signalNotFull();
return e;
}
複製代碼
到了這裏咱們就順利地把原來單一的一個lock
鎖拆分爲了插入鎖putLock
和takeLock
,這樣生產者線程和消費者線程就能夠同時運行了。
啊?咱們的阻塞隊列到了這裏還能再繼續優化嗎?其實咱們作的優化已經足夠多了,基本上影響比較大的優化咱們都作了,可是還有一些細節是能夠最後完善一下的。好比說若是隊列並無爲空或者已滿時,咱們插入或者彈出了元素其實都是不須要喚醒任何線程的,多餘的喚醒操做須要先獲取ReentrantLock
鎖才能調用對應的條件變量的signal()
方法,而獲取鎖是一個成本比較大的操做。因此咱們最好是能在隊列真的爲空或者已滿之後,成功插入或彈出元素時,再去獲取鎖並喚醒等待的線程。
也就是說咱們會將signalNotEmpty();
修改成if (c == 0) signalNotEmpty();
,而把signalNotFull();
修改成if (c == items.length) signalNotFull();
,也就是隻有在必要的時候纔去喚醒另外一種類型的線程。可是這種修改又會引入另一種問題,例若有N個消費者線程在等待隊列非空,這時有兩個生產者線程插入了兩個元素,可是這兩個插入操做是連續發生的,也就是說只有第一個生產者線程在插入元素以後調用了signalNotEmpty()
,第二個線程看到隊列本來是非空的就不會調用喚醒方法。在這種狀況下,實際就只有一個消費者線程被喚醒了,而實際上隊列中還有一個元素可供消費。那麼咱們如何解決這個問題呢?
比較簡單的一種方法就是,生產者線程和消費者線程不止會喚醒另外一種類型的線程,並且也會喚醒同類型的線程。好比在生產者線程中若是插入元素以後發現隊列還未滿,那麼就能夠調用notFull.signal()
方法來喚醒其餘可能存在的等待狀態的生產者線程,對於消費者線程所使用的take()
方法也是相似的處理方式。相對來講signal方法較低,而互斥鎖的lock方法成本較高,並且會影響到另外一種類型線程的運行。因此經過這種方式儘量地少調用signalNotEmpty()
和signalNotFull()
方法會是一種還不錯的優化手段。
優化後的put()
和take()
方法以下:
/**
* 將指定元素插入隊列
*
* @param e 待插入的對象
*/
public void put(Object e) throws InterruptedException {
int c = -1;
putLock.lockInterruptibly();
try {
while (count.get() == items.length) {
// 隊列已滿時進入休眠
// 等待隊列未滿條件獲得知足
notFull.await();
}
// 執行入隊操做,將對象e實際放入隊列中
enqueue(e);
// 增長元素總數
c = count.getAndIncrement();
// 若是在插入後隊列仍然沒滿,則喚醒其餘等待插入的線程
if (c + 1 < items.length)
notFull.signal();
} finally {
putLock.unlock();
}
// 若是插入以前隊列爲空,才喚醒等待彈出元素的線程
// 爲了防止死鎖,不能在釋放putLock以前獲取takeLock
if (c == 0)
signalNotEmpty();
}
/**
* 從隊列中彈出一個元素
*
* @return 被彈出的元素
*/
public Object take() throws InterruptedException {
Object e;
int c = -1;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
// 隊列爲空時進入休眠
// 等待隊列非空條件獲得知足
notEmpty.await();
}
// 執行出隊操做,將隊列中的第一個元素彈出
e = dequeue();
// 減小元素總數
c = count.getAndDecrement();
// 若是隊列在彈出一個元素後仍然非空,則喚醒其餘等待隊列非空的線程
if (c - 1 > 0)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 只有在彈出以前隊列已滿的狀況下才喚醒等待插入元素的線程
// 爲了防止死鎖,不能在釋放takeLock以前獲取putLock
if (c == items.length)
signalNotFull();
return e;
}
複製代碼
恭喜你們,通過一番漫長的探索,咱們終於完全完成了咱們的阻塞隊列實現之旅。若是你能堅持到這裏,我相信你已經對多線程編程的實踐方法有了很是深入的理解。最後讓咱們來看一看咱們最終完成的成品代碼——一個完整的阻塞隊列實現吧。
public class BlockingQueue {
/** 存放元素的數組 */
private final Object[] items;
/** 彈出元素的位置 */
private int takeIndex;
/** 插入元素的位置 */
private int putIndex;
/** 隊列中的元素總數 */
private AtomicInteger count = new AtomicInteger(0);
/** 插入鎖 */
private final ReentrantLock putLock = new ReentrantLock();
/** 隊列未滿的條件變量 */
private final Condition notFull = putLock.newCondition();
/** 彈出鎖 */
private final ReentrantLock takeLock = new ReentrantLock();
/** 隊列非空的條件變量 */
private final Condition notEmpty = takeLock.newCondition();
/**
* 指定隊列大小的構造器
*
* @param capacity 隊列大小
*/
public BlockingQueue(int capacity) {
if (capacity <= 0)
throw new IllegalArgumentException();
items = new Object[capacity];
}
/**
* 入隊操做
*
* @param e 待插入的對象
*/
private void enqueue(Object e) {
// 將對象e放入putIndex指向的位置
items[putIndex] = e;
// putIndex向後移一位,若是已到末尾則返回隊列開頭(位置0)
if (++putIndex == items.length)
putIndex = 0;
}
/**
* 出隊操做
*
* @return 被彈出的元素
*/
private Object dequeue() {
// 取出takeIndex指向位置中的元素
// 並將該位置清空
Object e = items[takeIndex];
items[takeIndex] = null;
// takeIndex向後移一位,若是已到末尾則返回隊列開頭(位置0)
if (++takeIndex == items.length)
takeIndex = 0;
// 返回以前代碼中取出的元素e
return e;
}
/**
* 將指定元素插入隊列
*
* @param e 待插入的對象
*/
public void put(Object e) throws InterruptedException {
int c = -1;
putLock.lockInterruptibly();
try {
while (count.get() == items.length) {
// 隊列已滿時進入休眠
// 等待隊列未滿條件獲得知足
notFull.await();
}
// 執行入隊操做,將對象e實際放入隊列中
enqueue(e);
// 增長元素總數
c = count.getAndIncrement();
// 若是在插入後隊列仍然沒滿,則喚醒其餘等待插入的線程
if (c + 1 < items.length)
notFull.signal();
} finally {
putLock.unlock();
}
// 若是插入以前隊列爲空,才喚醒等待彈出元素的線程
// 爲了防止死鎖,不能在釋放putLock以前獲取takeLock
if (c == 0)
signalNotEmpty();
}
/**
* 喚醒等待隊列非空條件的線程
*/
private void signalNotEmpty() {
// 爲了喚醒等待隊列非空條件的線程,須要先獲取對應的takeLock
takeLock.lock();
try {
// 喚醒一個等待非空條件的線程
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
/**
* 從隊列中彈出一個元素
*
* @return 被彈出的元素
*/
public Object take() throws InterruptedException {
Object e;
int c = -1;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
// 隊列爲空時進入休眠
// 等待隊列非空條件獲得知足
notEmpty.await();
}
// 執行出隊操做,將隊列中的第一個元素彈出
e = dequeue();
// 減小元素總數
c = count.getAndDecrement();
// 若是隊列在彈出一個元素後仍然非空,則喚醒其餘等待隊列非空的線程
if (c - 1 > 0)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 只有在彈出以前隊列已滿的狀況下才喚醒等待插入元素的線程
// 爲了防止死鎖,不能在釋放takeLock以前獲取putLock
if (c == items.length)
signalNotFull();
return e;
}
/**
* 喚醒等待隊列未滿條件的線程
*/
private void signalNotFull() {
// 爲了喚醒等待隊列未滿條件的線程,須要先獲取對應的putLock
putLock.lock();
try {
// 喚醒一個等待隊列未滿條件的線程
notFull.signal();
} finally {
putLock.unlock();
}
}
}
複製代碼
有興趣的讀者能夠把咱們完成的這個阻塞隊列類和JDK中的java.util.concurrent.LinkedBlockingQueue
類作一個比較,相信你們能夠發現這兩個類很是的類似,這足以說明咱們費勁千辛萬苦實現的這個阻塞隊列類已經很是接近JDK中的阻塞隊列類的質量了。
恭喜你們終於完整地讀完了這篇文章!在這篇文章中,咱們從一個最簡單的阻塞隊列版本開始,一路解決了各類問題,最終獲得了一個完整、高質量的阻塞隊列實現。咱們一塊兒來回憶一下咱們解決的問題吧。從最簡單的阻塞隊列開始,咱們首先用互斥鎖synchronized
關鍵字解決了併發控制問題,保證了隊列在多線程訪問狀況下的正確性。而後咱們用條件變量Object.wati()
、Object.notifyAll()
解決了休眠喚醒問題,使隊列的效率獲得了飛躍性地提升。爲了保障隊列的安全性,不讓外部代碼能夠訪問到咱們所使用的對象鎖和條件變量,因此咱們使用了顯式鎖ReentrantLock
,並經過鎖對象lock
的newCondition()
方法建立了與其相對應的條件變量對象。最後,咱們對隊列中的條件變量和互斥鎖分別作了拆分,使隊列的效率獲得了進一步的提升。固然,最後咱們還加上了一點對喚醒操做的有條件調用優化,使整個阻塞隊列的實現變得更加完善。