生產者消費者模式是多線程中最爲常見的模式:生產者線程(一個或多個)生成麪包放進籃子裏(集合或數組),同時,消費者線程(一個或多個)從籃子裏(集合或數組)取出麪包消耗。雖然它們任務不一樣,但處理的資源是相同的,這體現的是一種線程間通訊方式。java
本文將先說明單生產者單消費者的狀況,以後再說明多生產者多消費者模式的狀況。還會分別使用wait()/nofity()/nofityAll()機制、lock()/unlock()機制實現這兩種模式。數組
在開始介紹模式以前,先解釋下wait()、notify()和notifyAll()方法的用法細節以及改進的lock()/unlock()、await()/signal()/signalAll()的用法。安全
wait()、notify()和notifyAll()分別表示讓線程進入睡眠、喚醒睡眠線程以及喚醒全部睡眠的線程。可是,對象是哪一個線程呢?另外,在API文檔中描述這三個方法都必須在有效監視器(可理解爲持有鎖)的前提下使用。這三個方法和鎖有什麼關係呢?bash
以同步代碼塊synchronized(obj){}或同步函數爲例,在它們的代碼結構中可使用wait()、notify()以及notifyAll(),由於它們都持有鎖。多線程
對於下面的兩個同步代碼塊來講,分別使用的是鎖obj1和鎖obj2,其中線程一、線程2執行的是obj1對應的同步代碼,線程三、線程4執行的是obj2對應的同步代碼。函數
class MyLock implements Runnable {
public int flag = 0;
Object obj1 = new Object();
Object obj2 = new Object();
public void run(){
while(true){
if(flag%2=0){
synchronized(obj1){ //線程t1和t2執行此同步任務
//try{obj1.wait();}catch(InterruptedException i){}
//obj1.notify()
//obj1.notifyAll()
}
} else {
synchronized(obj2){ //線程t3和t4執行此同步任務
//try{obj2.wait();}catch(InterruptedException i){}
//obj2.notify()
//obj2.notifyAll()
}
}
}
}
}
class Demo {
public static void main(String[] args){
MyLock ml = new MyLock();
Thread t1 = new Thread(ml);
Thread t2 = new Thread(ml);
Thread t3 = new Thread(ml);
Thread t4 = new Thread(ml);
t1.start();
t2.start();
try{Thread.sleep(1)}catch(InterruptedException i){};
ml.flag++;
t3.start();
t4.start();
}
}
複製代碼
當t1開始執行到wait()時,它將進入睡眠狀態,但卻不是通常的睡眠,而是在一個被obj1標識的線程池中睡眠(其實是監視器對應線程池,只不過此時的監視器和鎖是綁定在一塊兒的)。當t2開始執行,它發現鎖obj1被其餘線程持有,它將進入睡眠態,此次睡眠是由於鎖資源等待而非wait()進入的睡眠。由於t2已經判斷過它要申請的是obj1鎖,所以它也會進入obj1這個線程池睡眠,而不是普通的睡眠。同理t3和t4,這兩個線程會進入obj2線程池睡眠。ui
當某個線程執行到notify()時,這個notify()將 隨機 喚醒它 所屬鎖對應線程池 中的 任意一個 線程。例如,obj1.notify()將喚醒obj1線程池中任意一個睡眠的線程(固然,若是沒有睡眠線程則什麼也不作)。同理notifyAll()則是喚醒所屬鎖對應線程池中全部睡眠的線程。this
必需要搞清楚的是"對應鎖",由於在調用wait()、notify()和notifyAll()時都必須明確指定鎖。例如,obj1.wait()。若是省略了所屬鎖,則表示的是this這個對象,也就是說,只有在非靜態的同步函數中才能省略這三個方法的前綴。spa
簡而言之,當使用了同步,就使用了鎖,線程也就有了歸屬,它的全部依據都由所屬鎖來決定。例如,線程同步時,判斷鎖是否空閒以決定是否執行後面的代碼,亦決定是否去特定的線程池中睡眠,當喚醒時也只會喚醒所屬鎖對應線程池中的線程。線程
這幾個方法在應用上,通常在一次任務中,wait()和notify()/notifyAll()是成對出現且擇一執行的。換句話說,就是這一輪原子性同步執行過程當中,要麼執行wait()進入睡眠,要麼執行notify()喚醒線程池中的睡眠線程。要如何實現擇一執行,能夠考慮使用標記的方式來做爲判斷依據。參考後文的例子。
wait()系列的三個方法侷限性很大,由於不管是睡眠仍是喚醒的動做,都徹底和鎖耦合在一塊兒了。例如,鎖obj1關聯的線程只能喚醒obj1線程池中的線程,而沒法喚醒鎖obj2關聯的線程;再例如,在原來synchronized同步時,鎖是在開始同步時隱式地自動獲取的,且是在執行完一整個任務後,又隱式地自動釋放鎖,也就是說獲取鎖和釋放鎖的動做沒法人爲控制。
從JDK 1.5開始,java提供了java.util.concurrent.locks包,這個包中提供了Lock接口、Condition接口和ReadWriteLock接口,前兩個接口將鎖和監視器方法(睡眠、喚醒操做)解耦了。其中Lock接口只提供鎖,經過鎖方法newConditon()能夠生成一個或多個與該鎖關聯的監視器,每一個監視器都有本身的睡眠、喚醒方法。也就是說Lock替代了synchronized方法和同步代碼塊的使用,Condition替代了Object監視器方法的使用。 以下圖:
當某線程執行condition1.await()時,該線程將進入condition1監視器對應的線程池睡眠,當執行condition1.signal()時,將隨機喚醒condition1線程池中的任意一個線程,當執行condition1.signalAll()時,將喚醒condition1線程池中的全部線程。同理,對於condition2監視器也是同樣的。
即便有多個監視器,但只要它們關聯的是同一個鎖對象,就能夠跨監視器操做對方線程。例如condition1中的線程能夠執行condition2.signal()來喚醒condition2線程池中的某個線程。 要使用這種鎖、監視器的關聯方式,參考以下步驟:
import java.util.concurrent.locks.*;
Lock l = new ReentrantLock();
Condition con1 = l.newCondition();
condition con2 = l.newCondition();
l.lock();
try{
//包含await()、signal()或signalAll()的代碼段...
} finally {
l.unlock(); //因爲代碼段可能異常,但unlock()是必須執行的,因此必須使用try,且將unlock()放進finally段
複製代碼
具體用法見後文關於Lock、condition的示例代碼。
一個生產者線程,一個消費者線程,生產者每生產一個麪包放進盤子裏,消費者從盤子裏取出麪包進行消費。其中生產者判斷是否繼續生產的依據是盤子裏沒有面包,而消費者判斷是否消費的依據是盤子裏有面包。因爲這個模式中,盤子一直只放一個麪包,所以能夠把盤子省略掉,生產者和消費者直接手把手地交遞麪包便可。
首先須要描述這三個類,一是多線程共同操做的資源(此處即麪包),二是生產者,三是消費者。在下面的例子中,我把生產麪包和消費麪包的方法分別封裝到了生產者和消費者類中,若是把它們封裝在麪包類中則更容易理解。
//描述資源:麪包的名稱和編號,由編號決定麪包的號碼
class Bread {
public String name;
public int count = 1;
public boolean flag = false; //該標記爲wait()和notify()提供判斷標記
}
//生產者和消費者前後處理的麪包資源是同一個,要確保這一點,
//能夠按單例模式來設計麪包類,也能夠將同一個麪包對象經過構造方法傳遞給生產者和消費者,此處使用後一種方式。
//描述生產者
class Producer implements Runnable {
private Bread b; //生產者的成員:它要處理的資源
Producer(Bread b){
this.b = b;
}
//提供生產麪包的方法
public void produce(String name){
b.name = name + b.count;
b.count++;
}
public void run(){
while(true){
synchronized(Bread.class){ //使用Bread.class做爲鎖標識,使得生產者和消費者的同步代碼塊可使用同一個鎖
if(b.flag){ //wait()必須在同步代碼塊內部,不只由於必須持有鎖才能睡眠,並且對鎖這個資源的判斷會出現混亂
try{Bread.class.wait();}catch(InterruptedException i){}
}
produce("麪包");
System.out.println(Thread.currentThread().getName()+"----生產者------"+b.name);
try{Thread.sleep(10);}catch(InterruptedException i){}
b.flag = true; //標記的切換也必須在保持同步
Bread.class.notify(); //notify()也必須同步,不然鎖都已經釋放了,就沒法作喚醒動做
//ps:一次同步任務中,wait()和notify()應當只能其中一個執行,不然對方線程會混亂
}
}
}
}
//描述消費者
class Consumer implements Runnable {
private Bread b; //消費者的成員:它要處理的資源
Consumer(Bread b){
this.b = b;
}
//提供消費麪包的方法
public String consume(){
return b.name;
}
public void run(){
while(true){
synchronized(Bread.class){
if(!b.flag){
try{Bread.class.wait();}catch(InterruptedException i){}
}
System.out.println(Thread.currentThread().getName()+"----消費者-------------"+consume());
try{Thread.sleep(10);}catch(InterruptedException i){}
b.flag = false;
Bread.class.notify();
}
}
}
}
public class ProduceConsume_1{
public static void main(String[] args) {
//1.建立資源對象
Bread b = new Bread();
//2.建立生產者和消費者對象,將同一個麪包對象傳遞給生產者和消費者
Producer pro = new Producer(b);
Consumer con = new Consumer(b);
//3.建立線程對象
Thread pro_t = new Thread(pro);
Thread con_t = new Thread(con);
pro_t.start();
con_t.start();
}
}
複製代碼
最後的執行結果應當生產一個、消費一個,如此不斷循環。以下:
Thread-0----生產者------麪包1
Thread-1----消費者-------------麪包1
Thread-0----生產者------麪包2
Thread-1----消費者-------------麪包2
Thread-0----生產者------麪包3
Thread-1----消費者-------------麪包3
Thread-0----生產者------麪包4
Thread-1----消費者-------------麪包4
Thread-0----生產者------麪包5
Thread-1----消費者-------------麪包5
Thread-0----生產者------麪包6
Thread-1----消費者-------------麪包6
複製代碼
代碼以下:
import java.util.concurrent.locks.*;
class Bread {
public String name;
public int count = 1;
public boolean flag = false;
//爲生產者和消費者提供同一個鎖對象以及同一個Condition對象
public static Lock lock = new ReentrantLock();
public static Condition condition = lock.newCondition();
}
class Producer implements Runnable {
private Bread b;
Producer(Bread b){
this.b = b;
}
public void produce(String name){
b.name = name + b.count;
b.count++;
}
public void run(){
while(true){
//使用Bread.lock來鎖住資源
Bread.lock.lock();
try{
if(b.flag){
try{Bread.condition.await();}catch(InterruptedException i){}
}
produce("麪包");
System.out.println(Thread.currentThread().getName()+"----生產者------"+b.name);
try{Thread.sleep(10);}catch(InterruptedException i){}
b.flag = true;
Bread.condition.signal();
} finally {
Bread.lock.unlock();
}
}
}
}
class Consumer implements Runnable {
private Bread b;
Consumer(Bread b){
this.b = b;
}
public String consume(){
return b.name;
}
public void run(){
while(true){
//使用Bread.lock來鎖住資源
Bread.lock.lock();
try{
if(!b.flag){
try{Bread.condition.await();}catch(InterruptedException i){}
}
System.out.println(Thread.currentThread().getName()+"----消費者-------------"+consume());
try{Thread.sleep(10);}catch(InterruptedException i){}
b.flag = false;
Bread.condition.signal();
} finally {
Bread.lock.unlock();
}
}
}
}
public class ProduceConsume_1{
public static void main(String[] args) {
//1.建立資源對象
Bread b = new Bread();
//2.建立生產者和消費者對象,將同一個麪包對象傳遞給生產者和消費者
Producer pro = new Producer(b);
Consumer con = new Consumer(b);
//3.建立線程對象
Thread pro_t = new Thread(pro);
Thread con_t = new Thread(con);
pro_t.start();
con_t.start();
}
}
複製代碼
這裏先說明多生產者多消費者,但同一個時刻最多隻能有一個麪包的模式,這個模式在實際中多是不理想的,但爲了引出後面真實的多生產多消費模式,我以爲有必要在這裏解釋這種模式,而且分析這種模式以及如何從單生產單消費的代碼演變而來。 以下圖:
從單生產單消費到多生產多消費,由於多線程安全問題和死鎖問題,因此有兩個方面的問題須要考慮:
對於某一方來講,如何讓多線程達到和單線程一樣的生產或消費能力?也就是說,如何讓多線程看上去就是單線程。多線程和單線程最大的區別在於多線程安全問題,所以,只要保證多線程執行的任務可以同步便可。
第1個問題考慮的是某一方多線程的問題,第2個問題考慮的是兩方如何能和諧配合完成生產消費問題。也就是如何保證生產方和消費方一方活動的同時另外一方睡眠。只需在某一方執行完同步任務時,喚醒另外一方便可。
其實從單線程到多線程,就兩個問題須要考慮:不一樣步和死鎖。(1)當生產方和消費方都出現了多線程,能夠將生產方的多線程當作一個線程總體、消費方的多線程也當作一個總體,這解決的是線程安全問題。(2)再將生產方總體和消費方總體兩方結合起來當作多線程,來解決死鎖問題,而java中解決死鎖的方式就是喚醒對方或喚醒全部。
問題是如何保證某一方的多線程之間同步?以多線程執行單消費方的代碼爲例進行分析。
while(true){
synchronized(Bread.class){
if(!b.flag){
try{Bread.class.wait();}catch(InterruptedException i){}
}
System.out.println(Thread.currentThread().getName()+"----消費者-------------"+consume());
try{Thread.sleep(10);}catch(InterruptedException i){}
b.flag = false;
Bread.class.notify();
}
}
複製代碼
假設消費線程1消費完一個麪包後喚醒了消費線程2,並繼續循環,判斷if(!flag),它將wait,因而鎖被釋放。假設CPU正好選中了消費線程2,那麼消費線程2也將進入wait。當生產方生產了一個麪包後,假設喚醒了消費線程1,它將從wait語句處繼續向下消費剛生產完的麪包,假設正好再次喚醒了消費線程2,當消費線程2被CPU選中後,消費線程2也將從wait語句處向下消費,消費的也是剛纔生產的麪包,問題再此出現了,連續喚醒的消費線程1和2消費的是同一個麪包,也就是說麪包被重複消費了。這又是多線程不一樣步問題。
說了一大段,其實將視線放大後分析就很簡單了,只要某一方的2個或多個線程都由於判斷b.flag而wait,那麼這兩個或多個線程有可能會被連續喚醒而繼續向下生產或消費。這形成了多線程不一樣步問題。
不安全的問題就出在同一方的多個線程在連續喚醒後繼續向下生產或消費。這是if語句引發的,若是可以讓wait的線程在喚醒後還回頭判斷b.flag是否爲true,就能讓其決定是否繼續wait仍是向下生產或消費。
能夠將if語句替換爲while語句來知足要求。這樣一來,不管某一方的多個線程是否被連續喚醒,它們都將回頭判斷b.flag。
while(true){
synchronized(Bread.class){
while(!b.flag){
try{Bread.class.wait();}catch(InterruptedException i){}
}
System.out.println(Thread.currentThread().getName()+"----消費者-------------"+consume());
try{Thread.sleep(10);}catch(InterruptedException i){}
b.flag = false;
Bread.class.notify();
}
}
複製代碼
解決了第一個多線程安全的問題,但會出現死鎖問題。這很容易分析,將生產方看做一個總體,將消費方也看做一個總體,當生產方線程都wait了(生產方的線程被連續喚醒時會出現該方線程所有wait),消費方也都wait了,死鎖就出現了。其實放大了看,將生產方、消費方分別看做一個線程,這兩個線程組成多線程,當某一方wait後沒法喚醒另外一方,另外一方也必定會wait,因而就死鎖了。
對於雙方死鎖的問題,只要保證能喚醒對方,而非本方連續喚醒就能解決。使用notifyAll()或signalAll()便可,也能夠經過signal()喚醒對方線程解決,見下面的第二段代碼。
根據上面的分析,將單生產、單消費模式的代碼改進一下,就能夠變爲多生產多消費單面包模式。、
//代碼段1
class Bread {
public String name;
public int count = 1;
public boolean flag = false;
}
//描述生產者
class Producer implements Runnable {
private Bread b;
Producer(Bread b){
this.b = b;
}
public void produce(String name){
b.name = name + b.count;
b.count++;
}
public void run(){
while(true){
synchronized(Bread.class){
while(b.flag){
try{Bread.class.wait();}catch(InterruptedException i){}
}
produce("麪包");
System.out.println(Thread.currentThread().getName()+"----生產者------"+b.name);
try{Thread.sleep(10);}catch(InterruptedException i){}
b.flag = true;
Bread.class.notifyAll();
}
}
}
}
//描述消費者
class Consumer implements Runnable {
private Bread b;
Consumer(Bread b){
this.b = b;
}
public String consume(){
return b.name;
}
public void run(){
while(true){
synchronized(Bread.class){
while(!b.flag){
try{Bread.class.wait();}catch(InterruptedException i){}
}
System.out.println(Thread.currentThread().getName()+"----消費者-------------"+consume());
try{Thread.sleep(10);}catch(InterruptedException i){}
b.flag = false;
Bread.class.notifyAll();
}
}
}
}
public class ProduceConsume_5 {
public static void main(String[] args) {
//1.建立資源對象
Bread b = new Bread();
//2.建立生產者和消費者對象
Producer pro = new Producer(b);
Consumer con = new Consumer(b);
//3.建立線程對象
Thread pro_t1 = new Thread(pro); //生產線程1
Thread pro_t2 = new Thread(pro); //生產線程2
Thread con_t1 = new Thread(con); //消費線程1
Thread con_t2 = new Thread(con); //消費線程2
pro_t1.start();
pro_t2.start();
con_t1.start();
con_t2.start();
}
}
複製代碼
如下是採用Lock和Conditon重構後的代碼,使用的是signal()喚醒對方線程的方法。
//代碼段2
import java.util.concurrent.locks.*;
class Bread {
public String name;
public int count = 1;
public boolean flag = false;
public static Lock lock = new ReentrantLock();
public static Condition pro_con = lock.newCondition();
public static Condition con_con = lock.newCondition();
}
//描述生產者
class Producer implements Runnable {
private Bread b;
Producer(Bread b){
this.b = b;
}
public void produce(String name){
b.name = name + b.count;
b.count++;
}
public void run(){
while(true){
Bread.lock.lock();
try{
while(b.flag){
try{Bread.pro_con.await();}catch(InterruptedException i){}
}
produce("麪包");
System.out.println(Thread.currentThread().getName()+"----生產者------"+b.name);
try{Thread.sleep(10);}catch(InterruptedException i){}
b.flag = true;
Bread.con_con.signal(); //喚醒的是consumer線程
} finally {
Bread.lock.unlock();
}
}
}
}
//描述消費者
class Consumer implements Runnable {
private Bread b;
Consumer(Bread b){
this.b = b;
}
public String consume(){
return b.name;
}
public void run(){
while(true){
Bread.lock.lock();
try{
while(!b.flag){
try{Bread.con_con.await();}catch(InterruptedException i){}
}
System.out.println(Thread.currentThread().getName()+"----消費者-------------"+consume());
try{Thread.sleep(10);}catch(InterruptedException i){}
b.flag = false;
Bread.pro_con.signal(); //喚醒的是producer線程
} finally {
Bread.lock.unlock();
}
}
}
}
public class ProduceConsume_6 {
public static void main(String[] args) {
//1.建立資源對象
Bread b = new Bread();
//2.建立生產者和消費者對象
Producer pro = new Producer(b);
Consumer con = new Consumer(b);
//3.建立線程對象
Thread pro_t1 = new Thread(pro);
Thread pro_t2 = new Thread(pro);
Thread con_t1 = new Thread(con);
Thread con_t2 = new Thread(con);
pro_t1.start();
pro_t2.start();
con_t1.start();
con_t2.start();
}
}
複製代碼
關於多生產、多消費問題作個總結: (1).解決某一方多線程不一樣步的方案是使用while(flag)來判斷是否wait; (2).解決雙方死鎖問題的方案是喚醒對方,可使用notifyAll(),signalAll()或對方監視器的signal()方法。
有多個生產者線程,多個消費者線程,生產者將生產的麪包放進籃子(集合或數組)裏,消費者從籃子裏取出麪包。生產者判斷繼續生產的依據是籃子已經滿了,消費者判斷繼續消費的依據是籃子是否空了。此外,當消費者取出麪包後,對應的位置又空了,生產者能夠回頭從籃子的起始位置繼續生產,這能夠經過重置籃子的指針來實現。
在這個模式裏,除了描述生產者、消費者、麪包,還須要描述籃子這個容器。假設使用數組做爲容器,生產者每生產一個,生產指針向後移位,消費者每消費一個,消費指針向後移位。
代碼以下:可參考API-->Condition類中給出的示例代碼
import java.util.concurrent.locks.*;
class Basket {
private Bread[] arr;
//the size of basket
Basket(int size){
arr = new Bread[size];
}
//the pointer of in and out
private int in_ptr,out_ptr;
//how many breads left in basket
private int left;
private Lock lock = new ReentrantLock();
private Condition full = lock.newCondition();
private Condition empty = lock.newCondition();
//bread into basket
public void in(){
lock.lock();
try{
while(left == arr.length){
try{full.await();} catch (InterruptedException i) {i.printStackTrace();}
}
arr[in_ptr] = new Bread("MianBao",Producer.num++);
System.out.println("Put the bread: "+arr[in_ptr].getName()+"------into basket["+in_ptr+"]");
left++;
if(++in_ptr == arr.length){in_ptr = 0;}
empty.signal();
} finally {
lock.unlock();
}
}
//bread out from basket
public Bread out(){
lock.lock();
try{
while(left == 0){
try{empty.await();} catch (InterruptedException i) {i.printStackTrace();}
}
Bread out_bread = arr[out_ptr];
System.out.println("Get the bread: "+out_bread.getName()+"-----------from basket["+out_ptr+"]");
left--;
if(++out_ptr == arr.length){out_ptr = 0;}
full.signal();
return out_bread;
} finally {
lock.unlock();
}
}
}
class Bread {
private String name;
Bread(String name,int num){
this.name = name + num;
}
public String getName(){
return this.name;
}
}
class Producer implements Runnable {
private Basket basket;
public static int num = 1; //the first number for Bread's name Producer(Basket b){ this.basket = b; } public void run(){ while(true) { basket.in(); try{Thread.sleep(10);}catch(InterruptedException i){} } } } class Consumer implements Runnable { private Basket basket; private Bread i_get; Consumer(Basket b){ this.basket = b; } public void run(){ while(true){ i_get = basket.out(); try{Thread.sleep(10);}catch(InterruptedException i){} } } } public class ProduceConsume_7 { public static void main(String[] args) { Basket b = new Basket(20); // the basket size = 20 Producer pro = new Producer(b); Consumer con = new Consumer(b); Thread pro_t1 = new Thread(pro); Thread pro_t2 = new Thread(pro); Thread con_t1 = new Thread(con); Thread con_t2 = new Thread(con); Thread con_t3 = new Thread(con); pro_t1.start(); pro_t2.start(); con_t1.start(); con_t2.start(); con_t3.start(); } } 複製代碼
這裏涉及了消費者、生產者、麪包和籃子,其中麪包和籃子是多線程共同操做的資源,生產者線程生產麪包放進籃子,消費者線程從籃子中取出麪包。理想的代碼是將生產任務和消費任務都封裝在資源類中,由於麪包是籃子容器的元素,因此不適合封裝到麪包類中,並且封裝到籃子中,能更方便地操做容器。
注意,必定要將全部涉及資源操做的代碼都放進鎖的內部,不然會產生多線程不一樣步問題。例如,在Producer類中定義了生產麪包的方法produce(),而後將其做爲放進籃子的方法basket.in()的參數,即basket.in(producer()),這是錯誤的行爲,由於produce()是在鎖的外部執行後才傳遞給in()方法的。