生產者和消費問題

       本文使用java語言藉助java併發庫去實現生產者和消費者問題。主要設計思路:1.物料池是共享容器;2.生產者只負責生產物料,添加到物料池中;3.消費者從池中獲取物料。在這裏使用ReenTranLock控制共享容器的同步,使用Conditona作線程間的通知,當物料池滿的時候掛起生產者,而且喚醒消費者去消費池中物料,當池中無物料的時候,掛起消費者,喚醒生產者生產物料。java

      在編碼以前我須要先對生產者、消費者、物料池作一個簡單的分析:併發

       1.消費者和生產者他們的任務都是單一的,消費者消費物料,生產者生產物料,消費者和生產者對外只須要記住是哪一個物料池就好了。ide

       2.共享數據控制同步應該同一個類中完成,這樣控制方便,並且簡單。測試

       3.發出通知的應該是物料池。由於只有它本身知道本身的狀態,消費者和生產者纔不會關心它。冷暖自知!!this

     在作了簡單的分析以後,清楚了各個對象的功能。接下來就是設計了。涉及到具體的地方無會在代碼中註釋,就不在這乾巴巴的說了。
編碼

     核心-物料池atom

package com.autonavi.pc;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/***
 * 物料池
 * @author 零下三度
 *
 */
public class Pool {
	
	private ReentrantLock lock = new ReentrantLock();
	private Condition putCondition = lock.newCondition();
	private Condition getCondition = lock.newCondition();
	private Product[] productPool = new Product[10];
	private int size;//池中物料的數量
	private int currPutIndex;//當前添加物料的索引
	private int currGetIndex;//當前獲取物料的索引
	private Pool(){
		size = 0;
		currPutIndex = 0;
		currGetIndex = 0;
	}
	
	public static Pool getPool(){
		return new Pool();
	}

	/**
	 * 生產者向物料池添加一個商品
	 * @param product
	 * @throws InterruptedException 
	 */
	public void put(Product product) throws InterruptedException{
		try{
			lock.lock();
			//若是物料池滿了,則再也不容許向物料池中添加物料。
			while(size == productPool.length){
				System.out.println("物料池已經滿了,暫時不能添加產品了,請耐心等待.....當前池中物料爲:"+size);
				putCondition.await();
			}
			productPool[currPutIndex] = product;
	        if(++currPutIndex == productPool.length){
	        	currPutIndex = 0;
	        }
			++size;
			//注意:因爲終端是共享資源,放在此處才能看到真正的測試結果過
			System.out.println(product.toString()+"已經添加到物料池中,當前池中產品個數:"+size+",currPutIndex="+currPutIndex);
			//添加了物料,池中有可用的物料,通知消費者能夠從池中獲取物料
			getCondition.signal();
		}finally{
			lock.unlock();
		}
	}
	/***
	 * 消費者從物料池中獲取一個商品。
	 * @return
	 * @throws InterruptedException 
	 */
	public Product get() throws InterruptedException{
		try{
			lock.lock();
			//若是池中沒有物料,則禁止消費者從池中獲取物料
			while(size == 0){
				System.out.println("目前沒有物料,暫時沒法獲取產品,請耐心等待.....當前池中物料數量:"+size);
				getCondition.await();
			}
			Product p = productPool[currGetIndex];
			productPool[currGetIndex] = null;
			if(++currGetIndex == productPool.length){
				currGetIndex = 0;
			}
			--size;
			System.out.println("出庫的是:"+p.toString()+"當前池中還有產品個數:"+size+",currGetIndex="+currGetIndex);
			putCondition.signal();
			return p;
		}finally{
			lock.unlock();
		}
	}
}

生產者:spa

package com.autonavi.pc;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/***
 * 生產者
 * @author 零下三度
 *
 */
public class Producers implements Runnable{
	
	private AtomicLong id = new AtomicLong(0);
	private Pool pool;
	private String productName;
	
	public Producers(){
	}
	
	public Producers(Pool pool,String productName,AtomicLong id){
		this.id = id;
		this.pool = pool;
		this.productName = productName;
	}

	public Pool getPool() {
		return pool;
	}


	public void setPool(Pool pool) {
		this.pool = pool;
	}

	public String getProductName() {
		return productName;
	}


	public void setProductName(String productName) {
		this.productName = productName;
	}

	
	public AtomicLong getId() {
		return id;
	}

	
	public void setId(AtomicLong id) {
		this.id = id;
	}


	/***
	 * 生產者的工做就是生產商品,添加到物料吃中
	 * 至於何時中止,何時開始,是須要被人去給他消息的。
	 */
	public void run() {
		Product p;
		try {
			while(true){
				TimeUnit.MILLISECONDS.sleep(200);
				p = new Product(""+id.incrementAndGet(),productName);
				pool.put(p);
				
			} 
		}catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	public void start(){
		Thread t = new Thread(this);
		t.start();
	}
	

}

消費者:線程

package com.autonavi.pc;

import java.util.concurrent.TimeUnit;

/***
 * 消費者
 * @author 零下三度
 *
 */
public class Consumers implements Runnable{
	
	private Pool pool;
	
	

	public Pool getPool() {
		return pool;
	}



	public void setPool(Pool pool) {
		this.pool = pool;
	}



	/***
	 * 消費者的工做就是消費物料池中的商品
	 */
	public void run() {
		try {
			Product p;
			while(true){
				TimeUnit.MILLISECONDS.sleep(200);
				p = pool.get();
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	
	public void start(){
		Thread t = new Thread(this);
		t.start();
	}

}

 物料-產品:設計

package com.autonavi.pc;
/***
 * 商品
 * @author 零下三度
 *
 */
public class Product {

	private String id;
	private String name;
	
	public Product(){
		
	}
	
	public Product(String id, String name) {
		this.id = id;
		this.name = name;
	}

	public String getId() {
		return id;
	}
	public void setId(String id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	
	
	@Override
	public String toString() {
		String val = "id:"+id+"\n"+"name:"+name+"\n";
		return val;
	}

	@Override
	public boolean equals(Object obj) {
		if(obj != null && obj instanceof Product){
			Product p = (Product)obj;
			if(this.id != null && this.id.equals(p.getId())){
				return true;
			}
		}
		return false;
	}

	@Override
	public int hashCode() {
		return id.hashCode();
	}
	
	
	
	
}

測試用例:

package com.autonavi.pc;

import java.util.concurrent.atomic.AtomicLong;

public class ProductsAndConsumersTest {

	public static void main(String[] args) throws InterruptedException {
		Pool pool = Pool.getPool();
		Producers p = new Producers(pool,"產品A",new AtomicLong(0));
		System.out.println("啓動生產者");
		p.start();
		Consumers c = new Consumers();
		c.setPool(pool);
		System.out.println("啓動消費者");
		c.start();

	}

}
相關文章
相關標籤/搜索