實現生產者消費者模式的四種方式(Synchronized、Lock、Semaphore、BlockingQueue)

所謂生產者消費者模式,即N個線程進行生產,同時N個線程進行消費,兩種角色經過內存緩衝區進行通訊
生產着消費者圖解
圖片來源https://www.cnblogs.com/chent...html

下面咱們經過四種方式,來實現生產者消費者模式。java

首先是最原始的synchronized方式api

定義庫存類(即圖中緩存區)緩存

class Stock {
    private String name;
    // 標記庫存是否有內容
    private boolean hasComputer = false;

    public synchronized void putOne(String name) {
        // 若庫存中已有內容,則生產線程阻塞等待
        while (hasComputer) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.name = name;
        System.out.println("生產者...生產了 " + name);
        // 更新標記
        this.hasComputer = true;
        // 這裏用notify的話,假設p0執行完畢,此時c0,c1都在wait, 同時喚醒另外一個provider:p1,
        // p1判斷標記後休眠,形成全部線程都wait的局面,即死鎖;
        // 所以使用notifyAll解決死鎖問題
        this.notifyAll();
    }

    public synchronized void takeOne() {
        // 若庫存中沒有內容,則消費線程阻塞等待生產完畢後繼續
        while (!hasComputer) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("消費者...消費了 " + name);
        this.hasComputer = false;
        this.notifyAll();
    }
}

定義生產者和消費者(爲了節省空間和方便閱讀,這裏將生產者和消費者定義成了匿名內部類)多線程

public static void main(String[] args) {
    // 用於通訊的庫存類
    Stock computer = new Stock();
    // 定義兩個生產者和兩個消費者
    Thread p1 = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                computer.putOne("Dell");
            }
        }
    });
    Thread p2 = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                computer.putOne("Mac");
            }
        }
    });
    
    Thread c1 = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                computer.takeOne();
            }
        }
    });
    Thread c2 = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                computer.takeOne();
            }
        }
    });
    p1.start();
    p2.start();
    c1.start();
    c2.start();
}

運行結果圖
synchronized方式運行結果圖oracle


第二種方式:Lockide

Jdk1.5以後加入了Lock接口,一個lock對象能夠有多個Condition類,Condition類負責對lock對象進行wait,notify,notifyall操做ui

定義庫存類this

class LockStock {
    final Lock lock = new ReentrantLock();
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    // 加入庫存概念,可批量生產和消費
    // 定義最大庫存爲10
    final String[] stock = new String[10];
    // 寫入標記、讀取標記、已有商品數量
    int putptr, takeptr, count;

    public void put(String computer) {
        // lock代替synchronized
        lock.lock();
        try {
            // 若庫存已滿則生產者線程阻塞
            while (count == stock.length)
                notFull.await();
            // 庫存中加入商品
            stock[putptr] = computer;
            // 庫存已滿,指針置零,方便下次從新寫入
            if (++putptr == stock.length) putptr = 0;
            ++count;
            System.out.println(computer + " 正在生產數據: -- 庫存剩餘:" + count);
            notEmpty.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public String take(String consumerName) {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();
            // 從庫存中獲取商品
            String computer = stock[takeptr];
            if (++takeptr == stock.length) takeptr = 0;
            --count;
            System.out.println(consumerName + " 正在消費數據:" + computer + " -- 庫存剩餘:" + count);
            notFull.signal();
            return computer;
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

        // 無邏輯做用,放慢速度
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "";
    }
}

以上部分代碼摘自java7 API中Condition接口的官方示例spa

接着仍是定義生產者和消費者

public static void main(String[] args) {
    LockStock computer = new LockStock();
    Thread p1 = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                computer.put("Dell");
            }
        }
    });
    Thread p2 = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                computer.put("Mac");
            }
        }
    });

    Thread c1 = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                computer.take("zhangsan");
            }
        }
    });
    Thread c2 = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                computer.take("李四");
            }
        }
    });
    // 兩個生產者兩個消費者同時運行
    p1.start();
    p2.start();
    c1.start();
    c2.start();
}

運行結果圖:

Lock方式運行結果圖


第三種方式:Semaphore
首先依舊是庫存類:

