本文使用java語言藉助java併發庫去實現生產者和消費者問題。主要設計思路:1.物料池是共享容器;2.生產者只負責生產物料,添加到物料池中;3.消費者從池中獲取物料。在這裏使用ReenTranLock控制共享容器的同步,使用Conditona作線程間的通知,當物料池滿的時候掛起生產者,而且喚醒消費者去消費池中物料,當池中無物料的時候,掛起消費者,喚醒生產者生產物料。java
在編碼以前我須要先對生產者、消費者、物料池作一個簡單的分析:併發
1.消費者和生產者他們的任務都是單一的,消費者消費物料,生產者生產物料,消費者和生產者對外只須要記住是哪一個物料池就好了。ide
2.共享數據控制同步應該同一個類中完成,這樣控制方便,並且簡單。測試
3.發出通知的應該是物料池。由於只有它本身知道本身的狀態,消費者和生產者纔不會關心它。冷暖自知!!this
在作了簡單的分析以後,清楚了各個對象的功能。接下來就是設計了。涉及到具體的地方無會在代碼中註釋,就不在這乾巴巴的說了。
編碼
核心-物料池atom
package com.autonavi.pc; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /*** * 物料池 * @author 零下三度 * */ public class Pool { private ReentrantLock lock = new ReentrantLock(); private Condition putCondition = lock.newCondition(); private Condition getCondition = lock.newCondition(); private Product[] productPool = new Product[10]; private int size;//池中物料的數量 private int currPutIndex;//當前添加物料的索引 private int currGetIndex;//當前獲取物料的索引 private Pool(){ size = 0; currPutIndex = 0; currGetIndex = 0; } public static Pool getPool(){ return new Pool(); } /** * 生產者向物料池添加一個商品 * @param product * @throws InterruptedException */ public void put(Product product) throws InterruptedException{ try{ lock.lock(); //若是物料池滿了,則再也不容許向物料池中添加物料。 while(size == productPool.length){ System.out.println("物料池已經滿了,暫時不能添加產品了,請耐心等待.....當前池中物料爲:"+size); putCondition.await(); } productPool[currPutIndex] = product; if(++currPutIndex == productPool.length){ currPutIndex = 0; } ++size; //注意:因爲終端是共享資源,放在此處才能看到真正的測試結果過 System.out.println(product.toString()+"已經添加到物料池中,當前池中產品個數:"+size+",currPutIndex="+currPutIndex); //添加了物料,池中有可用的物料,通知消費者能夠從池中獲取物料 getCondition.signal(); }finally{ lock.unlock(); } } /*** * 消費者從物料池中獲取一個商品。 * @return * @throws InterruptedException */ public Product get() throws InterruptedException{ try{ lock.lock(); //若是池中沒有物料,則禁止消費者從池中獲取物料 while(size == 0){ System.out.println("目前沒有物料,暫時沒法獲取產品,請耐心等待.....當前池中物料數量:"+size); getCondition.await(); } Product p = productPool[currGetIndex]; productPool[currGetIndex] = null; if(++currGetIndex == productPool.length){ currGetIndex = 0; } --size; System.out.println("出庫的是:"+p.toString()+"當前池中還有產品個數:"+size+",currGetIndex="+currGetIndex); putCondition.signal(); return p; }finally{ lock.unlock(); } } }
生產者:spa
package com.autonavi.pc; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /*** * 生產者 * @author 零下三度 * */ public class Producers implements Runnable{ private AtomicLong id = new AtomicLong(0); private Pool pool; private String productName; public Producers(){ } public Producers(Pool pool,String productName,AtomicLong id){ this.id = id; this.pool = pool; this.productName = productName; } public Pool getPool() { return pool; } public void setPool(Pool pool) { this.pool = pool; } public String getProductName() { return productName; } public void setProductName(String productName) { this.productName = productName; } public AtomicLong getId() { return id; } public void setId(AtomicLong id) { this.id = id; } /*** * 生產者的工做就是生產商品,添加到物料吃中 * 至於何時中止,何時開始,是須要被人去給他消息的。 */ public void run() { Product p; try { while(true){ TimeUnit.MILLISECONDS.sleep(200); p = new Product(""+id.incrementAndGet(),productName); pool.put(p); } }catch (InterruptedException e) { e.printStackTrace(); } } public void start(){ Thread t = new Thread(this); t.start(); } }
消費者:線程
package com.autonavi.pc; import java.util.concurrent.TimeUnit; /*** * 消費者 * @author 零下三度 * */ public class Consumers implements Runnable{ private Pool pool; public Pool getPool() { return pool; } public void setPool(Pool pool) { this.pool = pool; } /*** * 消費者的工做就是消費物料池中的商品 */ public void run() { try { Product p; while(true){ TimeUnit.MILLISECONDS.sleep(200); p = pool.get(); } } catch (InterruptedException e) { e.printStackTrace(); } } public void start(){ Thread t = new Thread(this); t.start(); } }
物料-產品:設計
package com.autonavi.pc; /*** * 商品 * @author 零下三度 * */ public class Product { private String id; private String name; public Product(){ } public Product(String id, String name) { this.id = id; this.name = name; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString() { String val = "id:"+id+"\n"+"name:"+name+"\n"; return val; } @Override public boolean equals(Object obj) { if(obj != null && obj instanceof Product){ Product p = (Product)obj; if(this.id != null && this.id.equals(p.getId())){ return true; } } return false; } @Override public int hashCode() { return id.hashCode(); } }
測試用例:
package com.autonavi.pc; import java.util.concurrent.atomic.AtomicLong; public class ProductsAndConsumersTest { public static void main(String[] args) throws InterruptedException { Pool pool = Pool.getPool(); Producers p = new Producers(pool,"產品A",new AtomicLong(0)); System.out.println("啓動生產者"); p.start(); Consumers c = new Consumers(); c.setPool(pool); System.out.println("啓動消費者"); c.start(); } }