經典併發同步模式:生產者-消費者設計模式

​ 在討論基於阻塞隊列的生產者消費者模式以前咱們先搞清楚到底什麼是生產者-消費者模式(producer-consumer模式)?html

什麼是生產者-消費者模式

好比有兩個進程A和B,它們共享一個固定大小的緩衝區,A進程產生數據放入緩衝區,B進程從緩衝區中取出數據進行計算,那麼這裏其實就是一個生產者和消費者的模式,A至關於生產者,B至關於消費者java

圖片描述

爲何要使用生產者消費者模式

在多線程開發中,若是生產者生產數據的速度很快,而消費者消費數據的速度很慢,那麼生產者就必須等待消費者消費完了數據纔可以繼續生產數據,由於生產那麼多也沒有地方放啊;同理若是消費者的速度大於生產者那麼消費者就會常常處理等待狀態,因此爲了達到生產者和消費者生產數據和消費數據之間的平衡,那麼就須要一個緩衝區用來存儲生產者生產的數據,因此就引入了生產者-消費者模式git

簡單來講這裏的緩衝區的做用就是爲了平衡生產者和消費者的處理能力,起到一個數據緩存的做用,同時也達到了一個解耦的做用github

生產者-消費者模式的特色

  • 保證生產者不會在緩衝區滿的時候繼續向緩衝區放入數據,而消費者也不會在緩衝區空的時候,消耗數據
  • 當緩衝區滿的時候,生產者會進入休眠狀態,當下次消費者開始消耗緩衝區的數據時,生產者纔會被喚醒,開始往緩衝區中添加數據;當緩衝區空的時候,消費者也會進入休眠狀態,直到生產者往緩衝區中添加數據時纔會被喚醒

圖片描述

生產者-消費者模式的應用場景

生產者-消費者模式通常用於將生產數據的一方和消費數據的一方分割開來,將生產數據與消費數據的過程解耦開來web

  • Excutor任務執行框架:redis

    • 經過將任務的提交和任務的執行解耦開來,提交任務的操做至關於生產者,執行任務的操做至關於消費者
    • 例如使用Excutor構建web服務器,用於處理線程的請求:生產者將任務提交給線程池,線程池建立線程處理任務,若是須要運行的任務數大於線程池的基本線程數,那麼就把任務扔到阻塞隊列(經過線程池+阻塞隊列的方式比只使用一個阻塞隊列的效率高不少,由於消費者可以處理就直接處理掉了,不用每一個消費者都要先從阻塞隊列中取出任務再執行)
  • 消息中間件activeMQ:編程

    • 雙十一的時候,會產生大量的訂單,那麼不可能同時處理那麼多的訂單,須要將訂單放入一個隊列裏面,而後由專門的線程處理訂單。這裏用戶下單就是生產者,處理訂單的線程就是消費者;再好比12306的搶票功能,先由一個容器存儲用戶提交的訂單,而後再由專門處理訂單的線程慢慢處理,這樣能夠在短期內支持高併發服務
  • 任務的處理時間比較長的狀況下:segmentfault

    • 好比上傳附近並處理,那麼這個時候能夠將用戶上傳和處理附件分紅兩個過程,用一個隊列暫時存儲用戶上傳的附近,而後馬上返回用戶上傳成功,而後有專門的線程處理隊列中的附近

生產者-消費者模式的優勢

  • 解耦:將生產者類和消費者類進行解耦,消除代碼之間的依賴性,簡化工做負載的管理
  • 複用:經過將生產者類和消費者類獨立開來,那麼能夠對生產者類和消費者類進行獨立的複用與擴展
  • 調整併發數:因爲生產者和消費者的處理速度是不同的,能夠調整併發數,給予慢的一方多的併發數,來提升任務的處理速度
  • 異步:對於生產者和消費者來講可以各司其職,生產者只須要關心緩衝區是否還有數據,不須要等待消費者處理完;一樣的對於消費者來講,也只須要關注緩衝區的內容,不須要關注生產者,經過異步的方式支持高併發,將一個耗時的流程拆成生產和消費兩個階段,這樣生產者由於執行put()的時間比較短,而支持高併發
  • 支持分佈式:生產者和消費者經過隊列進行通信,因此不須要運行在同一臺機器上,在分佈式環境中能夠經過redis的list做爲隊列,而消費者只須要輪詢隊列中是否有數據。同時還能支持集羣的伸縮性,當某臺機器宕掉的時候,不會致使整個集羣宕掉

生產者-消費者模式的實現

首先咱們從最簡單的開始,假設只有一個生產者線程執行put操做,向緩衝區中添加數據,同時也只有一個消費者線程從緩衝區中取出數據windows

圖片描述

