在上一篇文章中給你們介紹了牛批的AQS,大體講解了JUC中同步的思路。原本還沒想好這一篇應該寫點什麼,恰好上週某個同事的代碼出現問題,排查後發現是使用阻塞隊列不當致使的,因此本篇決定介紹下阻塞隊列。node
說來也是挺巧的,那天一位同事iMac換了Macbook Pro。而後像往常同樣啓動了各個服務,過了會電腦風扇瘋狂工做發出響聲,因爲日常iMac上IDEA項目開的比較多佔用較多內存時間長了也會卡頓,因此他並無在乎。可是以後一直是這樣咱們便以爲很奇怪,而後打開了他的活動監視器,發現某個Java進程居然佔用了百分之九十的CPU,而後確認是哪個項目,最後經過jstack查看該項目中的線程狀況,定位到了某個自定義線程,而後查看代碼發現以下:數組
MyThreadPool.exportEnclosurePool.execute(() -> {
while (true) {
BlockingQueue<EnclosureRequest> blockingQueue = requestQueue.getBlockingQueue();
while (!blockingQueue.isEmpty()) {
System.out.println("開始消費");
EnclosureRequest one = null;
try {
one = blockingQueue.take();
ossService.exportEnclosureToLocalServer(one.getEnclosureList(), one.getSobId(), one.getUserUuid(), one.getUserName(), one.getTmpFileName(), one.getZipUuidList());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
複製代碼
該同事的需求是作一個隊列化附件導出的功能,所以他選擇了生產者消費者模式,採用阻塞隊列來實現;可是因爲對此不太熟悉,因此寫出了這段有問題的代碼,致使死循環;萬幸的是這段代碼在測試分支上被咱們發現了並無上正式。正確的消費者代碼實現以下:安全
MyThreadPool.exportEnclosurePool.execute(() -> {
BlockingQueue<EnclosureRequest> blockingQueue = requestQueue.getBlockingQueue();
while (true) {
try {
EnclosureRequest one = blockingQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("開始消費");
ossService.exportEnclosureToLocalServer(one.getEnclosureList(), one.getSobId(), one.getUserUuid(), one.getUserName(), one.getTmpFileName(), one.getZipUuidList());
}
}
複製代碼
四種處理方式: bash
處理方式\方法 | 插入方法 | 移除方法 |
---|---|---|
拋出異常 | add(e) | remove() |
返回boolean值 | offer(e) | poll() |
阻塞 | put(e) | take() |
超時退出 | offer(e,time,unit) | poll(time,unit) |
💡小提示: 若是是無界阻塞隊列,隊列不可能出現滿的狀況,因此使用put()方法永遠不會被阻塞,使用offer()方法永遠返回true多線程
LinkedBlockingQueue是一個由成員變量Node組成的單鏈表結構,默認容量爲Integer的最大值,其內部還有兩把ReentrantLock鎖putLock、takeLock用於保證插入和刪除的線程安全(其餘阻塞隊列中使用一個ReentrantLock鎖),兩個Condition等待隊列notEmpty、notFull用於存放take()和put()阻塞的線程。這裏我簡單分析下它兩個比較重要的方法put()和take()。併發
/**
* 由Node節點組成單鏈表結構
*/
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
/** 用於移除操做的鎖 */
private final ReentrantLock takeLock = new ReentrantLock();
/** 阻塞於take的等待隊列 */
private final Condition notEmpty = takeLock.newCondition();
/** 用於插入操做的鎖 */
private final ReentrantLock putLock = new ReentrantLock();
/** 阻塞於put的等待隊列 */
private final Condition notFull = putLock.newCondition();
/**
* 不指定容量默認是Integer的最大值
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
* 阻塞式插入元素(隊列爲滿則阻塞)
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 獲取插入鎖(響應中斷)
putLock.lockInterruptibly();
try {
// 若是當前隊列長度到達容量上限則當前線程釋放鎖加入不爲滿等待隊列中
while (count.get() == capacity) {
notFull.await();
}
// 將元素加入隊尾
enqueue(node);
// 當前隊列長度加一(返回值是加一以前)
c = count.getAndIncrement();
// 若是加入後隊列長度小於容量上限則通知不爲滿等待隊列中的線程
if (c + 1 < capacity)
notFull.signal();
} finally {
// 釋放鎖
putLock.unlock();
}
// 若是在插入元素以前隊列爲空則通知不爲空等待隊列中的線程
if (c == 0)
signalNotEmpty();
}
/**
* 阻塞式移除元素(隊列爲空則阻塞)
*/
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 獲取移除鎖(響應中斷)
takeLock.lockInterruptibly();
try {
// 若是當前隊列爲空則當前線程釋放鎖加入不爲空等待隊列
while (count.get() == 0) {
notEmpty.await();
}
// 移除隊頭元素
x = dequeue();
c = count.getAndDecrement();
// 若是移除以後還有元素則通知不爲空等待隊列中的線程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 若是移除元素以前到達容量上線則通知不爲滿等待隊列中的線程
if (c == capacity)
signalNotFull();
return x;
}
複製代碼
須要注意的是put()操做將元素加入隊列後釋放鎖是在判斷容量是否小於上限通知notFull等待隊列以後,通知notEmpty隊列以前須要先獲取takeLock,take()操做同理。
源碼分析
💡小提示: LinkedBlockingQueue的put()和take()方法中和其餘阻塞隊列有個很大的區別。其餘阻塞隊列每次put()和take()都會去通知相應的等待隊列,可是LinkedBlockingQueue只有在put前是空的去通知notEmpty,take前是滿的去通知notFull等待隊列,而且put後未滿去通知notFull等待隊列,take後未空去通知notEmpty等待隊列。關於這點我我的的理解是因爲LinkedBlockingQueue裏分讀寫鎖,若是每次take都通知notFull的話,須要另外去獲取putLock產生競爭;用已經獲取putLock的線程去喚醒notFull等待隊列中線程減小了鎖的競爭。其餘阻塞隊列中只有一把鎖,因此通知不須要另外競爭鎖。固然這只是我我的的見解而已,但願有了解的小夥伴指教。post
阻塞隊列在併發中很重要,前面介紹的線程池中就用到了阻塞隊列,生產者消費者模型也是能夠用阻塞隊列實現,到此已經介紹了AQS、阻塞隊列、線程池,但願大家能關聯起來理解加深印象。測試
往期文章:ui
歡迎一樣有感興趣的小夥伴一塊兒探討