決定從這篇文章開始,開一個讀源碼系列,不限制平臺語言或工具,任何本身感興趣的都會寫。前幾天碰到一個小問題又讀了一遍ConcurrentQueue的源碼,那就拿C#中比較經常使用的併發隊列ConcurrentQueue做爲開篇來聊一聊它的實現原理。html
話很少說,直奔主題。node
要提早說明下的是,本文解析的源碼是基於.NET Framework 4.8版本,地址是:https://referencesource.microsoft.com/#mscorlib/system/Collections/Concurrent/ConcurrentQueue.cs
原本是打算用.NET Core版本的,可是找了一下居然沒找到:https://github.com/dotnet/runtime/tree/master/src/libraries/System.Collections.Concurrent/src/System/Collections/Concurrent
不知道是我找錯位置了仍是咋回事,有知道的大佬告知一下。不過我以爲實現原理應該相似吧,後面找到了我對比一下,不一樣的話再寫一篇來分析。git
若是是本身實現一個簡單的隊列功能,咱們該如何設計它的存儲結構呢?通常來講有這兩種方式:數組或者鏈表,先來簡單分析下。github
咱們都知道,數組是固定空間的集合,意味着初始化的時候要指定數組大小,可是隊列的長度是隨時變化的,超出數組大小了怎麼辦?這時候就必需要對數組進行擴容。問題又來了,擴容要擴多少呢,少了不夠用多了浪費內存空間。與之相反的,鏈表是動態空間類型的數據結構,元素之間經過指針相連,不須要提早分配空間,須要多少分配多少。但隨之而來的問題是,大量的出隊入隊操做伴隨着大量對象的建立銷燬,GC的壓力又變得很是大。
事實上,在C#的普通隊列Queue
類型中選擇使用數組進行實現,它實現了一套擴容機制,這裏再也不詳細描述,有興趣的直接看源碼,比較簡單。c#
回到主題,要實現一個高性能的線程安全隊列,咱們試着回答如下問題:數組
經過源碼能夠看到ConcurrentQueue
採用了數組+鏈表的組合模式,充分吸取了2種結構的優勢。安全
具體來講,它的整體結構是一個鏈表,鏈表的每一個節點是一個包含數組的特殊對象,咱們稱之爲Segment(段或節,原話是a queue is a linked list of small arrays, each node is called a segment.
),它裏面的數組是存儲真實數據的地方,容量固定大小是32,每個Segment有指向下一個Segment的的指針,以此造成鏈表結構。而隊列中維護了2個特殊的指針,他們分別指向隊列的首段(head segment)和尾段(tail segment),他們對入隊和出隊有着重要的做用。用一張圖來解釋隊列的內部結構:
數據結構
嗯,畫圖畫到這裏忽然聯想到,搞成雙向鏈表的話是否是就神似B+樹的葉子節點?技術就是這麼奇妙~併發
段的核心定義爲:app
/// <summary> /// private class for ConcurrentQueue. /// 鏈表節點(段) /// </summary> private class Segment { //實際存儲數據的容器 internal volatile T[] m_array; //存儲對應位置數據的狀態,當數據的對應狀態位標記爲true時該數據纔是有效的 internal volatile VolatileBool[] m_state; //下一段的指針 private volatile Segment m_next; //當前段在隊列中的索引 internal readonly long m_index; //兩個位置指針 private volatile int m_low; private volatile int m_high; //所屬的隊列實例 private volatile ConcurrentQueue<T> m_source; }
隊列的核心定義爲:
/// <summary> /// 線程安全的先進先出集合, /// </summary> public class ConcurrentQueue<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T> { //首段 [NonSerialized] private volatile Segment m_head; //尾段 [NonSerialized] private volatile Segment m_tail; //每一段的大小 private const int SEGMENT_SIZE = 32; //截取快照的操做數量 [NonSerialized] internal volatile int m_numSnapshotTakers = 0; }
先從初始化一個隊列開始看起。
與普通Queue
不一樣的是,ConcurrentQueue
再也不支持初始化時指定隊列大小(capacity),僅僅提供一個無參構造函數和一個IEnumerable<T>
參數的構造函數。
/// <summary> /// Initializes a new instance of the <see cref="ConcurrentQueue{T}"/> class. /// </summary> public ConcurrentQueue() { m_head = m_tail = new Segment(0, this); }
無參構造函數很簡單,建立了一個Segment實例並把首尾指針都指向它,此時隊列只包含一個Segment,它的索引是0,隊列容量是32。
繼續看一下Segment是如何被初始化的:
/// <summary> /// Create and initialize a segment with the specified index. /// </summary> internal Segment(long index, ConcurrentQueue<T> source) { m_array = new T[SEGMENT_SIZE]; m_state = new VolatileBool[SEGMENT_SIZE]; //all initialized to false m_high = -1; Contract.Assert(index >= 0); m_index = index; m_source = source; }
Segment只提供了一個構造函數,接受的參數分別是隊列索引和隊列實例,它建立了一個長度爲32的數組,並建立了與之對應的狀態數組,而後初始化了位置指針(m_low=0,m_high=-1,此時表示一個空的Segment)。
到這裏,一個併發隊列就建立好了。
使用集合建立隊列的過程和上面相似,只是多了兩個步驟:入隊和擴容,下面會重點描述這兩部分因此這裏再也不過多介紹。
先亮出源碼:
/// <summary> /// Adds an object to the end of the <see cref="ConcurrentQueue{T}"/>. /// </summary> /// <param name="item">The object to add to the end of the <see /// cref="ConcurrentQueue{T}"/>. The value can be a null reference /// (Nothing in Visual Basic) for reference types. /// </param> public void Enqueue(T item) { SpinWait spin = new SpinWait(); while (true) { Segment tail = m_tail; if (tail.TryAppend(item)) return; spin.SpinOnce(); } }
經過源碼能夠看到,入隊操做是在隊尾(m_tail)進行的,它嘗試在最後一個Segment中追加指定的元素,若是成功了就直接返回,失敗的話就自旋等待,直到成功爲止。那什麼狀況下會失敗呢?這就要繼續看看是如何追加元素的:
internal bool TryAppend(T value) { //先判斷一下高位指針有沒有達到數組邊界(也就是數組是否裝滿了) if (m_high >= SEGMENT_SIZE - 1) { return false; } int newhigh = SEGMENT_SIZE; try { } finally { //使用原子操做讓高位指針加1 newhigh = Interlocked.Increment(ref m_high); //若是數組還有空位 if (newhigh <= SEGMENT_SIZE - 1) { //把數據放到數組中,同時更新狀態 m_array[newhigh] = value; m_state[newhigh].m_value = true; } //數組滿了要觸發擴容 if (newhigh == SEGMENT_SIZE - 1) { Grow(); } } return newhigh <= SEGMENT_SIZE - 1; }
因此,只有當尾段m_tail裝滿的狀況下追加元素纔會失敗,這時候必需要等待下一個段產生,也就是擴容(細細品一下Grow這個詞真的很妙),自旋就是在等擴容完成纔能有地方放數據。而在保存數據的時候,經過原子自增操做保證了同一個位置只會有一個數據被寫入,從而實現了線程安全。
注意:這裏的裝滿並非指數組每一個位置都有數據,而是指最後一個位置已被使用。
繼續看一下擴容是怎麼一個過程:
/// <summary> /// Create a new segment and append to the current one /// Update the m_tail pointer /// This method is called when there is no contention /// </summary> internal void Grow() { //no CAS is needed, since there is no contention (other threads are blocked, busy waiting) Segment newSegment = new Segment(m_index + 1, m_source); //m_index is Int64, we don't need to worry about overflow m_next = newSegment; Contract.Assert(m_source.m_tail == this); m_source.m_tail = m_next; }
在普通隊列中,擴容是經過建立一個更大的數組而後把數據拷貝過去實現擴容的,這個操做比較耗時。而在併發隊列中就很是簡單了,首先建立一個新Segment,而後把當前Segment的next指向它,最後掛到隊列的末尾去就能夠了,所有是指針操做很是高效。並且從代碼註釋中能夠看到,這裏不會出現線程競爭的狀況,由於其餘線程都由於位置不夠被阻塞都在自旋等待中。
仍是先亮出源碼:
public bool TryDequeue(out T result) { while (!IsEmpty) { Segment head = m_head; if (head.TryRemove(out result)) return true; //since method IsEmpty spins, we don't need to spin in the while loop } result = default(T); return false; }
能夠看到只有在隊列不爲空(IsEmpty==false)的狀況下才會嘗試出隊操做,而出隊是在首段上進行操做的。關於如何判斷隊列是否爲空總結就一句話:當首段m_head不包含任何數據且沒有下一段的時候隊列才爲空,詳細的判斷過程源碼註釋中寫的很清楚,限於篇幅不詳細介紹。
出隊的本質是從首段中移除低位指針所指向的元素,看一下具體實現步驟:
internal bool TryRemove(out T result) { SpinWait spin = new SpinWait(); int lowLocal = Low, highLocal = High; //判斷當前段是否爲空 while (lowLocal <= highLocal) { //判斷低位指針位置是否能夠移除 if (Interlocked.CompareExchange(ref m_low, lowLocal + 1, lowLocal) == lowLocal) { SpinWait spinLocal = new SpinWait(); //判斷元素是否有效 while (!m_state[lowLocal].m_value) { spinLocal.SpinOnce(); } //取出元素 result = m_array[lowLocal]; //釋放引用關係 if (m_source.m_numSnapshotTakers <= 0) { m_array[lowLocal] = default(T); } //判斷當前段的元素是否所有被移除了,要丟棄它 if (lowLocal + 1 >= SEGMENT_SIZE) { spinLocal = new SpinWait(); while (m_next == null) { spinLocal.SpinOnce(); } Contract.Assert(m_source.m_head == this); m_source.m_head = m_next; } return true; } else { //線程競爭失敗,自旋等待並重置 spin.SpinOnce(); lowLocal = Low; highLocal = High; } }//end of while result = default(T); return false; }
首先,只有當前Segment不爲空的狀況下才嘗試移除元素,不然就直接返回false。而後經過一個原子操做Interlocked.CompareExchange
判斷當前低位指針上是否有其餘線程同時也在移除,若是有那就進入自旋等待,沒有的話就從這個位置取出元素並把低位指針往前推動一位。若是當前隊列沒有正在進行截取快照的操做,那取出元素後還要把這個位置給釋放掉。當這個Segment的全部元素都被移除掉了,這時候要把它丟棄,簡單來講就是讓隊列的首段指針指向它的下一段便可,丟棄的這一段等着GC來收拾它。
這裏稍微提一下Interlocked.CompareExchange,它的意思是比較和交換,也就是更爲你們所熟悉的CAS(Compare-and-Swap),它主要作了如下2件事情:
整個操做是原子性的,對CPU而言就是一條指令,這樣就能夠保證當前位置只有一個線程執行出隊操做。
還有一個
TryPeek()
方法和出隊相似,它是從隊首獲取一個元素可是無需移除該元素,能夠看作Dequeue的簡化版,再也不詳細介紹。
與普通Queue
不一樣的是,ConcurrentQueue
並無維護一個表示隊列中元素個數的計數器,那就意味着要獲得這個數量必須實時去計算。咱們看一下計算過程:
public int Count { get { Segment head, tail; int headLow, tailHigh; GetHeadTailPositions(out head, out tail, out headLow, out tailHigh); if (head == tail) { return tailHigh - headLow + 1; } int count = SEGMENT_SIZE - headLow; count += SEGMENT_SIZE * ((int)(tail.m_index - head.m_index - 1)); count += tailHigh + 1; return count; } }
大體思路是,先計算(GetHeadTailPositions)出首段的低位指針和尾段的高位指針,這中間的總長度就是咱們要的數量,而後分紅3節依次累加每個Segment包含的元素個數獲得最終的隊列長度,能夠看到這是一個開銷比較大的操做。
正由於如此,微軟官方推薦使用IsEmpty
屬性來判斷隊列是否爲空,而不是使用隊列長度Count==0
來判斷,使用ConcurrentStack
也是同樣。
所謂的take snapshot就是指一些格式轉換的操做,例如ToArray()
、ToList()
、GetEnumerator()
這種類型的方法。在前面隊列的核心定義中咱們提到有一個m_numSnapshotTakers
字段,這時候就派上用場了。下面以比較典型的ToList()
源碼舉例說明:
private List<T> ToList() { // Increments the number of active snapshot takers. This increment must happen before the snapshot is // taken. At the same time, Decrement must happen after list copying is over. Only in this way, can it // eliminate race condition when Segment.TryRemove() checks whether m_numSnapshotTakers == 0. Interlocked.Increment(ref m_numSnapshotTakers); List<T> list = new List<T>(); try { Segment head, tail; int headLow, tailHigh; GetHeadTailPositions(out head, out tail, out headLow, out tailHigh); if (head == tail) { head.AddToList(list, headLow, tailHigh); } else { head.AddToList(list, headLow, SEGMENT_SIZE - 1); Segment curr = head.Next; while (curr != tail) { curr.AddToList(list, 0, SEGMENT_SIZE - 1); curr = curr.Next; } tail.AddToList(list, 0, tailHigh); } } finally { // This Decrement must happen after copying is over. Interlocked.Decrement(ref m_numSnapshotTakers); } return list; }
能夠看到,ToList的邏輯和Count很是類似,都是先計算出兩個首尾位置指針,而後把隊列分爲3節依次遍歷處理,最大的不一樣之處在於方法的開頭和結尾分別對m_numSnapshotTakers
作了一個原子操做。
在方法的第一行,使用Interlocked.Increment
作了一次遞增,這時候表示隊列正在進行一次截取快照操做,在處理完後又在finally中用Interlocked.Decrement
作了一次遞減表示當前操做已完成,這樣確保了在進行快照時不被出隊影響。感受這塊很難描述的特別好,因此保留了原始的英文註釋,你們慢慢體會。
到這裏,基本把ConcurrentQueue的核心說清楚了。
回到文章開頭提出的幾個問題,如今應該有了很清晰的答案:
以上所述均是我的理解,若是有錯誤的地方還請不吝指正,以避免誤導他人。
推薦相關閱讀,篇篇都是乾貨:https://www.cnblogs.com/lucifer1982/category/126755.html