「阻塞隊列」水太深,你把握不住! | Java Debug 筆記

**本文正在參加「Java主題月 - Java Debug筆記活動」,詳情查看 活動連接 **java

隨便搜了一下,全是「深刻剖析阻塞隊列」、「架構師帶你手寫阻塞隊列」、「阻塞隊列居然有8種」這一類的文章。恕我直言,關注點偏了,你越關注阻塞隊列自己,越學很差阻塞隊列。數組

提到阻塞隊列,你們腦海中就會冒出:markdown

  • BlockingQueue
  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • SynchronousQueue

但JDK阻塞隊列自己是很是簡單的,難的是阻塞隊列內部的AQS。多線程

若是你以前對阻塞隊列一無所知又剛好想要學習,但願能耐心看完下面的內容。仍是那句話,學習阻塞隊列的重點不是阻塞隊列自己...我本身均可以手寫阻塞隊列。架構

爲了打破你們對阻塞隊列「難」、「晦澀」、「神祕」的印象,我會重新的角度切入,重構你們對阻塞隊列的認識。分佈式

主要內容:ide

  • 什麼是線程間通訊
  • 實現線程間通訊
  • 輪詢
  • 等待喚醒機制:wait/notify
  • 等待喚醒機制:condition
  • 山寨版BlockingQueue
  • JDK BlockingQueue簡介
  • 展望AQS

什麼是線程間通訊

定義:post

針對同一個資源的操做有不一樣種類的線程。學習

說人話就是:共享資源+多線程,最典型的例子就是鎖和生產者消費者(本文以生產者-消費者爲例子講解)。測試

以現實生活爲例。消費者和生產者就像兩個線程,本來作着各自的事情,廠家管本身生產,消費者管本身買,通常狀況下彼此互不影響。

圖片.png

但當物資到達某個臨界點時,就須要根據供需關係適看成出調整。

圖片.png

當廠家作了一大堆東西,產能過剩時,應該暫停生產,擴大宣傳,讓消費者過來消費。

圖片.png

當消費者發現某個熱銷商品售罄,應該提醒廠家儘快生產。

在上面的案例中,生產者和消費者是不一樣種類的線程,一個負責存入,另外一個負責取出,且它們操做的是同一個資源。但最難的部分在於:

  • 資源到達上限時,生產者等待,消費者消費
  • 資源達到下限時,生產者生產,消費者等待

你會發現,本來互不打擾的兩個線程之間開始「溝通」了:

  • 生產者:喂,我這邊作的太多了,先休息會兒,你趕忙消費
  • 消費者:喂,貨快沒了,我休息會兒,你趕忙生產

這種線程間的相互調度,也就是線程間通訊。

看到這,你內心暗暗想道:我擦,我只會new Thread().start(),怎麼讓A線程去喊B線程工做呢?

實現線程間通訊

仍是以上面的生產者-消費者爲例,有不少種方式能夠實現線程間通訊。

輪詢

設計理念:生產者和消費者線程各自使用while循環,每隔片刻就去判斷Queue的狀態,隊列爲空時生產者纔可插入數據,隊列不爲空時消費者才能取出數據,不然一概sleep等待。

圖片.png

/** * 輪詢版本 */
public class WhileQueue<T> {
    // 容器,用來裝東西
    private final LinkedList<T> queue = new LinkedList<>();

    public void put(T resource) throws InterruptedException {
        while (queue.size() >= 1) {
            // 隊列滿了,不能再塞東西了,輪詢等待消費者取出數據
            System.out.println("生產者:隊列已滿,沒法插入...");
            TimeUnit.MILLISECONDS.sleep(1000);
        }
        System.out.println("生產者:插入" + resource + "!!!");
        queue.addFirst(resource);
    }

    public void take() throws InterruptedException {
        while (queue.size() <= 0) {
            // 隊列空了,不能再取東西,輪詢等待生產者插入數據
            System.out.println("消費者:隊列爲空,沒法取出...");
            TimeUnit.MILLISECONDS.sleep(1000);
        }
        System.out.println("消費者:取出消息!!!");
        queue.removeLast();
        TimeUnit.MILLISECONDS.sleep(5000);
    }

}
複製代碼

測試

