Java多線程進階(三八)—— J.U.C之collections框架:LinkedTransferQueue

圖片描述

本文首發於一世流雲專欄: https://segmentfault.com/blog...

1、LinkedTransferQueue簡介

LinkedTransferQueue是在JDK1.7時,J.U.C包新增的一種比較特殊的阻塞隊列,它除了具有阻塞隊列的經常使用功能外,還有一個比較特殊的transfer方法。java

咱們知道,在普通阻塞隊列中,當隊列爲空時,消費者線程(調用takepoll方法的線程)通常會阻塞等待生產者線程往隊列中存入元素。而LinkedTransferQueuetransfer方法則比較特殊:node

  1. 當有消費者線程阻塞等待時,調用transfer方法的生產者線程不會將元素存入隊列,而是直接將元素傳遞給消費者;
  2. 若是調用transfer方法的生產者線程發現沒有正在等待的消費者線程,則會將元素入隊,而後會阻塞等待,直到有一個消費者線程來獲取該元素。

clipboard.png

TransferQueue接口

能夠看到,LinkedTransferQueue實現了一個名爲TransferQueue的接口,TransferQueue也是JDK1.7時J.U.C包新增的接口,正是該接口提供了上述的transfer方法:算法

clipboard.png

除了transfer方法外,TransferQueue還提供了兩個變種方法:tryTransfer(E e)tryTransfer(E e, long timeout, TimeUnit unit)segmentfault

tryTransfer(E e)
當生產者線程調用tryTransfer方法時,若是沒有消費者等待接收元素,則會當即返回false。該方法和transfer方法的區別就是tryTransfer方法不管消費者是否接收,方法當即返回,而transfer方法必須等到消費者消費後才返回。併發

tryTransfer(E e, long timeout, TimeUnit unit)
tryTransfer(E e,long timeout,TimeUnit unit)方法則是加上了限時等待功能,若是沒有消費者消費該元素,則等待指定的時間再返回;若是超時還沒消費元素,則返回false,若是在超時時間內消費了元素,則返回true。app

TransferQueue接口定義:
clipboard.png框架

LinkedTransferQueue的特色簡要歸納以下:less

  1. LinkedTransferQueue是一種無界阻塞隊列,底層基於單鏈表實現;
  2. LinkedTransferQueue中的結點有兩種類型:數據結點、請求結點;
  3. LinkedTransferQueue基於無鎖算法實現。

2、LinkedTransferQueue原理

內部結構

LinkedTransferQueue提供了兩種構造器,也沒有參數設置隊列初始容量,因此是一種無界隊列dom

/**
 * 隊列結點定義.
 */
static final class Node {
    final boolean isData;   // true: 數據結點; false: 請求結點
    volatile Object item;   // 結點值
    volatile Node next;     // 後驅結點指針
    volatile Thread waiter; // 等待線程

