LinkedTransferQueue 是一個由鏈表結構組成的無界阻塞 TransferQueue 隊列。
接口 TransferQueue 和實現類 LinkedTransferQueue 從 Java 7 開始加入 J.U.C 之中。java
java.util.concurrent.LinkedTransferQueuenode
public class LinkedTransferQueue<E> extends AbstractQueue<E> implements TransferQueue<E>, java.io.Serializable
LinkedTransferQueue 的數據結構爲鏈表,是一個鬆弛的雙重隊列(Dual Queues with Slack)。算法
雙重隊列指的是鏈表中的節點存在兩種模式:數據節點(提供數據)、請求節點(請求數據)。segmentfault
對於 TransferQueue#transfer:
線程入隊非數據節點時,若是沒有匹配到數據節點則阻塞,直到其餘線程提供數據節點與之匹配。
線程入隊數據節點時,若是沒有匹配到非數據節點則阻塞,直到其餘線程提供非數據節點與之匹配。安全
item == null
與 isData 的值相反。與 SynchronousQueue.TransferQueue.QNode 的定義是同樣的。
節點的匹配狀態由 item 屬性來控制:
對於數據節點,在匹配的時候,把該節點的 item 域從非空數據 CAS 設置爲空;對於非數據節點,則相反。數據結構
static final class Node { final boolean isData; // 是不是數據節點 volatile Object item; // isData爲true時才初始化,匹配時CAS修改該字段。使用Object而不是泛型E,容許將item指向自身 volatile Node next; volatile Thread waiter; // null until waiting // CAS methods for fields final boolean casNext(Node cmp, Node val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } final boolean casItem(Object cmp, Object val) { // assert cmp == null || cmp.getClass() != Node.class; return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } /** * Constructs a new node. Uses relaxed write because item can * only be seen after publication via casNext. */ Node(Object item, boolean isData) { UNSAFE.putObject(this, itemOffset, item); // relaxed write this.isData = isData; } final void forgetNext() { UNSAFE.putObject(this, nextOffset, this); } // 節點被取消或被匹配以後會調用:設置item自鏈接,waiter爲null final void forgetContents() { UNSAFE.putObject(this, itemOffset, this); UNSAFE.putObject(this, waiterOffset, null); } // 是否已匹配(已取消或已匹配:item自鏈接;已匹配:item == null 與 isData 的值相反) final boolean isMatched() { Object x = item; return (x == this) || ((x == null) == isData); } // 是不是一個未匹配的請求節點(!isData 爲請求節點,item 爲空說明未被修改,而一旦被匹配或取消則會修改 item) final boolean isUnmatchedRequest() { return !isData && item == null; } // 若是給定節點不能鏈接在當前節點後則返回true final boolean cannotPrecede(boolean haveData) { boolean d = isData; Object x; // 當前節點未匹配,且數據模式與給定節點相反,則返回true return d != haveData && (x = item) != this && (x != null) == d; } /** * Tries to artificially match a data node -- used by remove. */ final boolean tryMatchData() { // assert isData; Object x = item; if (x != null && x != this && casItem(x, null)) { LockSupport.unpark(waiter); return true; } return false; } private static final long serialVersionUID = -3375979862319811754L; // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; private static final long waiterOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); waiterOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("waiter")); } catch (Exception e) { throw new Error(e); } } }
// 隊列頭節點,第一次添加節點以前爲空 transient volatile Node head; // 隊列尾節點,第一次添加節點以前爲空 private transient volatile Node tail;
LinkedTransferQueue 隊列的鬆弛體如今:
採用無鎖算法來維持鏈表的 head 和 tail 節點的位置,head 和 tail 節點並不嚴格指向鏈表的頭尾節點。app
好比,一個包含 4 個有效節點的隊列結構可能呈現爲如下形式:
head 節點指向一個已匹配(matched)節點,該節點又指向隊列中第一個未匹配(unmatched)節點。
tail 節點指向隊列中最後一個節點。less
head tail | | v v M -> U -> U -> U -> U
因爲隊列中的節點須要維護其匹配狀態,而一旦節點被匹配了,其匹配狀態不會再改變。
所以,能夠在鏈表頭部存放零個或多個已經被匹配的前置節點,在鏈表尾部存放零個或多個還沒有匹配的後置節點。
因爲前置和後置節點都容許爲零,意味着 LinkedTransferQueue 並不使用 dummy node 做爲頭節點。dom
head tail | | v v M -> M -> U -> U -> U -> U
好處是:每次入隊出隊操做,不會當即更新 head/tail,而是當 head/tail 節點和最近一個未匹配的節點之間的距離超過一個「鬆弛閥值」以後纔會更新,能夠節省 CAS 操做的開銷。異步
LinkedTransferQueue 與 ConcurrentLinkedQueue 同樣採用了鬆弛的隊列結構。
默認構造函數爲空,當第一次加入元素時才初始化 head/tail 節點。
/** * Creates an initially empty {@code LinkedTransferQueue}. */ public LinkedTransferQueue() { } /** * Creates a {@code LinkedTransferQueue} * initially containing the elements of the given collection, * added in traversal order of the collection's iterator. * * @param c the collection of elements to initially contain * @throws NullPointerException if the specified collection or any * of its elements are null */ public LinkedTransferQueue(Collection<? extends E> c) { this(); addAll(c); }
與之相比,ConcurrentLinkedQueue 和 SynchronousQueue.TransferQueue 初始化時都會構造空節點(dummy node)。
// java.util.concurrent.ConcurrentLinkedQueue#ConcurrentLinkedQueue() public ConcurrentLinkedQueue() { head = tail = new Node<E>(null); } // java.util.concurrent.SynchronousQueue.TransferQueue#TransferQueue TransferQueue() { QNode h = new QNode(null, false); head = h; tail = h; }
LinkedTransferQueue 中定義了傳遞數據的 4 種方式:
/* * Possible values for "how" argument in xfer method. */ private static final int NOW = 0; // for untimed poll, tryTransfer // 當即返回 private static final int ASYNC = 1; // for offer, put, add // 異步,不會阻塞 private static final int SYNC = 2; // for transfer, take // 同步,阻塞直到匹配 private static final int TIMED = 3; // for timed poll, tryTransfer // 超時,阻塞直到超時
傳遞數據的方法定義:
java.util.concurrent.LinkedTransferQueue#xfer
/** * Implements all queuing methods. See above for explanation. * * @param e the item or null for take // 存入、取出、移交的數據元素 * @param haveData true if this is a put, else a take // 是否具備數據 * @param how NOW, ASYNC, SYNC, or TIMED // 4 種模式 * @param nanos timeout in nanosecs, used only if mode is TIMED // 超時時間 * @return an item if matched, else e * @throws NullPointerException if haveData mode but e is null */ private E xfer(E e, boolean haveData, int how, long nanos)
LinkedTransferQueue 因爲繼承了 BlockingQueue,遵循方法約定:
拋出異常 特殊值 阻塞 超時 插入 add(e) offer(e) put(e) offer(e, time, unit) 移除 remove() poll() take() poll(time, unit) 檢查 element() peek() 不可用 不可用
此外,TransferQueue 新增瞭如下方法:
// 嘗試移交元素,當即返回 boolean tryTransfer(E e); // 嘗試移交元素,阻塞直到成功、超時或中斷 boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException; // 移交元素,阻塞直到成功或中斷 void transfer(E e) throws InterruptedException; // 判斷是否有消費者 boolean hasWaitingConsumer(); // 查看消費者的數量 int getWaitingConsumerCount();
底層都是經過調用 LinkedTransferQueue#xfer 來實現。
入隊: add(e) xfer(e, true, ASYNC, 0) offer(e) xfer(e, true, ASYNC, 0) put(e) xfer(e, true, ASYNC, 0) offer(e, time, unit) xfer(e, true, ASYNC, 0) 出隊: remove() xfer(null, false, NOW, 0) poll() xfer(null, false, NOW, 0) take() xfer(null, false, SYNC, 0) poll(time, unit) xfer(null, false, TIMED, unit.toNanos(timeout)) 移交元素: tryTransfer(e) xfer(e, true, NOW, 0) tryTransfer(e, time, unit) xfer(e, true, TIMED, unit.toNanos(timeout) transfer(e) xfer(e, true, SYNC, 0)
因爲隊列是無界的,入隊方法(add/put/offer)既不會拋異常,也不會阻塞或超時。
/** * Implements all queuing methods. See above for explanation. * * @param e the item or null for take // 存入、取出、移交的數據元素 * @param haveData true if this is a put, else a take // 是否具備數據 * @param how NOW, ASYNC, SYNC, or TIMED // 4 種模式 * @param nanos timeout in nanosecs, used only if mode is TIMED // 超時時間 * @return an item if matched, else e * @throws NullPointerException if haveData mode but e is null */ private E xfer(E e, boolean haveData, int how, long nanos) { if (haveData && (e == null)) throw new NullPointerException(); Node s = null; // the node to append, if needed retry: for (;;) { // restart on append race for (Node h = head, p = h; p != null;) { // find & match first node // 從頭節點開始遍歷,初始時h和p都指向頭節點 boolean isData = p.isData; Object item = p.item; if (item != p && (item != null) == isData) { // unmatched // 節點p還沒有匹配過:item不是p,item是否有值與isData相符 if (isData == haveData) // can't match // 節點p沒法匹配:節點p與入參節點類型相同。此時需跳出本層循環,嘗試入隊 break; if (p.casItem(item, e)) { // match // 節點p匹配成功:item域的值從item變動爲e for (Node q = p; q != h;) { // 若q != h,說明當前匹配的節點p不是頭節點,而是位於頭節點以後。 // 說明隊列頭部具備多於一個的已匹配節點,須要設置新的頭節點,把已匹配的節點出隊 // 循環以節點p爲起始,一直日後遍歷已匹配的節點 Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { // 若是節點h是頭節點,而q是已匹配節點,分爲兩種狀況: // 1. 若q.next爲空,則將q設爲新的頭節點; // 2. 若q.next不爲空,則將q.next設爲新的頭節點(注意此時q還在隊列中,但不可達) h.forgetNext(); // 舊的頭節點h出隊(若h以前還有節點,則h自鏈接表明着以h爲尾節點的舊鏈表將被回收) break; } // advance and retry // 進入這裏,h不是頭節點,說明其餘線程修改過head,須要取最新的head做進一步判斷 // 1. 若是head爲空,或者head.next爲空,或者head.next未匹配,則跳出再也不遍歷head.next了 // 2. 雖然取得最新head,可是head.next是已匹配節點,須要從head.next開始遍歷,從新設置head if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } LockSupport.unpark(p.waiter); // 喚醒p中等待的線程 return LinkedTransferQueue.<E>cast(item); } } // 來到這裏,說明節點p是已匹配節點,沒法與入參節點匹配,須要繼續遍歷p的下一個節點 Node n = p.next; // 若 p != n 則繼續遍歷下一個節點;若 p == n 說明p已經出隊,這種狀況是其餘線程修改了head致使的,須要取新的head從新開始遍歷 p = (p != n) ? n : (h = head); // Use head if p offlist } if (how != NOW) { // No matches available // 來到這裏,說明沒有匹配成功,則按照4種模式的規則入隊。 if (s == null) s = new Node(e, haveData); Node pred = tryAppend(s, haveData); // 嘗試入隊 if (pred == null) continue retry; // lost race vs opposite mode // 入隊失敗,重試 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); // 阻塞等待,直到匹配或超時 } return e; // not waiting } }
基本思想:
代碼流程:
匹配開始時,節點 p 和 h 均處於 head 的位置,假設此時鏈表的快照以下:
head/p/h | v M -> M -> U -> U -> U -> U
使用 p = p.next 遍歷鏈表,當節點 p 匹配成功時,此時隊列以下(U2M 表示節點從未匹配變成已匹配):
h p | | v v M -> M -> U2M -> U -> U -> U
因爲 p != h,此時須要從新設置 head,分爲兩種狀況:
狀況一
若是在此期間沒有其餘線程修改過 head,說明當前狀況以下:
head/h p | | v v M -> M -> M -> U -> U -> U
經過 casHead 將 p.next 設爲新的 head,並對舊的頭節點 h 執行 forgetNext 設爲自鏈接,從原鏈表中斷開。
h p head | | | v v v M -> M M -> U -> U -> U
因爲節點 p 是不可達的,會被 GC 回收,最終已匹配的節點都會從鏈表中清除。
head | v U -> U -> U
狀況二
若是在此期間其餘線程修改過 head,那麼 casHead 以前須要先獲取最新的 head,再判斷是否進一步重設 head。
若是獲取最新的 head 以後,head.next 爲已匹配節點,須要從 head.next 開始從新遍歷再一次設置 head。
在 xfer 方法中,若是在隊列頭部匹配失敗,則會按照 4 中規則從隊列尾部入隊:
java.util.concurrent.LinkedTransferQueue#xfer
if (how != NOW) { // No matches available if (s == null) s = new Node(e, haveData); Node pred = tryAppend(s, haveData); // 嘗試入隊 if (pred == null) // 若入隊失敗,從新從頭部匹配 continue retry; // lost race vs opposite mode if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); }
節點 s 嘗試從隊列尾部入隊的方法:
java.util.concurrent.LinkedTransferQueue#tryAppend
/** * Tries to append node s as tail. // 將節點s做爲尾節點入隊 * * @param s the node to append * @param haveData true if appending in data mode * @return null on failure due to losing race with append in * different mode, else s's predecessor, or s itself if no * predecessor */ private Node tryAppend(Node s, boolean haveData) { for (Node t = tail, p = t;;) { // move p to last node and append Node n, u; // temps for reads of next & tail if (p == null && (p = head) == null) { if (casHead(null, s)) // 隊列爲空,將s做爲頭節點。注意,這裏插入第一個元素的時候tail指針並無指向s return s; // initialize } else if (p.cannotPrecede(haveData)) return null; // lost race vs opposite mode // 節點p以後沒法鏈接節點,返回null(p與s匹配,不須要將s入隊) else if ((n = p.next) != null) // not last; keep traversing // 節點p不是尾節點(由於tail並不嚴格指向尾節點),需繼續遍歷p.next // 若是節點p與t不等,且t不是tail節點(說明其餘線程修改了tail,沒必要遍歷p.next了),則取新的tail賦值給p和t,從新從tail節點開始遍歷 // 不然(嘗試遍歷p.next):1. 若是p與p.next不等,從p.next繼續遍歷;2. 若是p與p.next相等,則設p爲空(說明隊列爲空,後續會將s做爲頭節點) p = p != t && t != (u = tail) ? (t = u) : // stale tail (p != n) ? n : null; // restart if off list else if (!p.casNext(null, s))// 進入這裏,說明p是尾節點。若CAS失敗,說明其餘線程在p後加了節點,需繼續遍歷p.next p = p.next; // re-read on CAS failure else { // 進入這裏,說明p是尾節點,且CAS將s設爲p.next成功。 if (p != t) { // update if slack now >= 2 // p != t 說明鬆弛度大於等於2,須要從新設置tail節點 while ((tail != t || !casTail(t, s)) && (t = tail) != null && (s = t.next) != null && // advance and retry (s = s.next) != null && s != t); } return p; } } }
代碼流程:
當節點 s 入隊成功以後,須要在隊列之中自旋、等待被其餘線程匹配。
java.util.concurrent.LinkedTransferQueue#awaitMatch
/** * Spins/yields/blocks until node s is matched or caller gives up. * * @param s the waiting node // 當前節點,處於等待之中 * @param pred the predecessor of s, or s itself if it has no // 當前節點的上一個節點,若爲 s 說明沒有上一個節點,若爲 null 則是未知的(做爲預留) * predecessor, or null if unknown (the null case does not occur * in any current calls but may in possible future extensions) * @param e the comparison value for checking match // 當前節點的數據域值 * @param timed if true, wait only until timeout elapses // 是否等待超時 * @param nanos timeout in nanosecs, used only if timed is true // 超時時間 * @return matched item, or e if unmatched on interrupt or timeout */ private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) { final long deadline = timed ? System.nanoTime() + nanos : 0L; // 到期時間 Thread w = Thread.currentThread(); // 當前線程,即操做節點s的線程 int spins = -1; // initialized after first item and cancel checks // 自旋次數 ThreadLocalRandom randomYields = null; // bound if needed // 隨機數,隨機讓一些自旋的線程讓出CPU for (;;) { Object item = s.item; if (item != e) { // matched // 節點s的數據域item先後不相等,說明節點s已經被其餘線程匹配了 // assert item != s; s.forgetContents(); // avoid garbage // s設置item自鏈接,waiter爲null,這裏沒有使s=s.next,s仍掛在鏈表上 return LinkedTransferQueue.<E>cast(item); } if ((w.isInterrupted() || (timed && nanos <= 0)) && // 當前線程被取消,或者已超時 s.casItem(e, s)) { // cancel // 須要把節點s取消掉,設置item自鏈接 unsplice(pred, s); // 把s跟它的上一個節點pred斷開鏈接 return e; } if (spins < 0) { // establish spins at/near front // 初始化spins if ((spins = spinsFor(pred, s.isData)) > 0) randomYields = ThreadLocalRandom.current(); } else if (spins > 0) { // spin // 自減,隨機讓出CPU --spins; if (randomYields.nextInt(CHAINED_SPINS) == 0) Thread.yield(); // occasionally yield // 讓出CPU時間片,若又爭奪到了CPU時間片則繼續執行,不然等待直到再次得到CPU時間片(由其餘線程讓出) } else if (s.waiter == null) { // 當 spin == 0 時纔會執行這裏及後面的邏輯 s.waiter = w; // request unpark then recheck // 自旋結束了,設置s.waiter } else if (timed) { // 若是有超時,計算超時時間,並阻塞必定時間 nanos = deadline - System.nanoTime(); if (nanos > 0L) LockSupport.parkNanos(this, nanos); } else { // 不是超時的,直接阻塞,等待被喚醒 LockSupport.park(this); } } }
在等待匹配的自旋過程當中,執行如下邏輯:
等待匹配的過程當中,首先進行自旋,等自旋次數歸零後,再判斷是否進入阻塞,是由於線程的阻塞和喚醒操做涉及到系統內核態與用戶態之間的切換,比較耗時。
在 LinkedTransferQueue#awaitMatch 中,節點在隊列之中自旋和阻塞以前,首先須要獲取自旋的次數。
if (spins < 0) { if ((spins = spinsFor(pred, s.isData)) > 0) randomYields = ThreadLocalRandom.current(); }
與自旋次數相關的屬性:
/** True if on multiprocessor */ private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1; /** * The number of times to spin (with randomly interspersed calls * to Thread.yield) on multiprocessor before blocking when a node * is apparently the first waiter in the queue. See above for * explanation. Must be a power of two. The value is empirically * derived -- it works pretty well across a variety of processors, * numbers of CPUs, and OSes. */ // 當一個節點是隊列中的第一個waiter時,在多處理器上進行自旋的次數(隨機穿插調用thread.yield) private static final int FRONT_SPINS = 1 << 7; /** * The number of times to spin before blocking when a node is * preceded by another node that is apparently spinning. Also * serves as an increment to FRONT_SPINS on phase changes, and as * base average frequency for yielding during spins. Must be a * power of two. */ // 當前繼節點正在處理,當前節點在阻塞以前的自旋次數。 private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
與自旋次數相關的方法:
java.util.concurrent.LinkedTransferQueue#spinsFor
/** * Returns spin/yield value for a node with given predecessor and * data mode. See above for explanation. */ private static int spinsFor(Node pred, boolean haveData) { if (MP && pred != null) { if (pred.isData != haveData) // phase change return FRONT_SPINS + CHAINED_SPINS; if (pred.isMatched()) // probably at front return FRONT_SPINS; if (pred.waiter == null) // pred apparently spinning return CHAINED_SPINS; } return 0; }
須要注意的是,當前節點 s 在隊列之中開始自旋以前,它的上一個的節點 pred 可能也正處於自旋之中。
初始化節點 s 自旋次數的流程:
對於自旋次數多少的設計,個人理解是,節點被匹配的可能性越高,則自旋次數越多;被匹配的可能性越低,則自旋次數越少,儘早進入阻塞以避免 CPU 空轉。
而位於隊列頭部的節點是越有可能被匹配的,須要自旋最屢次。
在 LinkedTransferQueue#awaitMatch 中,節點在隊列之中等待匹配時,若是檢測到線程中斷或已超時,須要取消匹配並將節點從鏈表中斷開。
if ((w.isInterrupted() || (timed && nanos <= 0)) && s.casItem(e, s)) { unsplice(pred, s); return e; }
此外,LinkedTransferQueue 中移除節點也會調用到 LinkedTransferQueue#unsplice 方法。
與清除節點相關的屬性:
/** The number of apparent failures to unsplice removed nodes */ // 從鏈表上清除節點失敗的次數 private transient volatile int sweepVotes; /** * The maximum number of estimated removal failures (sweepVotes) * to tolerate before sweeping through the queue unlinking * cancelled nodes that were not unlinked upon initial * removal. See above for explanation. The value must be at least * two to avoid useless sweeps when removing trailing nodes. */ // sweepVotes的閥值,達到該閾值才清理無效節點 static final int SWEEP_THRESHOLD = 32;
清除指定節點的方法:
java.util.concurrent.LinkedTransferQueue#unsplice
/** * Unsplices (now or later) the given deleted/cancelled node with * the given predecessor. * * @param pred a node that was at one time known to be the * predecessor of s, or null or s itself if s is/was at head * @param s the node to be unspliced */ // 把s跟它的上一個節點pred斷開鏈接(當即斷開,或者隔段時間再斷開) final void unsplice(Node pred, Node s) { s.forgetContents(); // forget unneeded fields // s設置item自鏈接,waiter爲null if (pred != null && pred != s && pred.next == s) { // 校驗pred是不是s的上一個節點 // s是尾節點,或者 (s不是尾節點,s未出隊,pred修改下一個節點爲s.next成功,pred已經被匹配) Node n = s.next; if (n == null || (n != s && pred.casNext(s, n) && pred.isMatched())) { // 自旋的目的是:設置新的頭節點,把舊的頭節點出隊 for (;;) { // check if at, or could be, head // pred或s爲頭節點,或者頭節點爲空,直接返回,不須要處理sweepVotes Node h = head; if (h == pred || h == s || h == null) return; // at head or list empty // 頭節點未被匹配,跳出循環(說明head是準確的,後續須要處理sweepVotes) if (!h.isMatched()) break; // head已經被匹配了,且head.next爲空,說明如今隊列爲空了,直接返回,不須要處理sweepVotes Node hn = h.next; if (hn == null) return; // now empty // 走到這裏,說明head已經被匹配了,但head.next不爲空。 // 若head不是自鏈接,嘗試將head.next設置爲新的頭節點 if (hn != h && casHead(h, hn)) h.forgetNext(); // advance head // 舊的頭節點設爲自鏈接,表示出隊 // 後續繼續重新的頭節點遍歷,把已匹配的節點出隊,重設頭節點 } // 進入這裏,說明 1. s多是尾節點 2. pred、s均不爲頭節點 3. 隊列不爲空 if (pred.next != pred && s.next != s) { // recheck if offlist // 再一次校驗pred和s是否未出隊 for (;;) { // sweep now if enough votes int v = sweepVotes; if (v < SWEEP_THRESHOLD) { // 閾值爲32 if (casSweepVotes(v, v + 1)) break; } else if (casSweepVotes(v, 0)) { sweep(); // 達到閥值,進行「大掃除」,清除隊列中的無效節點 break; } } } } } }
代碼流程:
爲何要累計 sweepVotes 進行大掃除,官方的解釋:
除了經過節點自鏈接(self-linking)來方便垃圾回收之外,還須要在鏈表上解開對無效節點的鏈接。
通常來講,若是想要在鏈表上移除節點s,只須要把 s 的上一個節點的 next 屬性改掉便可。
可是用這種方式來讓節點 s 不可達,在如下兩種場景下是沒法保證的:
這樣會形成一種後果:當隊列頭部是一個阻塞的 take 請求節點,該節點以後是大量的超時時間很短的 poll 請求節點,一旦過了超時時間,隊列中就會出現大量可達的無效節點。
(把這種狀況代入 unsplice,大部分節點均可以經過 pred.casNext 移除,迷惑)
鑑於此,須要在一段時間以後遍歷整個鏈表,把已匹配的節點清理出隊。
/** * Unlinks matched (typically cancelled) nodes encountered in a * traversal from head. */ private void sweep() { for (Node p = head, s, n; p != null && (s = p.next) != null; ) { // 初始時,p = head,s = p.next if (!s.isMatched()) // Unmatched nodes are never self-linked // 未匹配的節點不會是自鏈接的! p = s; // 若是節點s未匹配,則讓 p = p.next 繼續遍歷下一個節點 else if ((n = s.next) == null) // trailing node is pinned // 遍歷結束,跳出循環 break; else if (s == n) // stale // s自鏈接,說明s已出隊,當前是在廢棄的鏈表上遍歷,須要從新從head開始遍歷 // No need to also check for p == s, since that implies s == n p = head; else // 進入這裏,說明節點s是已匹配的,不是尾節點,不是自鏈接。 // 此時將節點s出隊,雖然沒有使s自鏈接,s不可達會被回收(注意s.item會不會被回收則取決於item自身是否可達) p.casNext(s, n); } }
size 方法遍歷隊列中所有的數據節點,並進行計數,最大值爲 Integer.MAX_VALUE。
當遍歷過程當中其餘線程修改了 head,須要取新的 head 從新開始遍歷。
size 方法並非一個恆定時長的操做。
java.util.concurrent.LinkedTransferQueue#size
public int size() { return countOfMode(true); }
/** * Traverses and counts unmatched nodes of the given mode. * Used by methods size and getWaitingConsumerCount. */ private int countOfMode(boolean data) { int count = 0; for (Node p = head; p != null; ) { if (!p.isMatched()) { if (p.isData != data) return 0; if (++count == Integer.MAX_VALUE) // saturated break; } Node n = p.next; if (n != p) // 節點p未出隊,繼續遍歷p.next p = n; else { // 節點p已出隊,取新的head從新開始遍歷 count = 0; p = head; } } return count; }