工做三年,小胖問我:什麼是生產者消費者模式?菜到摳腳!

生產者消費者模式在咱們平常工做中用得很是多,好比:在模塊解耦、消息隊列、分佈式場景中都很常見。這個模式裏有三個角色,他們之間的關係是以下圖這樣的:前端

圖源:Java 併發編程 - 徐隆曦

  • 生產者線程:生產消息、數據
  • 消費者線程:消費消息、數據
  • 阻塞隊列:做數據緩衝、平衡兩者能力,避免出現"產能過剩"的狀況(生產者生產速度遠高於消費者消費速度 or 多個生產者對一個消費者)以及"供不該求"的狀況(生產者生產速度遠低於消費者消費速度 or 多個消費者對一個生產者)

從圖中 3 和 4 能夠知道:不管阻塞隊列是滿仍是空均可能會產生阻塞,阻塞以後就要在合適的時候去喚醒被阻塞的線程。java

Q1:那何時會喚醒阻塞線程?面試

  • 當消費者判斷隊列爲空時,消費者線程進入等待。這期間生產者一旦往隊列中放入數據,就會通知全部的消費者,喚醒阻塞的消費者線程。算法

  • 反之,當生產者判斷隊列已滿,生產者線程進入等待。這期間消費者一旦消費了數據、隊列有空位,就會通知全部的生產者,喚醒阻塞的生產者線程。數據庫

Q2:爲何要用這種模式?編程

看了上面的 Q1,你們發現沒有?生產者不用管消費者的動做,消費者也不用管生產者的動做;它兩之間就是經過阻塞隊列通訊,實現了解耦;阻塞隊列的加入,平衡兩者能力;生產者只有在隊列滿或消費者只有在隊列空時纔會等待,其餘時間誰搶到鎖誰工做,提升效率。以上就是緣由~設計模式

使用 wait、notify/notifyAll 實現

上篇文章《正確使用 wait、notify/notifyAll》說過,wait 讓當前線程等待並釋放鎖,notify 喚醒任意一個等待同一個鎖的線程,notifyAll 則是喚醒全部等待該鎖的線程,而後誰搶到鎖,誰執行。這就是所謂的等待喚醒機制安全

先來看看用等待喚醒機制如何實現生產者、消費者模式的,首先是阻塞隊列:微信

public class MyBlockingQueue {

    private int maxSize;
    private LinkedList<Integer> queue;

    public MyBlockingQueue(int size) {
        this.maxSize = size;
        queue = new LinkedList<>();
    }

    public synchronized void put() throws InterruptedException {
        while (queue.size() == maxSize) {
            System.out.println("隊列已滿,生產者: " + Thread.currentThread().getName() +"進入等待");
            wait();
        }
        Random random = new Random();
        int i = random.nextInt();
        System.out.println("隊列未滿,生產者: " +
                Thread.currentThread().getName() +"放入數據" + i);

        // 隊列空纔去喚醒消費者,其餘時間自由競爭鎖
        if (queue.size() == 0) {
            notifyAll();
        }

        queue.add(i);
    }

    public synchronized void take() throws InterruptedException {
        while (queue.size() == 0) {
            System.out.println("隊列爲空,消費者: " + Thread.currentThread().getName() +"進入等待");
            wait();
        }

        // 隊列滿了纔去喚醒生產者,其餘時間自由競爭鎖
        if (queue.size() == maxSize) {
            notifyAll();
        }

        System.out.println("隊列有數據,消費者: " +
                Thread.currentThread().getName() +"取出數據: " + queue.remove());
    }

}

主要邏輯在阻塞隊列這邊:先看 put 方法,while 檢查隊列是否滿?滿則進入等待並主動釋放鎖,不滿則生產數據,同時判斷放入數據以前隊列是否空?空則喚醒消費者(由於隊列已有數據,可消費)。數據結構

再看 take 方法,while 檢查隊列是否空?空則進入等待並主動釋放鎖,不空則生產數據,同時判斷取出數據以前隊列是否已滿?滿則喚醒生產者(由於隊列已有空位,可生產)。

爲何是 while 不是 if ?

