Java併發編程原理與實戰三十五:併發容器ConcurrentLinkedQueue原理與使用

1、簡介

一個基於連接節點的無界線程安全隊列。此隊列按照 FIFO(先進先出)原則對元素進行排序。隊列的頭部 是隊列中時間最長的元素。隊列的尾部 是隊列中時間最短的元素。
新的元素插入到隊列的尾部,隊列獲取操做從隊列頭部得到元素。當多個線程共享訪問一個公共 collection 時,ConcurrentLinkedQueue 是一個恰當的選擇。此隊列不容許使用 null 元素。

offer和poll

offer(E e) 
          將指定元素插入此隊列的尾部。html

poll() 
          獲取並移除此隊列的頭,若是此隊列爲空,則返回 nulljava

複製代碼
public static void main(String[] args) {
        ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
        queue.offer("哈哈哈");
        System.out.println("offer後,隊列是否空?" + queue.isEmpty());
        System.out.println("從隊列中poll:" + queue.poll());
        System.out.println("pool後,隊列是否空?" + queue.isEmpty());
    }
複製代碼

offer是往隊列添加元素,poll是從隊列取出元素而且刪除該元素node

執行結果面試

offer後,隊列是否空?false
從隊列中poll:哈哈哈
pool後,隊列是否空?true

 

ConcurrentLinkedQueue中的add() 和 offer() 徹底同樣,都是往隊列尾部添加元素編程

還有個取元素方法peek

peek() 
          獲取但不移除此隊列的頭;若是此隊列爲空,則返回 nullapi

複製代碼
public static void main(String[] args) {
        ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
        queue.offer("哈哈哈");
        System.out.println("offer後,隊列是否空?" + queue.isEmpty());
        System.out.println("從隊列中peek:" + queue.peek());
        System.out.println("從隊列中peek:" + queue.peek());
        System.out.println("從隊列中peek:" + queue.peek());
        System.out.println("pool後,隊列是否空?" + queue.isEmpty());
    }
複製代碼

執行結果:數組

offer後,隊列是否空?false
從隊列中peek:哈哈哈
從隊列中peek:哈哈哈
從隊列中peek:哈哈哈
pool後,隊列是否空?false

remove

remove(Object o) 
          從隊列中移除指定元素的單個實例(若是存在)安全

複製代碼
public static void main(String[] args) {
        ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
        queue.offer("哈哈哈");
        System.out.println("offer後,隊列是否空?" + queue.isEmpty());
        System.out.println("從隊列中remove已存在元素 :" + queue.remove("哈哈哈"));
        System.out.println("從隊列中remove不存在元素:" + queue.remove("123"));
        System.out.println("remove後,隊列是否空?" + queue.isEmpty());
    }
複製代碼

remove一個已存在元素,會返回true,remove不存在元素,返回false多線程

執行結果:併發

offer後,隊列是否空?false
從隊列中remove已存在元素 :true
從隊列中remove不存在元素:false
remove後,隊列是否空?true

size or isEmpty

size() 
          返回此隊列中的元素數量

注意:

若是此隊列包含的元素數大於 Integer.MAX_VALUE,則返回 Integer.MAX_VALUE。
須要當心的是,與大多數 collection 不一樣,此方法不是 一個固定時間操做。因爲這些隊列的異步特性,肯定當前的元素數須要進行一次花費 O(n) 時間的遍歷。
因此在須要判斷隊列是否爲空時,儘可能不要用 queue.size()>0,而是用 !queue.isEmpty()

比較size()和isEmpty() 效率的示例:

場景:10000我的去飯店吃飯,10張桌子供飯,分別比較size() 和 isEmpty() 的耗時

複製代碼
public class Test01ConcurrentLinkedQueue {
    public static void main(String[] args) throws InterruptedException {
        int peopleNum = 10000;//吃飯人數
        int tableNum = 10;//飯桌數量

        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        CountDownLatch count = new CountDownLatch(tableNum);//計數器

        //將吃飯人數放入隊列(吃飯的人進行排隊)
        for(int i=1;i<=peopleNum;i++){
            queue.offer("消費者_" + i);
        }
        //執行10個線程從隊列取出元素(10個桌子開始供飯)
        System.out.println("-----------------------------------開飯了-----------------------------------");
        long start = System.currentTimeMillis();
        ExecutorService executorService = Executors.newFixedThreadPool(tableNum);
        for(int i=0;i<tableNum;i++) {
            executorService.submit(new Dinner("00" + (i+1), queue, count));
        }
        //計數器等待,知道隊列爲空(全部人吃完)
        count.await();
        long time = System.currentTimeMillis() - start;
        System.out.println("-----------------------------------全部人已經吃完-----------------------------------");
        System.out.println("共耗時:" + time);
        //中止線程池
        executorService.shutdown();
    }

