併發編程之併發隊列

1、併發隊列

在併發隊列上JDK提供了兩套實現,java

一個是以ConcurrentLinkedQueue爲表明的高性能隊列非阻塞,數組

一個是以BlockingQueue接口爲表明的阻塞隊列,不管哪一種都繼承自Queue。安全

一、阻塞隊列與非阻塞隊

阻塞隊列與普通隊列的區別在於:併發

阻塞隊列:ide

  • 當隊列是空的時,從隊列中獲取元素的操做將會被阻塞,試圖從空的阻塞隊列中獲取元素的線程將會被阻塞,直到其餘的線程往空的隊列插入新的元素;
  • 當隊列是滿時,往隊列裏添加元素的操做會被阻塞。試圖往已滿的阻塞隊列中添加新元素的線程一樣也會被阻塞,直到其餘的線程使隊列從新變得空閒起來,如從隊列中移除一個或者多個元素,或者徹底清空隊列.

二、ConcurrentLinkedQeque

ConcurrentLinkedQueue : 是一個適用於高併發場景下的隊列,經過無鎖的方式,實現 了高併發狀態下的高性能,一般ConcurrentLinkedQueue性能好於BlockingQueue.它 是一個基於連接節點的無界線程安全隊列。該隊列的元素遵循先進先出的原則。頭是最早 加入的,尾是最近加入的,該隊列不容許null元素。高併發

// 非阻塞式隊列,無界隊列
ConcurrentLinkedDeque q = new ConcurrentLinkedDeque();
	q.offer("張三");
	q.offer("李四");
	q.offer("王五");
	//從頭獲取元素,刪除該元素
	System.out.println(q.poll());
	//從頭獲取元素,不刪除該元素
	System.out.println(q.peek());
	//獲取總長度
	System.out.println(q.size());

三、BlockingQueue

阻塞隊列(BlockingQueue)是一個支持兩個附加操做的隊列。這兩個附加的操做是:性能

  • 在隊列爲空時,獲取元素的線程會等待隊列變爲非空。
  • 當隊列滿時,存儲元素的線程會等待隊列可用。

在Java中,BlockingQueue的接口位於java.util.concurrent 包中(在Java5版本開始提供),由上面介紹的阻塞隊列的特性可知,阻塞隊列是線程安全的。this

1)、ArrayBlockingQueue

ArrayBlockingQueue是一個有邊界的阻塞隊列,它的內部實現是一個數組。有邊界的意思是它的容量是有限的,咱們必須在其初始化的時候指定它的容量大小,容量大小一旦指定就不可改變。線程

ArrayBlockingQueue是以先進先出的方式存儲數據,最新插入的對象是尾部,最新移出的對象是頭部。下面 是一個初始化和使用ArrayBlockingQueue的例子:code

<String> arrays = new ArrayBlockingQueue<String>(3);
	arrays.offer("張三");
	 arrays.offer("李四");
	arrays.offer("王五");
	arrays.offer("666", 3, TimeUnit.SECONDS); // 隊列滿了,阻塞3秒後向下執行
	System.out.println(arrays.poll()); // 張三
	System.out.println(arrays.poll()); // 李四
	System.out.println(arrays.poll()); // 王五
	System.out.println(arrays.poll(3, TimeUnit.SECONDS)); //隊列爲空,阻塞3秒後結束

2)、LinkedBlockingQueue

LinkedBlockingQueue阻塞隊列大小的配置是可選的,若是咱們初始化時指定一個大小,它就是有邊界的,若是不指定,它就是無邊界的。說是無邊界,實際上是採用了默認大小爲Integer.MAX_VALUE的容量 。它的內部實現是一個鏈表。

和ArrayBlockingQueue同樣,LinkedBlockingQueue 也是以先進先出的方式存儲數據,最新插入的對象是尾部,最新移出的對象是頭部。下面是一個初始化和使LinkedBlockingQueue的例子:

LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(3);
linkedBlockingQueue.add("張三");
linkedBlockingQueue.add("李四");
linkedBlockingQueue.add("李四");
System.out.println(linkedBlockingQueue.size()); // 3

