在IT技術面試過程當中,咱們常常會遇到生產者消費者問題(Producer-consumer problem), 這是多線程併發協做問題的經典案例。場景中包含三個對象,生產者(Producer),消費者(Consumer)以及一個固定大小的緩衝區(Buffer)。生產者的主要做用是不斷生成數據放到緩衝區,消費者則從緩衝區不斷消耗數據。該問題的關鍵是如何線程安全的操做共享數據塊,保證生產者線程和消費者線程能夠正確的更新數據塊,主要考慮 1. 生產者不會在緩衝區滿時加入數據. 2. 消費者應當中止在緩衝區時消耗數據. 3. 在同一時間應當只容許一個生產者或者消費者訪問共享緩衝區(這一點是對於互斥操做訪問共享區塊的要求)。java
解決問題以上問題一般有信號量,wait & notify, 管道或者阻塞隊列等幾種思路。本文以Java語言爲例一一進行舉例講解。面試
信號量(Semaphore)也稱信號燈,是用來控制資源被同時訪問的個數,好比控制訪問數據庫最大鏈接數的數量,線程經過acquire()得到鏈接許可,完成數據操做後,經過release()釋放許可。對於生產者消費者問題來講,爲了知足線程安全操做的要求,同一時間咱們只容許一個線程訪問共享數據區,所以須要一個大小爲1的信號量mutex來控制互斥操做。注意到咱們還定義了notFull 和 notEmpty 信號量,notFull用於標識當前可用區塊的空間大小,當notFull size 大於0時代表"not full", producer 能夠繼續生產,等於0時表示空間已滿,沒法繼續生產;一樣,對於notEmpty信號量來講,大於0時代表 "not empty", consumer能夠繼續消耗,等於0 時代表沒有產品,沒法繼續消耗。notFull初始size 爲5 (5個available空間可供生產),notEmpty初始爲0(沒有產品可供消耗)。數據庫
/*** 數據倉儲class,全部的producer和consumer共享這個class對象 **/ static class DataWareHouse { //共享數據區 private final Queue<String> data = new LinkedList(); //非滿鎖 private final Semaphore notFull; //非空鎖 private final Semaphore notEmpty; //互斥鎖 private final Semaphore mutex; public DataWareHouse(int capacity) { this.notFull = new Semaphore(capacity); this.notEmpty = new Semaphore(0); mutex = new Semaphore(1); } public void offer(String x) throws InterruptedException { notFull.acquire(); //producer獲取信號,notFull信號量減一 mutex.acquire(); //當前進程得到信號,mutex信號量減1,其餘線程被阻塞操做共享區塊data data.add(x); mutex.release(); //mutex信號量+1, 其餘線程能夠繼續信號操做共享區塊data notEmpty.release(); //成功生產數據,notEmpty信號量加1 } public String poll() throws InterruptedException { notEmpty.acquire(); //notEmpty信號減一 mutex.acquire(); String result = data.poll(); mutex.release(); notFull.release(); //成功消耗數據, notFull信號量加1 return result; } } /**Producer線程**/ static class Producer implements Runnable { private final DataWareHouse dataWareHouse; public Producer(final DataWareHouse dataWareHouse) { this.dataWareHouse = dataWareHouse; } @Override public void run() { while (true) { try { Thread.sleep(100); //生產的速度慢於消耗的速率 String s = UUID.randomUUID().toString(); System.out.println("put data " + s); dataWareHouse.offer(s); } catch (InterruptedException e) { e.printStackTrace(); } } } } /**Consumer線程**/ static class Consumer implements Runnable { private final DataWareHouse dataWareHouse; public Consumer(final DataWareHouse dataWareHouse) { this.dataWareHouse = dataWareHouse; } @Override public void run() { while (true) { while (true) { try { System.out.println("get data " + dataWareHouse.poll()); } catch (InterruptedException e) { e.printStackTrace(); } } } } } //測試代碼 public static void main(String[] args) { final DataWareHouse dataWareHouse = new DataWareHouse(5); //三個producer 持續生產 for (int i = 0; i < 3; i++) { Thread t = new Thread(new Producer(dataWareHouse)); t.start(); } //三個consumer 持續消耗 for (int i = 0; i < 3; i++) { Thread t = new Thread(new Consumer(dataWareHouse)); t.start(); } }
Java Object對象類中包含三個final methods來容許線程之間進行通訊,告知資源的狀態。它們分別是wait(), notify(), 和notifyAll()。安全
wait(): 顧名思義告訴當前線程釋放鎖,陷入休眠狀態(waiting狀態),等待資源。wait 方法自己是一個native method,它在Java中的使用語法以下所示:多線程
synchronized(lockObject ) { while( ! condition ) { lockObject.wait(); } //take the action here; }
notify(): 用於喚醒waiting狀態的線程, 同時釋放鎖,被喚醒的線程能夠從新得到鎖訪問資源。它的基本語法 以下併發
synchronized(lockObject) { //establish_the_condition; lockObject.notify(); //any additional code if needed }
notifyAll(): 不一樣於notify(),它用於喚醒全部處於waiting狀態的線程。語法以下:dom
synchronized(lockObject) { establish_the_condition; lockObject.notifyAll(); }
說完了這三個方法,來看下如何使用wait & notify(All) 來解決咱們的問題。新的DataWareHouse 類以下所示:ide
//producer類和consumer共享對象 static class DataWareHouse { //共享數據區 private final Queue<String> data = new LinkedList(); private int capacity; private int size = 0; public DataWareHouse(int capacity) { this.capacity = capacity; } public synchronized void offer(String x) throws InterruptedException { while (size == capacity) { //當buffer滿時,producer進入waiting 狀態 this.wait(); //使用this對象來加鎖 } data.add(x); size++; notifyAll(); //當buffer 有數據時,喚醒全部等待的consumer線程 } public synchronized String poll() throws InterruptedException { while (size == 0) {//當buffer爲空時,consumer 進入等待狀態 this.wait(); } String result = data.poll(); size--; notifyAll(); //當數據被消耗,空間被釋放,通知全部等待的producer。 return result; } }
Note: 在方法上使用synchronized 等價於在方法體內使用synchronized(this),二者都是使用this對象做爲鎖。工具
生產者和消費者類,以及測試代碼和 信號量相同,不作重複列舉了。測試
管道Pipe是實現進程或者線程(線程之間一般經過共享內存實現通信,而進程則經過scoket,管道,消息隊列等技術)之間通訊經常使用方式,它鏈接輸入流和輸出流,基於生產者- 消費者模式構建的一種技術。具體實現能夠經過建立一個管道輸入流對象和管道輸出流對象,而後將輸入流和輸出流就行連接,生產者經過往管道中寫入數據,而消費者在管道數據流中讀取數據,經過這種方式就實現了線程之間的互相通信。
具體實現代碼以下所示
public class PipeSolution { static class DataWareHouse implements Closeable { private final PipedInputStream pis; private final PipedOutputStream pos; public DataWareHouse() throws IOException { pis = new PipedInputStream(); pos = new PipedOutputStream(); pis.connect(pos); //鏈接管道 } //向管道中寫入數據 public void offer(int val) throws IOException { pos.write(val); pos.flush(); } //從管道中取數據. public int poll() throws IOException { //當管道中沒有數據,方法阻塞 return pis.read(); } //關閉管道 @Override public void close() throws IOException { if (pis != null) { pis.close(); } if (pos != null) { pos.close(); } } } //consumer類 static class Consumer implements Runnable { private final DataWareHouse dataWareHouse; Consumer(DataWareHouse dataWareHouse) { this.dataWareHouse = dataWareHouse; } @Override public void run() { try { //消費者不斷從管道中讀取數據 while (true) { int num = dataWareHouse.poll(); System.out.println("get data +" + num); } } catch (IOException e) { throw new RuntimeException(e); } } } static class Producer implements Runnable { private final DataWareHouse dataWareHouse; private final Random random = new Random(); Producer(DataWareHouse dataWareHouse) { this.dataWareHouse = dataWareHouse; } @Override public void run() { try { //生產者不斷向管道中寫入數據 while (true) { int num = random.nextInt(256); dataWareHouse.offer(num); System.out.println("put data +" + num); Thread.sleep(1000); } } catch (Exception e) { throw new RuntimeException(e); } } public static void main(String[] args) throws IOException { DataWareHouse dataWareHouse = new DataWareHouse(); new Thread(new Producer(dataWareHouse)).start(); new Thread(new Consumer(dataWareHouse)).start(); } }
阻塞隊列(BlockingQueue),具備1. 當隊列滿了的時候阻塞入隊列操做 2. 當隊列空了的時候阻塞出隊列操做 3. 線程安全 的特性,於是阻塞隊列一般被視爲實現生產消費者模式最便捷的工具,其中DataWareHouse類實現代碼以下:
static class DataWareHouse { //共享數據區 private final BlockingQueue<String> blockingQueue; public DataWareHouse(int capacity) { this.blockingQueue = new ArrayBlockingQueue<>(capacity); } public void offer(String x) { blockingQueue.offer(x); } public String poll() { return blockingQueue.poll(); } }
生產者和消費者類,以及測試代碼和 信號量 相同,在此不作重複列舉了。
生產者消費者問題是面試中常常會遇到的題目,本文總結了幾種常見的實現方式,面試過程當中一般沒必要要向面試官描述過多實現細節,說出每種實現方式的特色便可。但願能給你們帶來幫助。