    private static class Dinner implements Runnable{
        private String name;
        private ConcurrentLinkedQueue<String> queue;
        private CountDownLatch count;

        public Dinner(String name, ConcurrentLinkedQueue<String> queue, CountDownLatch count) {
            this.name = name;
            this.queue = queue;
            this.count = count;
        }

        @Override
        public void run() {
            //while (queue.size() > 0){
            while (!queue.isEmpty()){
                //從隊列取出一個元素 排隊的人少一個
                System.out.println("【" +queue.poll() + "】----已吃完..., 飯桌編號:" + name);
            }
            count.countDown();//計數器-1
        }
    }
}
複製代碼

執行結果:

使用size耗時:757ms

使用isEmpty耗時:210

當數據量越大,這種耗時差距越明顯。因此這種判斷用isEmpty 更加合理

contains

contains(Object o) 
          若是此隊列包含指定元素,則返回 true

public static void main(String[] args) throws InterruptedException {
        ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
        queue.offer("123");
        System.out.println(queue.contains("123"));
        System.out.println(queue.contains("234"));
    }

執行結果:

toArray

toArray() 
          返回以恰當順序包含此隊列全部元素的數組

toArray(T[] a) 
          返回以恰當順序包含此隊列全部元素的數組;返回數組的運行時類型是指定數組的運行時類型

複製代碼
public static void main(String[] args) throws InterruptedException {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
        queue.offer("123");
        queue.offer("234");
        Object[] objects = queue.toArray();
        System.out.println(objects[0] + ", " + objects[1]);

        //將數據存儲到指定數組
        String[] strs = new String[2];
        queue.toArray(strs);
        System.out.println(strs[0] + ", " + strs[1]);
    }
複製代碼

執行結果:

iterator

iterator() 
          返回在此隊列元素上以恰當順序進行迭代的迭代器

複製代碼
public static void main(String[] args) throws InterruptedException {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
        queue.offer("123");
        queue.offer("234");
        Iterator<String> iterator = queue.iterator();
        while (iterator.hasNext()){
            System.out.println(iterator.next());
        }
    }
複製代碼

ConcurrentLinkedQueue文檔說明:

構造方法摘要
ConcurrentLinkedQueue() 
          建立一個最初爲空的 ConcurrentLinkedQueue
ConcurrentLinkedQueue(Collection<? extends E> c) 
          建立一個最初包含給定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 迭代器的遍歷順序來添加元素。

 

方法摘要
 boolean add(E e) 
          將指定元素插入此隊列的尾部。
 boolean contains(Object o) 
          若是此隊列包含指定元素,則返回 true
 boolean isEmpty() 
          若是此隊列不包含任何元素,則返回 true
 Iterator<E> iterator() 
          返回在此隊列元素上以恰當順序進行迭代的迭代器。
 boolean offer(E e) 
          將指定元素插入此隊列的尾部。
 E peek() 
          獲取但不移除此隊列的頭;若是此隊列爲空,則返回 null
 E poll() 
          獲取並移除此隊列的頭,若是此隊列爲空,則返回 null
 boolean remove(Object o) 
          從隊列中移除指定元素的單個實例(若是存在)。
 int size() 
          返回此隊列中的元素數量。
 Object[] toArray() 
          返回以恰當順序包含此隊列全部元素的數組。
<T> T[]
toArray(T[] a) 
          返回以恰當順序包含此隊列全部元素的數組;返回數組的運行時類型是指定數組的運行時類型。

 

2、源代碼解析

offer操做是在鏈表末尾添加一個元素,下面看看實現原理。

