生產者消費者模式

定義

         生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。java

實現

1.利用wait和notify實現

package productorConsumer;

import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author littlecar
 * @date 2019/7/26 10:24
 */
public class ProductorConsumer {
    public static void main(String[] arg) {
        LinkedList<Integer> list = new LinkedList<>();
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(15);
        for (int i = 0; i < 5; i++) {
            scheduledThreadPoolExecutor.submit(new Productor(list, 5));
        }
        for (int i = 0; i < 10; i++) {
            scheduledThreadPoolExecutor.submit(new Consumer(list));
        }
    }
    static class Productor implements Runnable{
        private List<Integer> list;
        private int maxLength;
        public Productor(List<Integer> list,int maxLength) {
            this.list=list;
            this.maxLength=maxLength;
        }
        @Override
        public void run() {
            while (true) {
                synchronized (list) {
                    try {
                        while (list.size() == maxLength) {
                            System.out.println("生產者"+Thread.currentThread().getName()+"list已滿,進行wait");
                            list.wait();
                            System.out.println("生產者"+Thread.currentThread().getName()+"退出wait");
                        }
                        Random r = new Random();
                        int i = r.nextInt();
                        System.out.println("生產者"+Thread.currentThread().getName()+"生產數據"+i);
                        list.add(i);
                        list.notifyAll();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    static class Consumer implements Runnable{
        private List<Integer> list;
        public Consumer(List<Integer> list) {
            this.list=list;
        }
        @Override
        public void run() {
            while (true) {
                synchronized (list) {
                    try {
                        while (list.isEmpty()) {
                            System.out.println("消費者"+Thread.currentThread().getName()+"list爲空,進行wait");
                            list.wait();
                            System.out.println("消費者"+Thread.currentThread().getName()+"退出wait");
                        }
                        Integer i = list.remove(0);
                        System.out.println("消費者"+Thread.currentThread().getName()+"消費數據"+i);
                        list.notifyAll();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

2.利用Lock中Condition的await/signalAll實現生產者消費者

package productorConsumer;

import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author littlecar
 * @date 2019/7/26 14:00
 */
public class ProductorConsumer1 {
    private static ReentrantLock lock = new ReentrantLock();
    private static Condition full = lock.newCondition();
    private static Condition empty = lock.newCondition();
    public static void main(String[] args) {
        LinkedList<Integer> list = new LinkedList<>();
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(15);
        for (int i = 0; i < 5; i++) {
            scheduledThreadPoolExecutor.submit(new Productor(list, lock,5));
        }
        for (int i = 0; i < 10; i++) {
            scheduledThreadPoolExecutor.submit(new Consumer(list,lock));
        }
    }
    static class Productor implements Runnable{
        private List<Integer> list;
        private Lock lock;
        private int maxLength;
        public Productor(List<Integer> list,Lock lock,int maxLength) {
            this.list=list;
            this.lock=lock;
            this.maxLength=maxLength;
        }
        @Override
        public void run() {
            while (true) {
                lock.lock();
                try {
                    while (list.size() == maxLength) {
                        System.out.println("生產者"+Thread.currentThread().getName()+"list已滿,進行wait");
                        full.await();
                        System.out.println(""+Thread.currentThread().getName()+"退出wait");
                    }
                    Random r = new Random();
                    int i = r.nextInt();
                    System.out.println("生產者"+Thread.currentThread().getName()+"生產數據"+i);
                    list.add(i);
                    empty.signalAll();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally{
                    lock.unlock();
                }
            }
        }
    }
    static class Consumer implements Runnable{
        private List<Integer> list;
        private Lock lock;
        public Consumer(List<Integer> list,Lock lock) {
            this.list=list;
            this.lock=lock;
        }
        @Override
        public void run() {
            while (true) {
                lock.lock();
                try {
                    while (list.isEmpty()) {
                        System.out.println("消費者"+Thread.currentThread().getName()+"list爲空,進行wait");
                        empty.await();
                        System.out.println("消費者"+Thread.currentThread().getName()+"退出wait");
                    }
                    Integer i = list.remove(0);
                    System.out.println("消費者"+Thread.currentThread().getName()+"消費數據"+i);
                    full.signalAll();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally{
                    lock.unlock();
                }
            }
        }
    }
}

3. 使用BlockingQueue實現生產者-消費者

package productorConsumer;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;

/**
 * @author littlecar
 * @date 2019/7/26 14:15
 */
public class ProductorConsumer2 {
    private static LinkedBlockingQueue<Integer> linkedBlockingQueue=new LinkedBlockingQueue<>();
    public static void main(String[] args) {
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(15);
        for (int i = 0; i < 5; i++) {
            scheduledThreadPoolExecutor.submit(new Productor(linkedBlockingQueue));
        }
        for (int i = 0; i < 10; i++) {
            scheduledThreadPoolExecutor.submit(new Consumer(linkedBlockingQueue));
        }
    }

    static class Productor implements Runnable {
        private BlockingQueue queue;
        public Productor(BlockingQueue queue){
            this.queue=queue;
        }
        @Override
        public void run() {
            try {
                while (true) {
                    Random r = new Random();
                    int i = r.nextInt();
                    System.out.println("生產者"+Thread.currentThread().getName()+"生產數據"+i);
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    static class Consumer implements Runnable {
        private BlockingQueue queue;
        public Consumer(BlockingQueue queue){
            this.queue=queue;
        }
        @Override
        public void run() {
            try {
                while (true) {
                    Integer i = (Integer) queue.take();
                    System.out.println("消費者"+Thread.currentThread().getName()+"消費數據"+i);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
相關文章
相關標籤/搜索