9、生產者與消費者模式

生產者消費者模式

  • 生產者消費者模式是程序設計中很是常見的一種設計模式,被普遍運用在解耦、消息隊列等場景。java

  • 使用生產者消費者模式一般須要在二者之間增長一個阻塞隊列做爲媒介,有了媒介以後就至關於有了一個緩衝,平衡了二者的能力。編程

  • 總體如上圖所示,最上面是阻塞隊列,右側的 1 是生產者線程,生產者在生產數據後將數據存放在阻塞隊列中,左側的 2 是消費者線程,消費者獲取阻塞隊列中的數據。設計模式

  • 而中間的 3 和 4 分別表明生產者消費者之間互相通訊的過程,由於不管阻塞隊列是滿仍是空均可能會產生阻塞,阻塞以後就須要在合適的時機去喚醒被阻塞的線程。多線程

  • 那麼何時阻塞線程須要被喚醒呢?有兩種狀況。編程語言

  • 第一種狀況是當消費者看到阻塞隊列爲空時,開始進入等待,這時生產者一旦往隊列中放入數據,就會通知全部的消費者,喚醒阻塞的消費者線程。ide

  • 另外一種狀況是若是生產者發現隊列已經滿了,也會被阻塞,而一旦消費者獲取數據以後就至關於隊列空了一個位置,這時消費者就會通知全部正在阻塞的生產者進行生產。this

使用 BlockingQueue 實現生產者消費者模式

import java.util.concurrent.ArrayBlockingQueue;

/**
 * 使用阻塞隊列實現一個生產者與消費者模型
 *
 * @author xiandongxie
 */
public class ProducerAndConsumer {

    private static ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

    public static void main(String[] args) throws InterruptedException {
        Producer producer = new Producer();
        Consumer consumer = new Consumer();
        Thread producer1 = new Thread(producer, "producer-1");
        Thread producer2 = new Thread(producer, "producer-2");
        Thread consumer1 = new Thread(consumer, "consumer-2");
        Thread consumer2 = new Thread(consumer, "consumer-2");

        producer1.start();
        producer2.start();
        consumer1.start();
        consumer2.start();

        Thread.sleep(5);
        producer1.interrupt();
        Thread.sleep(5);
        producer2.interrupt();

        Thread.sleep(5);
        consumer1.interrupt();
        consumer2.interrupt();

    }

    static class Producer implements Runnable {
        @Override
        public void run() {
            int count = 0;
            while (true && !Thread.currentThread().isInterrupted()) {
                count++;
                String message = Thread.currentThread().getName() + " message=" + count;
                try {
                    queue.put(message);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }
        }
    }

    static class Consumer implements Runnable {

        @Override
        public void run() {
            while (true && !Thread.currentThread().isInterrupted()) {
                try {
                    String take = queue.take();
                    System.out.println(Thread.currentThread().getName() + ",消費信息:" + take);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }
        }
    }
}

使用 Condition 實現生產者消費者模式

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 採用 Condition 自定義阻塞隊列實現消費者與生產者
 *
 * @author xiandongxie
 */
public class MyBlockingQueueForCondition<E> {

    private Queue<E> queue;
    private int max = 16;
    private ReentrantLock lock = new ReentrantLock();
    // 沒有空,則消費者能夠消費,標記 消費者
    private Condition notEmpty = lock.newCondition();
    // 沒有滿,則生產者能夠生產,標記 生產者
    private Condition notFull = lock.newCondition();

    public MyBlockingQueueForCondition(int size) {
        this.max = size;
        queue = new LinkedList();
    }

    public void put(E o) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == max) {
                // 若是滿了,阻塞生產者線程,釋放 Lock
                notFull.await();
            }
            queue.add(o);
            // 有數據了,通知等待的消費者,並喚醒
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == 0) {
                // 若是爲空,阻塞消費者線程
                notEmpty.await();
            }
            E item = queue.remove();
            // queue 未滿,喚醒生產者
            notFull.signalAll();
            return item;
        } finally {
            lock.unlock();
        }
    }

}
  • 這裏須要注意,在 take() 方法中使用 while( queue.size() == 0 ) 檢查隊列狀態,而不能用 if( queue.size() == 0 )。
  • 由於生產者消費者每每是多線程的,假設有兩個消費者,第一個消費者線程獲取數據時,發現隊列爲空,便進入等待狀態;
  • 由於第一個線程在等待時會釋放 Lock 鎖,因此第二個消費者能夠進入並執行 if( queue.size() == 0 ),也發現隊列爲空,因而第二個線程也進入等待;
  • 而此時,若是生產者生產了一個數據,便會喚醒兩個消費者線程,而兩個線程中只有一個線程能夠拿到鎖,並執行 queue.remove 操做,另一個線程由於沒有拿到鎖而卡在被喚醒的地方,而第一個線程執行完操做後會在 finally 中經過 unlock 解鎖,而此時第二個線程即可以拿到被第一個線程釋放的鎖,繼續執行操做,也會去調用 queue.remove 操做,然而這個時候隊列已經爲空了,因此會拋出 NoSuchElementException 異常,這不符合邏輯。
  • 而若是用 while 作檢查,當第一個消費者被喚醒獲得鎖並移除數據以後,第二個線程在執行 remove 前仍會進行 while 檢查,發現此時依然知足 queue.size() == 0 的條件,就會繼續執行 await 方法,避免了獲取的數據爲 null 或拋出異常的狀況。
  • 多線程的代碼大部分都用 while 而不用 if,無論線程在哪被切換中止了,while 的話,線程上次切換判斷結果對下次切換判斷沒有影響,可是if的話,若線程切換前,條件成立過了,可是該線程再次拿到 cpu 使用權的時候,其實條件已經不成立了,因此不該該執行。(本質緣由:就是原子性問題,CPU 嚴重的原子性是針對 CPU 指令的,而不是針對高級編程語言的語句的)。

使用 wait/notify 實現生產者消費者模式

import java.util.LinkedList;

/**
 * 採用 wait,notify 實現阻塞隊列
 *
 * @author xiandongxie
 */
public class MyBlockingQueue<E> {
    private int maxSize;
    private LinkedList<E> storage;

    public MyBlockingQueue(int maxSize) {
        this.maxSize = maxSize;
        storage = new LinkedList<>();
    }

    public synchronized void put(E e) throws InterruptedException {
        try {
            while (storage.size() == maxSize) {
                // 滿了
                wait();
            }
            storage.add(e);
        } finally {
            notifyAll();
        }

    }

    public synchronized E take() throws InterruptedException {
        try {
            while (storage.size() == 0) {
                // 沒有數據
                wait();
            }
            return storage.remove();
        } finally {
            notifyAll();
        }
    }

}
相關文章
相關標籤/搜索