生產者消費者模式-基於線程池

1. 爲何使用生產者消費者模式java

(1)解耦合。消費者只關心隊列裏面取出來的數據,不用關心數據的來源。好比,生產者服務的域名,url這些變動。異步

(2)支持異步。生產者生產出來數據,直接放入隊列就行了,接着生產下一個數據,沒必要等待。好比廚師作菜的時候,只須要把作好的菜放到傳送帶就接着作下一道菜。不須要有等有顧客過來把這個菜領走在作下一道;效率更高。ide

(3)流量削峯。雙十一零點那一刻,qps會飆升。若是爲了這一小會的時間,增長機器不划算,由於平時的時候,這些機器足夠用。那我能夠吧這些請求放到一個隊列,服務從隊列中拿出請求,運算後返回給客戶端。this

 

2. 生產者消費者圖示url

 

 

3. 代碼實現spa

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumer<E> {
    private int queueSize = 5;
    private int producerNum = 2;
    private int consumerNum = 2;
    //建立一個阻塞隊列
    private LinkedBlockingQueue<E> blockingQueue = null;
    //生產者線程池
    private ExecutorService producerTheadPool = null;
    //消費者線程池
    private ExecutorService consumerTheadPool = null;

    public ProducerConsumer(){
        blockingQueue = new LinkedBlockingQueue<>(queueSize);
        producerTheadPool = Executors.newFixedThreadPool(producerNum);
        consumerTheadPool = Executors.newFixedThreadPool(consumerNum);
    }

    public ProducerConsumer(int queueSize, int producerNum, int consumerNum){
        blockingQueue = new LinkedBlockingQueue<>(queueSize);
        producerTheadPool = Executors.newFixedThreadPool(producerNum);
        consumerTheadPool = Executors.newFixedThreadPool(consumerNum);
    }

    public void produceEleAsync(E ele){
        if(!checkSuccess()){
            return;
        }
        Producer<E> producer = new Producer<E>(this.blockingQueue, ele);
        producerTheadPool.execute(producer);
    }

    //執行消費過程
    public void consumeEleAsync() {
        if(!checkSuccess()){
            return;
        }
        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true){
                    try {
                        E ele = blockingQueue.take();//阻塞獲取數據
                        Consumer<E> consumer = new Consumer<E>(ele);
                        consumerTheadPool.execute(consumer);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }

    //判空檢查
    private boolean checkSuccess(){
        if(blockingQueue!=null
                && producerTheadPool!=null
                && consumerTheadPool!=null){
            return true;
        }
        return false;
    }

    //生產者
    private class Producer<E> implements Runnable{
        private LinkedBlockingQueue<E> blockingQueue;
        private E ele;

        public Producer(LinkedBlockingQueue<E> blockingQueue, E ele){
            this.blockingQueue = blockingQueue;
            this.ele = ele;
        }

        @Override
        public void run() {
            if(this.blockingQueue!=null && ele!=null){
                try {
                    this.blockingQueue.put(ele);
                } catch (InterruptedException e) {
                    e.getStackTrace();
                }
            }
        }
    }

    //消費者
    private class Consumer<E> implements Runnable{
        private E ele;

        public Consumer(E ele){
            this.ele = ele;
        }

        @Override
        public void run() {
            //執行消費過程
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            if(ele!=null){
                System.out.println("消費--->" + ele.toString());
            }
        }
    }
    

}
相關文章
相關標籤/搜索