生產者消費者模式使用場景:線程之間須要共享數據,而且有可能線程之間生產和消費數據的速度不一樣或者性能有差別。
本例使用JDK併發包的BlockingQueue作爲共享緩衝區。
package concurrent; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class Producer implements Runnable { private BlockingQueue<Thing> queue; private static AtomicInteger count = new AtomicInteger(); public Producer(BlockingQueue<Thing> queue) { this.queue = queue; } @Override public void run() { // TODO Auto-generated method stub while (true) { Thing thing = new Thing(count.incrementAndGet()); try { Thread.sleep(1000);// 模擬處理前期準備 if (!queue.offer(thing, 100, TimeUnit.MILLISECONDS)) { System.out.println("Fail to put " + thing.toString()); } else { System.out.println("Put" + thing.toString()); } } catch (InterruptedException e) { e.printStackTrace(); } } } }
package concurrent; import java.util.concurrent.BlockingQueue; public class Consumer implements Runnable { private BlockingQueue<Thing> queue; public Consumer(BlockingQueue<Thing> queue) { this.queue = queue; } @Override public void run() { while (true) { try { Thing t = queue.take(); if (t != null) { System.out.println("Get " + t.toString()); Thread.sleep(100);// 模擬處理獲得的數據 } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
package concurrent; public class Thing {// 被生產和消費的東西 private final int id; public Thing(int id) { this.id = id; } @Override public String toString() { return "Thing " + id; } }
package concurrent; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; public class Main { public static void main(String[] args) { // TODO Auto-generated method stub BlockingQueue<Thing> queue = new LinkedBlockingQueue<Thing>(20); Producer p1 = new Producer(queue); Producer p2 = new Producer(queue); Producer p3 = new Producer(queue); Consumer c1 = new Consumer(queue); Consumer c2 = new Consumer(queue); Consumer c3 = new Consumer(queue); ExecutorService executor = Executors.newCachedThreadPool(); executor.submit(p1); executor.submit(p2); executor.submit(p3); executor.submit(c1); executor.submit(c2); executor.submit(c3); } }