生產者消費者之Java簡單實現

爲何要使用生產者和消費者模式

  • 在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。
  • 在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

什麼是生產者消費者模式

  • 生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。
  • 這個阻塞隊列就是用來給生產者和消費者解耦的。縱觀大多數設計模式,都會找一個第三者出來進行解耦,如工廠模式的第三者是工廠類,模板模式的第三者是模板類。

代碼實現(多生產者 和 多消費者)

  • QueueBuffer : 實現阻塞隊列,將生產者和消費者解耦。它底層是一個數組,構造的時候指定數組的大小。因爲實現的時多生產這和多消費者的模型,因此注意一下 put 和 get 中對阻塞條件的描述用的是while循環,這是爲了生產者之間或者消費者之間他們的內部競爭所形成的數組越界異常。

clipboard.png

package concurrency;

public class QueueBuffer {
    private final int SIZE;
    private int count = 0;
    private int[] buffer;
    public QueueBuffer(int size){
        this.SIZE = size;
        buffer = new int[SIZE];
    }

    public int getSIZE(){
        return SIZE;
    }

    public synchronized void put(int value){
        while (count == SIZE){ //buffer已經滿了 等待get   ,用while使用於多個生產者的狀況
            try {
                wait();
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }

        notifyAll(); //說明buffer中有元素 能夠取
        buffer[count++] = value;
        System.out.println("Put "+value+" current size = "+count);
    }

    public synchronized int get(){
        while(count == 0){//用while使用於多個消費者的狀況。
            try {
                wait();//buffer爲空,須要等到put進元素
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
//        notify() 只是去通知其餘的線程,可是synchronized 方法裏面的代碼仍是會執行完畢的。
//        synchronized方法原本就加了鎖。代碼的執行跟你的notify()也無關,代碼的執行是跟你的
//        synchronized綁定一塊兒而已。

        notifyAll(); //說明剛剛從buffer中取出了元素 有空位能夠加進新的元素
        int result = buffer[--count];
        System.out.println("Get "+result+" current size = "+count);
        return result;
    }
}

class Test{
    public static void main(String[] args){
        QueueBuffer q = new QueueBuffer(10);
        new Producer(q);
        new Producer(q);
        new Producer(q);
        new Consumer(q);
        new Consumer(q);
        new Consumer(q);
        System.out.println("Press Control-C to stop.");
    }
}

Producerjava

package concurrency;

import java.util.Random;

public class Producer implements Runnable {

    Random rand = new Random(47);

    private QueueBuffer q;

    Producer(QueueBuffer q) {
        this.q = q;
        new Thread(this, "Producer").start();
    }

    public void run() {
        while (true) {
            q.put(rand.nextInt(q.getSIZE()));
            Thread.yield();
        }
    }
}

Consumer設計模式

package concurrency;

public class Consumer implements Runnable {
    private QueueBuffer q;

    Consumer(QueueBuffer q) {
        this.q = q;
        new Thread(this, "Consumer").start();
    }

    public void run() {
        while (true){
            q.get();
            Thread.yield();
        }
    }
}

注意事項

  • 調用obj的wait(), notify()方法前,必須得到obj對象的鎖,也就是必須寫在synchronized(obj) {…} 代碼段內。
  • 調用obj.wait()後,線程A就釋放了obj的鎖,不然線程B沒法得到obj鎖,也就沒法在synchronized(obj) {…} 代碼段內喚醒A。
  • 當obj.wait()方法返回後,線程A須要再次得到obj鎖,才能繼續執行。
  • 若是A1,A2,A3都在obj.wait(),則B調用obj.notify()只能喚醒A1,A2,A3中的一個(具體哪個由JVM決定)。
  • obj.notifyAll()則能所有喚醒A1,A2,A3,可是要繼續執行obj.wait()的下一條語句,必須得到obj鎖,所以,A1,A2,A3只有一個有機會得到鎖繼續執行,例如A1,其他的須要等待A1釋放obj鎖以後才能繼續執行。
  • 當B調用obj.notify/notifyAll的時候,B正持有obj鎖,所以,A1,A2,A3雖被喚醒,可是仍沒法得到obj鎖。直到B退出synchronized塊,釋放obj鎖後,A1,A2,A3中的一個纔有機會得到鎖繼續執行。這一點很重要,並非調用 notify 或者 notifyAll 以後立刻釋放鎖,而是執行完相應的synchronized代碼段。
相關文章
相關標籤/搜索