阻塞隊列的實現前面已經講解完畢,今天咱們繼續瞭解源碼中非阻塞隊列的實現,接下來就看一看ConcurrentLinkedQueue非阻塞隊列是怎麼完成操做的java
JDK版本號:1.8.0_171
ConcurrentLinkedQueue是一個基於鏈表實現的無界的線程安全的FIFO非阻塞隊列。最大的不一樣之處在於非阻塞特性,以前講解的阻塞隊列都會經過各類方式進行阻塞操做,在ConcurrentLinkedQueue中經過CAS操做來完成非阻塞操做。其中head和tail的更新相似以前在LinkedTransferQueue中講解的slack(鬆弛度)機制,只有在slack閾值大於等於2時纔會進行更新,儘可能減小CAS的操做次數,固然,這樣的操做也提升了代碼實現的複雜度node
從關係圖上咱們也能夠看到ConcurrentLinkedQueue沒有去實現BlockingQueue接口安全
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable
爲了瞭解其內部實現的操做,能夠看下面的過程圖理解其內部結點入隊出隊的轉換過程多線程
除了CAS須要使用的常量,就只剩下head和tail兩個引用結點,在其註釋部分能夠看到做者的說明,這裏解釋下:函數
head結點:this
tail結點(tail的next爲null):spa
因爲head結點和tail結點不是實時更新,達到鬆弛度閾值才進行更新,有可能致使head結點在tail結點以後的現象線程
/** * A node from which the first live (non-deleted) node (if any) * can be reached in O(1) time. * Invariants: * - all live nodes are reachable from head via succ() * - head != null * - (tmp = head).next != tmp || tmp != head * Non-invariants: * - head.item may or may not be null. * - it is permitted for tail to lag behind head, that is, for tail * to not be reachable from head! */ private transient volatile Node<E> head; /** * A node from which the last node on list (that is, the unique * node with node.next == null) can be reached in O(1) time. * Invariants: * - the last node is always reachable from tail via succ() * - tail != null * Non-invariants: * - tail.item may or may not be null. * - it is permitted for tail to lag behind head, that is, for tail * to not be reachable from head! * - tail.next may or may not be self-pointing to tail. */ private transient volatile Node<E> tail; // CAS操做 private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; private static final long tailOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = ConcurrentLinkedQueue.class; headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head")); tailOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("tail")); } catch (Exception e) { throw new Error(e); } }
Node實現比較簡單,沒複雜的部分,主要是經過CAS操做進行更新變量指針
private static class Node<E> { volatile E item; volatile Node<E> next; /** * Constructs a new node. Uses relaxed write because item can * only be seen after publication via casNext. */ Node(E item) { UNSAFE.putObject(this, itemOffset, item); } boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
無參構造方法建立了空結點同時頭尾結點指向這個空結點,集合參數構造時循環添加結點,比較簡單,主要須要理解默認無參構造函數建立時發生的變化rest
public ConcurrentLinkedQueue() { head = tail = new Node<E>(null); } public ConcurrentLinkedQueue(Collection<? extends E> c) { Node<E> h = null, t = null; for (E e : c) { checkNotNull(e); Node<E> newNode = new Node<E>(e); if (h == null) h = t = newNode; else { t.lazySetNext(newNode); t = newNode; } } if (h == null) h = t = new Node<E>(null); head = h; tail = t; }
h != p的前提條件下嘗試更新head指向到p,成功則嘗試更新原head結點指向到本身,表示結點離隊
/** * Tries to CAS head to p. If successful, repoint old head to itself * as sentinel for succ(), below. */ final void updateHead(Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) h.lazySetNext(h); }
獲取p結點的後繼結點,當next指向本身時,結點自己可能已經處於離隊狀態,此時返回head結點
/** * Returns the successor of p, or the head node if p.next has been * linked to self, which will only be true if traversing with a * stale pointer that is now off the list. */ final Node<E> succ(Node<E> p) { Node<E> next = p.next; return (p == next) ? head : next; }
入隊操做核心方法,入隊必成功,返回爲true,表示入隊會一直嘗試操做直到成功,循環嘗試中主要分爲3種狀況:
public boolean offer(E e) { // 判空 checkNotNull(e); final Node<E> newNode = new Node<E>(e); // 循環直到成功 for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; // p此時是最後一個結點 if (q == null) { // 則開始嘗試更新p的next指向新插入的結點 // p的next未更新成功說明next被其餘結點搶先更新了,從新循環判斷嘗試 if (p.casNext(null, newNode)) { // tail指向結點後已經添加了1個結點以上時才更新tail結點指向 // 即slack >= 2時才嘗試更新 if (p != t) // hop two nodes at a time // 失敗可能被其餘線程更新了 casTail(t, newNode); // Failure is OK. return true; } // Lost CAS race to another thread; re-read next } // p非最後一個結點,同時p = p.next則表示p自己已經離隊,須要更新p else if (p == q) // tail結點已經被更新則更新tail不然從head結點開始尋找最後一個結點 p = (t != (t = tail)) ? t : head; else // p非最後一個結點,同時p未離隊刪除,若是tail被其餘線程更新了則p更新成新的tail,不然p更新成p.next繼續循環 p = (p != t && t != (t = tail)) ? t : q; } }
出隊操做核心方法,一直嘗試直到成功,循環嘗試中主要分爲4種狀況:
public E poll() { restartFromHead: // 循環嘗試直到成功 for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; // item非null則嘗試更新成null(表示結點已出隊) if (item != null && p.casItem(item, null)) { // 出隊結點非以前的頭結點,舊頭結點h距離實際的head結點已經大於1則更新head if (p != h) // hop two nodes at a time // 出隊結點後無結點則嘗試更新head結點爲出隊結點自己(item = null),有結點則更新到出隊結點後的那個結點 updateHead(h, ((q = p.next) != null) ? q : p); return item; } // item爲空或item已被其餘線程獲取 // p結點自己爲最後一個結點,則嘗試更新head並更新原h結點指向本身,返回null else if ((q = p.next) == null) { updateHead(h, p); return null; } // p非最後一個結點,p == p.next 則表示p結點已經離隊,則跳轉restartFromHead從頭從新循環判斷 else if (p == q) continue restartFromHead; // p非最後一個結點,p也未離隊,則更新p指向p的下一個結點,循環判斷 else p = q; } } }
和poll方法相似,主要在於不會對結點進行出隊操做,僅僅是獲取頭結點的item值,固然中間也可能幫助更新下head指向
public E peek() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; if (item != null || (q = p.next) == null) { updateHead(h, p); return item; } else if (p == q) continue restartFromHead; else p = q; } } }
和poll方法相似,poll返回的是item這裏返回的是結點,若是是null結點(item == null),這裏判斷下直接返回null
Node<E> first() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { boolean hasItem = (p.item != null); if (hasItem || (q = p.next) == null) { updateHead(h, p); return hasItem ? p : null; } else if (p == q) continue restartFromHead; else p = q; } } }
從隊列中刪除元素,經過item是否爲null判斷結點是否已經離隊,是則繼續後繼判斷,casItem(item, null)成功則表示移除結點成功,失敗則表示被其餘線程出隊操做了,則繼續後繼判斷
public boolean remove(Object o) { if (o != null) { Node<E> next, pred = null; for (Node<E> p = first(); p != null; pred = p, p = next) { boolean removed = false; E item = p.item; // item判斷(非離隊結點),不知足則繼續判斷後繼結點 if (item != null) { if (!o.equals(item)) { next = succ(p); continue; } // 找到匹配結點則嘗試更新item爲null,表示當前結點已經離隊 removed = p.casItem(item, null); } // 後繼結點,到這說明匹配的結點已經刪除了(別的線程刪除或者當前線程刪除) next = succ(p); // 取消匹配結點的關聯 if (pred != null && next != null) // unlink pred.casNext(p, next); // 假如是當前線程刪除的結點則返回,不然繼續判斷後繼直到匹配或沒有匹配結點才返回 if (removed) return true; } } return false; }
將集合c中的元素添加到隊列中,添加到原隊列尾部相似於上面的offer方法
public boolean addAll(Collection<? extends E> c) { if (c == this) // As historically specified in AbstractQueue#addAll throw new IllegalArgumentException(); // 定義兩個指針結點指向集合c的頭尾 // 先將c改形成Node鏈表 Node<E> beginningOfTheEnd = null, last = null; for (E e : c) { checkNotNull(e); Node<E> newNode = new Node<E>(e); if (beginningOfTheEnd == null) beginningOfTheEnd = last = newNode; else { last.lazySetNext(newNode); last = newNode; } } if (beginningOfTheEnd == null) return false; for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; // p爲隊列最後一個結點 if (q == null) { // 將隊列與上面新建立的鏈表鏈接起來,更新失敗再循環繼續 if (p.casNext(null, beginningOfTheEnd)) { // tail更新失敗從新嘗試 if (!casTail(t, last)) { t = tail; if (last.next == null) casTail(t, last); } return true; } } // p非最後一個結點且已經離隊 else if (p == q) // tail結點已經被更新則更新爲tail不然從head結點開始尋找最後一個結點 p = (t != (t = tail)) ? t : head; else // p非最後一個結點,同時p未離隊刪除,若是tail被其餘線程更新了則p更新成新的tail,不然p更新成p.next繼續循環 p = (p != t && t != (t = tail)) ? t : q; } }
迭代器和以前隊列講解的迭代器類似,源碼不是很複雜,同時remove方法這裏是將item置爲null,先後結點關聯關係並不會操做,防止多線程遍歷出現問題
構造方法中執行了advance()方法,提早設置好下次next執行時的結點nextNode,以及其item引用,hasNext判斷nextNode便可,保證了迭代器的弱一致性,一旦hasNext返回true,那麼調用next必定會獲得相對應的item,即便該結點item已經被置爲null
public Iterator<E> iterator() { return new Itr(); } private class Itr implements Iterator<E> { /** * next返回的Node */ private Node<E> nextNode; /** * 保存next的item,防止hasNext爲true後結點被刪除再調用next獲取不到值的狀況 */ private E nextItem; /** * 最近一次調用next返回的結點,若是經過調用remove刪除了此元素,則重置爲null,避免刪除了不應刪除的元素 */ private Node<E> lastRet; /** * 構造的時候就先保存了第一次調用next返回的Node */ Itr() { advance(); } /** * Moves to next valid node and returns item to return for * next(), or null if no such. */ private E advance() { lastRet = nextNode; E x = nextItem; Node<E> pred, p; if (nextNode == null) { p = first(); pred = null; } else { pred = nextNode; p = succ(nextNode); } for (;;) { if (p == null) { nextNode = null; nextItem = null; return x; } E item = p.item; if (item != null) { nextNode = p; nextItem = item; return x; } else { // 跳過null結點 Node<E> next = succ(p); if (pred != null && next != null) pred.casNext(p, next); p = next; } } } public boolean hasNext() { return nextNode != null; } public E next() { if (nextNode == null) throw new NoSuchElementException(); return advance(); } public void remove() { Node<E> l = lastRet; if (l == null) throw new IllegalStateException(); // rely on a future traversal to relink. l.item = null; lastRet = null; } }
ConcurrentLinkedQueue是一個基於鏈表實現的無界的線程安全的FIFO非阻塞隊列,總體源碼上最主要的部分在於兩點:
結合圖解很容易理清其實現以及操做流程,相比較於以前的LinkedTransferQueue源碼算是簡單了不少
以上內容若有問題歡迎指出,筆者驗證後將及時修正,謝謝