考查Java的併發編程時,手寫「生產者-消費者模型」是一個經典問題。有以下幾個考點:java
JDK版本:oracle java 1.8.0_102node
本文主要概括了4種寫法,閱讀後,最好在白板上練習幾遍,檢查本身是否掌握。這4種寫法或者編程接口不一樣,或者併發粒度不一樣,但本質是相同的——都是在使用或實現BlockingQueue。git
網上有不少生產者-消費者模型的定義和實現。本文研究最經常使用的有界生產者-消費者模型,簡單歸納以下:github
可經過以下條件驗證模型實現的正確性:面試
該模型的應用和變種很是多,不贅述。編程
面試時可語言說明如下準備代碼。關鍵部分須要實現,如AbstractConsumer。bash
下面會涉及多種生產者-消費者模型的實現,能夠先抽象出關鍵的接口,並實現一些抽象類:服務器
public interface Consumer { void consume() throws InterruptedException; } 複製代碼
public interface Producer { void produce() throws InterruptedException; } 複製代碼
abstract class AbstractConsumer implements Consumer, Runnable { @Override public void run() { while (true) { try { consume(); } catch (InterruptedException e) { e.printStackTrace(); break; } } } } 複製代碼
abstract class AbstractProducer implements Producer, Runnable { @Override public void run() { while (true) { try { produce(); } catch (InterruptedException e) { e.printStackTrace(); break; } } } } 複製代碼
不一樣的模型實現中,生產者、消費者的具體實現也不一樣,因此須要爲模型定義抽象工廠方法:markdown
public interface Model { Runnable newRunnableConsumer(); Runnable newRunnableProducer(); } 複製代碼
咱們將Task做爲生產和消費的單位:併發
public class Task { public int no; public Task(int no) { this.no = no; } } 複製代碼
若是需求還不明確(這符合大部分工程工做的實際狀況),建議邊實現邊抽象,不要「面向將來編程」。
BlockingQueue的寫法最簡單。核心思想是,把併發和容量控制封裝在緩衝區中。而BlockingQueue的性質天生知足這個要求。
public class BlockingQueueModel implements Model { private final BlockingQueue<Task> queue; private final AtomicInteger increTaskNo = new AtomicInteger(0); public BlockingQueueModel(int cap) { // LinkedBlockingQueue 的隊列是 lazy-init 的,但 ArrayBlockingQueue 在建立時就已經 init this.queue = new LinkedBlockingQueue<>(cap); } @Override public Runnable newRunnableConsumer() { return new ConsumerImpl(); } @Override public Runnable newRunnableProducer() { return new ProducerImpl(); } private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable { @Override public void consume() throws InterruptedException { Task task = queue.take(); // 固定時間範圍的消費,模擬相對穩定的服務器處理過程 Thread.sleep(500 + (long) (Math.random() * 500)); System.out.println("consume: " + task.no); } } private class ProducerImpl extends AbstractProducer implements Producer, Runnable { @Override public void produce() throws InterruptedException { // 不按期生產,模擬隨機的用戶請求 Thread.sleep((long) (Math.random() * 1000)); Task task = new Task(increTaskNo.getAndIncrement()); System.out.println("produce: " + task.no); queue.put(task); } } public static void main(String[] args) { Model model = new BlockingQueueModel(3); for (int i = 0; i < 2; i++) { new Thread(model.newRunnableConsumer()).start(); } for (int i = 0; i < 5; i++) { new Thread(model.newRunnableProducer()).start(); } } } 複製代碼
截取前面的一部分輸出:
produce: 0
produce: 4
produce: 2
produce: 3
produce: 5
consume: 0
produce: 1
consume: 4
produce: 7
consume: 2
produce: 8
consume: 3
produce: 6
consume: 5
produce: 9
consume: 1
produce: 10
consume: 7
複製代碼
因爲操做「出隊/入隊+日誌輸出」不是原子的,因此上述日誌的絕對順序與實際的出隊/入隊順序有出入,但對於同一個任務號task.no
,其consume日誌必定出如今其produce日誌以後,即:同一任務的消費行爲必定發生在生產行爲以後。緩衝區的容量留給讀者驗證。符合兩個驗證條件。
BlockingQueue寫法的核心只有兩行代碼,併發和容量控制都封裝在了BlockingQueue中,正確性由BlockingQueue保證。面試中首選該寫法,天然美觀簡單。
勘誤:
在簡書回覆一個讀者的時候,順道發現了這個問題:生產日誌應放在入隊操做以前,不然同一個task的生產日誌可能出如今消費日誌以後。
// 舊的錯誤代碼 queue.put(task); System.out.println("produce: " + task.no); 複製代碼// 正確代碼 System.out.println("produce: " + task.no); queue.put(task); 複製代碼具體來講,生產日誌應放在入隊操做以前,消費日誌應放在出隊操做以後,以保障:
- 消費線程中queue.take()返回以後,對應生產線程(生產該task的線程)中queue.put()及以前的行爲,對於消費線程來講都是可見的。
想一想爲何呢?由於咱們須要藉助「queue.put()與queue.take()的偏序關係」。其餘實現方案分別藉助了條件隊列、鎖的偏序關係,不存在該問題。要解釋這個問題,須要讀者明白可見性和Happens-Before的概念,篇幅所限,暫時很少解釋。
PS:舊代碼沒出現這個問題,是由於消費者打印消費日誌以前,sleep了500+ms,而恰巧競爭不激烈,這個時間通常足以讓「滯後」生產日誌打印完成(但不保證)。
順道說明一下,猴子如今主要在我的博客、簡書、掘金和CSDN上發文章,搜索「猴子007」或「程序猿說你好」都能找到。但我的精力有限,部分勘誤不免忘記同步到某些地方(甚至連新文章都不一樣步了T_T),只能保證我的博客是最新的,還望理解。
寫文章不是爲了出名,一方面但願整理本身的學習成果,一方面但願有更多人能幫助猴子糾正學習過程當中的錯誤。若是能認識一些志同道合的朋友,一塊兒提升就更好了。因此但願各位轉載的時候,必定帶着猴子我的博客末尾的轉載聲明。須要聯繫猴子的話,簡書或郵件均可以。
文章水平不高,就不奢求有人能打賞鼓勵我這潑猴了T_T
若是不能將併發與容量控制都封裝在緩衝區中,就只能由消費者與生產者完成。最簡單的方案是使用樸素的wait && notify
機制。
public class WaitNotifyModel implements Model { private final Object BUFFER_LOCK = new Object(); private final Queue<Task> buffer = new LinkedList<>(); private final int cap; private final AtomicInteger increTaskNo = new AtomicInteger(0); public WaitNotifyModel(int cap) { this.cap = cap; } @Override public Runnable newRunnableConsumer() { return new ConsumerImpl(); } @Override public Runnable newRunnableProducer() { return new ProducerImpl(); } private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable { @Override public void consume() throws InterruptedException { synchronized (BUFFER_LOCK) { while (buffer.size() == 0) { BUFFER_LOCK.wait(); } Task task = buffer.poll(); assert task != null; // 固定時間範圍的消費,模擬相對穩定的服務器處理過程 Thread.sleep(500 + (long) (Math.random() * 500)); System.out.println("consume: " + task.no); BUFFER_LOCK.notifyAll(); } } } private class ProducerImpl extends AbstractProducer implements Producer, Runnable { @Override public void produce() throws InterruptedException { // 不按期生產,模擬隨機的用戶請求 Thread.sleep((long) (Math.random() * 1000)); synchronized (BUFFER_LOCK) { while (buffer.size() == cap) { BUFFER_LOCK.wait(); } Task task = new Task(increTaskNo.getAndIncrement()); buffer.offer(task); System.out.println("produce: " + task.no); BUFFER_LOCK.notifyAll(); } } } public static void main(String[] args) { Model model = new WaitNotifyModel(3); for (int i = 0; i < 2; i++) { new Thread(model.newRunnableConsumer()).start(); } for (int i = 0; i < 5; i++) { new Thread(model.newRunnableProducer()).start(); } } } 複製代碼
驗證方法同上。
樸素的wait && notify
機制不那麼靈活,但足夠簡單。synchronized、wait、notifyAll的用法可參考【Java併發編程】之十:使用wait/notify/notifyAll實現線程間通訊的幾點重要說明,着重理解喚醒與鎖競爭的區別。
咱們要保證理解wait && notify
機制。實現時可使用Object類提供的wait()方法與notifyAll()方法,但更推薦的方式是使用java.util.concurrent包提供的Lock && Condition
。
public class LockConditionModel1 implements Model { private final Lock BUFFER_LOCK = new ReentrantLock(); private final Condition BUFFER_COND = BUFFER_LOCK.newCondition(); private final Queue<Task> buffer = new LinkedList<>(); private final int cap; private final AtomicInteger increTaskNo = new AtomicInteger(0); public LockConditionModel1(int cap) { this.cap = cap; } @Override public Runnable newRunnableConsumer() { return new ConsumerImpl(); } @Override public Runnable newRunnableProducer() { return new ProducerImpl(); } private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable { @Override public void consume() throws InterruptedException { BUFFER_LOCK.lockInterruptibly(); try { while (buffer.size() == 0) { BUFFER_COND.await(); } Task task = buffer.poll(); assert task != null; // 固定時間範圍的消費,模擬相對穩定的服務器處理過程 Thread.sleep(500 + (long) (Math.random() * 500)); System.out.println("consume: " + task.no); BUFFER_COND.signalAll(); } finally { BUFFER_LOCK.unlock(); } } } private class ProducerImpl extends AbstractProducer implements Producer, Runnable { @Override public void produce() throws InterruptedException { // 不按期生產,模擬隨機的用戶請求 Thread.sleep((long) (Math.random() * 1000)); BUFFER_LOCK.lockInterruptibly(); try { while (buffer.size() == cap) { BUFFER_COND.await(); } Task task = new Task(increTaskNo.getAndIncrement()); buffer.offer(task); System.out.println("produce: " + task.no); BUFFER_COND.signalAll(); } finally { BUFFER_LOCK.unlock(); } } } public static void main(String[] args) { Model model = new LockConditionModel1(3); for (int i = 0; i < 2; i++) { new Thread(model.newRunnableConsumer()).start(); } for (int i = 0; i < 5; i++) { new Thread(model.newRunnableProducer()).start(); } } } 複製代碼
該寫法的思路與實現二的思路徹底相同,僅僅將鎖與條件變量換成了Lock和Condition。
如今,若是作一些實驗,你會發現,實現一的併發性能高於實現2、三。暫且不關心BlockingQueue的具體實現,來分析看如何優化實現三(與實現二的思路相同,性能至關)的性能。
最好的查證方法是記錄方法執行時間,這樣能夠直接定位到真正的瓶頸。但此問題較簡單,咱們直接用「瞪眼法」分析。
實現三的併發瓶頸很明顯,由於在鎖 BUFFER_LOCK
看來,任何消費者線程與生產者線程都是同樣的。換句話說,同一時刻,最多隻容許有一個線程(生產者或消費者,二選一)操做緩衝區 buffer。
而實際上,若是緩衝區是一個隊列的話,「生產者將產品入隊」與「消費者將產品出隊」兩個操做之間沒有同步關係,能夠在隊首出隊的同時,在隊尾入隊。理想性能可提高至實現三的兩倍。
那麼思路就簡單了:須要兩個鎖 CONSUME_LOCK
與PRODUCE_LOCK
,CONSUME_LOCK
控制消費者線程併發出隊,PRODUCE_LOCK
控制生產者線程併發入隊;相應須要兩個條件變量NOT_EMPTY
與NOT_FULL
,NOT_EMPTY
負責控制消費者線程的狀態(阻塞、運行),NOT_FULL
負責控制生產者線程的狀態(阻塞、運行)。以此讓優化消費者與消費者(或生產者與生產者)之間是串行的;消費者與生產者之間是並行的。
public class LockConditionModel2 implements Model { private final Lock CONSUME_LOCK = new ReentrantLock(); private final Condition NOT_EMPTY = CONSUME_LOCK.newCondition(); private final Lock PRODUCE_LOCK = new ReentrantLock(); private final Condition NOT_FULL = PRODUCE_LOCK.newCondition(); private final Buffer<Task> buffer = new Buffer<>(); private AtomicInteger bufLen = new AtomicInteger(0); private final int cap; private final AtomicInteger increTaskNo = new AtomicInteger(0); public LockConditionModel2(int cap) { this.cap = cap; } @Override public Runnable newRunnableConsumer() { return new ConsumerImpl(); } @Override public Runnable newRunnableProducer() { return new ProducerImpl(); } private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable { @Override public void consume() throws InterruptedException { int newBufSize = -1; CONSUME_LOCK.lockInterruptibly(); try { while (bufLen.get() == 0) { System.out.println("buffer is empty..."); NOT_EMPTY.await(); } Task task = buffer.poll(); newBufSize = bufLen.decrementAndGet(); assert task != null; // 固定時間範圍的消費,模擬相對穩定的服務器處理過程 Thread.sleep(500 + (long) (Math.random() * 500)); System.out.println("consume: " + task.no); if (newBufSize > 0) { NOT_EMPTY.signalAll(); } } finally { CONSUME_LOCK.unlock(); } if (newBufSize < cap) { PRODUCE_LOCK.lockInterruptibly(); try { NOT_FULL.signalAll(); } finally { PRODUCE_LOCK.unlock(); } } } } private class ProducerImpl extends AbstractProducer implements Producer, Runnable { @Override public void produce() throws InterruptedException { // 不按期生產,模擬隨機的用戶請求 Thread.sleep((long) (Math.random() * 1000)); int newBufSize = -1; PRODUCE_LOCK.lockInterruptibly(); try { while (bufLen.get() == cap) { System.out.println("buffer is full..."); NOT_FULL.await(); } Task task = new Task(increTaskNo.getAndIncrement()); buffer.offer(task); newBufSize = bufLen.incrementAndGet(); System.out.println("produce: " + task.no); if (newBufSize < cap) { NOT_FULL.signalAll(); } } finally { PRODUCE_LOCK.unlock(); } if (newBufSize > 0) { CONSUME_LOCK.lockInterruptibly(); try { NOT_EMPTY.signalAll(); } finally { CONSUME_LOCK.unlock(); } } } } private static class Buffer<E> { private Node head; private Node tail; Buffer() { // dummy node head = tail = new Node(null); } public void offer(E e) { tail.next = new Node(e); tail = tail.next; } public E poll() { head = head.next; E e = head.item; head.item = null; return e; } private class Node { E item; Node next; Node(E item) { this.item = item; } } } public static void main(String[] args) { Model model = new LockConditionModel2(3); for (int i = 0; i < 2; i++) { new Thread(model.newRunnableConsumer()).start(); } for (int i = 0; i < 5; i++) { new Thread(model.newRunnableProducer()).start(); } } 複製代碼
須要注意的是,因爲須要同時在UnThreadSafe的緩衝區 buffer 上進行消費與生產,咱們不能使用實現2、三中使用的隊列了,須要本身實現一個簡單的緩衝區 Buffer。Buffer要知足如下條件:
咱們已經優化掉了消費者與生產者之間的瓶頸,還能進一步優化嗎?
若是能夠,必然是繼續優化消費者與消費者(或生產者與生產者)之間的併發性能。然而,消費者與消費者之間必須是串行的,所以,併發模型上已經沒有地方能夠繼續優化了。
不過在具體的業務場景中,通常還可以繼續優化。如:
文章開頭說:這4種寫法的本質相同——都是在使用或實現BlockingQueue。實現一直接使用BlockingQueue,實現四實現了簡單的BlockingQueue,而實現2、三則實現了退化版的BlockingQueue(性能下降一半)。
實現一使用的BlockingQueue實現類是LinkedBlockingQueue,給出其源碼閱讀對照,寫的不難:
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { ... /** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition(); ... /** * Signals a waiting take. Called only from put/offer (which do not * otherwise ordinarily lock takeLock.) */ private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } /** * Signals a waiting put. Called only from take/poll. */ private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } } /** * Links node at end of queue. * * @param node the node */ private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; } /** * Removes a node from head of queue. * * @return the node */ private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; } ... /** * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity. * * @param capacity the capacity of this queue * @throws IllegalArgumentException if {@code capacity} is not greater * than zero */ public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } ... /** * Inserts the specified element at the tail of this queue, waiting if * necessary for space to become available. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); } ... public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } ... } 複製代碼
還存在很是多的寫法,如信號量
Semaphore
,也很常見(本科操做系統教材中的生產者-消費者模型就是用信號量實現的)。不過追究過多了就好像在糾結茴香豆的寫法同樣,本文不繼續探討。
實現一必須掌握,實現四至少要能清楚表述;實現2、三掌握一個便可。
本文連接:Java實現生產者-消費者模型
做者:猴子007
出處:monkeysayhi.github.io
本文基於 知識共享署名-相同方式共享 4.0 國際許可協議發佈,歡迎轉載,演繹或用於商業目的,可是必須保留本文的署名及連接。