想要了解更多關於Java生產者消費者問題的演變嗎?那就看看這篇文章吧,咱們分別用舊方法和新方法來處理這個問題。html
生產者消費者問題是一個典型的多進程同步問題。java
對於大多數人來講,這個問題多是咱們在學校,執行第一次並行算法所遇到的第一個同步問題。算法
雖然它很簡單,但一直是並行計算中的最大挑戰 - 多個進程共享一個資源。安全
生產者和消費者兩個程序,共享一個大小有限的公共緩衝區。數據結構
假設一個生產者「生產」一份數據並將其存儲在緩衝區中,而一個消費者「消費」這份數據,並將這份數據從緩衝區中刪除。多線程
再假設如今這兩個程序在併發地運行,咱們須要確保當緩衝區的數據已滿時,生產者不會放置新數據進來,也要確保當緩衝區的數據爲空時,消費者不會試圖刪除數據緩衝區的數據。併發
爲了解決上述的併發問題,生產者和消費者將不得不相互通訊。app
若是緩衝區已滿,生產者將處於睡眠狀態,直到有通知信息喚醒。ide
在消費者將一些數據從緩衝區刪除後,消費者將通知生產者,隨後生產者將從新開始填充數據到緩衝區中。性能
若是緩衝區內容爲空的化,那麼狀況是同樣的,只不過,消費者會先等待生產者的通知。
但若是這種溝通作得不恰當,在進程彼此等待的位置可能致使程序死鎖。
首先來看一個典型的Java方案來解決這個問題。
package ProducerConsumer; import java.util.LinkedList; import java.util.Queue; public class ClassicProducerConsumerExample { public static void main(String[] args) throws InterruptedException { Buffer buffer = new Buffer(2); Thread producerThread = new Thread(new Runnable() { @Override public void run() { try { buffer.produce(); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread consumerThread = new Thread(new Runnable() { @Override public void run() { try { buffer.consume(); } catch (InterruptedException e) { e.printStackTrace(); } } }); producerThread.start(); consumerThread.start(); producerThread.join(); consumerThread.join(); } static class Buffer { private Queue<Integer> list; private int size; public Buffer(int size) { this.list = new LinkedList<>(); this.size = size; } public void produce() throws InterruptedException { int value = 0; while (true) { synchronized (this) { while (list.size() >= size) { // wait for the consumer wait(); } list.add(value); System.out.println("Produced " + value); value++; // notify the consumer notify(); Thread.sleep(1000); } } } public void consume() throws InterruptedException { while (true) { synchronized (this) { while (list.size() == 0) { // wait for the producer wait(); } int value = list.poll(); System.out.println("Consume " + value); // notify the producer notify(); Thread.sleep(1000); } } } } }
這裏咱們有生產者和消費者兩個線程,它們共享一個公共緩衝區。生產者線程開始產生新的元素並將它們存儲在緩衝區。若是緩衝區已滿,那麼生產者線程進入睡眠狀態,直到有通知喚醒。不然,生產者線程將會在緩衝區建立一個新元素而後通知消費者。就像我以前說的,這個過程也適用於消費者。若是緩衝區爲空,那麼消費者將等待生產者的通知。不然,消費者將從緩衝區刪除一個元素並通知生產者。
正如你所看到的,在以前的例子中,生產者和消費者的工做都是管理緩衝區的對象。這些線程僅僅調用了buffer.produce()和buffer.consume()兩個方法就搞定了一切。
對於緩衝區是否應該負責建立或者刪除元素,一直都是一個有爭議的話題,但在我看來,緩衝區不該該作這種事情。固然,這取決於你想要達到的目的,但在這種狀況下,緩衝區應該只是負責以線程安全的形式存儲合併元素,而不是生產新的元素。
因此,讓咱們把生產和消費的邏輯從緩衝對象中進行解耦。
package ProducerConsumer; import java.util.LinkedList; import java.util.Queue; public class ProducerConsumerExample2 { public static void main(String[] args) throws InterruptedException { Buffer buffer = new Buffer(2); Thread producerThread = new Thread(() -> { try { int value = 0; while (true) { buffer.add(value); System.out.println("Produced " + value); value ++; Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }); Thread consumerThread = new Thread(() -> { try { while (true) { int value = buffer.poll(); System.out.println("Consume " + value); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }); producerThread.start(); consumerThread.start(); producerThread.join(); consumerThread.join(); } static class Buffer { private Queue<Integer> list; private int size; public Buffer(int size) { this.list = new LinkedList<>(); this.size = size; } public void add(int value) throws InterruptedException { synchronized (this) { while (list.size() >= size) { wait(); } list.add(value); notify(); } } public int poll() throws InterruptedException { synchronized (this) { while (list.size() == 0) { wait(); } int value = list.poll(); notify(); return value; } } } }
這樣好多了,至少如今緩衝區僅僅負責以線程安全的形式來存儲和刪除元素。
不過,咱們還能夠進一步改善。
在前面的例子中,咱們已經建立了一個緩衝區,每當存儲一個元素以前,緩衝區將等待是否有可用的一個槽以防止沒有足夠的存儲空間,而且,在合併以前,緩衝區也會等待一個新的元素出現,以確保存儲和刪除的操做是線程安全的。
可是,Java自己的庫已經整合了這些操做。它被稱之爲BlockingQueue,在這裏能夠查看它的詳細文檔。
BlockingQueue是一個以線程安全的形式存入和取出實例的隊列。而這就是咱們所須要的。
因此,若是咱們在示例中使用BlockingQueue,咱們就不須要再去實現等待和通知的機制。
接下來,咱們來看看具體的代碼。
package ProducerConsumer; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; public class ProducerConsumerWithBlockingQueue { public static void main(String[] args) throws InterruptedException { BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>(2); Thread producerThread = new Thread(() -> { try { int value = 0; while (true) { blockingQueue.put(value); System.out.println("Produced " + value); value++; Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }); Thread consumerThread = new Thread(() -> { try { while (true) { int value = blockingQueue.take(); System.out.println("Consume " + value); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }); producerThread.start(); consumerThread.start(); producerThread.join(); consumerThread.join(); } }
雖然runnables看起來跟以前同樣,他們按照以前的方式生產和消費元素。
惟一的區別在於,這裏咱們使用blockingQueue代替緩衝區對象。
這兒有不少種類型的BlockingQueue:
×××隊列
有界隊列
一個×××隊列幾乎能夠無限地增長元素,任何添加操做將不會被阻止。
你能夠以這種方式去建立一個×××隊列:
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>();
在這種狀況下,因爲添加操做不會被阻塞,生產者添加新元素時能夠不用等待。每次當生產者想要添加一個新元素時,會有一個隊列先存儲它。可是,這裏面也存在一個異常須要捕獲。若是消費者刪除元素的速度比生產者添加新的元素要慢,那麼內存將被填滿,咱們將可能獲得一個OutOfMemory異常。
與之相反的則是有界隊列,存在一個固定大小。你能夠這樣去建立它:
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(10);
二者最主要的區別在於,使用有界隊列的狀況下,若是隊列內存已滿,而生產者仍然試圖往裏面塞元素,那麼隊列將會被阻塞(具體阻塞方式取決於添加元素的方法)直到有足夠的空間騰出來。
往blocking queue裏面添加元素一共有如下四種方式:
add() - 若是插入成功返回true,不然拋出IllegalStateException
put() - 往隊列中插入元素,並在有必要的狀況下等待一個可用的槽(slot)
offer() - 若是插入元素成功返回true,不然返回false
offer(E e, long timeout, TimeUnit unit) – 在隊列沒有滿的狀況下,或者爲了一個可用的slot而等待指定的時間後,往隊列中插入一個元素。
因此,若是你使用put()方法插入元素,而隊列內存已滿的狀況下,咱們的生產者就必須等待,直到有可用的slot出現。
以上就是咱們上一個案例的所有,這跟ProducerConsumerExample2的工做原理是同樣的。
還有什麼地方咱們能夠優化的?那首先來分析一下咱們幹了什麼,咱們實例化了兩個線程,一個被叫作生產者,專門往隊列裏面塞元素,另外一個被叫作消費者,負責從隊列裏面刪元素。
然而,好的軟件技術代表,手動地去建立和銷燬線程是很差的作法。首先建立線程是一項昂貴的任務,每建立一個線程,意味着要經歷一遍下面的步驟:
首先要分配內存給一個線程堆棧
操做系統要建立一個原生線程對應於Java的線程
跟這個線程相關的描述符被添加到JVM內部的數據結構中
首先別誤會我,咱們的案例中用了幾個線程是沒有問題的,而那也是併發工做的方式之一。這裏的問題是,咱們是手動地去建立線程,這能夠說是一次糟糕的實踐。若是咱們手動地建立線程,除了建立過程當中的消耗外,還有另外一個問題,就是咱們沒法控制同時有多少個線程在運行。舉個例子,若是同時有一百萬次請求線上服務,那麼每一次請求都會相應的建立一個線程,那麼同時會有一百萬個線程在後臺運行,這將會致使[thread starvation](https://en.wikipedia.org/wiki/Starvation_(computer_science))
因此,咱們須要一種全局管理線程的方式,這就用到了線程池。
線程池將基於咱們選擇的策略來處理線程的生命週期。它擁有有限數量的空閒線程,並在須要解決任務時啓用它們。經過這種方式,咱們不須要爲每個新的請求建立一個新線程,所以,咱們能夠避免出現線程飢餓的問題。
Java線程池的實現包括:
一個任務隊列
一個工做線程的集合
一個線程工廠
管理線程池狀態的元數據
爲了同時運行一些任務,你必須把他們先放到任務隊列裏。而後,當一個線程可用的時候,它將接收一個任務並運行它。可用的線程越多,並行執行的任務就越多。
除了管理線程生命週期,使用線程池還有另外一個好處,當你計劃如何分割任務,以便同時執行時,你能想到更多種方式。並行性的單位再也不是線程了,而是任務。你設計一些任務來併發執行,而不是讓一些線程經過共享公共的內存塊來併發運行。按照功能需求來思考的方式能夠幫助咱們避免一些常見的多線程問題,如死鎖或數據競爭等。沒有什麼能夠阻止咱們再次深刻這些問題,可是,因爲使用了功能範式,咱們沒辦法命令式地同步並行計算(鎖)。這比直接使用線程和共享內存所能碰到的概率要少的多。在咱們的例子中,共享一個阻塞隊列不是想要的狀況,但我就是想強調這個優點。
說了那麼多,接下來咱們看看在案例中如何使用線程池。
package ProducerConsumer; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; public class ProducerConsumerExecutorService { public static void main(String[] args) { BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>(2); ExecutorService executor = Executors.newFixedThreadPool(2); Runnable producerTask = () -> { try { int value = 0; while (true) { blockingQueue.put(value); System.out.println("Produced " + value); value++; Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }; Runnable consumerTask = () -> { try { while (true) { int value = blockingQueue.take(); System.out.println("Consume " + value); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }; executor.execute(producerTask); executor.execute(consumerTask); executor.shutdown(); } }
這裏的區別在於,咱們不在手動建立或運行消費者和生產者線程。咱們創建一個線程池,它將收到兩個任務,生產者和消費者的任務。生產者和消費者的任務,實際上跟以前例子裏面使用的runnable是相同的。如今,執行程序(線程池實現)將接收任務,並安排它的工做線程去執行他們。
在咱們簡單的案例下,一切都跟以前同樣運行。就像以前的例子,咱們仍然有兩個線程,他們仍然要以一樣的方式生產和消費元素。雖然咱們並無讓性能獲得提高,可是代碼看起來乾淨多了。咱們再也不手動建立線程,而只是具體說明咱們想要什麼:咱們想要併發執行某些任務。
因此,當你使用一個線程池時。你不須要考慮線程是併發執行的單位,相反的,你把一些任務看做併發執行的就好。以上就是你須要知道的,剩下的由執行程序去處理。執行程序會收到一些任務,而後,它會分配工做線程去處理它們。
首先,咱們看到了一個「傳統」的消費者-生產者問題的解決方案。咱們儘可能避免了重複造沒有必要的車輪,偏偏相反,咱們重用了已經測試過的解決方案,所以,咱們不是寫一個通知等待系統,而是嘗試使用Java已經提供的blocking queue,由於Java爲咱們提供了一個很是有效的線程池來管理線程生命週期,讓咱們能夠擺脫手動建立線程。經過這些改進,消費者-生產者問題的解決方案看起來更可靠和更好理解。