你們可能有個疑問。爲何判斷隊列 size 進入等待狀態這裏是用 while,不能用 if 嗎?就這個 demo 而言,是能夠的。由於咱們的生產者和消費者線程都只有一個,可是多線程狀況下用 if 就大錯特錯了。想象如下狀況:

  • 假設有兩個消費者一個生產者。隊列爲空,消費者一進入等待狀態,釋放鎖。消費者二搶到鎖,進入 if(queue.size == 0) 的判斷,也進入等待,釋放鎖。這時生產者搶到鎖生產數據,隊列有數據了。反過來喚醒兩個消費者。

  • 消費者一搶到鎖執行 wait() 後的邏輯,取完數據釋放鎖。這時消費者二拿到鎖,執行 wait() 後的邏輯取數據,可是此時隊列的數據已被消費者一取出,沒有數據了,這時就會報異常了。

而用 while 爲何能夠?由於不論是消費者一仍是二搶到鎖,循環體的邏輯以前。根據 while 的語法,它會再一次判斷條件是否成立,而 if 不會。這就是用 while 不用 if 的緣由。

生產者:

public class Producer implements Runnable {

    private MyBlockingQueue myBlockingQueue;

    public Producer(MyBlockingQueue myBlockingQueue) {
        this.myBlockingQueue = myBlockingQueue;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            try {
                myBlockingQueue.put();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

消費者:

public class Consumer implements Runnable{

    private MyBlockingQueue myBlockingQueue;

    public Consumer(MyBlockingQueue myBlockingQueue) {
        this.myBlockingQueue = myBlockingQueue;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            try {
                myBlockingQueue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

測試類:

public class MyBlockingQueueTest {

    public static void main(String[] args) {
        MyBlockingQueue myBlockingQueue = new MyBlockingQueue(10);
        Producer producer = new Producer(myBlockingQueue);
        Consumer consumer = new Consumer(myBlockingQueue);
        new Thread(producer).start();
        new Thread(consumer).start();
    }

}

使用 Condition 實現

Condition 是一個多線程間協調通訊的工具類,它的 await、sign/signAll 方法正好對應Object 的 wait、notify/notifyAll 方法。相比於 Object 的 wait、notify 方法,Condition 的 await、signal 結合的方式實現線程間協做更加安全和高效,因此更推薦這種方式實現線程間協做。關於 Condition 後面章節會繼續研究,敬請關注

Object 的 wait、notify 方式須要結合 synchronized 關鍵字實現等待喚醒機制,一樣 Condition 也須要結合 Lock 類-。那麼這種方式如何實現生產者、消費者模式?看代碼:

public class MyBlockingQueueForCondition {

    private Queue<Integer> queue;
    private int max = 10;
    private ReentrantLock lock = new ReentrantLock();
    private Condition notEmpty = lock.newCondition();
    private Condition notFull = lock.newCondition();

    public MyBlockingQueueForCondition(int size) {
        this.max = size;
        queue = new LinkedList();
    }

    public void put(Integer i) throws InterruptedException {
        // 加鎖
        lock.lock();
        try {
            // 隊列滿了,進入等待
            while (queue.size() == max) {
                System.out.println("隊列已滿,生產者: " + Thread.currentThread().getName() + "進入等待");
                notFull.await();
            }

            // 加入數據以前,隊列爲空?通知消費者,能夠消費
            if (queue.size() == 0) {
                notEmpty.signalAll();
            }

            // 不然,繼續生產
            queue.add(i);
        } finally {
            // 最後別忘記釋放鎖
            lock.unlock();
        }
    }

    public Integer take() throws InterruptedException {
        // 加鎖
        lock.lock();
        try {
            // 隊列無數據,進入等待
            while (queue.size() == 0) {
                System.out.println("隊列爲空,消費者: " + Thread.currentThread().getName() + "進入等待");
                notEmpty.await();
            }

            // 取出數據以前,隊列已滿?通知生產者,能夠生產
            if (queue.size() == max) {
                notFull.signalAll();
            }

            // 不然,取出
            return queue.remove();
        } finally {
            // 最後別忘記釋放鎖
            lock.unlock();
        }
    }
}

首先,定義了一個隊列以及 ReentrantLock 類型的鎖,在這基礎上還建立 notFull、notEmpty 兩個條件,分別表明未滿、不爲空的條件。最後定義了 take、put 方法。

take 和 put 邏輯差很少,這裏只說 put 。由於消費生產模式確定用於多線程環境,須要保證同步。這裏仍是先獲取鎖,確保同步。以後依然是判斷隊列是否已滿?滿了進入等待並釋放鎖,不滿則繼續生產,同時判斷隊列在生產前是否爲空,爲空纔去喚醒消費者。不然不喚醒,由於當隊列爲空消費者才進入阻塞

PS:最後是一個很是重要的細節,在 finally 裏面釋放鎖,不然有可能出現異常沒法釋放鎖的狀況

生產者:

public class ProducerForCondition implements Runnable {

    private MyBlockingQueueForCondition myBlockingQueueForCondition;

    public ProducerForCondition(MyBlockingQueueForCondition myBlockingQueueForCondition) {
        this.myBlockingQueueForCondition = myBlockingQueueForCondition;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            try {
                myBlockingQueueForCondition.put(i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

消費者:

public class ConsumerForCondition implements Runnable{

    private MyBlockingQueueForCondition myBlockingQueueForCondition;

    public ConsumerForCondition(MyBlockingQueueForCondition myBlockingQueueForCondition) {
        this.myBlockingQueueForCondition = myBlockingQueueForCondition;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            try {
                System.out.println("消費者取出數據: " + myBlockingQueueForCondition.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

測試類:

public class MyBlockingQueueForConditionTest {

    public static void main(String[] args) {
        MyBlockingQueueForCondition myBlockingQueueForCondition = new MyBlockingQueueForCondition(10);
        ProducerForCondition producerForCondition = new ProducerForCondition(myBlockingQueueForCondition);
        ConsumerForCondition consumerForCondition = new ConsumerForCondition(myBlockingQueueForCondition);
        new Thread(producerForCondition).start();
        new Thread(consumerForCondition).start();
    }

}

使用 BlockingQueue 實現

看完前兩種方式以後,有些小夥伴可能會說,實現個生產者消費者這麼煩麼?其實主要代碼仍是在阻塞隊列,這點 Java 早就爲咱們考慮好了,它提供了 BlockingQueue 接口,並有實現類: ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue、等。(關於阻塞隊列,狗哥的多線程系列後面也會講到

咱們選用最簡單的 ArrayBlockingQueue 實現。它的內部也是採起 ReentrantLock 和 Condition 結合的等待喚醒機制。因此,上面的兩種方式實際上是爲這種方式鋪墊。很少比比,上代碼:

public class ArrayBlockingQueueTest {

    public static void main(String[] args) {
        // 初始化長度爲 10 的 ArrayBlockingQueue
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        // 生產者
        Runnable producer = () -> {
            try {
                // 放入數據
                Random random = new Random();
                while (true) {
                    queue.put(random.nextInt());
                }
            } catch (Exception e) {
                System.out.println("生產數據出錯: " + e.getMessage());
            }
        };
        // 開啓線程生產數據
        new Thread(producer).start();

        // 消費者
        Runnable consumer = () -> {
            try {
                // 取出數據
                while (true) {
                    System.out.println(queue.take());
                }
            } catch (Exception e) {
                System.out.println("消費數據出錯: " + e.getMessage());
            }
        };
        // 開啓線程消費數據
        new Thread(consumer).start();
    }

}

建立一個 ArrayBlockingQueue 並給定最大長度爲 10,建立生產者和消費者。生產者在 while(true) 裏面一直生產,與此同時消費者也是不斷取數據,有數據就取出來。

看着是否是很簡單?但其實背後 ArrayBlockingQueue 已經爲咱們作好了線程間通訊的工做了,好比隊列滿了就去阻塞生產者線程,隊列有空就去喚醒生產者線程等

巨人的肩膀

總結

看了這幾個例子以後,相信你對生產者消費者模式也有所瞭解。之後面試官讓你手寫一個阻塞隊列,確定也難不倒你。

小福利

若是看到這裏,喜歡這篇文章的話,請幫點個好看。微信搜索一個優秀的廢人,關注後回覆電子書送你 100+ 本編程電子書 ,不僅 Java 哦,詳情看下圖。回覆 1024送你一套完整的 java 視頻教程。

資源

C語言

C++

Java

Git

Python

GO

Linux

經典必讀

面試相關

前端

人工智能

設計模式

數據庫

數據結構與算法

計算機基礎

相關文章
相關標籤/搜索