文源網絡,僅供學習之用,若有侵權請聯繫刪除。
生產者-消費者模式是一個十分經典的多線程併發協做的模式,弄懂生產者-消費者問題可以讓咱們對併發編程的理解加深。java
所謂生產者-消費者問題,實際上主要是包含了兩類線程,一種是生產者線程用於生產數據,另外一種是消費者線程用於消費數據。面試
爲了解耦生產者和消費者的關係,一般會採用共享的數據區域,就像是一個倉庫,生產者生產數據以後直接放置在共享數據區中,並不須要關心消費者的行爲;而消費者只須要從共享數據區中去獲取數據,就再也不須要關心生產者的行爲。spring
可是,這個共享數據區域中應該具有這樣的線程間併發協做的功能:編程
在實現生產者消費者問題時,能夠採用三種方式:設計模式
一、使用Object的wait/notify的消息通知機制;網絡
二、使用Lock的Condition的await/signall的消息通知機制;數據結構
三、使用BlockingQueue實現。多線程
本文主要將這三種實現方式進行總結概括。併發
一、預備知識框架
Java 中,能夠經過配合調用 Object 對象的 wait() 方法和 notify()方法或 notifyAll() 方法來實現線程間的通訊。在線程中調用 wait() 方法,將阻塞當前線程,直至等到其餘線程調用了調用 notify() 方法或 notifyAll() 方法進行通知以後,當前線程才能從wait()方法出返回,繼續執行下面的操做。
wait該方法用來將當前線程置入休眠狀態,直到接到通知或被中斷爲止。在調用 wait()以前,線程必需要得到該對象的對象監視器鎖,即只能在同步方法或同步塊中調用 wait()方法。
調用wait()方法以後,當前線程會釋放鎖。若是調用wait()方法時,線程並未獲取到鎖的話,則會拋出IllegalMonitorStateException異常,這是以個RuntimeException。若是再次獲取到鎖的話,當前線程才能從wait()方法處成功返回。
notify該方法也要在同步方法或同步塊中調用,即在調用前,線程也必需要得到該對象的對象級別鎖,若是調用 notify()時沒有持有適當的鎖,也會拋出 IllegalMonitorStateException。
該方法任意從WAITTING狀態的線程中挑選一個進行通知,使得調用wait()方法的線程從等待隊列移入到同步隊列中,等待有機會再一次獲取到鎖,從而使得調用wait()方法的線程可以從wait()方法處退出。
調用notify後,當前線程不會立刻釋放該對象鎖,要等到程序退出同步塊後,當前線程纔會釋放鎖。
notifyAll該方法與 notify ()方法的工做方式相同,重要的一點差別是:notifyAll 使全部原來在該對象上 wait 的線程通通退出WAITTING狀態,使得他們所有從等待隊列中移入到同步隊列中去,等待下一次可以有機會獲取到對象監視器鎖。
二、wait/notify消息通知潛在的一些問題
1)notify早期通知
notify 通知的遺漏很容易理解,即 threadA 還沒開始 wait 的時候,threadB 已經 notify 了,這樣,threadB 通知是沒有任何響應的,當 threadB 退出 synchronized 代碼塊後,threadA 再開始 wait,便會一直阻塞等待,直到被別的線程打斷。
好比在下面的示例代碼中,就模擬出notify早期通知帶來的問題:
public class EarlyNotify { private static String lockObject = ""; public static void main(String[] args) { WaitThread waitThread = new WaitThread(lockObject); NotifyThread notifyThread = new NotifyThread(lockObject); notifyThread.start(); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } waitThread.start(); } static class WaitThread extends Thread { private String lock; public WaitThread(String lock) { this.lock = lock; } @Override public void run() { synchronized (lock) { try { System.out.println(Thread.currentThread().getName() + " 進去代碼塊"); System.out.println(Thread.currentThread().getName() + " 開始wait"); lock.wait(); System.out.println(Thread.currentThread().getName() + " 結束wait"); } catch (InterruptedException e) { e.printStackTrace(); } } } } static class NotifyThread extends Thread { private String lock; public NotifyThread(String lock) { this.lock = lock; } @Override public void run() { synchronized (lock) { System.out.println(Thread.currentThread().getName() + " 進去代碼塊"); System.out.println(Thread.currentThread().getName() + " 開始notify"); lock.notify(); System.out.println(Thread.currentThread().getName() + " 結束開始notify"); } } } }
示例中開啓了兩個線程,一個是WaitThread,另外一個是NotifyThread。NotifyThread會先啓動,先調用notify方法。
而後WaitThread線程才啓動,調用wait方法,可是因爲通知過了,wait方法就沒法再獲取到相應的通知,所以WaitThread會一直在wait方法出阻塞,這種現象就是通知過早的現象。
針對這種現象,解決方法,通常是添加一個狀態標誌,讓waitThread調用wait方法前先判斷狀態是否已經改變了沒,若是通知早已發出的話,WaitThread就再也不去wait。
對上面的代碼進行更正:
public class EarlyNotify { private static String lockObject = ""; private static boolean isWait = true; public static void main(String[] args) { WaitThread waitThread = new WaitThread(lockObject); NotifyThread notifyThread = new NotifyThread(lockObject); notifyThread.start(); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } waitThread.start(); } static class WaitThread extends Thread { private String lock; public WaitThread(String lock) { this.lock = lock; } @Override public void run() { synchronized (lock) { try { while (isWait) { System.out.println(Thread.currentThread().getName() + " 進去代碼塊"); System.out.println(Thread.currentThread().getName() + " 開始wait"); lock.wait(); System.out.println(Thread.currentThread().getName() + " 結束wait"); } } catch (InterruptedException e) { e.printStackTrace(); } } } } static class NotifyThread extends Thread { private String lock; public NotifyThread(String lock) { this.lock = lock; } @Override public void run() { synchronized (lock) { System.out.println(Thread.currentThread().getName() + " 進去代碼塊"); System.out.println(Thread.currentThread().getName() + " 開始notify"); lock.notifyAll(); isWait = false; System.out.println(Thread.currentThread().getName() + " 結束開始notify"); } } } }
這段代碼只是增長了一個isWait狀態變量,NotifyThread調用notify方法後會對狀態變量進行更新,在WaitThread中調用wait方法以前會先對狀態變量進行判斷,在該示例中,調用notify後將狀態變量isWait改變爲false,所以,在WaitThread中while對isWait判斷後就不會執行wait方法,從而避免了Notify過早通知形成遺漏的狀況。
總結:
在使用線程的等待/通知機制時,通常都要配合一個 boolean 變量值(或者其餘可以判斷真假的條件),在 notify 以前改變該 boolean 變量的值,讓 wait 返回後可以退出 while 循環(通常都要在 wait 方法外圍加一層 while 循環,以防止早期通知),或在通知被遺漏後,不會被阻塞在 wait 方法處。這樣便保證了程序的正確性。
2)等待wait的條件發生變化
若是線程在等待時接受到了通知,可是以後等待的條件發生了變化,並無再次對等待條件進行判斷,也會致使程序出現錯誤。
下面用一個例子來講明這種狀況
public class ConditionChange { private static List<String> lockObject = new ArrayList(); public static void main(String[] args) { Consumer consumer1 = new Consumer(lockObject); Consumer consumer2 = new Consumer(lockObject); Productor productor = new Productor(lockObject); consumer1.start(); consumer2.start(); productor.start(); } static class Consumer extends Thread { private List<String> lock; public Consumer(List lock) { this.lock = lock; } @Override public void run() { synchronized (lock) { try { //這裏使用if的話,就會存在wait條件變化形成程序錯誤的問題 if (lock.isEmpty()) { System.out.println(Thread.currentThread().getName() + " list爲空"); System.out.println(Thread.currentThread().getName() + " 調用wait方法"); lock.wait(); System.out.println(Thread.currentThread().getName() + " wait方法結束"); } String element = lock.remove(0); System.out.println(Thread.currentThread().getName() + " 取出第一個元素爲:" + element); } catch (InterruptedException e) { e.printStackTrace(); } } } } static class Productor extends Thread { private List<String> lock; public Productor(List lock) { this.lock = lock; } @Override public void run() { synchronized (lock) { System.out.println(Thread.currentThread().getName() + " 開始添加元素"); lock.add(Thread.currentThread().getName()); lock.notifyAll(); } } } }
會報異常:
Exception in thread "Thread-1" Thread-0 list爲空 Thread-0 調用wait方法 Thread-1 list爲空 Thread-1 調用wait方法 Thread-2 開始添加元素 Thread-1 wait方法結束 java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
異常緣由分析:
在這個例子中一共開啓了3個線程,Consumer1,Consumer2以及Productor。首先Consumer1調用了wait方法後,線程處於了WAITTING狀態,而且將對象鎖釋放出來。所以,Consumer2可以獲取對象鎖,從而進入到同步代塊中,當執行到wait方法時,一樣的也會釋放對象鎖。
所以,productor可以獲取到對象鎖,進入到同步代碼塊中,向list中插入數據後,經過notifyAll方法通知處於WAITING狀態的Consumer1和Consumer2線程。
consumer1獲得對象鎖後,從wait方法出退出,刪除了一個元素讓List爲空,方法執行結束,退出同步塊,釋放掉對象鎖。這個時候Consumer2獲取到對象鎖後,從wait方法退出,繼續往下執行,這個時候Consumer2再執行lock.remove(0);就會出錯,由於List因爲Consumer1刪除一個元素以後已經爲空了。
解決方案:
經過上面的分析,能夠看出Consumer2報異常是由於線程從wait方法退出以後沒有再次對wait條件進行判斷,所以,此時的wait條件已經發生了變化。解決辦法就是,在wait退出以後再對條件進行判斷便可。
public class ConditionChange { private static List<String> lockObject = new ArrayList(); public static void main(String[] args) { Consumer consumer1 = new Consumer(lockObject); Consumer consumer2 = new Consumer(lockObject); Productor productor = new Productor(lockObject); consumer1.start(); consumer2.start(); productor.start(); } static class Consumer extends Thread { private List<String> lock; public Consumer(List lock) { this.lock = lock; } @Override public void run() { synchronized (lock) { try { //這裏使用if的話,就會存在wait條件變化形成程序錯誤的問題 while (lock.isEmpty()) { System.out.println(Thread.currentThread().getName() + " list爲空"); System.out.println(Thread.currentThread().getName() + " 調用wait方法"); lock.wait(); System.out.println(Thread.currentThread().getName() + " wait方法結束"); } String element = lock.remove(0); System.out.println(Thread.currentThread().getName() + " 取出第一個元素爲:" + element); } catch (InterruptedException e) { e.printStackTrace(); } } } } static class Productor extends Thread { private List<String> lock; public Productor(List lock) { this.lock = lock; } @Override public void run() { synchronized (lock) { System.out.println(Thread.currentThread().getName() + " 開始添加元素"); lock.add(Thread.currentThread().getName()); lock.notifyAll(); } } } }
上面的代碼與以前的代碼僅僅只是將 wait 外圍的 if 語句改成 while 循環便可,這樣當 list 爲空時,線程便會繼續等待,而不會繼續去執行刪除 list 中元素的代碼。
總結:
在使用線程的等待/通知機制時,通常都要在 while 循環中調用 wait()方法,所以xuy配合使用一個 boolean 變量(或其餘能判斷真假的條件,如本文中的 list.isEmpty()),知足 while 循環的條件時,進入 while 循環,執行 wait()方法,不知足 while 循環的條件時,跳出循環,執行後面的代碼。
三、「假死」狀態
現象:若是是多消費者和多生產者狀況,若是使用notify方法可能會出現「假死」的狀況,即喚醒的是同類線程。
緣由分析:假設當前多個生產者線程會調用wait方法阻塞等待,當其中的生產者線程獲取到對象鎖以後使用notify通知處於WAITTING狀態的線程,若是喚醒的仍然是生產者線程,就會形成全部的生產者線程都處於等待狀態。
解決辦法:將notify方法替換成notifyAll方法,若是使用的是lock的話,就將signal方法替換成signalAll方法。
總結
在Object提供的消息通知機制應該遵循以下這些條件:
永遠在while循環中對條件進行判斷而不是if語句中進行wait條件的判斷;
使用NotifyAll而不是使用notify。
基本的使用範式以下:
// The standard idiom for calling the wait method in Java synchronized (sharedObject) { while (condition) { sharedObject.wait(); // (Releases lock, and reacquires on wakeup) } // do action based upon condition e.g. take or put into queue }
wait/notifyAll實現生產者-消費者
利用wait/notifyAll實現生產者和消費者代碼以下:
public class ProductorConsumer { public static void main(String[] args) { LinkedList linkedList = new LinkedList(); ExecutorService service = Executors.newFixedThreadPool(15); for (int i = 0; i < 5; i++) { service.submit(new Productor(linkedList, 8)); } for (int i = 0; i < 10; i++) { service.submit(new Consumer(linkedList)); } } static class Productor implements Runnable { private List<Integer> list; private int maxLength; public Productor(List list, int maxLength) { this.list = list; this.maxLength = maxLength; } @Override public void run() { while (true) { synchronized (list) { try { while (list.size() == maxLength) { System.out.println("生產者" + Thread.currentThread().getName() + " list以達到最大容量,進行wait"); list.wait(); System.out.println("生產者" + Thread.currentThread().getName() + " 退出wait"); } Random random = new Random(); int i = random.nextInt(); System.out.println("生產者" + Thread.currentThread().getName() + " 生產數據" + i); list.add(i); list.notifyAll(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } static class Consumer implements Runnable { private List<Integer> list; public Consumer(List list) { this.list = list; } @Override public void run() { while (true) { synchronized (list) { try { while (list.isEmpty()) { System.out.println("消費者" + Thread.currentThread().getName() + " list爲空,進行wait"); list.wait(); System.out.println("消費者" + Thread.currentThread().getName() + " 退出wait"); } Integer element = list.remove(0); System.out.println("消費者" + Thread.currentThread().getName() + " 消費數據:" + element); list.notifyAll(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } }
輸出結果:
生產者pool-1-thread-1 生產數據-232820990 生產者pool-1-thread-1 生產數據1432164130 生產者pool-1-thread-1 生產數據1057090222 生產者pool-1-thread-1 生產數據1201395916 生產者pool-1-thread-1 生產數據482766516 生產者pool-1-thread-1 list以達到最大容量,進行wait 消費者pool-1-thread-15 退出wait 消費者pool-1-thread-15 消費數據:1237535349 消費者pool-1-thread-15 消費數據:-1617438932 消費者pool-1-thread-15 消費數據:-535396055 消費者pool-1-thread-15 消費數據:-232820990 消費者pool-1-thread-15 消費數據:1432164130 消費者pool-1-thread-15 消費數據:1057090222 消費者pool-1-thread-15 消費數據:1201395916 消費者pool-1-thread-15 消費數據:482766516 消費者pool-1-thread-15 list爲空,進行wait 生產者pool-1-thread-5 退出wait 生產者pool-1-thread-5 生產數據1442969724 生產者pool-1-thread-5 生產數據1177554422 生產者pool-1-thread-5 生產數據-133137235 生產者pool-1-thread-5 生產數據324882560 生產者pool-1-thread-5 生產數據2065211573 生產者pool-1-thread-5 生產數據253569900 生產者pool-1-thread-5 生產數據571277922 生產者pool-1-thread-5 生產數據1622323863 生產者pool-1-thread-5 list以達到最大容量,進行wait 消費者pool-1-thread-10 退出wait
參照Object的wait和notify/notifyAll方法,Condition也提供了一樣的方法:
一、針對wait方法
void await() throws InterruptedException:當前線程進入等待狀態,若是其餘線程調用condition的signal或者signalAll方法而且當前線程獲取Lock從await方法返回,若是在等待狀態中被中斷會拋出被中斷異常;
long awaitNanos(long nanosTimeout):當前線程進入等待狀態直到被通知,中斷或者超時;
boolean await(long time, TimeUnit unit)throws InterruptedException:同第二種,支持自定義時間單位
boolean awaitUntil(Date deadline) throws InterruptedException:當前線程進入等待狀態直到被通知,中斷或者到了某個時間
二、針對notify方法
void signal():喚醒一個等待在condition上的線程,將該線程從等待隊列中轉移到同步隊列中,若是在同步隊列中可以競爭到Lock則能夠從等待方法中返回。
void signalAll():與1的區別在於可以喚醒全部等待在condition上的線程
也就是說wait--->await,notify---->Signal。
若是採用lock中Conditon的消息通知原理來實現生產者-消費者問題,原理同使用wait/notifyAll同樣。直接上代碼:
public class ProductorConsumer { private static ReentrantLock lock = new ReentrantLock(); private static Condition full = lock.newCondition(); private static Condition empty = lock.newCondition(); public static void main(String[] args) { LinkedList linkedList = new LinkedList(); ExecutorService service = Executors.newFixedThreadPool(15); for (int i = 0; i < 5; i++) { service.submit(new Productor(linkedList, 8, lock)); } for (int i = 0; i < 10; i++) { service.submit(new Consumer(linkedList, lock)); } } static class Productor implements Runnable { private List<Integer> list; private int maxLength; private Lock lock; public Productor(List list, int maxLength, Lock lock) { this.list = list; this.maxLength = maxLength; this.lock = lock; } @Override public void run() { while (true) { lock.lock(); try { while (list.size() == maxLength) { System.out.println("生產者" + Thread.currentThread().getName() + " list以達到最大容量,進行wait"); full.await(); System.out.println("生產者" + Thread.currentThread().getName() + " 退出wait"); } Random random = new Random(); int i = random.nextInt(); System.out.println("生產者" + Thread.currentThread().getName() + " 生產數據" + i); list.add(i); empty.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } } static class Consumer implements Runnable { private List<Integer> list; private Lock lock; public Consumer(List list, Lock lock) { this.list = list; this.lock = lock; } @Override public void run() { while (true) { lock.lock(); try { while (list.isEmpty()) { System.out.println("消費者" + Thread.currentThread().getName() + " list爲空,進行wait"); empty.await(); System.out.println("消費者" + Thread.currentThread().getName() + " 退出wait"); } Integer element = list.remove(0); System.out.println("消費者" + Thread.currentThread().getName() + " 消費數據:" + element); full.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } } }
輸出結果:
消費者pool-1-thread-9 消費數據:1146627506 消費者pool-1-thread-9 消費數據:1508001019 消費者pool-1-thread-9 消費數據:-600080565 消費者pool-1-thread-9 消費數據:-1000305429 消費者pool-1-thread-9 消費數據:-1270658620 消費者pool-1-thread-9 消費數據:1961046169 消費者pool-1-thread-9 消費數據:-307680655 消費者pool-1-thread-9 list爲空,進行wait 消費者pool-1-thread-13 退出wait 消費者pool-1-thread-13 list爲空,進行wait 消費者pool-1-thread-10 退出wait 生產者pool-1-thread-5 退出wait 生產者pool-1-thread-5 生產數據-892558288 生產者pool-1-thread-5 生產數據-1917220008 生產者pool-1-thread-5 生產數據2146351766 生產者pool-1-thread-5 生產數據452445380 生產者pool-1-thread-5 生產數據1695168334 生產者pool-1-thread-5 生產數據1979746693 生產者pool-1-thread-5 生產數據-1905436249 生產者pool-1-thread-5 生產數據-101410137 生產者pool-1-thread-5 list以達到最大容量,進行wait 生產者pool-1-thread-1 退出wait 生產者pool-1-thread-1 list以達到最大容量,進行wait 生產者pool-1-thread-4 退出wait 生產者pool-1-thread-4 list以達到最大容量,進行wait 生產者pool-1-thread-2 退出wait 生產者pool-1-thread-2 list以達到最大容量,進行wait 生產者pool-1-thread-3 退出wait 生產者pool-1-thread-3 list以達到最大容量,進行wait 消費者pool-1-thread-9 退出wait 消費者pool-1-thread-9 消費數據:-892558288
因爲BlockingQueue內部實現就附加了兩個阻塞操做。
即當隊列已滿時,阻塞向隊列中插入數據的線程,直至隊列中未滿;當隊列爲空時,阻塞從隊列中獲取數據的線程,直至隊列非空時爲止。
能夠利用BlockingQueue實現生產者-消費者爲題,阻塞隊列徹底能夠充當共享數據區域,就能夠很好的完成生產者和消費者線程之間的協做。
public class ProductorConsumer { private static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(); public static void main(String[] args) { ExecutorService service = Executors.newFixedThreadPool(15); for (int i = 0; i < 5; i++) { service.submit(new Productor(queue)); } for (int i = 0; i < 10; i++) { service.submit(new Consumer(queue)); } } static class Productor implements Runnable { private BlockingQueue queue; public Productor(BlockingQueue queue) { this.queue = queue; } @Override public void run() { try { while (true) { Random random = new Random(); int i = random.nextInt(); System.out.println("生產者" + Thread.currentThread().getName() + "生產數據" + i); queue.put(i); } } catch (InterruptedException e) { e.printStackTrace(); } } } static class Consumer implements Runnable { private BlockingQueue queue; public Consumer(BlockingQueue queue) { this.queue = queue; } @Override public void run() { try { while (true) { Integer element = (Integer) queue.take(); System.out.println("消費者" + Thread.currentThread().getName() + "正在消費數據" + element); } } catch (InterruptedException e) { e.printStackTrace(); } } } }
輸出結果:
消費者pool-1-thread-7正在消費數據1520577501 生產者pool-1-thread-4生產數據-127809610 消費者pool-1-thread-8正在消費數據504316513 生產者pool-1-thread-2生產數據1994678907 消費者pool-1-thread-11正在消費數據1967302829 生產者pool-1-thread-1生產數據369331507 消費者pool-1-thread-9正在消費數據1994678907 生產者pool-1-thread-2生產數據-919544017 消費者pool-1-thread-12正在消費數據-127809610 生產者pool-1-thread-4生產數據1475197572 消費者pool-1-thread-14正在消費數據-893487914 生產者pool-1-thread-3生產數據906921688 消費者pool-1-thread-6正在消費數據-1292015016 生產者pool-1-thread-5生產數據-652105379 生產者pool-1-thread-5生產數據-1622505717 生產者pool-1-thread-3生產數據-1350268764 消費者pool-1-thread-7正在消費數據906921688 生產者pool-1-thread-4生產數據2091628867 消費者pool-1-thread-13正在消費數據1475197572 消費者pool-1-thread-15正在消費數據-919544017 生產者pool-1-thread-2生產數據564860122 生產者pool-1-thread-2生產數據822954707 消費者pool-1-thread-14正在消費數據564860122 消費者pool-1-thread-10正在消費數據369331507 生產者pool-1-thread-1生產數據-245820912 消費者pool-1-thread-6正在消費數據822954707 生產者pool-1-thread-2生產數據1724595968 生產者pool-1-thread-2生產數據-1151855115 消費者pool-1-thread-12正在消費數據2091628867 生產者pool-1-thread-4生產數據-1774364499 生產者pool-1-thread-4生產數據2006106757 消費者pool-1-thread-14正在消費數據-1774364499 生產者pool-1-thread-3生產數據-1070853639 消費者pool-1-thread-9正在消費數據-1350268764 消費者pool-1-thread-11正在消費數據-1622505717 生產者pool-1-thread-5生產數據355412953
能夠看出,使用BlockingQueue來實現生產者-消費者很簡潔,這正是利用了BlockingQueue插入和獲取數據附加阻塞操做的特性。
關於生產者-消費者實現的三種方式,到這裏就所有總結出來,但願對你有幫助,同時若有錯誤歡迎指正。
我將面試題和答案都整理成了PDF文檔,還有一套學習資料,涵蓋Java虛擬機、spring框架、Java線程、數據結構、設計模式等等,但不只限於此。關注公衆號【java圈子】獲取資料,還有優質文章每日送達。