生產者模式-消費者模式

生產着,消費者模式是一個經典的多線程設計模式,它爲多線程間的協做提供了良好的解決方案。
一般有兩個角色:
若干個生產者線程,若個個消費者線程。生產者線程負責提交用戶的請求,消費者線程負責具體處理生產者提交的任務。生產者和消費者之間則經過共享內存緩衝區進行通訊。
生產者-消費者模式中的內存緩衝區主要功能是數據在多線程間的共享,此外,經過該緩衝區,能夠緩解生產者和消費者間的性能差。

public class Main {設計模式

public static void main(String[] args) throws InterruptedException {
    BlockingQueue<PCData> queue = new LinkedBlockingDeque<>();  //緩衝區域
    Producer producer1 = new Producer(queue);
    Producer producer2 = new Producer(queue);//生產者
    Producer producer3 = new Producer(queue);
    Consumer consumer1 = new Consumer(queue);
    Consumer consumer2 = new Consumer(queue);//消費者
    Consumer consumer3 = new Consumer(queue);
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(producer1);
    executorService.execute(producer2);
    executorService.execute(producer3);
    executorService.execute(consumer1);
    executorService.execute(consumer2);
    executorService.execute(consumer3);

    Thread.sleep(10*1000);
    producer1.stop();
    producer2.stop();
    producer3.stop();
    Thread.sleep(3000);
    executorService.shutdown();
}

}
public class Producer implements Runnable{多線程

private volatile boolean isRunning = true;
private BlockingQueue<PCData> queue;
private static AtomicInteger count = new AtomicInteger();
private static final int SLEEEPTIME =1000;

public Producer(BlockingQueue<PCData> queue) {
    this.queue = queue;
}

@Override
public void run() {
    PCData data = null;
    Random random = new Random();
    System.out.println("start producer name"+Thread.currentThread().getName());
    try{
        while (isRunning){
            Thread.sleep(random.nextInt(SLEEEPTIME));
            data = new PCData(count.incrementAndGet());
            System.out.println(data+"is put into queue");
            if(!queue.offer(data,2,TimeUnit.SECONDS)){
                System.err.println("failed to put data"+data);
            }
        }
    }catch (Exception e){
        e.printStackTrace();
        Thread.currentThread().interrupt();
    }
}

public void stop(){
    isRunning=false;
}

}
public class Consumer implements Runnable {dom

private BlockingQueue<PCData> queue;
private static final int SLEEPTIME = 1000;
public Consumer(BlockingQueue<PCData> queue) {
    this.queue = queue;
}

@Override
public void run() {
    System.out.println("start Consumer id"+Thread.currentThread().getName());
    Random random = new Random();
    try{
        while(true){
            PCData pcData = queue.take();
            if(pcData!=null){
                int re = pcData.getData()*pcData.getData();
                System.out.println(MessageFormat.format("{0}*{1}={2}",pcData.getData(),pcData.getData(),re));
                Thread.sleep(random.nextInt(SLEEPTIME));
            }
        }
    }catch (Exception e){
        e.printStackTrace();
        Thread.currentThread().interrupt();
    }
}

}
public class PCData {ide

private final int intData;

public PCData(int intData) {
    this.intData = intData;
}
public PCData(String data) {
    this.intData = Integer.valueOf(data);
}
public int getData(){
    return intData;
}

@Override
public String toString() {
    return "PCData{" +
            "intData=" + intData +
            '}';
}

}性能

相關文章
相關標籤/搜索