底層以數組的結構存放隊列元素,容量大小不可改變。node
先看下變量:數組
items:數組,用於存放隊列中的元素併發
takeIndex:獲取元素的索引位置less
putIndex:存放元素的索引位置this
count:隊列中當前元素數量spa
lock:控制隊列進出的鎖,ArrayBlockingQueue中進出共用一把鎖,LinkedBlockingQueue中進出是兩把分開的鎖線程
notEmpty:從隊列獲取元素時的等待條件,也就是隊列空時,獲取元素的線程會阻塞,直到有線程放元素進來,notEmpty.signal喚醒code
notFull:往隊列裏放元素時的等待條件,也就是隊列滿時,存放元素的線程會阻塞,直到有線程從隊列裏取元素並調用notFull.signal喚醒對象
下面看下代碼更直觀些:blog
1.先得到鎖
2.循環判斷當前元素數量是否等於數組長度,也就是隊列的容量,等於的話就掛起當前線程,直到notFull.signal
3.插入元素到隊列中,代碼以下:
putIndex爲下一次入隊的索引位置,先把元素入隊,而後+1後判斷是否等於數組長度,若是等於就置位0,從0開始
最後count+1並notEmpty.signal喚醒由於隊列爲空致使獲取元素阻塞的線程。
1.獲取鎖,注意,是和put同一把鎖,也就是同時只能有一個線程得到操做隊列的權限,不管是入隊仍是出隊。
2.循環判斷若是此時隊空,就掛起當前線程,等待notEmpty.signal
3.出隊:
獲取元素並將數組中相應位置置空,設置下次入隊索引並count-1,notFull.signal喚醒由於隊滿而阻塞的線程並返回出隊元素
底層以單向鏈表結構存放隊列元素,put和take分別採用兩把不一樣的鎖,也就是讀寫是併發運行的,
那麼怎樣控制併發環境下的出隊和入隊呢?答案就是使用AtomicInteger原子操做當前隊列中元素數量。
下面看下有哪些實例變量:
capacity:隊列容量,能夠在構造方法中設置容量,若是不設置的話,默認爲Integer.MAX_VALUE
count:當前隊列中的元素數量,AtomicInteger類型,由於這裏的put和take是兩把鎖,也就是會併發修改count,因此這裏採用原子操做。
head:隊頭
last:隊尾
takeLock:出隊鎖
notEmpty:出隊時,若是隊空,就等待notEmpty.signal
putLock:入隊鎖
notFull:入隊時,若是隊滿,就等待notFull.signal
public void put(E e) throws InterruptedException {
// 由於在出隊時若是隊空,有些方法是非阻塞或者等待一段時間後返回null的,好比poll
// 因此禁止往隊列中入隊null
if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1;
// 將入隊元素包裝進鏈表的Node對象中 Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count;
// 獲取入隊鎖 putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */
// 由於count加操做都在putLock中,因此這裏能夠是==判斷
while (count.get() == capacity) { notFull.await(); }
// 入隊 enqueue(node);
// 獲取原來count值並原子加一 c = count.getAndIncrement();
// 若是隊列未滿,通知阻塞線程繼續入隊 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); }
// 若是隊列以前是隊空狀態,說明以前有可能存在take元素的線程由於隊空而阻塞,這裏通知takeLock喚醒等待線程 if (c == 0) signalNotEmpty(); }
很簡單,就是將剛剛的node連接到隊尾
1.先得到takeLock
2.若是隊空,掛起等待
3.出隊
4.獲取count值並減一
5.若是隊列中還有元素,notEmpty喚醒以前由於隊空阻塞的線程
6.若是take以前狀態爲隊滿,說明有可能存在由於隊滿而致使入隊阻塞的線程,
喚醒他們(其實這裏signal只喚醒一個,在被喚醒的線程完成入隊後判斷若是還有空間就繼續signal,是這樣一連串的動做)
出隊邏輯如上,保持head的item爲null,這裏須要說明的是h.next = h自身循環引用也是會被GC的
SynchronousQueue自己不存儲元素,put元素時必須同步等待元素被取走。
具備優先級的隊列
具備延時的隊列,延時不結束就不能取數據