Implementing the Producer-Consumer Model in Java

Hand-writing the producer-consumer model is a classic way to test Java concurrent programming. It checks the following:

  • Understanding of the Java concurrency model
  • Fluency with Java's concurrency APIs
  • Bug-free code
  • Coding style

JDK version: Oracle Java 1.8.0_102

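This article covers 4 ways to write it. After reading, practice them a few times on a whiteboard to check whether you have really mastered them. The 4 versions differ either in the API they use or in their concurrency granularity, but they are essentially the same: each one either uses or implements a BlockingQueue.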

The producer-consumer model

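There are many definitions and implementations of the producer-consumer model online. This article studies the most commonly used variant, the bounded producer-consumer model, which can be summarized as follows: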

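  • Producers keep producing until the buffer is full, then block; once the buffer is no longer full, they resume producing
  • Consumers keep consuming until the buffer is empty, then block; once the buffer is no longer empty, they resume consuming
  • There can be multiple producers and multiple consumers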

The correctness of an implementation can be verified with the following conditions:

  • Consuming a given product always happens after producing it
  • At any moment, the buffer size is at least 0 and at most the capacity limit
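The first condition can be checked mechanically from a log. Below is a minimal sketch, assuming log lines of the exact form `produce: N` / `consume: N` as printed by the implementations in this article; the class name `LogChecker` is hypothetical, and it additionally rejects a task consumed twice. It only checks the first condition: because logging is not atomic with enqueue/dequeue, a log alone cannot prove the capacity condition.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: every "consume: N" line must appear after the matching
// "produce: N" line, and no task may be consumed twice. Other lines are ignored.
final class LogChecker {
  static boolean consumeAfterProduce(List<String> lines) {
    Set<Integer> produced = new HashSet<>();
    Set<Integer> consumed = new HashSet<>();
    for (String line : lines) {
      if (line.startsWith("produce: ")) {
        produced.add(Integer.parseInt(line.substring("produce: ".length()).trim()));
      } else if (line.startsWith("consume: ")) {
        int no = Integer.parseInt(line.substring("consume: ".length()).trim());
        // Consuming before producing, or consuming twice, violates the model.
        if (!produced.contains(no) || !consumed.add(no)) {
          return false;
        }
      }
    }
    return true;
  }

  public static void main(String[] args) {
    List<String> ok = Arrays.asList("produce: 0", "produce: 1", "consume: 0", "consume: 1");
    List<String> bad = Arrays.asList("consume: 0", "produce: 0");
    System.out.println(consumeAfterProduce(ok));  // true
    System.out.println(consumeAfterProduce(bad)); // false
  }
}
```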

The model has many applications and variants, which I won't go into here.

Several implementations

Preparation

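In an interview you can describe the following preparation code verbally, but the key parts, such as AbstractConsumer, still need to be written out.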

Several implementations of the producer-consumer model follow, so we first extract the key interfaces and implement a few abstract classes:

public interface Consumer {
  void consume() throws InterruptedException;
}
public interface Producer {
  void produce() throws InterruptedException;
}
abstract class AbstractConsumer implements Consumer, Runnable {
  @Override
  public void run() {
    while (true) {
      try {
        consume();
      } catch (InterruptedException e) {
        e.printStackTrace();
        break;
      }
    }
  }
}
abstract class AbstractProducer implements Producer, Runnable {
  @Override
  public void run() {
    while (true) {
      try {
        produce();
      } catch (InterruptedException e) {
        e.printStackTrace();
        break;
      }
    }
  }
}
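The run() loops above use InterruptedException as their only stop signal: when a thread blocked inside produce() or consume() is interrupted, the exception propagates out and the loop breaks. The sketch below checks that behavior with a hypothetical class InterruptStopDemo, using a consumer that blocks on an empty LinkedBlockingQueue; it omits the printStackTrace() call to keep the output quiet.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: a loop shaped like AbstractConsumer.run() exits once the
// blocked thread is interrupted.
final class InterruptStopDemo {
  static boolean consumerStopsOnInterrupt() throws InterruptedException {
    BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1);
    Thread consumer = new Thread(() -> {
      while (true) {
        try {
          queue.take(); // blocks: nothing is ever produced
        } catch (InterruptedException e) {
          break;        // same exit path as AbstractConsumer.run()
        }
      }
    });
    consumer.start();
    TimeUnit.MILLISECONDS.sleep(100); // give the consumer time to block in take()
    consumer.interrupt();             // take() throws even if the interrupt comes first
    consumer.join(1000);
    return !consumer.isAlive();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(consumerStopsOnInterrupt()); // true
  }
}
```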

Each model implementation has its own concrete producers and consumers, so we define abstract factory methods for the model:

public interface Model {
  Runnable newRunnableConsumer();

  Runnable newRunnableProducer();
}

We use Task as the unit of production and consumption:

public class Task {
  public int no;

  public Task(int no) {
    this.no = no;
  }
}

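If the requirements are still unclear (which is the reality of most engineering work), abstract as you implement rather than "programming for the future".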

Implementation 1: BlockingQueue

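The BlockingQueue version is the simplest. The core idea is to encapsulate concurrency and capacity control in the buffer, and BlockingQueue meets this requirement by design.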

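import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingQueueModel implements Model {
  private final BlockingQueue<Task> queue;

  private final AtomicInteger increTaskNo = new AtomicInteger(0);

  public BlockingQueueModel(int cap) {
    // LinkedBlockingQueue allocates its nodes lazily, while ArrayBlockingQueue allocates its whole array at construction time
    this.queue = new LinkedBlockingQueue<>(cap);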
  }

  @Override
  public Runnable newRunnableConsumer() {
    return new ConsumerImpl();
  }

  @Override
  public Runnable newRunnableProducer() {
    return new ProducerImpl();
  }

  private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable {
    @Override
    public void consume() throws InterruptedException {
      Task task = queue.take();
      // Consumption takes a bounded amount of time, simulating relatively stable server-side processing
      Thread.sleep(500 + (long) (Math.random() * 500));
      System.out.println("consume: " + task.no);
    }
  }

  private class ProducerImpl extends AbstractProducer implements Producer, Runnable {
    @Override
    public void produce() throws InterruptedException {
      // Produce at irregular intervals, simulating random user requests
      Thread.sleep((long) (Math.random() * 1000));
      Task task = new Task(increTaskNo.getAndIncrement());
      System.out.println("produce: " + task.no);
      queue.put(task);
    }
  }

  public static void main(String[] args) {
    Model model = new BlockingQueueModel(3);
    for (int i = 0; i < 2; i++) {
      new Thread(model.newRunnableConsumer()).start();
    }
    for (int i = 0; i < 5; i++) {
      new Thread(model.newRunnableProducer()).start();
    }
  }
}

Here is the first part of the output:

produce: 0
produce: 4
produce: 2
produce: 3
produce: 5
consume: 0
produce: 1
consume: 4
produce: 7
consume: 2
produce: 8
consume: 3
produce: 6
consume: 5
produce: 9
consume: 1
produce: 10
consume: 7

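Because "dequeue/enqueue + logging" is not atomic, the absolute order of the log lines above differs from the actual dequeue/enqueue order. But for any given task number task.no, its consume line always appears after its produce line; that is, consuming a task always happens after producing it. Verifying the buffer capacity is left to the reader. Both validation conditions hold.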
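Checking the capacity condition can be automated. Below is a minimal sketch, assuming it is acceptable to sample queue.size() from the main thread while stripped-down producers and consumers (no sleeps; hypothetical class CapacityCheck) run. LinkedBlockingQueue.size() reads an atomic counter, so each sample is a consistent snapshot, but sampling can only catch a violation, not prove its absence.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical harness: 2 producers and 2 consumers move items through a
// bounded queue while the calling thread samples queue.size().
final class CapacityCheck {
  static int maxObservedSize(int cap, int itemsPerThread) throws InterruptedException {
    BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(cap);
    Thread[] threads = new Thread[4];
    for (int i = 0; i < 2; i++) {
      // Producer: puts itemsPerThread items, blocking whenever the queue is full.
      threads[i] = new Thread(() -> {
        try {
          for (int n = 0; n < itemsPerThread; n++) queue.put(n);
        } catch (InterruptedException ignored) { }
      });
      // Consumer: takes exactly itemsPerThread items, so every thread terminates.
      threads[i + 2] = new Thread(() -> {
        try {
          for (int n = 0; n < itemsPerThread; n++) queue.take();
        } catch (InterruptedException ignored) { }
      });
    }
    for (Thread t : threads) t.start();
    int max = 0;
    boolean running = true;
    while (running) {
      max = Math.max(max, queue.size()); // a snapshot; short-lived states may be missed
      running = false;
      for (Thread t : threads) running |= t.isAlive();
    }
    for (Thread t : threads) t.join();
    return max;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("max observed size: " + maxObservedSize(3, 100_000) + " (cap 3)");
  }
}
```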

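The core of the BlockingQueue version is just two lines of code, queue.take() and queue.put(): concurrency and capacity control are encapsulated in BlockingQueue, which also guarantees correctness. This is the first choice in an interview: natural, clean, and simple.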
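To see the capacity control that BlockingQueue encapsulates, the sketch below (hypothetical class CapacityDemo) fills a LinkedBlockingQueue of capacity 2 and probes it with the non-blocking offer(e) and the timed offer(e, timeout, unit), both of which report a full queue by returning false; put() would block at the same point instead.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of the capacity control built into BlockingQueue.
final class CapacityDemo {
  static String run() throws InterruptedException {
    BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
    boolean first = queue.offer(1);                              // true: space available
    boolean second = queue.offer(2);                             // true: queue is now full
    boolean third = queue.offer(3);                              // false: full, returns at once
    boolean timed = queue.offer(4, 50, TimeUnit.MILLISECONDS);   // false after waiting 50 ms
    int remaining = queue.remainingCapacity();                   // 0
    Integer head = queue.take();                                 // 1: FIFO order
    return first + " " + second + " " + third + " " + timed + " " + remaining + " " + head;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(run()); // true true false false 0 1
  }
}
```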

Erratum:

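While replying to a reader on Jianshu, I noticed this problem: the produce log should be printed before the enqueue operation; otherwise the produce log of a task may appear after its consume log.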

// Old, incorrect code
queue.put(task);
System.out.println("produce: " + task.no);
// Correct code
System.out.println("produce: " + task.no);
queue.put(task);

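Specifically, the produce log should come before the enqueue operation and the consume log after the dequeue operation, to guarantee that: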

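  • After queue.take() returns in the consumer thread, everything the corresponding producer thread (the one that produced this task) did up to and including queue.put() is visible to the consumer thread.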

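Why? Because we rely on the ordering (a happens-before relation) between queue.put() and queue.take(). The other implementations rely on the ordering provided by condition queues and locks instead, so they don't have this problem. Explaining this properly requires the concepts of visibility and happens-before, which are beyond the scope of this article.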
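The guarantee the erratum relies on can be illustrated directly. The java.util.concurrent package documentation states that actions in a thread prior to placing an object into a concurrent collection happen-before actions subsequent to the access or removal of that element in another thread. In the sketch below (hypothetical names), the producer writes a plain field before put() and the consumer reads it after take(), so the consumer is guaranteed to see the written value. A passing run only illustrates the rule: a racy program could also happen to print the right value, so the guarantee comes from the specification, not from the test.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo of the happens-before edge between put() and take().
final class PutTakeVisibility {
  static final class Payload {
    int value; // deliberately neither volatile nor final
  }

  static int transfer(int v) throws InterruptedException {
    BlockingQueue<Payload> queue = new LinkedBlockingQueue<>(1);
    int[] seen = new int[1];
    Thread consumer = new Thread(() -> {
      try {
        Payload received = queue.take();
        seen[0] = received.value; // must observe the write made before put()
      } catch (InterruptedException ignored) { }
    });
    consumer.start();
    Payload p = new Payload();
    p.value = v;     // plain write, ordered before put() in this thread
    queue.put(p);    // put() happens-before the matching take()
    consumer.join(); // join() makes seen[0] visible to this thread
    return seen[0];
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(transfer(42)); // 42
  }
}
```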

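P.S. The old code didn't show this problem because the consumer sleeps for 500+ ms before printing the consume log, and contention happened to be low, so that window was usually long enough for the "delayed" produce log to be printed (but this is not guaranteed).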


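By the way, Monkey (猴子) now mainly publishes on his personal blog, Jianshu, Juejin, and CSDN; searching for "猴子007" or "程序猿說你好" will find them. My energy is limited, though, so some errata inevitably don't get synced everywhere (sometimes even new articles don't get synced T_T). Only the personal blog is guaranteed to be up to date; thanks for understanding.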

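I don't write to become famous: partly it's to organize what I've learned, and partly in the hope that more people will help me correct my mistakes along the way. It would be even better to meet like-minded friends and improve together. So when reposting, please be sure to include the reprint notice found at the end of my personal blog. To contact me, Jianshu or email both work.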


Implementation 2: wait && notify

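If concurrency and capacity control cannot both be encapsulated in the buffer, the consumers and producers must handle them themselves. The simplest approach is the plain wait && notify mechanism.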

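import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

public class WaitNotifyModel implements Model {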
  private final Object BUFFER_LOCK = new Object();
  private final Queue<Task> buffer = new LinkedList<>();
  private final int cap;

  private final AtomicInteger increTaskNo = new AtomicInteger(0);

  public WaitNotifyModel(int cap) {
    this.cap = cap;
  }

  @Override
  public Runnable newRunnableConsumer() {
    return new ConsumerImpl();
  }

  @Override
  public Runnable newRunnableProducer() {
    return new ProducerImpl();
  }

  private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable {
    @Override
    public void consume() throws InterruptedException {
      synchronized (BUFFER_LOCK) {
        while (buffer.size() == 0) {
          BUFFER_LOCK.wait();
        }
        Task task = buffer.poll();
        assert task != null;
        // Consumption takes a bounded amount of time, simulating relatively stable server-side processing
        Thread.sleep(500 + (long) (Math.random() * 500));
        System.out.println("consume: " + task.no);
        BUFFER_LOCK.notifyAll();
      }
    }
  }

  private class ProducerImpl extends AbstractProducer implements Producer, Runnable {
    @Override
    public void produce() throws InterruptedException {
      // Produce at irregular intervals, simulating random user requests
      Thread.sleep((long) (Math.random() * 1000));
      synchronized (BUFFER_LOCK) {
        while (buffer.size() == cap) {
          BUFFER_LOCK.wait();
        }
        Task task = new Task(increTaskNo.getAndIncrement());
        buffer.offer(task);
        System.out.println("produce: " + task.no);
        BUFFER_LOCK.notifyAll();
      }
    }
  }

  public static void main(String[] args) {
    Model model = new WaitNotifyModel(3);
    for (int i = 0; i < 2; i++) {
      new Thread(model.newRunnableConsumer()).start();
    }
    for (int i = 0; i < 5; i++) {
      new Thread(model.newRunnableProducer()).start();
    }
  }
}

Verify it the same way as above.

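The plain wait && notify mechanism is not very flexible, but it is simple enough. For how to use synchronized, wait, and notifyAll, see 【Java併發編程】之十:使用wait/notify/notifyAll實現線程間通訊的幾點重要說明 (key notes on using wait/notify/notifyAll for inter-thread communication), paying particular attention to the difference between being woken up and competing for the lock.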
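The difference between being woken up and competing for the lock can be made concrete. In the sketch below (hypothetical class WakeupVsLock), the notifier calls notifyAll() and then keeps holding the monitor for 100 ms; the woken thread cannot return from wait() until it reacquires the monitor, so its event is always recorded last. The while loop around wait() is the same guard WaitNotifyModel uses: after reacquiring the lock, the condition must be re-checked.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical demo: being notified is not the same as holding the lock.
final class WakeupVsLock {
  private static final Object LOCK = new Object();
  private static boolean ready = false; // guarded by LOCK

  static List<String> run() throws InterruptedException {
    List<String> events = Collections.synchronizedList(new ArrayList<>());
    synchronized (LOCK) {
      ready = false;
    }
    Thread waiter = new Thread(() -> {
      synchronized (LOCK) {
        while (!ready) { // re-check the condition after every wakeup
          try {
            LOCK.wait();
          } catch (InterruptedException e) {
            return;
          }
        }
        events.add("waiter resumed");
      }
    });
    waiter.start();
    // Wait until the waiter has released LOCK inside wait().
    while (waiter.getState() != Thread.State.WAITING) {
      Thread.sleep(1);
    }
    synchronized (LOCK) {
      ready = true;
      LOCK.notifyAll();  // wakes the waiter...
      events.add("notified");
      Thread.sleep(100); // ...but it cannot proceed while this thread holds LOCK
      events.add("notifier releasing lock");
    }
    waiter.join();
    return new ArrayList<>(events);
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(run()); // [notified, notifier releasing lock, waiter resumed]
  }
}
```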

Implementation 3: simple Lock && Condition

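We need to make sure we understand the wait && notify mechanism. It can be implemented with the wait() and notifyAll() methods of Object, but the more recommended approach is the Lock && Condition API provided by the java.util.concurrent.locks package.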

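import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockConditionModel1 implements Model {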
  private final Lock BUFFER_LOCK = new ReentrantLock();
  private final Condition BUFFER_COND = BUFFER_LOCK.newCondition();
  private final Queue<Task> buffer = new LinkedList<>();
  private final int cap;

  private final AtomicInteger increTaskNo = new AtomicInteger(0);

  public LockConditionModel1(int cap) {
    this.cap = cap;
  }

  @Override
  public Runnable newRunnableConsumer() {
    return new ConsumerImpl();
  }

  @Override
  public Runnable newRunnableProducer() {
    return new ProducerImpl();
  }

  private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable {
    @Override
    public void consume() throws InterruptedException {
      BUFFER_LOCK.lockInterruptibly();
      try {
        while (buffer.size() == 0) {
          BUFFER_COND.await();
        }
        Task task = buffer.poll();
        assert task != null;
        // Consumption takes a bounded amount of time, simulating relatively stable server-side processing
        Thread.sleep(500 + (long) (Math.random() * 500));
        System.out.println("consume: " + task.no);
        BUFFER_COND.signalAll();
      } finally {
        BUFFER_LOCK.unlock();
      }
    }
  }

  private class ProducerImpl extends AbstractProducer implements Producer, Runnable {
    @Override
    public void produce() throws InterruptedException {
      // Produce at irregular intervals, simulating random user requests
      Thread.sleep((long) (Math.random() * 1000));
      BUFFER_LOCK.lockInterruptibly();
      try {
        while (buffer.size() == cap) {
          BUFFER_COND.await();
        }
        Task task = new Task(increTaskNo.getAndIncrement());
        buffer.offer(task);
        System.out.println("produce: " + task.no);
        BUFFER_COND.signalAll();
      } finally {
        BUFFER_LOCK.unlock();
      }
    }
  }

  public static void main(String[] args) {
    Model model = new LockConditionModel1(3);
    for (int i = 0; i < 2; i++) {
      new Thread(model.newRunnableConsumer()).start();
    }
    for (int i = 0; i < 5; i++) {
      new Thread(model.newRunnableProducer()).start();
    }
  }
}

This version follows exactly the same idea as implementation 2; it only replaces the monitor lock and condition with Lock and Condition.
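A common refinement keeps the single lock but creates two Condition objects from it, so producers wait on one and consumers on the other, and each side signals only the side that can now make progress. This is similar in spirit to the BoundedBuffer example in the java.util.concurrent.locks.Condition documentation; the generic class below and its transferSum usage method are hypothetical. Only one thread can touch the buffer at a time here as well, so this variant does not remove the bottleneck analyzed in the next section.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical bounded buffer: one lock, two condition variables.
final class TwoConditionBuffer<E> {
  private final Lock lock = new ReentrantLock();
  private final Condition notFull = lock.newCondition();  // producers wait here
  private final Condition notEmpty = lock.newCondition(); // consumers wait here
  private final Queue<E> items = new ArrayDeque<>();
  private final int cap;

  TwoConditionBuffer(int cap) { this.cap = cap; }

  void put(E e) throws InterruptedException {
    lock.lockInterruptibly();
    try {
      while (items.size() == cap) notFull.await();
      items.offer(e);
      notEmpty.signal(); // exactly one new item, so wake one consumer
    } finally {
      lock.unlock();
    }
  }

  E take() throws InterruptedException {
    lock.lockInterruptibly();
    try {
      while (items.isEmpty()) notEmpty.await();
      E e = items.poll();
      notFull.signal(); // exactly one new free slot, so wake one producer
      return e;
    } finally {
      lock.unlock();
    }
  }

  // Usage: 2 producers each put 1..n, 2 consumers each take n items; returns their total.
  static long transferSum(int n) throws InterruptedException {
    TwoConditionBuffer<Integer> buf = new TwoConditionBuffer<>(2);
    long[] sums = new long[2];
    Thread[] ts = new Thread[4];
    for (int i = 0; i < 2; i++) {
      final int idx = i;
      ts[i] = new Thread(() -> {
        try { for (int k = 1; k <= n; k++) buf.put(k); } catch (InterruptedException ignored) { }
      });
      ts[i + 2] = new Thread(() -> {
        try { for (int k = 0; k < n; k++) sums[idx] += buf.take(); } catch (InterruptedException ignored) { }
      });
    }
    for (Thread t : ts) t.start();
    for (Thread t : ts) t.join();
    return sums[0] + sums[1]; // expected: 2 * n(n+1)/2 = n(n+1)
  }
}
```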

Implementation 4: Lock && Condition with higher concurrency

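If you run some experiments now, you'll find that implementation 1 performs better under concurrency than implementations 2 and 3. Setting aside how BlockingQueue is implemented for now, let's analyze how to improve the performance of implementation 3 (which follows the same idea as implementation 2, with comparable performance).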

Analyzing the bottleneck of implementation 3

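The most reliable way to confirm a bottleneck is to record method execution times, which pinpoints the real bottleneck directly. This problem is simple enough, though, that we'll just analyze it by eye.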

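The concurrency bottleneck of implementation 3 is obvious: from the point of view of the lock BUFFER_LOCK, consumer threads and producer threads are all the same. In other words, at any moment at most one thread (either a producer or a consumer) can operate on the buffer.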

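In fact, if the buffer is a queue, "a producer enqueues a product" and "a consumer dequeues a product" do not need to exclude each other: one element can be dequeued at the head while another is enqueued at the tail. Ideally, this could double the throughput of implementation 3.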

Removing the bottleneck

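The idea is then simple: we need two locks, CONSUME_LOCK and PRODUCE_LOCK. CONSUME_LOCK coordinates consumer threads as they dequeue, and PRODUCE_LOCK coordinates producer threads as they enqueue. Correspondingly, we need two condition variables, NOT_EMPTY and NOT_FULL: NOT_EMPTY controls the state of consumer threads (blocked or running), and NOT_FULL controls the state of producer threads (blocked or running). As a result, consumers are serialized with other consumers (and producers with other producers), while consumers and producers run in parallel.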

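import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockConditionModel2 implements Model {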
  private final Lock CONSUME_LOCK = new ReentrantLock();
  private final Condition NOT_EMPTY = CONSUME_LOCK.newCondition();
  private final Lock PRODUCE_LOCK = new ReentrantLock();
  private final Condition NOT_FULL = PRODUCE_LOCK.newCondition();

  private final Buffer<Task> buffer = new Buffer<>();
  private final AtomicInteger bufLen = new AtomicInteger(0);

  private final int cap;

  private final AtomicInteger increTaskNo = new AtomicInteger(0);

  public LockConditionModel2(int cap) {
    this.cap = cap;
  }

  @Override
  public Runnable newRunnableConsumer() {
    return new ConsumerImpl();
  }

  @Override
  public Runnable newRunnableProducer() {
    return new ProducerImpl();
  }

  private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable {
    @Override
    public void consume() throws InterruptedException {
      int newBufSize = -1;

      CONSUME_LOCK.lockInterruptibly();
      try {
        while (bufLen.get() == 0) {
          System.out.println("buffer is empty...");
          NOT_EMPTY.await();
        }
        Task task = buffer.poll();
        newBufSize = bufLen.decrementAndGet();
        assert task != null;
        // Consumption takes a bounded amount of time, simulating relatively stable server-side processing
        Thread.sleep(500 + (long) (Math.random() * 500));
        System.out.println("consume: " + task.no);
        if (newBufSize > 0) {
          NOT_EMPTY.signalAll();
        }
      } finally {
        CONSUME_LOCK.unlock();
      }

      if (newBufSize < cap) {
        PRODUCE_LOCK.lockInterruptibly();
        try {
          NOT_FULL.signalAll();
        } finally {
          PRODUCE_LOCK.unlock();
        }
      }
    }
  }

  private class ProducerImpl extends AbstractProducer implements Producer, Runnable {
    @Override
    public void produce() throws InterruptedException {
      // Produce at irregular intervals, simulating random user requests
      Thread.sleep((long) (Math.random() * 1000));

      int newBufSize = -1;

      PRODUCE_LOCK.lockInterruptibly();
      try {
        while (bufLen.get() == cap) {
          System.out.println("buffer is full...");
          NOT_FULL.await();
        }
        Task task = new Task(increTaskNo.getAndIncrement());
        // Log before the task becomes visible to consumers, as in the erratum for implementation 1
        System.out.println("produce: " + task.no);
        buffer.offer(task);
        newBufSize = bufLen.incrementAndGet();
        if (newBufSize < cap) {
          NOT_FULL.signalAll();
        }
      } finally {
        PRODUCE_LOCK.unlock();
      }

      if (newBufSize > 0) {
        CONSUME_LOCK.lockInterruptibly();
        try {
          NOT_EMPTY.signalAll();
        } finally {
          CONSUME_LOCK.unlock();
        }
      }
    }
  }

  private static class Buffer<E> {
    private Node head;
    private Node tail;

    Buffer() {
      // dummy node
      head = tail = new Node(null);
    }

    public void offer(E e) {
      tail.next = new Node(e);
      tail = tail.next;
    }

    public E poll() {
      head = head.next;
      E e = head.item;
      head.item = null;
      return e;
    }

    private class Node {
      E item;
      Node next;

      Node(E item) {
        this.item = item;
      }
    }
  }

  public static void main(String[] args) {
    Model model = new LockConditionModel2(3);
    for (int i = 0; i < 2; i++) {
      new Thread(model.newRunnableConsumer()).start();
    }
    for (int i = 0; i < 5; i++) {
      new Thread(model.newRunnableProducer()).start();
    }
  }
}

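Note that because consumption and production now happen concurrently on the buffer, which is not thread-safe by itself, we can no longer use the queue from implementations 2 and 3; instead we implement a simple Buffer ourselves. Buffer must satisfy the following conditions: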

  • Dequeue at the head, enqueue at the tail
  • poll() touches only head
  • offer() touches only tail
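The dummy node is what makes these conditions workable: with a sentinel at the head, offer() writes only tail and tail.next, while poll() writes only head and the new head's item. Even when a single element remains (head is the dummy, tail is that element's node), the consumer clears that node's item while a producer may write its next field, so the two sides never write the same field. The single-threaded sketch below (hypothetical name DummyNodeQueue, with an added isEmpty() check) shows the same structure; it is not thread-safe on its own, since in LockConditionModel2 the two locks and the AtomicInteger bufLen supply the synchronization.

```java
// Hypothetical single-threaded version of Buffer with an emptiness check.
final class DummyNodeQueue<E> {
  private static final class Node<E> {
    E item;
    Node<E> next;
    Node(E item) { this.item = item; }
  }

  private Node<E> head; // always the dummy node; its item is null
  private Node<E> tail; // last node; equals head when empty

  DummyNodeQueue() { head = tail = new Node<>(null); }

  // Touches only tail (and tail.next).
  void offer(E e) {
    tail.next = new Node<>(e);
    tail = tail.next;
  }

  // Touches only head (and the new head's item); the caller must ensure it is non-empty.
  E poll() {
    head = head.next; // the first real node becomes the new dummy
    E e = head.item;
    head.item = null; // the dummy holds no element
    return e;
  }

  boolean isEmpty() { return head == tail; } // meaningful only without concurrent access

  static String demo() {
    DummyNodeQueue<Integer> q = new DummyNodeQueue<>();
    q.offer(1);
    q.offer(2);
    StringBuilder sb = new StringBuilder();
    sb.append(q.poll()).append(',');                  // 1
    q.offer(3);
    sb.append(q.poll()).append(',').append(q.poll()); // 2,3
    sb.append(' ').append(q.isEmpty());
    return sb.toString();                             // "1,2,3 true"
  }
}
```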

Can it be optimized further?

We have removed the bottleneck between consumers and producers. Can we go further?

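If so, it would have to be by improving concurrency among consumers (or among producers). However, consumers must be serialized with respect to each other, so there is nothing left to optimize at the level of the concurrency model.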

In concrete business scenarios, though, further optimization is usually possible, for example:

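  • With moderate concurrency, consider using CAS instead of reentrant locks
  • The model itself can't be optimized further, but a single consumption step may be broken down and optimized to reduce consumption latency
  • If a single queue has reached its concurrency limit, use "multiple queues" (e.g., a distributed message queue)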
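As a sketch of the first suggestion: the capacity check that implementation 4 performs on bufLen under a lock can instead be done with a compare-and-set retry loop on an AtomicInteger. CasSlots and its methods are hypothetical names; the class only reserves and releases capacity (it stores no elements and never blocks), callers must pair each release() with a successful tryAcquire(), and whether CAS actually outperforms ReentrantLock depends on contention and should be measured.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical lock-free slot counter: at most cap reservations can be held at once.
final class CasSlots {
  private final AtomicInteger used = new AtomicInteger(0);
  private final int cap;

  CasSlots(int cap) { this.cap = cap; }

  boolean tryAcquire() {
    while (true) {
      int c = used.get();
      if (c == cap) {
        return false;                 // full: fail fast instead of blocking
      }
      if (used.compareAndSet(c, c + 1)) {
        return true;                  // this thread won the race for a slot
      }
      // another thread changed the count in between; retry with the fresh value
    }
  }

  void release() { used.decrementAndGet(); }

  // Usage: 8 threads each try 1000 times; exactly cap attempts can succeed.
  static int successes(int cap) throws InterruptedException {
    CasSlots slots = new CasSlots(cap);
    AtomicInteger ok = new AtomicInteger();
    Thread[] ts = new Thread[8];
    for (int i = 0; i < ts.length; i++) {
      ts[i] = new Thread(() -> {
        for (int k = 0; k < 1000; k++) {
          if (slots.tryAcquire()) ok.incrementAndGet();
        }
      });
      ts[i].start();
    }
    for (Thread t : ts) t.join();
    return ok.get();
  }
}
```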

The essence of the 4 implementations

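At the beginning I said that these 4 versions are essentially the same: each one either uses or implements a BlockingQueue. Implementation 1 uses BlockingQueue directly, implementation 4 implements a simple BlockingQueue, and implementations 2 and 3 implement a degraded BlockingQueue (ideally with half the throughput, per the analysis above).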

Implementation 1 uses LinkedBlockingQueue as its BlockingQueue implementation. Here is an excerpt of its source for comparison; it's not hard to read:

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
...
    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();
    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();
    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();
    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
...
    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Links node at end of queue.
     *
     * @param node the node
     */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

    /**
     * Removes a node from head of queue.
     *
     * @return the node
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }
...
    /**
     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
...
    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }
...
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
...
}

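There are many other ways to write it. The Semaphore version, for example, is also common (the producer-consumer model in undergraduate operating systems textbooks is implemented with semaphores). But chasing every variant would be like Kong Yiji showing off the four ways to write the character 茴, so this article won't explore them further.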
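For comparison, here is a minimal sketch of the textbook semaphore version, assuming three java.util.concurrent.Semaphore instances: emptySlots (initially cap permits), fullSlots (initially 0), and a binary mutex guarding a non-thread-safe ArrayDeque. SemaphoreBuffer and inOrder are hypothetical names. Each side acquires its counting semaphore before the mutex; reversing that order could deadlock, because a thread would hold the mutex while waiting for a slot. Semaphore documents that actions before release() happen-before actions after a successful acquire() in another thread, which makes the queue writes visible across threads.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Semaphore;

// Hypothetical bounded buffer built from semaphores (the classic textbook scheme).
final class SemaphoreBuffer<E> {
  private final Semaphore emptySlots;                   // permits = free slots
  private final Semaphore fullSlots = new Semaphore(0); // permits = available items
  private final Semaphore mutex = new Semaphore(1);     // binary semaphore guarding items
  private final Queue<E> items = new ArrayDeque<>();    // not thread-safe by itself

  SemaphoreBuffer(int cap) {
    emptySlots = new Semaphore(cap);
  }

  void put(E e) throws InterruptedException {
    emptySlots.acquire();           // wait for a free slot before taking the mutex
    mutex.acquireUninterruptibly(); // uninterruptible, so the slot permit is never leaked
    try {
      items.offer(e);
    } finally {
      mutex.release();
    }
    fullSlots.release();            // one more item for consumers
  }

  E take() throws InterruptedException {
    fullSlots.acquire();            // wait for an item before taking the mutex
    mutex.acquireUninterruptibly();
    E e;
    try {
      e = items.poll();
    } finally {
      mutex.release();
    }
    emptySlots.release();           // one more free slot for producers
    return e;
  }

  // Usage: one producer and one consumer pass 0..n-1 through a buffer of capacity 1.
  static boolean inOrder(int n) throws InterruptedException {
    SemaphoreBuffer<Integer> buf = new SemaphoreBuffer<>(1);
    Thread producer = new Thread(() -> {
      try {
        for (int k = 0; k < n; k++) buf.put(k);
      } catch (InterruptedException ignored) { }
    });
    producer.start();
    boolean ok = true;
    for (int i = 0; i < n; i++) {
      ok &= buf.take() == i; // a single consumer sees items in FIFO order
    }
    producer.join();
    return ok;
  }
}
```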

Summary

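You must master implementation 1 and should at least be able to explain implementation 4 clearly; for implementations 2 and 3, mastering one of them is enough.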


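Link to this article: Implementing the Producer-Consumer Model in Java
Author: 猴子007
Source: monkeysayhi.github.io
This article is published under the Creative Commons Attribution-ShareAlike 4.0 International License. You are welcome to repost, adapt, or use it for commercial purposes, but you must keep the attribution and the link to this article.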
