Java 併發容器和框架--ConcurrentLinkedQueue

簡述

若是要實現一個線程安全的隊列有兩種方式:一種是使用阻塞算法,另外一種是使用非阻塞算法。若是使用阻塞算法的隊列能夠用一個鎖(出隊和入隊用同一把鎖)或兩個鎖(出隊和入隊使用不一樣的鎖)等方式來實現。非阻塞的實現方式可使用循環CAS的方式來實現。node

ConcurrentLinkedQueue 是一個基於連接節點的無界線程安全隊列,它採用先進先出的規則對節點進行排序,當咱們添加一個元素的時,它會返回隊列頭部的元素。採用了 wait-free(即CAS)來實現。算法

ConcurrentLinkedQueue的結構

ConcurrentLInkedQueue 由head節點和tail節點組成,每一個節點(Node)由節點元素(item)和指向下一個節點(next)的引用組成,節點與節點之間就是經過這個next關聯起來,從而組成一張列表結構的隊列。安全

入隊列

1.入隊列的過程

入隊列就是將入隊列節點添加到隊列的尾部。入隊主要作兩件事:第一是將入隊節點設置成當前隊列尾節點的下一個節點,第二是更新tail節點,若是tail節點的next節點不爲空,則將入隊節點設置成tail節點,若是tail節點的next節點爲空,則將入隊節點設置成tail的next節點,因此tail節點不老是尾節點。(不理解?)---元素會先一次入隊,而後更新tail節點的指向。this

public boolean offer(E e){
        if (e == null) throw new NullPointerException();
        //入隊前,建立一個入隊節點
        Node<E> n = new Node<E>(e);

        retry:
        //死循環,入隊不成功反覆入隊
        for (;;){
            //建立一個指向tail節點的引用
            Node<E> t = tail;
            //p用來表示隊列的尾節點,默認狀況下等於tail節點。
            Node<E> p = t;
            for (int hops = 0;;hops++){
                //得到p節點的下一個節點。
                Node<E> next = succ(p);
                
                //next 節點不爲空,說明p不是尾節點,須要更新p後再將它指向next節點
                if(next != null){
                    //循環兩次及其以上,而且當前節點仍是不等於尾節點
                    if (hops >HOPS && t!=tail){
                        continue retry;
                    }
                    p =next;
                }
                //若是p是尾節點,則設置p節點的next節點爲入隊節點。入隊節點是n
                else if(p.casNext(null,n)){
                    /***
                     * 若是tail節點有大於等於1個next ,則將入隊節點設置成tail節點,更新失敗了也沒有關係
                     * 由於失敗了表示有其餘線程成功更新了tail節點
                     */
                    if(hops >= HOPS){
                        casTail(t,n);//更新tail節點,運行失敗
                        return true;
                    }else {
                        p = succ(p);
                    }
                    
                }
                
                
            }
            
        }
    }
//JDK1.8 的處理策略
    public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

2.定位尾節點

tail節點並不老是尾節點,全部每次入隊都必須先經過tail節點來找到尾節點。尾節點多是tail節點,也多是tail節點的next節點。線程

final Node<E> succ(Node<E> p){
    Node<E> next = p.getNext();
    //p節點和p的next節點都等於空,表示這個隊列剛初始化,正準備添加節點,因此須要返回head節點。
    return (p == next) ? head:next;
    
}

3.設置入隊節點爲尾節點

p.casNext(null,n) 方法用於將入隊節點設置爲當前隊列尾節點的next節點,若是p是null,表示p是當前隊列的尾節點,若是不爲null,表示其餘線程更新了尾節點,則須要從新獲取當前隊列的尾節點。

4.HOPS 的設計意圖

讓tail節點永遠做爲隊列的尾節點,邏輯清晰和易懂。可是每次都須要使用循環CAS更新tail節點。若是能減小CAS更新tail節點的次數,就能提升入隊的效率。設計

出隊列

出隊列就是從隊列裏返回一個節點元素,並清空該節點對元素的引用。隊列出隊,並非每次都更新head節點,當head節點裏又元素時,直接彈出head節點裏的元素,而不會更新head節點。緣由也是爲了提升效率,減小CAS的次數。rest

//JDK 1.8 的處理策略

  public E poll() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;

                if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }
相關文章
相關標籤/搜索