ConcurrentLinkedQueue源碼分析

相關文章

Lock鎖源碼分析java

java阻塞隊列BlockingQueuenode

HashMap源碼分析-jdk1.6和jdk1.8的區別算法

LinkedList源碼分析安全

ArrayList源碼分析源碼分析

前言

ConcurrentLinkedQueue是一個線程安全的隊列,它採用的是 CAS 算法來進行實現,也就是說它是非阻塞的;隊列中的元素按照 FIFO(先進先出)的原則對元素進行排列,此外,它是一個無界隊列;添加元素的時候,在鏈表的尾部進行添加,獲取元素的時候,從鏈表的頭部獲取。它內部採用的單向鏈表的形式來表示,鏈表的節點是定義在ConcurrentLinkedQueue的一個內部類。性能

類圖

ConcurrentLinkedQueue 的類圖以下所示:this

能夠看到 ConcurrentLinkedQueue 實現了 Queue 接口和實現了繼承了 AbstractQueue 類,而 Itr 和 Node則是它的一個內部類;spa

Queue 接口只是定義了一些隊列的公共方法,以下:.net

public interface Queue<E> extends Collection<E> {
    // 添加元素
    boolean add(E e);
    // 添加元素
    boolean offer(E e);
    // 刪除元素
    E remove();
    // 刪除並返回第一個元素,若是隊列爲空,則返回 null 
    E poll();
   // 返回第一個元素,若是不存在,則拋出NoSuchElementException異常
    E element();
    // 返回第一個元素,但不刪除,若是隊列爲空,則返回 null 
    E peek();
}

AbstractQueue 類也繼承了 Queue接口,提供了某些方法的實現,以下所示:線程

public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E> {

    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

    public E remove() {
        E x = poll();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }
...............................
}

源碼

接下來看看 ConcurrentLinkedQueue 的一個實現過程:

首先看一下隊列中鏈表節點的定義,鏈表中的節點使用一個 Node 內部類來表示:

private static class Node<E> {
        // 節點中的元素
        volatile E item;
        // 下一個節點,沒有上一個節點,表示它是一個單向鏈表的形式
        volatile Node<E> next;
        // 構造一個節點
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }
        // 使用 CAS 的方式設置節點的元素
        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }
        // 設置下一個節點
        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }
        // 採用 CAS 的方式設置下一個節點
        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }
      //  Unsafe 類的一些初始化
}

能夠看到 Node 類的定義比較簡單,值得注意的地方是 E item 元素和 Node next 節點都使用了 volatile 來修飾,這說明了元素或某個節點被一個線程修改了以後,其餘的線程是立馬看到修改後的值的。

接下來看一下 ConcurrentLinkedQueue 類中的屬性和方法:

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {

    // 頭節點,
    private transient volatile Node<E> head;
    // 尾節點,尾節點不必定是鏈表的最後一個節點
    private transient volatile Node<E> tail;
    // 構造
    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
    }
......................
}

以上能夠看到,頭節點 head 和尾節點 tail 都被 volatile 修飾,節點被一個線程修改了以後,是會把修改的最新的值刷新到主內存中去,當其餘線程去讀取該值的時候,會中主內存中獲取最新的值,也就是一個線程修改了以後,對其餘線程是當即可見的。

當使用空的構造其是實例化一個對象的時候,會建立一個節點,節點的值爲 null(添加的時候,是不能爲null的),並把頭節點和尾節點都指向該節點,以下所示:

添加元素

以後向鏈表中添加元素,添加元素的時候,是在鏈表的尾部進行添加,添加元素有兩個方法 add() 和 offer(),add() 會調用 offer() 進行添加,這兩個方法永遠都會返回 true,因此不要使用 true | false 來判斷是否添加成功;

public boolean add(E e) {
        return offer(e);
    }
