使用Java的BlockingQueue實現生產者-消費者

BlockingQueue也是java.util.concurrent下的主要用來控制線程同步的工具。 java

BlockingQueue有四個具體的實現類,根據不一樣需求,選擇不一樣的實現類
一、ArrayBlockingQueue:一個由數組支持的有界阻塞隊列,規定大小的BlockingQueue,其構造函數必須帶一個int參數來指明其大小.其所含的對象是以FIFO(先入先出)順序排序的。 數組


二、LinkedBlockingQueue:大小不定的BlockingQueue,若其構造函數帶一個規定大小的參數,生成的BlockingQueue有大小限制,若不帶大小參數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定.其所含的對象是以FIFO(先入先出)順序排序的。 ide


三、PriorityBlockingQueue:相似於LinkedBlockQueue,但其所含對象的排序不是FIFO,而是依據對象的天然排序順序或者是構造函數的Comparator決定的順序。 函數


四、SynchronousQueue:特殊的BlockingQueue,對其的操做必須是放和取交替完成的。 工具

 

LinkedBlockingQueue 能夠指定容量,也能夠不指定,不指定的話,默認最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在隊列滿的時候會阻塞直到有隊列成員被消費,take方法在隊列空的時候會阻塞,直到有隊列成員被放進來。 測試

 

生產者消費者的示例代碼: this

生產者: spa

Java代碼 複製代碼  收藏代碼
  1. import java.util.concurrent.BlockingQueue;  
  2.   
  3. public class Producer implements Runnable {  
  4.     BlockingQueue<String> queue;  
  5.   
  6.     public Producer(BlockingQueue<String> queue) {  
  7.         this.queue = queue;  
  8.     }  
  9.   
  10.     @Override  
  11.     public void run() {  
  12.         try {  
  13.             String temp = "A Product, 生產線程:"  
  14.                     + Thread.currentThread().getName();  
  15.             System.out.println("I have made a product:"  
  16.                     + Thread.currentThread().getName());  
  17.             queue.put(temp);//若是隊列是滿的話,會阻塞當前線程  
  18.         } catch (InterruptedException e) {  
  19.             e.printStackTrace();  
  20.         }  
  21.     }  
  22.   
  23. }  
import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {
	BlockingQueue<String> queue;

	public Producer(BlockingQueue<String> queue) {
		this.queue = queue;
	}

	@Override
	public void run() {
		try {
			String temp = "A Product, 生產線程:"
					+ Thread.currentThread().getName();
			System.out.println("I have made a product:"
					+ Thread.currentThread().getName());
			queue.put(temp);//若是隊列是滿的話,會阻塞當前線程
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

}

 

 消費者: 線程

Java代碼 複製代碼  收藏代碼
  1. import java.util.concurrent.BlockingQueue;  
  2.   
  3. public class Consumer implements Runnable{  
  4.     BlockingQueue<String> queue;  
  5.       
  6.     public Consumer(BlockingQueue<String> queue){  
  7.         this.queue = queue;  
  8.     }  
  9.       
  10.     @Override  
  11.     public void run() {  
  12.         try {  
  13.             String temp = queue.take();//若是隊列爲空,會阻塞當前線程  
  14.             System.out.println(temp);  
  15.         } catch (InterruptedException e) {  
  16.             e.printStackTrace();  
  17.         }  
  18.     }  
  19. }  
import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable{
	BlockingQueue<String> queue;
	
	public Consumer(BlockingQueue<String> queue){
		this.queue = queue;
	}
	
	@Override
	public void run() {
		try {
			String temp = queue.take();//若是隊列爲空,會阻塞當前線程
			System.out.println(temp);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

 測試類: code

Java代碼 複製代碼  收藏代碼
  1. import java.util.concurrent.ArrayBlockingQueue;  
  2. import java.util.concurrent.BlockingQueue;  
  3. import java.util.concurrent.LinkedBlockingQueue;  
  4.   
  5. public class Test3 {  
  6.   
  7.     public static void main(String[] args) {  
  8.         BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);  
  9.         // BlockingQueue<String> queue = new LinkedBlockingQueue<String>();  
  10.         //不設置的話,LinkedBlockingQueue默認大小爲Integer.MAX_VALUE  
  11.           
  12.         // BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);  
  13.   
  14.         Consumer consumer = new Consumer(queue);  
  15.         Producer producer = new Producer(queue);  
  16.         for (int i = 0; i < 5; i++) {  
  17.             new Thread(producer, "Producer" + (i + 1)).start();  
  18.   
  19.             new Thread(consumer, "Consumer" + (i + 1)).start();  
  20.         }  
  21.     }  
  22. }  
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Test3 {

	public static void main(String[] args) {
		BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);
		// BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
		//不設置的話,LinkedBlockingQueue默認大小爲Integer.MAX_VALUE
		
		// BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);

		Consumer consumer = new Consumer(queue);
		Producer producer = new Producer(queue);
		for (int i = 0; i < 5; i++) {
			new Thread(producer, "Producer" + (i + 1)).start();

			new Thread(consumer, "Consumer" + (i + 1)).start();
		}
	}
}

 打印結果:

Text代碼 複製代碼  收藏代碼
  1. I have made a product:Producer1  
  2. I have made a product:Producer2  
  3. A Product, 生產線程:Producer1  
  4. A Product, 生產線程:Producer2  
  5. I have made a product:Producer3  
  6. A Product, 生產線程:Producer3  
  7. I have made a product:Producer5  
  8. I have made a product:Producer4  
  9. A Product, 生產線程:Producer5  
  10. A Product, 生產線程:Producer4  
I have made a product:Producer1
I have made a product:Producer2
A Product, 生產線程:Producer1
A Product, 生產線程:Producer2
I have made a product:Producer3
A Product, 生產線程:Producer3
I have made a product:Producer5
I have made a product:Producer4
A Product, 生產線程:Producer5
A Product, 生產線程:Producer4

 

因爲隊列的大小限定成了2,因此最多隻有兩個產品被加入到隊列當中,並且消費者取到產品的順序也是按照生產的前後順序,緣由就是LinkedBlockingQueue和ArrayBlockingQueue都是按照FIFO的順序存取元素的。

相關文章
相關標籤/搜索