生產者和消費者模式

生產者和消費者模式,由這個衍生的消息隊列,任務隊列...等等,原理都是這個,下面先上個代碼。方便你們充分想象:java

package com.my.jobQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * <p>
 * 名稱:BlockingQueueTest.java
 * </p>
 * <p>
 * 描述:
 * </p>
 * 
 * <pre>
 * 
 * </pre>
 * 
 * @author 鮑建明
 * @date 2015-1-13 下午4:10:32
 * @version 1.0.0
 */
public class BlockingQueueTest {
 final static AtomicInteger i = new AtomicInteger(0);
 /**
  * 定義裝蘋果的籃子
  */
 public static class Basket {
  // 籃子,可以容納3個蘋果
  // BlockingQueue<String> basket = new ArrayBlockingQueue<String>(3);
  BlockingQueue<String> basket = new LinkedBlockingQueue<String>(100);
  // 生產蘋果,放入籃子
  public void produce() throws InterruptedException {
   // put方法放入一個蘋果,若basket滿了,等到basket有位置
   basket.put("第" + i.get() + "個蘋果");
   System.out.println("放入第" + i.get() + "個蘋果");
   i.getAndAdd(1);
  }
  // 消費蘋果,從籃子中取走
  public String consume() throws InterruptedException {
   // get方法取出一個蘋果,若basket爲空,等到basket有蘋果爲止
   return basket.take();
  }
 }
 //  測試方法
 public static void testBasket() {
  // 創建一個裝蘋果的籃子
  final Basket basket = new Basket();
  // 定義蘋果生產者
  class Producer implements Callable<String> {
   public String instance = "";
   public Producer(String a) {
    instance = a;
   }
   public String call() throws Exception {
    try {
     while (true) {
      // 生產蘋果
      basket.produce();
      // 休眠300ms
      TimeUnit.MILLISECONDS.sleep(300);
     }
    } catch (InterruptedException ex) {
    }
    return "生產任務成功後返回的參數";
   }
  }
  // 定義蘋果消費者
  class Consumer implements Callable<String> {
   public String instance = "";
   public Consumer(String a) {
    instance = a;
   }
   public String call() throws Exception {
    try {
     while (true) {
      // 消費蘋果
      String s = basket.consume();
      System.out.println("取出" + s);
      // 休眠1000ms
      TimeUnit.MILLISECONDS.sleep(1000);
     }
    } catch (InterruptedException ex) {
    }
    return "消費任務成功後返回的參數";
   }
  }
  ExecutorService service = Executors.newCachedThreadPool();
  Producer producer = new Producer("P1");
  Producer producer2 = new Producer("P2");
  Consumer consumer = new Consumer("C1");
  service.submit(producer); // 同時開啓了2個生產者
  service.submit(producer2);
  service.submit(consumer);
  /*
   * ExecutorCompletionService<String> ecs = new
   * ExecutorCompletionService<String>(service);// 任務隊列
   * ecs.submit(producer); ecs.submit(producer2); ecs.submit(consumer);
   * try { System.out.println(ecs.take().get()); } catch
   * (InterruptedException e1) { // TODO Auto-generated catch block
   * e1.printStackTrace(); } catch (ExecutionException e1) { // TODO
   * Auto-generated catch block e1.printStackTrace(); }
   */
  // 程序運行3s後,全部任務中止
  try {
   TimeUnit.SECONDS.sleep(3);
  } catch (InterruptedException e) {
  }
  service.shutdownNow(); // 強行中止
  // service.shutdown(); //等待任務完成後中止
 }
 public static void main(String[] args) {
  BlockingQueueTest.testBasket();
 }
}

附加一個文字說明的地址:http://my.oschina.net/u/153610/blog/23905測試

相關文章
相關標籤/搜索