生產者-消費者模式是一個經典的多線程設計模式,它爲多線程間的協作提供了良好的解決方案。該模式通常有兩類角色:若干個生產者線程和若干個消費者線程。生產者線程負責提交用戶的請求,消費者線程負責具體處理生產者提交的任務。生產者和消費者之間則通過共享內存緩衝區進行通訊。生產者-消費者模式中的內存緩衝區的主要功能是在多線程間共享數據;此外,通過該緩衝區,可以緩解生產者和消費者之間的性能差異。
public class Main {設計模式
public static void main(String[] args) throws InterruptedException { BlockingQueue<PCData> queue = new LinkedBlockingDeque<>(); //緩衝區域 Producer producer1 = new Producer(queue); Producer producer2 = new Producer(queue);//生產者 Producer producer3 = new Producer(queue); Consumer consumer1 = new Consumer(queue); Consumer consumer2 = new Consumer(queue);//消費者 Consumer consumer3 = new Consumer(queue); ExecutorService executorService = Executors.newCachedThreadPool(); executorService.execute(producer1); executorService.execute(producer2); executorService.execute(producer3); executorService.execute(consumer1); executorService.execute(consumer2); executorService.execute(consumer3); Thread.sleep(10*1000); producer1.stop(); producer2.stop(); producer3.stop(); Thread.sleep(3000); executorService.shutdown(); }
}
public class Producer implements Runnable{多線程
private volatile boolean isRunning = true; private BlockingQueue<PCData> queue; private static AtomicInteger count = new AtomicInteger(); private static final int SLEEEPTIME =1000; public Producer(BlockingQueue<PCData> queue) { this.queue = queue; } @Override public void run() { PCData data = null; Random random = new Random(); System.out.println("start producer name"+Thread.currentThread().getName()); try{ while (isRunning){ Thread.sleep(random.nextInt(SLEEEPTIME)); data = new PCData(count.incrementAndGet()); System.out.println(data+"is put into queue"); if(!queue.offer(data,2,TimeUnit.SECONDS)){ System.err.println("failed to put data"+data); } } }catch (Exception e){ e.printStackTrace(); Thread.currentThread().interrupt(); } } public void stop(){ isRunning=false; }
}
public class Consumer implements Runnable {dom
private BlockingQueue<PCData> queue; private static final int SLEEPTIME = 1000; public Consumer(BlockingQueue<PCData> queue) { this.queue = queue; } @Override public void run() { System.out.println("start Consumer id"+Thread.currentThread().getName()); Random random = new Random(); try{ while(true){ PCData pcData = queue.take(); if(pcData!=null){ int re = pcData.getData()*pcData.getData(); System.out.println(MessageFormat.format("{0}*{1}={2}",pcData.getData(),pcData.getData(),re)); Thread.sleep(random.nextInt(SLEEPTIME)); } } }catch (Exception e){ e.printStackTrace(); Thread.currentThread().interrupt(); } }
}
/**
 * Immutable payload passed from producers to consumers through the shared
 * queue; wraps a single {@code int}.
 */
public class PCData {

    private final int intData;

    /**
     * @param intData the value to carry
     */
    public PCData(int intData) {
        this.intData = intData;
    }

    /**
     * Creates an item from the decimal text form of an integer.
     *
     * @param data text to parse as a signed decimal {@code int}
     * @throws NumberFormatException if {@code data} is not a parsable integer
     */
    public PCData(String data) {
        // parseInt yields the primitive directly, avoiding the boxing done by valueOf.
        this.intData = Integer.parseInt(data);
    }

    /**
     * @return the wrapped value
     */
    public int getData() {
        return intData;
    }

    @Override
    public String toString() {
        return "PCData{" + "intData=" + intData + '}';
    }
}