BlockingQueue 隊列模擬生產者消費者問題

      多線程環境中,經過隊列能夠很容易實現數據共享,好比經典的「生產者」和「消費者」模型中,經過隊列能夠很便利地實現二者之間的數據共享(生產者線程向隊列中放數據,消費者線程從隊列中取線程消費)。java

     首頁 咱們得有一個消費的模型,至少咱們得知道生產什麼,消費什麼吧。多線程

class product {
    private String productID;
	private String productName;
	public String getProductID() {
		return productID;
	}
	public void setProductID(String productID) {
		this.productID = productID;
	}
	public String getProductName() {
		return productName;
	}
	public void setProductName(String productName) {
		this.productName = productName;
	}
	public product(String productID, String productName) {
		super();
		this.productID = productID;
		this.productName = productName;
	}
	public product() {
		super();
	}
	

}

    而後要有一個生產者線程,消費者線程dom

  生產者線程往一個隊列裏面放數據(product),消費者線程從隊列裏取數據(product) 因此 咱們須要一個    隊列   來做爲一個容器。this

BlockingQueue<product> productQueue = new LinkedBlockingQueue<product>(20);

 

 生產者線程spa

  這裏的put方法在隊列滿的狀況下線程會阻塞線程

class produceThread implements Runnable{
	public produceThread(BlockingQueue<product> productQueue) {
		// TODO Auto-generated constructor stub
		this.productQueue = productQueue;
	}

	public static String randSix(){
	     String []baseStr = {"1","2","3","4","5","6","7","8","9","0",
	         "a","b","c","d","e","A","B"};
	     String str="";
	     for(int i = 0 ;i<6;i++){
           	int index = ((int)(100*Math.random()))%baseStr.length;
            str += baseStr[index];
	     }
	     return str;
	}
    BlockingQueue<product> productQueue;
    

	public void run() {
		// TODO Auto-generated method stub
		product p = new product(randSix(),Thread.currentThread().getName()+"線程生產");
		try {
			productQueue.put(p);
			
			
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		System.out.println(Thread.currentThread().getName()+"線程生產了一個產品,產品ID  "+p.getProductID());
	
	}
	
	
}

消費者線程code

take方法在 隊列裏面沒數據的時候會阻塞隊列

class consumeThread implements Runnable{
    BlockingQueue<product> productQueue;
    

	public consumeThread(BlockingQueue<product> productQueue) {
		// TODO Auto-generated constructor stub
		this.productQueue = productQueue;
	}


	public void run() {//把這個 product從隊列裏取出來 
		// TODO Auto-generated method stub
		try {
			product p = productQueue.take();
			System.out.println("消費線程:"+Thread.currentThread().getName()+"線程消費了一個產品,產品ID  "+p.getProductID());
			
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
	}
	
	
}

整個代碼get

    ScheduledExecutorService  service = Executors.newScheduledThreadPool(3); 產品

    這行代碼構造了一個線程池 執行定時任務 這個線程池裏有3個線程

package com.dlh.concurrent;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class blockingQueueService{

	public static void main(String[] args) {
		// TODO Auto-generated method stub

		@SuppressWarnings("unused")
		BlockingQueue<product> productQueue = new LinkedBlockingQueue<product>(20);
        //每隔1秒定時執行
		ScheduledExecutorService  service = Executors.newScheduledThreadPool(3);
        service.scheduleAtFixedRate(new produceThread(productQueue), 1, 1,TimeUnit.SECONDS);
		
        //每隔2秒定時執行
		ScheduledExecutorService  service1 = Executors.newScheduledThreadPool(3);
		service1.scheduleAtFixedRate(new produceThread(productQueue), 1, 2,TimeUnit.SECONDS);
		//每隔3秒定時執行
		ScheduledExecutorService  service2 = Executors.newScheduledThreadPool(3);
		service2.scheduleAtFixedRate(new produceThread(productQueue), 1, 3,TimeUnit.SECONDS);
		
       //每隔4秒定時執行
		ScheduledExecutorService  service3 = Executors.newScheduledThreadPool(3);
		service3.scheduleAtFixedRate(new consumeThread(productQueue), 1, 4,TimeUnit.SECONDS);

	}

}
class produceThread implements Runnable{
	public produceThread(BlockingQueue<product> productQueue) {
		// TODO Auto-generated constructor stub
		this.productQueue = productQueue;
	}