UML實體關係圖,從UML類圖中能夠看出,咱們的producer和consumer類都持有一個對container對象的引用,這樣的設計模式實際上在不少設計模式都有用到,好比咱們的裝飾者模式等等,它們共同的目的都是爲了達到解耦和複用的效果設計模式

圖片描述

在實現生產者-消費者模式以前咱們須要搞清兩個問題:

  • 如何保證容器中數據狀態的一致性
  • 如何保證消費者和生產者之間的同步和協做關係

1)容器中數據狀態的一致性:當一個consumer執行了take()方法以後,此時容器爲空,可是還沒來得及更新容器的size,那麼另一個consumer來了以後覺得size不等於0,那麼繼續執行take(),從而形成了了狀態的不一致性

2)爲了保證當容器裏面沒有數據的時候,消費者不會繼續take,此時消費者釋放鎖,處於阻塞狀態;而且一旦生產者添加了一條數據以後,此時從新喚醒消費者,消費者從新獲取到容器的鎖,繼續執行take();

​ 當容器裏面滿的時候,生產者也不會繼續put, 此時生產者釋放鎖,處於阻塞狀態;一旦消費者take了一條數據,此時應該喚醒生產者從新獲取到容器的鎖,繼續put

圖片描述

因此對於該容器的任何訪問都須要進行同步,也就是說在獲取容器的數據以前,須要先獲取到容器的鎖。

而這裏對於容器狀態的同步能夠參考以下幾種方法:

  • Object的wait() / notify()方法
  • Semaphore的acquire()/release()方法
  • BlockingQueue阻塞隊列方法
  • Lock和Condition的await() / signal()方法
  • PipedInputStream/ PipedOutputStream

要構建一個生產者消費者模式,那麼首先就須要構建一個固定大小的緩衝區,而且該緩衝區具備可阻塞的put方法和take方法

一、利用內部線程之間的通訊:Object的wait() / notify()方法

接下來咱們採用第一種方法來實現該模型:使用Object的wait() / notify()方法實現生產者-消費者模型

ps:採用wait()/notify()方法的缺點是不能實現單生產者單消費者模式,由於要是用notify()就必須使用同步代碼塊

建立Container容器類

package test1;

import java.util.LinkedList;

public class Container {
    LinkedList<Integer> list = new LinkedList<Integer>();
    int capacity = 10;

    public void put(int value){
        while (true){
            try {
                //sleep不能放在同步代碼塊裏面,由於sleep不會釋放鎖,
                // 當前線程會一直佔有produce線程,直到達到容量,調用wait()方法主動釋放鎖
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            synchronized (this){
                //當容器滿的時候,producer處於等待狀態
                while (list.size() == capacity){
                    System.out.println("container is full,waiting ....");
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                //沒有滿,則繼續produce
                System.out.println("producer--"+ Thread.currentThread().getName()+"--put:" + value);
                list.add(value++);
                //喚醒其餘全部處於wait()的線程,包括消費者和生產者
                notifyAll();
            }
        }
    }

    public Integer take(){
        Integer val = 0;
        while (true){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            synchronized (this){
                //若是容器中沒有數據,consumer處於等待狀態
                while (list.size() == 0){
                    System.out.println("container is empty,waiting ...");
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                //若是有數據,繼續consume
                val = list.removeFirst();
                System.out.println("consumer--"+ Thread.currentThread().getName()+"--take:" + val);

                //喚醒其餘全部處於wait()的線程,包括消費者和生產者
                //notify必須放在同步代碼塊裏面
                notifyAll();
            }
        }


    }

}

ps:

  • sleep()的位置

這裏須要注意的是sleep()不能放在synchronized代碼塊裏面,由於咱們知道sleep()執行以後是不會釋放鎖的,也就是說當前線程仍然持有對container對象的互斥鎖,這個時候當前線程繼續判斷list.size是否等於capacity,不等於就繼續put,而後又sleep一會,而後又繼續,直到當list.size == capacity,這個時候終於進入wait()方法,咱們知道wait()方法會釋放鎖,這個時候其餘線程纔有機會獲取到container的互斥鎖,

  • notifyAll()不能單獨放在producer類裏面,由於notifyAll()必須放在同步代碼塊裏面
  • 弊端:這裏因爲不能區分哪些是not empty或者not full或者is full/empty線程,因此須要喚醒全部其餘等待的線程,但實際上咱們須要的是喚醒那些not empty或者not full的線程就夠了

建立生產者類

package test1;
import test1.Container;
import java.util.Random;

public class Producer implements Runnable{
    private Container container;
    public Producer(Container container) {
        this.container = container;
    }
    @Override
    public void run() {
        container.put(new Random().nextInt(100));
    }
}

建立消費者類

package test1;
import java.util.Random;

public class Consumer implements Runnable{
    private Container container;
    public Consumer(Container container) {
        this.container = container;
    }

    @Override
    public void run() {
        Integer val = container.take();
    }
}

測試類

package test1;

import test1.Consumer;
import test1.Container;
import test1.Producer;

public class Main {
    public static void main(String[] args){
        Container container = new Container();

        Thread producer1 = new Thread(new Producer(container));
        Thread producer2 = new Thread(new Producer(container));
        Thread producer3 = new Thread(new Producer(container));
        Thread producer4 = new Thread(new Producer(container));
        producer1.start();
        producer2.start();
        producer3.start();
        producer4.start();

        Thread consumer1 = new Thread(new Consumer(container));
        Thread consumer2 = new Thread(new Consumer(container));
        Thread consumer3 = new Thread(new Consumer(container));
        Thread consumer4 = new Thread(new Consumer(container));
        Thread consumer5 = new Thread(new Consumer(container));
        Thread consumer6 = new Thread(new Consumer(container));
        consumer1.start();
        consumer2.start();
        consumer3.start();
        consumer4.start();
        consumer5.start();
        consumer6.start();
    }
}

運行結果

producer--Thread-1--put:80
producer--Thread-2--put:19
producer--Thread-3--put:8
producer--Thread-0--put:74
consumer--Thread-8--take:80
consumer--Thread-4--take:19
consumer--Thread-6--take:8
consumer--Thread-9--take:74
container is empty,waiting ...
container is empty,waiting ...
producer--Thread-2--put:20
consumer--Thread-7--take:20
container is empty,waiting ...
producer--Thread-3--put:9
producer--Thread-1--put:81
producer--Thread-0--put:75
consumer--Thread-5--take:9
consumer--Thread-6--take:81
consumer--Thread-8--take:75
container is empty,waiting ...
container is empty,waiting ...
container is empty,waiting ...

二、利用信號量實現生產者-消費者模型

思路

生產者消費者模型中的共享資源是一個固定大小的緩衝區,該模式須要當緩衝區滿的時候,生產者再也不生產數據,直到消費者消費了一個數據以後,才繼續生產;同理當緩衝區空的時候,消費者再也不消費數據,直到生產者生產了一個數據以後,才繼續消費

若是要經過信號量來解決這個問題:關鍵在於找到可以跟蹤緩衝區的size大小變化,並根據緩衝區的數量變化來控制消費者和生產者線程之間的協做和運行

那麼很容易很夠想到用兩個信號量:empytyCount和fullCount分別來表示緩衝區滿或者空的狀態,進而可以更加容易控制消費者和生產者到底何時處於阻塞狀態,何時處於運行狀態

  • emptyCount = N ; fullCount = 0 ; useQueue = 1

同時爲了使得程序更加具備健壯性,咱們還添加一個二進制信號量useQueue,確保隊列的狀態的完整性不受損害。例如當兩個生產者同時向空隊列添加數據時,從而破壞了隊列內部的狀態,使得其餘計數信號量或者返回的緩衝區的size大小不具備一致性。(固然這裏也可使用mutex來代替二進制信號量)

produce:
    P(emptyCount)//信號量emptyCount減一
    P(useQueue)//二值信號量useQueue減一,變爲0(其餘線程不能進入緩衝區,阻塞狀態)
    putItemIntoQueue(item)//執行put操做
    V(useQueue)//二值信號量useQueue加一,變爲1(其餘線程能夠進入緩衝區)
    V(fullCount)//信號量fullCount加一
consume:
    P(fullCount)//fullCount -= 1
    P(useQueue)//useQueue -= 1(useQueue = 0)
    item ← getItemFromQueue()
    V(useQueue)//useQueue += 1 (useQueue = 1)
    V(emptyCount)//emptyCount += 1

ps: 這裏的兩個PV操做是否能夠顛倒

  • P操做不能夠

    首先生產者獲取到信號量emptyCount,執行P(emptyCount),確保emptyCount不等於0,也就是還有空間添加數據,從而纔可以進入臨界區container

    而後執行put操做,執行put操做以前須要爲緩衝區加把鎖,防止在put的過程當中,其餘線程對緩衝區進行修改,因此這個時候須要獲取另一個信號量useQueue

    相反,若是先執行了 P(useQueue),而且此時的emptyCount = 0,那麼生產者就會一直阻塞,直到消費者消費了一個數據;可是此時消費者又沒法獲取到互斥信號量useQueue,也會一直阻塞,因此就造成了一個死鎖

    因此這兩個p操做是不能交換順序的,信號量emptyCount是useQueue的基礎和前提條件

  • V操做能夠

    此時若是生產者已經執行完put操做,那麼能夠先釋放互斥信號量,再執行 V(fullCount);或者先執行 V(fullCount)再釋放互斥信號量都沒有關係。不會對其餘的生產者消費者的狀態產生影響;可是最好的仍是先釋放互斥鎖,再執行V(fullCount),這樣能夠保證當容器滿的時候,消費者可以及時的獲取到互斥鎖

代碼實現

Container

package test3;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Semaphore;

public class Container {
    Semaphore fullCount = new Semaphore(0);
    Semaphore emptyCount = new Semaphore(10);
    Semaphore isUse = new Semaphore(1);

    List list = new LinkedList<Integer>();

    public void  put(Integer val){

        try {
            emptyCount.acquire();
            isUse.acquire();

            list.add(val);
            System.out.println("producer--"+ Thread.currentThread().getName()+"--put:" + val+"===size:"+list.size());

        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            isUse.release();
            fullCount.release();
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    public Integer get(){
        Integer val1 = 0;
        try {
            fullCount.acquire();
            isUse.acquire();

             val1 = (Integer) list.remove(0);
            System.out.println("consumer--"+ Thread.currentThread().getName()+"--take:" + val1+"===size:"+list.size());

        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            isUse.release();
            emptyCount.release();
        }

       return val1;

    }
}

生產者

package test3;

import java.util.Random;

public class Producer implements Runnable{
    private Container container;

    public Producer(Container container) {
        this.container = container;
    }

    @Override
    public void run() {
        while (true){

            container.put(new Random().nextInt(100));
        }
    }
}

消費者

package test3;

public class Consumer implements Runnable{
    private Container container;

    public Consumer(Container container) {
        this.container = container;
    }

    @Override
    public void run() {
        while (true){
            Integer val = container.get();

        }
    }
}

測試

package test3;

public class Test {
    public static void main(String[] args){

        Container container = new Container();

        Thread producer1 = new Thread(new Producer(container));
        Thread producer2 = new Thread(new Producer(container));
        Thread producer3 = new Thread(new Producer(container));

        Thread consumer1 = new Thread(new Consumer(container));
        Thread consumer2 = new Thread(new Consumer(container));
        Thread consumer3 = new Thread(new Consumer(container));
        Thread consumer4 = new Thread(new Consumer(container));

        producer1.start();
        producer2.start();
        producer3.start();

        consumer1.start();
        consumer2.start();
        consumer3.start();
        consumer4.start();

    }
}
producer--Thread-0--put:74===size:1
producer--Thread-4--put:16===size:2
producer--Thread-2--put:51===size:3
producer--Thread-1--put:77===size:4
producer--Thread-3--put:93===size:5
consumer--Thread-6--take:74===size:4
consumer--Thread-6--take:16===size:3
consumer--Thread-6--take:51===size:2
consumer--Thread-6--take:77===size:1
consumer--Thread-5--take:93===size:0
producer--Thread-4--put:19===size:1
producer--Thread-3--put:68===size:2
producer--Thread-0--put:72===size:3
consumer--Thread-6--take:19===size:2
consumer--Thread-6--take:68===size:1
consumer--Thread-5--take:72===size:0
producer--Thread-1--put:82===size:1
producer--Thread-2--put:32===size:2
consumer--Thread-5--take:82===size:1

三、基於阻塞隊列的生產者消費者模型

因爲這裏的緩衝區由BlockingQueue容器代替,那麼這裏咱們就不須要從新建立一個容器類了,直接建立生產者類和消費者類,而且一樣的都須要擁有一個容器類BlockingQueue的實例應用

建立生產者類

package test;

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;

public class Producer implements Runnable{
    private ArrayBlockingQueue<Integer> queue ;

    public Producer(ArrayBlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        Random random = new Random();
        while (true){
           try {
               Thread.sleep(100);
               if(queue.size() == 10) System.out.println("================the queue is full,the producer thread is waiting..................");
               int item = random.nextInt(100);
               queue.put(item);
               System.out.println("producer:" + Thread.currentThread().getName() + " produce:" + item+";the size of the queue:" + queue.size());
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
    }
}

建立消費者類

package test;

import java.util.concurrent.ArrayBlockingQueue;

public class Consumer implements Runnable {
    private ArrayBlockingQueue<Integer> queue;

    public Consumer(ArrayBlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
       while (true){
           try {
               Thread.sleep(100);
               if(queue.size() == 0) System.out.println("=============the queue is empty,the consumer thread is waiting................");
               Integer item = queue.take();
               System.out.println("consumer:" + Thread.currentThread().getName() + " consume:" + item+";the size of the queue:" + queue.size());
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }

    }
}

測試類

package test;

import java.util.concurrent.ArrayBlockingQueue;

public class Test {
    public static void main(String[] args){

        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);
        Thread producer1 = new Thread(new Producer(queue));
        Thread producer2 = new Thread(new Producer(queue));
        Thread producer3 = new Thread(new Producer(queue));
        Thread producer4 = new Thread(new Producer(queue));
        Thread producer5 = new Thread(new Producer(queue));
        producer1.start();
        producer2.start();
        producer3.start();
        producer4.start();
        producer5.start();

        Thread consumer1 = new Thread(new Consumer(queue));
        Thread consumer2 = new Thread(new Consumer(queue));
        consumer1.start();
        consumer2.start();

        try {
            producer1.join();
            producer2.join();
            producer3.join();
            producer4.join();
            producer5.join();
            consumer1.join();
            consumer2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
=============the queue is empty,the consumer thread is waiting................
consumer:Thread-5 consume:64;the size of the queue:0
producer:Thread-3 produce:64;the size of the queue:1
consumer:Thread-6 consume:87;the size of the queue:0
producer:Thread-1 produce:1;the size of the queue:3
producer:Thread-4 produce:87;the size of the queue:2
producer:Thread-2 produce:71;the size of the queue:2
producer:Thread-0 produce:76;the size of the queue:1
consumer:Thread-6 consume:71;the size of the queue:2
producer:Thread-1 produce:26;the size of the queue:6
producer:Thread-3 produce:6;the size of the queue:6
producer:Thread-0 produce:76;the size of the queue:5
producer:Thread-2 produce:37;the size of the queue:6

四、Lock和Condition的await() / signal()方法

在用Lock和Condition的await()/signal()方法實現生產者消費者以前,咱們先來了解一下Lock和synchronized都是基於鎖有哪些區別,以及Condition的await()/signal()方法和Object的wait()/notify()方法都是等待和喚醒又有哪些區別

Lock和synchronized的區別

鎖機制 Lock synchronized
所屬層次 java.util.concurrent package中的一個接口 是一個關鍵字,JVM內置的語言實現
釋放鎖與加鎖 經過lock()/unlock()進行手動釋放與加鎖 不須要,進入synchronized同步代碼塊就自動獲取鎖,退出同步代碼塊自動釋放鎖
設置超時時間 trylock(timeout) 沒有超時時間,線程會一直阻塞,直到獲取鎖
公平機制 設置true,爲公平鎖,等待時間最長的先獲取 沒有
阻塞線程列表 能夠查看正處於等待狀態的線程列表 不能夠
遇到異常時釋放 當遇到異常時在finally中執行unlock() 遇到異常時釋放鎖
底層實現 樂觀鎖方式(cas),每次不加鎖而是假設沒有衝突而去完成某項操做 CPU悲觀鎖機制,即線程得到的是獨佔鎖,只能依靠阻塞來等待線程釋放鎖
具體喚醒某一個線程 ReentrantLock裏面的Condition應用,可以控制signal哪一個線程 不能控制具體notify哪一個線程,notifyall()喚醒全部線程
靈活性 比synchronized更加靈活 不是那麼靈活
響應中斷 等待的線程能夠響應中斷 不能響應中斷
應用場景 資源競爭激烈的狀況下,是synchronized的幾十倍 資源競爭不激烈時,優於Lock

Condition的await()/signal()方法和Object的wait()/notify()方法

方法 Condition Object
阻塞等待 await() wait()
喚醒其餘線程 signal() notify()/notifyall()
使用的鎖 互斥鎖/共享鎖,如Lock 同步鎖:如synchronized
一個鎖對應 能夠建立多個condition 對應一個Object
喚醒指定的線程 明確的指定線程 只能經過notifyAll喚醒全部線程;或者notify()隨機喚醒

lock和condition實現生產者消費者

該實現方式相比較synchronized於object的wait()/notify()方法具備更加的靈活性,能夠喚醒具體的消費者線程或者生產者線程,達到當緩衝區滿的時候,喚醒消費者線程,此時生產者線程都將被阻塞,而不是向notifyall()那樣喚醒全部的線程。

容器類
package test8;

import java.util.LinkedList;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Container{
    private final Lock lock = new ReentrantLock();
    //表示生產者線程
    private final Condition notFull = lock.newCondition();
    //表示消費者線程
    private final Condition notEmpty = lock.newCondition();
    private int capacity;
    private List<Integer> list = new LinkedList<>();

    public Container(int capacity) {
        this.capacity = capacity;

    }

    public Integer take(){
        lock.lock();
       try {
           while (list.size() == 0)
               try {
                   System.out.println("the list is empty........");
                   notEmpty.await();//阻塞消費者線程
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           Integer val = list.remove(0);
           System.out.println("consumer--"+ Thread.currentThread().getName()+"--take:" + val+"===size:"+list.size());

           notFull.signalAll();//喚醒全部生產者線程
           return val;
       }finally {
           lock.unlock();
       }
    }

    public void put(Integer val){
        lock.lock();
       try {
           while (list.size() == capacity){
               try {
                   System.out.println("the list is full........");

                   notFull.await();//阻塞生產者線程
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
           list.add(val);
           System.out.println("producer--"+ Thread.currentThread().getName()+"--put:" + val+"===size:"+ list.size());

           notEmpty.signalAll();//喚醒全部消費者線程
       }finally {
           lock.unlock();
       }

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }



}
生產者
package test8;

import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.locks.Condition;

public class Producer implements Runnable {

    private Container container;

    public Producer(Container container) {
        this.container = container;
    }

    @Override
    public void run() {
        while (true){

            container.put(new Random().nextInt(100));
        }
    }
}
消費者
package test8;

public class Consumer implements Runnable {

    private Container container;

    public Consumer(Container container) {
        this.container = container;
    }

    @Override
    public void run() {
        while (true){
            Integer val = container.take();
        }
    }
}
測試類
package test8;

public class Test {
    public static void main(String[] args){

        Container container = new Container(5);

        Thread producer1 = new Thread(new Producer(container));
        Thread producer2 = new Thread(new Producer(container));
        Thread producer3 = new Thread(new Producer(container));
        Thread producer4 = new Thread(new Producer(container));
        Thread producer5 = new Thread(new Producer(container));

        Thread consumer1 = new Thread(new Consumer(container));
        Thread consumer2 = new Thread(new Consumer(container));


        producer1.start();
        producer2.start();
        producer3.start();
        producer4.start();
        producer5.start();

        consumer1.start();
        consumer2.start();
    }
}
the list is empty........
producer--Thread-3--put:77===size:1
consumer--Thread-6--take:77===size:0
the list is empty........
producer--Thread-4--put:55===size:1
producer--Thread-0--put:62===size:2
producer--Thread-1--put:90===size:3
producer--Thread-2--put:57===size:4
consumer--Thread-5--take:55===size:3
consumer--Thread-5--take:62===size:2
consumer--Thread-5--take:90===size:1
consumer--Thread-5--take:57===size:0
the list is empty........
the list is empty........
producer--Thread-0--put:10===size:1
producer--Thread-1--put:21===size:2
producer--Thread-3--put:3===size:3
producer--Thread-4--put:75===size:4
producer--Thread-2--put:94===size:5
consumer--Thread-5--take:10===size:4

不一樣模式的生產者消費者模型

一、單生產者單消費者(SPSC)(只有同步沒有互斥)

使用信號量實現

對於單生產者單消費者,只用保證緩衝區滿的時候,生產者不會繼續向緩衝區放數據,緩衝區空的時候,消費者不會繼續從緩衝區取數據,而不存在同時有兩個生產者使用緩衝區資源,形成數據不一致的狀態。

因此對於單生產者單消費者,若是採用信號量模型來實現的話,那麼只須要兩個信號量:empytyCount和fullCount分別來表示緩衝區滿或者空的狀態,進而可以更加容易控制消費者和生產者到底何時處於阻塞狀態,何時處於運行狀態; 而不須要使用互斥信號量了

  • emptyCount = N ; fullCount = 0 ;
produce:
    P(emptyCount)//信號量emptyCount減一
    putItemIntoQueue(item)//執行put操做
    V(fullCount)//信號量fullCount加一
consume:
    P(fullCount)//fullCount -= 1
    item ← getItemFromQueue()
    V(emptyCount)//emptyCount += 1

實現

緩衝區容器類

package test9;

import java.time.temporal.ValueRange;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Semaphore;

public class Container_spsc {
    Semaphore emptyCount = new Semaphore(10);
    Semaphore fullCount = new Semaphore(0);

    List<Integer> list = new LinkedList<Integer>();

    public void put(int val){
        try {
            emptyCount.acquire();
            list.add(val);
            System.out.println("producer--"+ Thread.currentThread().getName()+"--put:" + val+"===size:"+list.size());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            fullCount.release();
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public Integer take(){
        Integer val = 0;

        try {
            fullCount.acquire();
             val = list.remove(0);
            System.out.println("consumer--"+ Thread.currentThread().getName()+"--take:" + val+"===size:"+list.size());

        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            emptyCount.release();
        }
        return val;
    }

}

生產者

package test9;

import test8.Container;

import java.util.Random;

public class Producer implements Runnable {

    private Container_spsc container;

    public Producer(Container_spsc container) {
        this.container = container;
    }

    @Override
    public void run() {
        while (true){
            container.put(new Random().nextInt(100));
        }
    }
}

消費者類

package test9;

import test8.Container;

public class Consumer implements Runnable {

    private Container_spsc container;

    public Consumer(Container_spsc container) {
        this.container = container;
    }

    @Override
    public void run() {
        while (true){
            Integer take = container.take();
        }
    }
}

測試

package test9;

public class Test {
    public static void main(String[] args){

        Container_spsc container = new Container_spsc();
        Thread producer = new Thread(new Producer(container));
        Thread consumer = new Thread(new Consumer(container));

        producer.start();
        consumer.start();

    }
}
producer--Thread-0--put:62===size:1
consumer--Thread-1--take:62===size:0
producer--Thread-0--put:40===size:1
consumer--Thread-1--take:40===size:0
producer--Thread-0--put:86===size:1
consumer--Thread-1--take:86===size:0
producer--Thread-0--put:15===size:1
consumer--Thread-1--take:15===size:0
producer--Thread-0--put:83===size:1
consumer--Thread-1--take:83===size:0
producer--Thread-0--put:13===size:1
consumer--Thread-1--take:13===size:0

二、多生產者單消費者(MPSC)

對於多生產者單消費者來講,多生產者之間具備互斥關係,因此這裏須要一個互斥鎖來實現緩衝區的互斥訪問,那麼具體的實現方式就是在單生產者單消費者的基礎之上,加一個互斥信號量useQueue

若是採用信號量來實現的話能夠以下:

  • emptyCount = N ; fullCount = 0 ; useQueue = 1
produce:
    P(emptyCount)//信號量emptyCount減一
    P(useQueue)//二值信號量useQueue減一,變爲0(其餘線程不能進入緩衝區,阻塞狀態)
    putItemIntoQueue(item)//執行put操做
    V(useQueue)//二值信號量useQueue加一,變爲1(其餘線程能夠進入緩衝區)
    V(fullCount)//信號量fullCount加一
consume:
    P(fullCount)//fullCount -= 1   
    item ← getItemFromQueue()
    V(emptyCount)//emptyCount += 1

具體的實現和單生產者單消費者差很少,只不過在生產者類裏面多加了一個互斥信號量useQueue

三、單生產者多消費者(SPMC)

對於單生產者多消費者同多生產者多消費者

  • emptyCount = N ; fullCount = 0 ; useQueue = 1
produce:
    P(emptyCount)//信號量emptyCount減一
    putItemIntoQueue(item)//執行put操做
    V(fullCount)//信號量fullCount加一
consume:
    P(fullCount)//fullCount -= 1   
    P(useQueue)//二值信號量useQueue減一,變爲0(其餘線程不能進入緩衝區,阻塞狀態)
    item ← getItemFromQueue()
    V(useQueue)//二值信號量useQueue加一,變爲1(其餘線程能夠進入緩衝區)
    V(emptyCount)//emptyCount += 1

具體的實現和單生產者單消費者差很少,只不過在消費者類裏面多加了一個互斥信號量useQueue

四、多生產者多消費者(MPMC)-單緩衝區(SB)

對於多生產者多消費者問題,是一個同步+互斥問題,不只須要生產者和消費者之間的同步協做,還須要實現對緩衝區資源的互斥訪問;這個能夠參考前面對生產者消費者4種實現方式

採用信號量

  • emptyCount = N ; fullCount = 0 ; useQueue = 1
produce:
    P(emptyCount)//信號量emptyCount減一
    P(useQueue)//二值信號量useQueue減一,變爲0(其餘線程不能進入緩衝區,阻塞狀態)
    putItemIntoQueue(item)//執行put操做
    V(useQueue)//二值信號量useQueue加一,變爲1(其餘線程能夠進入緩衝區)
    V(fullCount)//信號量fullCount加一
consume:
    P(fullCount)//fullCount -= 1   
    P(useQueue)//二值信號量useQueue減一,變爲0(其餘線程不能進入緩衝區,阻塞狀態)
    item ← getItemFromQueue()
    V(useQueue)//二值信號量useQueue加一,變爲1(其餘線程能夠進入緩衝區)
    V(emptyCount)//emptyCount += 1

五、多生產者多消費者(MPMC)-雙緩衝區(MB)

  • 爲何要用雙緩衝區讀寫分離減小釋放鎖和獲取鎖的開銷

用一個緩衝區,生產者和消費者須要先獲取到緩衝區的鎖才能進行put和take操做,每一次put和take都須要獲取一次鎖,這須要大量的同步與互斥操做,十分損耗性能。

因此若是採用雙緩衝區的話,一個緩衝區bufferA用於生產者執行put操做,一個緩衝區bufferB用於消費者執行take操做;生產者線程和消費者線程在使用各自的緩衝區以前都須要先獲取到緩衝區對應的鎖,才能進行操做;

生產者和消費者各自使用本身獨立的緩衝區,那麼就不存在同一個緩衝區被put的同時進行take操做

因此一旦生產者和消費者一旦獲取到了對應緩衝區的鎖,那麼每一次執行put/take操做時就不用再次從新獲取鎖了,從而減小了不少獲取鎖、釋放鎖的性能開銷

  • 緩衝區的切換

若是bufferA被put滿了,那麼生產者釋放bufferA的鎖,並等待消費者釋放bufferB的鎖;當bufferB被take空了,消費者釋放bufferB的鎖,此時生產者獲取到bufferB的鎖,對bufferB進行put;消費者獲取到bufferA的鎖,對bufferA進行take,那麼就完成了一次緩衝區的切換

  • 雙緩衝區的狀態

    • 併發讀寫

      bufferA和bufferB都處於工做狀態,一個讀一個寫

    • 單個緩衝區空閒

      假設bufferA已經滿了,那麼生產者就會釋放bufferA的鎖,嘗試獲取bufferB,而此時bufferB還在執行take操做,消費者還沒釋放bufferB的鎖,那麼生產者進入等待狀態

    • 緩衝區的切換

      當bufferB爲空,那麼此時消費者釋放bufferB的鎖,嘗試獲取bufferA的鎖,此時消費者被喚醒,從新嘗試獲取bufferB的鎖

  • 雙緩衝區的死鎖問題

    若是操做完當前的緩衝區以後,先獲取另一個緩衝區的鎖,再釋放當前緩衝區的鎖,就會發生死鎖問題。若是bufferA和bufferB的線程同時嘗試獲取對方的鎖,那麼就會一直循環等待下去

  • 須要注意的問題

    因爲雙緩衝區是爲了不每次讀寫的時候不用進行同步與互斥操做,因此對於一些原本就是線程安全的類例如arrayblockingqueue就不適合做爲雙緩衝區,由於他們內部已經實現了每次讀寫操做的時候進行加鎖和釋放

  • 應用場景:

    • 共享內存和共享文件
    • 邏輯處理線程和IO處理線程分離。 I/0處理線程負責網絡數據的發送和接收,鏈接的創建和維護。 邏輯處理線程處理從IO線程接收到的包。

六、多生產者多消費者(MPMC)-多緩衝區(MB)

多個緩衝區構成一個緩衝池,一樣須要兩個同步信號量emtpyCount和fullCount,還有一個互斥信號量useQueue,同時還須要兩個變量指示哪些是空緩衝區哪些是有數據的緩衝區,多緩衝區和雙緩衝區同樣,一樣是以空間換時間,減小單個讀寫操做的同步與互斥操做,對於同一個緩衝區而言,不可能同時會put和take

七、多生產者多消費者(MPMC)-環形緩衝區(Ring buffer)

爲何要引入環形緩衝區

討論爲何要引入環形緩衝區,其實也就是在討論隊列緩衝區有什麼弊端,而環形緩衝區是如何解決這種弊端的=

那麼咱們先認識一下什麼是環形緩衝區

  • 循環緩衝區的有用特性是,當使用一個循環緩衝區時,它不須要將其元素打亂。
  • FIFO
  • 全部的 push/pop 操做都是在一個固定的存儲空間內進行,少掉了對於緩衝區元素所用存儲空間的分配、釋放

隊列緩衝區

  • 若是使用非循環緩衝區,那麼在使用一個緩衝區時,須要移動全部元素
  • LIFO
  • 在執行push和pop操做時,涉及到內存的分配與釋放開銷大

瞭解瞭如何使用Java經過簡單的synchronized與object的wait()/notify()、Lock與Condition的await()/signal()方法、BlockingQueue、信號量semaphore四種方法來實現生產者消費者模型,之後有機會咱們在研究研究Linux和windows分別又是如何實現生產者消費者模型的

參考

《Java併發編程實踐》

實現生產者消費者模式的四種方式(Synchronized、Lock、Semaphore、BlockingQueue)

生產者消費者模型你知道多少

多任務併發之生產者消費者模式應用

聊聊併發——生產者消費者模式

經典消費者生產者問題

https://blog.csdn.net/luohuac...

https://www.geeksforgeeks.org...

【Java】生產者消費者模式的實現

信號量(生產者和消費者模型)

線程通訊

https://www.geeksforgeeks.org...

https://blog.csdn.net/chencha...

https://www.cnblogs.com/Wante...

https://blog.csdn.net/u012403...

https://www.geeksforgeeks.org...

https://blog.csdn.net/woailuo...

https://blog.csdn.net/liuxiao...

https://program-think.blogspo...

https://zhuanlan.zhihu.com/p/...

歡迎關注個人公衆號:小秋的博客,天天進步一點點
圖片描述

相關文章
相關標籤/搜索