生產者消費者問題

生產者消費者模式是一個典型的併發編程須要考慮的問題, 經過一箇中間容器來解決生產者和消費者的強耦合特性。 生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力java

其特色大體以下:編程

  • 一組生產者生產數據到緩衝區中,另一組消費者從緩衝區中取數據
  • 若是緩衝區已經滿了,則生產者線程阻塞,等待隊列有空間存放數據
  • 若是緩衝區爲空,那麼消費者線程阻塞,等待隊列有數據進來
  • 生產者往緩衝去寫入數據的同時,消費者能夠同時拿取數據
  • 緩衝區數據先入先出

碼農的世界裏面向來是Talk is cheap, show me the code.數組

筆者寫了一個小demo,緩衝區用的是BlockingQueue 接口的實現類,java.util.concurrent包下具備ArrayBlockingQueue, DelayQueue, LinkedBlockingQueue,  PriorityBlockingQueue, SynchronousQueue.併發

示例中使用的是ArrayBlockingQueue, 一個數組實現的有界阻塞隊列。有界也就意味着,它不可以存儲無限多數量的元素。它有一個同一時間可以存儲元素數量的上限。你能夠在對其初始化的時候設定這個上限,但以後就沒法對這個上限進行修改了。一旦初始化,大小就沒法修改。dom

package demo;

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;

public class Producer implements Runnable {

    private ArrayBlockingQueue<Integer> myQueue;
    private Random random = new Random();

    public ArrayBlockingQueue<Integer> getMyQueue() {
        return myQueue;
    }

    public void setMyQueue(ArrayBlockingQueue<Integer> myQueue) {
        this.myQueue = myQueue;
    }

    @Override
    public void run() {
        int tmp =-1;
        
        try {
            while (true) {
                long millis = (long) (Math.random() * 6000);
                // 模擬耗時計算
                Thread.sleep(millis);
                int element = random.nextInt(2000);
                myQueue.put(element);
                tmp = myQueue.size();
                String msg = Thread.currentThread().getName() + " is producing data : " + element;
                msg += "\r\n";
                msg += "Current queue size : " + tmp;
                System.out.println(msg);
            }
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
Producer
package demo;

import java.util.concurrent.ArrayBlockingQueue;

public class Consumer implements Runnable {

    private ArrayBlockingQueue<Integer> myQueue;

    public ArrayBlockingQueue<Integer> getMyQueue() {
        return myQueue;
    }

    public void setMyQueue(ArrayBlockingQueue<Integer> myQueue) {
        this.myQueue = myQueue;
    }

    @Override
    public void run() {
        int element = -1;
        int tmp = -1;
        try {
            while (true) {
                long millis = (long) (Math.random() * 5000);
                // 模擬複雜計算
                Thread.sleep(millis);
                element = myQueue.take();
                tmp = myQueue.size();
                // 延遲一毫秒寫日誌
                Thread.sleep(1);
                String msg = Thread.currentThread().getName() + " is consuming data : " + element;
                msg += "\r\n";
                msg += "Current queue size : " + tmp;
                System.out.println(msg);
            }

        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
Consumer
package demo;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {

    private static final int SIZE = 10;
    private static ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(SIZE);
    
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        
        ExecutorService producerPool = Executors.newFixedThreadPool(3);
        ExecutorService consumerPool = Executors.newFixedThreadPool(2);
        
        Producer p1 = new Producer();
        p1.setMyQueue(queue);
        Producer p2 = new Producer();
        p2.setMyQueue(queue);
        Producer p3 = new Producer();
        p3.setMyQueue(queue);
        
        producerPool.execute(p1);
        producerPool.execute(p2);
        producerPool.execute(p3);
        
        Consumer c1 = new Consumer();
        c1.setMyQueue(queue);
        Consumer c2 = new Consumer();
        c2.setMyQueue(queue);
        
        consumerPool.execute(c1);
        consumerPool.execute(c2);
        
        
        //consumerPool.shutdown();
        //producerPool.shutdown();

    }

}
Main
相關文章
相關標籤/搜索