JUC源碼分析-集合篇(十)LinkedTransferQueue

JUC源碼分析-集合篇(十)LinkedTransferQueue

LinkedTransferQueue(LTQ) 相比 BlockingQueue 更進一步,生產者會一直阻塞直到所添加到隊列的元素被某一個消費者所消費(不只僅是添加到隊列裏就完事)。新添加的 transfer 方法用來實現這種約束。顧名思義,阻塞就是發生在元素從一個線程 transfer 到另外一個線程的過程當中,它有效地實現了元素在線程之間的傳遞(以創建 Java 內存模型中的 happens-before 關係的方式)。Doug Lea 說從功能角度來說,LinkedTransferQueue 其實是 ConcurrentLinkedQueue、SynchronousQueue(公平模式)和 LinkedBlockingQueue 的超集。並且 LinkedTransferQueue 更好用,由於它不只僅綜合了這幾個類的功能,同時也提供了更高效的實現。html

1. LinkedTransferQueue 概況

推薦一篇 LinkedTransferQueue 的介紹:http://ifeve.com/java-transfer-queue/java

1.1 TransferQueue 接口

LinkedTransferQueue 實現了 TransferQueue 接口,下面就主要介紹一下這個接口。 TransferQueue 繼承了 BlockingQueue(BlockingQueue 又繼承了 Queue)並擴展了一些新方法。BlockingQueue(和Queue)是 JDK5 中加入的接口,它是指這樣的一個隊列:當生產者向隊列添加元素但隊列已滿時,生產者會被阻塞;當消費者從隊列移除元素但隊列爲空時,消費者會被阻塞。node

TransferQueue 則更進一步,生產者會一直阻塞直到所添加到隊列的元素被某一個消費者所消費(不只僅是添加到隊列裏就完事)。新添加的 transfer 方法用來實現這種約束。顧名思義,阻塞就是發生在元素從一個線程 transfer 到另外一個線程的過程當中,它有效地實現了元素在線程之間的傳遞(以創建 Java 內存模型中的 happens-before 關係的方式)。算法

TransferQueue 還包括了其餘的一些方法:兩個 tryTransfer 方法,一個是非阻塞的,另外一個帶有 timeout 參數設置超時時間的。還有兩個輔助方法 hasWaitingConsumer() 和 getWaitingConsumerCount()。緩存

1.2 LinkedTransferQueue 特色

當我第一次看到 LinkedTransferQueue 時,首先想到了已有的實現類 SynchronousQueue。SynchronousQueue 的隊列長度爲 0,特別是對於兩個線程之間傳遞元素這種用例。數據結構

LinkedTransferQueue 相比 SynchronousQueue 用處更廣、更好用,由於你能夠決定是使用 BlockingQueue 的方法(譯者注:例如put方法)仍是確保一次傳遞完成(譯者注:即transfer方法)。在隊列中已有元素的狀況下,調用 transfer 方法,能夠確保隊列中被傳遞元素以前的全部元素都能被處理。app

LinkedTransferQueue 的性能分別是 SynchronousQueue 的3倍(非公平模式)和14倍(公平模式)。由於像 ThreadPoolExecutor 這樣的類在任務傳遞時都是使用 SynchronousQueue,因此使用 LinkedTransferQueue 來代替 SynchronousQueue 也會使得 ThreadPoolExecutor 獲得相應的性能提高。less

下面你能夠參考這往篇文章實現一個本身的 LinkedTransferQueue:http://ifeve.com/customizing-concurrency-classes-11-2/#more-7388dom

2. LTQ 原理

LTQ 內部採用的是一種很是不一樣的隊列,即鬆弛型雙重隊列(Dual Queues with Slack):http://ifeve.com/buglinkedtransferqueue-bug/#more-11117異步

強烈建議你們讀一下 Doug Lea 的 java doc 文檔,對 LTQ 的數據結構有很清楚的說明。

2.1 雙重隊列(Dual Queues)

/**
 * A FIFO dual queue may be implemented using a variation of the
 * Michael & Scott (M&S) lock-free queue algorithm
 * (http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf).
 * It maintains two pointer fields, "head", pointing to a
 * (matched) node that in turn points to the first actual
 * (unmatched) queue node (or null if empty); and "tail" that
 * points to the last node on the queue (or again null if
 * empty). For example, here is a possible queue with four data
 * elements:
 *
 *  head                tail
 *    |                   |
 *    v                   v
 *    M -> U -> U -> U -> U
 *    
 *  M(matched)  U(unmatched)
 */