public class Test {
    public static void main(String[] args) {
        // 隊列
        WhileQueue<String> queue = new WhileQueue<>();

        // 生產者
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 100; i++) {
                    try {
                        queue.put("消息" + i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();

        // 消費者
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 100; i++) {
                    try {
                        queue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }
}
複製代碼

圖片.png

因爲設定了隊列最多隻能存1個消息,因此只有當隊列爲空時,生產者才能插入數據。這是最簡單的線程間通訊:

多個線程不斷輪詢共享資源,經過共享資源的狀態判斷本身下一步該作什麼。

看到這,你發現本身被騙了:哦,原來要實現線程間通訊,並不是真的須要A線程直接去叫B線程幹什麼,只要能按實際狀況完成線程切換便可!

但上面的實現方式存在一些缺點:

  • 輪詢的方式太耗費CPU資源,若是線程過多,好比幾百上千個線程同時在那輪詢,會給CPU帶來較大負擔
  • 沒法保證原子性(代碼裏沒有演示,但理論上確實如此,若是生產者的操做非原子性,消費者很可能獲取到髒數據)

等待喚醒機制:wait/notify

相對而言,等待喚醒機制則要優雅得多,底層經過維護線程隊列的方式,避免了過多線程同時自旋形成的CPU資源浪費,很有點「用空間換時間」的味道。當一個生產者線程沒法插入數據時,就讓它在隊列裏休眠(阻塞),此時生產者線程會釋放CPU資源,等到消費者搶到CPU執行權並取出數據後,再由消費者喚醒生產者繼續生產。

舉個例子,本來生產者和消費者都要時不時去店裏看一下:

  • 生產者:貨賣完了沒有,賣完了我要繼續生產(每分鐘來店裏看一下)
  • 消費者:補貨了沒,補貨了我就能夠買了(每分鐘來店裏看一下)

而如今,生產者去店裏看了下,發現還有貨,就管本身去後廚睡覺了,等店裏貨都賣完了,天然會有消費者過來喊他補貨,不須要付出額外的精力在店裏盯着。

Java有多種方式能夠實現等待喚醒機制,最經典的就是wait和notify。

/** * wait/notify版本 */
public class WaitNotifyQueue<T> {
    // 容器,用來裝東西
    private final LinkedList<T> queue = new LinkedList<>();

    public synchronized void put(T resource) throws InterruptedException {
        while (queue.size() >= 1) {
            // 隊列滿了,不能再塞東西了,輪詢等待消費者取出數據
            System.out.println("生產者:隊列已滿,沒法插入...");
            this.wait();
        }
        System.out.println("生產者:插入" + resource + "!!!");
        queue.addFirst(resource);
        this.notify();
    }

    public synchronized void take() throws InterruptedException {
        while (queue.size() <= 0) {
            // 隊列空了,不能再取東西,輪詢等待生產者插入數據
            System.out.println("消費者:隊列爲空,沒法取出...");
            this.wait();
        }
        System.out.println("消費者:取出消息!!!");
        queue.removeLast();
        this.notify();
    }
}
複製代碼

圖片.png

對比WhileQueue作了哪些改進:

  • 用synchronized保證原子性
  • wait和notify實現等待喚醒

圖片.png

但通常推薦使用notifyAll(爲何?)。咱們給測試程序再加一個生產者線程就知道了:

圖片.png

開始不久後,整個程序全部線程都阻塞了

緣由是:在synchronized機制下,全部等待的線程都在同一個隊列裏,而notify又恰巧是隨機喚醒線程(也就是說,有可能生產者喚醒生產者)。

圖片.png

最終結果是:全部線程都睡覺了...表如今程序上,就是卡住了。

解決辦法是改用notifyAll,**把全部線程都喚醒,而後你們一塊兒參與執行權的競爭。**你是否有疑問:若是和上面同樣,生產者1仍是喚醒生產者2呢?

其實這個假設不成立...使用notifyAll之後就再也不是隨機喚醒某一個線程了,而是喚醒全部線程並從新搶奪執行權。 也就是說,每個線程在進入阻塞以前,都會叫醒其餘全部線程!

等待喚醒機制:condition

wait/notify版本的缺點是隨機喚醒容易出現「己方喚醒己方,最終致使所有線程阻塞」的烏龍事件,雖然wait/notifyAll能解決這個問題,但喚醒所有線程又不夠精確,會形成無謂的線程競爭(實際只須要喚醒敵方線程便可)。

做爲改進版,可使用ReentrantLock的Condition替代synchronized的wait/notify:

/** * Condition版本 */
public class ConditionQueue<T> {
    // 容器,用來裝東西
    private final LinkedList<T> queue = new LinkedList<>();

    // 顯式鎖(相對地,synchronized鎖被稱爲隱式鎖)
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition producerCondition = lock.newCondition();
    private final Condition consumerCondition = lock.newCondition();

    public void put(T resource) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() >= 1) {
                // 隊列滿了,不能再塞東西了,輪詢等待消費者取出數據
                System.out.println("生產者:隊列已滿,沒法插入...");
                // 生產者阻塞
                producerCondition.await();
            }
            System.out.println("生產者:插入" + resource + "!!!");
            queue.addFirst(resource);
            // 生產完畢,喚醒消費者
            consumerCondition.signal();
        } finally {
            lock.unlock();
        }
    }

