手寫一個生產者--消費者模型例子

在併發編程中,比較經典的編程例子就是生產者和消費者模型。下面就是一個例子來詮釋一下什麼是生產者和消費者以及他們的特色和注意點。編程

一、先定義一個數據對象,緩存

public class Data {
    private String id;

    private String name;

    public Data(String id,String name){
        this.id = id;
        this.name = name;
    }
    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Data [id=" + id + ", name=" + name + "]";
    }
}

2.定義一個生產者,實現Runnable接口。多線程

public class Provider implements Runnable{
    //共享緩衝區
    private BlockingQueue<Data> queue;

    //多線程間是否啓動變量,有強制從主內存中刷新的功能,及時返回線程狀態
    private volatile boolean isRunning = true;
    //id生成器
    private static AtomicInteger count = new AtomicInteger();

    //隨機對象
    private static Random r = new Random();

    public Provider(BlockingQueue queue){
        this.queue = queue;
    }

    @Override
    public void run() {
        while(isRunning){
            //隨機休眠0-1000毫秒 表示獲取數據
            try {
                Thread.sleep(r.nextInt(1000));
                //獲取的數據進行累計
                int id  = count.incrementAndGet();
                //好比經過一個getData()方法獲取了
                Data data = new Data(Integer.toString(id),"數據"+id);
                System.out.println("當前線程:"+ Thread.currentThread().getName() + ",獲取了數據,id爲:"+ id+ ",進行裝載到公共緩衝區中。。。");
                if(!this.queue.offer(data,2,TimeUnit.SECONDS)){
                    System.out.print("提交緩衝區數據失敗");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.print("aaa");
        }
    }

    public void  stop(){
        this.isRunning = false;
    }
}

這裏有幾個注意點,一個就是對共享緩衝區的選擇,做爲生產者–消費者模型而言,共享緩衝區必定要具有阻塞的能力。因此這邊選擇的是阻塞隊列。還有一個就是在併發編程的時候,若是須要使用相似i++這種id自增加的功能,須要使用Atomic包下的併發類。由於這些類是採用CAS設計的,不會產生併發問題。併發

3.消費者dom

public class Consumer implements Runnable {

    private BlockingQueue<Data> queue;

    public Consumer(BlockingQueue queu){
        this.queue = queu;
    }

    //隨機對象
    private static Random r = new Random();

    @Override
    public void run() {
        while(true){
            try{
                //獲取數據
                Data data = this.queue.take();
                //進行數據處理,休眠 0-1000毫秒模擬耗時
                Thread.sleep(r.nextInt(1000));
                System.out.print("當前消費線程"+Thread.currentThread().getName() +",消費成功,消費id爲"+data.getId());
            }catch(InterruptedException e){
                e.printStackTrace();
            }
        }
    }
}

消費者主要就是從阻塞隊列中獲取數據,若是隊列中沒有元素,則會釋放CPU,而後等待。(注意這裏使用的是take而不是poll,不一樣點在於take在沒有元素的時候會釋放CPU,而poll則是直接返回null)。ide

main函數:函數

public class Main {
    public static void main(String[] args){
        //內存緩衝區
        BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10);
        //生產者
        Provider p1 = new Provider(queue);
        Provider p2 = new Provider(queue);
        Provider p3 = new Provider(queue);

        Consumer c1 = new Consumer(queue);
        Consumer c2 = new Consumer(queue);
        Consumer c3 = new Consumer(queue);

        //建立線程池,這是一個緩存的線程池,能夠建立無窮大的線程,沒有任務的時候不建立線程,空閒線程存活的時間爲60s。
        ExecutorService cachepool = Executors.newCachedThreadPool();
        cachepool.execute(p1);
        cachepool.execute(p2);
        cachepool.execute(p3);
        cachepool.execute(c1);
        cachepool.execute(c2);
        cachepool.execute(c3);
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        p1.stop();
        p2.stop();
        p3.stop();
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
相關文章
相關標籤/搜索