整理了阻塞隊列LinkedBlockingQueue的學習筆記,但願對你們有幫助。有哪裏不正確,歡迎指出,感謝。html
咱們先來看看LinkedBlockingQueue的繼承體系。使用IntelliJ IDEA查看類的繼承關係圖形 node
- 藍色實線箭頭是指類繼承關係
- 綠色箭頭實線箭頭是指接口繼承關係
- 綠色虛線箭頭是指接口實現關係。
LinkedBlockingQueue實現了序列化接口 Serializable,所以它有序列化的特性。 LinkedBlockingQueue實現了BlockingQueue接口,BlockingQueue繼承了Queue接口,所以它擁有了隊列Queue相關方法的操做。編程
類圖來自Java併發編程之美 安全
LinkedBlockingQueue主要特性:bash
//容量範圍,默認值爲 Integer.MAX_VALUE
private final int capacity;
//當前隊列元素個數
private final AtomicInteger count = new AtomicInteger();
//頭結點
transient Node<E> head;
//尾節點
private transient Node<E> last;
//take, poll等方法的可重入鎖
private final ReentrantLock takeLock = new ReentrantLock();
//當隊列爲空時,執行出隊操做(好比take )的線程會被放入這個條件隊列進行等待
private final Condition notEmpty = takeLock.newCondition();
//put, offer等方法的可重入鎖
private final ReentrantLock putLock = new ReentrantLock();
//當隊列滿時, 執行進隊操做( 好比put)的線程會被放入這個條件隊列進行等待
private final Condition notFull = putLock.newCondition();
複製代碼
LinkedBlockingQueue有三個構造函數:數據結構
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
複製代碼
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
//設置隊列大小
this.capacity = capacity;
//new一個null節點,head、tail節點指向該節點
last = head = new Node<E>(null);
}
複製代碼
public LinkedBlockingQueue(Collection<? extends E> c) {
//調用指定容量的構造器
this(Integer.MAX_VALUE);
//獲取put, offer的可重入鎖
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
int n = 0;
//循環向隊列中添加集合中的元素
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
//將隊列的last節點指向該節點
enqueue(new Node<E>(e));
++n;
}
//更新容量值
count.set(n);
} finally {
//釋放鎖
putLock.unlock();
}
}
複製代碼
static class Node<E> {
// 當前節點的元素值
E item;
// 下一個節點的索引
Node<E> next;
//節點構造器
Node(E x) {
item = x;
}
}
複製代碼
LinkedBlockingQueue的節點符合單向鏈表的數據結構要求:併發
item表示當前節點的元素值,next表示指向下一節點的指針函數
入隊方法,其實就是向隊列的尾部插入一個元素。若是元素爲空,拋出空指針異常。若是隊列已滿,則丟棄當前元素,返回false,它是非阻塞的。若是隊列空閒則插入成功返回true。學習
offer方法源碼以下:ui
public boolean offer(E e) {
//爲空直接拋空指針
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
//若是當前隊列滿了的話,直接返回false
if (count.get() == capacity)
return false;
int c = -1;
//構造新節點
Node<E> node = new Node<E>(e);
獲取put獨佔鎖
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//判斷隊列是否已滿
if (count.get() < capacity) {
//進隊列
enqueue(node);
//遞增元素計數
c = count.getAndIncrement();
//若是元素入隊,還有空閒,則喚醒notFull條件隊列裏被阻塞的線程
if (c + 1 < capacity)
notFull.signal();
}
} finally {
//釋放鎖
putLock.unlock();
}
//若是容量爲0,則
if (c == 0)
//激活 notEmpty 的條件隊列,喚醒被阻塞的線程
signalNotEmpty();
return c >= 0;
}
複製代碼
enqueue方法源碼以下:
private void enqueue(Node<E> node) {
//從尾節點加進去
last = last.next = node;
}
複製代碼
爲了形象生動,咱們用一張圖來看看往隊列裏依次放入元素A和元素B。圖片參考來源【細談Java併發】談談LinkedBlockingQueue
signalNotEmpty方法源碼以下
private void signalNotEmpty() {
//獲取take獨佔鎖
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//喚醒notEmpty條件隊列裏被阻塞的線程
notEmpty.signal();
} finally {
//釋放鎖
takeLock.unlock();
}
}
複製代碼
基本流程:
put方法也是向隊列尾部插入一個元素。若是元素爲null,拋出空指針異常。若是隊列己滿則阻塞當前線程,直到隊列有空閒插入成功爲止。若是隊列空閒則插入成功,直接返回。若是在阻塞時被其餘線程設置了中斷標誌, 則被阻塞線程會拋出 InterruptedException 異常而返回。
public void put(E e) throws InterruptedException {
////爲空直接拋空指針異常
if (e == null) throw new NullPointerException();
int c = -1;
// 構造新節點
Node<E> node = new Node<E>(e);
//獲取putLock獨佔鎖
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//獲取獨佔鎖,它跟lock的區別,是能夠被中斷
putLock.lockInterruptibly();
try {
//隊列已滿線程掛起等待
while (count.get() == capacity) {
notFull.await();
}
//進隊列
enqueue(node);
//遞增元素計數
c = count.getAndIncrement();
//若是元素入隊,還有空閒,則喚醒notFull條件隊列裏被阻塞的線程
if (c + 1 < capacity)
notFull.signal();
} finally {
//釋放鎖
putLock.unlock();
}
//若是容量爲0,則
if (c == 0)
//激活 notEmpty 的條件隊列,喚醒被阻塞的線程
signalNotEmpty();
}
複製代碼
基本流程:
從隊列頭部獲取並移除一個元素, 若是隊列爲空則返回 null, 該方法是不阻塞的。
poll方法源代碼
public E poll() {
final AtomicInteger count = this.count;
//若是隊列爲空,返回null
if (count.get() == 0)
return null;
E x = null;
int c = -1;
//獲取takeLock獨佔鎖
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//若是隊列不爲空,則出隊,並遞減計數
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
////容量大於1,則激活 notEmpty 的條件隊列,喚醒被阻塞的線程
if (c > 1)
notEmpty.signal();
}
} finally {
//釋放鎖
takeLock.unlock();
}
if (c == capacity)
//喚醒notFull條件隊列裏被阻塞的線程
signalNotFull();
return x;
}
複製代碼
dequeue方法源代碼
//出隊列
private E dequeue() {
//獲取head節點
Node<E> h = head;
//獲取到head節點指向的下一個節點
Node<E> first = h.next;
//head節點原來指向的節點的next指向本身,等待下次gc回收
h.next = h; // help GC
// head節點指向新的節點
head = first;
// 獲取到新的head節點的item值
E x = first.item;
// 新head節點的item值設置爲null
first.item = null;
return x;
}
複製代碼
爲了形象生動,咱們用一張圖來描述出隊過程。圖片參考來源【細談Java併發】談談LinkedBlockingQueue
signalNotFull方法源碼
private void signalNotFull() {
//獲取put獨佔鎖
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
////喚醒notFull條件隊列裏被阻塞的線程
notFull.signal();
} finally {
//釋放鎖
putLock.unlock();
}
}
複製代碼
基本流程:
獲取隊列頭部元素可是不從隊列裏面移除它,若是隊列爲空則返回 null。 該方法是不 阻塞的。
public E peek() {
//隊列容量爲0,返回null
if (count.get() == 0)
return null;
//獲取takeLock獨佔鎖
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
//判斷first是否爲null,若是是直接返回
if (first == null)
return null;
else
return first.item;
} finally {
//釋放鎖
takeLock.unlock();
}
}
複製代碼
基本流程:
獲取當前隊列頭部元素並從隊列裏面移除它。 若是隊列爲空則阻塞當前線程直到隊列 不爲空而後返回元素,若是在阻塞時被其餘線程設置了中斷標誌, 則被阻塞線程會拋出 InterruptedException 異常而返回。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
//獲取takeLock獨佔鎖
final ReentrantLock takeLock = this.takeLock;
//獲取獨佔鎖,它跟lock的區別,是能夠被中斷
takeLock.lockInterruptibly();
try {
//當前隊列爲空,則阻塞掛起
while (count.get() == 0) {
notEmpty.await();
}
//)出隊並遞減計數
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
//激活 notEmpty 的條件隊列,喚醒被阻塞的線程
notEmpty.signal();
} finally {
//釋放鎖
takeLock.unlock();
}
if (c == capacity)
//激活 notFull 的條件隊列,喚醒被阻塞的線程
signalNotFull();
return x;
}
複製代碼
基本流程:
刪除隊列裏面指定的元素,有則刪除並返回 true,沒有則返回 false。
public boolean remove(Object o) {
//爲空直接返回false
if (o == null) return false;
//雙重加鎖
fullyLock();
try {
//邊歷隊列,找到元素則刪除並返回true
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
//執行unlink操做
unlink(p, trail);
return true;
}
}
return false;
} finally {
//解鎖
fullyUnlock();
}
}
複製代碼
雙重加鎖,fullyLock方法源代碼
void fullyLock() {
//putLock獨佔鎖加鎖
putLock.lock();
//takeLock獨佔鎖加鎖
takeLock.lock();
}
複製代碼
unlink方法源代碼
void unlink(Node<E> p, Node<E> trail) {
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
//若是當前隊列滿 ,則刪除後,也不忘記喚醒等待的線程
if (count.getAndDecrement() == capacity)
notFull.signal();
}
複製代碼
fullyUnlock方法源代碼
void fullyUnlock() {
//與雙重加鎖順序相反,先解takeLock獨佔鎖
takeLock.unlock();
putLock.unlock();
}
複製代碼
基本流程
獲取當前隊列元素個數。
public int size() {
return count.get();
}
複製代碼
因爲進行出隊、入隊操做時的 count是加了鎖的,因此結果相比ConcurrentLinkedQueue 的 size 方法比較準確。
Java併發編程之美中,有一張圖唯妙唯肖描述了它,以下圖: