生產者消費者模式在咱們平常工做中用得很是多,好比:在模塊解耦、消息隊列、分佈式場景中都很常見。這個模式裏有三個角色,他們之間的關係是以下圖這樣的:前端
從圖中 3 和 4 能夠知道:不管阻塞隊列是滿仍是空均可能會產生阻塞,阻塞以後就要在合適的時候去喚醒被阻塞的線程。java
Q1:那何時會喚醒阻塞線程?面試
當消費者判斷隊列爲空時,消費者線程進入等待。這期間生產者一旦往隊列中放入數據,就會通知全部的消費者,喚醒阻塞的消費者線程。算法
反之,當生產者判斷隊列已滿,生產者線程進入等待。這期間消費者一旦消費了數據、隊列有空位,就會通知全部的生產者,喚醒阻塞的生產者線程。數據庫
Q2:爲何要用這種模式?編程
看了上面的 Q1,你們發現沒有?生產者不用管消費者的動做,消費者也不用管生產者的動做;它兩之間就是經過阻塞隊列通訊,實現了解耦;阻塞隊列的加入,平衡兩者能力;生產者只有在隊列滿或消費者只有在隊列空時纔會等待,其餘時間誰搶到鎖誰工做,提升效率。以上就是緣由~設計模式
上篇文章《正確使用 wait、notify/notifyAll》說過,wait 讓當前線程等待並釋放鎖,notify 喚醒任意一個等待同一個鎖的線程,notifyAll 則是喚醒全部等待該鎖的線程,而後誰搶到鎖,誰執行。這就是所謂的等待喚醒機制安全
先來看看用等待喚醒機制如何實現生產者、消費者模式的,首先是阻塞隊列:微信
public class MyBlockingQueue { private int maxSize; private LinkedList<Integer> queue; public MyBlockingQueue(int size) { this.maxSize = size; queue = new LinkedList<>(); } public synchronized void put() throws InterruptedException { while (queue.size() == maxSize) { System.out.println("隊列已滿,生產者: " + Thread.currentThread().getName() +"進入等待"); wait(); } Random random = new Random(); int i = random.nextInt(); System.out.println("隊列未滿,生產者: " + Thread.currentThread().getName() +"放入數據" + i); // 隊列空纔去喚醒消費者,其餘時間自由競爭鎖 if (queue.size() == 0) { notifyAll(); } queue.add(i); } public synchronized void take() throws InterruptedException { while (queue.size() == 0) { System.out.println("隊列爲空,消費者: " + Thread.currentThread().getName() +"進入等待"); wait(); } // 隊列滿了纔去喚醒生產者,其餘時間自由競爭鎖 if (queue.size() == maxSize) { notifyAll(); } System.out.println("隊列有數據,消費者: " + Thread.currentThread().getName() +"取出數據: " + queue.remove()); } }
主要邏輯在阻塞隊列這邊:先看 put 方法,while 檢查隊列是否滿?滿則進入等待並主動釋放鎖,不滿則生產數據,同時判斷放入數據以前隊列是否空?空則喚醒消費者(由於隊列已有數據,可消費)。數據結構
再看 take 方法,while 檢查隊列是否空?空則進入等待並主動釋放鎖,不空則生產數據,同時判斷取出數據以前隊列是否已滿?滿則喚醒生產者(由於隊列已有空位,可生產)。
爲何是 while 不是 if ?
你們可能有個疑問。爲何判斷隊列 size 進入等待狀態這裏是用 while,不能用 if 嗎?就這個 demo 而言,是能夠的。由於咱們的生產者和消費者線程都只有一個,可是多線程狀況下用 if 就大錯特錯了。想象如下狀況:
假設有兩個消費者一個生產者。隊列爲空,消費者一進入等待狀態,釋放鎖。消費者二搶到鎖,進入 if(queue.size == 0) 的判斷,也進入等待,釋放鎖。這時生產者搶到鎖生產數據,隊列有數據了。反過來喚醒兩個消費者。
消費者一搶到鎖執行 wait() 後的邏輯,取完數據釋放鎖。這時消費者二拿到鎖,執行 wait() 後的邏輯取數據,可是此時隊列的數據已被消費者一取出,沒有數據了,這時就會報異常了。
而用 while 爲何能夠?由於不論是消費者一仍是二搶到鎖,循環體的邏輯以前。根據 while 的語法,它會再一次判斷條件是否成立,而 if 不會。這就是用 while 不用 if 的緣由。
生產者:
public class Producer implements Runnable { private MyBlockingQueue myBlockingQueue; public Producer(MyBlockingQueue myBlockingQueue) { this.myBlockingQueue = myBlockingQueue; } @Override public void run() { for (int i = 0; i < 100; i++) { try { myBlockingQueue.put(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
消費者:
public class Consumer implements Runnable{ private MyBlockingQueue myBlockingQueue; public Consumer(MyBlockingQueue myBlockingQueue) { this.myBlockingQueue = myBlockingQueue; } @Override public void run() { for (int i = 0; i < 100; i++) { try { myBlockingQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
測試類:
public class MyBlockingQueueTest { public static void main(String[] args) { MyBlockingQueue myBlockingQueue = new MyBlockingQueue(10); Producer producer = new Producer(myBlockingQueue); Consumer consumer = new Consumer(myBlockingQueue); new Thread(producer).start(); new Thread(consumer).start(); } }
Condition 是一個多線程間協調通訊的工具類,它的 await、sign/signAll 方法正好對應Object 的 wait、notify/notifyAll 方法。相比於 Object 的 wait、notify 方法,Condition 的 await、signal 結合的方式實現線程間協做更加安全和高效,因此更推薦這種方式實現線程間協做。關於 Condition 後面章節會繼續研究,敬請關注
Object 的 wait、notify 方式須要結合 synchronized 關鍵字實現等待喚醒機制,一樣 Condition 也須要結合 Lock 類-。那麼這種方式如何實現生產者、消費者模式?看代碼:
public class MyBlockingQueueForCondition { private Queue<Integer> queue; private int max = 10; private ReentrantLock lock = new ReentrantLock(); private Condition notEmpty = lock.newCondition(); private Condition notFull = lock.newCondition(); public MyBlockingQueueForCondition(int size) { this.max = size; queue = new LinkedList(); } public void put(Integer i) throws InterruptedException { // 加鎖 lock.lock(); try { // 隊列滿了,進入等待 while (queue.size() == max) { System.out.println("隊列已滿,生產者: " + Thread.currentThread().getName() + "進入等待"); notFull.await(); } // 加入數據以前,隊列爲空?通知消費者,能夠消費 if (queue.size() == 0) { notEmpty.signalAll(); } // 不然,繼續生產 queue.add(i); } finally { // 最後別忘記釋放鎖 lock.unlock(); } } public Integer take() throws InterruptedException { // 加鎖 lock.lock(); try { // 隊列無數據,進入等待 while (queue.size() == 0) { System.out.println("隊列爲空,消費者: " + Thread.currentThread().getName() + "進入等待"); notEmpty.await(); } // 取出數據以前,隊列已滿?通知生產者,能夠生產 if (queue.size() == max) { notFull.signalAll(); } // 不然,取出 return queue.remove(); } finally { // 最後別忘記釋放鎖 lock.unlock(); } } }
首先,定義了一個隊列以及 ReentrantLock 類型的鎖,在這基礎上還建立 notFull、notEmpty 兩個條件,分別表明未滿、不爲空的條件。最後定義了 take、put 方法。
take 和 put 邏輯差很少,這裏只說 put 。由於消費生產模式確定用於多線程環境,須要保證同步。這裏仍是先獲取鎖,確保同步。以後依然是判斷隊列是否已滿?滿了進入等待並釋放鎖,不滿則繼續生產,同時判斷隊列在生產前是否爲空,爲空纔去喚醒消費者。不然不喚醒,由於當隊列爲空消費者才進入阻塞。
PS:最後是一個很是重要的細節,在 finally 裏面釋放鎖,不然有可能出現異常沒法釋放鎖的狀況。
生產者:
public class ProducerForCondition implements Runnable { private MyBlockingQueueForCondition myBlockingQueueForCondition; public ProducerForCondition(MyBlockingQueueForCondition myBlockingQueueForCondition) { this.myBlockingQueueForCondition = myBlockingQueueForCondition; } @Override public void run() { for (int i = 0; i < 100; i++) { try { myBlockingQueueForCondition.put(i); } catch (InterruptedException e) { e.printStackTrace(); } } } }
消費者:
public class ConsumerForCondition implements Runnable{ private MyBlockingQueueForCondition myBlockingQueueForCondition; public ConsumerForCondition(MyBlockingQueueForCondition myBlockingQueueForCondition) { this.myBlockingQueueForCondition = myBlockingQueueForCondition; } @Override public void run() { for (int i = 0; i < 100; i++) { try { System.out.println("消費者取出數據: " + myBlockingQueueForCondition.take()); } catch (InterruptedException e) { e.printStackTrace(); } } } }
測試類:
public class MyBlockingQueueForConditionTest { public static void main(String[] args) { MyBlockingQueueForCondition myBlockingQueueForCondition = new MyBlockingQueueForCondition(10); ProducerForCondition producerForCondition = new ProducerForCondition(myBlockingQueueForCondition); ConsumerForCondition consumerForCondition = new ConsumerForCondition(myBlockingQueueForCondition); new Thread(producerForCondition).start(); new Thread(consumerForCondition).start(); } }
看完前兩種方式以後,有些小夥伴可能會說,實現個生產者消費者這麼煩麼?其實主要代碼仍是在阻塞隊列,這點 Java 早就爲咱們考慮好了,它提供了 BlockingQueue 接口,並有實現類: ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue、等。(關於阻塞隊列,狗哥的多線程系列後面也會講到)
咱們選用最簡單的 ArrayBlockingQueue 實現。它的內部也是採起 ReentrantLock 和 Condition 結合的等待喚醒機制。因此,上面的兩種方式實際上是爲這種方式鋪墊。很少比比,上代碼:
public class ArrayBlockingQueueTest { public static void main(String[] args) { // 初始化長度爲 10 的 ArrayBlockingQueue BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10); // 生產者 Runnable producer = () -> { try { // 放入數據 Random random = new Random(); while (true) { queue.put(random.nextInt()); } } catch (Exception e) { System.out.println("生產數據出錯: " + e.getMessage()); } }; // 開啓線程生產數據 new Thread(producer).start(); // 消費者 Runnable consumer = () -> { try { // 取出數據 while (true) { System.out.println(queue.take()); } } catch (Exception e) { System.out.println("消費數據出錯: " + e.getMessage()); } }; // 開啓線程消費數據 new Thread(consumer).start(); } }
建立一個 ArrayBlockingQueue 並給定最大長度爲 10,建立生產者和消費者。生產者在 while(true) 裏面一直生產,與此同時消費者也是不斷取數據,有數據就取出來。
看着是否是很簡單?但其實背後 ArrayBlockingQueue 已經爲咱們作好了線程間通訊的工做了,好比隊列滿了就去阻塞生產者線程,隊列有空就去喚醒生產者線程等。
看了這幾個例子以後,相信你對生產者消費者模式也有所瞭解。之後面試官讓你手寫一個阻塞隊列,確定也難不倒你。
若是看到這裏,喜歡這篇文章的話,請幫點個好看。微信搜索一個優秀的廢人,關注後回覆電子書送你 100+ 本編程電子書 ,不僅 Java 哦,詳情看下圖。回覆 1024送你一套完整的 java 視頻教程。