本身動手構建無鎖的併發容器(棧和隊列)

更新日誌(2018年8月18日):這篇博客的隊列部分犯了個低級錯誤:入隊和出隊在同在隊列尾端進行。正確的實現方式見基於雙向鏈表實現無鎖隊列的正確姿式(修正以前博客中的錯誤)
git

1.前言

併發容器是線程安全的容器。它在實現容器基本功能的前提下,還提供了併發控制能力,使得容器在被多線程併發訪問的狀況下還能表現出正確的行爲。一般咱們使用獨佔鎖的悲觀策略來進行併發控制,由於其實現相對簡單,正確性易於判斷且絕大部分狀況下都能表現出不錯的性能;固然,也可使用CAS算法來進行併發控制,這是一種無鎖的樂觀策略,儘管其有着實現比較複雜,正確性較難斷定等缺點,但相比獨佔鎖的方式,它至少有如下幾方面的優點:github

  1. 由於不須要加鎖,根本上避免了死鎖的產生。
  2. 避免了線程對鎖的競爭產生的開銷,好比阻塞,喚醒以及線程的調度。
  3. 加鎖本質上是將部分代碼的執行由並行變串行,這會致使程序的並行化比例下降;而無鎖的CAS算法避免了這種狀況發生。根據AMdahl定律,這意味着,隨着計算資源的增長,基於CAS算法構建的併發容器每每有更好的性能提高,儘管其有由於CAS競爭失敗致使的重試問題。

總的來講,無鎖的併發容器更加安全,大部分狀況下吞吐量也更高,可是也有着實現較爲複雜不太好理解的缺點。經過本身動手實現無鎖的線程安全的棧和隊列,就能深入體會到這一點。完整的代碼已經放到github上beautiful-concurrent算法

2. 基於CAS算法構建無鎖的併發棧

棧一般有兩種實現方式,一種是使用數組,另外一種是使用鏈表。首先咱們定義一個無鎖的棧的接口,該接口內部只有兩個方法push()和pop(),以下圖所示數組

/**
 * @author: takumiCX
 * @create: 2018-08-09
 **/
public interface LockFreeStack<E> {

    boolean push(E e);

    E pop();
}

接下來咱們分別就數組和鏈表兩種實現方式,來探討如何基於CAS算法構建無鎖的併發棧。安全

2.1 數組實現

/**
 * @author: takumiCX
 * @create: 2018-08-08
 *
 * 基於數組實現的無鎖的併發棧
 **/
public class LockFreeArrayStack<E> implements LockFreeStack<E>{

    //不支持擴容
    final Object[] elements;

    //容量,一旦肯定不可更改
    final int capacity;

    //記錄棧頂元素在數組中的下標,初始值爲-1
    AtomicInteger top = new AtomicInteger(-1);

    public LockFreeArrayStack(int capacity) {
        this.capacity = capacity;
        elements = new Object[capacity];
    }


    /**
     * 入棧
     *
     * @param e
     * @return true:入棧成功 false:入棧失敗(棧已滿)
     */
    public boolean push(E e) {
        //死循環,保證屢次CAS嘗試後能獲得結果
        for (; ; ) {
            //當前棧頂元素在數組中的下標
            int curTop = top.get();
            //棧已滿,返回false
            if (curTop + 1 >= capacity) {
                return false;
            } else {
                //首先將元素放入棧中
                elements[curTop + 1] = e;
                //基於CAS更新棧頂指針,這裏是top值
                if (top.compareAndSet(curTop, curTop + 1)) {
                    return true;
                }
            }

        }
    }

    /**
     * 出棧
     *
     * @return 棧頂元素, 若棧爲空返回null
     */
    public E pop() {
        //死循環,保證屢次CAS嘗試後能獲得結果
        for (; ; ) {
            //當前棧頂元素在數組中的下標
            int curTop = top.get();
            //棧爲空,返回null
            if (curTop == -1) {
                return null;
            } else {
                //CAS更新棧頂指針,這裏是top值
                if (top.compareAndSet(curTop, curTop - 1)) {
                    return (E) elements[curTop];
                }
            }

        }
    }
}

爲了突出CAS算法實現入棧出棧的過程,同時也是爲了簡化代碼實現的複雜度,基於數組實現的棧不支持擴容,最大容量capacity在構造函數中肯定。top爲棧頂元素"指針",這裏爲棧頂元素在數組中的下標值。由於top值的更新依賴於前值,因此這裏不能使用volatile關鍵字,而應該使用原子變量,保證在更新top值以前top值沒有被其餘線程改變。棧的pop()方法實現比較簡單,只要在棧不爲空的狀況下,原子的把top值更新爲top-1;而棧的push()方法相對來講比較複雜,它包含了兩步:多線程

  • 1.將入棧元素放入數組的top+1位置
  • 2.而後原子的更新top值爲top+1

這裏可能有人會疑問:這難道不會產生併發問題嗎,好比說一個線程執行了1後,另外一個線程也執行了1,這不是把前面的結果覆蓋了嗎?咱們能夠畫圖來演繹下整個過程:
併發

能夠看到,儘管1在多線程環境下會產生元素覆蓋問題,可是對於最後一個覆蓋的線程而言,2的CAS更新是必然會成功的,無論這個CAS更新是由該線程本身執行仍是其餘線程替他執行,一旦某線程CAS更新成功,其餘線程將因CAS失敗從新執行for循環。ide

2.2 鏈表實現

基於鏈表實現無鎖的棧會更靈活,不用考慮棧擴容或者棧空間滿的問題,並且實現一樣簡單。函數

/**
 * @author: takumiCX
 * @create: 2018-08-09
 *
 * 基於鏈表實現的無鎖的併發棧
 **/
public class LockFreeLinkedStack<E> implements LockFreeStack<E>{

    //棧頂指針
    AtomicReference<Node<E>> top = new AtomicReference<Node<E>>();

    /**
     * @param e 入棧元素
     * @return true:入棧成功 false:入棧失敗
     */
    public boolean push(E e) {

        //構造新結點
        Node<E> newNode = new Node<E>(e);

        //死循環,保證CAS失敗重試後能入棧成功
        for (; ; ) {
            //當前棧頂結點
            Node<E> curTopNode = top.get();
            //新結點的next指針指向原棧頂結點
            newNode.next = curTopNode;
            //CAS更新棧頂指針
            if (top.compareAndSet(curTopNode, newNode)) {
                return true;
            }
        }

    }


    /**
     *
     * @return 返回棧頂結點中的值,若棧爲空返回null
     */
    public E pop() {

        //死循環,保證出棧成功
        for (; ; ) {
            //當前棧頂結點
            Node<E> curTopNode = top.get();
            //棧爲空,返回null
            if (curTopNode == null) {
                return null;
            } else {
                //得到原棧頂結點的後繼結點
                Node<E> nextNode = curTopNode.next;
                //CAS更新棧頂指針
                if (top.compareAndSet(curTopNode, nextNode)) {
                    return curTopNode.item;
                }
            }
        }
    }


    //定義鏈表結點
    private static class Node<E> {

        public E item;

        public Node<E> next;

        public Node(E item) {
            this.item = item;
        }
    }
}

由定義可知,該鏈表爲單鏈表,且不帶哨兵。爲了操做方便,新增結點的插入採用頭插法。以下圖所示:

由於棧頂元素的更新依賴於前值(咱們要保證更新棧頂指針前沒有其餘線程對其進行更改),故而使用原子引用,基於原子引用的CAS算法能夠保證只有當更新前的引用爲預期值時,當前更新才能成功(注意棧頂指針的初始化方式,至於爲什麼不能直接給top賦值爲null後面隊列部分會解釋)。其入棧出棧流程也十分簡單,僅需對棧頂指針top進行CAS同步。

  • 入棧時:1.獲取當前棧頂結點 2.新結點的next指針指向獲取的棧頂結點 3.CAS更新棧頂指針。CAS競爭失敗的線程將會重複這一流程。
  • 出棧時:1. 獲取當前棧頂結點 2.若棧頂結點爲null,說明棧爲空,返回null,不然CAS更新棧頂指針爲原棧頂結點的下一個結點 3.返回原棧頂結點的元素 。CAS失敗的線程將會重複這一流程。

2.3 性能測試

