生產者消費者模式是程序設計中很是常見的一種設計模式,被普遍運用在解耦、消息隊列等場景。java
使用生產者消費者模式一般須要在二者之間增長一個阻塞隊列做爲媒介,有了媒介以後就至關於有了一個緩衝,平衡了二者的能力。編程
總體如上圖所示,最上面是阻塞隊列,右側的 1 是生產者線程,生產者在生產數據後將數據存放在阻塞隊列中,左側的 2 是消費者線程,消費者獲取阻塞隊列中的數據。設計模式
而中間的 3 和 4 分別表明生產者消費者之間互相通訊的過程,由於不管阻塞隊列是滿仍是空均可能會產生阻塞,阻塞以後就須要在合適的時機去喚醒被阻塞的線程。多線程
那麼何時阻塞線程須要被喚醒呢?有兩種狀況。編程語言
第一種狀況是當消費者看到阻塞隊列爲空時,開始進入等待,這時生產者一旦往隊列中放入數據,就會通知全部的消費者,喚醒阻塞的消費者線程。ide
另外一種狀況是若是生產者發現隊列已經滿了,也會被阻塞,而一旦消費者獲取數據以後就至關於隊列空了一個位置,這時消費者就會通知全部正在阻塞的生產者進行生產。this
import java.util.concurrent.ArrayBlockingQueue; /** * 使用阻塞隊列實現一個生產者與消費者模型 * * @author xiandongxie */ public class ProducerAndConsumer { private static ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(10); public static void main(String[] args) throws InterruptedException { Producer producer = new Producer(); Consumer consumer = new Consumer(); Thread producer1 = new Thread(producer, "producer-1"); Thread producer2 = new Thread(producer, "producer-2"); Thread consumer1 = new Thread(consumer, "consumer-2"); Thread consumer2 = new Thread(consumer, "consumer-2"); producer1.start(); producer2.start(); consumer1.start(); consumer2.start(); Thread.sleep(5); producer1.interrupt(); Thread.sleep(5); producer2.interrupt(); Thread.sleep(5); consumer1.interrupt(); consumer2.interrupt(); } static class Producer implements Runnable { @Override public void run() { int count = 0; while (true && !Thread.currentThread().isInterrupted()) { count++; String message = Thread.currentThread().getName() + " message=" + count; try { queue.put(message); } catch (InterruptedException e) { Thread.currentThread().interrupt(); e.printStackTrace(); } } } } static class Consumer implements Runnable { @Override public void run() { while (true && !Thread.currentThread().isInterrupted()) { try { String take = queue.take(); System.out.println(Thread.currentThread().getName() + ",消費信息:" + take); } catch (InterruptedException e) { Thread.currentThread().interrupt(); e.printStackTrace(); } } } } }
import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * 採用 Condition 自定義阻塞隊列實現消費者與生產者 * * @author xiandongxie */ public class MyBlockingQueueForCondition<E> { private Queue<E> queue; private int max = 16; private ReentrantLock lock = new ReentrantLock(); // 沒有空,則消費者能夠消費,標記 消費者 private Condition notEmpty = lock.newCondition(); // 沒有滿,則生產者能夠生產,標記 生產者 private Condition notFull = lock.newCondition(); public MyBlockingQueueForCondition(int size) { this.max = size; queue = new LinkedList(); } public void put(E o) throws InterruptedException { lock.lock(); try { while (queue.size() == max) { // 若是滿了,阻塞生產者線程,釋放 Lock notFull.await(); } queue.add(o); // 有數據了,通知等待的消費者,並喚醒 notEmpty.signalAll(); } finally { lock.unlock(); } } public E take() throws InterruptedException { lock.lock(); try { while (queue.size() == 0) { // 若是爲空,阻塞消費者線程 notEmpty.await(); } E item = queue.remove(); // queue 未滿,喚醒生產者 notFull.signalAll(); return item; } finally { lock.unlock(); } } }
import java.util.LinkedList; /** * 採用 wait,notify 實現阻塞隊列 * * @author xiandongxie */ public class MyBlockingQueue<E> { private int maxSize; private LinkedList<E> storage; public MyBlockingQueue(int maxSize) { this.maxSize = maxSize; storage = new LinkedList<>(); } public synchronized void put(E e) throws InterruptedException { try { while (storage.size() == maxSize) { // 滿了 wait(); } storage.add(e); } finally { notifyAll(); } } public synchronized E take() throws InterruptedException { try { while (storage.size() == 0) { // 沒有數據 wait(); } return storage.remove(); } finally { notifyAll(); } } }