    public void take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() <= 0) {
                // 隊列空了,不能再取東西,輪詢等待生產者插入數據
                System.out.println("消費者:隊列爲空,沒法取出...");
                // 消費者阻塞
                consumerCondition.await();
            }
            System.out.println("消費者:取出消息!!!");
            queue.removeLast();
            // 消費完畢,喚醒生產者
            producerCondition.signal();
        } finally {
            lock.unlock();
        }
    }
}
複製代碼

如何理解Condition呢?你能夠認爲lock.newCondition()建立了一個隊列,調用producerCondition.await()會把生產者線程放入生產者的等待隊列中,當消費者調用producerCondition.signal()時會喚醒從生產者的等待隊列中喚醒一個生產者線程出來工做。

圖片.png

也就是說,ReentrantLock的Condition經過拆分線程等待隊列,讓線程的等待喚醒更加精確了,想喚醒哪一方就喚醒哪一方。

山寨版BlockingQueue

至此,你們應該對線程間通訊有了大體瞭解。若是你仔細觀察,會發現上面其實都採用了阻塞隊列實現。咱們都是先構造一個Queue,而後生產者和消費者直接操做Queue,至因而否阻塞,由Queue內部判斷。這樣封裝的好處是,將生產者和消費者解耦的同時,不暴露過多細節,使用起來更簡單。

你們應該都聽過JDK的阻塞隊列吧?基於上面的案例,咱們改進一下,抽取出一個自定義的阻塞隊列(使用wait/nofityAll實現):

public class BlockingQueue<T> {

    // 模擬隊列
    private final LinkedList<T> queue = new LinkedList<>();

    private int MAX_SIZE = 1;
    private int remainCount = 0;

    public BlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("size最小爲1");
        }
        this.MAX_SIZE = capacity;
    }

    public synchronized void put(T resource) throws InterruptedException {
        while (queue.size() >= MAX_SIZE) {
            // 隊列滿了,不能再塞東西了,阻塞生產者
            System.out.println("插入阻塞...");
            this.wait();
        }
        queue.addFirst(resource);
        remainCount++;
        printMsg(resource, "被插入");
        this.notifyAll();
    }

    public synchronized T take() throws InterruptedException {
        while (queue.size() <= 0) {
            // 隊列空了,不能再取東西了,阻塞消費者
            System.out.println("取出阻塞...");
            this.wait();
        }
        T resource = queue.removeLast();
        remainCount--;
        printMsg(resource, "被取出");
        this.notifyAll();
        return resource;
    }

    private void printMsg(T resource, String operation) throws InterruptedException {
        System.out.println(resource + operation);
        System.out.println("隊列容量:" + remainCount);
    }
}
複製代碼

圖片.png

JDK BlockingQueue簡介

雖然不少人開口閉口「阻塞隊列」,但「阻塞隊列」在他腦中只是個很模糊的概念。連「阻塞隊列」的前因後果都不甚清楚,又怎麼能說了解呢?

實際上,和List、Set同樣,「阻塞隊列」也有本身的一脈。在JDK的util包下有一個Queue接口:

圖片.png

若是你繼續往下扒,就會發現Queue和List其實很像,也是集合的一個分支罷了:

圖片.png

