生產者和消費者

  • 若是生產者的隊列滿了(while循環判斷是否滿),則等待。若是生產者的隊列沒滿,則生產數據並喚醒消費者進行消費。java

  • 若是消費者的隊列空了(while循環判斷是否空),則等待。若是消費者的隊列沒空,則消費數據並喚醒生產者進行生產。dom

package test;ide

 

import java.util.Random;this

import java.util.Vector;atom

import java.util.concurrent.atomic.AtomicInteger;spa

 

public class Producer implements Runnable {線程

 

    // true--->生產者一直執行,false--->停掉生產者隊列

    private volatile boolean isRunning = true;內存

 

    // 公共資源ci

    private final Vector sharedQueue;

 

    // 公共資源的最大數量

    private final int SIZE;

 

    // 生產數據

    private static AtomicInteger count = new AtomicInteger();

 

    public Producer(Vector sharedQueue, int SIZE) {

        this.sharedQueue = sharedQueue;

        this.SIZE = SIZE;

    }

 

    @Override

    public void run() {

        int data;

        Random r = new Random();

 

        System.out.println("start producer id = " + Thread.currentThread().getId());

        try {

            while (isRunning) {

                // 模擬延遲

                Thread.sleep(r.nextInt(1000));

 

                // 當隊列滿時阻塞等待

                while (sharedQueue.size() == SIZE) {

                    synchronized (sharedQueue) {

                        System.out.println("Queue is full, producer " + Thread.currentThread().getId()

                                + " is waiting, size:" + sharedQueue.size());

                        sharedQueue.wait();

                    }

                }

 

                // 隊列不滿時持續創造新元素

                synchronized (sharedQueue) {

                    // 生產數據

                    data = count.incrementAndGet();

                    sharedQueue.add(data);

 

                    System.out.println("producer create data:" + data + ", size:" + sharedQueue.size());

                    sharedQueue.notifyAll();

                }

            }

        } catch (InterruptedException e) {

            e.printStackTrace();

            Thread.currentThread().interrupted();

        }

    }

 

    public void stop() {

        isRunning = false;

    }

}

 

 

package test;

 

import java.util.Random;

import java.util.Vector;

 

public class Consumer implements Runnable {

 

    // 公共資源

    private final Vector sharedQueue;

 

    public Consumer(Vector sharedQueue) {

        this.sharedQueue = sharedQueue;

    }

 

    @Override

    public void run() {

 

        Random r = new Random();

 

        System.out.println("start consumer id = " + Thread.currentThread().getId());

        try {

            while (true) {

                // 模擬延遲

                Thread.sleep(r.nextInt(1000));

 

                // 當隊列空時阻塞等待

                while (sharedQueue.isEmpty()) {

                    synchronized (sharedQueue) {

                        System.out.println("Queue is empty, consumer " + Thread.currentThread().getId()

                                + " is waiting, size:" + sharedQueue.size());

                        sharedQueue.wait();

                    }

                }

                // 隊列不空時持續消費元素

                synchronized (sharedQueue) {

                    System.out.println("consumer consume data:" + sharedQueue.remove(0) + ", size:" + sharedQueue.size());

                    sharedQueue.notifyAll();

                }

            }

        } catch (InterruptedException e) {

            e.printStackTrace();

            Thread.currentThread().interrupt();

        }

    }

}

 

 

 

package test;

 

import java.util.Vector;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

 

public class Test2 {

 

 

    public static void main(String[] args) throws InterruptedException {

 

        // 1.構建內存緩衝區

        Vector sharedQueue = new Vector();

        int size = 4;

 

        // 2.創建線程池和線程

        ExecutorService service = Executors.newCachedThreadPool();

        Producer prodThread1 = new Producer(sharedQueue, size);

        Producer prodThread2 = new Producer(sharedQueue, size);

        Producer prodThread3 = new Producer(sharedQueue, size);

        Consumer consThread1 = new Consumer(sharedQueue);

        Consumer consThread2 = new Consumer(sharedQueue);

        Consumer consThread3 = new Consumer(sharedQueue);

        service.execute(prodThread1);

        service.execute(prodThread2);

        service.execute(prodThread3);

        service.execute(consThread1);

        service.execute(consThread2);

        service.execute(consThread3);

 

        // 3.睡一下子而後嘗試中止生產者(結束循環)

        Thread.sleep(10 * 1000);

        prodThread1.stop();

        prodThread2.stop();

        prodThread3.stop();

 

        // 4.再睡一下子關閉線程池

        Thread.sleep(3000);

 

        // 5.shutdown()等待任務執行完才中斷線程(由於消費者一直在運行的,因此會發現程序沒法結束)

        service.shutdown();

 

 

    }

}

相關文章
相關標籤/搜索