java 隊列

隊列簡述

Queue: 基本上,一個隊列就是一個先入先出(FIFO)的數據結構
Queue接口與List、Set同一級別,都是繼承了Collection接口。LinkedList實現了Deque接 口。
在併發隊列上JDK提供了兩套實現,一個是以ConcurrentLinkedQueue爲表明的高性能隊列非阻塞,一個是以BlockingQueue接口爲表明的阻塞隊列,不管哪一種都繼承自Queuehtml

阻塞隊列與非阻塞隊列

阻塞隊列與普通隊列的區別在於,當隊列是空的時,從隊列中獲取元素的操做將會被阻塞,或者當隊列是滿時,往隊列裏添加元素的操做會被阻塞。試圖從空的阻塞隊列中獲取元素的線程將會被阻塞,直到其餘的線程往空的隊列插入新的元素。一樣,試圖往已滿的阻塞隊列中添加新元素的線程一樣也會被阻塞,直到其餘的線程使隊列從新變得空閒起來,如從隊列中移除一個或者多個元素,或者徹底清空隊列java

clipboard.png
(圖片來自網絡https://www.cnblogs.com/lemon...程序員

非阻塞隊列

沒有實現的阻塞接口的LinkedList: 實現了java.util.Queue接口和java.util.AbstractQueue接口數組

內置的不阻塞隊列: PriorityQueue 和 ConcurrentLinkedQueue安全

PriorityQueue 和 ConcurrentLinkedQueue 類在 Collection Framework 中加入兩個具體集合實現。 網絡

PriorityQueue 類實質上維護了一個有序列表。加入到 Queue 中的元素根據它們的自然排序(經過其 java.util.Comparable 實現)或者根據傳遞給構造函數的 java.util.Comparator 實現來定位。數據結構

ConcurrentLinkedQueue 是基於連接節點的、線程安全的隊列。併發訪問不須要同步。由於它在隊列的尾部添加元素並從頭部刪除它們,因此只要不須要知道隊列的大小,ConcurrentLinkedQueue 對公共集合的共享訪問就能夠工做得很好。收集關於隊列大小的信息會很慢,須要遍歷隊列。多線程

ConcurrentLinkedQueue : 是一個適用於高併發場景下的隊列,經過無鎖的方式,實現
了高併發狀態下的高性能,一般ConcurrentLinkedQueue性能好於BlockingQueue.它
是一個基於連接節點的無界線程安全隊列。該隊列的元素遵循先進先出的原則。頭是最早
加入的,尾是最近加入的,該隊列不容許null元素。併發

ConcurrentLinkedQueue重要方法:
add 和offer() 都是加入元素的方法(在ConcurrentLinkedQueue中這倆個方法沒有任何區別)
poll() 和peek() 都是取頭元素節點,區別在於前者會刪除元素,後者不會。app

ConcurrentLinkedQueue例子

@RequestMapping("test-clq")
    public void testConcurrentLinkedQueue() {
        ConcurrentLinkedDeque<String> q = new ConcurrentLinkedDeque<>();
        q.offer("Java");
        q.offer("C#");
        q.offer("Javascript");
        q.offer("Python");
        // 從頭獲取元素,刪除該元素
        System.out.println(q.poll());
        // 從頭獲取元素,不刪除該元素
        System.out.println(q.peek());
        // 獲取總長度
        System.out.println(q.size());
        // 遍歷
        for (String s : q) {
            System.out.println(s);
        }
    }

結果:
Java
C#
3
C#
Javascript
Python

BlockingQueue

阻塞隊列,顧名思義,首先它是一個隊列,經過一個共享的隊列,可使得數據由隊列的一端輸入,從另一端輸出;
經常使用的隊列主要有如下兩種:(固然經過不一樣的實現方式,還能夠延伸出不少不一樣類型的隊列,DelayQueue就是其中的一種)

  • 先進先出(FIFO):先插入的隊列的元素也最早出隊列,相似於排隊的功能。從某種程度上來講這種隊列也體現了一種公平性。
  • 後進先出(LIFO):後插入隊列的元素最早出隊列,這種隊列優先處理最近發生的事件。

多線程環境中,經過隊列能夠很容易實現數據共享,好比經典的「生產者」和「消費者」模型中,經過隊列能夠很便利地實現二者之間的數據共享。假設咱們有若干生產者線程,另外又有若干個消費者線程。若是生產者線程須要把準備好的數據共享給消費者線程,利用隊列的方式來傳遞數據,就能夠很方便地解決他們之間的數據共享問題。但若是生產者和消費者在某個時間段內,萬一發生數據處理速度不匹配的狀況呢?理想狀況下,若是生產者產出數據的速度大於消費者消費的速度,而且當生產出來的數據累積到必定程度的時候,那麼生產者必須暫停等待一下(阻塞生產者線程),以便等待消費者線程把累積的數據處理完畢,反之亦然。然而,在concurrent包發佈之前,在多線程環境下,咱們每一個程序員都必須去本身控制這些細節,尤爲還要兼顧效率和線程安全,而這會給咱們的程序帶來不小的複雜度。好在此時,強大的concurrent包橫空出世了,而他也給咱們帶來了強大的BlockingQueue。(在多線程領域:所謂阻塞,在某些狀況下會掛起線程(即阻塞),一旦條件知足,被掛起的線程又會自動被喚醒)

阻塞隊列(BlockingQueue)是一個支持兩個附加操做的隊列。這兩個附加的操做是:
在隊列爲空時,獲取元素的線程會等待隊列變爲非空。
當隊列滿時,存儲元素的線程會等待隊列可用。
阻塞隊列經常使用於生產者和消費者的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器裏拿元素。
BlockingQueue即阻塞隊列,從阻塞這個詞能夠看出,在某些狀況下對阻塞隊列的訪問可能會形成阻塞。被阻塞的狀況主要有以下兩種:

  1. 當隊列滿了的時候進行入隊列操做
  2. 當隊列空了的時候進行出隊列操做

所以,當一個線程試圖對一個已經滿了的隊列進行入隊列操做時,它將會被阻塞,除非有另外一個線程作了出隊列操做;一樣,當一個線程試圖對一個空隊列進行出隊列操做時,它將會被阻塞,除非有另外一個線程進行了入隊列操做。
在Java中,BlockingQueue的接口位於java.util.concurrent 包中(在Java5版本開始提供),由上面介紹的阻塞隊列的特性可知,阻塞隊列是線程安全的。
在新增的Concurrent包中,BlockingQueue很好的解決了多線程中,如何高效安全「傳輸」數據的問題。經過這些高效而且線程安全的隊列類,爲咱們快速搭建高質量的多線程程序帶來極大的便利。

下表顯示了jdk1.5中的阻塞隊列的操做:

  add 增長一個元索 若是隊列已滿,則拋出一個IIIegaISlabEepeplian異常
  remove 移除並返回隊列頭部的元素 若是隊列爲空,則拋出一個NoSuchElementException異常
  element 返回隊列頭部的元素 若是隊列爲空,則拋出一個NoSuchElementException異常
  offer 添加一個元素並返回true 若是隊列已滿,則返回false
  poll 移除並返問隊列頭部的元素 若是隊列爲空,則返回null
  peek 返回隊列頭部的元素 若是隊列爲空,則返回null
  put 添加一個元素 若是隊列滿,則阻塞
  take 移除並返回隊列頭部的元素 若是隊列爲空,則阻塞

阻塞隊列操做:
aad、remove和element操做在你試圖爲一個已滿的隊列增長元素或從空隊列取得元素時 拋出異常
offer、poll、peek方法。這些方法在沒法完成任務時,只是給出一個出錯提示而不會拋出異常
阻塞操做put和take。put方法在隊列滿時阻塞,take方法在隊列空時阻塞。

ArrayBlockingQueue

ArrayBlockingQueue是一個有邊界的阻塞隊列,它的內部實現是一個數組。有邊界的意思是它的容量是有限的,咱們必須在其初始化的時候指定它的容量大小,容量大小一旦指定就不可改變。
ArrayBlockingQueue是以先進先出的方式存儲數據,最新插入的對象是尾部,最新移出的對象是頭部

LinkedBlockingQueue

LinkedBlockingQueue阻塞隊列大小的配置是可選的,若是咱們初始化時指定一個大小,它就是有邊界的,若是不指定,它就是無邊界的。說是無邊界,實際上是採用了默認大小爲Integer.MAX_VALUE的容量 。它的內部實現是一個鏈表。
和ArrayBlockingQueue同樣,LinkedBlockingQueue 也是以先進先出的方式存儲數據,最新插入的對象是尾部,最新移出的對象是頭部。

PriorityBlockingQueue

PriorityBlockingQueue是一個沒有邊界的隊列,它的排序規則和 java.util.PriorityQueue同樣。須要注意,PriorityBlockingQueue中容許插入null對象。
全部插入PriorityBlockingQueue的對象必須實現 java.lang.Comparable接口,隊列優先級的排序規則就是按照咱們對這個接口的實現來定義的。
另外,咱們能夠從PriorityBlockingQueue得到一個迭代器Iterator,但這個迭代器並不保證按照優先級順序進行迭代。

SynchronousQueue

SynchronousQueue隊列內部僅容許容納一個元素。當一個線程插入一個元素後會被阻塞,除非這個元素被另外一個線程消費

DelayQueue

(基於PriorityQueue來實現的)是一個存放Delayed 元素的無界阻塞隊列,只有在延遲期滿時才能從中提取元素。該隊列的頭部是延遲期滿後保存時間最長的 Delayed 元素。若是延遲都尚未期滿,則隊列沒有頭部,而且poll將返回null。當一個元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一個小於或等於零的值時,則出現期滿,poll就以移除這個元素了。此隊列不容許使用 null 元素。

使用BlockingQueue模擬生產者與消費者

  • 生產者
public class ProducerThread implements Runnable {
    private BlockingQueue<String> queue;
    private AtomicInteger count = new AtomicInteger();
    private volatile boolean FLAG = true;

    public ProducerThread(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "生產者開始啓動....");
        while (FLAG) {
            String data = count.incrementAndGet() + "";
            try {
                boolean offer = queue.offer(data, 2, TimeUnit.SECONDS);
                if (offer) {
                    System.out.println(Thread.currentThread().getName() + ",生產隊列" + data + "成功..");
                } else {
                    System.out.println(Thread.currentThread().getName() + ",生產隊列" + data + "失敗..");
                }
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(Thread.currentThread().getName() + ",生產者線程中止...");
    }

    public void stop() {
        this.FLAG = false;
    }

}
  • 消費者
public class ConsumerThread implements Runnable {
    private volatile boolean FLAG = true;
    private BlockingQueue<String> blockingQueue;

    public ConsumerThread(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "消費者開始啓動....");
        while (FLAG) {
            try {
                String data = blockingQueue.poll(2, TimeUnit.SECONDS);
                if (data == null || data == "") {
                    FLAG = false;
                    System.out.println("消費者超過2秒時間未獲取到消息.");
                    return;
                }
                System.out.println("消費者獲取到隊列信息成功,data:" + data);

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

}
  • 請求
@RequestMapping("test-blockingQueue")
    public void testBlockingQueue() {
        LinkedBlockingDeque<String> blockingDeque = new LinkedBlockingDeque<>(1);
        ProducerThread producerThread = new ProducerThread(blockingDeque);
        ConsumerThread consumerThread = new ConsumerThread(blockingDeque);
        Thread t1 = new Thread(producerThread, "生產者");
        Thread t2 = new Thread(consumerThread, "消費者");
        t1.start();
        t2.start();

        // 10秒後中止線程
        try {
            Thread.sleep(10 * 1000);
            producerThread.stop();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
  • 結果

消費者消費者開始啓動....生產者生產者開始啓動....生產者,生產隊列1成功..消費者獲取到隊列信息成功,data:1生產者,生產隊列2成功..消費者獲取到隊列信息成功,data:2生產者,生產隊列3成功..消費者獲取到隊列信息成功,data:3生產者,生產隊列4成功..消費者獲取到隊列信息成功,data:4生產者,生產隊列5成功..消費者獲取到隊列信息成功,data:5生產者,生產隊列6成功..消費者獲取到隊列信息成功,data:6生產者,生產隊列7成功..消費者獲取到隊列信息成功,data:7生產者,生產隊列8成功..消費者獲取到隊列信息成功,data:8生產者,生產隊列9成功..消費者獲取到隊列信息成功,data:9生產者,生產隊列10成功..消費者獲取到隊列信息成功,data:10生產者,生產者線程中止...消費者超過2秒時間未獲取到消息.

相關文章
相關標籤/搜索