生產者消費者問題(英語:Producer-consumer problem),也稱有限緩衝問題(英語:Bounded-buffer problem),是一個多線程同步問題的經典案例。該問題描述了共享固定大小緩衝區的兩個線程——即所謂的「生產者」和「消費者」——在實際運行時會發生的問題。生產者的主要做用是生成必定量的數據放到緩衝區中,而後重複此過程。與此同時,消費者也在緩衝區消耗這些數據。該問題的關鍵就是要保證生產者不會在緩衝區滿時加入數據,消費者也不會在緩衝區中空時消耗數據。
.java
要解決該問題,就必須讓生產者在緩衝區滿時休眠(要麼乾脆就放棄數據),等到下次消費者消耗緩衝區中的數據的時候,生產者才能被喚醒,開始往緩衝區添加數據。一樣,也可讓消費者在緩衝區空時進入休眠,等到生產者往緩衝區添加數據以後,再喚醒消費者。一般採用進程間通訊的方法解決該問題。若是解決方法不夠完善,則容易出現死鎖的狀況。出現死鎖時,兩個線程都會陷入休眠,等待對方喚醒本身。該問題也能被推廣到多個生產者和消費者的情形。
多線程
因爲前兩點緣由,所以須要保持線程間的同步,即一個線程消費(或生產)完,其餘線程才能進行競爭CPU,得到消費(或生產)的機會。對於這一點,可使用條件變量進行線程間的同步:生產者線程在product以前,須要wait直至獲取本身所需的信號量以後,纔會進行product的操做;一樣,對於消費者線程,在consume以前須要wait直到沒有線程在訪問共享區(緩衝區),再進行consume的操做,以後再解鎖並喚醒其餘可用阻塞線程。
性能
在訪問共享區資源時,爲避免多個線程同時訪問資源形成混亂,須要對共享資源加鎖,從而保證某一時刻只有一個線程在訪問共享資源。測試
/**資源類**/ class Data { private int number = 0; /** * 判斷等待、業務、通知 */ //+1 public synchronized void increment() throws InterruptedException { if (number != 0) { // 等待 this.wait(); } number++; System.out.println(Thread.currentThread().getName() + "=>" + number); // 通知其餘線程,我+1完畢了 this.notifyAll(); } //-1 public synchronized void decrement() throws InterruptedException { if (number == 0) { //等待 this.wait(); } number--; System.out.println(Thread.currentThread().getName() + "=>" + number); // 通知其餘線程,我-1完畢了 this.notifyAll(); } }
package com.xgp.pc; /** * 線程之間的通訊問題:生產者和消費者問題! 等待喚醒 通知 * 線程交替問題 A B 操做同一個變量 num = 0 * A num+1 * B num-1 * @author 薛國鵬 */ @SuppressWarnings("all") public class A { public static void main(String[] args) { Data data = new Data(); new Thread(() -> {for(int i = 0;i < 10;i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"A").start(); new Thread(() -> {for(int i = 0;i < 10;i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"B").start(); } }
A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 進程完成,退出碼 0
package com.xgp.pc; /** * 線程之間的通訊問題:生產者和消費者問題! 等待喚醒 通知 * 線程交替問題 A B 操做同一個變量 num = 0 * A num+1 * B num-1 * @author 薛國鵬 */ @SuppressWarnings("all") public class A { public static void main(String[] args) { Data data = new Data(); new Thread(() -> {for(int i = 0;i < 10;i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"A").start(); new Thread(() -> {for(int i = 0;i < 10;i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"B").start(); new Thread(() -> {for(int i = 0;i < 10;i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"C").start(); new Thread(() -> {for(int i = 0;i < 10;i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"D").start(); } }
A=>1 B=>0 A=>1 B=>0 C=>1 A=>2 D=>1 D=>0 A=>1 C=>2 B=>1 B=>0 C=>1 A=>2 D=>1 D=>0 A=>1 C=>2 B=>1 B=>0 C=>1 A=>2 D=>1 D=>0 A=>1 C=>2 B=>1 B=>0 C=>1 A=>2 D=>1 D=>0 A=>1 C=>2 B=>1 B=>0 C=>1 D=>0 C=>1 D=>0 進程完成,退出碼 0
/**資源類**/ class Data { private int number = 0; /** * 判斷等待、業務、通知 */ //+1 public synchronized void increment() throws InterruptedException { while (number != 0) { // 等待 this.wait(); } number++; System.out.println(Thread.currentThread().getName() + "=>" + number); // 通知其餘線程,我+1完畢了 this.notifyAll(); } //-1 public synchronized void decrement() throws InterruptedException { while (number == 0) { //等待 this.wait(); } number--; System.out.println(Thread.currentThread().getName() + "=>" + number); // 通知其餘線程,我-1完畢了 this.notifyAll(); } }
A=>1 B=>0 A=>1 B=>0 C=>1 B=>0 A=>1 D=>0 C=>1 B=>0 A=>1 D=>0 C=>1 B=>0 A=>1 D=>0 C=>1 B=>0 A=>1 D=>0 C=>1 B=>0 A=>1 D=>0 C=>1 B=>0 A=>1 D=>0 C=>1 B=>0 A=>1 D=>0 C=>1 B=>0 A=>1 D=>0 C=>1 D=>0 C=>1 D=>0 進程完成,退出碼 0
package com.xgp.pc; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @SuppressWarnings("all") public class B { public static void main(String[] args) { Data2 data = new Data2(); new Thread(() -> {for(int i = 0;i < 10;i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"A").start(); new Thread(() -> {for(int i = 0;i < 10;i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"B").start(); new Thread(() -> {for(int i = 0;i < 10;i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"C").start(); new Thread(() -> {for(int i = 0;i < 10;i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"D").start(); } } /**資源類**/ class Data2 { private int number = 0; /** * 判斷等待、業務、通知 */ Lock lock = new ReentrantLock(); //鎖監視器(取代了對象監視器的使用) Condition condition = lock.newCondition(); //+1 public void increment() throws InterruptedException { lock.lock(); try { while (number != 0) { // 等待 condition.await(); //等待 } number++; System.out.println(Thread.currentThread().getName() + "=>" + number); // 通知其餘線程,我+1完畢了 condition.signalAll(); }catch (Exception e) { e.printStackTrace(); }finally { lock.unlock(); } } //-1 public void decrement() throws InterruptedException { lock.lock(); try { while (number == 0) { //等待 condition.await(); } number--; System.out.println(Thread.currentThread().getName() + "=>" + number); // 通知其餘線程,我-1完畢了 condition.signalAll(); }catch (Exception e) { e.printStackTrace(); }finally { lock.unlock(); } } }
A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 C=>1 D=>0 C=>1 D=>0 C=>1 D=>0 C=>1 D=>0 C=>1 D=>0 C=>1 D=>0 C=>1 D=>0 C=>1 D=>0 C=>1 D=>0 C=>1 D=>0 進程完成,退出碼 0
package com.xgp.pc; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @SuppressWarnings("all") public class C { public static void main(String[] args) { Data3 data = new Data3(); new Thread(() -> {for(int i = 0;i < 10;i++) { try { data.printA(); } catch (Exception e) { e.printStackTrace(); } } },"A").start(); new Thread(() -> {for(int i = 0;i < 10;i++) { try { data.printB(); } catch (Exception e) { e.printStackTrace(); } } },"B").start(); new Thread(() -> {for(int i = 0;i < 10;i++) { try { data.printC(); } catch (Exception e) { e.printStackTrace(); } } },"C").start(); } } class Data3 { private int number = 1; private Lock lock = new ReentrantLock(); //鎖監視器(取代了對象監視器的使用) private Condition condition1 = lock.newCondition(); private Condition condition2 = lock.newCondition(); private Condition condition3 = lock.newCondition(); public void printA() { lock.lock(); try { while (number != 1) { condition1.await(); } System.out.println(Thread.currentThread().getName()); //喚醒指定的B number = 2; condition2.signal(); }catch (Exception e) { e.printStackTrace(); }finally { lock.unlock(); } } public void printB() { lock.lock(); try { while (number != 2) { condition2.await(); } System.out.println(Thread.currentThread().getName()); //喚醒指定的B number = 3; condition3.signal(); }catch (Exception e) { e.printStackTrace(); }finally { lock.unlock(); } } public void printC() { lock.lock(); try { while (number != 3) { condition3.await(); } System.out.println(Thread.currentThread().getName()); //喚醒指定的B number = 1; condition1.signal(); }catch (Exception e) { e.printStackTrace(); }finally { lock.unlock(); } } }
A B C A B C A B C A B C A B C A B C A B C A B C A B C A B C 進程完成,退出碼 0