阻塞隊列(BlockingQueue)是 Java 5 併發新特性中的內容,阻塞隊列的接口是 java.util.concurrent.BlockingQueue,它提供了兩個附加操做:當隊列中爲空時,從隊列中獲取元素的操做將被阻塞;當隊列滿時,向隊列中添加元素的操做將被阻塞。html
阻塞隊列經常使用於生產者和消費者的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是生產者存放元素的容器。java
阻塞隊列提供了四種操做方法:git
JDK7提供了7個阻塞隊列。分別是github
下面分別簡單介紹一下:數組
ArrayBlockingQueue:是一個用數組實現的有界阻塞隊列,此隊列按照先進先出(FIFO)的原則對元素進行排序。支持公平鎖和非公平鎖。【注:每個線程在獲取鎖的時候可能都會排隊等待,若是在等待時間上,先獲取鎖的線程的請求必定先被知足,那麼這個鎖就是公平的。反之,這個鎖就是不公平的。公平的獲取鎖,也就是當前等待時間最長的線程先獲取鎖】緩存
LinkedBlockingDeque: 一個由鏈表結構組成的雙向阻塞隊列。隊列頭部和尾部均可以添加和移除元素,多線程併發時,能夠將鎖的競爭最多降到一半。安全
Java中線程安全的內置隊列還有兩個:ConcurrentLinkedQueue和LinkedTransferQueue,它們使用了CAS這種無鎖的方式來實現了線程安全的隊列。無鎖的方式性能好,可是隊列是無界的,用在生產系統中,生產者生產速度過快,可能致使內存溢出。有界的阻塞隊列ArrayBlockingQueue和LinkedBlockingQueue,爲了減小Java的垃圾回收對系統性能的影響,會盡可能選擇array/heap格式的數據結構。這樣的話就只剩下ArrayBlockingQueue。(先埋個坑在這兒,近來接觸到了disruptor,感受妙趣橫生。disruptor)數據結構
這裏分析下ArrayBlockingQueue的實現原理。多線程
構造方法:併發
ArrayBlockingQueue(int capacity); ArrayBlockingQueue(int capacity, boolean fair); ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)
ArrayBlockingQueue提供了三種構造方法,參數含義以下:
插入元素:
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
從源碼能夠看出,生產者首先得到鎖lock,而後判斷隊列是否已經滿了,若是滿了,則等待,直到被喚醒,而後調用enqueue插入元素。
private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null;
final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
以上是enqueue的實現,實現的操做是插入元素到一個環形數組,而後喚醒notEmpty上阻塞的線程。
獲取元素:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
從源碼能夠看出,消費者首先得到鎖,而後判斷隊列是否爲空,爲空,則等待,直到被喚醒,而後調用dequeue獲取元素。
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null;
final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
以上是dequeue的實現,獲取環形數組當前takeIndex的元素,並及時將當前元素置爲null,設置下一次takeIndex的值takeIndex++,而後喚醒notFull上阻塞的線程。
還有其餘方法offer(E e)
、poll()
、add(E e)
、remove()
、 offer(E e, long timeout, TimeUnit unit)
等的實現,由於經常使用take和put,這些方法就不一一贅述了。
使用阻塞隊列實現生產者-消費者模式:
/** * Created by noly on 2017/5/19. */
public class BlockingQueueTest { public static void main (String[] args) { ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10); Consumer consumer = new Consumer(queue); Producer producer = new Producer(queue); producer.start(); consumer.start(); } } class Consumer extends Thread { private ArrayBlockingQueue<Integer> queue; public Consumer(ArrayBlockingQueue<Integer> queue){ this.queue = queue; } @Override public void run() { while(true) { try { Integer i = queue.take(); System.out.println("消費者從隊列取出元素:" + i); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Producer extends Thread { private ArrayBlockingQueue<Integer> queue; public Producer(ArrayBlockingQueue<Integer> queue){ this.queue = queue; } @Override public void run() { for (int i = 0; i < 100; i++) { try { queue.put(i); System.out.println("生產者向隊列插入元素:" + i); } catch (InterruptedException e) { e.printStackTrace(); } } } }
若是不使用阻塞隊列,使用Object.wait()和Object.notify()、非阻塞隊列實現生產者-消費者模式,考慮線程間的通信,會很是麻煩。