Java 中總的算起來有 8 種阻塞隊列。java
咱們分析了:node
ArrayBlockingQueue
數組隊列,咱們在 使用 ReentrantLock 和 Condition 實現一個阻塞隊列 看過了 JDK 寫的一個例子,就是該類的基本原理和實現。樓主不許備分析了。算法
LinkedBlockingDeque
是一個雙向鏈表的隊列。經常使用於 「工做竊取算法」,有機會再分析。編程
DelayQueue
是一個支持延時獲取元素的無界阻塞隊列。內部用 PriorityQueue
實現。有機會再分析。數組
PriorityBlockingQueue
是一個支持優先級的無界阻塞隊列,和 DelayWorkQueue
相似。有機會再分析。併發
今天要分析的是剩下的一個比較有意思的隊列:LinkedTransferQueue
。app
爲何說有意思呢?他能夠算是 LinkedBolckingQueue
和 SynchronousQueue
和合體。less
咱們知道 SynchronousQueue
內部沒法存儲元素,當要添加元素的時候,須要阻塞,不夠完美,LinkedBolckingQueue
則內部使用了大量的鎖,性能不高。異步
兩兩結合,豈不完美?性能又高,又不阻塞。源碼分析
咱們一塊兒來看看。
該類實現了一個 TransferQueue。該接口定義了幾個方法:
public interface TransferQueue<E> extends BlockingQueue<E> {
// 若是可能,當即將元素轉移給等待的消費者。
// 更確切地說,若是存在消費者已經等待接收它(在 take 或 timed poll(long,TimeUnit)poll)中,則當即傳送指定的元素,不然返回 false。
boolean tryTransfer(E e);
// 將元素轉移給消費者,若是須要的話等待。
// 更準確地說,若是存在一個消費者已經等待接收它(在 take 或timed poll(long,TimeUnit)poll)中,則當即傳送指定的元素,不然等待直到元素由消費者接收。
void transfer(E e) throws InterruptedException;
// 上面方法的基礎上設置超時時間
boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
// 若是至少有一位消費者在等待,則返回 true
boolean hasWaitingConsumer();
// 返回等待消費者人數的估計值
int getWaitingConsumerCount();
}
複製代碼
相比較普通的阻塞隊列,增長了這麼幾個方法。
阻塞隊列不外乎put ,take,offer ,poll
等方法,再加上TransferQueue
的 幾個 tryTransfer
方法。咱們看看這幾個方法的實現。
put
方法:
public void put(E e) {
xfer(e, true, ASYNC, 0);
}
複製代碼
take
方法:
public E take() throws InterruptedException {
E e = xfer(null, false, SYNC, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
複製代碼
offer
方法:
public boolean offer(E e) {
xfer(e, true, ASYNC, 0);
return true;
}
複製代碼
poll
方法:
public E poll() {
return xfer(null, false, NOW, 0);
}
複製代碼
tryTransfer
方法:
public boolean tryTransfer(E e) {
return xfer(e, true, NOW, 0) == null;
}
複製代碼
transfer
方法:
public void transfer(E e) throws InterruptedException {
if (xfer(e, true, SYNC, 0) != null) {
Thread.interrupted(); // failure possible only due to interrupt
throw new InterruptedException();
}
}
複製代碼
可怕,全部方法都指向了xfer
方法,只不過傳入的不一樣的參數。
第一個參數,若是是 put
類型,就是實際的值,反之就是 null。 第二個參數,是否包含數據,put 類型就是 true,take 就是 false。 第三個參數,執行類型,有當即返回的NOW
,有異步的ASYNC
,有阻塞的SYNC
, 有帶超時的 TIMED
。 第四個參數,只有在 TIMED
類型纔有做用。
So,這個類的關鍵方法就是 xfer 方法了。
源碼加註釋:
private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null))
throw new NullPointerException();
Node s = null; // the node to append, if needed
retry:
for (;;) { // restart on append race
// 從 head 開始
for (Node h = head, p = h; p != null;) { // find & match first node
// head 的類型。
boolean isData = p.isData;
// head 的數據
Object item = p.item;
// item != null 有 2 種狀況,一是 put 操做, 二是 take 的 itme 被修改了(匹配成功)
// (itme != null) == isData 要麼表示 p 是一個 put 操做, 要麼表示 p 是一個還沒匹配成功的 take 操做
if (item != p && (item != null) == isData) {
// 若是當前操做和 head 操做相同,就沒有匹配上,結束循環,進入下面的 if 塊。
if (isData == haveData) // can't match
break;
// 若是操做不一樣,匹配成功, 嘗試替換 item 成功,
if (p.casItem(item, e)) { // match
// 更新 head
for (Node q = p; q != h;) {
Node n = q.next; // update by 2 unless singleton
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
// 喚醒原 head 線程.
LockSupport.unpark(p.waiter);
return LinkedTransferQueue.<E>cast(item);
}
}
// 找下一個
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
// 若是這個操做不是馬上就返回的類型
if (how != NOW) { // No matches available
// 且是第一次進入這裏
if (s == null)
// 建立一個 node
s = new Node(e, haveData);
// 嘗試將 node 追加對隊列尾部,並返回他的上一個節點。
Node pred = tryAppend(s, haveData);
// 若是返回的是 null, 表示不能追加到 tail 節點,由於 tail 節點的模式和當前模式相反.
if (pred == null)
// 重來
continue retry; // lost race vs opposite mode
// 若是不是異步操做(即馬上返回結果)
if (how != ASYNC)
// 阻塞等待匹配值
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
}
複製代碼
代碼有點長,其實邏輯很簡單。
邏輯以下: 找到 head
節點,若是 head
節點是匹配的操做,就直接賦值,若是不是,添加到隊列中。
注意:隊列中永遠只有一種類型的操做,要麼是 put
類型, 要麼是 take
類型.
整個過程以下圖:
相比較 SynchronousQueue
多了一個能夠存儲的隊列,相比較 LinkedBlockingQueue
多了直接傳遞元素,少了用鎖來同步。
性能更高,用處更大。
LinkedTransferQueue
是 SynchronousQueue
和 LinkedBlockingQueue
的合體,性能比 LinkedBlockingQueue
更高(沒有鎖操做),比 SynchronousQueue
能存儲更多的元素。
當 put
時,若是有等待的線程,就直接將元素 「交給」 等待者, 不然直接進入隊列。
put
和 transfer
方法的區別是,put 是當即返回的, transfer 是阻塞等待消費者拿到數據才返回。transfer
方法和 SynchronousQueue
的 put 方法相似。