若是要實現一個線程安全的隊列有兩種方式:一種是使用阻塞算法,另外一種是使用非阻塞算法。若是使用阻塞算法的隊列能夠用一個鎖(出隊和入隊用同一把鎖)或兩個鎖(出隊和入隊使用不一樣的鎖)等方式來實現。非阻塞的實現方式可使用循環CAS的方式來實現。node
ConcurrentLinkedQueue 是一個基於連接節點的無界線程安全隊列,它採用先進先出的規則對節點進行排序,當咱們添加一個元素的時,它會返回隊列頭部的元素。採用了 wait-free(即CAS)來實現。算法
ConcurrentLInkedQueue 由head節點和tail節點組成,每一個節點(Node)由節點元素(item)和指向下一個節點(next)的引用組成,節點與節點之間就是經過這個next關聯起來,從而組成一張列表結構的隊列。安全
入隊列就是將入隊列節點添加到隊列的尾部。入隊主要作兩件事:第一是將入隊節點設置成當前隊列尾節點的下一個節點,第二是更新tail節點,若是tail節點的next節點不爲空,則將入隊節點設置成tail節點,若是tail節點的next節點爲空,則將入隊節點設置成tail的next節點,因此tail節點不老是尾節點。(不理解?)---元素會先一次入隊,而後更新tail節點的指向。this
public boolean offer(E e){ if (e == null) throw new NullPointerException(); //入隊前,建立一個入隊節點 Node<E> n = new Node<E>(e); retry: //死循環,入隊不成功反覆入隊 for (;;){ //建立一個指向tail節點的引用 Node<E> t = tail; //p用來表示隊列的尾節點,默認狀況下等於tail節點。 Node<E> p = t; for (int hops = 0;;hops++){ //得到p節點的下一個節點。 Node<E> next = succ(p); //next 節點不爲空,說明p不是尾節點,須要更新p後再將它指向next節點 if(next != null){ //循環兩次及其以上,而且當前節點仍是不等於尾節點 if (hops >HOPS && t!=tail){ continue retry; } p =next; } //若是p是尾節點,則設置p節點的next節點爲入隊節點。入隊節點是n else if(p.casNext(null,n)){ /*** * 若是tail節點有大於等於1個next ,則將入隊節點設置成tail節點,更新失敗了也沒有關係 * 由於失敗了表示有其餘線程成功更新了tail節點 */ if(hops >= HOPS){ casTail(t,n);//更新tail節點,運行失敗 return true; }else { p = succ(p); } } } } }
//JDK1.8 的處理策略 public boolean offer(E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; if (q == null) { // p is last node if (p.casNext(null, newNode)) { // Successful CAS is the linearization point // for e to become an element of this queue, // and for newNode to become "live". if (p != t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. return true; } // Lost CAS race to another thread; re-read next } else if (p == q) // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. p = (t != (t = tail)) ? t : head; else // Check for tail updates after two hops. p = (p != t && t != (t = tail)) ? t : q; } }
tail節點並不老是尾節點,全部每次入隊都必須先經過tail節點來找到尾節點。尾節點多是tail節點,也多是tail節點的next節點。線程
final Node<E> succ(Node<E> p){ Node<E> next = p.getNext(); //p節點和p的next節點都等於空,表示這個隊列剛初始化,正準備添加節點,因此須要返回head節點。 return (p == next) ? head:next; }
p.casNext(null,n) 方法用於將入隊節點設置爲當前隊列尾節點的next節點,若是p是null,表示p是當前隊列的尾節點,若是不爲null,表示其餘線程更新了尾節點,則須要從新獲取當前隊列的尾節點。
讓tail節點永遠做爲隊列的尾節點,邏輯清晰和易懂。可是每次都須要使用循環CAS更新tail節點。若是能減小CAS更新tail節點的次數,就能提升入隊的效率。設計
出隊列就是從隊列裏返回一個節點元素,並清空該節點對元素的引用。隊列出隊,並非每次都更新head節點,當head節點裏又元素時,直接彈出head節點裏的元素,而不會更新head節點。緣由也是爲了提升效率,減小CAS的次數。rest
//JDK 1.8 的處理策略 public E poll() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; if (item != null && p.casItem(item, null)) { // Successful CAS is the linearization point // for item to be removed from this queue. if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } else if ((q = p.next) == null) { updateHead(h, p); return null; } else if (p == q) continue restartFromHead; else p = q; } } }