線程併發BlockingQueue類

package com.yao;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * BlockingQueue是一種特殊的Queue,若BlockingQueue是空的,
 * 從BlockingQueue取東西的操做將會被阻斷進入等待狀態直到BlocingkQueue進了新貨纔會被喚醒。
 * 一樣,若是BlockingQueue是滿的任何試圖往裏存東西的操做也會被阻斷進入等待狀態,
 * 直到BlockingQueue裏有新的空間纔會被喚醒繼續操做。
 * BlockingQueue提供的方法主要有:
 * add(anObject): 把anObject加到BlockingQueue裏,若是BlockingQueue能夠容納返回true,不然拋出IllegalStateException異常。 
 * offer(anObject):把anObject加到BlockingQueue裏,若是BlockingQueue能夠容納返回true,不然返回false。 
 * put(anObject):把anObject加到BlockingQueue裏,若是BlockingQueue沒有空間,調用此方法的線程被阻斷直到BlockingQueue裏有新的空間再繼續。 
 * poll(time):取出BlockingQueue裏排在首位的對象,若不能當即取出可等time參數規定的時間。取不到時返回null。 
 * take():取出BlockingQueue裏排在首位的對象,若BlockingQueue爲空,阻斷進入等待狀態直到BlockingQueue有新的對象被加入爲止。
 * 
 * 根據不一樣的須要BlockingQueue有4種具體實現:
 * (1)ArrayBlockingQueue:規定大小的BlockingQueue,其構造函數必須帶一個int參數來指明其大小。其所含的對象是以FIFO(先入先出)順序排序的。 
 * (2)LinkedBlockingQueue:大小不定的BlockingQueue,若其構造函數帶一個規定大小的參數,生成的BlockingQueue有大小限制,
 * 若不帶大小參數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定。其所含的對象是以FIFO(先入先出)順序排序的。
 * LinkedBlockingQueue和ArrayBlockingQueue比較起來,它們背後所用的數據結構不同,
 * 致使LinkedBlockingQueue的數據吞吐量要大於ArrayBlockingQueue,但在線程數量很大時其性能的可預見性低於ArrayBlockingQueue。 
 * (3)PriorityBlockingQueue:相似於LinkedBlockingQueue,但其所含對象的排序不是FIFO,而是依據對象的天然排序順序或者是構造函數所帶的Comparator決定的順序。 
 * (4)SynchronousQueue:特殊的BlockingQueue,對其的操做必須是放和取交替完成的。
 * 
 * 下面是用BlockingQueue來實現Producer和Consumer的例子
 */
public class BlockingQueueTest {

	/**
	 * 定義裝蘋果的籃子
	 */
	public static class Basket{
		// 籃子,可以容納3個蘋果
		BlockingQueue<String> basket = new ArrayBlockingQueue<String>(3);
		
		// 生產蘋果,放入籃子
		public void produce() throws InterruptedException{
			// put方法放入一個蘋果,若basket滿了,等到basket有位置
			basket.put("An apple");
		}
		// 消費蘋果,從籃子中取走
		public String consume() throws InterruptedException{
			// get方法取出一個蘋果,若basket爲空,等到basket有蘋果爲止
			String apple = basket.take();
			return apple;
		}
		
		public int getAppleNumber(){
			return basket.size();
		}
		
	}
	// 測試方法
	public static void testBasket() {
		// 創建一個裝蘋果的籃子
		final Basket basket = new Basket();
		// 定義蘋果生產者
		class Producer implements Runnable {
			public void run() {
				try {
					while (true) {
						// 生產蘋果
						System.out.println("生產者準備生產蘋果:" 
								+ System.currentTimeMillis());
						basket.produce();
						System.out.println("生產者生產蘋果完畢:" 
								+ System.currentTimeMillis());
						System.out.println("生產完後有蘋果:"+basket.getAppleNumber()+"個");
						// 休眠300ms
						Thread.sleep(300);
					}
				} catch (InterruptedException ex) {
				}
			}
		}
		// 定義蘋果消費者
		class Consumer implements Runnable {
			public void run() {
				try {
					while (true) {
						// 消費蘋果
						System.out.println("消費者準備消費蘋果:" 
								+ System.currentTimeMillis());
						basket.consume();
						System.out.println("消費者消費蘋果完畢:" 
								+ System.currentTimeMillis());
						System.out.println("消費完後有蘋果:"+basket.getAppleNumber()+"個");
						// 休眠1000ms
						Thread.sleep(1000);
					}
				} catch (InterruptedException ex) {
				}
			}
		}
		
		ExecutorService service = Executors.newCachedThreadPool();
		Producer producer = new Producer();
		Consumer consumer = new Consumer();
		service.submit(producer);
		service.submit(consumer);
		// 程序運行10s後,全部任務中止
		try {
			Thread.sleep(10000);
		} catch (InterruptedException e) {
		}
		service.shutdownNow();
	}

	public static void main(String[] args) {
		BlockingQueueTest.testBasket();
	}
}
相關文章
相關標籤/搜索