Java同步併發容器隊列(非阻塞隊列與阻塞隊列)BlockingQueue家族

BlockingQueue家族(經常使用系列)

 BlockingQueue,顧名思義便是阻塞隊列,意指再讀取和插入操做狀況下可能(注意是可能)會出現阻塞。java

在併發編程中,有時候須要使用線程安全的隊列。若是要實現一個線程安全的隊列有兩種方式:一種是使用阻塞算法,另外一種是使用非阻塞算法。node

1.使用阻塞算法的隊列能夠用一個鎖(入隊和出隊用同一把鎖)或兩個鎖(入隊和出隊用不一樣的鎖)等方式來實現。算法

2.非阻塞的實現方式則可使用循環CAS的方式來實現。編程

1.1 BlockingQueue數組

Java中的阻塞隊列緩存

什麼是阻塞隊列安全

阻塞隊列(BlockingQueue)是一個支持兩個附加操做的隊列。這兩個附加的操做支持阻塞的插入和移除方法, 意指再讀取和插入操做狀況下可能(注意是可能)會出現阻塞。併發

BlockingQueue自己是一個接口,主要定義了關於隊列的各種操做方法,當發生沒法繼續入隊或者無數據能夠讀出的時候,會發生以下圖所示的狀況。其中jvm

Special Value爲true,false,null函數

Blocks的方法會發生阻塞,

Throws Exception列內的方法會拋出異常,

Times Out指超過設定時間則會按照Special Value類型的方法返回true或者false。

1)支持阻塞的插入方法:意思是當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿。
2)支持阻塞的移除方法:意思是在隊列爲空時,獲取元素的線程會等待隊列變爲非空。

1.2阻塞隊列和生產者-消費者模式

阻塞隊列經常使用於生產者和消費者的場景,生產者是向隊列裏添加元素的線程,消費者是從隊列裏取元素的線程。阻塞隊列就是生產者用來存放元素、消費者用來獲取元素的容器。

阻塞隊列(Blocking queue)提供了可阻塞的put和take方法,它們與可定時的offer和poll是等價的。若是Queue已經滿了,put方法會被阻塞直到有空間可用;若是Queue是空的,那麼take方法會被阻塞,直到有元素可用。Queue的長度能夠有限,也能夠無限;無限的Queue永遠不會充滿,因此它的put方法永遠不會阻塞。

阻塞隊列簡化了消費者的編碼,由於take會保持阻塞直到可用數據出現。若是生產者不能足夠快地產生工做,讓消費者忙碌起來,那麼消費者只能一直等待,直到有工做可作。同時,put方法的阻塞特性也大大地簡化了生產者的編碼;若是使用一個有界隊列,那麼當隊列充滿的時候,生產者就會阻塞,暫不能生成更多的工做,從而給消費者時間來趕進進度。

有界隊列是強大的資源管理工具,用來創建可靠的應用程序:它們遏制那些能夠產生過多工做量、具備威脅的活動,從而讓你的程序在面對超負荷工做時更加健壯。

雖然生產者-消費者模式能夠把生產者和消費者的代碼相互解耦合,可是它們的行爲仍是間接地經過共享隊列耦合在一塊兒了

BlockingQueue自己只是一個接口,具體的實現交由其實現類進行定義設計,下面介紹幾個實現類:

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • SynchronousQueue
  • PriorityBlockingQueue
  • DelayQueue

1.3ArrayBlockingQueue

ArrayBlockingQueue,相信你們看名字就能猜到,該阻塞隊列是基於數組實現的,必須制定大小且不可變,同時使用ReentrantLock來實現併發問題的解決。同時須要注意的是ArrayBlockingQueue只有一把鎖,put和take操做會相互阻塞。咱們看一下其構造函數便可清楚知道

1.4LinkedBlockingQueue

LinkedBlockingQueue和ArrayBlockingQueue十分類似,其底層是藉由鏈表實現。除此以外,還有一個不一樣點,LinkedBlockingQueue擁有兩個鎖,所以put和take的線程能夠同時運行

阻塞隊列LinkedBlockingQueue是一個單向鏈表實現的阻塞隊列,先進先出的順序而且是無界隊列,默認使用int的最大值。繼承AbstractQueue,實現了BlockingQueue,Serializable接口。插入和取出使用不一樣的鎖,putLock插入鎖,takeLock取出鎖,注意不接受null,添加和刪除數據的時候能夠並行。主要做爲固定大小線程池(Executors.newFixedThreadPool())底層所使用的阻塞隊列使用。

寫:

add方法:若是隊列已滿,則拋出異常;

put方法:向隊列尾部添加元素,隊列已滿的時候,阻塞等待。對應讀方法take

offer方法:向隊列尾部添加元素,隊列已滿的時候,直接返回false。

讀:

remove移除並返回隊列頭部的元素,若是隊列已空,則拋出NoSuchElementException異常.

element返回隊列頭部的元素,若是隊列爲空.則拋出NoSuchElementException異常.

poll移除並返回隊列頭部的元素,若是隊列爲空,則返回null;

peek返回隊列頭部的元素,若是隊列爲空,則返回null;

take移除並返回隊列頭部的元素,若是隊列爲空則阻塞;