翻譯:FIFO 雙隊列可使用 Michael & Scott(M&S)無鎖隊列算法的變體實現。它維護兩個指針字段: head 指向第一個不匹配節點(M)的前驅節點(若是爲空則爲空);tail 指向隊列中的最後一個節點(若是爲空則爲空)。

雙重是指有兩種類型相互對立的節點(Node.isData==false或true),而且我理解的每種節點都有三種狀態:

  1. UNMATCHED 節點構造完成,剛進入隊列的狀態
  2. MATCHED 節點備置爲「知足」狀態,即入隊節點標識的線程成功取得或者傳遞了數據
  3. CANCELED 節點被置爲取消狀態,即入隊節點標識的線程由於超時或者中斷決定放棄等待

2.2 鬆弛度(Slack)

/**
 * 在更新head/tail和查找中尋求平衡,大多數場景1~3比較合適。
 * 本質上:是增長對 volatile 變量讀操做來減小了對 volatile 變量的寫操做
 * 而對 volatile 變量的寫操做開銷要遠遠大於讀操做,所以使用Slack能增長效率
 * 
 * We introduce here an approach that lies between the extremes of
 * never versus always updating queue (head and tail) pointers.
 * This offers a tradeoff between sometimes requiring extra
 * traversal steps to locate the first and/or last unmatched
 * nodes, versus the reduced overhead and contention of fewer
 * updates to queue pointers. For example, a possible snapshot of
 * a queue is:
 *
 *  head           tail
 *    |              |
 *    v              v
 *    M -> M -> U -> U -> U -> U
 *
 * The best value for this "slack" (the targeted maximum distance
 * between the value of "head" and the first unmatched node, and
 * similarly for "tail") is an empirical matter. We have found
 * that using very small constants in the range of 1-3 work best
 * over a range of platforms. Larger values introduce increasing
 * costs of cache misses and risks of long traversal chains, while
 * smaller values increase CAS contention and overhead.
 */

爲了節省 CAS 操做的開銷,LTQ 引入了「鬆弛度」的概念:在節點被匹配(被刪除)以後,不會當即更新 head/tail,而是當 head/tail 節點和最近一個未匹配的節點之間的距離超過一個「鬆弛閥值」以後纔會更新(在 LTQ 中,這個值爲 2)。這個「鬆弛閥值」通常爲1-3,若是太大會下降緩存命中率,而且會增長遍歷鏈的長度;過小會增長 CAS 的開銷。另外在 ConcurrentLinkedQueue 也有相應的應用:hops 設計意圖

2.3 節點自連接

已匹配節點的 next 引用會指向自身。若是 GC 延遲迴收,已刪除節點鏈會積累的很長,此時垃圾收集會耗費高昂的代價,而且全部剛匹配的節點也不會被回收。爲了不這種狀況,咱們在 CAS 向後推動 head 時,會把已匹配的 head 的"next"引用指向自身(即「自連接節點」),這樣就限制了鏈接已刪除節點的長度(咱們也採起相似的方法,清除在其餘節點字段中可能的垃圾保留值)。若是在遍歷時遇到一個自連接節點,那就代表當前線程已經滯後於另一個更新 head 的線程,此時就須要從新獲取 head 來遍歷。

因此,在 LTQ 中,數據在某個線程的「某一時刻」可能存在下面這種形式:

LTQ節點

  • unmatched node:未被匹配的節點。多是一個生產者節點(item!=null),也多是一個消費者節點(item==null)。
  • matched node:已經被匹配的節點。多是一個生產者節點(item!=null)的數據已經被一個消費者拿走;也多是一個消費者節點(item==null)已經被一個生產者填充上數據。

3. 數據結構

數據結構

3.1 Node 節點

static final class QNode {
    volatile Object item;         // 節點包含的數據,非空表示生產者,空者是消費者
    final boolean isData;         // 表示該節點由生產者建立仍是由消費者建立,生產者true,消費者false  
    volatile Thread waiter;       // 等待在該節點上的線程。to control park/unpark
    volatile QNode next;          // 指向隊列中的下一個節點
}

