Java多線程:隊列與阻塞隊列

1. 什麼是阻塞隊列

阻塞隊列(BlockingQueue)是 Java 5 併發新特性中的內容,阻塞隊列的接口是 java.util.concurrent.BlockingQueue,它提供了兩個附加操做:當隊列中爲空時,從隊列中獲取元素的操做將被阻塞;當隊列滿時,向隊列中添加元素的操做將被阻塞。html

阻塞隊列經常使用於生產者和消費者的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是生產者存放元素的容器。java

阻塞隊列提供了四種操做方法:git

  • 拋出異常:當隊列滿時,再向隊列中插入元素,則會拋出IllegalStateException異常。當隊列空時,再向隊列中獲取元素,則會拋出NoSuchElementException異常。
  • 返回特殊值:當隊列滿時,向隊列中添加元素,則返回false,不然返回true。當隊列爲空時,向隊列中獲取元素,則返回null,不然返回元素。
  • 一直阻塞:當阻塞隊列滿時,若是生產者向隊列中插入元素,則隊列會一直阻塞當前線程,直到隊列可用或響應中斷退出。當阻塞隊列爲空時,若是消費者線程向阻塞隊列中獲取數據,則隊列會一直阻塞當前線程,直到隊列空閒或響應中斷退出。
  • 超時退出:當隊列滿時,若是生產線程向隊列中添加元素,則隊列會阻塞生產線程一段時間,超過指定的時間則退出返回false。當隊列爲空時,消費線程從隊列中移除元素,則隊列會阻塞一段時間,若是超過指定時間退出返回null。

2. Java中的阻塞隊列

JDK7提供了7個阻塞隊列。分別是github

下面分別簡單介紹一下:數組

  1. ArrayBlockingQueue:是一個用數組實現的有界阻塞隊列,此隊列按照先進先出(FIFO)的原則對元素進行排序。支持公平鎖和非公平鎖。【注:每個線程在獲取鎖的時候可能都會排隊等待,若是在等待時間上,先獲取鎖的線程的請求必定先被知足,那麼這個鎖就是公平的。反之,這個鎖就是不公平的。公平的獲取鎖,也就是當前等待時間最長的線程先獲取鎖】緩存

  2. LinkedBlockingQueue:一個由鏈表結構組成的有界隊列,此隊列的長度爲Integer.MAX_VALUE。此隊列按照先進先出的順序進行排序。
  3. PriorityBlockingQueue: 一個支持線程優先級排序的無界隊列,默認天然序進行排序,也能夠自定義實現compareTo()方法來指定元素排序規則,不能保證同優先級元素的順序。
  4. DelayQueue: 一個實現PriorityBlockingQueue實現延遲獲取的無界隊列,在建立元素時,能夠指定多久才能從隊列中獲取當前元素。只有延時期滿後才能從隊列中獲取元素。(DelayQueue能夠運用在如下應用場景:1.緩存系統的設計:能夠用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。2.定時任務調度。使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從好比TimerQueue就是使用DelayQueue實現的。)
  5. SynchronousQueue: 一個不存儲元素的阻塞隊列,每個put操做必須等待take操做,不然不能添加元素。支持公平鎖和非公平鎖。SynchronousQueue的一個使用場景是在線程池裏。Executors.newCachedThreadPool()就使用了SynchronousQueue,這個線程池根據須要(新任務到來時)建立新的線程,若是有空閒線程則會重複使用,線程空閒了60秒後會被回收。
  6. LinkedTransferQueue: 一個由鏈表結構組成的無界阻塞隊列,至關於其它隊列,LinkedTransferQueue隊列多了transfer和tryTransfer方法。
  7. LinkedBlockingDeque: 一個由鏈表結構組成的雙向阻塞隊列。隊列頭部和尾部均可以添加和移除元素,多線程併發時,能夠將鎖的競爭最多降到一半。安全

Java中線程安全的內置隊列還有兩個:ConcurrentLinkedQueue和LinkedTransferQueue,它們使用了CAS這種無鎖的方式來實現了線程安全的隊列。無鎖的方式性能好,可是隊列是無界的,用在生產系統中,生產者生產速度過快,可能致使內存溢出。有界的阻塞隊列ArrayBlockingQueue和LinkedBlockingQueue,爲了減小Java的垃圾回收對系統性能的影響,會盡可能選擇array/heap格式的數據結構。這樣的話就只剩下ArrayBlockingQueue。(先埋個坑在這兒,近來接觸到了disruptor,感受妙趣橫生。disruptor數據結構

3. 阻塞隊列的實現原理

這裏分析下ArrayBlockingQueue的實現原理。多線程

構造方法:併發

ArrayBlockingQueue(int capacity); ArrayBlockingQueue(int capacity, boolean fair); ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)

 

ArrayBlockingQueue提供了三種構造方法,參數含義以下:

  • capacity:容量,即隊列大小。
  • fair:是否公平鎖。
  • c:隊列初始化元素,順序按照Collection遍歷順序。

插入元素

public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }

 

從源碼能夠看出,生產者首先得到鎖lock,而後判斷隊列是否已經滿了,若是滿了,則等待,直到被喚醒,而後調用enqueue插入元素。

private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null;
    final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }

 

以上是enqueue的實現,實現的操做是插入元素到一個環形數組,而後喚醒notEmpty上阻塞的線程。

獲取元素

public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }

 

從源碼能夠看出,消費者首先得到鎖,而後判斷隊列是否爲空,爲空,則等待,直到被喚醒,而後調用dequeue獲取元素。

private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null;
    final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }

 

以上是dequeue的實現,獲取環形數組當前takeIndex的元素,並及時將當前元素置爲null,設置下一次takeIndex的值takeIndex++,而後喚醒notFull上阻塞的線程。

還有其餘方法offer(E e)poll()add(E e)remove()、 offer(E e, long timeout, TimeUnit unit)等的實現,由於經常使用take和put,這些方法就不一一贅述了。

4. 阻塞隊列的基本使用

使用阻塞隊列實現生產者-消費者模式:

/** * Created by noly on 2017/5/19. */
public class BlockingQueueTest { public static void main (String[] args) { ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10); Consumer consumer = new Consumer(queue); Producer producer = new Producer(queue); producer.start(); consumer.start(); } } class Consumer extends Thread { private ArrayBlockingQueue<Integer> queue; public Consumer(ArrayBlockingQueue<Integer> queue){ this.queue = queue; } @Override public void run() { while(true) { try { Integer i = queue.take(); System.out.println("消費者從隊列取出元素:" + i); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Producer extends Thread { private ArrayBlockingQueue<Integer> queue; public Producer(ArrayBlockingQueue<Integer> queue){ this.queue = queue; } @Override public void run() { for (int i = 0; i < 100; i++) { try { queue.put(i); System.out.println("生產者向隊列插入元素:" + i); } catch (InterruptedException e) { e.printStackTrace(); } } } }

 

若是不使用阻塞隊列,使用Object.wait()和Object.notify()、非阻塞隊列實現生產者-消費者模式,考慮線程間的通信,會很是麻煩。

參考資料:

    聊聊併發(六)ConcurrentLinkedQueue的實現原理分析

    聊聊併發(七)——Java中的阻塞隊列

    阻塞隊列和ArrayBlockingQueue源碼解析

    高性能隊列——Disruptor

相關文章
相關標籤/搜索