系列傳送門:html
LinkedTransferQueue在JDK1.7版本誕生,是由鏈表組成的無界TransferQueue,相對於其餘阻塞隊列,多了tryTransfer和transfer方法。java
TransferQueue:生產者會一直阻塞直到所添加到隊列的元素被某一個消費者所消費(不只僅是添加到隊列裏就完事)。新添加的transfer方法用來實現這種約束。顧名思義,阻塞就是發生在元素從一個線程transfer到另外一個線程的過程當中,它有效地實現了元素在線程之間的傳遞(以創建Java內存模型中的happens-before關係的方式)。node
http://cs.oswego.edu/pipermail/concurrency-interest/2009-February/005888.html算法
Doug Lea評價TransferQueue是ConcurrentLinkedQueue、SynchronousQueue(在公平模式下)、無界的LinkedBlockingQueue等的超集,功能十分強大,最重要的是,它的實現也更加的高效。編程
總結:基於無鎖CAS方式實現的無界FIFO隊列。安全
public class LinkedTransferQueue<E> extends AbstractQueue<E> implements TransferQueue<E>, java.io.Serializable { //... }
LinkedTransferQueue不一樣於其餘的阻塞隊列,它實現了TransferQueue接口,這必定是核心所在,咱們直接來看看接口定義的方法規範:併發
// 繼承了BlockingQueue接口,並增長若干新方法 public interface TransferQueue<E> extends BlockingQueue<E> { /** * 將元素 傳給等待的消費者【若是有的話】, 返回true, 若是不存在,返回false,不入隊。 */ boolean tryTransfer(E e); /** * 將元素傳遞給等待的消費者【若是有的話】, 若是沒有,則將e插入隊列尾部, * 會一直等待,直到它被消費者接收 */ void transfer(E e) throws InterruptedException; /** * 在transfer的基礎上,增長了超時操做,時間到了尚未被消費的話,返回false,並移除元素 */ boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException; /** * 若是存在消費者線程,返回true */ boolean hasWaitingConsumer(); /** * 獲得等待獲取元素的消費者線程的數量 */ int getWaitingConsumerCount(); }
public class LinkedTransferQueue<E> extends AbstractQueue<E> implements TransferQueue<E>, java.io.Serializable { private static final long serialVersionUID = -3223113410248163686L; /** 是否爲多核處理器 */ private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1; /** * 當一個節點目前是隊列的第一個waiter時,阻塞前的自旋次數 */ private static final int FRONT_SPINS = 1 << 7; /** * 前驅節點正在處理,當前節點須要自旋的次數 */ private static final int CHAINED_SPINS = FRONT_SPINS >>> 1; /** * */ static final int SWEEP_THRESHOLD = 32; // 隊列中的節點 static final class Node {...} // 頭節點 transient volatile Node head; /** 尾指針,注意可能不是最後一個節點,初始化爲null */ private transient volatile Node tail; /** 刪除節點失敗的次數 */ private transient volatile int sweepVotes; /* * xfer方法中使用,定義how,解釋很清楚了,每一個變量對應不一樣的方法 */ private static final int NOW = 0; // for untimed poll, tryTransfer private static final int ASYNC = 1; // for offer, put, add private static final int SYNC = 2; // for transfer, take private static final int TIMED = 3; // for timed poll, tryTransfer
有耐心的同窗其實能夠看一下javadoc的介紹,LinkedTransferQueue使用的隊列結構實際上是這樣的:是
slack dual queue
,他和普通的M&S dual queue
的區別在於,它不會每次操做的時候都更新head或tail,而是保持有針對性的slack懈怠,因此它的結構多是下面這樣,tail指針指向的節點未必就是最後一個節點。apphead tail | | v v M -> M -> U -> U -> U -> U
Node節點的結構其實和SynchronousQueue公平模式差不太多,這一次看起來就比較清晰了,這邊再總結一下,主要包含幾個部分:幾個重要字段,以及一些CAS方法。less
static final class Node { final boolean isData; // isData == true表示存數據,不然爲獲取數據 volatile Object item; // 存數據,item非null, 獲取數據,匹配後,item爲null volatile Node next; // next域 volatile Thread waiter; // 等待線程 // CAS操做next域 若是next爲cmp,則變爲val final boolean casNext(Node cmp, Node val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // CAS操做item域,若是item爲cmp,變爲val final boolean casItem(Object cmp, Object val) { // assert cmp == null || cmp.getClass() != Node.class; return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } // 構造器 Node(Object item, boolean isData) { UNSAFE.putObject(this, itemOffset, item); // relaxed write this.isData = isData; } // 將next指向自身this final void forgetNext() { UNSAFE.putObject(this, nextOffset, this); } // 匹配或取消節點調用 final void forgetContents() { UNSAFE.putObject(this, itemOffset, this); UNSAFE.putObject(this, waiterOffset, null); } /** * 判斷節點是否已經匹配,匹配取消也爲true */ final boolean isMatched() { Object x = item; return (x == this) || ((x == null) == isData); } /** * 是否爲一個未匹配的請求 item爲null表示未匹配 */ final boolean isUnmatchedRequest() { return !isData && item == null; } /** * 若是給定的節點不能掛到當前節點後面,則返回true */ final boolean cannotPrecede(boolean haveData) { boolean d = isData; Object x; return d != haveData && (x = item) != this && (x != null) == d; } /** * 嘗試去匹配一個數據節點 */ final boolean tryMatchData() { // assert isData; Object x = item; if (x != null && x != this && casItem(x, null)) { LockSupport.unpark(waiter); return true; } return false; } private static final long serialVersionUID = -3375979862319811754L; // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; private static final long waiterOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); waiterOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("waiter")); } catch (Exception e) { throw new Error(e); } } }
咱們接下來將會介紹LinkedTransferQueue提供的各類操做,他們都會調用一個方法:xfer。dom
這裏咱們暫且不談具體的實現,咱們只須要知道一下這個方法的四個入參分別是什麼意思。
/** * xfer方法實現了全部的隊列方法 * * @param e take操做傳入null, 不然傳入具體元素 * @param haveData put操做爲true, take操做爲false * @param how NOW, ASYNC, SYNC, or TIMED 不一樣字段,先從名稱上猜想一下他們的大意 * @param nanos 若是是TIMED模式,也就是具備超時機制的方法啦,具體超時的時間 * @return an item if matched, else e 返回匹配的元素,不然返回e * @throws NullPointerException 插入null值拋出空指針異常: haveData==true && e == null */ private E xfer(E e, boolean haveData, int how, long nanos) { // }
接下來咱們將分幾類來分別看一下各類操做的定義。
LinkedTransferQueue是無界的,下面三個插入方法不會阻塞,他們都調用了xfer方法,傳入元素e,havaData爲true,how字段類型都爲SYNC
。
public void put(E e) { xfer(e, true, ASYNC, 0); } public boolean offer(E e, long timeout, TimeUnit unit) { xfer(e, true, ASYNC, 0); return true; } public boolean offer(E e) { xfer(e, true, ASYNC, 0); return true; } public boolean add(E e) { xfer(e, true, ASYNC, 0); return true; }
// take public E take() throws InterruptedException { E e = xfer(null, false, SYNC, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); } // timed poll public E poll(long timeout, TimeUnit unit) throws InterruptedException { E e = xfer(null, false, TIMED, unit.toNanos(timeout)); if (e != null || !Thread.interrupted()) return e; throw new InterruptedException(); } // untimed poll public E poll() { return xfer(null, false, NOW, 0); }
一樣的,獲取元素的方法也都調用了xfer方法,他們都傳入null,havaData都爲false,可是傳入的how字段類型不一樣:
public boolean tryTransfer(E e) { return xfer(e, true, NOW, 0) == null; } public void transfer(E e) throws InterruptedException { if (xfer(e, true, SYNC, 0) != null) { Thread.interrupted(); // failure possible only due to interrupt throw new InterruptedException(); } } public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null) return true; if (!Thread.interrupted()) return false; throw new InterruptedException(); }
xfer方法的實現,做者已經在註釋中說的十分清楚啦,這邊簡單看下三個核心步驟,細節部分下面會學習。
一、Try to match an existing node 嘗試去匹配一個節點
二、Try to append a new node (method tryAppend) 嘗試將一個節點入隊,對應tryAppend方法
三、Await match or cancellation (method awaitMatch) 阻塞等待一個節點被匹配或取消,對應awaitMatch方法
這個方法必然是核心方法了,畢竟它能夠實現隊列中提供的全部操做。
private E xfer(E e, boolean haveData, int how, long nanos) { // 若是 是插入的數據爲null, 則NPE if (haveData && (e == null)) throw new NullPointerException(); Node s = null; // the node to append, if needed retry: for (;;) { // restart on append race // 第一次插入數據的時候,不會進入這個循環,由於p == null // 不然進入這個循環,從head首節點開始 for (Node h = head, p = h; p != null;) { // find & match first node boolean isData = p.isData; Object item = p.item; // 找到還未匹配的節點: isData的item應該是爲非null, 若是是null代表用過了 if (item != p && (item != null) == isData) { // unmatched // 節點類型和當前類型一致,沒法匹配 if (isData == haveData) // can't match break; // 將參數加入到item域, if (p.casItem(item, e)) { // match // 下面這個for循環,是匹配item以後進行的額外操做, // 好比將head更新爲當前這個點 for (Node q = p; q != h;) { Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } // 阻塞線程 LockSupport.unpark(p.waiter); // 返回item值 return LinkedTransferQueue.<E>cast(item); } } // 若是節點已經匹配過了,向後 Node n = p.next; // p != n的狀況很簡單,將p移到n的位置, p==n表示什麼呢? // 其實若是p.next == p 說明p節點已經被其餘線程處理,那麼p就從head開始 p = (p != n) ? n : (h = head); // Use head if p offlist } // 尚未找到能夠匹配的點的話,會走到這 // 這裏 若是 how 字段傳入爲 NOW ,便不會走裏面的邏輯, // 也就是說untimed poll、 tryTransfer 不須要將元素入隊 if (how != NOW) { // No matches available // 這裏構造一個節點 if (s == null) s = new Node(e, haveData); // 初始化以後,調用tryAppend入隊, 返回前驅節點 Node pred = tryAppend(s, haveData); // pred == null表示競爭失敗,返回到retry的地方 if (pred == null) continue retry; // lost race vs opposite mode // 若是是ASYNC會跳過這裏,馬上返回e,不須要阻塞 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); } return e; // not waiting } }
核心流程:
直接上圖吧:
tryAppend包含入隊的邏輯,返回前驅節點。代碼充分考慮到併發狀況,仍是比較難懂的,若是要看明白,能夠在圖上畫一畫節點的變化。
private Node tryAppend(Node s, boolean haveData) { for (Node t = tail, p = t;;) { // move p to last node and append Node n, u; // temps for reads of next & tail // p == null && head == null 表示此時隊頭還未初始化 if (p == null && (p = head) == null) { // cas設置s爲隊頭 if (casHead(null, s)) return s; // initialize } // 這裏檢測到異常狀況,返回null,以後會continue retry; else if (p.cannotPrecede(haveData)) return null; // lost race vs opposite mode // 這裏就是p一直找到tail的位置, else if ((n = p.next) != null) // not last; keep traversing // 這段... 吐槽一下 p = p != t && t != (u = tail) ? (t = u) : // stale tail (p != n) ? n : null; // restart if off list // 嘗試將s插到隊尾,若是失敗,說明其餘線程先插了,那麼p向後移,重新開始 else if (!p.casNext(null, s)) p = p.next; // re-read on CAS failure else { if (p != t) { // update if slack now >= 2 // 這裏會設置s爲tail,若是成功的話,就退出循環了, // 若是失敗的話,會進行後面的判斷,一開始tail其實都是null的 // while ((tail != t || !casTail(t, s)) && (t = tail) != null && (s = t.next) != null && // advance and retry (s = s.next) != null && s != t); } // 返回加入節點的前驅節點 return p; } } }
該方法從當前的tail開始,找到實際的最後一個節點【前面說了,tail可能不是最後一個節點】,並嘗試追加一個新的節點【若是head爲null,則創建第一個節點】。
成功追加後,若是how爲ASYNC,則返回。
注意:僅當它的前面節點都已經匹配或mode相同時,才能夠追加節點。若是檢測到其餘的狀況,咱們須要直接返回null,從新啓動retry。
awaitMatch方法其實和SynchronousQueue的awaitFulfill邏輯差很少,線程會有三種狀況:spins/yield/blocks
,直到node s被匹配或取消。
On multiprocessors, we use front-of-queue spinning: If a node appears to be the first unmatched node in the queue, it spins a bit before blocking.
若是一個節點可能會優先被匹配呢,它會優先選擇自旋而不是阻塞,自旋次數到了才阻塞,主要是考慮到阻塞、喚醒須要消耗更多的資源。這邊簡單過一下:
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) { final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); // 自旋次數 int spins = -1; // initialized after first item and cancel checks // 這裏是線程安全的Random ThreadLocalRandom randomYields = null; // bound if needed for (;;) { Object item = s.item; // if (item != e) { // matched // assert item != s; s.forgetContents(); // avoid garbage return LinkedTransferQueue.<E>cast(item); } // 若是中斷或超時 ,就cas設置s的item爲e if ((w.isInterrupted() || (timed && nanos <= 0)) && s.casItem(e, s)) { // cancel // 斷開 unsplice(pred, s); return e; } // 計算自旋次數 if (spins < 0) { // establish spins at/near front if ((spins = spinsFor(pred, s.isData)) > 0) randomYields = ThreadLocalRandom.current(); } else if (spins > 0) { // spin --spins; // 這裏做者提示:雖然偶爾執行yield的收益不是很明顯 // 但仍限制了 自旋對busy system 的影響 if (randomYields.nextInt(CHAINED_SPINS) == 0) Thread.yield(); // occasionally yield } // 設置一下waiter線程,標記一下誰在等 else if (s.waiter == null) { s.waiter = w; // request unpark then recheck } // 超時阻塞 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos > 0L) LockSupport.parkNanos(this, nanos); } // 自旋完仍是沒有匹配,就park住 else { LockSupport.park(this); } } }
最後,來看個簡單的案例吧。
/** * @author Summerday */ public class TestTransferQueue { // 無鎖算法 無界隊列 static TransferQueue<Integer> queue = new LinkedTransferQueue<>(); public static void main (String[] args) { for (int i = 1; i <= 10; i++) { new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + "消費 id - " + queue.take()); System.out.println("---------------------------------------------"); } catch (InterruptedException e) { e.printStackTrace(); } }, "consumer" + i).start(); } Thread producer = new Thread(() -> { while (true) { System.out.println("當前隊列中等待的線程" + queue.getWaitingConsumerCount()); // 若是隊列中有等待的消費者 if (queue.hasWaitingConsumer()) { int product = new Random().nextInt(500); try { System.out.println(Thread.currentThread().getName() + "生產 id - " + product); queue.tryTransfer(product); TimeUnit.MILLISECONDS.sleep(100); // 等待消費 } catch (InterruptedException e) { e.printStackTrace(); } } } }, "producer"); producer.setDaemon(true); producer.start(); } } // 打印結果: 當前隊列中等待的線程10 producer生產 id - 266 consumer1消費 id - 266 --------------------------------------------- 當前隊列中等待的線程9 producer生產 id - 189 consumer2消費 id - 189 --------------------------------------------- //... 省略
LinkedTransferQueue在JDK1.7版本誕生,是由鏈表組成的無界TransferQueue,相對於其餘阻塞隊列,不只多了tryTransfer和transfer方法,並且性能方面也有巨大的提高。
LinkedTransferQueue使用的隊列結構是slack dual queue
,不會每次操做的時候都更新head或tail,而是保持有針對性的slack懈怠。
LinkedTransferQueue的全部隊列操做都基於xfer方法,具體狀況根據傳入的how字段決定:NOW節點不入隊,ASYNC節點入隊但會當即返回,SYNC節點入隊且阻塞,TIMED對應超時機制。
xfer的實現分爲三個流程:
最後:具體步驟能夠查看上面的解析,若有不足,望評論區指教。