不知道你是否遇到過面試官讓你手寫生產者消費者代碼。別說,前段時間有小夥伴還真的遇到了這種狀況。當時是一臉懵逼。面試
可是,俗話說,從哪裏跌倒就要從哪裏爬起來。既然此次被問到了,那就回去好好研究一下,爭取下一次再也不被虐唄。segmentfault
因而,今天我決定手敲一個生產者消費者模式壓壓驚。(由於我也不想之後被面試官血虐啊)併發
生產者消費者模式,其實很簡單。無非就是生產者不停的生產數據,消費者不停的消費數據。(這不廢話嗎,字面意思我也知道啊)dom
咳咳。其實,咱們能夠拿水池來舉例。ide
好比,如今要用多個注水管往水池裏邊注水,那這些注水管就認爲是生產者。從水池裏邊抽水的抽水管就是消費者。水池自己就是一個緩衝區,用於生產者消費者之間的通信。測試
好的,跟着個人思路。this
既然生產者是生產數據的,那總得定義一個數據類吧(Data)spa
public class Data { private int id; private int num; public int getId() { return id; } public void setId(int id) { this.id = id; } public int getNum() { return num; } public void setNum(int num) { this.num = num; } public Data(int id, int num) { this.id = id; this.num = num; } public Data() { } }
以上數據,假設注水管每次注水的id和注水容量num(單位是升)都是遞增的。而且,單次出水管的出水量和注水管的注水量是一一對應的。線程
生產者的類Producer和消費者類Consumer內部都須要維護一個阻塞隊列,來存儲緩衝區的數據。code
public class Producer implements Runnable{ //共享阻塞隊列 private BlockingDeque<Data> queue; //是否還在運行 private volatile boolean isRunning = true; //id生成器 private static AtomicInteger count = new AtomicInteger(); //生成隨機數 private static Random random = new Random(); public Producer(BlockingDeque<Data> queue){ this.queue = queue; } @Override public void run() { try { while(isRunning){ //模擬注水耗時 Thread.sleep(random.nextInt(1000)); int num = count.incrementAndGet(); Data data = new Data(num, num); System.out.println("當前>>注水管:"+Thread.currentThread().getName()+"注水容量(L):"+num); if(!queue.offer(data,2, TimeUnit.SECONDS)){ System.out.println("注水失敗..."); } } }catch (Exception e){ e.printStackTrace(); } } public void stop(){ isRunning = false; } }
消費者:
public class Consumer implements Runnable{ private BlockingDeque<Data> queue ; private static Random random = new Random(); public Consumer(BlockingDeque<Data> queue){ this.queue = queue; } @Override public void run() { while (true){ try { Data data = queue.take(); //模擬抽水耗時 Thread.sleep(random.nextInt(1000)); if(data != null){ System.out.println("當前<<抽水管:"+Thread.currentThread().getName()+",抽取水容量(L):"+data.getNum()); } }catch (Exception e){ e.printStackTrace(); } } } }
測試類,假設有三個注水管和三個出水管(即六個線程)同時運行。等必定時間後,全部注水管中止注水,則當水池空(阻塞隊列爲空)的時候,出水管也將再也不出水。
public class TestProC { public static void main(String[] args) throws InterruptedException { BlockingDeque<Data> queue = new LinkedBlockingDeque<>(10); Producer producer1 = new Producer(queue); Producer producer2 = new Producer(queue); Producer producer3 = new Producer(queue); Consumer consumer1 = new Consumer(queue); Consumer consumer2 = new Consumer(queue); Consumer consumer3 = new Consumer(queue); ExecutorService service = Executors.newCachedThreadPool(); service.execute(producer1); service.execute(producer2); service.execute(producer3); service.execute(consumer1); service.execute(consumer2); service.execute(consumer3); Thread.sleep(3000); producer1.stop(); producer2.stop(); producer3.stop(); Thread.sleep(1000); service.shutdown(); } }
運行結果以下:
到最後一次注水20L的時候,全部注水管都中止注水了,但此時水池還沒空。因而,全部出水管繼續消費水資源,直到最後20L也被消費完。
以上,就是一個典型的生產者消費者模式。
能夠看到,這種模式有不少優勢:
1)能夠解耦消費者和生產者,由於它們是兩個不一樣的類,互相之間不會產生影響。
2)支持併發。生產者只管生產數據就好了,生產完直接把數據丟到緩衝區,而不須要等消費者消費完數據才能夠生產下一個數據。不然會形成阻塞,從而影響效率。
3)容許生產者和消費者有不一樣的處理速度。如,當生產者生產數據比較快的時候,會把消費者還沒來得及處理的數據先放到緩衝區。等有空閒的消費者了,再去緩衝區拿去數據。
另外,以上的緩衝區,咱們通常會使用阻塞隊列。就像上邊用的LinkedBlockingDeque。
這樣,當隊列滿的時候,會阻塞生產者繼續往隊列添加數據,直到有消費者來消費了隊列中的數據。當隊列空的時候,也會阻塞消費者從隊列獲取數據,直到有生產者把數據放入到隊列中。
阻塞隊列最好使用有界隊列(代碼中指定的容量爲10)。由於,若是生產者的速度遠遠大於消費者時,就會有可能形成隊列的元素一直增長,直到內存耗盡。固然,這也須要看實際的業務狀況。若是能保證生產者的數量在可控範圍內,不會給內存形成壓力,用無界隊列,也何嘗不可。
若是本文對你有用,歡迎點贊,評論,轉發。還能夠關注我「煙雨星空」,查看更多精彩內容,我會持續輸出原創文章。