Node 節點自己就是一個原子性操做,對節點的屬性 item、waiter、next 都是原子性操做。

  • forgetNext 是將節點踢出隊列。
  • forgetContents 是將節點踢出隊列後,將節點的屬性 item 和 waiter 置空或自鏈接,便於 GC 垃圾回收。
  • isMatched 判斷節點是否已經匹配成功。
  • isUnmatchedRequest 是不是請求節點,且還未匹配成功。
  • cannotPrecede 可否將指定的節點 node 追加到當前節點後。node 節點屬性與當前節點相反,且當前節點還未進行匹配則不能追加。
  • tryMatchData 嘗試匹配數據節點,匹配成功返回 true,即將 item 設置爲 null 成功

3.2 LinkedTransferQueue

transient volatile Node head;
private transient volatile Node tail;

// 馬上、異步、同步、超時返回
private static final int NOW   = 0; // for untimed poll, tryTransfer
private static final int ASYNC = 1; // for offer, put, add
private static final int SYNC  = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfer

LinkedTransferQueue 主要方法介紹:

  • transfer 阻塞式的將數據從一個線程傳遞到另外一個線程。
  • tryTransfer 則是非阻塞式的將數據從一個線程傳遞到另外一個線程。
  • xfer 最核心的方法。將數據從一個線程傳遞到另外一個線程。
  • tryAppend 將節點添加到隊列中。
  • awaitMatch 匹配節點。
public LinkedTransferQueue() {
}

LinkedTransferQueue 初始化時什麼也沒作,也就是說 head=tail=null。

4. 源碼分析

4.1 核心方法 xfer

/** 
 * @param e the item or null for take
 * @param haveData true if this is a put, else a take
 * @param how NOW, ASYNC, SYNC, or TIMED
 * @param nanos timeout in nanosecs, used only if mode is TIMED
 */
private E xfer(E e, boolean haveData, int how, long nanos) {
    if (haveData && (e == null))
        throw new NullPointerException();
    Node s = null;                        // the node to append, if needed

    retry:
    for (;;) {                            // restart on append race
        // 1. 嘗試匹配
        for (Node h = head, p = h; p != null;) { // find & match first node
            boolean isData = p.isData;
            Object item = p.item;
            // 1.2 p節點還未匹配則嘗試進行匹配,爲何不調用 !p.isMatched() ????
            if (item != p && (item != null) == isData) { // unmatched
                // 1.3 兩個節點的模式同樣,則直接跳出循環,嘗試入隊
                if (isData == haveData)   // can't match
                    break;
                // 1.4 p匹配成功
                if (p.casItem(item, e)) { // match
                    for (Node q = p; q != h;) {
                        // 1.5 p已經匹配,直接將n設置爲頭節點。h -> p -> n
                        Node n = q.next;  // update by 2 unless singleton
                        if (head == h && casHead(h, n == null ? q : n)) {
                            h.forgetNext();
                            break;
                        }                 // advance and retry
                        // 1.6 有其它線程更新了頭節點,再次判斷 slack<2。
                        //     h -> q 若是 q.isMatched() 則能夠將 q.next 設置爲頭節點
                        if ((h = head)   == null ||
                            (q = h.next) == null || !q.isMatched())
                            break;        // unless slack < 2
                    }
                    LockSupport.unpark(p.waiter);   // 喚醒等待的線程後直接返回
                    return LinkedTransferQueue.<E>cast(item);
                }
            }
            // 1.7 p==p.next 則說明p已經出隊,失效了。須要從新從頭節點開始匹配
            Node n = p.next;
            p = (p != n) ? n : (h = head); // Use head if p offlist
        }

        // 2. 到了這一步,只有未匹配上一種狀況。根據how判斷節點是否要入隊並等待其它線程匹配
        if (how != NOW) {                 // No matches available
            if (s == null)
                s = new Node(e, haveData);
            // 2.1 節點嘗試入隊,入隊失敗繼續嘗試
            Node pred = tryAppend(s, haveData);
            if (pred == null)
                continue retry;           // lost race vs opposite mode
            // 2.2 等待其它線程匹配成功後喚醒當前線程
            if (how != ASYNC)
                return awaitMatch(s, pred, e, (how == TIMED), nanos);
        }
        return e; // not waiting
    }
}

說明:xfer 大體能夠分三部分:

  1. 首先,節點嘗試和隊列中已經的元素進行匹配,匹配成功(1.4)則喚醒等待節點的線程後直接返回。匹配成功只要鬆弛度大於等於2(h -> p -> n),須要從新將頭節點設置爲 n。
  2. 其次,匹配失敗則調用 tryAppend 嘗試入隊(2.1),入隊失敗後則自旋直至入隊成功,入隊後線程會自旋或被掛起;
  3. 最後,調用 awaitMatch 方法(2.2),等待其它線程匹配上後喚醒該線程。

