ConcurrentLinkedQueue是無阻塞隊列的一種實現, 依賴與CAS算法實現。node
if(q==null)
當前是尾節點 -> CAS賦值tail.next = newNode, 成功就跳出循環elseif(p == q)
尾節點被移除 -> 從tail或head從新日後找else
不是尾節點 -> 往next找當一個節點的next指向自身時, 表示節點已經被移除, 註釋中還會強調這一點。算法
/** * Inserts the specified element at the tail of this queue. * As the queue is unbounded, this method will never return {@code false}. * * @return {@code true} (as specified by {@link Queue#offer}) * @throws NullPointerException if the specified element is null */ /* * 變量說明: * 成員變量: * head: 首節點 * tail: 尾節點, 不必定指向末尾, 兩次入隊才更新一次 * 局部變量 * t= tail; //保存循環開始時, 當時的tail值 * p= t; // 每次查找的起始位置, 可能指向首節點head或者臨時尾節點t * q= p.next; // 每次循環下一個節點 * newNode= new Node; // 新節點 * * * 重要概念: * 當p = p.next時, 表示節點已經被移除 */ public boolean offer(E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; if (q == null) { // 狀況1: p是尾節點 // p is last node // p是尾節點就直接將新節點放入末尾 if (p.casNext(null, newNode)) { // Successful CAS is the linearization point // for e to become an element of this queue, // and for newNode to become "live". if (p != t) // hop two nodes at a time // 一次跳兩個節點, 即插入兩次, tail更新一次 casTail(t, newNode); // Failure is OK. // 失敗也無妨, 說明被別的線程更新了 return true; // 退出循環 } // Lost CAS race to another thread; re-read next } else if (p == q) // 狀況2: p節點被刪除了 // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. // 保存的節點p、t都已經失效了,這時須要從新檢索,從新檢索的起始位置有兩種狀況 // 1.1. 若是tail==t,表示tail也是失效的, 那麼從head開始找 // 1.2. 不然tail就是被其餘線程更新了, 能夠又試着從tail找 p = (t != (t = tail)) ? t : head; else // 狀況3: 沿着p往下找 // Check for tail updates after two hops. // 這段簡單看做p = q就好理解了, 這麼寫是爲了提升效率: // 1. 狀況二中p可能指向了head(因爲tail節點失效致使的) // 2. 如今tail可能被其餘線程更新,也許從新指向了隊尾 // 3. 若是是, 嘗試則從隊尾開始找, 以減小迭代次數 p = (p != t && t != (t = tail)) ? t : q; } }
狀況2中的p = (t != (t = tail)) ? t : head;
(t != (t = tail))
能夠分三步來看緩存
1.1. 首先取出t 1.2. 將tail賦值給t 1.3. 將先前取出的t與更新後的t比較
狀況3中p = (p != t && t != (t = tail)) ? t : q;
併發
首先: p != t: 這種狀況只有可能發生在執行了狀況2後
現狀: 這時p指向head或者中間的元素, t指向一個被刪除了的節點
那麼若是tail被其餘線程更新了, 咱們能夠將t從新指向tail, p指向t, 就像剛進循環同樣, 從尾節點開始檢索。
這樣比從head日後找更有效率
補充一項, item==null,也表示節點已經被刪除(參考remove方法)。高併發
/** * updateHead * */ public E poll() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; if (item != null && p.casItem(item, null)) { // Successful CAS is the linearization point // for item to be removed from this queue. if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } else if ((q = p.next) == null) { updateHead(h, p); return null; } else if (p == q) continue restartFromHead; else p = q; } } } /** * Tries to CAS head to p. If successful, repoint old head to itself * as sentinel for succ(), below. */ final void updateHead(Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) h.lazySetNext(h); }
先更新head, 再將舊head的next指向本身this
UNSAFE.compareAndSwapObject(對象, 字段偏移量, 當前值, 新值)
線程
能夠爲對象中的某個字段實現CAS操做
UNSAFE.putOrderedObject(對象, 字段偏移量, 新值)
rest
這個只能用在 volatile字段上 我的理解: volatile的設值會致使本地緩存失效, 那麼須要從新從主存讀取, 使用這個方法可使寄存器緩存依舊有效, 沒必要急於從主存取值。 使用目的: 移除節點時, 須要更新節點的next指向自身, 但如今next指向的數據實際是有效的; 高併發狀況下,若是offser方法已經緩存了這個next值, 直接設置next會致使緩存行失效, CPU須要從新讀取next; 而使用putOrderedObject可讓offser從這個next繼續檢索