讀寫操做時用到的鎖:

當執行take、poll等操做時線程須要獲取的鎖

ReentrantLock takeLock = new ReentrantLock();

當執行add、put、offer等操做時線程須要獲取鎖

ReentrantLock putLock = new ReentrantLock();

1.5SynchronousQueue

SynchronousQueue的特色是隻能容納一個元素,同時SynchronousQueue使用了兩種模式來管理元素,一種是使用先進先出的隊列,一種是使用後進先出的棧,使用哪一種模式能夠經過構造函數來指定。

1.6、PriorityBlockingQueue

PriorityBlockingQueue顧名思義,按照優先級排序且無邊界的隊列。插入其中的元素必須實現java.lang.Comparable接口。其排序規則就按此實現。

1.7DelayQueue

DelayQueue即爲延遲隊列,使得插入其中的元素在延遲必定時間後,才能獲取到,插入其中的元素須要實現java.util.concurrent.Delayed接口。該接口須要實現getDelay()和compareTo()方法。getDealy()返回0或者小於0的值時,delayedQueue經過其take()方法就能夠得到此元素。compareTo()方法用於實現內部元素的排序,通常狀況,按元素過時時間的優先級進行排序是比較好的選擇。下面咱們經過一個示例來演示一下DelayQueue的使用

JDK 7提供了7個阻塞隊列,以下。
·ArrayBlockingQueue:一個由數組結構組成的有界阻塞隊列。
·LinkedBlockingQueue:一個由鏈表結構組成的有界int最大值阻塞隊列。
·PriorityBlockingQueue:一個支持優先級排序的無界阻塞隊列。
·DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。
·SynchronousQueue:一個不存儲元素的阻塞隊列。
·LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
·LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。

2.1.ConcurrentLinkedQueue(非阻塞)

 

咱們一塊兒來研究一下如何使用非阻塞的方式來實現線程安全隊列ConcurrentLinkedQueue是一個基於連接節點的無界線程安全隊列,它採用先進先出的規則對節點進行排序,當咱們添加一個元素的時候,它會添加到隊列的尾部;當咱們獲取一個元素時,它會返回隊列頭部的元素。

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>

        implements Queue<E>, java.io.Serializable {

    private transient volatile Node<E> head;//頭指針

    private transient volatile Node<E> tail;//尾指針

    public ConcurrentLinkedQueue() {//初始化,head=tail=(一個空的頭結點)

        head = tail = new Node<E>(null);

    }

    private static class Node<E> {

        volatile E item;

        volatile Node<E> next;//內部是使用單向鏈表實現

        ......

    }

    ......

}

public boolean offer(E e) {

        checkNotNull(e);

final Node<E> newNode = new Node<E>(e);//入隊前,建立一個新節點

   for (Node<E> t = tail, p = t;;) {//除非插入成功並返回,不然反覆循環

            Node<E> q = p.next;

            if (q == null) {

                // p is last node

                if (p.casNext(null, newNode)) {//利用CAS操做,將p的next指針從舊值null更新爲newNode

                    if (p != t) // hop two nodes at a time

                        casTail(t, newNode);  // Failure is OK.利用CAS操做更新tail,若是失敗說明其餘線程添加了元素,由其餘線程負責更新tail

                    return true;

                }

                // Lost CAS race to another thread; re-read next 若是添加元素失敗,說明其餘線程添加了元素,p後移,並繼續嘗試

            }

            else if (p == q) //若是p被移除出鏈表,咱們須要調整指針從新指向head,不然咱們指向新的tail

                p = (t != (t = tail)) ? t : head;

            else

                //p指向tail或者q

                p = (p != t && t != (t = tail)) ? t : q;

        }

    }

casTail(cmp,value)方法用於更新tail節點。tail被設置爲volatile保證可見性。

p.casNext(cmp,value)方法用於將入隊節點設置爲當前隊列尾節點的next節點。value也被設置爲volatile。

對於出隊操做,也是使用CAS的方式循環嘗試將元素從頭部移除。

由於採用CAS操做,容許多個線程併發執行,而且不會由於加鎖而阻塞線程,使得併發性能更好。

關鍵解釋:

原子性與可見性分析之synchronized;volatile

原子性

原子是世界上的最小單位,具備不可分割性。好比 a=0;(a非long和double類型) 這個操做是不可分割的,那麼咱們說這個操做時原子操做。再好比:a++; 這個操做實際是a = a + 1;是可分割的,因此他不是一個原子操做。非原子操做都會存在線程安全問題,須要咱們使用同步技術(sychronized)來讓它變成一個原子操做。

可見性

可見性,是指線程之間的可見性,一個線程修改的狀態對另外一個線程是可見的。也就是一個線程修改的結果。另外一個線程立刻就能看到。好比:用volatile修飾的變量,就會具備可見性。volatile修飾的變量不容許線程內部緩存和重排序,即直接修改內存。因此對其餘線程是可見的。

volatile 本質是在告訴jvm當前變量在寄存器中的值是不肯定的,須要從主存中讀取,

synchronized 則是鎖定當前變量,只有當前線程能夠訪問該變量,其餘線程被阻塞住.

相關文章
相關標籤/搜索