Java多線程14:生產者/消費者模型

什麼是生產者/消費者模型併發

一種重要的模型,基於等待/通知機制。生產者/消費者模型描述的是有一塊緩衝區做爲倉庫,生產者可將產品放入倉庫,消費者能夠從倉庫中取出產品,生產者/消費者模型關注的是如下幾個點:函數

  • 生產者生產的時候消費者不能消費
  • 消費者消費的時候生產者不能生產
  • 緩衝區空時消費者不能消費
  • 緩衝區滿時生產者不能生產

生產者/模型做爲一種重要的模型,它的優勢在於:this

  • 解耦。由於多了一個緩衝區,因此生產者和消費者並不直接相互調用,這一點很容易想到,這樣生產者和消費者的代碼發生變化,都不會對對方產生影響,這樣其實就把生產者和消費者之間的強耦合解開,變爲了生產者和緩衝區/消費者和緩衝區之間的弱耦合
  • 經過平衡生產者和消費者的處理能力來提升總體處理數據的速度,這是生產者/消費者模型最重要的一個優勢。若是消費者直接從生產者這裏拿數據,若是生產者生產的速度很慢,但消費者消費的速度很快,那消費者就得佔用CPU的時間片白白等在那邊。有了生產者/消費者模型,生產者和消費者就是兩個獨立的併發體,生產者把生產出來的數據往緩衝區一丟就行了,沒必要管消費者;消費者也是,從緩衝區去拿數據就行了,也沒必要管生產者,緩衝區滿了就不生產,緩衝區空了就不消費,使生產者/消費者的處理能力達到一個動態的平衡

 

利用wait()/notify()實現生產者/消費者模型spa

既然生產者/消費者模型有一個緩衝區,那麼咱們就本身作一個緩衝區,生產者和消費者的通訊都是經過這個緩衝區的。value爲""表示緩衝區空,value不爲""表示緩衝區滿:線程

public class ValueObject
{
    public static String value = "";
}

接下來就是一個生產者了,若是緩衝區滿了的,那麼就wait(),再也不生產了,等待消費者消費完通知;若是緩衝區是空的,那麼就生產數據到緩衝區中code

public class Producer
{
    private Object lock;
    
    public Producer(Object lock)
    {
        this.lock = lock;
    }
    