    // 設置當前結點的後驅結點爲val
    final boolean casNext(Node cmp, Node val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    // 設置當前結點的值爲val
    final boolean casItem(Object cmp, Object val) {
        // assert cmp == null || cmp.getClass() != Node.class;
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    Node(Object item, boolean isData) {
        UNSAFE.putObject(this, itemOffset, item); // relaxed write
        this.isData = isData;
    }

    // 設置當前結點的後驅結點爲自身
    final void forgetNext() {
        UNSAFE.putObject(this, nextOffset, this);
    }

    /**
     * 設置當前結點的值爲自身.
     * 設置當前結點的等待線程爲null.
     */
    final void forgetContents() {
        UNSAFE.putObject(this, itemOffset, this);
        UNSAFE.putObject(this, waiterOffset, null);
    }

    /**
     * 判斷當前結點是否匹配成功.
     * Node.item == this || (Node.isData == true && Node.item == null)
     */
    final boolean isMatched() {
        Object x = item;
        return (x == this) || ((x == null) == isData);
    }

    /**
     * 判斷是否爲未匹配的請求結點.
     * Node.isData == false && Node.item == null
     */
    final boolean isUnmatchedRequest() {
        return !isData && item == null;
    }

    /**
     * 當該結點(havaData)是未匹配結點, 且與當前的結點類型不一樣時, 返回true.
     */
    final boolean cannotPrecede(boolean haveData) {
        boolean d = isData;
        Object x;
        return d != haveData && (x = item) != this && (x != null) == d;
    }

    /**
     * 嘗試匹配數據結點.
     */
    final boolean tryMatchData() {
        // assert isData;   當前結點必須爲數據結點
        Object x = item;
        if (x != null && x != this && casItem(x, null)) {
            LockSupport.unpark(waiter);     // 喚醒等待線程
            return true;
        }
        return false;
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;
    private static final long waiterOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
            waiterOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiter"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

關於Node結點,有如下幾點須要特別注意:異步

  1. Node結點有兩種類型:數據結點、請求結點,經過字段isData區分,只有不一樣類型的結點才能相互匹配;
  2. Node結點的值保存在item字段,匹配先後值會發生變化;

Node結點的狀態變化以下表:

結點/狀態 數據結點 請求結點
匹配前 isData = true; item = 數據結點值 isData = false; item = null
匹配後 isData = true; item = null isData = false; item = this
從上表也能夠看出,對於一個數據結點,當 item == null表示匹配成功;對於一個請求結點,當 item == this表示匹配成功。概括起來,匹配成功的結點Node就是知足 (Node.item == this) || ((Node.item == null) == Node.isData)

LinkedTransferQueue內部的其他字段定義以下,主要就是經過Unsafe類操做字段值,內部定義了不少常量字段,好比自旋,這些都是爲了非阻塞算法的鎖優化而定義的:

public class LinkedTransferQueue<E> extends AbstractQueue<E>
    implements TransferQueue<E>, java.io.Serializable {

    /**
     * True若是是多核CPU
     */
    private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1;

    /**
     * 線程自旋次數(僅多核CPU時用到).
     */
    private static final int FRONT_SPINS = 1 << 7;

    /**
     * 線程自旋次數(僅多核CPU時用到).
     */
    private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;

    /**
     * The maximum number of estimated removal failures (sweepVotes)
     * to tolerate before sweeping through the queue unlinking
     * cancelled nodes that were not unlinked upon initial
     * removal. See above for explanation. The value must be at least
     * two to avoid useless sweeps when removing trailing nodes.
     */
    static final int SWEEP_THRESHOLD = 32;

    /**
     * 隊首結點指針.
     */
    transient volatile Node head;

    /**
     * 隊尾結點指針.
     */
    private transient volatile Node tail;

    /**
     * The number of apparent failures to unsplice removed nodes
     */
    private transient volatile int sweepVotes;

    // CAS設置隊尾tail指針爲val
    private boolean casTail(Node cmp, Node val) {
        return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
    }

    // CAS設置隊首head指針爲val
    private boolean casHead(Node cmp, Node val) {
        return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
    }

    private boolean casSweepVotes(int cmp, int val) {
        return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val);
    }

    /*
     * xfer方法的入參, 不一樣類型的方法內部調用xfer方法時入參不一樣.
     */
    private static final int NOW = 0;   // for untimed poll, tryTransfer
    private static final int ASYNC = 1; // for offer, put, add
    private static final int SYNC = 2; // for transfer, take
    private static final int TIMED = 3; // for timed poll, tryTransfer

    // Unsafe mechanics

    private static final sun.misc.Unsafe UNSAFE;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long sweepVotesOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = LinkedTransferQueue.class;
            headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head"));
            tailOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("tail"));
            sweepVotesOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("sweepVotes"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

    //...
}

上述比較重要的就是4個常量值的定義:

/*
 * xfer方法的入參, 不一樣類型的方法內部調用xfer方法時入參不一樣.
 */
private static final int NOW = 0;   // for untimed poll, tryTransfer
private static final int ASYNC = 1; // for offer, put, add
private static final int SYNC = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfer

這四個常量值,做爲xfer方法的入參,用於標識不一樣操做類型。其實從常量的命名也能夠看出它們對應的操做含義:

NOW表示即時操做(可能失敗),即不會阻塞調用線程:
poll(獲取並移除隊首元素,若是隊列爲空,直接返回null);tryTransfer(嘗試將元素傳遞給消費者,若是沒有等待的消費者,則當即返回false,也不會將元素入隊)

ASYNC表示異步操做(必然成功):
offer(插入指定元素至隊尾,因爲是無界隊列,因此會當即返回true);put(插入指定元素至隊尾,因爲是無界隊列,因此會當即返回);add(插入指定元素至隊尾,因爲是無界隊列,因此會當即返回true)

SYNC表示同步操做(阻塞調用線程):
transfer(阻塞直到出現一個消費者線程);take(從隊首移除一個元素,若是隊列爲空,則阻塞線程)

TIMED表示限時同步操做(限時阻塞調用線程):
poll(long timeout, TimeUnit unit);tryTransfer(E e, long timeout, TimeUnit unit)

關於xfer方法,它是LinkedTransferQueued的核心內部方法,咱們後面會詳細介紹。


transfer方法

transfer方法,用於將指定元素e傳遞給消費者線程(調用take/poll方法)。若是有消費者線程正在阻塞等待,則調用transfer方法的線程會直接將元素傳遞給它;若是沒有消費者線程等待獲取元素,則調用transfer方法的線程會將元素插入到隊尾,而後阻塞等待,直到出現一個消費者線程獲取元素:

/**
 * 將指定元素e傳遞給消費者線程(調用take/poll方法).
 */
public void transfer(E e) throws InterruptedException {
    if (xfer(e, true, SYNC, 0) != null) {
        // 進入到此處, 說明調用線程被中斷了
        Thread.interrupted();       // 清除中斷狀態, 而後拋出中斷異常
        throw new InterruptedException();
    }
}

transfer方法的內部實際是調用了xfer方法,入參爲SYNC=2

/**
 * 入隊/出隊元素的真正實現.
 *
 * @param e        入隊操做, e非null; 出隊操做, e爲null
 * @param haveData true表示入隊元素, false表示出隊元素
 * @param how      NOW, ASYNC, SYNC, TIMED 四種常量定義
 * @param nanos    限時模式下使用(納秒)
 * @return 匹配成功則返回匹配的元素, 不然返回e自己
 */
private E xfer(E e, boolean haveData, int how, long nanos) {
    if (haveData && (e == null))            // 入隊操做, 元素e不能爲null
        throw new NullPointerException();

    Node s = null;

    retry:
    for (; ; ) {
        for (Node h = head, p = h; p != null; ) {               // 嘗試匹配p指向的結點
            boolean isData = p.isData;                          // 結點類型
            Object item = p.item;                               // 結點值
            if (item != p && (item != null) == isData) {        // 若是結點還未匹配過
                if (isData == haveData)                         // 同種類型結點不能匹配
                    break;
                if (p.casItem(item, e)) {                       // p指向從隊首開始向後的第一個匹配結點
                    for (Node q = p; q != h; ) {
                        Node n = q.next;  // update by 2 unless singleton
                        if (head == h && casHead(h, n == null ? q : n)) {
                            h.forgetNext();
                            break;
                        }                 // advance and retry
                        if ((h = head) == null ||
                            (q = h.next) == null || !q.isMatched())
                            break;        // unless slack < 2
                    }
                    LockSupport.unpark(p.waiter);               // 喚醒匹配結點上的等待線程
                    return LinkedTransferQueue.<E>cast(item);   // 返回匹配結點的值
                }
            }
            Node n = p.next;
            p = (p != n) ? n : (h = head);  // Use head if p offlist
        }

        if (how != NOW) {
            if (s == null)
                s = new Node(e, haveData);      // 建立一個入隊結點, 添加到隊尾
            Node pred = tryAppend(s, haveData); // pred指向s的前驅結點或s(隊列中只有一個結點)或null(tryAppend失敗)
            if (pred == null)
                continue retry;                 // 入隊失敗,則重試
            if (how != ASYNC)
                return awaitMatch(s, pred, e, (how == TIMED), nanos);   // 等待出隊線程
        }
        return e;
    }
}

咱們經過示例看下xfer方法到底作了哪些事:

①隊列初始狀態

clipboard.png


②ThreadA線程調用transfer入隊元素「9」

注意,此時入隊一個數據結點,且隊列爲空,因此會直接進入xfer中的下述代碼:

if (how != NOW) {
    if (s == null)
        s = new Node(e, haveData);      // 建立一個入隊結點, 添加到隊尾
    Node pred = tryAppend(s, haveData); // pred指向s的前驅結點或s(隊列中只有一個結點)或null(tryAppend失敗)
    if (pred == null)
        continue retry;                 // 入隊失敗,則重試
    if (how != ASYNC)
        return awaitMatch(s, pred, e, (how == TIMED), nanos);   // 等待出隊線程
}

上述代碼會插入一個結點至隊尾,而後線程進入阻塞,等待一個出隊線程(消費者)的到來。

隊尾插入結點的方法是tryAppend,因爲此時隊列爲空,會進入CASE1分支,設置隊首指針head指向新結點,tryAppend方法的返回值有三種狀況:

  1. 入隊失敗,返回null;
  2. 入隊成功且隊列只有一個結點,返回該結點自身;
  3. 入隊成功且隊列不止一個結點,返回該入隊結點的前驅結點。
/**
 * 嘗試將結點s添加到隊尾.
 *
 * @param s        待添加的結點
 * @param haveData true: 數據結點
 * @return 返回null表示失敗; 不然返回s的前驅結點(沒有前驅則返回s自身)
 */
private Node tryAppend(Node s, boolean haveData) {
    for (Node t = tail, p = t; ; ) {
        Node n, u;
        if (p == null && (p = head) == null) {      // CASE1: 隊列爲空
            if (casHead(null, s))   // 設置隊首指針head
                return s;
        } else if (p.cannotPrecede(haveData))       // CASE2: 結點s不能連接到結點p
            return null;
        else if ((n = p.next) != null)              // CASE3: 遍歷至隊尾結點
            p = p != t && t != (u = tail) ? (t = u) : // stale tail
                (p != n) ? n : null;      // restart if off list
        else if (!p.casNext(null, s))          // CASE4: 插入結點s
            p = p.next;                   // re-read on CAS failure
        else {                                      // CASE5: 嘗試進行鬆弛操做
            if (p != t) {                 // update if slack now >= 2
                while ((tail != t || !casTail(t, s)) &&
                    (t = tail) != null &&
                    (s = t.next) != null && // advance and retry
                    (s = s.next) != null && s != t) ;
            }
            return p;
        }
    }
}

等待出隊線程方法awaitMatch,該方法核心做用就是進行結點匹配:

  1. 匹配成功,返回匹配值;
  2. 匹配失敗(中斷或限時等待的超時狀況),返回原匹配結點的值;
  3. 阻塞線程,等待與之匹配的結點的到來。
從awaitMatch方法其實能夠看到一種經典的「鎖優化」思路,就是 自旋 -> yield -> 阻塞,線程不會當即進入阻塞,由於線程上下文切換的開銷每每比較大,因此會先自旋必定次數,中途可能伴隨隨機的yield操做,讓出cpu時間片,若是自旋次數用完後,仍是沒有匹配線程出現,再真正阻塞線程。

通過上述步驟,ThreadA最終會進入CASE4分支中等待,此時的隊列結構以下:
clipboard.png

注意,此時的隊列中tail隊尾指針並不指向結點「9」,這是一種「鬆弛」策略,後面會講到。

③ThreadB線程調用transfer入隊元素「2」

因爲此時隊首head指針不爲null,因此會進入transfer方法中的如下循環:

for (Node h = head, p = h; p != null; ) {
    boolean isData = p.isData;                          // 結點類型
    Object item = p.item;                               // 結點值
    if (item != p && (item != null) == isData) {        // 若是結點還未匹配過
        if (isData == haveData)                         // 同種類型結點不能匹配
            break;
        if (p.casItem(item, e)) { // match
            for (Node q = p; q != h; ) {
                Node n = q.next;  // update by 2 unless singleton
                if (head == h && casHead(h, n == null ? q : n)) {
                    h.forgetNext();
                    break;
                }                 // advance and retry
                if ((h = head) == null ||
                    (q = h.next) == null || !q.isMatched())
                    break;        // unless slack < 2
            }
            LockSupport.unpark(p.waiter);
            return LinkedTransferQueue.<E>cast(item);
        }
    }
    Node n = p.next;
    p = (p != n) ? n : (h = head);  // Use head if p offlist
}

上述方法會讀取隊首結點,判斷該結點有沒被匹配過(item != p && (item != null) == isData):

  1. 若是已經被其它線程匹配過了,則繼續判斷下一個結點(p.next);
  2. 若是尚未被匹配,則判斷下當前的入隊結點類型是否和隊首中的一致;若是一致(isData == haveData)就匹配失敗,跳出循環,不然進行匹配操做。

顯然,目前隊首結點是「數據結點」,ThreadB線程的入隊結點也是「數據結點」,結點類型一致,因此匹配失敗,直接跳過循環,也進入如下代碼塊:

if (how != NOW) {
    if (s == null)
        s = new Node(e, haveData);      // 建立一個入隊結點, 添加到隊尾
    Node pred = tryAppend(s, haveData); // pred指向s的前驅結點或s(隊列中只有一個結點)或null(tryAppend失敗)
    if (pred == null)
        continue retry;                 // 入隊失敗,則重試
    if (how != ASYNC)
        return awaitMatch(s, pred, e, (how == TIMED), nanos);   // 等待出隊線程
}

再次調用tryAppend方法, 會在CASE4分支中將元素「2」插入隊尾,而後在CASE5分支中從新設置隊尾指針tail

/**
 * 嘗試將結點s添加到隊尾.
 *
 * @param s        待添加的結點
 * @param haveData true: 數據結點
 * @return 返回null表示失敗; 不然返回s的前驅結點(沒有前驅則返回s自身)
 */
private Node tryAppend(Node s, boolean haveData) {
    for (Node t = tail, p = t; ; ) {
        Node n, u;
        if (p == null && (p = head) == null) {      // CASE1: 隊列爲空
            if (casHead(null, s))   // 設置隊首指針head
                return s;
        } else if (p.cannotPrecede(haveData))       // CASE2: 結點s不能連接到結點p
            return null;
        else if ((n = p.next) != null)              // CASE3: 遍歷至隊尾結點
            p = p != t && t != (u = tail) ? (t = u) : // stale tail
                (p != n) ? n : null;      // restart if off list
        else if (!p.casNext(null, s))          // CASE4: 插入結點s
            p = p.next;                   // re-read on CAS failure
        else {                                      // CASE5: 嘗試進行鬆弛操做
            if (p != t) {                 // update if slack now >= 2
                while ((tail != t || !casTail(t, s)) &&
                    (t = tail) != null &&
                    (s = t.next) != null && // advance and retry
                    (s = s.next) != null && s != t) ;
            }
            return p;
        }
    }
}

此時隊列結構以下:
clipboard.png

最終,ThreadB也會在awaitMatch方法中進入阻塞,最終隊列結構以下:
clipboard.png


④ThreadC線程調用transfer入隊元素「93」

過程和前幾步幾乎相同,再也不贅述,最終隊列結構以下:
clipboard.png

能夠看到,隊尾指針tail的設置實際是滯後的,這是一種「鬆弛」策略,用以提高無鎖算法併發修改過程當中的性能。


take方法

再來看下消費者線程調用的take方法,該方法會從隊首取出一個元素,若是隊列爲空,則線程會阻塞:

/**
 * 從隊首出隊一個元素.
 */
public E take() throws InterruptedException {
    E e = xfer(null, false, SYNC, 0);   // (e == null && isData=false)表示一個請求結點
    if (e != null)  // 若是e!=null, 則表示匹配成功, 此時e爲與之匹配的數據結點的值
        return e;
    
    Thread.interrupted();
    throw new InterruptedException();
}

內部依然調用了xfer方法,不過此時入參有所不一樣,因爲是消費線程調用,因此入參e == null && hasData == false,表示一個「請求結點」:

/**
 * 入隊/出隊元素的真正實現.
 *
 * @param e        入隊操做, e非null; 出隊操做, e爲null
 * @param haveData true表示入隊元素, false表示出隊元素
 * @param how      NOW, ASYNC, SYNC, TIMED 四種常量定義
 * @param nanos    限時模式下使用(納秒)
 * @return 匹配成功則返回匹配的元素, 不然返回e自己
 */
private E xfer(E e, boolean haveData, int how, long nanos) {
    if (haveData && (e == null))            // 入隊操做, 元素e不能爲null
        throw new NullPointerException();

    Node s = null;

    retry:
    for (; ; ) {
        for (Node h = head, p = h; p != null; ) {               // 嘗試匹配p指向的結點
            boolean isData = p.isData;                          // 結點類型
            Object item = p.item;                               // 結點值
            if (item != p && (item != null) == isData) {        // 若是結點還未匹配過
                if (isData == haveData)                         // 同種類型結點不能匹配
                    break;
                if (p.casItem(item, e)) {                       // p指向從隊首開始向後的第一個匹配結點
                    for (Node q = p; q != h; ) {
                        Node n = q.next;  // update by 2 unless singleton
                        if (head == h && casHead(h, n == null ? q : n)) {
                            h.forgetNext();
                            break;
                        }                 // advance and retry
                        if ((h = head) == null ||
                            (q = h.next) == null || !q.isMatched())
                            break;        // unless slack < 2
                    }
                    LockSupport.unpark(p.waiter);               // 喚醒匹配結點上的等待線程
                    return LinkedTransferQueue.<E>cast(item);   // 返回匹配結點的值
                }
            }
            Node n = p.next;
            p = (p != n) ? n : (h = head);  // Use head if p offlist
        }

        if (how != NOW) {
            if (s == null)
                s = new Node(e, haveData);      // 建立一個入隊結點, 添加到隊尾
            Node pred = tryAppend(s, haveData); // pred指向s的前驅結點或s(隊列中只有一個結點)或null(tryAppend失敗)
            if (pred == null)
                continue retry;                 // 入隊失敗,則重試
            if (how != ASYNC)
                return awaitMatch(s, pred, e, (how == TIMED), nanos);   // 等待出隊線程
        }
        return e;
    }
}

仍是經過示例看:

①隊列初始狀態

clipboard.png


②ThreadD調用take方法,消費元素

此時,在xfer方法中,會從隊首開始,向後找到第一個匹配結點,並交換元素值,而後喚醒隊列中匹配結點上的等待線程:

/**
 * 入隊/出隊元素的真正實現.
 *
 * @param e        入隊操做, e非null; 出隊操做, e爲null
 * @param haveData true表示入隊元素, false表示出隊元素
 * @param how      NOW, ASYNC, SYNC, TIMED 四種常量定義
 * @param nanos    限時模式下使用(納秒)
 * @return 匹配成功則返回匹配的元素, 不然返回e自己
 */
private E xfer(E e, boolean haveData, int how, long nanos) {
    if (haveData && (e == null))            // 入隊操做, 元素e不能爲null
        throw new NullPointerException();

    Node s = null;

    retry:
    for (; ; ) {
        for (Node h = head, p = h; p != null; ) {               // 嘗試匹配p指向的結點
            boolean isData = p.isData;                          // 結點類型
            Object item = p.item;                               // 結點值
            if (item != p && (item != null) == isData) {        // 若是結點還未匹配過
                if (isData == haveData)                         // 同種類型結點不能匹配
                    break;
                if (p.casItem(item, e)) {                       // p指向從隊首開始向後的第一個匹配結點
                    for (Node q = p; q != h; ) {
                        Node n = q.next;  // update by 2 unless singleton
                        if (head == h && casHead(h, n == null ? q : n)) {
                            h.forgetNext();
                            break;
                        }                 // advance and retry
                        if ((h = head) == null ||
                            (q = h.next) == null || !q.isMatched())
                            break;        // unless slack < 2
                    }
                    LockSupport.unpark(p.waiter);               // 喚醒匹配結點上的等待線程
                    return LinkedTransferQueue.<E>cast(item);   // 返回匹配結點的值
                }
            }
            Node n = p.next;
            p = (p != n) ? n : (h = head);  // Use head if p offlist
        }

        if (how != NOW) {
            if (s == null)
                s = new Node(e, haveData);      // 建立一個入隊結點, 添加到隊尾
            Node pred = tryAppend(s, haveData); // pred指向s的前驅結點或s(隊列中只有一個結點)或null(tryAppend失敗)
            if (pred == null)
                continue retry;                 // 入隊失敗,則重試
            if (how != ASYNC)
                return awaitMatch(s, pred, e, (how == TIMED), nanos);   // 等待出隊線程
        }
        return e;
    }
}

最終隊列結構以下,匹配結點的值被置換爲null,ThreadA被喚醒,ThreadD拿到匹配結點上的元素值「9」並返回:

clipboard.png


③ThreadA被喚醒後繼續執行

ThreadA被喚醒後,從原阻塞處——繼續向下執行,而後進入下一次自旋,進入CASE1分支:

/**
 * 自旋/yield/阻塞,直到結點s被匹配.
 *
 * @param s    等待被匹配的結點s
 * @param pred s的前驅結點或s自身(隊列中只有一個結點的狀況)
 * @param e    結點s的值
 * @return 匹配值, 或e自己(中斷或超時狀況)
 */
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;   // 限時等待狀況下使用
    Thread w = Thread.currentThread();
    int spins = -1;                                                 // 自旋次數, 鎖優化操做
    ThreadLocalRandom randomYields = null; // bound if needed

    for (; ; ) {
        Object item = s.item;
        if (item != e) {                    // CASE1: 匹配成功
            // assert item != s;
            s.forgetContents();             // avoid garbage
            return LinkedTransferQueue.<E>cast(item);
        }
        if ((w.isInterrupted() || (timed && nanos <= 0))
            && s.casItem(e, s)) {           // CASE2: 取消(線程被中斷或超時)
            unsplice(pred, s);
            return e;
        }

        // CASE3: 設置輕量級鎖(自旋 -> yield)

        if (spins < 0) {                    // 初始化自旋次數
            if ((spins = spinsFor(pred, s.isData)) > 0)
                randomYields = ThreadLocalRandom.current();
        } else if (spins > 0) {             // 自選次數減1
            --spins;
            if (randomYields.nextInt(CHAINED_SPINS) == 0)
                Thread.yield();             // 隨機yield線程
        } else if (s.waiter == null) {      // waiter保存待阻塞線程
            s.waiter = w;
        } else if (timed) {                 // 限時等待狀況, 計算剩餘有效時間
            nanos = deadline - System.nanoTime();
            if (nanos > 0L)
                LockSupport.parkNanos(this, nanos);
        } else {                            // CASE4: 阻塞線程
            LockSupport.park(this);
        }
    }
}

在CASE1分支中,因爲結點的item項已經被替換成了null,因此調用s.forgetContents(),並返回null

/**
 * 設置當前結點的值爲自身.
 * 設置當前結點的等待線程爲null.
 */
final void forgetContents() {
    UNSAFE.putObject(this, itemOffset, this);
    UNSAFE.putObject(this, waiterOffset, null);
}

最終隊列結構以下:

clipboard.png


④ThreadE調用take方法出隊元素

ThreadE調用take方法出隊元素,過程和步驟②相同,進入xfer方法(e == null,hasData == false),因爲head指針指向的元素已經匹配過了,因此
向後繼續查找,找到第一個未匹配過的結點「2」,而後置換結點「2」中的元素值爲null,喚醒線程ThreadB,返回匹配結點的元素值「2」:

for (Node h = head, p = h; p != null; ) {               // 嘗試匹配p指向的結點
    boolean isData = p.isData;                          // 結點類型
    Object item = p.item;                               // 結點值
    if (item != p && (item != null) == isData) {        // 若是結點還未匹配過
        if (isData == haveData)                         // 同種類型結點不能匹配
            break;
        if (p.casItem(item, e)) {                       // p指向從隊首開始向後的第一個匹配結點
            for (Node q = p; q != h; ) {
                Node n = q.next;  // update by 2 unless singleton
                if (head == h && casHead(h, n == null ? q : n)) {
                    h.forgetNext();
                    break;
                }                 // advance and retry
                if ((h = head) == null ||
                    (q = h.next) == null || !q.isMatched())
                    break;        // unless slack < 2
            }
            LockSupport.unpark(p.waiter);               // 喚醒匹配結點上的等待線程
            return LinkedTransferQueue.<E>cast(item);   // 返回匹配結點的值
        }
    }
    Node n = p.next;
    p = (p != n) ? n : (h = head);  // Use head if p offlist
}

此時隊列狀態以下,能夠看到,隊首指針head一次性向後跳了2個位置,原來已經匹配過的元素的next指針指向自身,等待被GC回收,這其實就是LinkedTransferQueue的「鬆弛」策略:

clipboard.png


⑤ThreadB被喚醒後繼續執行

過程和步驟③徹底相同,在awaitMatch方法中,將結點的item置爲this,而後返回匹配結點值——null,最終隊列結構以下:

clipboard.png


⑥ThreadF調用take方法出隊元素

ThreadF調用take方法出隊元素,過程和步驟②相同,進入xfer方法(e == null,hasData == false),因爲head指針指向的元素此時沒有匹配,因此不用像步驟②那樣向後查找,而是直接置換匹配結點的元素值「93」,而後喚醒ThreadC,返回匹配值「93」。最終隊列結構以下:

clipboard.png


⑦ThreadC被喚醒後繼續執行

過程和步驟③徹底相同,在awaitMatch方法中,將結點的item置爲this,而後返回匹配結點值——null,最終隊列結構以下:

clipboard.png


此時的隊列結構,讀者移必定感到很是奇怪,並不嚴格遵照隊列的定義,這其實就是「Dual Queue」算法的實現,爲了對自旋優化,作了不少看似彆扭的操做,沒必要奇怪。

假設此時再有一個線程ThreadH調用take方法出隊元素會怎麼樣?其實這是隊列已經空了,ThreadH會被阻塞,可是會建立一個「請求結點」入隊:

/**
 * 嘗試將結點s添加到隊尾.
 *
 * @param s        待添加的結點
 * @param haveData true: 數據結點
 * @return 返回null表示失敗; 不然返回s的前驅結點(沒有前驅則返回s自身)
 */
private Node tryAppend(Node s, boolean haveData) {
    for (Node t = tail, p = t; ; ) {
        Node n, u;
        if (p == null && (p = head) == null) {      // CASE1: 隊列爲空
            if (casHead(null, s))   // 設置隊首指針head
                return s;
        } else if (p.cannotPrecede(haveData))       // CASE2: 結點s不能連接到結點p
            return null;
        else if ((n = p.next) != null)              // CASE3: 遍歷至隊尾結點
            p = p != t && t != (u = tail) ? (t = u) : // stale tail
                (p != n) ? n : null;      // restart if off list
        else if (!p.casNext(null, s))          // CASE4: 插入結點s
            p = p.next;                   // re-read on CAS failure
        else {                                      // CASE5: 嘗試進行鬆弛操做
            if (p != t) {                 // update if slack now >= 2
                while ((tail != t || !casTail(t, s)) &&
                    (t = tail) != null &&
                    (s = t.next) != null && // advance and retry
                    (s = s.next) != null && s != t) ;
            }
            return p;
        }
    }
}

調用完tryAppend方法後,隊列結構以下,橙色的爲「請求結點」—— item==null && isData==false

clipboard.png

而後ThreadH也會進入在awaitMatch方法後進入阻塞,並等待一個入隊線程的到來。最終隊列結構以下:

clipboard.png

3、總結

截止本篇爲止,咱們已經學習完了juc-collection框架中的全部阻塞隊列,以下表所示:

隊列特性 有界隊列 近似無界隊列 無界隊列 特殊隊列
有鎖算法 ArrayBlockingQueue LinkedBlockingQueue、LinkedBlockingDeque / PriorityBlockingQueue、DelayQueue
無鎖算法 / / LinkedTransferQueue SynchronousQueue

能夠看到,LinkedTransferQueue其實兼具了SynchronousQueue的特性以及無鎖算法的性能,而且是一種無界隊列:

  1. 和SynchronousQueue相比,LinkedTransferQueue能夠存儲實際的數據;
  2. 和其它阻塞隊列相比,LinkedTransferQueue直接用無鎖算法實現,性能有所提高。

另外,因爲LinkedTransferQueue能夠存放兩種不一樣類型的結點,因此稱之爲「Dual Queue」:
內部Node結點定義了一個 boolean 型字段——isData,表示該結點是「數據結點」仍是「請求結點」。

爲了節省 CAS 操做的開銷,LinkedTransferQueue使用了 鬆弛(slack)操做: 在結點被匹配(被刪除)以後,不會當即更新隊列的head、tail,而是當 head、tail結點與最近一個未匹配的結點之間的距離超過「鬆弛閥值」後纔會更新(默認爲 2)。這個「鬆弛閥值」通常爲1到3,若是太大會增長沿鏈表查找未匹配結點的時間,過小會增長 CAS 的開銷。
相關文章
相關標籤/搜索