生產者和消費者模式,由這個衍生的消息隊列,任務隊列...等等,原理都是這個,下面先上個代碼。方便你們充分想象:java
package com.my.jobQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * <p> * 名稱:BlockingQueueTest.java * </p> * <p> * 描述: * </p> * * <pre> * * </pre> * * @author 鮑建明 * @date 2015-1-13 下午4:10:32 * @version 1.0.0 */ public class BlockingQueueTest { final static AtomicInteger i = new AtomicInteger(0); /** * 定義裝蘋果的籃子 */ public static class Basket { // 籃子,可以容納3個蘋果 // BlockingQueue<String> basket = new ArrayBlockingQueue<String>(3); BlockingQueue<String> basket = new LinkedBlockingQueue<String>(100); // 生產蘋果,放入籃子 public void produce() throws InterruptedException { // put方法放入一個蘋果,若basket滿了,等到basket有位置 basket.put("第" + i.get() + "個蘋果"); System.out.println("放入第" + i.get() + "個蘋果"); i.getAndAdd(1); } // 消費蘋果,從籃子中取走 public String consume() throws InterruptedException { // get方法取出一個蘋果,若basket爲空,等到basket有蘋果爲止 return basket.take(); } } // 測試方法 public static void testBasket() { // 創建一個裝蘋果的籃子 final Basket basket = new Basket(); // 定義蘋果生產者 class Producer implements Callable<String> { public String instance = ""; public Producer(String a) { instance = a; } public String call() throws Exception { try { while (true) { // 生產蘋果 basket.produce(); // 休眠300ms TimeUnit.MILLISECONDS.sleep(300); } } catch (InterruptedException ex) { } return "生產任務成功後返回的參數"; } } // 定義蘋果消費者 class Consumer implements Callable<String> { public String instance = ""; public Consumer(String a) { instance = a; } public String call() throws Exception { try { while (true) { // 消費蘋果 String s = basket.consume(); System.out.println("取出" + s); // 休眠1000ms TimeUnit.MILLISECONDS.sleep(1000); } } catch (InterruptedException ex) { } return "消費任務成功後返回的參數"; } } ExecutorService service = Executors.newCachedThreadPool(); Producer producer = new Producer("P1"); Producer producer2 = new Producer("P2"); Consumer consumer = new Consumer("C1"); service.submit(producer); // 同時開啓了2個生產者 service.submit(producer2); service.submit(consumer); /* * ExecutorCompletionService<String> ecs = new * ExecutorCompletionService<String>(service);// 任務隊列 * ecs.submit(producer); ecs.submit(producer2); ecs.submit(consumer); * try { System.out.println(ecs.take().get()); } catch * (InterruptedException e1) { // TODO Auto-generated catch block * e1.printStackTrace(); } catch (ExecutionException e1) { // TODO * Auto-generated catch block e1.printStackTrace(); } */ // 程序運行3s後,全部任務中止 try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { } service.shutdownNow(); // 強行中止 // service.shutdown(); //等待任務完成後中止 } public static void main(String[] args) { BlockingQueueTest.testBasket(); } }
附加一個文字說明的地址:http://my.oschina.net/u/153610/blog/23905測試