3)、PriorityBlockingQueue(有界,快滿時自動擴容,看似無界)

PriorityBlockingQueue是一個沒有邊界的隊列,它的排序規則和 java.util.PriorityQueue同樣。須要注意,PriorityBlockingQueue中容許插入null對象。

全部插入PriorityBlockingQueue的對象必須實現 java.lang.Comparable接口,隊列優先級的排序規則就 是按照咱們對這個接口的實現來定義的。

另外,咱們能夠從PriorityBlockingQueue得到一個迭代器Iterator,但這個迭代器並不保證按照優先級順序進行迭代。

4)、SynchronousQueue

SynchronousQueue隊列內部僅容許容納一個元素。當一個線程插入一個元素後會被阻塞,除非這個元素被另外一個線程消費。

5)、使用BlockingQueue模擬生產者與消費者

class ProducerThread implements Runnable {
	private BlockingQueue<String> blockingQueue;
	private AtomicInteger count = new AtomicInteger();
	private volatile boolean FLAG = true;

	public ProducerThread(BlockingQueue<String> blockingQueue) {
		this.blockingQueue = blockingQueue;
	}

	@Override
	public void run() {
		System.out.println(Thread.currentThread().getName() + "生產者開始啓動....");
		while (FLAG) {
			String data = count.incrementAndGet() + "";
			try {
				boolean offer = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
				if (offer) {
					System.out.println(Thread.currentThread().getName() + ",生產隊列" + data + "成功..");
				} else {
					System.out.println(Thread.currentThread().getName() + ",生產隊列" + data + "失敗..");
				}
				Thread.sleep(1000);
			} catch (Exception e) {

			}
		}
		System.out.println(Thread.currentThread().getName() + ",生產者線程中止...");
	}

	public void stop() {
		this.FLAG = false;
	}

}

class ConsumerThread implements Runnable {
	private volatile boolean FLAG = true;
	private BlockingQueue<String> blockingQueue;

	public ConsumerThread(BlockingQueue<String> blockingQueue) {
		this.blockingQueue = blockingQueue;
	}

	@Override
	public void run() {
		System.out.println(Thread.currentThread().getName() + "消費者開始啓動....");
		while (FLAG) {
			try {
				String data = blockingQueue.poll(2, TimeUnit.SECONDS);
				if (data == null || data == "") {
					FLAG = false;
					System.out.println("消費者超過2秒時間未獲取到消息.");
					return;
				}
				System.out.println("消費者獲取到隊列信息成功,data:" + data);

			} catch (Exception e) {
				// TODO: handle exception
			}
		}
	}

}

public class Test0008 {

	public static void main(String[] args) {
		BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(3);
		ProducerThread producerThread = new ProducerThread(blockingQueue);
		ConsumerThread consumerThread = new ConsumerThread(blockingQueue);
		Thread t1 = new Thread(producerThread);
		Thread t2 = new Thread(consumerThread);
		t1.start();
		t2.start();
		//10秒後 中止線程..
		try {
			Thread.sleep(10*1000);
			producerThread.stop();
		} catch (Exception e) {
			// TODO: handle exception
		}
	}

}
  1. ArrayDeque, (數組雙端隊列) 
  2. PriorityQueue, (優先級隊列) 
  3. ConcurrentLinkedQueue, (基於鏈表的併發隊列) 
  4. DelayQueue, (延期阻塞隊列)(阻塞隊列實現了BlockingQueue接口) 
  5. ArrayBlockingQueue, 經常使用(基於數組的併發阻塞隊列) 
  6. LinkedBlockingQueue, 經常使用(基於鏈表的FIFO阻塞隊列) 
  7. LinkedBlockingDeque, (基於鏈表的FIFO雙端阻塞隊列) 
  8. PriorityBlockingQueue,經常使用 (帶優先級的無界阻塞隊列,) 
  9. SynchronousQueue經常使用 (併發同步阻塞隊列)

本文由博客一文多發平臺 OpenWrite 發佈!

相關文章
相關標籤/搜索