生產者消費者模式簡而言之就是兩種不一樣的線程分別扮演生產者和消費者,經過一個商品容器來生產商品和消費商品。生產者和消費者模式是學習多線程的好例子,下文就以四種不一樣實現的消費者生產者模式來理解多線程的編程。html
如下的例子都共用消費者和生產者對象,而將商品容器(Stock)按照四種形式進行實現。編程
生產者持有商品容器,並實現了Runnable接口,在run方法中無限循環地往商品容器stock中放入商品。數組
public class Producer implements Runnable{ // 商品容器 private Stock stock; public Producer(Stock stock) { this.stock = stock; } @Override public void run() { while (true) { // 隨機生成商品 放入商品容器 stock中 String product = "商品" + System.currentTimeMillis() % 100; System.out.println("生產了" + product); stock.put(product); // 休眠0.5秒 try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } }
消費者持有商品容器,並實現了Runnable接口,無限循環地從商品容器stock中取出商品消費。多線程
public class Consumer implements Runnable { // 商品容器 private Stock stock; public Consumer(Stock stock) { this.stock = stock; } @Override public void run() { while (true) { // 從商品容器中取出商品消費 Object take = stock.take(); System.out.println("消費了" + take); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } }
該接口主要定義了取出商品和放入商品兩個方法供消費者和生產者使用,具體實現由不一樣子類提供。併發
public interface Stock { // 定義了容器的最大容量 public static final int MAX = 10; // 取出商品 String take(); // 放入商品 void put(String good); }
該實現主要由synchronized、await、notify配合使用。 ide
synchronized的語義你們應該都知道,當兩個併發線程訪問同一個對象object中的這個加鎖同步代碼塊時,一個時間內只能有一個線程獲得執行。即同一時間內要麼只有消費者執行take()方法,要麼只有生產者執行put()方法。 學習
只有synchronized保證只有一個線程執行方法還不夠,咱們須要在容器空的時候,須要調用await()讓出鎖進行等待,將執行權交給生產者生產商品,生產者生產完商品後再調用notify()方法通知消費者線程消費商品(有可能喚醒的仍是生產者,若是喚醒的是仍是生產者就繼續生產商品直到容器滿,讓出鎖進行等待。)。反之亦然。ui
public class SynchronizedStock implements Stock { // 使用鏈表放置商品 private LinkedList<String> productList = new LinkedList(); public synchronized String take() { // 進入方法前先判斷數組是否爲空,爲空的話釋放鎖進入阻塞狀態 while (productList.isEmpty()) { try { System.out.println("商品空了"); wait(); } catch (InterruptedException e) { e.printStackTrace(); } } // 取出商品 String product = productList.pop(); // 通知其餘線程,有可能不是喚醒生產者線程 notifyAll(); return product; } public synchronized void put(String good) { // 進入方法前先判斷數組是否已滿,滿的話釋放鎖進入阻塞狀態 while (productList.size() == MAX) { try { System.out.println("商品滿了"); wait(); } catch (InterruptedException e) { e.printStackTrace(); } } // 放入商品 productList.push(good); // 通知全部線程(生產者和消費者都有可能) notifyAll(); } }
有的朋友可能會疑惑爲何要使用while循環判斷容器空或者滿呢?筆者舉個例子,假設咱們用if判斷數組爲空的話?消費者線程A先判斷if條件爲空,並進入了if代碼塊內進行了等待。接下來消費者線程B也判斷if條件爲空,也進入到if代碼塊內進行了等待。這時候生產者線程C生產了一個商品,先喚醒了消費者線程A,A喚醒後從if代碼塊內恢復執行,而後直接消費一個商品(此時容器空)。接下來可能喚醒了消費者線程B,因爲消費者線程B剛纔也進入到了if代碼塊中(不會再判斷一次if容器爲空),此時直接從代碼塊中恢復執行,消費商品時,發現容器中根本沒有商品能夠消費。因此若是條件用while進行判斷的話,在喚醒線程時,依然會判斷容器是否爲空。才能防止出錯。 this
要點:線程
該實現主要由ReentrantLock、以及notEmpty、notFull兩個Condition來一塊兒實現。Condition同樣也是用來阻塞等待線程。那爲何須要兩個Condition呢?讀者能夠看看剛纔的例子,使用notify的時候可能會喚醒生產者和消費者。而兩個Condition的話,咱們能夠在精準的控制喚醒,在消費者中喚醒生產者,在生產者中喚醒消費者。
public class ReentrantLockStock implements Stock { // 使用鏈表來存放商品 private LinkedList<String> productList = new LinkedList(); // 執行take()和put()時須要的鎖 private Lock lock = new ReentrantLock(); // 當調用notEmpty.signal()時,告訴生產者容器沒空能夠取商品 private Condition notEmpty = lock.newCondition(); // 當調用notFull.signal()時,告訴消費者者容器沒滿能夠放入商品 private Condition notFull = lock.newCondition(); @Override public String take() { String good = null; try { // 獲取鎖才能夠執行接下來的方法。 lock.lock(); // 當商品容器空時,notEmpty調用wait阻塞當前線程,表示如今容器空。 while (productList.isEmpty()) { System.out.println("商品空了"); notEmpty.await(); } // 結束等待 獲取商品 good = productList.pop(); // 通知生產者能夠繼續生產商品 notFull.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } // 返回商品 return good; } @Override public void put(String good) { try { // 獲取鎖才能夠執行接下來的方法。 lock.lock(); // 當商品容器滿時,notFull調用wait阻塞當前線程,表示如今容器滿 while (productList.size() == MAX) { System.out.println("商品滿了"); notFull.await(); } // 結束等待時,放入商品 productList.push(good); // 通知消費者能夠繼續消費 notEmpty.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } } }
許多人會將lock放在try catch塊外面,這樣很容易出現死鎖。由於lock鎖和synchronized鎖不同。synchronized鎖會自動釋放鎖。而lock不會自動釋放鎖,必須手工釋放鎖。若是lock放在try catch塊以外的話,持有鎖後卻發生了異常,此時並不會釋放鎖。其餘線程就永遠得不到這個鎖了。
Semaphore是信號量的意思,信號量表明一張票,擁有了這張票你才能進行相應的操做。Semaphore的acquire()方法是阻塞獲取信號量的方法。release()方法是添加信號量。咱們使用只有惟一信號量的lock變量來模擬加鎖解鎖。用10個信號量的notFull變量模擬只可往容器裏添加10個商品。當添加完一個商品後,增長一個notEmpty的信號量,notEmpy有信號量以後才能夠消費商品。
public class SemaphoreStock implements Stock { // 使用鏈表存放商品 private LinkedList<String> goodList = new LinkedList(); // 使用一個信號量模擬鎖(只有一個線程可使用容器) private Semaphore lock = new Semaphore(1); // 使用10個信號量模擬商品容器的最大容量 private Semaphore notFull = new Semaphore(10); private Semaphore notEmpty = new Semaphore(0); @Override public String take() { String good = null; try { // 當notEmpty還有信號量的話 表明容器內有商品 notEmpty.acquire(); // 利用惟一的信號量模擬加鎖 lock.acquire(); // 獲取商品 good = goodList.pop(); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 釋放惟一的信號量 lock.release(); // 消費完一個商品,往notFull中添加一個信號量 notFull.release(); } return good; } @Override public void put(String good) { try { // 當notFull還有信號量的話 表明容器還未滿,能夠放入商品 notFull.acquire(); // 利用惟一的信號量模擬加鎖 lock.acquire(); // 放入商品 goodList.push(good); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 釋放惟一的信號量 lock.release(); // 生產完一個商品,往notEmpty中添加一個信號量 notEmpty.release(); } } }
使用信號量控制消費者和生產者協調時,不能先lock.acquire(),再notNull.acquire()。由於當lock.acquire()先獲得信號量時,接着執行notNull.acquire()發現沒有信號量,就阻塞等待而且沒有釋放剛纔lock的信號量。致使程序進入死鎖。因此必定要先獲取生產或者消費的信號量,再使用lock的信號量。
咱們直接使用ArrayBlockingQueue同步隊列做爲商品容器。該同步隊列其實底層也是調用ReentrantLock進行實現的。
public class BlockingQueueStock implements Stock { // 使用固定容量的arrayBlockingQueue同步隊列放置商品 private ArrayBlockingQueue<String> goods = new ArrayBlockingQueue<String>(10); @Override public String take() { String good = null; // 調用take阻塞獲取商品 try { good = goods.take(); } catch (InterruptedException e) { e.printStackTrace(); } return good; } @Override public void put(String good) { try { goods.put(good); } catch (InterruptedException e) { e.printStackTrace(); } } }
ArrayBlockingQueue主要有以下方法:
add、offer、put都是放入元素。
remove、poll、take都是移除元素。
element、peek是獲取頭元素,但不移除。
切記:put和take阻塞。
它們有不一樣形式