爲何不少人會以爲阻塞隊列(好比ArrayBlockingQueue)高大上,聽起來比ArrayList牛逼呢?主要在於「阻塞」二字!由於你們不瞭解阻塞,本身也不知道怎麼實現阻塞,因此會以爲阻塞隊列很神祕,很牛逼。但仔細觀察上面的繼承關係你會發現,若是ArrayBlockingQueue沒有實現BlockingQueue接口,那麼它本應該是個普普統統的隊列,而不是阻塞隊列,也就沒有那麼驚豔了。

那麼BlockingQueue作了啥呢?其實啥也沒作,畢竟BlockingQueue只是個接口,而接口只能定義方法...就比如一棟摩天大廈建成了,樓頂有個空中泳池,你以爲很牛逼。那麼,你以爲是當初說「我要樓頂有個大花園」的老闆牛逼仍是把這個方案實現的設計師牛逼呢?

扯遠了,其實BlockingQueue繼承Queue接口後,就定義了幾個方法:

圖片.png

BlockingQueue金口一開,後面的小弟只能知足,因此幾個阻塞隊列的實現類都有上面的幾個方法。

圖片.png

那麼阻塞隊列的「阻塞」是怎麼實現的呢?以ArrayBlockingQueue爲例,經過上面的繼承關係分析,Queue和BlockingQueue是接口,裏面只有方法定義沒有具體實現,有可能實現「阻塞」功能的要麼在AbstractQueue,要麼就是ArrayBlockingQueue自身。咱們查看AbstractQueue發現這傢伙幾乎啥都沒寫...

圖片.png

也就是說,當初老闆發話「我但願這個隊列能阻塞」,經理微笑着滿口答應,結果轉手就交給3個小弟本身整了。好在3個小弟爭氣,還真給他們搞出來了...

經常使用的3個阻塞隊列:

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • SynchronousQueue

仍是以ArrayBlockingQueue爲例,它是怎麼實現阻塞的呢?

圖片.png

圖片.png

好傢伙...居然用了ReentrantLock,這和咱們上面案例中寫的ConditionQueue好像啊!

圖片.png

可是ArrayBlockingQueue只有notFull.await(),沒看到signal(),不合理。仔細找找,惟一的多是ArrayBlockingQueue把signal()藏在enqueue(e)方法裏了:

圖片.png

其餘兩個阻塞隊列LinkedBlockingQueue和SynchronousQueue同理,也是用ReentrantLock實現阻塞的。

展望AQS

看到這裏,相信阻塞隊列在你們心中已經再也不那麼神聖了,有什麼了不得啊,咱們本身也能寫啊,還用了好幾種方式實現呢!可是捫心自問,阻塞隊列總共也就:

  • 阻塞
  • 隊列

而咱們所謂的手寫阻塞隊列,實際上是這樣的:隊列直接用了LinkedList,阻塞也是借用wait/notify和ReentrantLock實現的。也就是說,咱們其實只是作了組裝工做,拿現成的隊列+阻塞功能拼出了一個阻塞隊列。

世間路千萬條,總有人不走尋常路。按理說現成的List+wait/notifyAll已經能夠造出阻塞隊列了,但就是有大佬不知足。

Doug Lea老爺子震驚的說:

What?! Why you don't say earlly ya! I have already finished the AQS le...

是的,又是這個男人,他整出了一個AQS,再把AQS塞到ReentrantLock中,最後用ReentrantLock+數組、ReentrantLock+鏈表、ReentrantLock+Transfer搞出了ArrayBlockingQueue、LinkedBlockingQueue和SynchronousQueue...阻塞隊列只能算順便的,他的初衷實際上是利用AQS統一併簡化鎖的實現,屏蔽同步狀態管理、阻塞線程的排隊和通知、喚醒機制等,讓後續的二次開發更簡便。

換句話說:

若是你糾結於阻塞隊列怎麼實現,那你的格局就過小了...JDK的阻塞隊列依賴於ReentrantLock,而ReentrantLock只是對AQS的淺封裝,真正須要咱們花功夫學習的其實有且只有AQS。

我是bravo1988,點個贊吧,求你了。短短半個月,我從「壯志凌雲,想把知乎3w關注帶到掘金」,轉變爲「垂頭喪氣,想把掘金30關注帶回知乎」。

よろしく・つづく

我昨晚夢見你了.gif

往期文章:

漫畫:從JVM鎖扯到Redis分佈式鎖

深刻淺出Java線程基礎

深刻淺出Java註解

Tomcat外傳:孤獨的小貓咪

相關文章
相關標籤/搜索