簡介:html
生產者、消費者模型是多線程編程的常見問題,最簡單的一個生產者、一個消費者線程模型大多數人都可以寫出來,可是一旦條件發生變化,咱們就很容易掉進多線程的bug中。這篇文章主要講解了生產者和消費者的數量,商品緩存位置數量,商品數量等多個條件的不一樣組合下,寫出正確的生產者消費者模型的方法。java
歡迎探討,若有錯誤敬請指正 編程
如需轉載,請註明出處 http://www.cnblogs.com/nullzx/緩存
定義商品類多線程
package demo; /*定義商品*/ public class Goods { public final String name; public final int price; public final int id; public Goods(String name, int price, int id){ this.name = name; /*類型*/ this.price = price; /*價格*/ this.id = id; /*商品序列號*/ } @Override public String toString(){ return "name: " + name + ", price:"+ price + ", id: " + id; } }
基本要求: dom
1)生產者不能重複生產一個商品,也就是說不能有兩個id相同的商品ide
2)生產者不能覆蓋一個商品(當前商品還未被消費,就被下一個新商品覆蓋)。也就是說消費商品時,商品的id屬性能夠不連續,但不能出現缺號的狀況this
3)消費者不能重複消費一個商品atom
1.1使用線程對象,一個生產者線程,一個消費者線程,一個商品存儲位置spa
package demo; import java.util.Random; /*使用線程對象,一個緩存位置,一個生產者,一個消費者,無限生產商品消費商品*/ public class ProducterComsumerDemo1 { /*定義一個商品緩存位置*/ private volatile Goods goods; /*定義一個對象做爲鎖,不使用goods做爲鎖是由於生產者每次會產生一個新的對象*/ private Object obj = new Object(); /*isFull == true 生產者線程休息,消費者線程消費 *isFull == false 消費者線程休息,生產者線程生產*/ private volatile boolean isFull = false; /*商品的id編號,生產者製造的每一個商品的id都不同,每生產一個id自增1*/ private int id = 1; /*隨機產生一個sleep時間*/ private Random rnd = new Random(); /*=================定義消費者線程==================*/ public class ComsumeThread implements Runnable{ @Override public void run(){ try{ while(true){ /*獲取obj對象的鎖, id 和 isFull 的操做都在同步代碼塊中*/ synchronized(obj){ if(!isFull){ /*wait方法使當前線程阻塞,並釋放鎖*/ obj.wait(); } /*隨機延時一段時間*/ Thread.sleep(rnd.nextInt(250)); /*模擬消費商品*/ System.out.println(goods); /*隨機延時一段時間*/ Thread.sleep(rnd.nextInt(250)); isFull = false; /*喚醒阻塞obj上的生產者線程*/ obj.notify(); } /*隨機延時一段時間*/ Thread.sleep(rnd.nextInt(250)); } }catch (InterruptedException e){ /*什麼都不作*/ } } } /*=================定義生產者線程==================*/ public class ProductThread implements Runnable{ @Override public void run(){ try { while(true){ synchronized(obj){ if(isFull){ obj.wait(); } Thread.sleep(rnd.nextInt(500)); /*若是id爲偶數,生產價格爲2的產品A *若是id爲奇數,生產價格爲1的產品B*/ if(id % 2 == 0){ goods = new Goods("A", 2, id); }else{ goods = new Goods("B", 1, id); } Thread.sleep(rnd.nextInt(250)); id++; isFull = true; /*喚醒阻塞的消費者線程*/ obj.notify(); } } } catch (InterruptedException e) { /*什麼都不作*/ } } } public static void main(String[] args) throws InterruptedException{ ProducterComsumerDemo1 pcd = new ProducterComsumerDemo1(); Runnable c = pcd.new ComsumeThread(); Runnable p = pcd.new ProductThread(); new Thread(p).start(); new Thread(c).start(); } }
運行結果
name: B, price:1, id: 1 name: A, price:2, id: 2 name: B, price:1, id: 3 name: A, price:2, id: 4 name: B, price:1, id: 5 name: A, price:2, id: 6 name: B, price:1, id: 7 name: A, price:2, id: 8 name: B, price:1, id: 9 name: A, price:2, id: 10 name: B, price:1, id: 11 name: A, price:2, id: 12 name: B, price:1, id: 13 ……
從結果看出,商品類型交替生產,每一個商品的id都不相同,且不會漏過任何一個id,生產者沒有重複生產,消費者沒有重複消費,結果徹底正確。
1.2. 使用線程對象,多個生產者線程,多個消費者線程,1個緩存位置
1.2.1一個經典的bug
對於多生產者,多消費者這個問題,看起來咱們彷佛不用修改代碼,只需在main方法中多添加幾個線程就好。假設咱們須要三個消費者,一個生產者,那麼咱們只須要在main方法中再添加兩個消費者線程。
public static void main(String[] args) throws InterruptedException{ ProducterComsumerDemo1 pcd = new ProducterComsumerDemo1(); Runnable c = pcd.new ComsumeThread(); Runnable p = pcd.new ProductThread(); new Thread(c).start(); new Thread(p).start(); new Thread(c).start(); new Thread(c).start(); }
運行結果
name: B, price:1, id: 1 name: A, price:2, id: 2 name: A, price:2, id: 2 name: B, price:1, id: 3 name: B, price:1, id: 3 name: A, price:2, id: 4 name: A, price:2, id: 4 name: B, price:1, id: 5 name: B, price:1, id: 5 name: A, price:2, id: 6 ……
從結果中,咱們發現消費者重複消費了商品,因此這樣作顯然是錯誤的。這裏咱們定義多個消費者,一個生產者,因此遇到了重複消費的問題,若是定義成一個消費者,多個生產者就會遇到id覆蓋的問題。若是咱們定義多個消費者,多個生產者,那麼即會遇到重複消費,也會遇到id覆蓋的問題。注意,上面的代碼使用的notifyAll喚醒方法,若是使用notify方法喚醒bug仍然可能發生。
如今咱們來分析一下緣由。當生產者生產好了商品,會喚醒因沒有商品而阻塞消費者線程,假設喚醒的消費者線程超過兩個,這兩個線程會競爭獲取鎖,獲取到鎖的線程就會從obj.wait()方法中返回,而後消費商品,並把isFull置爲false,而後釋放鎖。當被喚醒的另外一個線程競爭獲取到鎖了之後也會從obj.wait()方法中返回。會再次消費同一個商品。顯然,每個被喚醒的線程應該再次檢查isFull這個條件。因此不管是消費者,仍是生產者,isFull的判斷必須改爲while循環,這樣才能獲得正確的結果而不受生產者的線程數和消費者的線程數的影響。
而對於只有一個生產者線程,一個消費者線程,用if判斷是沒有問題的,可是仍然強烈建議改爲while語句進行判斷。
1.2.2正確的姿式
package demo; import java.util.Random; /*使用線程對象,一個緩存位置,一個生產者,一個消費者,無限生產商品消費商品*/ public class ProducterComsumerDemo1 { /*定義一個商品緩存位置*/ private volatile Goods goods; /*定義一個對象做爲鎖,不使用goods做爲鎖是由於生產者每次會產生一個新的對象*/ private Object obj = new Object(); /*isFull == true 生產者線程休息,消費者線程消費 *isFull == false 消費者線程消費,生產者線程生產*/ private volatile boolean isFull = false; /*商品的id編號,生產者製造的每一個商品的id都不同,每生產一個id自增1*/ private int id = 1; /*隨機產生一個sleep時間*/ private Random rnd = new Random(); /*=================定義消費者線程==================*/ public class ComsumeThread implements Runnable{ @Override public void run(){ try{ while(true){ /*獲取obj對象的鎖, id 和 isFull 的操做都在同步代碼塊中*/ synchronized(obj){ while(!isFull){ /*wait方法使當前線程阻塞,並釋放鎖*/ obj.wait(); } /*隨機延時一段時間*/ Thread.sleep(rnd.nextInt(250)); /*模擬消費商品*/ System.out.println(goods); /*隨機延時一段時間*/ Thread.sleep(rnd.nextInt(250)); isFull = false; /*喚醒阻塞obj上的生產者線程*/ obj.notifyAll(); } /*隨機延時一段時間*/ Thread.sleep(rnd.nextInt(250)); } }catch (InterruptedException e){ /*我就是任性,這裏什麼都不作*/ } } } /*=================定義生產者線程==================*/ public class ProductThread implements Runnable{ @Override public void run(){ try { while(true){ synchronized(obj){ while(isFull){ obj.wait(); } Thread.sleep(rnd.nextInt(500)); /*若是id爲偶數,生產價格爲2的產品A 若是id爲奇數,生產價格爲1的產品B*/ if(id % 2 == 0){ goods = new Goods("A", 2, id); }else{ goods = new Goods("B", 1, id); } Thread.sleep(rnd.nextInt(250)); id++; isFull = true; /*喚醒阻塞的消費者線程*/ obj.notifyAll(); } } } catch (InterruptedException e) { /*我就是任性,這裏什麼都不作*/ } } } public static void main(String[] args) throws InterruptedException{ ProducterComsumerDemo1 pcd = new ProducterComsumerDemo1(); Runnable c = pcd.new ComsumeThread(); Runnable p = pcd.new ProductThread(); new Thread(p).start(); new Thread(p).start(); new Thread(p).start(); new Thread(c).start(); new Thread(c).start(); new Thread(c).start(); } }
1.3 使用線程對象,多個緩存位置(有界),多生產者,多消費者
1)當緩存位置滿時,咱們應該阻塞生產者線程
2)當緩存位置空時,咱們應該阻塞消費者線程
下面的代碼我沒有用java對象內置的鎖,而是用了ReentrantLock對象。是由於普通對象的鎖只有一個阻塞隊列,若是使用notify方式,沒法保證喚醒的就是特定類型的線程(消費者線程或生產者線程),而notifyAll方法會喚醒全部的線程,當剩餘的緩存商品的數量小於生產者線程數量或已緩存商品的數量小於消費者線程時效率就比較低。因此這裏咱們經過ReentrantLock對象構造兩個阻塞隊列提升效率。
1.3.1 普通方式
package demo; import java.util.LinkedList; import java.util.Random; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /*使用線程對象,多個緩存位置(有界),多生產者,多消費者,無限循環模式*/ public class ProducterComsumerDemo2 { /*最大緩存商品數*/ private final int MAX_SLOT = 2; /*定義緩存商品的容器*/ private LinkedList<Goods> queue = new LinkedList<Goods>(); /*定義線程鎖和鎖對應的阻塞隊列*/ private Lock lock = new ReentrantLock(); private Condition full = lock.newCondition(); private Condition empty = lock.newCondition(); /*商品的id編號,生產者製造的每一個商品的id都不同,每生產一個id自增1*/ private int id = 1; /*隨機產生一個sleep時間*/ private Random rnd = new Random(); /*=================定義消費者線程==================*/ public class ComsumeThread implements Runnable{ @Override public void run(){ while(true){ /*加鎖,queue的出列操做都在同步代碼塊中*/ lock.lock(); try { while(queue.isEmpty()){ System.out.println("queue is empty"); empty.await(); } /*隨機延時一段時間*/ Thread.sleep(rnd.nextInt(200)); /*模擬消費商品*/ Goods goods = queue.remove(); System.out.println(goods); /*隨機延時一段時間*/ Thread.sleep(rnd.nextInt(200)); /*喚醒阻塞的生產者線程*/ full.signal(); } catch (InterruptedException e) { /*什麼都不作*/ }finally{ lock.unlock(); } /*釋放鎖後隨機延時一段時間*/ try { Thread.sleep(rnd.nextInt(200)); } catch (InterruptedException e) { /*什麼都不作*/ } } } } /*=================定義生產者線程==================*/ public class ProductThread implements Runnable{ @Override public void run(){ while(true){ /*加鎖,queue的入列操做,id操做都在同步代碼塊中*/ lock.lock(); try{ while(queue.size() == MAX_SLOT){ System.out.println("queue is full"); full.await(); } Thread.sleep(rnd.nextInt(200)); Goods goods = null; /*根據序號產生不一樣的商品*/ switch(id%3){ case 0 : goods = new Goods("A", 1, id); break; case 1 : goods = new Goods("B", 2, id); break; case 2 : goods = new Goods("C", 3, id); break; } Thread.sleep(rnd.nextInt(200)); queue.add(goods); id++; /*喚醒阻塞的消費者線程*/ empty.signal(); }catch(InterruptedException e){ /*什麼都不作*/ }finally{ lock.unlock(); } /*釋放鎖後隨機延時一段時間*/ try { Thread.sleep(rnd.nextInt(100)); } catch (InterruptedException e) { /*什麼都不作*/ } } } } /*=================main==================*/ public static void main(String[] args) throws InterruptedException{ ProducterComsumerDemo2 pcd = new ProducterComsumerDemo2(); Runnable c = pcd.new ComsumeThread(); Runnable p = pcd.new ProductThread(); /*兩個生產者線程,兩個消費者線程*/ new Thread(p).start(); new Thread(p).start(); new Thread(c).start(); new Thread(c).start(); } }
運行結果
queue is empty queue is empty name: B, price:2, id: 1 name: C, price:3, id: 2 name: A, price:1, id: 3 queue is full name: B, price:2, id: 4 name: C, price:3, id: 5 queue is full name: A, price:1, id: 6 name: B, price:2, id: 7 name: C, price:3, id: 8 name: A, price:1, id: 9 name: B, price:2, id: 10 name: C, price:3, id: 11 name: A, price:1, id: 12 name: B, price:2, id: 13 name: C, price:3, id: 14 ……
1.3.2 更優雅的實現方式
下面使用線程池(ThreadPool)和阻塞隊列(LinkedBlockingQueue)原子類(AtomicInteger)以更加優雅的方式實現上述功能。LinkedBlockingQueue阻塞隊列僅在take和put方法上鎖,因此id必須定義爲原子類。
package demo; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; /*使用線程對象,多個緩存位置(有界),多生產者,多消費者,無限循環模式*/ public class ProducterComsumerDemo4 { /*最大緩存商品數*/ private final int MAX_SLOT = 3; /*定義緩存商品的容器*/ private LinkedBlockingQueue<Goods> queue = new LinkedBlockingQueue<Goods>(MAX_SLOT); /*商品的id編號,生產者製造的每一個商品的id都不同,每生產一個id自增1*/ private AtomicInteger id = new AtomicInteger(1); /*隨機產生一個sleep時間*/ private Random rnd = new Random(); /*=================定義消費者線程==================*/ public class ComsumeThread implements Runnable{ @Override public void run(){ while(true){ try { /*隨機延時一段時間*/ Thread.sleep(rnd.nextInt(200)); /*模擬消費商品*/ Goods goods = queue.take(); System.out.println(goods); /*隨機延時一段時間*/ Thread.sleep(rnd.nextInt(200)); } catch (InterruptedException e) { /*什麼都不作*/ } } } } /*=================定義生產者線程==================*/ public class ProductThread implements Runnable{ @Override public void run(){ while(true){ try{ int x = id.getAndIncrement(); Goods goods = null; Thread.sleep(rnd.nextInt(200)); /*根據序號產生不一樣的商品*/ switch(x%3){ case 0 : goods = new Goods("A", 1, x); break; case 1 : goods = new Goods("B", 2, x); break; case 2 : goods = new Goods("C", 3, x); break; } Thread.sleep(rnd.nextInt(200)); queue.put(goods); Thread.sleep(rnd.nextInt(100)); }catch(InterruptedException e){ /*什麼都不作*/ } } } } /*=================main==================*/ public static void main(String[] args) throws InterruptedException{ ProducterComsumerDemo4 pcd = new ProducterComsumerDemo4(); Runnable c = pcd.new ComsumeThread(); Runnable p = pcd.new ProductThread(); /*定義線程池*/ ExecutorService es = Executors.newCachedThreadPool(); /*三個生產者線程,兩個消費者線程*/ es.execute(p); es.execute(p); es.execute(p); es.execute(c); es.execute(c); es.shutdown(); } }
這個問題顯然比上面的問題要複雜很多,緣由在於要保證緩存區的商品要所有消費掉,沒有重複消費商品,沒有覆蓋商品,同時還要保證全部線程可以正常結束,防止存在一直阻塞的線程。
2.1 使用線程對象,多個緩存位置(有界),多生產者,多消費者
思路 定義一下三個變量
/*須要生產的總商品數*/ private final int TOTAL_NUM = 30; /*已產生的數量*/ private volatile int productNum = 0; /*已消耗的商品數*/ private volatile int comsumedNum = 0;
每生產一個商品 productNum 自增1,直到TOTAL_NUM爲止,若是不知足條件 productNum < TOTAL_NUM 則結束進程,自增操做必須在full.await()方法調用以前,防止生產者線程沒法喚醒。
同理,每消費一個商品 comsumedNum 自增1,直到TOTAL_NUM爲止,若是不知足條件 comsumedNum < TOTAL_NUM 則結束進程,自增操做必須在empty.await()方法調用以前,防止消費者線程沒法喚醒。
comsumedNum和productNum至關於計劃經濟時代的糧票同樣,有了它可以保證生產者線程在喚醒後必定須要生產一個商品,消費者線程在喚醒之後必定可以消費一個商品
package demo; import java.util.LinkedList; import java.util.Random; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /*使用線程對象,多個緩存位置(有界),多生產者,多消費者, 有限商品個數*/ public class ProducterComsumerDemo3 { /*須要生產的總商品數*/ private final int TOTAL_NUM = 30; /*已產生的數量*/ private volatile int productNum = 0; /*已消耗的商品數*/ private volatile int comsumedNum = 0; /*最大緩存商品數*/ private final int MAX_SLOT = 2; /*定義線程公用的鎖和條件*/ private Lock lock = new ReentrantLock(); private Condition full = lock.newCondition(); private Condition empty = lock.newCondition(); /*定義緩存商品的容器*/ private LinkedList<Goods> queue = new LinkedList<Goods>(); /*商品的id編號,生產者製造的每一個商品的id都不同,每生產一個id自增1*/ private int id = 1; /*隨機產生一個sleep時間*/ private Random rnd = new Random(); /*=================定義消費者線程==================*/ public class ComsumeThread implements Runnable{ @Override public void run(){ while(true){ /*加鎖, id、comsumedNum 操做都在同步代碼塊中*/ lock.lock(); try { /*隨機延時一段時間*/ Thread.sleep(rnd.nextInt(250)); if(comsumedNum < TOTAL_NUM){ comsumedNum++; }else{ /*這裏會自動執行finally的語句,釋放鎖*/ break; } while(queue.isEmpty()){ System.out.println("queue is empty"); empty.await(); } /*隨機延時一段時間*/ Thread.sleep(rnd.nextInt(250)); /*模擬消費商品*/ Goods goods = queue.remove(); System.out.println(goods); /*隨機延時一段時間*/ Thread.sleep(rnd.nextInt(250)); /*喚醒阻塞的生產者線程*/ full.signal(); } catch (InterruptedException e) { }finally{ lock.unlock(); } /*釋放鎖後,隨機延時一段時間*/ try { Thread.sleep(rnd.nextInt(250)); } catch (InterruptedException e) { } } System.out.println( "customer " + Thread.currentThread().getName() + " is over"); } } /*=================定義生產者線程==================*/ public class ProductThread implements Runnable{ @Override public void run(){ while(true){ lock.lock(); try{ /*隨機延時一段時間*/ Thread.sleep(rnd.nextInt(250)); if(productNum < TOTAL_NUM){ productNum++; }else{ /*這裏會自動執行finally的語句,釋放鎖*/ break; } Thread.sleep(rnd.nextInt(250)); while(queue.size() == MAX_SLOT){ System.out.println("queue is full"); full.await(); } Thread.sleep(rnd.nextInt(250)); Goods goods = null; /*根據序號產生不一樣的商品*/ switch(id%3){ case 0 : goods = new Goods("A", 1, id); break; case 1 : goods = new Goods("B", 2, id); break; case 2 : goods = new Goods("C", 3, id); break; } queue.add(goods); id++; /*喚醒阻塞的消費者線程*/ empty.signal(); }catch(InterruptedException e){ }finally{ lock.unlock(); } /*釋放鎖後,隨機延時一段時間*/ try { Thread.sleep(rnd.nextInt(250)); } catch (InterruptedException e) { /*什麼都不作*/ } } System.out.println( "producter " + Thread.currentThread().getName() + " is over"); } } /*=================main==================*/ public static void main(String[] args) throws InterruptedException{ ProducterComsumerDemo3 pcd = new ProducterComsumerDemo3(); ComsumeThread c = pcd.new ComsumeThread(); ProductThread p = pcd.new ProductThread(); new Thread(p).start(); new Thread(p).start(); new Thread(p).start(); new Thread(c).start(); new Thread(c).start(); new Thread(c).start(); System.out.println("main Thread is over"); } }
2.2利用線程池,原子類,阻塞隊列,以更優雅的方式實現
LinkedBlockingQueue阻塞隊列僅在take和put方法上鎖,因此productNum和comsumedNum必須定義爲原子類。
package demo; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; /*使用線程池,多個緩存位置(有界),多生產者,多消費者, 有限商品個數*/ public class LinkedBlockingQueueDemo { /*須要生產的總商品數*/ private final int TOTAL_NUM = 20; /*已產生商品的數量*/ volatile AtomicInteger productNum = new AtomicInteger(0); /*已消耗的商品數*/ volatile AtomicInteger comsumedNum = new AtomicInteger(0); /*最大緩存商品數*/ private final int MAX_SLOT = 5; /*同步阻塞隊列,隊列容量爲MAX_SLOT*/ private LinkedBlockingQueue<Goods> lbq = new LinkedBlockingQueue<Goods>(MAX_SLOT); /*隨機數*/ private Random rnd = new Random(); /*pn表示產品的編號,產品編號從1開始*/ private volatile AtomicInteger pn = new AtomicInteger(1); /*=================定義消費者線程==================*/ public class CustomerThread implements Runnable{ @Override public void run(){ while(comsumedNum.getAndIncrement() < TOTAL_NUM){ try{ /*隨機延時一段時間*/ Thread.sleep(rnd.nextInt(500)); /*從隊列中取出商品,隊列空時發生阻塞*/ Goods goods = lbq.take(); /*隨機延時一段時間*/ Thread.sleep(rnd.nextInt(500)); /*模擬消耗商品*/ System.out.println(goods); /*隨機延時一段時間*/ Thread.sleep(rnd.nextInt(500)); }catch(InterruptedException e){ } } System.out.println( "customer " + Thread.currentThread().getName() + " is over"); } } /*=================定義生產者線程==================*/ public class ProducerThread implements Runnable{ @Override public void run(){ while(productNum.getAndIncrement() < TOTAL_NUM){ try { int x = pn.getAndIncrement(); Goods goods = null; /*根據序號產生不一樣的商品*/ switch(x%3){ case 0 : goods = new Goods("A", 1, x); break; case 1 : goods = new Goods("B", 2, x); break; case 2 : goods = new Goods("C", 3, x); break; } /*隨機延時一段時間*/ Thread.sleep(rnd.nextInt(500)); /*產生的新產品入列,隊列滿時發生阻塞*/ lbq.put(goods); /*隨機延時一段時間*/ Thread.sleep(rnd.nextInt(500)); } catch (InterruptedException e1) { /*什麼都不作*/ } } System.out.println( "producter " + Thread.currentThread().getName() + " is over "); } } /*=================main==================*/ public static void main(String[] args){ LinkedBlockingQueueDemo lbqd = new LinkedBlockingQueueDemo(); Runnable c = lbqd.new CustomerThread(); Runnable p = lbqd.new ProducerThread(); ExecutorService es = Executors.newCachedThreadPool(); es.execute(c); es.execute(c); es.execute(c); es.execute(p); es.execute(p); es.execute(p); es.shutdown(); System.out.println("main Thread is over"); } }