以JDK中的Stack爲基準進行性能測試,因爲JDK中的Stack是線程不安全的,在測試時經過手動加鎖的方式保證線程安全。
開啓10個線程,每一個線程混合進行10000次push和pop操做。分別以每種容器進行100次上述操做,計算出每次的平均執行時間(單位毫秒)以下。測試環境的處理器核數爲4。這裏就不貼測試代碼了,想看的點這裏github

能夠看到基於CAS算法的棧確實比基於鎖的棧表現出了更好的性能。

3. 基於CAS算法構建無鎖的併發隊列

隊列其實也有數組(循環數組)和鏈表兩種實現方式,這裏爲了向併發大神Doug Lea致敬(笑),僅就隊列的鏈表實現進行討論。棧的操做只在棧頂進行,只須要有一個棧頂指針,並保證其更新的原子性便可。可是隊列是先進先出的,其入隊和出隊分別在隊列的兩端,因此必須有兩個指針分別指向隊列的頭部和尾部,對它們的更新不只須要保證是原子性的,還要避免其相互衝突,一旦涉及到兩項CAS操做,且它們相互間又存在必定的制約關係,無鎖算法的實現就會一會兒變得複雜起來。不過不用急,經過合理的分析和設計,總能找到正確的辦法。先來看無鎖的隊列的接口定義:

/**
 * @author: takumiCX
 * @create: 2018-08-10
 *
 * 隊列接口,僅包含入隊和出隊抽象方法
 **/
public interface LockFreeQueue<E> {

    //入隊
    boolean enqueue(E e);

    //出隊
    E dequeue();
}

該接口中僅定義了入隊和出隊的方法。

鏈表有雙向鏈表和單向鏈表之分,經過前面閱讀reentrantlock的源碼咱們知道同步隊列就是一種雙向鏈表,而且在爲咱們保留了更多信息的狀況下實現也並不複雜,借鑑下這種經驗,咱們也選擇雙向鏈表來實現隊列。鏈表的結點定義以下:

private static class Node<E> {

    //指向前一個節點的指針
    public volatile Node pre;

    //指向後一個結點的指針
    public volatile Node next;

    //真正要存儲在隊列中的值
    public E item;

    public Node(E item) {
        this.item = item;
    }

    @Override
    public String toString() {
        return "Node{" +
                "item=" + item +
                '}';
    }
}

由於可能有多線程環境對結點指針的併發訪問,因此pre和next指針都用volatile修飾保證可見性。

隊列的頭尾指針分別定義以下,咱們採用了不帶哨兵結點的方式,即頭尾指針在初始化時指向的元素值爲null,又由於咱們須要原子的更新引用,故聲明爲AtomicReference,其內部指針指向的類型爲Node

//指向隊列頭結點的原子引用
private AtomicReference<Node<E>> head = new AtomicReference<>(null);

//指向隊列尾結點的原子引用
private AtomicReference<Node<E>> tail = new AtomicReference<>(null);

注意這裏初始化的寫法,AtomicReference其實也是一個對象,其內部有一個指向真正元素的引用以及一些原子方法。咱們不能直接寫head=tail=null來表示初始化狀態,而應該聲明一個AtomicReference的空對象。整個隊列的結構以下圖所示。

能夠看到隊列的頭指針和尾指針分別指向的是一個AtomicReference對象,而後再由AtomicReference內部的指針去指向真正的頭尾結點。AtomicReference對象不只使咱們的頭尾指針間接找到了隊列的頭尾結點,還提供了原子的更新這種指向的方法。後面所講的更新頭尾指針其實更新的並非真正的頭尾指針,而是它們指向的AtomicReference對象內部的指針。固然思考的時候不用考慮這層抽象。

3.1 入隊方法

/**
 *
 * @param e 要入隊的元素
 * @return  true:入隊成功 false:入隊失敗
 */
