**本文正在參加「Java主題月 - Java Debug筆記活動」,詳情查看 活動連接 **java
隨便搜了一下,全是「深刻剖析阻塞隊列」、「架構師帶你手寫阻塞隊列」、「阻塞隊列居然有8種」這一類的文章。恕我直言,關注點偏了,你越關注阻塞隊列自己,越學很差阻塞隊列。數組
提到阻塞隊列,你們腦海中就會冒出:markdown
但JDK阻塞隊列自己是很是簡單的,難的是阻塞隊列內部的AQS。多線程
若是你以前對阻塞隊列一無所知又剛好想要學習,但願能耐心看完下面的內容。仍是那句話,學習阻塞隊列的重點不是阻塞隊列自己...我本身均可以手寫阻塞隊列。架構
爲了打破你們對阻塞隊列「難」、「晦澀」、「神祕」的印象,我會重新的角度切入,重構你們對阻塞隊列的認識。分佈式
主要內容:ide
定義:post
針對同一個資源的操做有不一樣種類的線程。學習
說人話就是:共享資源+多線程,最典型的例子就是鎖和生產者消費者(本文以生產者-消費者爲例子講解)。測試
以現實生活爲例。消費者和生產者就像兩個線程,本來作着各自的事情,廠家管本身生產,消費者管本身買,通常狀況下彼此互不影響。
但當物資到達某個臨界點時,就須要根據供需關係適看成出調整。
當廠家作了一大堆東西,產能過剩時,應該暫停生產,擴大宣傳,讓消費者過來消費。
當消費者發現某個熱銷商品售罄,應該提醒廠家儘快生產。
在上面的案例中,生產者和消費者是不一樣種類的線程,一個負責存入,另外一個負責取出,且它們操做的是同一個資源。但最難的部分在於:
你會發現,本來互不打擾的兩個線程之間開始「溝通」了:
這種線程間的相互調度,也就是線程間通訊。
看到這,你內心暗暗想道:我擦,我只會new Thread().start(),怎麼讓A線程去喊B線程工做呢?
仍是以上面的生產者-消費者爲例,有不少種方式能夠實現線程間通訊。
設計理念:生產者和消費者線程各自使用while循環,每隔片刻就去判斷Queue的狀態,隊列爲空時生產者纔可插入數據,隊列不爲空時消費者才能取出數據,不然一概sleep等待。
/** * 輪詢版本 */
public class WhileQueue<T> {
// 容器,用來裝東西
private final LinkedList<T> queue = new LinkedList<>();
public void put(T resource) throws InterruptedException {
while (queue.size() >= 1) {
// 隊列滿了,不能再塞東西了,輪詢等待消費者取出數據
System.out.println("生產者:隊列已滿,沒法插入...");
TimeUnit.MILLISECONDS.sleep(1000);
}
System.out.println("生產者:插入" + resource + "!!!");
queue.addFirst(resource);
}
public void take() throws InterruptedException {
while (queue.size() <= 0) {
// 隊列空了,不能再取東西,輪詢等待生產者插入數據
System.out.println("消費者:隊列爲空,沒法取出...");
TimeUnit.MILLISECONDS.sleep(1000);
}
System.out.println("消費者:取出消息!!!");
queue.removeLast();
TimeUnit.MILLISECONDS.sleep(5000);
}
}
複製代碼
測試
public class Test {
public static void main(String[] args) {
// 隊列
WhileQueue<String> queue = new WhileQueue<>();
// 生產者
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 100; i++) {
try {
queue.put("消息" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
// 消費者
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 100; i++) {
try {
queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
}
複製代碼
因爲設定了隊列最多隻能存1個消息,因此只有當隊列爲空時,生產者才能插入數據。這是最簡單的線程間通訊:
多個線程不斷輪詢共享資源,經過共享資源的狀態判斷本身下一步該作什麼。
看到這,你發現本身被騙了:哦,原來要實現線程間通訊,並不是真的須要A線程直接去叫B線程幹什麼,只要能按實際狀況完成線程切換便可!
但上面的實現方式存在一些缺點:
相對而言,等待喚醒機制則要優雅得多,底層經過維護線程隊列的方式,避免了過多線程同時自旋形成的CPU資源浪費,很有點「用空間換時間」的味道。當一個生產者線程沒法插入數據時,就讓它在隊列裏休眠(阻塞),此時生產者線程會釋放CPU資源,等到消費者搶到CPU執行權並取出數據後,再由消費者喚醒生產者繼續生產。
舉個例子,本來生產者和消費者都要時不時去店裏看一下:
而如今,生產者去店裏看了下,發現還有貨,就管本身去後廚睡覺了,等店裏貨都賣完了,天然會有消費者過來喊他補貨,不須要付出額外的精力在店裏盯着。
Java有多種方式能夠實現等待喚醒機制,最經典的就是wait和notify。
/** * wait/notify版本 */
public class WaitNotifyQueue<T> {
// 容器,用來裝東西
private final LinkedList<T> queue = new LinkedList<>();
public synchronized void put(T resource) throws InterruptedException {
while (queue.size() >= 1) {
// 隊列滿了,不能再塞東西了,輪詢等待消費者取出數據
System.out.println("生產者:隊列已滿,沒法插入...");
this.wait();
}
System.out.println("生產者:插入" + resource + "!!!");
queue.addFirst(resource);
this.notify();
}
public synchronized void take() throws InterruptedException {
while (queue.size() <= 0) {
// 隊列空了,不能再取東西,輪詢等待生產者插入數據
System.out.println("消費者:隊列爲空,沒法取出...");
this.wait();
}
System.out.println("消費者:取出消息!!!");
queue.removeLast();
this.notify();
}
}
複製代碼
對比WhileQueue作了哪些改進:
但通常推薦使用notifyAll(爲何?)。咱們給測試程序再加一個生產者線程就知道了:
開始不久後,整個程序全部線程都阻塞了
緣由是:在synchronized機制下,全部等待的線程都在同一個隊列裏,而notify又恰巧是隨機喚醒線程(也就是說,有可能生產者喚醒生產者)。
最終結果是:全部線程都睡覺了...表如今程序上,就是卡住了。
解決辦法是改用notifyAll,**把全部線程都喚醒,而後你們一塊兒參與執行權的競爭。**你是否有疑問:若是和上面同樣,生產者1仍是喚醒生產者2呢?
其實這個假設不成立...使用notifyAll之後就再也不是隨機喚醒某一個線程了,而是喚醒全部線程並從新搶奪執行權。 也就是說,每個線程在進入阻塞以前,都會叫醒其餘全部線程!
wait/notify版本的缺點是隨機喚醒容易出現「己方喚醒己方,最終致使所有線程阻塞」的烏龍事件,雖然wait/notifyAll能解決這個問題,但喚醒所有線程又不夠精確,會形成無謂的線程競爭(實際只須要喚醒敵方線程便可)。
做爲改進版,可使用ReentrantLock的Condition替代synchronized的wait/notify:
/** * Condition版本 */
public class ConditionQueue<T> {
// 容器,用來裝東西
private final LinkedList<T> queue = new LinkedList<>();
// 顯式鎖(相對地,synchronized鎖被稱爲隱式鎖)
private final ReentrantLock lock = new ReentrantLock();
private final Condition producerCondition = lock.newCondition();
private final Condition consumerCondition = lock.newCondition();
public void put(T resource) throws InterruptedException {
lock.lock();
try {
while (queue.size() >= 1) {
// 隊列滿了,不能再塞東西了,輪詢等待消費者取出數據
System.out.println("生產者:隊列已滿,沒法插入...");
// 生產者阻塞
producerCondition.await();
}
System.out.println("生產者:插入" + resource + "!!!");
queue.addFirst(resource);
// 生產完畢,喚醒消費者
consumerCondition.signal();
} finally {
lock.unlock();
}
}
public void take() throws InterruptedException {
lock.lock();
try {
while (queue.size() <= 0) {
// 隊列空了,不能再取東西,輪詢等待生產者插入數據
System.out.println("消費者:隊列爲空,沒法取出...");
// 消費者阻塞
consumerCondition.await();
}
System.out.println("消費者:取出消息!!!");
queue.removeLast();
// 消費完畢,喚醒生產者
producerCondition.signal();
} finally {
lock.unlock();
}
}
}
複製代碼
如何理解Condition呢?你能夠認爲lock.newCondition()建立了一個隊列,調用producerCondition.await()會把生產者線程放入生產者的等待隊列中,當消費者調用producerCondition.signal()時會喚醒從生產者的等待隊列中喚醒一個生產者線程出來工做。
也就是說,ReentrantLock的Condition經過拆分線程等待隊列,讓線程的等待喚醒更加精確了,想喚醒哪一方就喚醒哪一方。
至此,你們應該對線程間通訊有了大體瞭解。若是你仔細觀察,會發現上面其實都採用了阻塞隊列實現。咱們都是先構造一個Queue,而後生產者和消費者直接操做Queue,至因而否阻塞,由Queue內部判斷。這樣封裝的好處是,將生產者和消費者解耦的同時,不暴露過多細節,使用起來更簡單。
你們應該都聽過JDK的阻塞隊列吧?基於上面的案例,咱們改進一下,抽取出一個自定義的阻塞隊列(使用wait/nofityAll實現):
public class BlockingQueue<T> {
// 模擬隊列
private final LinkedList<T> queue = new LinkedList<>();
private int MAX_SIZE = 1;
private int remainCount = 0;
public BlockingQueue(int capacity) {
if (capacity <= 0) {
throw new IllegalArgumentException("size最小爲1");
}
this.MAX_SIZE = capacity;
}
public synchronized void put(T resource) throws InterruptedException {
while (queue.size() >= MAX_SIZE) {
// 隊列滿了,不能再塞東西了,阻塞生產者
System.out.println("插入阻塞...");
this.wait();
}
queue.addFirst(resource);
remainCount++;
printMsg(resource, "被插入");
this.notifyAll();
}
public synchronized T take() throws InterruptedException {
while (queue.size() <= 0) {
// 隊列空了,不能再取東西了,阻塞消費者
System.out.println("取出阻塞...");
this.wait();
}
T resource = queue.removeLast();
remainCount--;
printMsg(resource, "被取出");
this.notifyAll();
return resource;
}
private void printMsg(T resource, String operation) throws InterruptedException {
System.out.println(resource + operation);
System.out.println("隊列容量:" + remainCount);
}
}
複製代碼
雖然不少人開口閉口「阻塞隊列」,但「阻塞隊列」在他腦中只是個很模糊的概念。連「阻塞隊列」的前因後果都不甚清楚,又怎麼能說了解呢?
實際上,和List、Set同樣,「阻塞隊列」也有本身的一脈。在JDK的util包下有一個Queue接口:
若是你繼續往下扒,就會發現Queue和List其實很像,也是集合的一個分支罷了:
爲何不少人會以爲阻塞隊列(好比ArrayBlockingQueue)高大上,聽起來比ArrayList牛逼呢?主要在於「阻塞」二字!由於你們不瞭解阻塞,本身也不知道怎麼實現阻塞,因此會以爲阻塞隊列很神祕,很牛逼。但仔細觀察上面的繼承關係你會發現,若是ArrayBlockingQueue沒有實現BlockingQueue接口,那麼它本應該是個普普統統的隊列,而不是阻塞隊列,也就沒有那麼驚豔了。
那麼BlockingQueue作了啥呢?其實啥也沒作,畢竟BlockingQueue只是個接口,而接口只能定義方法...就比如一棟摩天大廈建成了,樓頂有個空中泳池,你以爲很牛逼。那麼,你以爲是當初說「我要樓頂有個大花園」的老闆牛逼仍是把這個方案實現的設計師牛逼呢?
扯遠了,其實BlockingQueue繼承Queue接口後,就定義了幾個方法:
BlockingQueue金口一開,後面的小弟只能知足,因此幾個阻塞隊列的實現類都有上面的幾個方法。
那麼阻塞隊列的「阻塞」是怎麼實現的呢?以ArrayBlockingQueue爲例,經過上面的繼承關係分析,Queue和BlockingQueue是接口,裏面只有方法定義沒有具體實現,有可能實現「阻塞」功能的要麼在AbstractQueue,要麼就是ArrayBlockingQueue自身。咱們查看AbstractQueue發現這傢伙幾乎啥都沒寫...
也就是說,當初老闆發話「我但願這個隊列能阻塞」,經理微笑着滿口答應,結果轉手就交給3個小弟本身整了。好在3個小弟爭氣,還真給他們搞出來了...
經常使用的3個阻塞隊列:
仍是以ArrayBlockingQueue爲例,它是怎麼實現阻塞的呢?
好傢伙...居然用了ReentrantLock,這和咱們上面案例中寫的ConditionQueue好像啊!
可是ArrayBlockingQueue只有notFull.await(),沒看到signal(),不合理。仔細找找,惟一的多是ArrayBlockingQueue把signal()藏在enqueue(e)方法裏了:
其餘兩個阻塞隊列LinkedBlockingQueue和SynchronousQueue同理,也是用ReentrantLock實現阻塞的。
看到這裏,相信阻塞隊列在你們心中已經再也不那麼神聖了,有什麼了不得啊,咱們本身也能寫啊,還用了好幾種方式實現呢!可是捫心自問,阻塞隊列總共也就:
而咱們所謂的手寫阻塞隊列,實際上是這樣的:隊列直接用了LinkedList,阻塞也是借用wait/notify和ReentrantLock實現的。也就是說,咱們其實只是作了組裝工做,拿現成的隊列+阻塞功能拼出了一個阻塞隊列。
世間路千萬條,總有人不走尋常路。按理說現成的List+wait/notifyAll已經能夠造出阻塞隊列了,但就是有大佬不知足。
Doug Lea老爺子震驚的說:
What?! Why you don't say earlly ya! I have already finished the AQS le...
是的,又是這個男人,他整出了一個AQS,再把AQS塞到ReentrantLock中,最後用ReentrantLock+數組、ReentrantLock+鏈表、ReentrantLock+Transfer搞出了ArrayBlockingQueue、LinkedBlockingQueue和SynchronousQueue...阻塞隊列只能算順便的,他的初衷實際上是利用AQS統一併簡化鎖的實現,屏蔽同步狀態管理、阻塞線程的排隊和通知、喚醒機制等,讓後續的二次開發更簡便。
換句話說:
若是你糾結於阻塞隊列怎麼實現,那你的格局就過小了...JDK的阻塞隊列依賴於ReentrantLock,而ReentrantLock只是對AQS的淺封裝,真正須要咱們花功夫學習的其實有且只有AQS。
我是bravo1988,點個贊吧,求你了。短短半個月,我從「壯志凌雲,想把知乎3w關注帶到掘金」,轉變爲「垂頭喪氣,想把掘金30關注帶回知乎」。
よろしく・つづく
往期文章: