目錄html
更新日誌(2018年8月18日):這篇博客的隊列部分犯了個低級錯誤:入隊和出隊在同在隊列尾端進行。正確的實現方式見基於雙向鏈表實現無鎖隊列的正確姿式(修正以前博客中的錯誤)
git
併發容器是線程安全的容器。它在實現容器基本功能的前提下,還提供了併發控制能力,使得容器在被多線程併發訪問的狀況下還能表現出正確的行爲。一般咱們使用獨佔鎖的悲觀策略來進行併發控制,由於其實現相對簡單,正確性易於判斷且絕大部分狀況下都能表現出不錯的性能;固然,也可使用CAS算法來進行併發控制,這是一種無鎖的樂觀策略,儘管其有着實現比較複雜,正確性較難斷定等缺點,但相比獨佔鎖的方式,它至少有如下幾方面的優點:github
總的來講,無鎖的併發容器更加安全,大部分狀況下吞吐量也更高,可是也有着實現較爲複雜不太好理解的缺點。經過本身動手實現無鎖的線程安全的棧和隊列,就能深入體會到這一點。完整的代碼已經放到github上beautiful-concurrent算法
棧一般有兩種實現方式,一種是使用數組,另外一種是使用鏈表。首先咱們定義一個無鎖的棧的接口,該接口內部只有兩個方法push()和pop(),以下圖所示數組
/** * @author: takumiCX * @create: 2018-08-09 **/ public interface LockFreeStack<E> { boolean push(E e); E pop(); }
接下來咱們分別就數組和鏈表兩種實現方式,來探討如何基於CAS算法構建無鎖的併發棧。安全
/** * @author: takumiCX * @create: 2018-08-08 * * 基於數組實現的無鎖的併發棧 **/ public class LockFreeArrayStack<E> implements LockFreeStack<E>{ //不支持擴容 final Object[] elements; //容量,一旦肯定不可更改 final int capacity; //記錄棧頂元素在數組中的下標,初始值爲-1 AtomicInteger top = new AtomicInteger(-1); public LockFreeArrayStack(int capacity) { this.capacity = capacity; elements = new Object[capacity]; } /** * 入棧 * * @param e * @return true:入棧成功 false:入棧失敗(棧已滿) */ public boolean push(E e) { //死循環,保證屢次CAS嘗試後能獲得結果 for (; ; ) { //當前棧頂元素在數組中的下標 int curTop = top.get(); //棧已滿,返回false if (curTop + 1 >= capacity) { return false; } else { //首先將元素放入棧中 elements[curTop + 1] = e; //基於CAS更新棧頂指針,這裏是top值 if (top.compareAndSet(curTop, curTop + 1)) { return true; } } } } /** * 出棧 * * @return 棧頂元素, 若棧爲空返回null */ public E pop() { //死循環,保證屢次CAS嘗試後能獲得結果 for (; ; ) { //當前棧頂元素在數組中的下標 int curTop = top.get(); //棧爲空,返回null if (curTop == -1) { return null; } else { //CAS更新棧頂指針,這裏是top值 if (top.compareAndSet(curTop, curTop - 1)) { return (E) elements[curTop]; } } } } }
爲了突出CAS算法實現入棧出棧的過程,同時也是爲了簡化代碼實現的複雜度,基於數組實現的棧不支持擴容,最大容量capacity在構造函數中肯定。top爲棧頂元素"指針",這裏爲棧頂元素在數組中的下標值。由於top值的更新依賴於前值,因此這裏不能使用volatile關鍵字,而應該使用原子變量,保證在更新top值以前top值沒有被其餘線程改變。棧的pop()方法實現比較簡單,只要在棧不爲空的狀況下,原子的把top值更新爲top-1;而棧的push()方法相對來講比較複雜,它包含了兩步:多線程
這裏可能有人會疑問:這難道不會產生併發問題嗎,好比說一個線程執行了1後,另外一個線程也執行了1,這不是把前面的結果覆蓋了嗎?咱們能夠畫圖來演繹下整個過程:
併發
能夠看到,儘管1在多線程環境下會產生元素覆蓋問題,可是對於最後一個覆蓋的線程而言,2的CAS更新是必然會成功的,無論這個CAS更新是由該線程本身執行仍是其餘線程替他執行,一旦某線程CAS更新成功,其餘線程將因CAS失敗從新執行for循環。ide
基於鏈表實現無鎖的棧會更靈活,不用考慮棧擴容或者棧空間滿的問題,並且實現一樣簡單。函數
/** * @author: takumiCX * @create: 2018-08-09 * * 基於鏈表實現的無鎖的併發棧 **/ public class LockFreeLinkedStack<E> implements LockFreeStack<E>{ //棧頂指針 AtomicReference<Node<E>> top = new AtomicReference<Node<E>>(); /** * @param e 入棧元素 * @return true:入棧成功 false:入棧失敗 */ public boolean push(E e) { //構造新結點 Node<E> newNode = new Node<E>(e); //死循環,保證CAS失敗重試後能入棧成功 for (; ; ) { //當前棧頂結點 Node<E> curTopNode = top.get(); //新結點的next指針指向原棧頂結點 newNode.next = curTopNode; //CAS更新棧頂指針 if (top.compareAndSet(curTopNode, newNode)) { return true; } } } /** * * @return 返回棧頂結點中的值,若棧爲空返回null */ public E pop() { //死循環,保證出棧成功 for (; ; ) { //當前棧頂結點 Node<E> curTopNode = top.get(); //棧爲空,返回null if (curTopNode == null) { return null; } else { //得到原棧頂結點的後繼結點 Node<E> nextNode = curTopNode.next; //CAS更新棧頂指針 if (top.compareAndSet(curTopNode, nextNode)) { return curTopNode.item; } } } } //定義鏈表結點 private static class Node<E> { public E item; public Node<E> next; public Node(E item) { this.item = item; } } }
由定義可知,該鏈表爲單鏈表,且不帶哨兵。爲了操做方便,新增結點的插入採用頭插法。以下圖所示:
由於棧頂元素的更新依賴於前值(咱們要保證更新棧頂指針前沒有其餘線程對其進行更改),故而使用原子引用,基於原子引用的CAS算法能夠保證只有當更新前的引用爲預期值時,當前更新才能成功(注意棧頂指針的初始化方式,至於爲什麼不能直接給top賦值爲null後面隊列部分會解釋)。其入棧出棧流程也十分簡單,僅需對棧頂指針top進行CAS同步。
以JDK中的Stack爲基準進行性能測試,因爲JDK中的Stack是線程不安全的,在測試時經過手動加鎖的方式保證線程安全。
開啓10個線程,每一個線程混合進行10000次push和pop操做。分別以每種容器進行100次上述操做,計算出每次的平均執行時間(單位毫秒)以下。測試環境的處理器核數爲4。這裏就不貼測試代碼了,想看的點這裏github
能夠看到基於CAS算法的棧確實比基於鎖的棧表現出了更好的性能。
隊列其實也有數組(循環數組)和鏈表兩種實現方式,這裏爲了向併發大神Doug Lea致敬(笑),僅就隊列的鏈表實現進行討論。棧的操做只在棧頂進行,只須要有一個棧頂指針,並保證其更新的原子性便可。可是隊列是先進先出的,其入隊和出隊分別在隊列的兩端,因此必須有兩個指針分別指向隊列的頭部和尾部,對它們的更新不只須要保證是原子性的,還要避免其相互衝突,一旦涉及到兩項CAS操做,且它們相互間又存在必定的制約關係,無鎖算法的實現就會一會兒變得複雜起來。不過不用急,經過合理的分析和設計,總能找到正確的辦法。先來看無鎖的隊列的接口定義:
/** * @author: takumiCX * @create: 2018-08-10 * * 隊列接口,僅包含入隊和出隊抽象方法 **/ public interface LockFreeQueue<E> { //入隊 boolean enqueue(E e); //出隊 E dequeue(); }
該接口中僅定義了入隊和出隊的方法。
鏈表有雙向鏈表和單向鏈表之分,經過前面閱讀reentrantlock的源碼咱們知道同步隊列就是一種雙向鏈表,而且在爲咱們保留了更多信息的狀況下實現也並不複雜,借鑑下這種經驗,咱們也選擇雙向鏈表來實現隊列。鏈表的結點定義以下:
private static class Node<E> { //指向前一個節點的指針 public volatile Node pre; //指向後一個結點的指針 public volatile Node next; //真正要存儲在隊列中的值 public E item; public Node(E item) { this.item = item; } @Override public String toString() { return "Node{" + "item=" + item + '}'; } }
由於可能有多線程環境對結點指針的併發訪問,因此pre和next指針都用volatile修飾保證可見性。
隊列的頭尾指針分別定義以下,咱們採用了不帶哨兵結點的方式,即頭尾指針在初始化時指向的元素值爲null,又由於咱們須要原子的更新引用,故聲明爲AtomicReference,其內部指針指向的類型爲Node
//指向隊列頭結點的原子引用 private AtomicReference<Node<E>> head = new AtomicReference<>(null); //指向隊列尾結點的原子引用 private AtomicReference<Node<E>> tail = new AtomicReference<>(null);
注意這裏初始化的寫法,AtomicReference其實也是一個對象,其內部有一個指向真正元素的引用以及一些原子方法。咱們不能直接寫head=tail=null來表示初始化狀態,而應該聲明一個AtomicReference的空對象。整個隊列的結構以下圖所示。
能夠看到隊列的頭指針和尾指針分別指向的是一個AtomicReference對象,而後再由AtomicReference內部的指針去指向真正的頭尾結點。AtomicReference對象不只使咱們的頭尾指針間接找到了隊列的頭尾結點,還提供了原子的更新這種指向的方法。後面所講的更新頭尾指針其實更新的並非真正的頭尾指針,而是它們指向的AtomicReference對象內部的指針。固然思考的時候不用考慮這層抽象。
/** * * @param e 要入隊的元素 * @return true:入隊成功 false:入隊失敗 */ public boolean enqueue(E e) { //建立一個包含入隊元素的新結點 Node<E> newNode = new Node<>(e); //死循環,保證最後都能進入隊列 for (; ; ) { //當前尾結點 Node<E> taild = tail.get(); //當前尾結點爲null,說明隊列爲空 if (taild == null) { //CAS方式更新隊列頭指針 if (head.compareAndSet(null, newNode)) { //非同步方式更新尾指針 tail.set(newNode); return true; } } else { //新結點的pre指針指向原尾結點 newNode.pre = taild; //CAS方式將尾指針指向新的結點 if (tail.compareAndSet(taild, newNode)) { //非同步方式更新 taild.next = newNode; return true; } } } }
有沒有以爲這部分代碼很眼熟?若是你認真閱讀過AQS中線程加入同步隊列等待部分的源碼,就會發現只是在它的基礎上作了些小改動。整個過程只有更新頭尾指針時進行了CAS同步,因此併發環境下性能很好。至於爲何整個過程是線程安全的能夠參考我在從源碼角度完全理解ReentrantLock(重入鎖)裏3.3小節的講解。
/** * 將隊列首元素從隊列中移除並返回該元素,若隊列爲空則返回null * @return */ public E dequeue() { //死循環,保證最後都能出隊成功 for (;;) { //當前頭結點 Node<E> tailed = tail.get(); //當前尾結點 Node<E> headed = head.get(); if (tailed == null) { //尾結點爲null,說明隊列爲空,直接返回null return null; } else if (headed == tailed) { //尾結點和頭結點相同,說明隊列中只有一個元素,此時要更新頭尾指針 //CAS方式更新尾指針爲null if (tail.compareAndSet(tailed,null)) { // head.compareAndSet(headed, null); //頭指針更新爲null head.set(null); return tailed.item; } } else { //此時隊列中元素個數大於1,頭尾指針指向不一樣結點,出隊操做只須要更新尾指針 Node preNode = tailed.pre; //CAS方式更新尾指針指向原尾結點的前一個節點 if (tail.compareAndSet(tailed, preNode)) { preNode.next = null; //help gc return tailed.item; } } } }
更新日誌(2018年8月18日):最後一個else分句中的出隊邏輯有問題:錯誤的在隊列尾部進行出隊。正確的實現方式見基於雙向鏈表實現無鎖隊列的正確姿式(修正以前博客中的錯誤)中的2.2小節。
首先獲取當前隊列的頭尾結點,而後根據尾結點是否爲null以及頭尾結點是否相等分3種狀況討論:
以JDK中的LinkedBlockingQueue和ConcurrentLinkedQueue爲基準,前者經過加鎖的方式實現了線程安全,後者也是基於CAS方式實現的,不過邏輯相對複雜,由於進行了不少優化。一樣開啓10個線程,每一個線程混合進行10000次入隊和出隊操做。重複進行100次上述操做計算出平均執行時間並進行比較。測試代碼見github
由上面的結果可知,一樣是基於CAS實現,ConcurrentLinkedQueue的性能比咱們本身構建的無鎖隊列好很多,一個緣由是其內部作了很多優化,好比尾指針並非在每次插入時都會更新,另外一個緣由多是其隊列是由單鏈表構成,少了不少指針操做。LinkedBlockingQueue性能和LockFreeLinkedQueue差很少,若是算上由於測試須要將LockFreeLinkedQueue加了一層適配器的包裝致使的方法調用的額外開銷,可能後者性能稍微還要更好些。看來JDK確實對鎖作了不少優化,經過加鎖的方式
實現線程安全並不會致使性能降低不少,因此大部分併發容器都使用鎖來保證線程安全。
在閱讀ReentrentLock源碼領略過Doug Lea精湛的併發技藝後趁熱打鐵,本身動手構建了無鎖的線程安全的棧和隊列。整個過程雖有很多挑戰,但最終獲益匪淺,對原子變量和CAS算法有了更深的理解,也鍛鍊了分析和解決問題的能力。