在討論基於阻塞隊列的生產者消費者模式以前咱們先搞清楚到底什麼是生產者-消費者模式(producer-consumer模式)?html
好比有兩個進程A和B,它們共享一個固定大小的緩衝區,A進程產生數據放入緩衝區,B進程從緩衝區中取出數據進行計算,那麼這裏其實就是一個生產者和消費者的模式,A至關於生產者,B至關於消費者java
在多線程開發中,若是生產者生產數據的速度很快,而消費者消費數據的速度很慢,那麼生產者就必須等待消費者消費完了數據纔可以繼續生產數據,由於生產那麼多也沒有地方放啊;同理若是消費者的速度大於生產者那麼消費者就會常常處理等待狀態,因此爲了達到生產者和消費者生產數據和消費數據之間的平衡,那麼就須要一個緩衝區用來存儲生產者生產的數據,因此就引入了生產者-消費者模式git
簡單來講這裏的緩衝區的做用就是爲了平衡生產者和消費者的處理能力,起到一個數據緩存的做用,同時也達到了一個解耦的做用github
生產者-消費者模式通常用於將生產數據的一方和消費數據的一方分割開來,將生產數據與消費數據的過程解耦開來web
Excutor任務執行框架:redis
消息中間件activeMQ:編程
任務的處理時間比較長的狀況下:segmentfault
首先咱們從最簡單的開始,假設只有一個生產者線程執行put操做,向緩衝區中添加數據,同時也只有一個消費者線程從緩衝區中取出數據windows
UML實體關係圖,從UML類圖中能夠看出,咱們的producer和consumer類都持有一個對container對象的引用,這樣的設計模式實際上在不少設計模式都有用到,好比咱們的裝飾者模式等等,它們共同的目的都是爲了達到解耦和複用的效果設計模式
在實現生產者-消費者模式以前咱們須要搞清兩個問題:
1)容器中數據狀態的一致性:當一個consumer執行了take()方法以後,此時容器爲空,可是還沒來得及更新容器的size,那麼另一個consumer來了以後覺得size不等於0,那麼繼續執行take(),從而形成了了狀態的不一致性
2)爲了保證當容器裏面沒有數據的時候,消費者不會繼續take,此時消費者釋放鎖,處於阻塞狀態;而且一旦生產者添加了一條數據以後,此時從新喚醒消費者,消費者從新獲取到容器的鎖,繼續執行take();
當容器裏面滿的時候,生產者也不會繼續put, 此時生產者釋放鎖,處於阻塞狀態;一旦消費者take了一條數據,此時應該喚醒生產者從新獲取到容器的鎖,繼續put
因此對於該容器的任何訪問都須要進行同步,也就是說在獲取容器的數據以前,須要先獲取到容器的鎖。
而這裏對於容器狀態的同步能夠參考以下幾種方法:
要構建一個生產者消費者模式,那麼首先就須要構建一個固定大小的緩衝區,而且該緩衝區具備可阻塞的put方法和take方法
接下來咱們採用第一種方法來實現該模型:使用Object的wait() / notify()方法實現生產者-消費者模型
ps:採用wait()/notify()方法的缺點是不能實現單生產者單消費者模式,由於要是用notify()就必須使用同步代碼塊
package test1; import java.util.LinkedList; public class Container { LinkedList<Integer> list = new LinkedList<Integer>(); int capacity = 10; public void put(int value){ while (true){ try { //sleep不能放在同步代碼塊裏面,由於sleep不會釋放鎖, // 當前線程會一直佔有produce線程,直到達到容量,調用wait()方法主動釋放鎖 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (this){ //當容器滿的時候,producer處於等待狀態 while (list.size() == capacity){ System.out.println("container is full,waiting ...."); try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //沒有滿,則繼續produce System.out.println("producer--"+ Thread.currentThread().getName()+"--put:" + value); list.add(value++); //喚醒其餘全部處於wait()的線程,包括消費者和生產者 notifyAll(); } } } public Integer take(){ Integer val = 0; while (true){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (this){ //若是容器中沒有數據,consumer處於等待狀態 while (list.size() == 0){ System.out.println("container is empty,waiting ..."); try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //若是有數據,繼續consume val = list.removeFirst(); System.out.println("consumer--"+ Thread.currentThread().getName()+"--take:" + val); //喚醒其餘全部處於wait()的線程,包括消費者和生產者 //notify必須放在同步代碼塊裏面 notifyAll(); } } } }
ps:
這裏須要注意的是sleep()不能放在synchronized代碼塊裏面,由於咱們知道sleep()執行以後是不會釋放鎖的,也就是說當前線程仍然持有對container對象的互斥鎖,這個時候當前線程繼續判斷list.size是否等於capacity,不等於就繼續put,而後又sleep一會,而後又繼續,直到當list.size == capacity,這個時候終於進入wait()方法,咱們知道wait()方法會釋放鎖,這個時候其餘線程纔有機會獲取到container的互斥鎖,
package test1; import test1.Container; import java.util.Random; public class Producer implements Runnable{ private Container container; public Producer(Container container) { this.container = container; } @Override public void run() { container.put(new Random().nextInt(100)); } }
package test1; import java.util.Random; public class Consumer implements Runnable{ private Container container; public Consumer(Container container) { this.container = container; } @Override public void run() { Integer val = container.take(); } }
package test1; import test1.Consumer; import test1.Container; import test1.Producer; public class Main { public static void main(String[] args){ Container container = new Container(); Thread producer1 = new Thread(new Producer(container)); Thread producer2 = new Thread(new Producer(container)); Thread producer3 = new Thread(new Producer(container)); Thread producer4 = new Thread(new Producer(container)); producer1.start(); producer2.start(); producer3.start(); producer4.start(); Thread consumer1 = new Thread(new Consumer(container)); Thread consumer2 = new Thread(new Consumer(container)); Thread consumer3 = new Thread(new Consumer(container)); Thread consumer4 = new Thread(new Consumer(container)); Thread consumer5 = new Thread(new Consumer(container)); Thread consumer6 = new Thread(new Consumer(container)); consumer1.start(); consumer2.start(); consumer3.start(); consumer4.start(); consumer5.start(); consumer6.start(); } }
運行結果
producer--Thread-1--put:80 producer--Thread-2--put:19 producer--Thread-3--put:8 producer--Thread-0--put:74 consumer--Thread-8--take:80 consumer--Thread-4--take:19 consumer--Thread-6--take:8 consumer--Thread-9--take:74 container is empty,waiting ... container is empty,waiting ... producer--Thread-2--put:20 consumer--Thread-7--take:20 container is empty,waiting ... producer--Thread-3--put:9 producer--Thread-1--put:81 producer--Thread-0--put:75 consumer--Thread-5--take:9 consumer--Thread-6--take:81 consumer--Thread-8--take:75 container is empty,waiting ... container is empty,waiting ... container is empty,waiting ...
生產者消費者模型中的共享資源是一個固定大小的緩衝區,該模式須要當緩衝區滿的時候,生產者再也不生產數據,直到消費者消費了一個數據以後,才繼續生產;同理當緩衝區空的時候,消費者再也不消費數據,直到生產者生產了一個數據以後,才繼續消費
若是要經過信號量來解決這個問題:關鍵在於找到可以跟蹤緩衝區的size大小變化,並根據緩衝區的數量變化來控制消費者和生產者線程之間的協做和運行
那麼很容易很夠想到用兩個信號量:empytyCount和fullCount分別來表示緩衝區滿或者空的狀態,進而可以更加容易控制消費者和生產者到底何時處於阻塞狀態,何時處於運行狀態
同時爲了使得程序更加具備健壯性,咱們還添加一個二進制信號量useQueue,確保隊列的狀態的完整性不受損害。例如當兩個生產者同時向空隊列添加數據時,從而破壞了隊列內部的狀態,使得其餘計數信號量或者返回的緩衝區的size大小不具備一致性。(固然這裏也可使用mutex來代替二進制信號量)
produce: P(emptyCount)//信號量emptyCount減一 P(useQueue)//二值信號量useQueue減一,變爲0(其餘線程不能進入緩衝區,阻塞狀態) putItemIntoQueue(item)//執行put操做 V(useQueue)//二值信號量useQueue加一,變爲1(其餘線程能夠進入緩衝區) V(fullCount)//信號量fullCount加一
consume: P(fullCount)//fullCount -= 1 P(useQueue)//useQueue -= 1(useQueue = 0) item ← getItemFromQueue() V(useQueue)//useQueue += 1 (useQueue = 1) V(emptyCount)//emptyCount += 1
ps: 這裏的兩個PV操做是否能夠顛倒
首先生產者獲取到信號量emptyCount,執行P(emptyCount),確保emptyCount不等於0,也就是還有空間添加數據,從而纔可以進入臨界區container
而後執行put操做,執行put操做以前須要爲緩衝區加把鎖,防止在put的過程當中,其餘線程對緩衝區進行修改,因此這個時候須要獲取另一個信號量useQueue
相反,若是先執行了 P(useQueue),而且此時的emptyCount = 0,那麼生產者就會一直阻塞,直到消費者消費了一個數據;可是此時消費者又沒法獲取到互斥信號量useQueue,也會一直阻塞,因此就造成了一個死鎖
因此這兩個p操做是不能交換順序的,信號量emptyCount是useQueue的基礎和前提條件
此時若是生產者已經執行完put操做,那麼能夠先釋放互斥信號量,再執行 V(fullCount);或者先執行 V(fullCount)再釋放互斥信號量都沒有關係。不會對其餘的生產者消費者的狀態產生影響;可是最好的仍是先釋放互斥鎖,再執行V(fullCount),這樣能夠保證當容器滿的時候,消費者可以及時的獲取到互斥鎖
Container
package test3; import java.util.LinkedList; import java.util.List; import java.util.concurrent.Semaphore; public class Container { Semaphore fullCount = new Semaphore(0); Semaphore emptyCount = new Semaphore(10); Semaphore isUse = new Semaphore(1); List list = new LinkedList<Integer>(); public void put(Integer val){ try { emptyCount.acquire(); isUse.acquire(); list.add(val); System.out.println("producer--"+ Thread.currentThread().getName()+"--put:" + val+"===size:"+list.size()); } catch (InterruptedException e) { e.printStackTrace(); }finally { isUse.release(); fullCount.release(); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } public Integer get(){ Integer val1 = 0; try { fullCount.acquire(); isUse.acquire(); val1 = (Integer) list.remove(0); System.out.println("consumer--"+ Thread.currentThread().getName()+"--take:" + val1+"===size:"+list.size()); } catch (InterruptedException e) { e.printStackTrace(); }finally { isUse.release(); emptyCount.release(); } return val1; } }
生產者
package test3; import java.util.Random; public class Producer implements Runnable{ private Container container; public Producer(Container container) { this.container = container; } @Override public void run() { while (true){ container.put(new Random().nextInt(100)); } } }
消費者
package test3; public class Consumer implements Runnable{ private Container container; public Consumer(Container container) { this.container = container; } @Override public void run() { while (true){ Integer val = container.get(); } } }
測試
package test3; public class Test { public static void main(String[] args){ Container container = new Container(); Thread producer1 = new Thread(new Producer(container)); Thread producer2 = new Thread(new Producer(container)); Thread producer3 = new Thread(new Producer(container)); Thread consumer1 = new Thread(new Consumer(container)); Thread consumer2 = new Thread(new Consumer(container)); Thread consumer3 = new Thread(new Consumer(container)); Thread consumer4 = new Thread(new Consumer(container)); producer1.start(); producer2.start(); producer3.start(); consumer1.start(); consumer2.start(); consumer3.start(); consumer4.start(); } }
producer--Thread-0--put:74===size:1 producer--Thread-4--put:16===size:2 producer--Thread-2--put:51===size:3 producer--Thread-1--put:77===size:4 producer--Thread-3--put:93===size:5 consumer--Thread-6--take:74===size:4 consumer--Thread-6--take:16===size:3 consumer--Thread-6--take:51===size:2 consumer--Thread-6--take:77===size:1 consumer--Thread-5--take:93===size:0 producer--Thread-4--put:19===size:1 producer--Thread-3--put:68===size:2 producer--Thread-0--put:72===size:3 consumer--Thread-6--take:19===size:2 consumer--Thread-6--take:68===size:1 consumer--Thread-5--take:72===size:0 producer--Thread-1--put:82===size:1 producer--Thread-2--put:32===size:2 consumer--Thread-5--take:82===size:1
因爲這裏的緩衝區由BlockingQueue容器代替,那麼這裏咱們就不須要從新建立一個容器類了,直接建立生產者類和消費者類,而且一樣的都須要擁有一個容器類BlockingQueue的實例應用
package test; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; public class Producer implements Runnable{ private ArrayBlockingQueue<Integer> queue ; public Producer(ArrayBlockingQueue<Integer> queue) { this.queue = queue; } @Override public void run() { Random random = new Random(); while (true){ try { Thread.sleep(100); if(queue.size() == 10) System.out.println("================the queue is full,the producer thread is waiting.................."); int item = random.nextInt(100); queue.put(item); System.out.println("producer:" + Thread.currentThread().getName() + " produce:" + item+";the size of the queue:" + queue.size()); } catch (InterruptedException e) { e.printStackTrace(); } } } }
package test; import java.util.concurrent.ArrayBlockingQueue; public class Consumer implements Runnable { private ArrayBlockingQueue<Integer> queue; public Consumer(ArrayBlockingQueue<Integer> queue) { this.queue = queue; } @Override public void run() { while (true){ try { Thread.sleep(100); if(queue.size() == 0) System.out.println("=============the queue is empty,the consumer thread is waiting................"); Integer item = queue.take(); System.out.println("consumer:" + Thread.currentThread().getName() + " consume:" + item+";the size of the queue:" + queue.size()); } catch (InterruptedException e) { e.printStackTrace(); } } } }
package test; import java.util.concurrent.ArrayBlockingQueue; public class Test { public static void main(String[] args){ ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10); Thread producer1 = new Thread(new Producer(queue)); Thread producer2 = new Thread(new Producer(queue)); Thread producer3 = new Thread(new Producer(queue)); Thread producer4 = new Thread(new Producer(queue)); Thread producer5 = new Thread(new Producer(queue)); producer1.start(); producer2.start(); producer3.start(); producer4.start(); producer5.start(); Thread consumer1 = new Thread(new Consumer(queue)); Thread consumer2 = new Thread(new Consumer(queue)); consumer1.start(); consumer2.start(); try { producer1.join(); producer2.join(); producer3.join(); producer4.join(); producer5.join(); consumer1.join(); consumer2.join(); } catch (InterruptedException e) { e.printStackTrace(); } } }
=============the queue is empty,the consumer thread is waiting................ consumer:Thread-5 consume:64;the size of the queue:0 producer:Thread-3 produce:64;the size of the queue:1 consumer:Thread-6 consume:87;the size of the queue:0 producer:Thread-1 produce:1;the size of the queue:3 producer:Thread-4 produce:87;the size of the queue:2 producer:Thread-2 produce:71;the size of the queue:2 producer:Thread-0 produce:76;the size of the queue:1 consumer:Thread-6 consume:71;the size of the queue:2 producer:Thread-1 produce:26;the size of the queue:6 producer:Thread-3 produce:6;the size of the queue:6 producer:Thread-0 produce:76;the size of the queue:5 producer:Thread-2 produce:37;the size of the queue:6
在用Lock和Condition的await()/signal()方法實現生產者消費者以前,咱們先來了解一下Lock和synchronized都是基於鎖有哪些區別,以及Condition的await()/signal()方法和Object的wait()/notify()方法都是等待和喚醒又有哪些區別
鎖機制 | Lock | synchronized |
---|---|---|
所屬層次 | java.util.concurrent package中的一個接口 | 是一個關鍵字,JVM內置的語言實現 |
釋放鎖與加鎖 | 經過lock()/unlock()進行手動釋放與加鎖 | 不須要,進入synchronized同步代碼塊就自動獲取鎖,退出同步代碼塊自動釋放鎖 |
設置超時時間 | trylock(timeout) | 沒有超時時間,線程會一直阻塞,直到獲取鎖 |
公平機制 | 設置true,爲公平鎖,等待時間最長的先獲取 | 沒有 |
阻塞線程列表 | 能夠查看正處於等待狀態的線程列表 | 不能夠 |
遇到異常時釋放 | 當遇到異常時在finally中執行unlock() | 遇到異常時釋放鎖 |
底層實現 | 樂觀鎖方式(cas),每次不加鎖而是假設沒有衝突而去完成某項操做 | CPU悲觀鎖機制,即線程得到的是獨佔鎖,只能依靠阻塞來等待線程釋放鎖 |
具體喚醒某一個線程 | ReentrantLock裏面的Condition應用,可以控制signal哪一個線程 | 不能控制具體notify哪一個線程,notifyall()喚醒全部線程 |
靈活性 | 比synchronized更加靈活 | 不是那麼靈活 |
響應中斷 | 等待的線程能夠響應中斷 | 不能響應中斷 |
應用場景 | 資源競爭激烈的狀況下,是synchronized的幾十倍 | 資源競爭不激烈時,優於Lock |
方法 | Condition | Object |
---|---|---|
阻塞等待 | await() | wait() |
喚醒其餘線程 | signal() | notify()/notifyall() |
使用的鎖 | 互斥鎖/共享鎖,如Lock | 同步鎖:如synchronized |
一個鎖對應 | 能夠建立多個condition | 對應一個Object |
喚醒指定的線程 | 明確的指定線程 | 只能經過notifyAll喚醒全部線程;或者notify()隨機喚醒 |
該實現方式相比較synchronized於object的wait()/notify()方法具備更加的靈活性,能夠喚醒具體的消費者線程或者生產者線程,達到當緩衝區滿的時候,喚醒消費者線程,此時生產者線程都將被阻塞,而不是向notifyall()那樣喚醒全部的線程。
package test8; import java.util.LinkedList; import java.util.List; import java.util.Vector; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Container{ private final Lock lock = new ReentrantLock(); //表示生產者線程 private final Condition notFull = lock.newCondition(); //表示消費者線程 private final Condition notEmpty = lock.newCondition(); private int capacity; private List<Integer> list = new LinkedList<>(); public Container(int capacity) { this.capacity = capacity; } public Integer take(){ lock.lock(); try { while (list.size() == 0) try { System.out.println("the list is empty........"); notEmpty.await();//阻塞消費者線程 } catch (InterruptedException e) { e.printStackTrace(); } Integer val = list.remove(0); System.out.println("consumer--"+ Thread.currentThread().getName()+"--take:" + val+"===size:"+list.size()); notFull.signalAll();//喚醒全部生產者線程 return val; }finally { lock.unlock(); } } public void put(Integer val){ lock.lock(); try { while (list.size() == capacity){ try { System.out.println("the list is full........"); notFull.await();//阻塞生產者線程 } catch (InterruptedException e) { e.printStackTrace(); } } list.add(val); System.out.println("producer--"+ Thread.currentThread().getName()+"--put:" + val+"===size:"+ list.size()); notEmpty.signalAll();//喚醒全部消費者線程 }finally { lock.unlock(); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
package test8; import java.util.Random; import java.util.TreeMap; import java.util.concurrent.locks.Condition; public class Producer implements Runnable { private Container container; public Producer(Container container) { this.container = container; } @Override public void run() { while (true){ container.put(new Random().nextInt(100)); } } }
package test8; public class Consumer implements Runnable { private Container container; public Consumer(Container container) { this.container = container; } @Override public void run() { while (true){ Integer val = container.take(); } } }
package test8; public class Test { public static void main(String[] args){ Container container = new Container(5); Thread producer1 = new Thread(new Producer(container)); Thread producer2 = new Thread(new Producer(container)); Thread producer3 = new Thread(new Producer(container)); Thread producer4 = new Thread(new Producer(container)); Thread producer5 = new Thread(new Producer(container)); Thread consumer1 = new Thread(new Consumer(container)); Thread consumer2 = new Thread(new Consumer(container)); producer1.start(); producer2.start(); producer3.start(); producer4.start(); producer5.start(); consumer1.start(); consumer2.start(); } }
the list is empty........ producer--Thread-3--put:77===size:1 consumer--Thread-6--take:77===size:0 the list is empty........ producer--Thread-4--put:55===size:1 producer--Thread-0--put:62===size:2 producer--Thread-1--put:90===size:3 producer--Thread-2--put:57===size:4 consumer--Thread-5--take:55===size:3 consumer--Thread-5--take:62===size:2 consumer--Thread-5--take:90===size:1 consumer--Thread-5--take:57===size:0 the list is empty........ the list is empty........ producer--Thread-0--put:10===size:1 producer--Thread-1--put:21===size:2 producer--Thread-3--put:3===size:3 producer--Thread-4--put:75===size:4 producer--Thread-2--put:94===size:5 consumer--Thread-5--take:10===size:4
對於單生產者單消費者,只用保證緩衝區滿的時候,生產者不會繼續向緩衝區放數據,緩衝區空的時候,消費者不會繼續從緩衝區取數據,而不存在同時有兩個生產者使用緩衝區資源,形成數據不一致的狀態。
因此對於單生產者單消費者,若是採用信號量模型來實現的話,那麼只須要兩個信號量:empytyCount和fullCount分別來表示緩衝區滿或者空的狀態,進而可以更加容易控制消費者和生產者到底何時處於阻塞狀態,何時處於運行狀態; 而不須要使用互斥信號量了
produce: P(emptyCount)//信號量emptyCount減一 putItemIntoQueue(item)//執行put操做 V(fullCount)//信號量fullCount加一
consume: P(fullCount)//fullCount -= 1 item ← getItemFromQueue() V(emptyCount)//emptyCount += 1
實現
緩衝區容器類
package test9; import java.time.temporal.ValueRange; import java.util.LinkedList; import java.util.List; import java.util.concurrent.Semaphore; public class Container_spsc { Semaphore emptyCount = new Semaphore(10); Semaphore fullCount = new Semaphore(0); List<Integer> list = new LinkedList<Integer>(); public void put(int val){ try { emptyCount.acquire(); list.add(val); System.out.println("producer--"+ Thread.currentThread().getName()+"--put:" + val+"===size:"+list.size()); } catch (InterruptedException e) { e.printStackTrace(); }finally { fullCount.release(); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } public Integer take(){ Integer val = 0; try { fullCount.acquire(); val = list.remove(0); System.out.println("consumer--"+ Thread.currentThread().getName()+"--take:" + val+"===size:"+list.size()); } catch (InterruptedException e) { e.printStackTrace(); }finally { emptyCount.release(); } return val; } }
生產者
package test9; import test8.Container; import java.util.Random; public class Producer implements Runnable { private Container_spsc container; public Producer(Container_spsc container) { this.container = container; } @Override public void run() { while (true){ container.put(new Random().nextInt(100)); } } }
消費者類
package test9; import test8.Container; public class Consumer implements Runnable { private Container_spsc container; public Consumer(Container_spsc container) { this.container = container; } @Override public void run() { while (true){ Integer take = container.take(); } } }
測試
package test9; public class Test { public static void main(String[] args){ Container_spsc container = new Container_spsc(); Thread producer = new Thread(new Producer(container)); Thread consumer = new Thread(new Consumer(container)); producer.start(); consumer.start(); } }
producer--Thread-0--put:62===size:1 consumer--Thread-1--take:62===size:0 producer--Thread-0--put:40===size:1 consumer--Thread-1--take:40===size:0 producer--Thread-0--put:86===size:1 consumer--Thread-1--take:86===size:0 producer--Thread-0--put:15===size:1 consumer--Thread-1--take:15===size:0 producer--Thread-0--put:83===size:1 consumer--Thread-1--take:83===size:0 producer--Thread-0--put:13===size:1 consumer--Thread-1--take:13===size:0
對於多生產者單消費者來講,多生產者之間具備互斥關係,因此這裏須要一個互斥鎖來實現緩衝區的互斥訪問,那麼具體的實現方式就是在單生產者單消費者的基礎之上,加一個互斥信號量useQueue
若是採用信號量來實現的話能夠以下:
produce: P(emptyCount)//信號量emptyCount減一 P(useQueue)//二值信號量useQueue減一,變爲0(其餘線程不能進入緩衝區,阻塞狀態) putItemIntoQueue(item)//執行put操做 V(useQueue)//二值信號量useQueue加一,變爲1(其餘線程能夠進入緩衝區) V(fullCount)//信號量fullCount加一
consume: P(fullCount)//fullCount -= 1 item ← getItemFromQueue() V(emptyCount)//emptyCount += 1
具體的實現和單生產者單消費者差很少,只不過在生產者類裏面多加了一個互斥信號量useQueue
對於單生產者多消費者同多生產者多消費者
produce: P(emptyCount)//信號量emptyCount減一 putItemIntoQueue(item)//執行put操做 V(fullCount)//信號量fullCount加一
consume: P(fullCount)//fullCount -= 1 P(useQueue)//二值信號量useQueue減一,變爲0(其餘線程不能進入緩衝區,阻塞狀態) item ← getItemFromQueue() V(useQueue)//二值信號量useQueue加一,變爲1(其餘線程能夠進入緩衝區) V(emptyCount)//emptyCount += 1
具體的實現和單生產者單消費者差很少,只不過在消費者類裏面多加了一個互斥信號量useQueue
對於多生產者多消費者問題,是一個同步+互斥問題,不只須要生產者和消費者之間的同步協做,還須要實現對緩衝區資源的互斥訪問;這個能夠參考前面對生產者消費者4種實現方式
採用信號量
produce: P(emptyCount)//信號量emptyCount減一 P(useQueue)//二值信號量useQueue減一,變爲0(其餘線程不能進入緩衝區,阻塞狀態) putItemIntoQueue(item)//執行put操做 V(useQueue)//二值信號量useQueue加一,變爲1(其餘線程能夠進入緩衝區) V(fullCount)//信號量fullCount加一
consume: P(fullCount)//fullCount -= 1 P(useQueue)//二值信號量useQueue減一,變爲0(其餘線程不能進入緩衝區,阻塞狀態) item ← getItemFromQueue() V(useQueue)//二值信號量useQueue加一,變爲1(其餘線程能夠進入緩衝區) V(emptyCount)//emptyCount += 1
用一個緩衝區,生產者和消費者須要先獲取到緩衝區的鎖才能進行put和take操做,每一次put和take都須要獲取一次鎖,這須要大量的同步與互斥操做,十分損耗性能。
因此若是採用雙緩衝區的話,一個緩衝區bufferA用於生產者執行put操做,一個緩衝區bufferB用於消費者執行take操做;生產者線程和消費者線程在使用各自的緩衝區以前都須要先獲取到緩衝區對應的鎖,才能進行操做;
生產者和消費者各自使用本身獨立的緩衝區,那麼就不存在同一個緩衝區被put的同時進行take操做
因此一旦生產者和消費者一旦獲取到了對應緩衝區的鎖,那麼每一次執行put/take操做時就不用再次從新獲取鎖了,從而減小了不少獲取鎖、釋放鎖的性能開銷
若是bufferA被put滿了,那麼生產者釋放bufferA的鎖,並等待消費者釋放bufferB的鎖;當bufferB被take空了,消費者釋放bufferB的鎖,此時生產者獲取到bufferB的鎖,對bufferB進行put;消費者獲取到bufferA的鎖,對bufferA進行take,那麼就完成了一次緩衝區的切換
雙緩衝區的狀態
bufferA和bufferB都處於工做狀態,一個讀一個寫
假設bufferA已經滿了,那麼生產者就會釋放bufferA的鎖,嘗試獲取bufferB,而此時bufferB還在執行take操做,消費者還沒釋放bufferB的鎖,那麼生產者進入等待狀態
當bufferB爲空,那麼此時消費者釋放bufferB的鎖,嘗試獲取bufferA的鎖,此時消費者被喚醒,從新嘗試獲取bufferB的鎖
若是操做完當前的緩衝區以後,先獲取另一個緩衝區的鎖,再釋放當前緩衝區的鎖,就會發生死鎖問題。若是bufferA和bufferB的線程同時嘗試獲取對方的鎖,那麼就會一直循環等待下去
因爲雙緩衝區是爲了不每次讀寫的時候不用進行同步與互斥操做,因此對於一些原本就是線程安全的類例如arrayblockingqueue就不適合做爲雙緩衝區,由於他們內部已經實現了每次讀寫操做的時候進行加鎖和釋放
應用場景:
多個緩衝區構成一個緩衝池,一樣須要兩個同步信號量emtpyCount和fullCount,還有一個互斥信號量useQueue,同時還須要兩個變量指示哪些是空緩衝區哪些是有數據的緩衝區,多緩衝區和雙緩衝區同樣,一樣是以空間換時間,減小單個讀寫操做的同步與互斥操做,對於同一個緩衝區而言,不可能同時會put和take
討論爲何要引入環形緩衝區,其實也就是在討論隊列緩衝區有什麼弊端,而環形緩衝區是如何解決這種弊端的=
那麼咱們先認識一下什麼是環形緩衝區
隊列緩衝區
瞭解瞭如何使用Java經過簡單的synchronized與object的wait()/notify()、Lock與Condition的await()/signal()方法、BlockingQueue、信號量semaphore四種方法來實現生產者消費者模型,之後有機會咱們在研究研究Linux和windows分別又是如何實現生產者消費者模型的
《Java併發編程實踐》
實現生產者消費者模式的四種方式(Synchronized、Lock、Semaphore、BlockingQueue)
https://blog.csdn.net/luohuac...
https://www.geeksforgeeks.org...
https://www.geeksforgeeks.org...
https://blog.csdn.net/chencha...
https://www.cnblogs.com/Wante...
https://blog.csdn.net/u012403...
https://www.geeksforgeeks.org...
https://blog.csdn.net/woailuo...
https://blog.csdn.net/liuxiao...
https://program-think.blogspo...
https://zhuanlan.zhihu.com/p/...
歡迎關注個人公衆號:小秋的博客,天天進步一點點