線程阻塞隊列ArrayBlockingQueue

     阻塞隊列是Java5線程新特徵中的內容,Java定義了阻塞隊列的接口java.util.concurrent.BlockingQueue,阻塞隊列的概念是,一個指定長度的隊列,若是隊列滿了,添加新元素的操做會被阻塞等待,直到有空位爲止。一樣,當隊列爲空時候,請求隊列元素的操做一樣會阻塞等待,直到有可用元素爲止。
     有界隊列。當使用有限的 maximumPoolSizes 時,有界隊列(如 ArrayBlockingQueue)有助於防止資源耗盡,可是可能較難調整和控制。隊列大小和最大池大小可能須要相互折衷:使用大型隊列和小型池能夠最大限度地下降 CPU 使用率、操做系統資源和上下文切換開銷,可是可能致使人工下降吞吐量。若是任務頻繁阻塞(例如,若是它們是 I/O 邊界),則系統可能爲超過您許可的更多線程安排時間。使用小型隊列一般要求較大的池大小,CPU 使用率較高,可是可能遇到不可接受的調度開銷,這樣也會下降吞吐量。
java

條件變量 lock: http://my.oschina.net/jielucky/blog/156827 

package com.thread.q;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class QueueTest {
	private final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(
			2);

//	{
//		try {
//			for (int i = 0; i <= 0; i++)
//				queue.put(1);
//		} catch (InterruptedException e) {
//			e.printStackTrace();
//		}
//	}

	public void test() throws InterruptedException, ExecutionException {
		System.out.println("隊列裏總共有:" + queue.size());
		// 建立一個線程池
		ExecutorService pool = Executors.newFixedThreadPool(2);
		// 建立兩個有返回值的任務
		Callable<String> readThread = new ReadThread(queue);
		Callable<String> writeThread = new WriteThread(queue);
		// 執行任務並獲取Future對象
		Future<String> writeFuture = pool.submit(writeThread);
		Future<String> readFuture = pool.submit(readThread);
		// 執行
		// writeFuture.get();
		writeFuture.get();
		readFuture.get();

		System.out.println("隊列裏還有:" + queue.size());
		pool.shutdownNow();
	}

	public static void main(String[] args) {
		try {
			new QueueTest().test();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}
	}
}

class ReadThread implements Callable<String> {
	BlockingQueue<Object> queue;

	ReadThread(BlockingQueue<Object> queue) {
		this.queue = queue;
	}

	@Override
	public String call() throws Exception {
		for (int i = 0; i < 9; i++) {
			Thread.sleep(1000);
			// 獲取並移除此隊列的頭,若是此隊列爲空,則返回 null
			Object obj = queue.poll();
			if (obj == null) {
				continue;
			}
			System.out.println("讀:隊列裏還有 " + queue.size());
		}
		// return "讀:隊列裏還有" + queue.size();
		return "";
	}
}

class WriteThread implements Callable<String> {
	BlockingQueue<Object> queue;

	WriteThread(BlockingQueue<Object> queue) {
		this.queue = queue;
	}

	@Override
	public String call() throws Exception {
		// 將指定的元素插入此隊列的尾部,若是該隊列已滿,則等待可用的空間。
		for (int i = 1; i < 10; i++) {
			queue.put(i);
			System.out.println("寫:隊列裏還有" + queue.size());
		}
		// return "寫:隊列裏還有" + queue.size();
		return "";
	}
}