	public static String randSix(){
	     String []baseStr = {"1","2","3","4","5","6","7","8","9","0",
	         "a","b","c","d","e","A","B"};
	     String str="";
	     for(int i = 0 ;i<6;i++){
           	int index = ((int)(100*Math.random()))%baseStr.length;
            str += baseStr[index];
	     }
	     return str;
	}
    BlockingQueue<product> productQueue;
    

	public void run() {
		// TODO Auto-generated method stub
		product p = new product(randSix(),Thread.currentThread().getName()+"線程生產");
		try {
			productQueue.put(p);
			
			
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		System.out.println(Thread.currentThread().getName()+"線程生產了一個產品,產品ID  "+p.getProductID());
	
	}
	
	
}


class consumeThread implements Runnable{
    BlockingQueue<product> productQueue;
    

	public consumeThread(BlockingQueue<product> productQueue) {
		// TODO Auto-generated constructor stub
		this.productQueue = productQueue;
	}


	public void run() {//把這個 product從隊列裏取出來 要不要考慮 對這個product處理的結果 好比處理成功再 從take取一個product
		// TODO Auto-generated method stub
		try {
			product p = productQueue.take();
			System.out.println("消費線程:"+Thread.currentThread().getName()+"線程消費了一個產品,產品ID  "+p.getProductID());
			
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
	}
	
	
}
 class product {
    private String productID;
	private String productName;
	public String getProductID() {
		return productID;
	}
	public void setProductID(String productID) {
		this.productID = productID;
	}
	public String getProductName() {
		return productName;
	}
	public void setProductName(String productName) {
		this.productName = productName;
	}
	public product(String productID, String productName) {
		super();
		this.productID = productID;
		this.productName = productName;
	}
	public product() {
		super();
	}
	

}

下面咱們運行下代碼

 

pool-2-thread-1線程生產了一個產品,產品ID  41184a
pool-1-thread-1線程生產了一個產品,產品ID  a84d56
消費線程:pool-4-thread-1線程消費了一個產品,產品ID  41184a
pool-3-thread-1線程生產了一個產品,產品ID  0113A7
pool-1-thread-1線程生產了一個產品,產品ID  A619a5  ----------pool-1 的第一個線程
pool-1-thread-2線程生產了一個產品,產品ID  11eAe9
pool-2-thread-1線程生產了一個產品,產品ID  5bb9a2
pool-1-thread-2線程生產了一個產品,產品ID  Bec28A  ----------pool-1 的第二個線程
pool-3-thread-1線程生產了一個產品,產品ID  B5a017
pool-1-thread-2線程生產了一個產品,產品ID  c8A0d8
pool-2-thread-2線程生產了一個產品,產品ID  91Ad21
消費線程:pool-4-thread-1線程消費了一個產品,產品ID  a84d56
pool-1-thread-2線程生產了一個產品,產品ID  b07A53
pool-1-thread-2線程生產了一個產品,產品ID  974c25
pool-2-thread-1線程生產了一個產品,產品ID  6B36bB
pool-3-thread-2線程生產了一個產品,產品ID  70d49b
pool-1-thread-1線程生產了一個產品,產品ID  b67cbe
pool-2-thread-3線程生產了一個產品,產品ID  223261
消費線程:pool-4-thread-2線程消費了一個產品,產品ID  0113A7
pool-1-thread-1線程生產了一個產品,產品ID  cad53e
pool-3-thread-1線程生產了一個產品,產品ID  b30e6B
pool-1-thread-1線程生產了一個產品,產品ID  ed6a76
pool-1-thread-1線程生產了一個產品,產品ID  702c31
pool-2-thread-3線程生產了一個產品,產品ID  1757b1
pool-1-thread-1線程生產了一個產品,產品ID  Ab22c2
pool-1-thread-3線程生產了一個產品,產品ID  a3a567 ----------pool-1 的第三個線程
消費線程:pool-4-thread-1線程消費了一個產品,產品ID  A619a5
pool-2-thread-3線程生產了一個產品,產品ID  cBd508
相關文章
相關標籤/搜索