public boolean offer(E e) {
    //e爲null則拋出空指針異常
    checkNotNull(e);
 
   //構造Node節點構造函數內部調用unsafe.putObject,後面統一講
    final Node<E> newNode = new Node<E>(e);
 
 
    //從尾節點插入
    for (Node<E> t = tail, p = t;;) {
 
        Node<E> q = p.next;
 
        //若是q=null說明p是尾節點則插入
        if (q == null) {
 
            //cas插入(1)
            if (p.casNext(null, newNode)) {
                //cas成功說明新增節點已經被放入鏈表,而後設置當前尾節點(包含head,1,3,5.。。個節點爲尾節點)
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)//(2)
            //多線程操做時候,因爲poll時候會把老的head變爲自引用,而後head的next變爲新head,因此這裏須要
            //從新找新的head,由於新的head後面的節點纔是激活的節點
            p = (t != (t = tail)) ? t : head;
        else
            // 尋找尾節點(3)
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

從構造函數知道一開始有個item爲null的哨兵節點,而且head和tail都是指向這個節點,而後當一個線程調用offer時候首先

如圖首先查找尾節點,q==null,p就是尾節點,因此執行p.casNext經過cas設置p的next爲新增節點,這時候p==t因此不從新設置尾節點爲當前新節點。因爲多線程能夠調用offer方法,因此可能兩個線程同時執行到了(1)進行cas,那麼只有一個會成功(假如線程1成功了),成功後的鏈表爲:

失敗的線程會循環一次這時候指針爲:

這時候會執行(3)因此p=q,而後在循環後指針位置爲:

因此沒有其餘線程干擾的狀況下會執行(1)執行cas把新增節點插入到尾部,沒有干擾的狀況下線程2 cas會成功,而後去更新尾節點tail,因爲p!=t因此更新。這時候鏈表和指針爲:

假如線程2cas時候線程3也在執行,那麼線程3會失敗,循環一次後,線程3的節點狀態爲:

這時候p!=t ;而且t的原始值爲told,t的新值爲tnew ,因此told!=tnew,因此 p=tnew=tail;

而後在循環一下後節點狀態:

q==null因此執行(1)。

如今就差p==q這個分支尚未走,這個要在執行poll操做後纔會出現這個狀況。poll後會存在下面的狀態

這個時候添加元素時候指針分佈爲:

因此會執行(2)分支 結果 p=head
而後循環,循環後指針分佈:

因此執行(1),而後p!=t因此設置tail節點。如今分佈圖:

自引用的節點會被垃圾回收掉。

本節引自:http://www.importnew.com/25668.html ,能夠參考此文。

3、concurrentLinkedQueue的特性

一、應用場景
按照適用的併發強度從低到高排列以下:
LinkedList/ArrayList   非線程安全,不能用於併發場景(List的方法支持棧和隊列的操做,所以能夠用List封裝成stack和queue)
Collections.synchronizedList   使用wrapper class封裝,每一個方法都用synchronized(mutex:Object)作了同步
LinkedBlockingQueue   採用了鎖分離的設計,避免了讀/寫操做衝突,且自動負載均衡,能夠有界。BlockingQueue在生產-消費模式下首選【Iterator安全,不保證數據一致性】
ConcurrentLinkedQueue  適用於高併發讀寫操做,理論上有最高的吞吐量,無界,不保證數據訪問實時一致性,Iterator不拋出併發修改異常,採用CAS機制實現無鎖訪問。
綜上:
在併發的場景下,若是併發強度較小,性能要求不苛刻,且鎖可控的場景下,可以使用Collections.synchronizedList,既保證了數據一致又保證了線程安全,性可以用;
在大部分高併發場景下,建議使用 LinkedBlockingQueue ,性能與 ConcurrentLinkedQueue 接近,且能保證數據一致性;
ConcurrentLinkedQueue 適用於超高併發的場景,可是須要針對數據不一致採起一些措施。

二、特色
2.1 訪問操做採用了無鎖設計
2.2 Iterator的弱一致性,即不保證Iteartor訪問數據的實時一致性(與current組的成員與COW成員相似)
2.3 併發poll
2.4 併發add
2.5 poll/add併發

三、注意事項
3.1 size操做不是一個固定時長的操做(not a constant-time operation)
由於size須要遍歷整個queue,若是此時queue正在被修改,size可能返回不許確的數值(仍然是沒法保證數據一致性),就像concurrentHashMap同樣,
要獲取size,須要取得全部的bucket的鎖,這是一個很是耗時的操做。所以若是須要保證數據一致性,頻繁獲取集合對象的size,最好不使用concurrent
族的成員。

3.2 批量操做(bulk operations like addAll,removeAll,equals)沒法保證原子性,由於不保證明時性,且沒有使用獨佔鎖的設計。
例如,在執行addAll的同時,有另一個線程經過Iterator在遍歷,則遍歷的線程可能只看到一部分新增的數據。

3.3 ConcurrentLinkedQueue 沒有實現BlockingQueue接口
當隊列爲空時,take方法返回null,此時consumer會須要處理這個狀況,consumer會循環調用take來保證及時獲取數據,此爲busy waiting,會持續消耗CPU資源。

四、與 LinkedBlockingQueue 的對比
LinkedBlockingQueue 採用了鎖分離的設計,put、get鎖分離,保證兩種操做的併發,但同一種操做,而後是鎖控制的。而且當隊列爲空/滿時,某種操做
會被掛起。

4.1 併發性能
4.1.1 高併發put操做
可支持高併發場景下,多線程無鎖put操做
4.1.2 高併發的put/poll操做
多線程場景,同時put,遍歷,以及poll,都可無鎖操做。但不保證遍歷的實時一致性。

4.2 數據的實時一致性
二者的Iterator都不不保證數據一致性,Iterator遍歷的是Iterator建立時已存在的節點,建立後的修改不保證能反應出來。
參考 LinkedBlockingQueue 的java doc關於Iterator的解釋:
The returned iterator is a "weakly consistent" iterator that will never throw ConcurrentModificationException, and guarantees to traverse elements as they existed upon construction of the iterator, and may (but is not guaranteed to) reflect any modifications subsequent to construction.

4.3 遍歷操做(Iterator的遍歷操做的差別)
目前看來,沒有差別

4.4 size操做
LinkedBlockingQueue 的size是在內部用一個AtomicInteger保存,執行size操做直接獲取此原子量的當前值,時間複雜度O(1)。
ConcurrentLinkedQueue 的size操做須要遍歷(traverse the queue),所以比較耗時,時間複雜度至少爲O(n),建議使用isEmpty()。
The java doc says the size() method is typically not very useful in concurrent applications.

5.LinkedBlockingQueue和ConcurrentLinkedQueue適用場景

適用阻塞隊列的好處:多線程操做共同的隊列時不須要額外的同步,另外就是隊列會自動平衡負載,即那邊(生產與消費兩邊)處理快了就會被阻塞掉,從而減小兩邊的處理速度差距。
當許多線程共享訪問一個公共 collection 時,ConcurrentLinkedQueue 是一個恰當的選擇。
LinkedBlockingQueue 多用於任務隊列
ConcurrentLinkedQueue  多用於消息隊列
多個生產者,對於LBQ性能還算能夠接受;可是多個消費者就不行了mainLoop須要一個timeout的機制,不然空轉,cpu會飆升的。LBQ正好提供了timeout的接口,更方便使用
若是CLQ,那麼我須要收處處理sleep
單生產者,單消費者  用 LinkedBlockingqueue
多生產者,單消費者   用 LinkedBlockingqueue
單生產者 ,多消費者   用 ConcurrentLinkedQueue
多生產者 ,多消費者   用 ConcurrentLinkedQueue
java併發面試常識之ConcurrentLinkedQueue :https://www.imooc.com/article/details/id/26439 

阻塞隊列:線程安全

按 FIFO(先進先出)排序元素。隊列的頭部 是在隊列中時間最長的元素。隊列的尾部 是在隊列中時間最短的元素。新元素插入到隊列的尾部,而且隊列檢索操做會得到位於隊列頭部的元素。連接隊列的吞吐量一般要高於基於數組的隊列,可是在大多數併發應用程序中,其可預知的性能要低。

注意:

一、必需要使用take()方法在獲取的時候達成阻塞結果
二、使用poll()方法將產生非阻塞效果

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

 

public class BlockingDeque {
    //阻塞隊列,FIFO
    private static LinkedBlockingQueue<Integer> concurrentLinkedQueue = new LinkedBlockingQueue<Integer>(); 

          
 public static void main(String[] args) {  
     ExecutorService executorService = Executors.newFixedThreadPool(2);  

     executorService.submit(new Producer("producer1"));  
     executorService.submit(new Producer("producer2"));  
     executorService.submit(new Producer("producer3"));  
     executorService.submit(new Consumer("consumer1"));  
     executorService.submit(new Consumer("consumer2"));  
     executorService.submit(new Consumer("consumer3"));  

 }  

 static class Producer implements Runnable {  
     private String name;  

     public Producer(String name) {  
         this.name = name;  
     }  

     public void run() {  
         for (int i = 1; i < 10; ++i) {  
             System.out.println(name+ "  生產: " + i);  
             //concurrentLinkedQueue.add(i);  
             try {
                concurrentLinkedQueue.put(i);
                Thread.sleep(200); //模擬慢速的生產,產生阻塞的效果
            } catch (InterruptedException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
             
         }  
     }  
 }  

 static class Consumer implements Runnable {  
     private String name;  

     public Consumer(String name) {  
         this.name = name;  
     }  
     public void run() {  
         for (int i = 1; i < 10; ++i) {  
             try {          
                    //必需要使用take()方法在獲取的時候阻塞
                      System.out.println(name+"消費: " +  concurrentLinkedQueue.take());  
                      //使用poll()方法 將產生非阻塞效果
                      //System.out.println(name+"消費: " +  concurrentLinkedQueue.poll());  
                     
                     //還有一個超時的用法,隊列空時,指定阻塞時間後返回,不會一直阻塞
                     //但有一個疑問,既然能夠不阻塞,爲啥還叫阻塞隊列?
                    //System.out.println(name+" Consumer " +  concurrentLinkedQueue.poll(300, TimeUnit.MILLISECONDS));                    
                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }  

         }  
     }  
 }  
}

非阻塞隊列

基於連接節點的、無界的、線程安全。此隊列按照 FIFO(先進先出)原則對元素進行排序。隊列的頭部 是隊列中時間最長的元素。隊列的尾部 是隊列中時間最短的元素。新的元素插入到隊列的尾部,隊列檢索操做從隊列頭部得到元素。當許多線程共享訪問一個公共 collection 時,ConcurrentLinkedQueue 是一個恰當的選擇。此隊列不容許 null 元素。

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;


public class NoBlockQueue {  
       private static ConcurrentLinkedQueue<Integer> concurrentLinkedQueue = new ConcurrentLinkedQueue<Integer>();   
          
    public static void main(String[] args) {  
        ExecutorService executorService = Executors.newFixedThreadPool(2);  

        executorService.submit(new Producer("producer1"));  
        executorService.submit(new Producer("producer2"));  
        executorService.submit(new Producer("producer3"));  
        executorService.submit(new Consumer("consumer1"));  
        executorService.submit(new Consumer("consumer2"));  
        executorService.submit(new Consumer("consumer3"));  

    }  
  
    static class Producer implements Runnable {  
        private String name;  
  
        public Producer(String name) {  
            this.name = name;  
        }  
  
        public void run() {  
            for (int i = 1; i < 10; ++i) {  
                System.out.println(name+ " start producer " + i);  
                concurrentLinkedQueue.add(i);  
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                //System.out.println(name+"end producer " + i);  
            }  
        }  
    }  
  
    static class Consumer implements Runnable {  
        private String name;  
  
        public Consumer(String name) {  
            this.name = name;  
        }  
        public void run() {  
            for (int i = 1; i < 10; ++i) {  
                try {
 
                    System.out.println(name+" Consumer " +  concurrentLinkedQueue.poll());

                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }  
//                System.out.println();  
//                System.out.println(name+" end Consumer " + i);  
            }  
        }  
    }  
} 

在併發編程中,通常推薦使用阻塞隊列,這樣實現能夠儘可能地避免程序出現意外的錯誤。阻塞隊列使用最經典的場景就是socket客戶端數據的讀取和解析,讀取數據的線程不斷將數據放入隊列,而後解析線程不斷從隊列取數據解析。還有其餘相似的場景,只要符合生產者-消費者模型的均可以使用阻塞隊列。

使用非阻塞隊列,雖然能即時返回結果(消費結果),但必須自行編碼解決返回爲空的狀況處理(以及消費重試等問題)。

另外他們都是線程安全的,不用考慮線程同步問題。

相關文章
相關標籤/搜索