在JDK8的阻塞隊列實現中還有兩個未進行說明,今天繼續對其中的一個阻塞隊列LinkedTransferQueue進行源碼分析,若是以前的隊列分析已經讓你對阻塞隊列有了必定的瞭解,相信本文要講解的LinkedTransferQueue的源碼也能很快被理解,接下來一塊兒學習吧java
JDK版本號:1.8.0_171
LinkedTransferQueue是基於鏈表的FIFO無界阻塞隊列,在源碼分析前,須要提早對源碼實現總體有個印象,便於細節的理解。註釋部分對於這個類進行了一些說明和介紹,若是有能力的話能夠閱讀理解,對於其中的部分這裏進行簡單說明:node
LinkedTransferQueue使用了鬆弛型雙重隊列,雙重的意思能夠理解爲兩種類型的節點(請求數據的消費者和生產數據的生產者),也就是說隊列中保存了這兩種類型的節點,理解上要稍微複雜些,其實以前SynchronousQueue中就使用了相似的隊列,隊列維護了兩個指針:head指向第一個匹配節點(M)(若是爲空則爲空);tail 指向隊列中的最後一個節點(若是爲空則爲空)併發
在雙重隊列中爲了減小CAS的開銷,加入了Slack(鬆弛度)的處理方式,在節點被匹配(被刪除)以後,不會當即更新 head/tail,而是當 head/tail 節點和最近一個未匹配的節點之間的距離超過一個閾值以後纔會更新,在LinkedTransferQueue中鬆弛度值設置爲2,這是一個經驗值,很少深究。同時爲了不匹配節點在隊列中的堆積,在CAS更新head時,會把已匹配的head的next引用指向本身。當咱們進行遍歷時,遇到這種節點,表示當前線程已經落後於其餘線程,須要從新獲取head來進行遍歷app
其與其餘阻塞隊列不一樣之處在於,LinkedTransferQueue容許消費者線程獲取元素時,若是未請求到數據,則能夠生成一個數據節點(節點item爲null)入隊,而後消費者線程在這個節點線程上等待,直到以後生產者線程入隊時發現有一個item爲null的數據節點,生產者線程就再也不進行入隊操做了,直接就將元素填充到該節點的item,並喚醒該節點等待的線程,被喚醒的消費者線程取走元素,從調用處返回,生產者一樣直接返回less
實現TransferQueue接口中的方法是LinkedTransferQueue操做的核心部分dom
public class LinkedTransferQueue<E> extends AbstractQueue<E> implements TransferQueue<E>, java.io.Serializable
先了解其中的每一個方法的含義,便於下面源碼實現的理解異步
/** * 若是機器爲多處理器則爲true,MP爲multiprocessor縮寫 */ private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1; /** * 節點自旋等待的次數 128 */ private static final int FRONT_SPINS = 1 << 7; /** * 當前驅節點在處理,當前節點自旋等待的次數 64 */ private static final int CHAINED_SPINS = FRONT_SPINS >>> 1; /** * sweepVotes的閾值,達到這個閾值上限則進行一次清理操做 */ static final int SWEEP_THRESHOLD = 32; /** 隊列頭節點 */ transient volatile Node head; /** 隊列尾節點*/ private transient volatile Node tail; /** 解除刪除節點關聯失敗的次數 */ private transient volatile int sweepVotes; /* * xfer方法how參數可能的取值類型 * 隊列操做統一方法根據類型進行不一樣的處理 */ /** poll和tryTransfer使用 */ private static final int NOW = 0; // for untimed poll, tryTransfer /** offer, put, add方法使用 */ private static final int ASYNC = 1; // for offer, put, add /** transfer, take方法使用 */ private static final int SYNC = 2; // for transfer, take /** 超時等待的poll和tryTransfer使用 */ private static final int TIMED = 3; // for timed poll, tryTransfer
上述參數中對xfer參數類型進行詳細說明:源碼分析
LinkedTransferQueue中鏈表的節點實現Node與SynchronousQueue中的實現相似,須要注意的是當節點已經匹配或被取消時咱們必然須要將節點離隊,經過forgetNext和forgetContents來將節點排除隊列匹配操做學習
/** * 隊列Node實現 * CAS更新Node成員變量 */ static final class Node { // 數據節點和請求節點類型區分標識 final boolean isData; // false if this is a request node // 數據節點保存數據,請求節點爲null volatile Object item; // initially non-null if isData; CASed to match // 指向隊列中下一個節點 volatile Node next; // 當前節點對應的等待線程 volatile Thread waiter; // null until waiting // CAS methods for fields final boolean casNext(Node cmp, Node val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } final boolean casItem(Object cmp, Object val) { // assert cmp == null || cmp.getClass() != Node.class; return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } /** * 構造方法需傳入的參數 */ Node(Object item, boolean isData) { UNSAFE.putObject(this, itemOffset, item); // relaxed write this.isData = isData; } /** * 將next指向本身,避免無用Node過長影響垃圾回收 * 在cas更新head後調用 */ final void forgetNext() { UNSAFE.putObject(this, nextOffset, this); } /** * 匹配或者節點被取消的時候被調用,設置item指向本身,waiter爲null */ final void forgetContents() { UNSAFE.putObject(this, itemOffset, this); UNSAFE.putObject(this, waiterOffset, null); } /** * 若是此節點已匹配或者是被取消匹配的節點,則返回true * x == this 調用了forgetContents * (x == null) == isData 表示請求節點匹配了數據節點(請求節點的item更新爲數據節點的數據) * 或者數據節點匹配了請求節點(數據節點的item更新爲null) */ final boolean isMatched() { Object x = item; return (x == this) || ((x == null) == isData); } /** * 當前節點是不是未匹配的請求節點 * !isData 請求節點 * item == null 還未匹配被更新 */ final boolean isUnmatchedRequest() { return !isData && item == null; } /** * 可否將指定的節點node(haveData類型)追加到當前節點後。若是node節點屬性與當前節點相反,且當前節點還未進行匹配則不能追加 */ final boolean cannotPrecede(boolean haveData) { boolean d = isData; Object x; return d != haveData && (x = item) != this && (x != null) == d; } /** * 嘗試人爲匹配數據節點,匹配成功返回true,設置item爲null(不用再匹配了) * 至關於移除當前數據節點,用在remove方法中 */ final boolean tryMatchData() { // assert isData; Object x = item; // item非空且未指向本身則表示當前節點爲還未匹配的數據節點 // 以後嘗試將item置爲null同時喚醒等待的線程 if (x != null && x != this && casItem(x, null)) { LockSupport.unpark(waiter); return true; } return false; } private static final long serialVersionUID = -3375979862319811754L; // Unsafe mechanics // CAS操做 private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; private static final long waiterOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); waiterOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("waiter")); } catch (Exception e) { throw new Error(e); } } }
構造方法比較容易理解,addAll最終循環調用add方法一個一個進行添加this
public LinkedTransferQueue() { } public LinkedTransferQueue(Collection<? extends E> c) { this(); addAll(c); }
put,offer,add 都是調用xfer(e, true, ASYNC, 0)
,須要注意,offer設置超時的那個方法沒用,使用時須要注意!ASYNC表示異步操做,至關於這些方法執行後直接入隊元素而後結束不會像SynchronousQueue隊列那樣阻塞等待匹配元素出現
public void put(E e) { xfer(e, true, ASYNC, 0); } public boolean offer(E e, long timeout, TimeUnit unit) { xfer(e, true, ASYNC, 0); return true; } public boolean offer(E e) { xfer(e, true, ASYNC, 0); return true; } public boolean add(E e) { xfer(e, true, ASYNC, 0); return true; }
當咱們須要不一樣的隊列入隊操做時,根據須要使用下列方法
// 因爲中斷操做致使失敗會拋錯 public void transfer(E e) throws InterruptedException { if (xfer(e, true, SYNC, 0) != null) { Thread.interrupted(); // failure possible only due to interrupt throw new InterruptedException(); } } // 馬上嘗試匹配返回,不進行任何等待操做,xfer源碼部分有判斷這個標識 public boolean tryTransfer(E e) { return xfer(e, true, NOW, 0) == null; } // 嘗試匹配未匹配等待超時時間才返回,如被中斷則拋錯 public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null) return true; if (!Thread.interrupted()) return false; throw new InterruptedException(); }
出隊操做,請求數據節點,這裏xfer方法的參數也能看出其使用方法的不一樣,take方法獲取不到對應的匹配節點會阻塞操做,而poll方法在未設置超時時間時以NOW模式,至關於直接獲取數據,無論有沒有都會直接返回結果,不會進行阻塞等待
public E take() throws InterruptedException { E e = xfer(null, false, SYNC, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); } public E poll(long timeout, TimeUnit unit) throws InterruptedException { E e = xfer(null, false, TIMED, unit.toNanos(timeout)); if (e != null || !Thread.interrupted()) return e; throw new InterruptedException(); } public E poll() { return xfer(null, false, NOW, 0); }
LinkedTransferQueue核心方法,全部的操做最終都經過xfer實現,經過how參數的不一樣進行不一樣的處理,在匹配上時判斷當前head的slack閾值,若是達到上限則進行head更新
/** * @param e 數據節點(e非空)或者請求節點(e爲null) * @param haveData 數據節點爲true,請求節點爲false * @param how NOW, ASYNC, SYNC, or TIMED 4種類型,上面已經介紹過 * @param nanos TIMED模式下設置的超時時間 * @return 節點匹配上則返回對應的匹配項不然傳入的參數e * @throws */ private E xfer(E e, boolean haveData, int how, long nanos) { // 校驗節點 if (haveData && (e == null)) throw new NullPointerException(); // 假如須要的話,這裏先聲明待添加的節點 Node s = null; // the node to append, if needed // 標籤 continue跳轉 retry: for (;;) { // restart on append race // 從頭節點開始嘗試匹配 for (Node h = head, p = h; p != null;) { // find & match first node boolean isData = p.isData; Object item = p.item; // item != p 表示p節點item未指向本身,未執行forgetContents,未被取消或匹配 // (item != null) == isData 表示 p 是一個還未匹配的數據節點或請求節點 // 不知足條件可能須要執行後面邏輯 if (item != p && (item != null) == isData) { // unmatched // 相同節點類型,說明和隊列中全部節點相同類型,無需匹配,跳出這個循環根據類型繼續接下來的操做 if (isData == haveData) // can't match break; // 執行到這說明p節點還未匹配上且與當前節點是相異類型,cas更新item成功則表示匹配上了 // 注意這裏只更新了head指向的節點,由於本次線程的e節點到這裏還未入隊 // 這裏將p的item指向爲對應操做的節點e,表示p對應的節點已經與這次的e匹配上了 // 若是未更新成功,說明p已經被其餘人匹配上,執行後面邏輯繼續循環 if (p.casItem(item, e)) { // match // p當前已經不是指向h了,說明p已經被循環next更新過了 for (Node q = p; q != h;) { Node n = q.next; // update by 2 unless singleton // h依舊指向頭節點嘗試更新head指向 // 鬆弛度等於2則更新head,h->q->n if (head == h && casHead(h, n == null ? q : n)) { // 將h的next更新方便回收 h.forgetNext(); break; } // advance and retry // head爲空或者head的next節爲空或者head的next節點未被匹配或取消 // 此時跳出循環,slack較小不須要更新head if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } // p已經與e匹配上了,喚醒p節點對應的等待線程 LockSupport.unpark(p.waiter); // 轉換類型返回 return LinkedTransferQueue.<E>cast(item); } } // 已被其餘線程匹配則遍歷下一個節點 Node n = p.next; // p == n 即 p == p.next 執行了forgetNext // 說明頭節點指向已經更新了,p節點已經離隊須要從新從頭開始匹配 p = (p != n) ? n : (h = head); // Use head if p offlist } // 循環隊列未匹配上同時爲非NOW模式,NOW則直接返回入參e if (how != NOW) { // No matches available // s爲空則初始化s節點 if (s == null) s = new Node(e, haveData); // 嘗試添加節點s到隊列尾部 Node pred = tryAppend(s, haveData); // null表示當前有匹配的節點了,從retry開始從新開始判斷處理 // 在後面的方法中會分析tryAppend if (pred == null) continue retry; // lost race vs opposite mode // 執行到這裏說明pred非null,s添加到隊列中了,pred表明的是s的前驅節點或者s自己 // 處理SYNC/TIMED模式 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); } return e; // not waiting } }
爲了方便理解,這裏經過流程圖顯示xfer的操做流程:
嘗試將s節點添加到隊列尾部,同時當tail的slack達到閾值時則更新tail指向,不一樣的返回值對應不一樣的處理過程,查看xfer源碼上的處理,返回值含義以下:
/** * * @param s 添加到隊列的節點元素 * @param haveData 數據節點入隊爲true,請求節點入隊爲false */ private Node tryAppend(Node s, boolean haveData) { // p指向尾結點 for (Node t = tail, p = t;;) { // move p to last node and append Node n, u; // temps for reads of next & tail // 頭尾節點爲空則表示隊列爲空 if (p == null && (p = head) == null) { // 隊列爲空時嘗試更新頭節點爲s便可,失敗從新循環處理 if (casHead(null, s)) return s; // initialize } // 隊列非空同時添加的s節點與p節點數據類型不一樣表示二者能夠匹配則返回null進行標記處理(xfer中使用到了) else if (p.cannotPrecede(haveData)) return null; // lost race vs opposite mode // p節點非當前尾節點(可能被其餘線程更新了tail) else if ((n = p.next) != null) // not last; keep traversing // 知足條件更新t指向tail,p指向t從新循環開始 p = p != t && t != (u = tail) ? (t = u) : // stale tail // 在隊列更新p = p.next 從新開始循環或離隊狀態則置p爲null循環從頭節點開始 (p != n) ? n : null; // restart if off list // p爲當前尾結點,嘗試更新p的next指向s失敗則更新p指向p的next // 失敗說明別的線程更新了p的next,此時更新p從新循環執行 else if (!p.casNext(null, s)) p = p.next; // re-read on CAS failure else { // 執行到這代表更新p的next爲s成功 // p和t已經不一樣了,p可能循環了幾回才成功更新next,t仍是以前的指向,須要更新 // p != t 爲真時 slack >=2 // 若是t = p 更新next爲s成功,則slack = 1,這個條件不會進去 // t != p,整個節點關聯爲...->t->...->p->s,t到s距離 >= 2 if (p != t) { // update if slack now >= 2 // 更新tail,不停的判斷是不是tail,不是則持續向前直到尾部節點,而後更新tail退出while while ((tail != t || !casTail(t, s)) && (t = tail) != null && (s = t.next) != null && // advance and retry (s = s.next) != null && s != t); } // 返回添加節點的前驅節點p return p; } } }
xfer中處理SYNC/TIMED模式時調用,處理阻塞等待和超時等待匹配節點方式,參考上面xfer中調用的地方,其中的入參說明以下:
注意返回值的含義,匹配上返回對應匹配節點的item,如未匹配上,中斷或者超時則返回入參e
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) { // 計算過時時間 final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); // 初始化 int spins = -1; // initialized after first item and cancel checks // 使用ThreadLocalRandom產生併發隨機數 ThreadLocalRandom randomYields = null; // bound if needed for (;;) { Object item = s.item; // item != e 說明s的item已經被更新了,表示已經與其餘節點匹配上了 // item更新成對應匹配節點的item,參考xfer匹配節點過程理解 if (item != e) { // matched // assert item != s; // 已經被匹配上了,將當前節點forgetContents,避免垃圾堆積 s.forgetContents(); // avoid garbage // 類型轉化返回結束 return LinkedTransferQueue.<E>cast(item); } // 還未被匹配,先判斷當前線程是否被中斷或者超時 // 第一個條件爲true時,s節點嘗試更新item指向本身(取消操做,這裏s是本次操做的節點,取消了就不用再繼續處理了) if ((w.isInterrupted() || (timed && nanos <= 0)) && s.casItem(e, s)) { // cancel // 將s和其前驅節點解除關聯 unsplice(pred, s); return e; } // 到這裏代表沒匹配上同時也沒被中斷或超時 // 自旋次數設置,單核機器不會進行自旋 if (spins < 0) { // establish spins at/near front // 計算spins if ((spins = spinsFor(pred, s.isData)) > 0) // 調用current獲取併發隨機數產生類 randomYields = ThreadLocalRandom.current(); } // 自旋次數循環遞減 else if (spins > 0) { // spin --spins; if (randomYields.nextInt(CHAINED_SPINS) == 0) // 隨機等於0時,當前線程讓步,給其餘線程執行機會 Thread.yield(); // occasionally yield } // 執行到這已經進行過自旋了spins = 0,說明暫時無匹配節點先保存當前線程 // 這裏設置完了還繼續循環處理 else if (s.waiter == null) { s.waiter = w; // request unpark then recheck } // 設置超時則超時等待 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos > 0L) LockSupport.parkNanos(this, nanos); } // 阻塞等待當前線程直到被其餘線程喚醒 else { LockSupport.park(this); } } }
根據當前節點的前驅節點和當前節點的數據類型返回對應的自旋值,不一樣狀況下返回不一樣的數值
private static int spinsFor(Node pred, boolean haveData) { // 多CPU,前驅節點非空才能自旋操做 if (MP && pred != null) { // 前驅和當前節點類型不一樣則自旋FRONT_SPINS + CHAINED_SPINS if (pred.isData != haveData) // phase change return FRONT_SPINS + CHAINED_SPINS; // 前驅節點已經被匹配了,返回自旋FRONT_SPINS次數 if (pred.isMatched()) // probably at front return FRONT_SPINS; // 前驅等待線程爲空,還沒更新waiter,說明前驅節點在自旋操做,返回CHAINED_SPINS if (pred.waiter == null) // pred apparently spinning return CHAINED_SPINS; } return 0; }
返回p的後繼節點,若是p.next指向p(p節點已經離隊),則返回head頭節點
final Node succ(Node p) { Node next = p.next; return (p == next) ? head : next; }
找到第一個未匹配節點,數據類型一致則返回節點,不一致則返回null。hasWaitingConsumer使用firstOfMode來進行了判斷,firstDataNode,firstDataItem(peek使用了)相似不詳細進行說明了,countOfMode計算對應類型節點的數量,源碼也比較簡單
private Node firstOfMode(boolean isData) { // 從頭開始進行循環判斷 for (Node p = head; p != null; p = succ(p)) { // 未匹配節點 if (!p.isMatched()) // 數據類型一致則返回p,不然返回null return (p.isData == isData) ? p : null; } return null; } final Node firstDataNode() { for (Node p = head; p != null;) { Object item = p.item; if (p.isData) { if (item != null && item != p) return p; } // 頭節點未被匹配同時非數據節點則隊列中此刻應該只有請求節點不須要再循環判斷下去了 else if (item == null) break; if (p == (p = p.next)) p = head; } return null; } private E firstDataItem() { for (Node p = head; p != null; p = succ(p)) { Object item = p.item; if (p.isData) { if (item != null && item != p) return LinkedTransferQueue.<E>cast(item); } else if (item == null) return null; } return null; } private int countOfMode(boolean data) { int count = 0; for (Node p = head; p != null; ) { if (!p.isMatched()) { if (p.isData != data) return 0; if (++count == Integer.MAX_VALUE) // saturated break; } Node n = p.next; if (n != p) p = n; else { count = 0; p = head; } } return count; }
前驅節點與已刪除或者取消狀態的s節點取消鏈接,將兩個節點取消關聯
/** * * @param pred s的前驅節點或者爲null或者爲s本身(當s爲頭節點時) * @param s 取消或刪除的節點 */ final void unsplice(Node pred, Node s) { // 清理s節點變量 s.forgetContents(); // forget unneeded fields // 確認pred的next指向s即二者之間還有關聯才處理 if (pred != null && pred != s && pred.next == s) { Node n = s.next; // s的next爲空表示s爲尾結點 // s的後繼非s且pred更新next成功且pred已被匹配,嘗試解除s節點 if (n == null || (n != s && pred.casNext(s, n) && pred.isMatched())) { // 檢查是不是頭節點並更新 for (;;) { // check if at, or could be, head Node h = head; // 頭節點爲前驅節點 // 頭節點爲s節點 // 頭節點爲空,則表示爲空隊列 if (h == pred || h == s || h == null) return; // at head or list empty // 頭節點未被匹配則跳出循環 if (!h.isMatched()) break; // 到這說明h已經被匹配,須要更新head Node hn = h.next; // 頭節點後繼節點爲空,驗證隊列爲空 if (hn == null) return; // now empty // 頭節點後繼節點非頭節點而且嘗試更新頭節點爲後繼節點 if (hn != h && casHead(h, hn)) // 清理原頭節點 h.forgetNext(); // advance head } // 解除先後節點連接失敗則統計閾值處理 // 再次檢查是否離隊 if (pred.next != pred && s.next != s) { // recheck if offlist // 根據SWEEP_THRESHOLD閾值進行判斷處理 for (;;) { // sweep now if enough votes int v = sweepVotes; // 小於閾值則嘗試將閾值加1 if (v < SWEEP_THRESHOLD) { if (casSweepVotes(v, v + 1)) break; } // 大於等於閾值則將閾值歸0同時經過sweep方法進行清理 else if (casSweepVotes(v, 0)) { sweep(); break; } } } } } }
從頭節點開始遍歷清理匹配節點(取消的節點)的節點關聯關係
private void sweep() { // 從頭節點開始,p開始爲頭節點,s爲p的後繼節點 for (Node p = head, s, n; p != null && (s = p.next) != null; ) { // s爲未匹配的節點,開始遍歷下一個 if (!s.isMatched()) // Unmatched nodes are never self-linked p = s; // s已經被匹配了,若是s爲尾節點,遍歷完了,終止 else if ((n = s.next) == null) // trailing node is pinned break; // s的next指向本身,說明s已經離隊 else if (s == n) // stale // No need to also check for p == s, since that implies s == n // 從頭從新開始 p = head; else // 更新p的next p.casNext(s, n); } }
移除對應的節點
private boolean findAndRemove(Object e) { if (e != null) { // 循環 for (Node pred = null, p = head; p != null; ) { // 匹配item Object item = p.item; // 數據節點比較item是否相等,相等則經過tryMatchData自我匹配,而後unsplice取消先後節點關係 if (p.isData) { if (item != null && item != p && e.equals(item) && p.tryMatchData()) { unsplice(pred, p); return true; } } // 請求節點同時還未被匹配,隊列中沒有數據節點,直接跳出 else if (item == null) break; pred = p; // p已是舊的數據,須要更新p指向head從新循環處理 if ((p = p.next) == pred) { // stale pred = null; p = head; } } } return false; }
主要的源碼部分基本已分析完畢,關於迭代器的部分再也不詳述,讀者可自行閱讀理解
LinkedTransferQueue做爲一個基於鏈表的FIFO無界阻塞隊列,使用了一些複雜的概念,雙重隊列,鬆弛度都是須要好好理解的部分,應該先從總體瞭解其流程處理,再細看其內部實現,其核心方法在於xfer,能夠參考流程圖進行梳理,做爲阻塞隊列,使用好LinkedTransferQueue是不容易的,方法的使用須要參考源碼,不然用錯地方致使線上事故得不償失,但願本文對各位有所幫助
以上內容若有問題歡迎指出,筆者驗證後將及時修正,謝謝