所謂生產者消費者模式,即N個線程進行生產,同時N個線程進行消費,兩種角色經過內存緩衝區進行通訊
圖片來源https://www.cnblogs.com/chent...html
下面咱們經過四種方式,來實現生產者消費者模式。java
首先是最原始的synchronized
方式api
定義庫存類(即圖中緩存區)緩存
class Stock { private String name; // 標記庫存是否有內容 private boolean hasComputer = false; public synchronized void putOne(String name) { // 若庫存中已有內容,則生產線程阻塞等待 while (hasComputer) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } this.name = name; System.out.println("生產者...生產了 " + name); // 更新標記 this.hasComputer = true; // 這裏用notify的話,假設p0執行完畢,此時c0,c1都在wait, 同時喚醒另外一個provider:p1, // p1判斷標記後休眠,形成全部線程都wait的局面,即死鎖; // 所以使用notifyAll解決死鎖問題 this.notifyAll(); } public synchronized void takeOne() { // 若庫存中沒有內容,則消費線程阻塞等待生產完畢後繼續 while (!hasComputer) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("消費者...消費了 " + name); this.hasComputer = false; this.notifyAll(); } }
定義生產者和消費者(爲了節省空間和方便閱讀,這裏將生產者和消費者定義成了匿名內部類)多線程
public static void main(String[] args) { // 用於通訊的庫存類 Stock computer = new Stock(); // 定義兩個生產者和兩個消費者 Thread p1 = new Thread(new Runnable() { @Override public void run() { while (true) { computer.putOne("Dell"); } } }); Thread p2 = new Thread(new Runnable() { @Override public void run() { while (true) { computer.putOne("Mac"); } } }); Thread c1 = new Thread(new Runnable() { @Override public void run() { while (true) { computer.takeOne(); } } }); Thread c2 = new Thread(new Runnable() { @Override public void run() { while (true) { computer.takeOne(); } } }); p1.start(); p2.start(); c1.start(); c2.start(); }
運行結果圖
oracle
第二種方式:Lock
ide
Jdk1.5以後加入了Lock接口,一個lock對象能夠有多個Condition類
,Condition類負責對lock對象進行wait,notify,notifyall
操做ui
定義庫存類this
class LockStock { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); // 加入庫存概念,可批量生產和消費 // 定義最大庫存爲10 final String[] stock = new String[10]; // 寫入標記、讀取標記、已有商品數量 int putptr, takeptr, count; public void put(String computer) { // lock代替synchronized lock.lock(); try { // 若庫存已滿則生產者線程阻塞 while (count == stock.length) notFull.await(); // 庫存中加入商品 stock[putptr] = computer; // 庫存已滿,指針置零,方便下次從新寫入 if (++putptr == stock.length) putptr = 0; ++count; System.out.println(computer + " 正在生產數據: -- 庫存剩餘:" + count); notEmpty.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } public String take(String consumerName) { lock.lock(); try { while (count == 0) notEmpty.await(); // 從庫存中獲取商品 String computer = stock[takeptr]; if (++takeptr == stock.length) takeptr = 0; --count; System.out.println(consumerName + " 正在消費數據:" + computer + " -- 庫存剩餘:" + count); notFull.signal(); return computer; } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } // 無邏輯做用,放慢速度 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return ""; } }
以上部分代碼摘自java7 API中Condition接口的官方示例spa
接着仍是定義生產者和消費者
public static void main(String[] args) { LockStock computer = new LockStock(); Thread p1 = new Thread(new Runnable() { @Override public void run() { while (true) { computer.put("Dell"); } } }); Thread p2 = new Thread(new Runnable() { @Override public void run() { while (true) { computer.put("Mac"); } } }); Thread c1 = new Thread(new Runnable() { @Override public void run() { while (true) { computer.take("zhangsan"); } } }); Thread c2 = new Thread(new Runnable() { @Override public void run() { while (true) { computer.take("李四"); } } }); // 兩個生產者兩個消費者同時運行 p1.start(); p2.start(); c1.start(); c2.start(); }
運行結果圖:
第三種方式:Semaphore
首先依舊是庫存類:
class Stock { List<String> stock = new LinkedList(); // 互斥量,控制共享數據的互斥訪問 private Semaphore mutex = new Semaphore(1); // canProduceCount能夠生產的總數量。 經過生產者調用acquire,減小permit數目 private Semaphore canProduceCount = new Semaphore(10); // canConsumerCount能夠消費的數量。經過生產者調用release,增長permit數目 private Semaphore canConsumerCount = new Semaphore(0); public void put(String computer) { try { // 可生產數量 -1 canProduceCount.acquire(); mutex.acquire(); // 生產一臺電腦 stock.add(computer); System.out.println(computer + " 正在生產數據" + " -- 庫存剩餘:" + stock.size()); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 釋放互斥鎖 mutex.release(); // 釋放canConsumerCount,增長能夠消費的數量 canConsumerCount.release(); } // 無邏輯做用,放慢速度 try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } public void get(String consumerName) { try { // 可消費數量 -1 canConsumerCount.acquire(); mutex.acquire(); // 從庫存消費一臺電腦 String removedVal = stock.remove(0); System.out.println(consumerName + " 正在消費數據:" + removedVal + " -- 庫存剩餘:" + stock.size()); } catch (InterruptedException e) { e.printStackTrace(); } finally { mutex.release(); // 消費後釋放canProduceCount,增長能夠生產的數量 canProduceCount.release(); } } }
仍是生產消費者:
public class SemaphoreTest { public static void main(String[] args) { // 用於多線程操做的庫存變量 final Stock stock = new Stock(); // 定義兩個生產者和兩個消費者 Thread dellProducer = new Thread(new Runnable() { @Override public void run() { while (true) { stock.put("Del"); } } }); Thread macProducer = new Thread(new Runnable() { @Override public void run() { while (true) { stock.put("Mac"); } } }); Thread consumer1 = new Thread(new Runnable() { @Override public void run() { while (true) { stock.get("zhangsan"); } } }); Thread consumer2 = new Thread(new Runnable() { @Override public void run() { while (true) { stock.get("李四"); } } }); dellProducer.start(); macProducer.start(); consumer1.start(); consumer2.start(); } }
運行結果圖:
第四種方式:BlockingQueue
BlockingQueue的put和take底層實現其實也是使用了第二種方式中的ReentrantLock
+Condition
,而且幫咱們實現了庫存隊列,方便簡潔
一、定義生產者
class Producer implements Runnable { // 庫存隊列 private BlockingQueue<String> stock; // 生產/消費延遲 private int timeOut; private String name; public Producer(BlockingQueue<String> stock, int timeout, String name) { this.stock = stock; this.timeOut = timeout; this.name = name; } @Override public void run() { while (true) { try { stock.put(name); System.out.println(name + " 正在生產數據" + " -- 庫存剩餘:" + stock.size()); TimeUnit.MILLISECONDS.sleep(timeOut); } catch (InterruptedException e) { e.printStackTrace(); } } } }
二、定義消費者
class Consumer implements Runnable { // 庫存隊列 private BlockingQueue<String> stock; private String consumerName; public Consumer(BlockingQueue<String> stock, String name) { this.stock = stock; this.consumerName = name; } @Override public void run() { while (true) { try { // 從庫存消費一臺電腦 String takeName = stock.take(); System.out.println(consumerName + " 正在消費數據:" + takeName + " -- 庫存剩餘:" + stock.size()); } catch (InterruptedException e) { e.printStackTrace(); } } } }
三、定義庫存並運行
public static void main(String[] args) { // 定義最大庫存爲10 BlockingQueue<String> stock = new ArrayBlockingQueue<>(10); Thread p1 = new Thread(new Producer(stock, 500, "Mac")); Thread p2 = new Thread(new Producer(stock, 500, "Dell")); Thread c1 = new Thread(new Consumer(stock,"zhangsan")); Thread c2 = new Thread(new Consumer(stock, "李四")); p1.start(); p2.start(); c1.start(); c2.start(); }
運行結果圖:
感謝閱讀~歡迎指正和補充~~~