public boolean offer(E e) {
        // 判空,爲空則拋出空指針異常
        checkNotNull(e);
        // 建立要添加的節點
        final Node<E> newNode = new Node<E>(e);
        
        // 無限循環,入隊不成功,則反覆入隊
        // t 表示 tail 節點
        // p 表示鏈表的尾節點,默認等於 tail 節點
        for (Node<E> t = tail, p = t;;) {
            // q 爲尾節點的下一個節點         
            Node<E> q = p.next;
            // 若是尾節點的下一個節點爲空,則表示 p 爲尾節點
            if (q == null) {
                // CAS 設置尾節點的下一個節點爲新添加的節點,若是設置失敗,在再次嘗試
                if (p.casNext(null, newNode)) {
                    // 若是tail節點有大於等於1個的 next 節點,則更新 tail 節點,將新添加的節點設置爲 tail 節點
                    if (p != t) // 至關於循環兩次更新一次 tail 節點
                        casTail(t, newNode);  // 新添加的節點設置爲tail節點,容許失敗,失敗了表示有其餘線程成功更新了tail節點
                    return true;
                }
            }
            else if (p == q) // 只有在尾節點和尾節點的下一個節點爲空的狀況下成立
                p = (t != (t = tail)) ? t : head;
            else 
                // 把 tail節點設置爲爲尾節點,再次循環設置下一個節點
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

以上方法就是向隊列中添加元素的方法,該方法也是該類中比較難理解的方法;

從上面的代碼能夠看出,入隊主要作兩件事情:第一是將新添加的節點設置成當前隊列尾節點的下一個節點;第二是更新tail節點,若是tail節點的next節點不爲空,則將入隊節點設置成tail節點,若是tail節點的next節點爲空,則將入隊節點設置成tail的next節點,因此tail節點不老是尾節點。

接下來經過圖的方法來看一下元素入隊的一個過程:

初始時隊列以下:

1. 添加第一個節點:

for循環:

for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;

接下來進行 q==null 的判斷:

if (q == null) {
                if (p.casNext(null, newNode)) {

                    if (p != t) 
                        casTail(t, newNode);  
                    return true;
                }
            }

此時,q==null, 也就是說尾節點的下一個節點爲空,此時把新添加的節點設置爲尾節點的下一個節點,有由於,p節點和 t節點是相等的,因此不會更新 tail 節點;設置成功後,鏈表以下:

2.添加第二個節點:

if (q == null) {
             ....
            }
            else if (p == q)
                // p = q = null
                p = (t != (t = tail)) ? t : head;
            else
                p = (p != t && t != (t = tail)) ? t : q;

此時, q 不爲空,有p和q不想等,因此只會走 else 分支,p != t 爲false,t != tail 也爲false,全部 把 q 賦給 p ,此時隊列以下:

以後,在走到for循環,此時 q == null, 因此,會把新添加的節點設置爲 p 的下一個節點:

if (q == null) {
                if (p.casNext(null, newNode)) {
                    if (p != t) 
                        casTail(t, newNode);  
                    return true;
                }
            }

又由於此時,t 節點仍是指向第一個節點,p 指向第一個節點的next節點,t 和 p 不想等,因此執行 casTail(t, newNode);  新添加的節點設置爲tail 節點,設置成功後,鏈表以下所示:

3.添加第三個元素,添加成功後,鏈表以下所示:

4.添加第四個節點,添加成功後,鏈表以下所示:

至關於在添加元素時,每循環兩次纔會更新一次 tail 節點。

爲何不讓 tail 節點永遠爲隊列的尾節點,若是讓 tail 節點永遠爲隊列的尾節點,則實現的代碼會更少且邏輯也會更清晰,這是由於,若是讓 tail 永遠爲隊列的尾節點,則每次都須要使用循環CAS更新tail節點,若是能減小更新 tail 節點的次數,入隊的性能豈不是更高?因此說並非每次入隊都須要更新尾節點,只有在tail節點和尾節點不想等的狀況下才更新,這樣能減小更新此時,提升效率。

獲取元素

下面來看看獲取元素的操做,ConcurrentLinkedQueue是一個FIFO的隊列,因此獲取元素的時候,老是獲取到隊列的第一個元素;獲取元素有兩個方法,poll() 和 peek(),poll()方法獲取元素的時候,返回鏈表的第一個元素,並刪除,而 peek() 方法獲取元素的時候則不刪除,下面看一下獲取元素的主要代碼邏輯:

public E poll() {
        // 循環跳轉,goto語法
        restartFromHead:
        for (;;) {
            // p 表示要出隊的節點,默認爲 head節點
            for (Node<E> h = head, p = h, q;;) {
                // 出隊的元素
                E item = p.item;
                // 若是出隊的元素不爲空,則把要出隊的元素設置null,不更新head節點;若是出隊元素爲null或者cas設置失敗,則表示有其餘線程已經進行修改,則須要重寫獲取
                if (item != null && p.casItem(item, null)) {
                    if (p != h) // 當head元素爲空,纔會更新head節點,這裏循環兩次,更新一次head節點
                        updateHead(h, ((q = p.next) != null) ? q : p); // 更新head節點
                    return item;
                }
                // 隊列爲空,返回null
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                // 把 p 的next節點賦值給p
                else
                    p = q;
            }
        }
    }

下面經過圖的方式來查看獲取節點的過程:

首先隊列以下所示:

以後去獲取節點 item0,首先進行標記:

for (Node<E> h = head, p = h, q;;) {
       E item = p.item;
    ..............................
}

標記以下:

由於此時 p.item 不爲空,所示 CAS 設置 p.item 爲null,走第一個if語句,又由於此時 h 節點的 item 不爲空,即 p 和 h 相等,因此不會更新頭節點,直接返回 p 節點中的元素 item0,以下圖所示:

if (item != null && p.casItem(item, null)) {
                    // 不會執行
                    if (p != h) // 
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }

接下來再獲取 item1,進行標記,以下所示:

由於 p.item 爲空,因此不知足條件  if (item != null && p.casItem(item, null)), 以後執行  else if ((q = p.next) == null),此時,q指向了 item1,

又由於 q 不爲空,全部不會更新head節點,以後會執行最後一個else語句:p = q,第一次循環結束,開始第二次循環,又進行標記,此時標記以下:

由於 p.item 不爲空,因此走以下代碼邏輯,經過 CAS 把 p.item 設置爲null,

if (item != null && p.casItem(item, null)) {
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }

由於此時,head節點元素爲空,即 p 和 h 節點不相等,因此會更新頭節點,又由於 p.next 即 item2 不爲空,因此把 p.next 即 item2 設置爲 head節點,設置成功後,隊列以下所示:

以後獲取item2也是一樣的邏輯,獲取 items2 後隊列以下:

獲取 item3 後以下所示:

以上就是從隊列中獲取元素的主要代碼邏輯,從上可知,head節點不必定就是隊列的第一個含有元素的節點,也不是每次獲取元素後就更新head節點,只有當head中的元素爲空的時候才更新head節點,這和添加 offer() 方法中更新tail節點相似,減小 CAS 更新head節點的次數,出隊的效率會更高。

isEmpty()方法

ConcurrentLinkedQueue 經過 isEmpty來判斷隊列是否爲空,代碼以下:

public boolean isEmpty() {
        return first() == null;
    }
Node<E> first() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                boolean hasItem = (p.item != null);
                if (hasItem || (q = p.next) == null) {
                    updateHead(h, p);
                    return hasItem ? p : null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

isEmpty 方法會判斷鏈表的第一個元素是否爲空來進行判斷的。

size()方法

public int size() {
        int count = 0;
        // succ() 獲取下一個元素
        for (Node<E> p = first(); p != null; p = succ(p))
            if (p.item != null)
                if (++count == Integer.MAX_VALUE)
                    break;
        return count;
    }

size()方法會遍歷全部的鏈表來查看有多少個元素。

對於在開發的時候,若是須要判斷是否爲空,則應該使用 isEmpty 而不該該使用 size() > 0 的方式,由於 size()會變量整個鏈表,效率較低。

ConcurrentLinkedQueue 類還有其餘的一些方法,只有理解了入隊和出隊的方法 offer() 和 poll() 方法,其餘方法就很好理解了。

例子

/**
 * @ Author:lenovo
 * @ Date:Created in 下午 7:22 2018/7/17 0017
 */
public class ConcurrentLinkedQueueTest {

    public static void main(String[] args) throws InterruptedException {
        new ConcurrentLinkedQueueTest().testConcurrentLinkedQueue();
        Thread.sleep(5000L);
    }

    int num = 0;

    public ConcurrentLinkedQueue testConcurrentLinkedQueue(){
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        for(int i = 0; i < 100; i++) {
            new Thread(() -> {
                num++;
                queue.offer(num);
            }).start();
        }
        return queue;
    }
}
相關文章
相關標籤/搜索