    public void setValue()
    {
        try
        {
            synchronized (lock)
            {
                if (!ValueObject.value.equals(""))
                    lock.wait();
                String value = System.currentTimeMillis() + "_" + System.nanoTime();
                System.out.println("Set的值是:" + value);
                ValueObject.value = value;
                lock.notify();
            }
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}

消費者相似,若是緩衝區是空的,那麼就再也不消費,wait()等待,等待生產者生產完通知;若是緩衝區不是空的,那麼就去拿數據:blog

public class Customer
{
    private Object lock;
    
    public Customer(Object lock)
    {
        this.lock = lock;
    }
    
    public void getValue()
    {
        try
        {
            synchronized (lock)
            {
                if (ValueObject.value.equals(""))
                    lock.wait();
                System.out.println("Get的值是:" + ValueObject.value);
                ValueObject.value = "";
                lock.notify();
            }
        } 
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}

寫個主函數,開兩個線程調用Producer裏面的getValue()方法和Customer()裏面的setValue()方法:get

public static void main(String[] args)
{
    Object lock = new Object();
    final Producer producer = new Producer(lock);
    final Customer customer = new Customer(lock);
    Runnable producerRunnable = new Runnable()
    {
        public void run()
        {
            while (true)
            {
                producer.setValue();
            }
        }
    };
    Runnable customerRunnable = new Runnable()
    {
        public void run()
        {
            while (true)
            {
                customer.getValue();
            }
        }
    };
    Thread producerThread = new Thread(producerRunnable);
    Thread CustomerThread = new Thread(customerRunnable);
    producerThread.start();
    CustomerThread.start();
}

看一下運行結果:產品

...
Set的值是:1444025677743_162366875965845
Get的值是:1444025677743_162366875965845
Set的值是:1444025677743_162366875983541
Get的值是:1444025677743_162366875983541
Set的值是:1444025677743_162366876004776
Get的值是:1444025677743_162366876004776
...

生產數據和消費數據必定是成對出現的,生產一個消費一個,滿了不生產,空了不消費,生產者不能無限生產,消費者也不能無限消費,符合生產者/消費者模型。生產者速度快,就不佔用CPU時間片,等着消費者消費完通知它繼續生產,這塊時間片能夠用來給其餘線程用。it

 

利用await()/signal()實現生產者和消費者模型

同樣,先定義一個緩衝區:

public class ValueObject
{
    public static String value = "";
}

換種寫法,生產和消費方法放在一個類裏面:

public class ThreadDomain41 extends ReentrantLock
{
    private Condition condition = newCondition();
    
    public void set()
    {
        try
        {
            lock();
            while (!"".equals(ValueObject.value))
                condition.await();
            ValueObject.value = "123";
            System.out.println(Thread.currentThread().getName() + "生產了value, value的當前值是" + ValueObject.value);
            condition.signal();
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        finally
        {
            unlock();
        }
    }
    
    public void get()
    {
        try
        {
            lock();
            while ("".equals(ValueObject.value))
                condition.await();
            ValueObject.value = "";
            System.out.println(Thread.currentThread().getName() + "消費了value, value的當前值是" + ValueObject.value);
            condition.signal();
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        finally
        {
            unlock();
        }
    }
}

一樣的,開兩個線程,一個線程調用set()方法生產,另外一個線程調用get()方法消費:

public static void main(String[] args)
{
    final ThreadDomain41 td = new ThreadDomain41();
    Runnable producerRunnable = new Runnable()
    {
        public void run()
        {
            for (int i = 0; i < Integer.MAX_VALUE; i++)
                td.set();
        }
    };
    Runnable customerRunnable = new Runnable()
    {
        public void run()
        {
            for (int i = 0; i < Integer.MAX_VALUE; i++)
                td.get();
        }
    };
    Thread ProducerThread = new Thread(producerRunnable);
    ProducerThread.setName("Producer");
    Thread ConsumerThread = new Thread(customerRunnable);
    ConsumerThread.setName("Consumer");
    ProducerThread.start();
    ConsumerThread.start();
}

看一下運行結果:

...
Producer生產了value, value的當前值是123
Consumer消費了value, value的當前值是
Producer生產了value, value的當前值是123
Consumer消費了value, value的當前值是
Producer生產了value, value的當前值是123
Consumer消費了value, value的當前值是
...

和wait()/notify()機制的實現效果同樣,一樣符合生產者/消費者模型

 

當心假死

生產者/消費者模型最終達到的目的是平衡生產者和消費者的處理能力,達到這個目的的過程當中,並不要求只有一個生產者和一個消費者。能夠多個生產者對應多個消費者,能夠一個生產者對應一個消費者,能夠多個生產者對應一個消費者。

假死就發生在上面三種場景下。理論分析就能說明問題,因此就不寫代碼了。代碼要寫也很簡單,上面的兩個例子隨便修改一個,開一個生產者線程/多個消費者線程、開多個生產者線程/消費者線程、開多個生產者線程/多個消費者線程均可以。假死指的是所有線程都進入了WAITING狀態,那麼程序就再也不執行任何業務功能了,整個項目呈現停滯狀態。

比方說有生產者A和生產者B,緩衝區因爲空了,消費者處於WAITING。生產者B處於WAITING,生產者A被消費者通知生產,生產者A生產出來的產品本應該通知消費者,結果通知了生產者B,生產者B被喚醒,發現緩衝區滿了,因而繼續WAITING。至此,兩個生產者線程處於WAITING,消費者處於WAITING,系統假死。

上面的分析能夠看出,假死出現的緣由是由於notify的是同類,因此非單生產者/單消費者的場景,能夠採起兩種方法解決這個問題:

一、synchronized用notifyAll()喚醒全部線程、ReentrantLock用signalAll()喚醒全部線程

二、用ReentrantLock定義兩個Condition,一個表示生產者的Condition,一個表示消費者的Condition,喚醒的時候調用相應的Condition的signal()方法就能夠了

相關文章
相關標籤/搜索