class Stock {
    List<String> stock = new LinkedList();
    // 互斥量,控制共享數據的互斥訪問
    private Semaphore mutex = new Semaphore(1);

    // canProduceCount能夠生產的總數量。 經過生產者調用acquire,減小permit數目
    private Semaphore canProduceCount = new Semaphore(10);

    // canConsumerCount能夠消費的數量。經過生產者調用release,增長permit數目
    private Semaphore canConsumerCount = new Semaphore(0);

    public void put(String computer) {
        try {
            // 可生產數量 -1
            canProduceCount.acquire();
            mutex.acquire();
            // 生產一臺電腦
            stock.add(computer);
            System.out.println(computer + " 正在生產數據" + " -- 庫存剩餘:" + stock.size());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 釋放互斥鎖
            mutex.release();
            // 釋放canConsumerCount,增長能夠消費的數量
            canConsumerCount.release();
        }
        // 無邏輯做用,放慢速度
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void get(String consumerName) {
        try {
            // 可消費數量 -1
            canConsumerCount.acquire();
            mutex.acquire();
            // 從庫存消費一臺電腦
            String removedVal = stock.remove(0);
            System.out.println(consumerName + " 正在消費數據:" + removedVal + " -- 庫存剩餘:" + stock.size());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            mutex.release();
            // 消費後釋放canProduceCount,增長能夠生產的數量
            canProduceCount.release();
        }
    }
}

仍是生產消費者:

public class SemaphoreTest {
    public static void main(String[] args) {
        // 用於多線程操做的庫存變量
        final Stock stock = new Stock();
        // 定義兩個生產者和兩個消費者
        Thread dellProducer = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    stock.put("Del");
                }
            }
        });
        Thread macProducer = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    stock.put("Mac");
                }
            }
        });
        Thread consumer1 = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    stock.get("zhangsan");
                }
            }
        });
        Thread consumer2 = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    stock.get("李四");
                }
            }
        });
        dellProducer.start();
        macProducer.start();
        consumer1.start();
        consumer2.start();
    }
}

運行結果圖:

Semaphore運行結果圖


第四種方式:BlockingQueue
BlockingQueue的put和take底層實現其實也是使用了第二種方式中的ReentrantLock+Condition,而且幫咱們實現了庫存隊列,方便簡潔
一、定義生產者

class Producer implements Runnable {
    // 庫存隊列
    private BlockingQueue<String> stock;
    // 生產/消費延遲
    private int timeOut;
    private String name;

    public Producer(BlockingQueue<String> stock, int timeout, String name) {
        this.stock = stock;
        this.timeOut = timeout;
        this.name = name;
    }

    @Override
    public void run() {
        while (true) {
            try {
                stock.put(name);
                System.out.println(name + " 正在生產數據" + " -- 庫存剩餘:" + stock.size());
                TimeUnit.MILLISECONDS.sleep(timeOut);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

二、定義消費者

class Consumer implements Runnable {
    // 庫存隊列
    private BlockingQueue<String> stock;
    private String consumerName;

    public Consumer(BlockingQueue<String> stock, String name) {
        this.stock = stock;
        this.consumerName = name;
    }

    @Override
    public void run() {
        while (true) {
            try {
                // 從庫存消費一臺電腦
                String takeName = stock.take();
                System.out.println(consumerName + " 正在消費數據:" + takeName + " -- 庫存剩餘:" + stock.size());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

三、定義庫存並運行

public static void main(String[] args) {
        // 定義最大庫存爲10
        BlockingQueue<String> stock = new ArrayBlockingQueue<>(10);
        Thread p1 = new Thread(new Producer(stock, 500, "Mac"));
        Thread p2 = new Thread(new Producer(stock, 500, "Dell"));
        Thread c1 = new Thread(new Consumer(stock,"zhangsan"));
        Thread c2 = new Thread(new Consumer(stock, "李四"));

        p1.start();
        p2.start();
        c1.start();
        c2.start();

    }

運行結果圖:
BlockingQueue運行結果圖.png

感謝閱讀~歡迎指正和補充~~~

相關文章
相關標籤/搜索