1、兩個線程一個生產者一個消費者java
需求情景多線程
涉及問題併發
代碼實現(共三個類和一個main方法的測試類)ide
Resource.java函數
package com.demo.ProducerConsumer; /** * 資源 * @author lixiaoxi * */ public class Resource { /*資源序號*/ private int number = 0; /*資源標記*/ private boolean flag = false; /** * 生產資源 */ public synchronized void create() { if (flag) {//先判斷標記是否已經生產了,若是已經生產,等待消費; try { wait();//讓生產線程等待 } catch (InterruptedException e) { e.printStackTrace(); } } number++;//生產一個 System.out.println(Thread.currentThread().getName() + "生產者------------" + number); flag = true;//將資源標記爲已經生產 notify();//喚醒在等待操做資源的線程(隊列) } /** * 消費資源 */ public synchronized void destroy() { if (!flag) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + "消費者****" + number); flag = false; notify(); } }
Producer.java測試
package com.demo.ProducerConsumer; /** * 生產者 * @author lixiaoxi * */ public class Producer implements Runnable{ private Resource resource; public Producer(Resource resource) { this.resource = resource; } @Override public void run() { while (true) { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } resource.create(); } } }
Consumer.javathis
package com.demo.ProducerConsumer; /** * 消費者 * @author lixiaoxi * */ public class Consumer implements Runnable{ private Resource resource; public Consumer(Resource resource) { this.resource = resource; } @Override public void run() { while (true) { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } resource.destroy(); } } }
ProducerConsumerTest.javaspa
package com.demo.ProducerConsumer; public class ProducerConsumerTest { public static void main(String args[]) { Resource resource = new Resource(); new Thread(new Producer(resource)).start();//生產者線程 new Thread(new Consumer(resource)).start();//消費者線程 } }
打印結果:線程
以上打印結果能夠看出沒有任何問題。3d
2、多個線程,多個生產者和多個消費者的問題
需求情景
涉及問題
再次測試代碼
ProducerConsumerTest.java
package com.demo.ProducerConsumer; public class ProducerConsumerTest { public static void main(String args[]) { Resource resource = new Resource(); new Thread(new Producer(resource)).start();//生產者線程 new Thread(new Producer(resource)).start();//生產者線程 new Thread(new Consumer(resource)).start();//消費者線程 new Thread(new Consumer(resource)).start();//消費者線程 } }
運行結果:
經過以上打印結果發現問題
緣由分析
解決方案
代碼改進(Resource中的 if -> while)
Resource.java
package com.demo.ProducerConsumer; /** * 資源 * @author lixiaoxi * */ public class Resource { /*資源序號*/ private int number = 0; /*資源標記*/ private boolean flag = false; /** * 生產資源 */ public synchronized void create() { while (flag) {//先判斷標記是否已經生產了,若是已經生產,等待消費; try { wait();//讓生產線程等待 } catch (InterruptedException e) { e.printStackTrace(); } } number++;//生產一個 System.out.println(Thread.currentThread().getName() + "生產者------------" + number); flag = true;//將資源標記爲已經生產 notify();//喚醒在等待操做資源的線程(隊列) } /** * 消費資源 */ public synchronized void destroy() { while (!flag) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + "消費者****" + number); flag = false; notify(); } }
運行結果:
再次發現問題
緣由分析
解決方案
最後代碼改進(Resource中的 notify() -> notifyAll())
Resource.java
package com.demo.ProducerConsumer; /** * 資源 * @author lixiaoxi * */ public class Resource { /*資源序號*/ private int number = 0; /*資源標記*/ private boolean flag = false; /** * 生產資源 */ public synchronized void create() { while (flag) {//先判斷標記是否已經生產了,若是已經生產,等待消費; try { wait();//讓生產線程等待 } catch (InterruptedException e) { e.printStackTrace(); } } number++;//生產一個 System.out.println(Thread.currentThread().getName() + "生產者------------" + number); flag = true;//將資源標記爲已經生產 notifyAll();//喚醒在等待操做資源的線程(隊列) } /** * 消費資源 */ public synchronized void destroy() { while (!flag) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + "消費者****" + number); flag = false; notifyAll(); } }
運行結果:
以上就大功告成了,沒有任何問題。
再來梳理一下整個流程。按照示例,生產者消費者交替運行,每次生產後都有對應的消費者,測試類建立實例,若是是生產者先運行,進入run()方法,進入create()方法,flag默認爲false,number+1,生產者生產一個產品,flag置爲true,同時調用notifyAll()方法,喚醒全部正在等待的線程,接下來若是仍是生產者運行呢?這是flag爲true,進入while循環,執行wait()方法,接下來若是是消費者運行的話,調用destroy()方法,這時flag爲true,消費者購買了一次產品,隨即將flag置爲false,並喚醒全部正在等待的線程。這就是一次完整的多生產者對應多消費者的問題。
3、使用Lock和Condition來解決生產者消費者問題
上面的代碼有一個問題,就是咱們爲了不全部的線程都處於等待的狀態,使用了notifyAll方法來喚醒全部的線程,即notifyAll喚醒的是本身方和對方線程。若是我須要只是喚醒對方的線程,好比:生產者只能喚醒消費者的線程,消費者只能喚醒生產者的線程。
在jdk1.5當中爲咱們提供了多線程的升級解決方案:
1. 將同步synchronized替換成了Lock操做。
2. 將Object中的wait,notify,notifyAll方法替換成了Condition對象。
3. 能夠只喚醒對方的線程。
完整代碼:
Resource1.java
package com.demo.ProducerConsumer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 資源 * @author lixiaoxi * */ public class Resource1 { /*資源序號*/ private int number = 0; /*資源標記*/ private boolean flag = false; private Lock lock = new ReentrantLock(); //使用lock創建生產者的condition對象 private Condition condition_pro = lock.newCondition(); //使用lock創建消費者的condition對象 private Condition condition_con = lock.newCondition(); /** * 生產資源 */ public void create() throws InterruptedException { try{ lock.lock(); //先判斷標記是否已經生產了,若是已經生產,等待消費 while(flag){ //生產者等待 condition_pro.await(); } //生產一個 number++; System.out.println(Thread.currentThread().getName() + "生產者------------" + number); //將資源標記爲已經生產 flag = true; //生產者生產完畢後,喚醒消費者的線程(注意這裏不是signalAll) condition_con.signal(); }finally{ lock.unlock(); } } /** * 消費資源 */ public void destroy() throws InterruptedException{ try{ lock.lock(); //先判斷標記是否已經消費了,若是已經消費,等待生產 while(!flag){ //消費者等待 condition_con.await(); } System.out.println(Thread.currentThread().getName() + "消費者****" + number); //將資源標記爲已經消費 flag = false; //消費者消費完畢後,喚醒生產者的線程 condition_pro.signal(); }finally{ lock.unlock(); } } }
Producer1.java
package com.demo.ProducerConsumer; /** * 生產者 * @author lixiaoxi * */ public class Producer1 implements Runnable{ private Resource1 resource; public Producer1(Resource1 resource) { this.resource = resource; } @Override public void run() { while (true) { try { Thread.sleep(10); resource.create(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
Consumer1.java
package com.demo.ProducerConsumer; /** * 消費者 * @author lixiaoxi * */ public class Consumer1 implements Runnable{ private Resource1 resource; public Consumer1(Resource1 resource) { this.resource = resource; } @Override public void run() { while (true) { try { Thread.sleep(10); resource.destroy(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
ProducerConsumerTest1.java
package com.demo.ProducerConsumer; public class ProducerConsumerTest1 { public static void main(String args[]) { Resource1 resource = new Resource1(); new Thread(new Producer1(resource)).start();//生產者線程 new Thread(new Producer1(resource)).start();//生產者線程 new Thread(new Consumer1(resource)).start();//消費者線程 new Thread(new Consumer1(resource)).start();//消費者線程 } }
運行結果:
4、總結
一、若是生產者、消費者都是1個,那麼flag標記能夠用if判斷。這裏有多個,必須用while判斷。
二、在while判斷的同時,notify函數可能喚醒本類線程(如一個消費者喚醒另外一個消費者),這會致使全部消費者忙等待,程序沒法繼續往下執行。使用notifyAll函數代替notify能夠解決這個問題,notifyAll能夠保證非本類線程被喚醒(消費者線程能喚醒生產者線程,反之也能夠),解決了忙等待問題。
當心假死
生產者/消費者模型最終達到的目的是平衡生產者和消費者的處理能力,達到這個目的的過程當中,並不要求只有一個生產者和一個消費者。能夠多個生產者對應多個消費者,能夠一個生產者對應一個消費者,能夠多個生產者對應一個消費者。
假死就發生在上面三種場景下。假死指的是所有線程都進入了WAITING狀態,那麼程序就再也不執行任何業務功能了,整個項目呈現停滯狀態。
比方說有生產者A和生產者B,緩衝區因爲空了,消費者處於WAITING。生產者B處於WAITING,生產者A被消費者通知生產,生產者A生產出來的產品本應該通知消費者,結果通知了生產者B,生產者B被喚醒,發現緩衝區滿了,因而繼續WAITING。至此,兩個生產者線程處於WAITING,消費者處於WAITING,系統假死。
上面的分析能夠看出,假死出現的緣由是由於notify的是同類,因此非單生產者/單消費者的場景,能夠採起兩種方法解決這個問題:
(1)synchronized用notifyAll()喚醒全部線程、ReentrantLock用signalAll()喚醒全部線程。
(2)用ReentrantLock定義兩個Condition,一個表示生產者的Condition,一個表示消費者的Condition,喚醒的時候調用相應的Condition的signal()方法就能夠了。