本文首發於一世流雲專欄: https://segmentfault.com/blog...
LinkedTransferQueue
是在JDK1.7時,J.U.C包新增的一種比較特殊的阻塞隊列,它除了具有阻塞隊列的經常使用功能外,還有一個比較特殊的transfer
方法。java
咱們知道,在普通阻塞隊列中,當隊列爲空時,消費者線程(調用take或poll方法的線程)通常會阻塞等待生產者線程往隊列中存入元素。而LinkedTransferQueue的transfer方法則比較特殊:node
能夠看到,LinkedTransferQueue實現了一個名爲TransferQueue
的接口,TransferQueue也是JDK1.7時J.U.C包新增的接口,正是該接口提供了上述的transfer方法:算法
除了transfer方法外,TransferQueue還提供了兩個變種方法:tryTransfer(E e)
、tryTransfer(E e, long timeout, TimeUnit unit)
。segmentfault
tryTransfer(E e)
當生產者線程調用tryTransfer方法時,若是沒有消費者等待接收元素,則會當即返回false。該方法和transfer方法的區別就是tryTransfer方法不管消費者是否接收,方法當即返回,而transfer方法必須等到消費者消費後才返回。併發
tryTransfer(E e, long timeout, TimeUnit unit)
tryTransfer(E e,long timeout,TimeUnit unit)方法則是加上了限時等待功能,若是沒有消費者消費該元素,則等待指定的時間再返回;若是超時還沒消費元素,則返回false,若是在超時時間內消費了元素,則返回true。app
TransferQueue接口定義:
框架
LinkedTransferQueue的特色簡要歸納以下:less
LinkedTransferQueue提供了兩種構造器,也沒有參數設置隊列初始容量,因此是一種無界隊列:dom
/** * 隊列結點定義. */ static final class Node { final boolean isData; // true: 數據結點; false: 請求結點 volatile Object item; // 結點值 volatile Node next; // 後驅結點指針 volatile Thread waiter; // 等待線程 // 設置當前結點的後驅結點爲val final boolean casNext(Node cmp, Node val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // 設置當前結點的值爲val final boolean casItem(Object cmp, Object val) { // assert cmp == null || cmp.getClass() != Node.class; return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } Node(Object item, boolean isData) { UNSAFE.putObject(this, itemOffset, item); // relaxed write this.isData = isData; } // 設置當前結點的後驅結點爲自身 final void forgetNext() { UNSAFE.putObject(this, nextOffset, this); } /** * 設置當前結點的值爲自身. * 設置當前結點的等待線程爲null. */ final void forgetContents() { UNSAFE.putObject(this, itemOffset, this); UNSAFE.putObject(this, waiterOffset, null); } /** * 判斷當前結點是否匹配成功. * Node.item == this || (Node.isData == true && Node.item == null) */ final boolean isMatched() { Object x = item; return (x == this) || ((x == null) == isData); } /** * 判斷是否爲未匹配的請求結點. * Node.isData == false && Node.item == null */ final boolean isUnmatchedRequest() { return !isData && item == null; } /** * 當該結點(havaData)是未匹配結點, 且與當前的結點類型不一樣時, 返回true. */ final boolean cannotPrecede(boolean haveData) { boolean d = isData; Object x; return d != haveData && (x = item) != this && (x != null) == d; } /** * 嘗試匹配數據結點. */ final boolean tryMatchData() { // assert isData; 當前結點必須爲數據結點 Object x = item; if (x != null && x != this && casItem(x, null)) { LockSupport.unpark(waiter); // 喚醒等待線程 return true; } return false; } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; private static final long waiterOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next")); waiterOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiter")); } catch (Exception e) { throw new Error(e); } } }
關於Node結點,有如下幾點須要特別注意:異步
isData
區分,只有不一樣類型的結點才能相互匹配;item
字段,匹配先後值會發生變化;Node結點的狀態變化以下表:
結點/狀態 | 數據結點 | 請求結點 |
---|---|---|
匹配前 | isData = true; item = 數據結點值 | isData = false; item = null |
匹配後 | isData = true; item = null | isData = false; item = this |
從上表也能夠看出,對於一個數據結點,當item == null
表示匹配成功;對於一個請求結點,當item == this
表示匹配成功。概括起來,匹配成功的結點Node就是知足(Node.item == this) || ((Node.item == null) == Node.isData)
。
LinkedTransferQueue內部的其他字段定義以下,主要就是經過Unsafe類操做字段值,內部定義了不少常量字段,好比自旋,這些都是爲了非阻塞算法的鎖優化而定義的:
public class LinkedTransferQueue<E> extends AbstractQueue<E> implements TransferQueue<E>, java.io.Serializable { /** * True若是是多核CPU */ private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1; /** * 線程自旋次數(僅多核CPU時用到). */ private static final int FRONT_SPINS = 1 << 7; /** * 線程自旋次數(僅多核CPU時用到). */ private static final int CHAINED_SPINS = FRONT_SPINS >>> 1; /** * The maximum number of estimated removal failures (sweepVotes) * to tolerate before sweeping through the queue unlinking * cancelled nodes that were not unlinked upon initial * removal. See above for explanation. The value must be at least * two to avoid useless sweeps when removing trailing nodes. */ static final int SWEEP_THRESHOLD = 32; /** * 隊首結點指針. */ transient volatile Node head; /** * 隊尾結點指針. */ private transient volatile Node tail; /** * The number of apparent failures to unsplice removed nodes */ private transient volatile int sweepVotes; // CAS設置隊尾tail指針爲val private boolean casTail(Node cmp, Node val) { return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val); } // CAS設置隊首head指針爲val private boolean casHead(Node cmp, Node val) { return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val); } private boolean casSweepVotes(int cmp, int val) { return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val); } /* * xfer方法的入參, 不一樣類型的方法內部調用xfer方法時入參不一樣. */ private static final int NOW = 0; // for untimed poll, tryTransfer private static final int ASYNC = 1; // for offer, put, add private static final int SYNC = 2; // for transfer, take private static final int TIMED = 3; // for timed poll, tryTransfer // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; private static final long tailOffset; private static final long sweepVotesOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = LinkedTransferQueue.class; headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head")); tailOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("tail")); sweepVotesOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("sweepVotes")); } catch (Exception e) { throw new Error(e); } } //... }
上述比較重要的就是4個常量值的定義:
/* * xfer方法的入參, 不一樣類型的方法內部調用xfer方法時入參不一樣. */ private static final int NOW = 0; // for untimed poll, tryTransfer private static final int ASYNC = 1; // for offer, put, add private static final int SYNC = 2; // for transfer, take private static final int TIMED = 3; // for timed poll, tryTransfer
這四個常量值,做爲xfer
方法的入參,用於標識不一樣操做類型。其實從常量的命名也能夠看出它們對應的操做含義:
NOW表示即時操做(可能失敗),即不會阻塞調用線程:
poll(獲取並移除隊首元素,若是隊列爲空,直接返回null);tryTransfer(嘗試將元素傳遞給消費者,若是沒有等待的消費者,則當即返回false,也不會將元素入隊)
ASYNC表示異步操做(必然成功):
offer(插入指定元素至隊尾,因爲是無界隊列,因此會當即返回true);put(插入指定元素至隊尾,因爲是無界隊列,因此會當即返回);add(插入指定元素至隊尾,因爲是無界隊列,因此會當即返回true)
SYNC表示同步操做(阻塞調用線程):
transfer(阻塞直到出現一個消費者線程);take(從隊首移除一個元素,若是隊列爲空,則阻塞線程)
TIMED表示限時同步操做(限時阻塞調用線程):
poll(long timeout, TimeUnit unit);tryTransfer(E e, long timeout, TimeUnit unit)
關於xfer
方法,它是LinkedTransferQueued的核心內部方法,咱們後面會詳細介紹。
transfer
方法,用於將指定元素e傳遞給消費者線程(調用take/poll方法)。若是有消費者線程正在阻塞等待,則調用transfer方法的線程會直接將元素傳遞給它;若是沒有消費者線程等待獲取元素,則調用transfer方法的線程會將元素插入到隊尾,而後阻塞等待,直到出現一個消費者線程獲取元素:
/** * 將指定元素e傳遞給消費者線程(調用take/poll方法). */ public void transfer(E e) throws InterruptedException { if (xfer(e, true, SYNC, 0) != null) { // 進入到此處, 說明調用線程被中斷了 Thread.interrupted(); // 清除中斷狀態, 而後拋出中斷異常 throw new InterruptedException(); } }
transfer方法的內部實際是調用了xfer方法,入參爲SYNC=2
:
/** * 入隊/出隊元素的真正實現. * * @param e 入隊操做, e非null; 出隊操做, e爲null * @param haveData true表示入隊元素, false表示出隊元素 * @param how NOW, ASYNC, SYNC, TIMED 四種常量定義 * @param nanos 限時模式下使用(納秒) * @return 匹配成功則返回匹配的元素, 不然返回e自己 */ private E xfer(E e, boolean haveData, int how, long nanos) { if (haveData && (e == null)) // 入隊操做, 元素e不能爲null throw new NullPointerException(); Node s = null; retry: for (; ; ) { for (Node h = head, p = h; p != null; ) { // 嘗試匹配p指向的結點 boolean isData = p.isData; // 結點類型 Object item = p.item; // 結點值 if (item != p && (item != null) == isData) { // 若是結點還未匹配過 if (isData == haveData) // 同種類型結點不能匹配 break; if (p.casItem(item, e)) { // p指向從隊首開始向後的第一個匹配結點 for (Node q = p; q != h; ) { Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } LockSupport.unpark(p.waiter); // 喚醒匹配結點上的等待線程 return LinkedTransferQueue.<E>cast(item); // 返回匹配結點的值 } } Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist } if (how != NOW) { if (s == null) s = new Node(e, haveData); // 建立一個入隊結點, 添加到隊尾 Node pred = tryAppend(s, haveData); // pred指向s的前驅結點或s(隊列中只有一個結點)或null(tryAppend失敗) if (pred == null) continue retry; // 入隊失敗,則重試 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); // 等待出隊線程 } return e; } }
咱們經過示例看下xfer方法到底作了哪些事:
①隊列初始狀態
②ThreadA線程調用transfer入隊元素「9」
注意,此時入隊一個數據結點,且隊列爲空,因此會直接進入xfer中的下述代碼:
if (how != NOW) { if (s == null) s = new Node(e, haveData); // 建立一個入隊結點, 添加到隊尾 Node pred = tryAppend(s, haveData); // pred指向s的前驅結點或s(隊列中只有一個結點)或null(tryAppend失敗) if (pred == null) continue retry; // 入隊失敗,則重試 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); // 等待出隊線程 }
上述代碼會插入一個結點至隊尾,而後線程進入阻塞,等待一個出隊線程(消費者)的到來。
隊尾插入結點的方法是tryAppend
,因爲此時隊列爲空,會進入CASE1分支,設置隊首指針head指向新結點,tryAppend方法的返回值有三種狀況:
/** * 嘗試將結點s添加到隊尾. * * @param s 待添加的結點 * @param haveData true: 數據結點 * @return 返回null表示失敗; 不然返回s的前驅結點(沒有前驅則返回s自身) */ private Node tryAppend(Node s, boolean haveData) { for (Node t = tail, p = t; ; ) { Node n, u; if (p == null && (p = head) == null) { // CASE1: 隊列爲空 if (casHead(null, s)) // 設置隊首指針head return s; } else if (p.cannotPrecede(haveData)) // CASE2: 結點s不能連接到結點p return null; else if ((n = p.next) != null) // CASE3: 遍歷至隊尾結點 p = p != t && t != (u = tail) ? (t = u) : // stale tail (p != n) ? n : null; // restart if off list else if (!p.casNext(null, s)) // CASE4: 插入結點s p = p.next; // re-read on CAS failure else { // CASE5: 嘗試進行鬆弛操做 if (p != t) { // update if slack now >= 2 while ((tail != t || !casTail(t, s)) && (t = tail) != null && (s = t.next) != null && // advance and retry (s = s.next) != null && s != t) ; } return p; } } }
等待出隊線程方法awaitMatch
,該方法核心做用就是進行結點匹配:
從awaitMatch方法其實能夠看到一種經典的「鎖優化」思路,就是 自旋 -> yield -> 阻塞,線程不會當即進入阻塞,由於線程上下文切換的開銷每每比較大,因此會先自旋必定次數,中途可能伴隨隨機的yield操做,讓出cpu時間片,若是自旋次數用完後,仍是沒有匹配線程出現,再真正阻塞線程。
通過上述步驟,ThreadA最終會進入CASE4分支中等待,此時的隊列結構以下:
注意,此時的隊列中tail隊尾指針並不指向結點「9」,這是一種「鬆弛」策略,後面會講到。
③ThreadB線程調用transfer入隊元素「2」
因爲此時隊首head指針不爲null,因此會進入transfer方法中的如下循環:
for (Node h = head, p = h; p != null; ) { boolean isData = p.isData; // 結點類型 Object item = p.item; // 結點值 if (item != p && (item != null) == isData) { // 若是結點還未匹配過 if (isData == haveData) // 同種類型結點不能匹配 break; if (p.casItem(item, e)) { // match for (Node q = p; q != h; ) { Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } LockSupport.unpark(p.waiter); return LinkedTransferQueue.<E>cast(item); } } Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist }
上述方法會讀取隊首結點,判斷該結點有沒被匹配過(item != p && (item != null) == isData
):
p.next
);isData == haveData
)就匹配失敗,跳出循環,不然進行匹配操做。顯然,目前隊首結點是「數據結點」,ThreadB線程的入隊結點也是「數據結點」,結點類型一致,因此匹配失敗,直接跳過循環,也進入如下代碼塊:
if (how != NOW) { if (s == null) s = new Node(e, haveData); // 建立一個入隊結點, 添加到隊尾 Node pred = tryAppend(s, haveData); // pred指向s的前驅結點或s(隊列中只有一個結點)或null(tryAppend失敗) if (pred == null) continue retry; // 入隊失敗,則重試 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); // 等待出隊線程 }
再次調用tryAppend方法, 會在CASE4分支中將元素「2」插入隊尾,而後在CASE5分支中從新設置隊尾指針tail
:
/** * 嘗試將結點s添加到隊尾. * * @param s 待添加的結點 * @param haveData true: 數據結點 * @return 返回null表示失敗; 不然返回s的前驅結點(沒有前驅則返回s自身) */ private Node tryAppend(Node s, boolean haveData) { for (Node t = tail, p = t; ; ) { Node n, u; if (p == null && (p = head) == null) { // CASE1: 隊列爲空 if (casHead(null, s)) // 設置隊首指針head return s; } else if (p.cannotPrecede(haveData)) // CASE2: 結點s不能連接到結點p return null; else if ((n = p.next) != null) // CASE3: 遍歷至隊尾結點 p = p != t && t != (u = tail) ? (t = u) : // stale tail (p != n) ? n : null; // restart if off list else if (!p.casNext(null, s)) // CASE4: 插入結點s p = p.next; // re-read on CAS failure else { // CASE5: 嘗試進行鬆弛操做 if (p != t) { // update if slack now >= 2 while ((tail != t || !casTail(t, s)) && (t = tail) != null && (s = t.next) != null && // advance and retry (s = s.next) != null && s != t) ; } return p; } } }
此時隊列結構以下:
最終,ThreadB也會在awaitMatch方法中進入阻塞,最終隊列結構以下:
④ThreadC線程調用transfer入隊元素「93」
過程和前幾步幾乎相同,再也不贅述,最終隊列結構以下:
能夠看到,隊尾指針tail
的設置實際是滯後的,這是一種「鬆弛」策略,用以提高無鎖算法併發修改過程當中的性能。
再來看下消費者線程調用的take
方法,該方法會從隊首取出一個元素,若是隊列爲空,則線程會阻塞:
/** * 從隊首出隊一個元素. */ public E take() throws InterruptedException { E e = xfer(null, false, SYNC, 0); // (e == null && isData=false)表示一個請求結點 if (e != null) // 若是e!=null, 則表示匹配成功, 此時e爲與之匹配的數據結點的值 return e; Thread.interrupted(); throw new InterruptedException(); }
內部依然調用了xfer方法,不過此時入參有所不一樣,因爲是消費線程調用,因此入參e == null && hasData == false
,表示一個「請求結點」:
/** * 入隊/出隊元素的真正實現. * * @param e 入隊操做, e非null; 出隊操做, e爲null * @param haveData true表示入隊元素, false表示出隊元素 * @param how NOW, ASYNC, SYNC, TIMED 四種常量定義 * @param nanos 限時模式下使用(納秒) * @return 匹配成功則返回匹配的元素, 不然返回e自己 */ private E xfer(E e, boolean haveData, int how, long nanos) { if (haveData && (e == null)) // 入隊操做, 元素e不能爲null throw new NullPointerException(); Node s = null; retry: for (; ; ) { for (Node h = head, p = h; p != null; ) { // 嘗試匹配p指向的結點 boolean isData = p.isData; // 結點類型 Object item = p.item; // 結點值 if (item != p && (item != null) == isData) { // 若是結點還未匹配過 if (isData == haveData) // 同種類型結點不能匹配 break; if (p.casItem(item, e)) { // p指向從隊首開始向後的第一個匹配結點 for (Node q = p; q != h; ) { Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } LockSupport.unpark(p.waiter); // 喚醒匹配結點上的等待線程 return LinkedTransferQueue.<E>cast(item); // 返回匹配結點的值 } } Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist } if (how != NOW) { if (s == null) s = new Node(e, haveData); // 建立一個入隊結點, 添加到隊尾 Node pred = tryAppend(s, haveData); // pred指向s的前驅結點或s(隊列中只有一個結點)或null(tryAppend失敗) if (pred == null) continue retry; // 入隊失敗,則重試 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); // 等待出隊線程 } return e; } }
仍是經過示例看:
①隊列初始狀態
②ThreadD調用take方法,消費元素
此時,在xfer方法中,會從隊首開始,向後找到第一個匹配結點,並交換元素值,而後喚醒隊列中匹配結點上的等待線程:
/** * 入隊/出隊元素的真正實現. * * @param e 入隊操做, e非null; 出隊操做, e爲null * @param haveData true表示入隊元素, false表示出隊元素 * @param how NOW, ASYNC, SYNC, TIMED 四種常量定義 * @param nanos 限時模式下使用(納秒) * @return 匹配成功則返回匹配的元素, 不然返回e自己 */ private E xfer(E e, boolean haveData, int how, long nanos) { if (haveData && (e == null)) // 入隊操做, 元素e不能爲null throw new NullPointerException(); Node s = null; retry: for (; ; ) { for (Node h = head, p = h; p != null; ) { // 嘗試匹配p指向的結點 boolean isData = p.isData; // 結點類型 Object item = p.item; // 結點值 if (item != p && (item != null) == isData) { // 若是結點還未匹配過 if (isData == haveData) // 同種類型結點不能匹配 break; if (p.casItem(item, e)) { // p指向從隊首開始向後的第一個匹配結點 for (Node q = p; q != h; ) { Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } LockSupport.unpark(p.waiter); // 喚醒匹配結點上的等待線程 return LinkedTransferQueue.<E>cast(item); // 返回匹配結點的值 } } Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist } if (how != NOW) { if (s == null) s = new Node(e, haveData); // 建立一個入隊結點, 添加到隊尾 Node pred = tryAppend(s, haveData); // pred指向s的前驅結點或s(隊列中只有一個結點)或null(tryAppend失敗) if (pred == null) continue retry; // 入隊失敗,則重試 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); // 等待出隊線程 } return e; } }
最終隊列結構以下,匹配結點的值被置換爲null,ThreadA被喚醒,ThreadD拿到匹配結點上的元素值「9」並返回:
③ThreadA被喚醒後繼續執行
ThreadA被喚醒後,從原阻塞處——繼續向下執行,而後進入下一次自旋,進入CASE1分支:
/** * 自旋/yield/阻塞,直到結點s被匹配. * * @param s 等待被匹配的結點s * @param pred s的前驅結點或s自身(隊列中只有一個結點的狀況) * @param e 結點s的值 * @return 匹配值, 或e自己(中斷或超時狀況) */ private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) { final long deadline = timed ? System.nanoTime() + nanos : 0L; // 限時等待狀況下使用 Thread w = Thread.currentThread(); int spins = -1; // 自旋次數, 鎖優化操做 ThreadLocalRandom randomYields = null; // bound if needed for (; ; ) { Object item = s.item; if (item != e) { // CASE1: 匹配成功 // assert item != s; s.forgetContents(); // avoid garbage return LinkedTransferQueue.<E>cast(item); } if ((w.isInterrupted() || (timed && nanos <= 0)) && s.casItem(e, s)) { // CASE2: 取消(線程被中斷或超時) unsplice(pred, s); return e; } // CASE3: 設置輕量級鎖(自旋 -> yield) if (spins < 0) { // 初始化自旋次數 if ((spins = spinsFor(pred, s.isData)) > 0) randomYields = ThreadLocalRandom.current(); } else if (spins > 0) { // 自選次數減1 --spins; if (randomYields.nextInt(CHAINED_SPINS) == 0) Thread.yield(); // 隨機yield線程 } else if (s.waiter == null) { // waiter保存待阻塞線程 s.waiter = w; } else if (timed) { // 限時等待狀況, 計算剩餘有效時間 nanos = deadline - System.nanoTime(); if (nanos > 0L) LockSupport.parkNanos(this, nanos); } else { // CASE4: 阻塞線程 LockSupport.park(this); } } }
在CASE1分支中,因爲結點的item項已經被替換成了null,因此調用s.forgetContents()
,並返回null
/** * 設置當前結點的值爲自身. * 設置當前結點的等待線程爲null. */ final void forgetContents() { UNSAFE.putObject(this, itemOffset, this); UNSAFE.putObject(this, waiterOffset, null); }
最終隊列結構以下:
④ThreadE調用take方法出隊元素
ThreadE調用take方法出隊元素,過程和步驟②相同,進入xfer方法(e == null,hasData == false
),因爲head指針指向的元素已經匹配過了,因此
向後繼續查找,找到第一個未匹配過的結點「2」,而後置換結點「2」中的元素值爲null,喚醒線程ThreadB,返回匹配結點的元素值「2」:
for (Node h = head, p = h; p != null; ) { // 嘗試匹配p指向的結點 boolean isData = p.isData; // 結點類型 Object item = p.item; // 結點值 if (item != p && (item != null) == isData) { // 若是結點還未匹配過 if (isData == haveData) // 同種類型結點不能匹配 break; if (p.casItem(item, e)) { // p指向從隊首開始向後的第一個匹配結點 for (Node q = p; q != h; ) { Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } LockSupport.unpark(p.waiter); // 喚醒匹配結點上的等待線程 return LinkedTransferQueue.<E>cast(item); // 返回匹配結點的值 } } Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist }
此時隊列狀態以下,能夠看到,隊首指針head一次性向後跳了2個位置,原來已經匹配過的元素的next指針指向自身,等待被GC回收,這其實就是LinkedTransferQueue的「鬆弛」策略:
⑤ThreadB被喚醒後繼續執行
過程和步驟③徹底相同,在awaitMatch方法中,將結點的item置爲this,而後返回匹配結點值——null,最終隊列結構以下:
⑥ThreadF調用take方法出隊元素
ThreadF調用take方法出隊元素,過程和步驟②相同,進入xfer方法(e == null,hasData == false
),因爲head指針指向的元素此時沒有匹配,因此不用像步驟②那樣向後查找,而是直接置換匹配結點的元素值「93」,而後喚醒ThreadC,返回匹配值「93」。最終隊列結構以下:
⑦ThreadC被喚醒後繼續執行
過程和步驟③徹底相同,在awaitMatch方法中,將結點的item置爲this,而後返回匹配結點值——null,最終隊列結構以下:
此時的隊列結構,讀者移必定感到很是奇怪,並不嚴格遵照隊列的定義,這其實就是「Dual Queue」算法的實現,爲了對自旋優化,作了不少看似彆扭的操做,沒必要奇怪。
假設此時再有一個線程ThreadH調用take方法出隊元素會怎麼樣?其實這是隊列已經空了,ThreadH會被阻塞,可是會建立一個「請求結點」入隊:
/** * 嘗試將結點s添加到隊尾. * * @param s 待添加的結點 * @param haveData true: 數據結點 * @return 返回null表示失敗; 不然返回s的前驅結點(沒有前驅則返回s自身) */ private Node tryAppend(Node s, boolean haveData) { for (Node t = tail, p = t; ; ) { Node n, u; if (p == null && (p = head) == null) { // CASE1: 隊列爲空 if (casHead(null, s)) // 設置隊首指針head return s; } else if (p.cannotPrecede(haveData)) // CASE2: 結點s不能連接到結點p return null; else if ((n = p.next) != null) // CASE3: 遍歷至隊尾結點 p = p != t && t != (u = tail) ? (t = u) : // stale tail (p != n) ? n : null; // restart if off list else if (!p.casNext(null, s)) // CASE4: 插入結點s p = p.next; // re-read on CAS failure else { // CASE5: 嘗試進行鬆弛操做 if (p != t) { // update if slack now >= 2 while ((tail != t || !casTail(t, s)) && (t = tail) != null && (s = t.next) != null && // advance and retry (s = s.next) != null && s != t) ; } return p; } } }
調用完tryAppend方法後,隊列結構以下,橙色的爲「請求結點」—— item==null && isData==false
:
而後ThreadH也會進入在awaitMatch方法後進入阻塞,並等待一個入隊線程的到來。最終隊列結構以下:
截止本篇爲止,咱們已經學習完了juc-collection框架中的全部阻塞隊列,以下表所示:
隊列特性 | 有界隊列 | 近似無界隊列 | 無界隊列 | 特殊隊列 |
---|---|---|---|---|
有鎖算法 | ArrayBlockingQueue | LinkedBlockingQueue、LinkedBlockingDeque | / | PriorityBlockingQueue、DelayQueue |
無鎖算法 | / | / | LinkedTransferQueue | SynchronousQueue |
能夠看到,LinkedTransferQueue其實兼具了SynchronousQueue的特性以及無鎖算法的性能,而且是一種無界隊列:
另外,因爲LinkedTransferQueue能夠存放兩種不一樣類型的結點,因此稱之爲「Dual Queue」:
內部Node結點定義了一個 boolean 型字段——isData
,表示該結點是「數據結點」仍是「請求結點」。
爲了節省 CAS 操做的開銷,LinkedTransferQueue使用了 鬆弛(slack)操做: 在結點被匹配(被刪除)以後,不會當即更新隊列的head、tail,而是當 head、tail結點與最近一個未匹配的結點之間的距離超過「鬆弛閥值」後纔會更新(默認爲 2)。這個「鬆弛閥值」通常爲1到3,若是太大會增長沿鏈表查找未匹配結點的時間,過小會增長 CAS 的開銷。