BlockingQueue的幾個實現分析

ArrayBlockingQueue

 底層以數組的結構存放隊列元素,容量大小不可改變。node

先看下變量:數組

items:數組,用於存放隊列中的元素併發

takeIndex:獲取元素的索引位置less

putIndex:存放元素的索引位置this

count:隊列中當前元素數量spa

lock:控制隊列進出的鎖,ArrayBlockingQueue中進出共用一把鎖,LinkedBlockingQueue中進出是兩把分開的鎖線程

notEmpty:從隊列獲取元素時的等待條件,也就是隊列空時,獲取元素的線程會阻塞,直到有線程放元素進來,notEmpty.signal喚醒code

notFull:往隊列裏放元素時的等待條件,也就是隊列滿時,存放元素的線程會阻塞,直到有線程從隊列裏取元素並調用notFull.signal喚醒對象

下面看下代碼更直觀些:blog

put

1.先得到鎖

2.循環判斷當前元素數量是否等於數組長度,也就是隊列的容量,等於的話就掛起當前線程,直到notFull.signal

3.插入元素到隊列中,代碼以下:

putIndex爲下一次入隊的索引位置,先把元素入隊,而後+1後判斷是否等於數組長度,若是等於就置位0,從0開始

最後count+1並notEmpty.signal喚醒由於隊列爲空致使獲取元素阻塞的線程。

take

1.獲取鎖,注意,是和put同一把鎖,也就是同時只能有一個線程得到操做隊列的權限,不管是入隊仍是出隊。

2.循環判斷若是此時隊空,就掛起當前線程,等待notEmpty.signal

3.出隊:

獲取元素並將數組中相應位置置空,設置下次入隊索引並count-1,notFull.signal喚醒由於隊滿而阻塞的線程並返回出隊元素

LinkedBlockingQueue

 底層以單向鏈表結構存放隊列元素,put和take分別採用兩把不一樣的鎖,也就是讀寫是併發運行的,

那麼怎樣控制併發環境下的出隊和入隊呢?答案就是使用AtomicInteger原子操做當前隊列中元素數量。

下面看下有哪些實例變量:

capacity:隊列容量,能夠在構造方法中設置容量,若是不設置的話,默認爲Integer.MAX_VALUE

count:當前隊列中的元素數量,AtomicInteger類型,由於這裏的put和take是兩把鎖,也就是會併發修改count,因此這裏採用原子操做。

head:隊頭

last:隊尾

takeLock:出隊鎖

notEmpty:出隊時,若是隊空,就等待notEmpty.signal

putLock:入隊鎖

notFull:入隊時,若是隊滿,就等待notFull.signal

put

public void put(E e) throws InterruptedException {
     // 由於在出隊時若是隊空,有些方法是非阻塞或者等待一段時間後返回null的,好比poll
     // 因此禁止往隊列中入隊null
if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1;
     // 將入隊元素包裝進鏈表的Node對象中 Node
<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count;
     // 獲取入隊鎖 putLock.lockInterruptibly();
try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */
       // 由於count加操做都在putLock中,因此這裏能夠是==判斷
while (count.get() == capacity) { notFull.await(); }
       // 入隊 enqueue(node);
       // 獲取原來count值並原子加一 c
= count.getAndIncrement();
       // 若是隊列未滿,通知阻塞線程繼續入隊
if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); }
     // 若是隊列以前是隊空狀態,說明以前有可能存在take元素的線程由於隊空而阻塞,這裏通知takeLock喚醒等待線程
if (c == 0) signalNotEmpty(); }

很簡單,就是將剛剛的node連接到隊尾

take

1.先得到takeLock

2.若是隊空,掛起等待

3.出隊

4.獲取count值並減一

5.若是隊列中還有元素,notEmpty喚醒以前由於隊空阻塞的線程

6.若是take以前狀態爲隊滿,說明有可能存在由於隊滿而致使入隊阻塞的線程,

喚醒他們(其實這裏signal只喚醒一個,在被喚醒的線程完成入隊後判斷若是還有空間就繼續signal,是這樣一連串的動做)

出隊邏輯如上,保持head的item爲null,這裏須要說明的是h.next = h自身循環引用也是會被GC的

SynchronousQueue

 SynchronousQueue自己不存儲元素,put元素時必須同步等待元素被取走。

PriorityBlockingQueue

 具備優先級的隊列

DelayQueue

 具備延時的隊列,延時不結束就不能取數據

相關文章
相關標籤/搜索