併發編程之併發隊列

1、併發隊列

在併發隊列上JDK提供了兩套實現,java

一個是以ConcurrentLinkedQueue爲表明的高性能隊列非阻塞,數組

一個是以BlockingQueue接口爲表明的阻塞隊列,不管哪一種都繼承自Queue。安全

一、阻塞隊列與非阻塞隊

阻塞隊列與普通隊列的區別在於:併發

阻塞隊列:ide

  • 當隊列是空的時,從隊列中獲取元素的操做將會被阻塞,試圖從空的阻塞隊列中獲取元素的線程將會被阻塞,直到其餘的線程往空的隊列插入新的元素;
  • 當隊列是滿時,往隊列裏添加元素的操做會被阻塞。試圖往已滿的阻塞隊列中添加新元素的線程一樣也會被阻塞,直到其餘的線程使隊列從新變得空閒起來,如從隊列中移除一個或者多個元素,或者徹底清空隊列.

二、ConcurrentLinkedQeque

ConcurrentLinkedQueue : 是一個適用於高併發場景下的隊列,經過無鎖的方式,實現了高併發狀態下的高性能,一般ConcurrentLinkedQueue性能好於BlockingQueue.它是一個基於連接節點的無界線程安全隊列。該隊列的元素遵循先進先出的原則。頭是最早加入的,尾是最近加入的,該隊列不容許null元素。高併發

// 非阻塞式隊列,無界隊列
ConcurrentLinkedDeque q = new ConcurrentLinkedDeque();
    q.offer("張三");
    q.offer("李四");
    q.offer("王五");
    //從頭獲取元素,刪除該元素
    System.out.println(q.poll());
    //從頭獲取元素,不刪除該元素
    System.out.println(q.peek());
    //獲取總長度
    System.out.println(q.size());複製代碼

三、BlockingQueue

阻塞隊列(BlockingQueue)是一個支持兩個附加操做的隊列。這兩個附加的操做是:性能

  • 在隊列爲空時,獲取元素的線程會等待隊列變爲非空。
  • 當隊列滿時,存儲元素的線程會等待隊列可用。

在Java中,BlockingQueue的接口位於java.util.concurrent 包中(在Java5版本開始提供),由上面介紹的阻塞隊列的特性可知,阻塞隊列是線程安全的。this

1)、ArrayBlockingQueue

ArrayBlockingQueue是一個有邊界的阻塞隊列,它的內部實現是一個數組。有邊界的意思是它的容量是有限的,咱們必須在其初始化的時候指定它的容量大小,容量大小一旦指定就不可改變。spa

ArrayBlockingQueue是以先進先出的方式存儲數據,最新插入的對象是尾部,最新移出的對象是頭部。下面是一個初始化和使用ArrayBlockingQueue的例子:線程

<String> arrays = new ArrayBlockingQueue<String>(3);
    arrays.offer("張三");
     arrays.offer("李四");
    arrays.offer("王五");
    arrays.offer("666", 3, TimeUnit.SECONDS); // 隊列滿了,阻塞3秒後向下執行
    System.out.println(arrays.poll()); // 張三
    System.out.println(arrays.poll()); // 李四
    System.out.println(arrays.poll()); // 王五
    System.out.println(arrays.poll(3, TimeUnit.SECONDS)); //隊列爲空,阻塞3秒後結束複製代碼

2)、LinkedBlockingQueue

LinkedBlockingQueue阻塞隊列大小的配置是可選的,若是咱們初始化時指定一個大小,它就是有邊界的,若是不指定,它就是無邊界的。說是無邊界,實際上是採用了默認大小爲Integer.MAX_VALUE的容量 。它的內部實現是一個鏈表。

和ArrayBlockingQueue同樣,LinkedBlockingQueue 也是以先進先出的方式存儲數據,最新插入的對象是尾部,最新移出的對象是頭部。下面是一個初始化和使LinkedBlockingQueue的例子:

LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(3);
linkedBlockingQueue.add("張三");
linkedBlockingQueue.add("李四");
linkedBlockingQueue.add("李四");
System.out.println(linkedBlockingQueue.size()); // 3複製代碼

3)、PriorityBlockingQueue(有界,快滿時自動擴容,看似無界)

PriorityBlockingQueue是一個沒有邊界的隊列,它的排序規則和 java.util.PriorityQueue同樣。須要注意,PriorityBlockingQueue中容許插入null對象。

全部插入PriorityBlockingQueue的對象必須實現 java.lang.Comparable接口,隊列優先級的排序規則就是按照咱們對這個接口的實現來定義的。

另外,咱們能夠從PriorityBlockingQueue得到一個迭代器Iterator,但這個迭代器並不保證按照優先級順序進行迭代。

4)、SynchronousQueue

SynchronousQueue隊列內部僅容許容納一個元素。當一個線程插入一個元素後會被阻塞,除非這個元素被另外一個線程消費。

5)、使用BlockingQueue模擬生產者與消費者

class ProducerThread implements Runnable {
    private BlockingQueue<String> blockingQueue;
    private AtomicInteger count = new AtomicInteger();
    private volatile boolean FLAG = true;

    public ProducerThread(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "生產者開始啓動....");
        while (FLAG) {
            String data = count.incrementAndGet() + "";
            try {
                boolean offer = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
                if (offer) {
                    System.out.println(Thread.currentThread().getName() + ",生產隊列" + data + "成功..");
                } else {
                    System.out.println(Thread.currentThread().getName() + ",生產隊列" + data + "失敗..");
                }
                Thread.sleep(1000);
            } catch (Exception e) {

            }
        }
        System.out.println(Thread.currentThread().getName() + ",生產者線程中止...");
    }

    public void stop() {
        this.FLAG = false;
    }

}

class ConsumerThread implements Runnable {
    private volatile boolean FLAG = true;
    private BlockingQueue<String> blockingQueue;

    public ConsumerThread(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "消費者開始啓動....");
        while (FLAG) {
            try {
                String data = blockingQueue.poll(2, TimeUnit.SECONDS);
                if (data == null || data == "") {
                    FLAG = false;
                    System.out.println("消費者超過2秒時間未獲取到消息.");
                    return;
                }
                System.out.println("消費者獲取到隊列信息成功,data:" + data);

            } catch (Exception e) {
                // TODO: handle exception
            }
        }
    }

}

public class Test0008 {

    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(3);
        ProducerThread producerThread = new ProducerThread(blockingQueue);
        ConsumerThread consumerThread = new ConsumerThread(blockingQueue);
        Thread t1 = new Thread(producerThread);
        Thread t2 = new Thread(consumerThread);
        t1.start();
        t2.start();
        //10秒後 中止線程..
        try {
            Thread.sleep(10*1000);
            producerThread.stop();
        } catch (Exception e) {
            // TODO: handle exception
        }
    }

}
複製代碼

  1. ArrayDeque, (數組雙端隊列) 
  2. PriorityQueue, (優先級隊列) 
  3. ConcurrentLinkedQueue, (基於鏈表的併發隊列) 
  4. DelayQueue, (延期阻塞隊列)(阻塞隊列實現了BlockingQueue接口) 
  5. ArrayBlockingQueue, 經常使用(基於數組的併發阻塞隊列) 
  6. LinkedBlockingQueue, 經常使用(基於鏈表的FIFO阻塞隊列) 
  7. LinkedBlockingDeque, (基於鏈表的FIFO雙端阻塞隊列) 
  8. PriorityBlockingQueue,經常使用 (帶優先級的無界阻塞隊列,) 
  9. SynchronousQueue經常使用 (併發同步阻塞隊列)

本文由博客一文多發平臺 OpenWrite 發佈!

相關文章
相關標籤/搜索