若是在上述操做中沒有找到匹配節點,則根據參數 how 作不一樣的處理:

  • NOW(poll, tryTransfer):當即返回。
  • SYNC(transfer, take):經過 tryAppend 方法插入一個新的節點 s(item=e,isData = haveData)到隊列尾,而後自旋或阻塞當前線程直到節點被匹配或者取消返回。
  • ASYNC(offer, put, add):經過 tryAppend 方法插入一個新的節點 s(item=e,isData = haveData)到隊列尾,異步直接返回。
  • TIMED(timed poll, tryTransfer):經過 tryAppend 方法插入一個新的節點 s(item=e,isData = haveData)到隊列尾,而後自旋或阻塞當前線程直到節點被匹配或者取消或等待超時返回。
// 1. NOW(poll, tryTransfer)
public boolean tryTransfer(E e) {
    return xfer(e, true, NOW, 0) == null;
}
// 2. SYNC(transfer, take)
public void transfer(E e) throws InterruptedException {
    if (xfer(e, true, SYNC, 0) != null) {
        Thread.interrupted(); // failure possible only due to interrupt
        throw new InterruptedException();
    }
}
// 3. ASYNC(offer, put, add)
public boolean offer(E e) {
    xfer(e, true, ASYNC, 0);
    return true;
}
// 4. TIMED(timed poll, tryTransfer)
public boolean tryTransfer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
        return true;
    if (!Thread.interrupted())
        return false;
    throw new InterruptedException();
}

這裏能夠看到若是使用異步(ASYNC)的方式時線程不會阻塞,如 offer 時同一線程的數據節點也能夠入隊,也就是存儲的數據長度再也不是 0,這也是和 SynchronousQueue 一個很大的不一樣點。因此 Doug Lea 說從功能角度來說,LinkedTransferQueue 其實是 ConcurrentLinkedQueue、SynchronousQueue(公平模式)和 LinkedBlockingQueue 的超集。

4.2 入隊 tryAppend

private Node tryAppend(Node s, boolean haveData) {
    for (Node t = tail, p = t;;) {        // move p to last node and append
        Node n, u;                        // temps for reads of next & tail
        // 1. 節點初始化
        if (p == null && (p = head) == null) {
            if (casHead(null, s))
                return s;                 // initialize
        }
        // 2. 節點s不能追加到p節點後。①p和s的模式不一樣且②p還未匹配
        else if (p.cannotPrecede(haveData))
            return null;                  // lost race vs opposite mode
        // 3. p 不是尾節點
        else if ((n = p.next) != null)    // not last; keep traversing
            // t -> p 時 tail 改變則須要從新定位到尾節點
            // p節點已經出隊則須要從 head 開始從新遍歷
            p = p != t && t != (u = tail) ? (t = u) : // stale tail
                (p != n) ? n : null;      // restart if off list
        // 4. 有其它線程添加節點時繼續自旋,直到成功
        else if (!p.casNext(null, s))
            p = p.next;                   // re-read on CAS failure
        // 5. 終於添加到隊列中。嘗試更新尾節點
        else {
            // 若是 p!=t 則隊列狀況以下,須要更新尾節點: t -> p -> s
            if (p != t) {                 // update if slack now >= 2
                // 5.1 其它線程已經更新 tail,從新進行下面三個條件的判斷
                // 5.2 t.next.next!=null 則須要從新更新 tail。至於s!=t則是此時t沒有踢出隊列
                while ((tail != t || !casTail(t, s)) &&     
                       (t = tail)   != null &&
                       (s = t.next) != null && // advance and retry
                       (s = s.next) != null && s != t);
            }
            return p;
        }
    }
}

說明:添加給定節點 s 到隊列尾並返回 s 的前繼節點;失敗時(與其餘不一樣模式線程競爭失敗)返回 null,此時 s 的前繼節點 p 的模式和 s 不一樣且 p 還沒有被匹配,如 s 爲請求節點,p 爲數據節點且未匹配則不能將 s 追加到 p 後面。

4.3 匹配 awaitMatch

