Java併發編程(4)--生產者與消費者模式介紹

1、前言

  這種模式在生活是最多見的,那麼它的場景是什麼樣的呢? 下面是我假象的,假設有一個倉庫,倉庫有一個生產者和一個消費者,消費者過來消費的時候會檢測倉庫中是否有庫存,若是沒有了則等待生產,若是有就先消費直至消費完成;而生產者天天的工做就是先檢測倉庫是否有庫存,若是沒有就開始生產,滿倉了就中止生產等待消費,直至工做結束。下圖是根據假象畫的流程圖:java

  那麼在程序中怎麼才能達到這樣的效果呢?下面介紹三種方式實現。ide

2、使用notify() 和 wait()實現

  相信你們這兩個方法都不陌生,它是Object類中的兩個方法,具體請看源碼中的解釋。提醒一點就是使用notify()和wait()方法時必須擁有對象鎖this

  根據上面假象我這定義一下明確場景:倉庫庫存有個最大值,若是倉庫庫存已經達到最大值那麼就中止生產,小於就須要生產; 若是庫存等於0則須要等待生產中止消費。另外生產者有個生產目標,當它生產了目標數後就結束生產;消費者也是,當消費必定的數據後就結束消費,不然等待消費。spa

  見下面代碼:線程

package com.yuanfy.jmm.threads;

import com.yuanfy.util.SleepUtils;

import java.util.concurrent.TimeUnit;

public class Factory {
    // 當前庫存大小
    private int size;
    // 庫存容量(最大庫存值)
    private int capacity;

    public Factory(int capacity) {
        this.capacity = capacity;
    }

    public synchronized void produce(int num) {
        try {
            System.out.println("+++++生產者【" + Thread.currentThread().getName()
                    + "】, 他的任務是生產" + num + "件產品.");
            // 當生產完成就中止
            while (num > 0) {
                // 若是當前庫存大小大於或等於庫存容量值了,則中止生產等待消費。
                if (size >= capacity) {
                    System.out.println("+++++" + Thread.currentThread().getName() +
                            "檢測庫存已滿,中止生產等待消費...");
                    // 等待消費
 wait();
                    System.out.println("+++++" + Thread.currentThread().getName() + "開始生產...");
                }
                // 不然繼續生產
                int inc = (num + size) > capacity ? (capacity - size) : num;
                num -= inc;
                size += inc;
                SleepUtils.second(1);
                System.out.println("+++++" + Thread.currentThread().getName() + " 生產了" + inc + "件,當前庫存有" + size + "件.");
                // 生產後喚醒消費者
 notify();
            }
            System.out.println("+++++生產者【" + Thread.currentThread().getName()
                    + "】 生產結束.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public synchronized void consume(int num) {
        try {
            System.out.println("-----消費者【" + Thread.currentThread().getName()
                    + "】, 他須要消費" + num + "件產品.");
            // 當消費完成則中止
            while (num > 0) {
                // 若是當前庫存大小小於等於0,則中止消費等待生產。
                if (size <= 0) {
                    System.out.println("-----" + Thread.currentThread().getName() + " 檢測庫存已空,中止消費等待生產...");
                    // 等待生產
 wait();
                    System.out.println("-----" + Thread.currentThread().getName() + " 開始消費...");
                }
                // 不然繼續消費
                int dec = (size - num) > 0 ? num : size;
                num -= dec;
                size -= dec;
                SleepUtils.second(1);
                System.out.println("-----" + Thread.currentThread().getName() + " 消費了" + dec + "件,當前有" + size + "件.");
                // 消費後喚醒生產者繼續生產
 notify();
            }
            System.out.println("-----消費者【" + Thread.currentThread().getName()
                    + "】 消費結束.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

  上面是工廠(倉庫)類,主要包含兩個任務一個是生產一個是消費,接下來建立兩個線程去調用它,以下:code

package com.yuanfy.jmm.threads;

/**
 * 生產線程
 */
class Produce {
    private Factory factory;

    public Produce(Factory factory) {
        this.factory = factory;
    }

    public void produce(String name, final int num) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                factory.produce(num);
            }
        }, name).start();
    }
}
/**
 * 消費線程
 */
class Consume {
    private Factory factory;

    public Consume(Factory factory) {
        this.factory = factory;
    }

    public void consume(String name, final int num) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                factory.consume(num);
            }
        }, name).start();
    }
}

public class ProduceConsumeDemo {

    public static void main(String[] args) {
        Factory f = new Factory(500); 
        Consume consume = new Consume(f);
        consume.consume("消費線程",600);

        Produce produce = new Produce(f);
        produce.produce("生產線程",800);
    }
}

  注意上方,消費線程和生產線程都是擁有同一個工廠對象,而後進行生產和消費模式。那麼咱們直接運行,結果以下:對象

  

 3、使用鎖中的Condition對象進行控制

  這種方式估計用的比較少,由於使用Condition必須先使用鎖Lock。這裏我只介紹怎麼用Condition對象進行控制實現生產者與消費者模式的實現。blog

  其實它跟上面那種方法有點相似,Condition對象中await()方法表示等待,signal()方法表示喚醒(看了AQS源碼的應該都知道有這個對象且瞭解過這兩個方法)。下面看下具體怎麼實現:隊列

public class Factory {
    // 當前大小
    private int size;

    // 總容量
    private int capacity;

    private Lock lock;

    // 已滿的條件
    private Condition fullCondition;

    // 已空的條件
    private Condition emptyCondition;

    public Factory(int capacity) {
        this.capacity = capacity;
        lock = new ReentrantLock();
        fullCondition = lock.newCondition();
        emptyCondition = lock.newCondition();
    }

    public void produce(int no) {
        lock.lock();
        try {
            while (no > 0) {
                while (size >= capacity) {
                    System.out.println(Thread.currentThread().getName() + " 報告倉庫已滿,等待快遞員取件...");
                    fullCondition.await();
                    System.out.println(Thread.currentThread().getName() + " 報告開始進貨...");
                }
                int inc = (no + size) > capacity ? (capacity - size) : no;
                no -= inc;
                size += inc;
                System.out.println(Thread.currentThread().getName() +
                        " 報告進貨了: " + inc + "件, 當前庫存數: " + size);
                emptyCondition.signal();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void consume(int no) {
        lock.lock();
        try {
            while (no > 0) {
                while (size <= 0) {
                    System.out.println(Thread.currentThread().getName() + " 報告倉庫已空,等待倉庫管理員進貨");
                    emptyCondition.await();
                    System.out.println(Thread.currentThread().getName() + " 報告開始取件...");
                }
                int dec = (size - no) > 0 ? no : size;
                no -= dec;
                size -= dec;
                System.out.println(Thread.currentThread().getName() +
                        " 報告取件: " + dec + ", 當前庫存數: " + size);
                fullCondition.signal();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

  看了上面工廠類的代碼後是否是跟使用Object中wait()和notify()方法相似呢。 主要區別就是擁有對象的方式不同,這裏使用的lock進行且須要手動釋放,而第一種是須要Synchronized進行控制。ci

4、使用阻塞隊列進行實現

  這個就很簡單了,它已經封裝好等待和喚醒的操做,因此不進行案例分享了。其中涉及到兩個重要方法put() 和 take

相關文章
相關標籤/搜索