LinkedTransferQueue(LTQ) 相比 BlockingQueue 更進一步,生產者會一直阻塞直到所添加到隊列的元素被某一個消費者所消費(不只僅是添加到隊列裏就完事)。新添加的 transfer 方法用來實現這種約束。顧名思義,阻塞就是發生在元素從一個線程 transfer 到另外一個線程的過程當中,它有效地實現了元素在線程之間的傳遞(以創建 Java 內存模型中的 happens-before 關係的方式)。Doug Lea 說從功能角度來說,LinkedTransferQueue 其實是 ConcurrentLinkedQueue、SynchronousQueue(公平模式)和 LinkedBlockingQueue 的超集。並且 LinkedTransferQueue 更好用,由於它不只僅綜合了這幾個類的功能,同時也提供了更高效的實現。html
推薦一篇 LinkedTransferQueue 的介紹:http://ifeve.com/java-transfer-queue/java
LinkedTransferQueue 實現了 TransferQueue 接口,下面就主要介紹一下這個接口。 TransferQueue 繼承了 BlockingQueue(BlockingQueue 又繼承了 Queue)並擴展了一些新方法。BlockingQueue(和Queue)是 JDK5 中加入的接口,它是指這樣的一個隊列:當生產者向隊列添加元素但隊列已滿時,生產者會被阻塞;當消費者從隊列移除元素但隊列爲空時,消費者會被阻塞。node
TransferQueue 則更進一步,生產者會一直阻塞直到所添加到隊列的元素被某一個消費者所消費(不只僅是添加到隊列裏就完事)。新添加的 transfer 方法用來實現這種約束。顧名思義,阻塞就是發生在元素從一個線程 transfer 到另外一個線程的過程當中,它有效地實現了元素在線程之間的傳遞(以創建 Java 內存模型中的 happens-before 關係的方式)。算法
TransferQueue 還包括了其餘的一些方法:兩個 tryTransfer 方法,一個是非阻塞的,另外一個帶有 timeout 參數設置超時時間的。還有兩個輔助方法 hasWaitingConsumer() 和 getWaitingConsumerCount()。緩存
當我第一次看到 LinkedTransferQueue 時,首先想到了已有的實現類 SynchronousQueue。SynchronousQueue 的隊列長度爲 0,特別是對於兩個線程之間傳遞元素這種用例。數據結構
LinkedTransferQueue 相比 SynchronousQueue 用處更廣、更好用,由於你能夠決定是使用 BlockingQueue 的方法(譯者注:例如put方法)仍是確保一次傳遞完成(譯者注:即transfer方法)。在隊列中已有元素的狀況下,調用 transfer 方法,能夠確保隊列中被傳遞元素以前的全部元素都能被處理。app
LinkedTransferQueue 的性能分別是 SynchronousQueue 的3倍(非公平模式)和14倍(公平模式)。由於像 ThreadPoolExecutor 這樣的類在任務傳遞時都是使用 SynchronousQueue,因此使用 LinkedTransferQueue 來代替 SynchronousQueue 也會使得 ThreadPoolExecutor 獲得相應的性能提高。less
下面你能夠參考這往篇文章實現一個本身的 LinkedTransferQueue:http://ifeve.com/customizing-concurrency-classes-11-2/#more-7388dom
LTQ 內部採用的是一種很是不一樣的隊列,即鬆弛型雙重隊列(Dual Queues with Slack):http://ifeve.com/buglinkedtransferqueue-bug/#more-11117異步
強烈建議你們讀一下 Doug Lea 的 java doc 文檔,對 LTQ 的數據結構有很清楚的說明。
/** * A FIFO dual queue may be implemented using a variation of the * Michael & Scott (M&S) lock-free queue algorithm * (http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf). * It maintains two pointer fields, "head", pointing to a * (matched) node that in turn points to the first actual * (unmatched) queue node (or null if empty); and "tail" that * points to the last node on the queue (or again null if * empty). For example, here is a possible queue with four data * elements: * * head tail * | | * v v * M -> U -> U -> U -> U * * M(matched) U(unmatched) */
翻譯:FIFO 雙隊列可使用 Michael & Scott(M&S)無鎖隊列算法的變體實現。它維護兩個指針字段: head 指向第一個不匹配節點(M)的前驅節點(若是爲空則爲空);tail 指向隊列中的最後一個節點(若是爲空則爲空)。
雙重是指有兩種類型相互對立的節點(Node.isData==false或true),而且我理解的每種節點都有三種狀態:
/** * 在更新head/tail和查找中尋求平衡,大多數場景1~3比較合適。 * 本質上:是增長對 volatile 變量讀操做來減小了對 volatile 變量的寫操做 * 而對 volatile 變量的寫操做開銷要遠遠大於讀操做,所以使用Slack能增長效率 * * We introduce here an approach that lies between the extremes of * never versus always updating queue (head and tail) pointers. * This offers a tradeoff between sometimes requiring extra * traversal steps to locate the first and/or last unmatched * nodes, versus the reduced overhead and contention of fewer * updates to queue pointers. For example, a possible snapshot of * a queue is: * * head tail * | | * v v * M -> M -> U -> U -> U -> U * * The best value for this "slack" (the targeted maximum distance * between the value of "head" and the first unmatched node, and * similarly for "tail") is an empirical matter. We have found * that using very small constants in the range of 1-3 work best * over a range of platforms. Larger values introduce increasing * costs of cache misses and risks of long traversal chains, while * smaller values increase CAS contention and overhead. */
爲了節省 CAS 操做的開銷,LTQ 引入了「鬆弛度」的概念:在節點被匹配(被刪除)以後,不會當即更新 head/tail,而是當 head/tail 節點和最近一個未匹配的節點之間的距離超過一個「鬆弛閥值」以後纔會更新(在 LTQ 中,這個值爲 2)。這個「鬆弛閥值」通常爲1-3,若是太大會下降緩存命中率,而且會增長遍歷鏈的長度;過小會增長 CAS 的開銷。另外在 ConcurrentLinkedQueue 也有相應的應用:hops 設計意圖
已匹配節點的 next 引用會指向自身。若是 GC 延遲迴收,已刪除節點鏈會積累的很長,此時垃圾收集會耗費高昂的代價,而且全部剛匹配的節點也不會被回收。爲了不這種狀況,咱們在 CAS 向後推動 head 時,會把已匹配的 head 的"next"引用指向自身(即「自連接節點」),這樣就限制了鏈接已刪除節點的長度(咱們也採起相似的方法,清除在其餘節點字段中可能的垃圾保留值)。若是在遍歷時遇到一個自連接節點,那就代表當前線程已經滯後於另一個更新 head 的線程,此時就須要從新獲取 head 來遍歷。
因此,在 LTQ 中,數據在某個線程的「某一時刻」可能存在下面這種形式:
static final class QNode { volatile Object item; // 節點包含的數據,非空表示生產者,空者是消費者 final boolean isData; // 表示該節點由生產者建立仍是由消費者建立,生產者true,消費者false volatile Thread waiter; // 等待在該節點上的線程。to control park/unpark volatile QNode next; // 指向隊列中的下一個節點 }
Node 節點自己就是一個原子性操做,對節點的屬性 item、waiter、next 都是原子性操做。
transient volatile Node head; private transient volatile Node tail; // 馬上、異步、同步、超時返回 private static final int NOW = 0; // for untimed poll, tryTransfer private static final int ASYNC = 1; // for offer, put, add private static final int SYNC = 2; // for transfer, take private static final int TIMED = 3; // for timed poll, tryTransfer
LinkedTransferQueue 主要方法介紹:
public LinkedTransferQueue() { }
LinkedTransferQueue 初始化時什麼也沒作,也就是說 head=tail=null。
/** * @param e the item or null for take * @param haveData true if this is a put, else a take * @param how NOW, ASYNC, SYNC, or TIMED * @param nanos timeout in nanosecs, used only if mode is TIMED */ private E xfer(E e, boolean haveData, int how, long nanos) { if (haveData && (e == null)) throw new NullPointerException(); Node s = null; // the node to append, if needed retry: for (;;) { // restart on append race // 1. 嘗試匹配 for (Node h = head, p = h; p != null;) { // find & match first node boolean isData = p.isData; Object item = p.item; // 1.2 p節點還未匹配則嘗試進行匹配,爲何不調用 !p.isMatched() ???? if (item != p && (item != null) == isData) { // unmatched // 1.3 兩個節點的模式同樣,則直接跳出循環,嘗試入隊 if (isData == haveData) // can't match break; // 1.4 p匹配成功 if (p.casItem(item, e)) { // match for (Node q = p; q != h;) { // 1.5 p已經匹配,直接將n設置爲頭節點。h -> p -> n Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry // 1.6 有其它線程更新了頭節點,再次判斷 slack<2。 // h -> q 若是 q.isMatched() 則能夠將 q.next 設置爲頭節點 if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } LockSupport.unpark(p.waiter); // 喚醒等待的線程後直接返回 return LinkedTransferQueue.<E>cast(item); } } // 1.7 p==p.next 則說明p已經出隊,失效了。須要從新從頭節點開始匹配 Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist } // 2. 到了這一步,只有未匹配上一種狀況。根據how判斷節點是否要入隊並等待其它線程匹配 if (how != NOW) { // No matches available if (s == null) s = new Node(e, haveData); // 2.1 節點嘗試入隊,入隊失敗繼續嘗試 Node pred = tryAppend(s, haveData); if (pred == null) continue retry; // lost race vs opposite mode // 2.2 等待其它線程匹配成功後喚醒當前線程 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); } return e; // not waiting } }
說明:xfer 大體能夠分三部分:
若是在上述操做中沒有找到匹配節點,則根據參數 how 作不一樣的處理:
NOW(poll, tryTransfer)
:當即返回。SYNC(transfer, take)
:經過 tryAppend 方法插入一個新的節點 s(item=e,isData = haveData)到隊列尾,而後自旋或阻塞當前線程直到節點被匹配或者取消返回。ASYNC(offer, put, add)
:經過 tryAppend 方法插入一個新的節點 s(item=e,isData = haveData)到隊列尾,異步直接返回。TIMED(timed poll, tryTransfer)
:經過 tryAppend 方法插入一個新的節點 s(item=e,isData = haveData)到隊列尾,而後自旋或阻塞當前線程直到節點被匹配或者取消或等待超時返回。// 1. NOW(poll, tryTransfer) public boolean tryTransfer(E e) { return xfer(e, true, NOW, 0) == null; } // 2. SYNC(transfer, take) public void transfer(E e) throws InterruptedException { if (xfer(e, true, SYNC, 0) != null) { Thread.interrupted(); // failure possible only due to interrupt throw new InterruptedException(); } } // 3. ASYNC(offer, put, add) public boolean offer(E e) { xfer(e, true, ASYNC, 0); return true; } // 4. TIMED(timed poll, tryTransfer) public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null) return true; if (!Thread.interrupted()) return false; throw new InterruptedException(); }
這裏能夠看到若是使用異步(ASYNC)的方式時線程不會阻塞,如 offer 時同一線程的數據節點也能夠入隊,也就是存儲的數據長度再也不是 0,這也是和 SynchronousQueue 一個很大的不一樣點。因此 Doug Lea 說從功能角度來說,LinkedTransferQueue 其實是 ConcurrentLinkedQueue、SynchronousQueue(公平模式)和 LinkedBlockingQueue 的超集。
private Node tryAppend(Node s, boolean haveData) { for (Node t = tail, p = t;;) { // move p to last node and append Node n, u; // temps for reads of next & tail // 1. 節點初始化 if (p == null && (p = head) == null) { if (casHead(null, s)) return s; // initialize } // 2. 節點s不能追加到p節點後。①p和s的模式不一樣且②p還未匹配 else if (p.cannotPrecede(haveData)) return null; // lost race vs opposite mode // 3. p 不是尾節點 else if ((n = p.next) != null) // not last; keep traversing // t -> p 時 tail 改變則須要從新定位到尾節點 // p節點已經出隊則須要從 head 開始從新遍歷 p = p != t && t != (u = tail) ? (t = u) : // stale tail (p != n) ? n : null; // restart if off list // 4. 有其它線程添加節點時繼續自旋,直到成功 else if (!p.casNext(null, s)) p = p.next; // re-read on CAS failure // 5. 終於添加到隊列中。嘗試更新尾節點 else { // 若是 p!=t 則隊列狀況以下,須要更新尾節點: t -> p -> s if (p != t) { // update if slack now >= 2 // 5.1 其它線程已經更新 tail,從新進行下面三個條件的判斷 // 5.2 t.next.next!=null 則須要從新更新 tail。至於s!=t則是此時t沒有踢出隊列 while ((tail != t || !casTail(t, s)) && (t = tail) != null && (s = t.next) != null && // advance and retry (s = s.next) != null && s != t); } return p; } } }
說明:添加給定節點 s 到隊列尾並返回 s 的前繼節點;失敗時(與其餘不一樣模式線程競爭失敗)返回 null,此時 s 的前繼節點 p 的模式和 s 不一樣且 p 還沒有被匹配,如 s 爲請求節點,p 爲數據節點且未匹配則不能將 s 追加到 p 後面。
/** * Spins/yields/blocks 直到s節點matched或canceled * * @param s the waiting node * @param pred s的前驅節點,若是沒有前驅節點則爲s本身 * @param e s節點的原始值 * @param timed true時限時等待,false時無限等待 * @param nanos timeout in nanosecs, used only if timed is true * @return matched item, or e if unmatched on interrupt or timeout */ private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) { final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); int spins = -1; // initialized after first item and cancel checks ThreadLocalRandom randomYields = null; // bound if needed for (;;) { Object item = s.item; // 1. item已經被修改,說明匹配成功。返回匹配後的值 if (item != e) { // matched // assert item != s; s.forgetContents(); // avoid garbage return LinkedTransferQueue.<E>cast(item); } // 2. 超時,返回匹配前的值 if ((w.isInterrupted() || (timed && nanos <= 0)) && s.casItem(e, s)) { // cancel unsplice(pred, s); return e; } // 3. 設置自旋次數 if (spins < 0) { // establish spins at/near front if ((spins = spinsFor(pred, s.isData)) > 0) randomYields = ThreadLocalRandom.current(); // 4. 自旋,有很小的機率調用 yeild } else if (spins > 0) { // spin --spins; if (randomYields.nextInt(CHAINED_SPINS) == 0) Thread.yield(); // occasionally yield // 5. 設置等待線程,讓其它線程喚醒 } else if (s.waiter == null) { s.waiter = w; // request unpark then recheck // 6. 阻塞直至其它線程喚醒,繼續循環直到匹配成功或超時退出 } else if (timed) { nanos = deadline - System.nanoTime(); if (nanos > 0L) LockSupport.parkNanos(this, nanos); } else { LockSupport.park(this); } } }
說明:當前操做爲同步操做時,會調用 awaitMatch 方法阻塞等待匹配,成功返回匹配後節點 item,超時則返回匹配前節點的 item 值 e。在等待期間若是線程被中斷或等待超時,則取消匹配,並調用 unsplice 方法解除節點 s 和其前繼節點的連接。
// 計算自旋次數 FRONT_SPINS=1<<7,CHAINED_SPINS=1<<6 private static int spinsFor(Node pred, boolean haveData) { if (MP && pred != null) { if (pred.isData != haveData) // phase change return FRONT_SPINS + CHAINED_SPINS; if (pred.isMatched()) // probably at front return FRONT_SPINS; if (pred.waiter == null) // pred apparently spinning return CHAINED_SPINS; } return 0; // 單核CPU時不自旋 }
// 統計數據節點個數 public int size() { return countOfMode(true); } // 統計請求節點個數 public int getWaitingConsumerCount() { return countOfMode(false); } private int countOfMode(boolean data) { int count = 0; for (Node p = head; p != null; ) { if (!p.isMatched()) { // p未匹配且屬性指定的data模式則 ++count if (p.isData != data) return 0; if (++count == Integer.MAX_VALUE) // saturated break; } Node n = p.next; if (n != p) // 下一個節點 p = n; else { // p節點失效則計數器歸0,從新從 head 遍歷 count = 0; p = head; } } return count; }
// 查找是否包含指定的數據節點 item=o public boolean contains(Object o) { if (o == null) return false; for (Node p = head; p != null; p = succ(p)) { Object item = p.item; if (p.isData) { if (item != null && item != p && o.equals(item)) return true; } else if (item == null) // 有請求節點了,不用再匹配 break; } return false; } // 後繼節點,若是節點失效,直接從 head 開始 final Node succ(Node p) { Node next = p.next; return (p == next) ? head : next; }
// 是否有請求節點 public boolean hasWaitingConsumer() { return firstOfMode(false) != null; } // 查找第一個 isData 模式的未匹配節點 private Node firstOfMode(boolean isData) { for (Node p = head; p != null; p = succ(p)) { if (!p.isMatched()) return (p.isData == isData) ? p : null; } return null; }
參考:
天天用心記錄一點點。內容也許不重要,但習慣很重要!