併發編程—— LinkedTransferQueue

1. 前言

Java 中總的算起來有 8 種阻塞隊列。java

咱們分析了:node

ArrayBlockingQueue 數組隊列,咱們在 使用 ReentrantLock 和 Condition 實現一個阻塞隊列 看過了 JDK 寫的一個例子,就是該類的基本原理和實現。樓主不許備分析了。算法

LinkedBlockingDeque是一個雙向鏈表的隊列。經常使用於 「工做竊取算法」,有機會再分析。編程

DelayQueue 是一個支持延時獲取元素的無界阻塞隊列。內部用 PriorityQueue 實現。有機會再分析。數組

PriorityBlockingQueue 是一個支持優先級的無界阻塞隊列,和 DelayWorkQueue 相似。有機會再分析。併發

今天要分析的是剩下的一個比較有意思的隊列:LinkedTransferQueueapp

爲何說有意思呢?他能夠算是 LinkedBolckingQueueSynchronousQueue 和合體。less

咱們知道 SynchronousQueue 內部沒法存儲元素,當要添加元素的時候,須要阻塞,不夠完美,LinkedBolckingQueue 則內部使用了大量的鎖,性能不高。異步

兩兩結合,豈不完美?性能又高,又不阻塞。源碼分析

咱們一塊兒來看看。

2. LinkedTransferQueue 介紹

image.png

該類實現了一個 TransferQueue。該接口定義了幾個方法:

public interface TransferQueue<E> extends BlockingQueue<E> {
    // 若是可能,當即將元素轉移給等待的消費者。 
    // 更確切地說,若是存在消費者已經等待接收它(在 take 或 timed poll(long,TimeUnit)poll)中,則當即傳送指定的元素,不然返回 false。
    boolean tryTransfer(E e);

    // 將元素轉移給消費者,若是須要的話等待。 
    // 更準確地說,若是存在一個消費者已經等待接收它(在 take 或timed poll(long,TimeUnit)poll)中,則當即傳送指定的元素,不然等待直到元素由消費者接收。
    void transfer(E e) throws InterruptedException;

    // 上面方法的基礎上設置超時時間
    boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;

    // 若是至少有一位消費者在等待,則返回 true
    boolean hasWaitingConsumer();

    // 返回等待消費者人數的估計值
    int getWaitingConsumerCount();
}
複製代碼

相比較普通的阻塞隊列,增長了這麼幾個方法。

3. 關鍵源碼分析

阻塞隊列不外乎put ,take,offer ,poll等方法,再加上TransferQueue的 幾個 tryTransfer 方法。咱們看看這幾個方法的實現。

put方法:

public void put(E e) {
     xfer(e, true, ASYNC, 0);
}
複製代碼

take方法:

public E take() throws InterruptedException {
    E e = xfer(null, false, SYNC, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}
複製代碼

offer 方法:

public boolean offer(E e) {
    xfer(e, true, ASYNC, 0);
    return true;
}
複製代碼

poll 方法:

public E poll() {
    return xfer(null, false, NOW, 0);
}
複製代碼

tryTransfer 方法:

public boolean tryTransfer(E e) {
    return xfer(e, true, NOW, 0) == null;
}
複製代碼

transfer 方法:

public void transfer(E e) throws InterruptedException {
    if (xfer(e, true, SYNC, 0) != null) {
        Thread.interrupted(); // failure possible only due to interrupt
        throw new InterruptedException();
    }
}
複製代碼

可怕,全部方法都指向了xfer 方法,只不過傳入的不一樣的參數。

第一個參數,若是是 put 類型,就是實際的值,反之就是 null。 第二個參數,是否包含數據,put 類型就是 true,take 就是 false。 第三個參數,執行類型,有當即返回的NOW,有異步的ASYNC,有阻塞的SYNC, 有帶超時的 TIMED。 第四個參數,只有在 TIMED類型纔有做用。

So,這個類的關鍵方法就是 xfer 方法了。

4. xfer 方法分析

源碼加註釋:

private E xfer(E e, boolean haveData, int how, long nanos) {
    if (haveData && (e == null))
        throw new NullPointerException();
    Node s = null;                        // the node to append, if needed

    retry:
    for (;;) {                            // restart on append race
        // 從 head 開始
        for (Node h = head, p = h; p != null;) { // find & match first node
            // head 的類型。
            boolean isData = p.isData;
            // head 的數據
            Object item = p.item;
            // item != null 有 2 種狀況,一是 put 操做, 二是 take 的 itme 被修改了(匹配成功)
            // (itme != null) == isData 要麼表示 p 是一個 put 操做, 要麼表示 p 是一個還沒匹配成功的 take 操做
            if (item != p && (item != null) == isData) { 
                // 若是當前操做和 head 操做相同,就沒有匹配上,結束循環,進入下面的 if 塊。
                if (isData == haveData)   // can't match
                    break;
                // 若是操做不一樣,匹配成功, 嘗試替換 item 成功,
                if (p.casItem(item, e)) { // match
                    // 更新 head
                    for (Node q = p; q != h;) {
                        Node n = q.next;  // update by 2 unless singleton
                        if (head == h && casHead(h, n == null ? q : n)) {
                            h.forgetNext();
                            break;
                        }                 // advance and retry
                        if ((h = head)   == null ||
                            (q = h.next) == null || !q.isMatched())
                            break;        // unless slack < 2
                    }
                    // 喚醒原 head 線程.
                    LockSupport.unpark(p.waiter);
                    return LinkedTransferQueue.<E>cast(item);
                }
            }
            // 找下一個
            Node n = p.next;
            p = (p != n) ? n : (h = head); // Use head if p offlist
        }
        // 若是這個操做不是馬上就返回的類型 
        if (how != NOW) {                 // No matches available
            // 且是第一次進入這裏
            if (s == null)
                // 建立一個 node
                s = new Node(e, haveData);
            // 嘗試將 node 追加對隊列尾部,並返回他的上一個節點。
            Node pred = tryAppend(s, haveData);
            // 若是返回的是 null, 表示不能追加到 tail 節點,由於 tail 節點的模式和當前模式相反.
            if (pred == null)
                // 重來
                continue retry;           // lost race vs opposite mode
            // 若是不是異步操做(即馬上返回結果)
            if (how != ASYNC)
                // 阻塞等待匹配值
                return awaitMatch(s, pred, e, (how == TIMED), nanos);
        }
        return e; // not waiting
    }
}
複製代碼

代碼有點長,其實邏輯很簡單。

邏輯以下: 找到 head 節點,若是 head 節點是匹配的操做,就直接賦值,若是不是,添加到隊列中。

注意:隊列中永遠只有一種類型的操做,要麼是 put 類型, 要麼是 take 類型.

整個過程以下圖:

image.png

相比較 SynchronousQueue 多了一個能夠存儲的隊列,相比較 LinkedBlockingQueue 多了直接傳遞元素,少了用鎖來同步。

性能更高,用處更大。

5. 總結

LinkedTransferQueueSynchronousQueueLinkedBlockingQueue 的合體,性能比 LinkedBlockingQueue 更高(沒有鎖操做),比 SynchronousQueue能存儲更多的元素。

put 時,若是有等待的線程,就直接將元素 「交給」 等待者, 不然直接進入隊列。

puttransfer 方法的區別是,put 是當即返回的, transfer 是阻塞等待消費者拿到數據才返回。transfer方法和 SynchronousQueue的 put 方法相似。

相關文章
相關標籤/搜索