public boolean enqueue(E e) {

    //建立一個包含入隊元素的新結點
    Node<E> newNode = new Node<>(e);
    //死循環,保證最後都能進入隊列
    for (; ; ) {
        //當前尾結點
        Node<E> taild = tail.get();
        //當前尾結點爲null,說明隊列爲空
        if (taild == null) {
            //CAS方式更新隊列頭指針
            if (head.compareAndSet(null, newNode)) {
                //非同步方式更新尾指針
                tail.set(newNode);
                return true;
            }

        } else {

            //新結點的pre指針指向原尾結點
            newNode.pre = taild;
            //CAS方式將尾指針指向新的結點
            if (tail.compareAndSet(taild, newNode)) {
                //非同步方式更新
                taild.next = newNode;
                return true;
            }
        }
    }
}

有沒有以爲這部分代碼很眼熟?若是你認真閱讀過AQS中線程加入同步隊列等待部分的源碼,就會發現只是在它的基礎上作了些小改動。整個過程只有更新頭尾指針時進行了CAS同步,因此併發環境下性能很好。至於爲何整個過程是線程安全的能夠參考我在從源碼角度完全理解ReentrantLock(重入鎖)裏3.3小節的講解。

3.2 出隊方法

/**
     * 將隊列首元素從隊列中移除並返回該元素,若隊列爲空則返回null
     * @return
     */
    public E dequeue() {

        //死循環,保證最後都能出隊成功
        for (;;) {
            //當前頭結點
            Node<E> tailed = tail.get();
            //當前尾結點
            Node<E> headed = head.get();
            if (tailed == null) { //尾結點爲null,說明隊列爲空,直接返回null
                return null;

            } else if (headed == tailed) { //尾結點和頭結點相同,說明隊列中只有一個元素,此時要更新頭尾指針
                //CAS方式更新尾指針爲null
                if (tail.compareAndSet(tailed,null)) {
//                    head.compareAndSet(headed, null);
                    //頭指針更新爲null
                    head.set(null);
                    return tailed.item;
                }

            } else {  //此時隊列中元素個數大於1,頭尾指針指向不一樣結點,出隊操做只須要更新尾指針
                Node preNode = tailed.pre;
                //CAS方式更新尾指針指向原尾結點的前一個節點
                if (tail.compareAndSet(tailed, preNode)) {
                    preNode.next = null;  //help gc
                    return tailed.item;
                }

            }
        }
    }

更新日誌(2018年8月18日):最後一個else分句中的出隊邏輯有問題:錯誤的在隊列尾部進行出隊。正確的實現方式見基於雙向鏈表實現無鎖隊列的正確姿式(修正以前博客中的錯誤)中的2.2小節。

首先獲取當前隊列的頭尾結點,而後根據尾結點是否爲null以及頭尾結點是否相等分3種狀況討論:

  • 1.隊列爲空,此時頭尾指針都不用更新,直接返回null
  • 2.隊列只有一個元素,須要同時更新頭尾指針
  • 3.隊列元素個數大於1,只需更新尾指針

3.3 性能測試

以JDK中的LinkedBlockingQueue和ConcurrentLinkedQueue爲基準,前者經過加鎖的方式實現了線程安全,後者也是基於CAS方式實現的,不過邏輯相對複雜,由於進行了不少優化。一樣開啓10個線程,每一個線程混合進行10000次入隊和出隊操做。重複進行100次上述操做計算出平均執行時間並進行比較。測試代碼見github

由上面的結果可知,一樣是基於CAS實現,ConcurrentLinkedQueue的性能比咱們本身構建的無鎖隊列好很多,一個緣由是其內部作了很多優化,好比尾指針並非在每次插入時都會更新,另外一個緣由多是其隊列是由單鏈表構成,少了不少指針操做。LinkedBlockingQueue性能和LockFreeLinkedQueue差很少,若是算上由於測試須要將LockFreeLinkedQueue加了一層適配器的包裝致使的方法調用的額外開銷,可能後者性能稍微還要更好些。看來JDK確實對鎖作了不少優化,經過加鎖的方式
實現線程安全並不會致使性能降低不少,因此大部分併發容器都使用鎖來保證線程安全。

4. 總結

在閱讀ReentrentLock源碼領略過Doug Lea精湛的併發技藝後趁熱打鐵,本身動手構建了無鎖的線程安全的棧和隊列。整個過程雖有很多挑戰,但最終獲益匪淺,對原子變量和CAS算法有了更深的理解,也鍛鍊了分析和解決問題的能力。

相關文章
相關標籤/搜索