/**
 * Spins/yields/blocks 直到s節點matched或canceled
 *
 * @param s the waiting node
 * @param pred s的前驅節點,若是沒有前驅節點則爲s本身
 * @param e s節點的原始值
 * @param timed true時限時等待,false時無限等待
 * @param nanos timeout in nanosecs, used only if timed is true
 * @return matched item, or e if unmatched on interrupt or timeout
 */
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    int spins = -1; // initialized after first item and cancel checks
    ThreadLocalRandom randomYields = null; // bound if needed

    for (;;) {
        Object item = s.item;
        // 1. item已經被修改,說明匹配成功。返回匹配後的值
        if (item != e) {                  // matched
            // assert item != s;
            s.forgetContents();           // avoid garbage
            return LinkedTransferQueue.<E>cast(item);
        }
        // 2. 超時,返回匹配前的值
        if ((w.isInterrupted() || (timed && nanos <= 0)) && s.casItem(e, s)) {        // cancel
            unsplice(pred, s);
            return e;
        }
        // 3. 設置自旋次數
        if (spins < 0) {                  // establish spins at/near front
            if ((spins = spinsFor(pred, s.isData)) > 0)
                randomYields = ThreadLocalRandom.current();
        // 4. 自旋,有很小的機率調用 yeild
        } else if (spins > 0) {             // spin
            --spins;
            if (randomYields.nextInt(CHAINED_SPINS) == 0)
                Thread.yield();           // occasionally yield
        // 5. 設置等待線程,讓其它線程喚醒
        } else if (s.waiter == null) {
            s.waiter = w;                 // request unpark then recheck
        // 6. 阻塞直至其它線程喚醒,繼續循環直到匹配成功或超時退出
        } else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos > 0L)
                LockSupport.parkNanos(this, nanos);
        } else {
            LockSupport.park(this);
        }
    }
}

說明:當前操做爲同步操做時,會調用 awaitMatch 方法阻塞等待匹配,成功返回匹配後節點 item,超時則返回匹配前節點的 item 值 e。在等待期間若是線程被中斷或等待超時,則取消匹配,並調用 unsplice 方法解除節點 s 和其前繼節點的連接。

// 計算自旋次數 FRONT_SPINS=1<<7,CHAINED_SPINS=1<<6
private static int spinsFor(Node pred, boolean haveData) {
    if (MP && pred != null) {
        if (pred.isData != haveData)      // phase change
            return FRONT_SPINS + CHAINED_SPINS;
        if (pred.isMatched())             // probably at front
            return FRONT_SPINS;
        if (pred.waiter == null)          // pred apparently spinning
            return CHAINED_SPINS;
    }
    return 0;   // 單核CPU時不自旋
}

4.4 其它方法說明

4.4.1 數據節點個數 size

// 統計數據節點個數
public int size() {
    return countOfMode(true);
}
// 統計請求節點個數
public int getWaitingConsumerCount() {
    return countOfMode(false);
}

private int countOfMode(boolean data) {
    int count = 0;
    for (Node p = head; p != null; ) {
        if (!p.isMatched()) {       // p未匹配且屬性指定的data模式則 ++count
            if (p.isData != data)
                return 0;
            if (++count == Integer.MAX_VALUE) // saturated
                break;
        }
        Node n = p.next;            
        if (n != p)     // 下一個節點
            p = n;
        else {          // p節點失效則計數器歸0,從新從 head 遍歷
            count = 0;
            p = head;
        }
    }
    return count;
}

4.4.2 包含 contains

// 查找是否包含指定的數據節點 item=o
public boolean contains(Object o) {
    if (o == null) return false;
    for (Node p = head; p != null; p = succ(p)) {
        Object item = p.item;
        if (p.isData) {
            if (item != null && item != p && o.equals(item))
                return true;
        } else if (item == null)    // 有請求節點了,不用再匹配
            break;
    }
    return false;
}
// 後繼節點,若是節點失效,直接從 head 開始
final Node succ(Node p) {
    Node next = p.next;
    return (p == next) ? head : next;
}

4.4.3 是否有請求節點 hasWaitingConsumer

// 是否有請求節點
public boolean hasWaitingConsumer() {
    return firstOfMode(false) != null;
}

// 查找第一個 isData 模式的未匹配節點
private Node firstOfMode(boolean isData) {
    for (Node p = head; p != null; p = succ(p)) {
        if (!p.isMatched())
            return (p.isData == isData) ? p : null;
    }
    return null;
}

參考:

  1. 《JUC源碼分析-集合篇(六):LinkedTransferQueue》:https://www.jianshu.com/p/42ceaed2afe6

天天用心記錄一點點。內容也許不重要,但習慣很重要!

相關文章
相關標籤/搜索