[一塊兒讀源碼]走進C#併發隊列ConcurrentQueue的內部世界 — .NET Core篇

在上一篇《走進C#併發隊列ConcurrentQueue的內部世界》中解析了Framework下的ConcurrentQueue實現原理,通過拋磚引玉,獲得了一衆大佬的指點,找到了.NET Core版本下的ConcurrentQueue源碼,位於如下地址:html

我大體看了一下,雖然二者的實現有很多類似的地方,不過在細節上新增了許多有意思的東西,仍是以爲要單獨拉出來講一下。畫外音:誰叫我上篇立了flag,如今跪着也要寫完。。🤣git

必需要吐糟的是,代碼中ConcurrentQueue類明明是包含在System.Collections.Concurrent命名空間下,可是源碼結構中的文件卻放在System.Private.CoreLib目錄中,這是鬧哪出~github


存儲結構

從上面給出的源碼地址能夠猜想出整個結構依然是Segment+Queue的組合,經過一個Segment鏈表實現了Queue結構,但實際上內部又加了新的設計。拋去Queue先不看的話,Segment自己就是一個實現了多生產者多消費者的線程安全集合,甚至能夠直接拿它當一個固定容量的線程安全隊列使用,這點與以前Framework中差異很大。若是結合Queue總體來看,Segment再也不是固定容量,而是能夠由Queue來控制每一個Segment的容量大小(最小是32,上限是1024 * 1024)。算法

在Framework中,隊列會給每一個Segment分配一個索引,雖然這個索引是long類型的,但理論上說隊列容量仍是存在上限。在Core中就不同了,它取消了這個索引,真正實現了一個無邊界(unbounded)隊列。c#

我猜想的緣由是,在Framework中因爲每一個Segment是固定大小的,維護一個索引能夠很方便的計算隊列裏的元素數量,可是Core中的Segment大小不是固定的,使用索引並不能加快計算速度,使得這個索引再也不有意義,這也意味着計算元素數量變得很是複雜。數組

一張圖看清它的真實面目,這裏繼續沿用上一篇的結構圖稍做修改:
 緩存

從圖中能夠看到,總體結構上基本一致,核心改動就是Segment中增長了Slot(槽)的概念,這是真正存儲數據的地方,同時有一個序列號與之對應。安全

從代碼來看一下Segment的核心定義:併發

internal sealed class ConcurrentQueueSegment<T>
{
    //存放數據的容器
	internal readonly Slot[] _slots;

	//這個mask用來計算槽點,能夠防止查找越界
	internal readonly int _slotsMask;

	//首尾位置指針
	internal PaddedHeadAndTail _headAndTail;

	//觀察保留標記,表示當前段在出隊時可否刪除數據
	internal bool _preservedForObservation;

	//標記當前段是否被鎖住
	internal bool _frozenForEnqueues;

	//下一段的指針
	internal ConcurrentQueueSegment<T>? _nextSegment;
}

其中_preservedForObservation_frozenForEnqueues會比較難理解,後面再詳細介紹。分佈式

再看一下隊列的核心定義:

public class ConcurrentQueue<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T>
{
    //每一段的初始化長度,也是最小長度
	private const int InitialSegmentLength = 32;

    //每一段的最大長度
	private const int MaxSegmentLength = 1024 * 1024;

    //操做多個段時的鎖對象
	private readonly object _crossSegmentLock;

    //尾段指針
	private volatile ConcurrentQueueSegment<T> _tail;

    //首段指針
	private volatile ConcurrentQueueSegment<T> _head;
}

常規操做

仍是按上一篇的套路爲主線按部就班。

建立實例

ConcurrentQueue依然提供了2個構造函數,分別能夠建立一個空隊列和指定數據集的隊列。

/// <summary>
/// Initializes a new instance of the <see cref="ConcurrentQueue{T}"/> class.
/// </summary>
public ConcurrentQueue()
{
    _crossSegmentLock = new object();
    _tail = _head = new ConcurrentQueueSegment<T>(InitialSegmentLength);
}

仍是熟悉的操做,建立了一個長度是32的Segment並把隊列的首尾指針都指向它,同時建立了鎖對象實例,僅此而已。
進一步看看Segment是怎麼建立的:

internal ConcurrentQueueSegment(int boundedLength)
{
    //這裏驗證了長度不能小於2而且必須是2的N次冪
    Debug.Assert(boundedLength >= 2, $"Must be >= 2, got {boundedLength}");
    Debug.Assert((boundedLength & (boundedLength - 1)) == 0, $"Must be a power of 2, got {boundedLength}");

    _slots = new Slot[boundedLength];
    //這個mask的做用就是用來計算數組索引的防止越界,能夠用`& _slotsMask`取代`% _slots.Length`
    _slotsMask = boundedLength - 1;

    //設置初始序列號
    for (int i = 0; i < _slots.Length; i++)
    {
        _slots[i].SequenceNumber = i;
    }
}

internal struct Slot
{
    [AllowNull, MaybeNull] public T Item; 
    
    public int SequenceNumber;
}

再看看怎麼用集合初始化隊列,這個過程稍微麻煩點,可是頗有意思:

public ConcurrentQueue(IEnumerable<T> collection)
{
    if (collection == null)
    {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.collection);
    }

    _crossSegmentLock = new object();

    //計算獲得第一段的長度
    int length = InitialSegmentLength;
    if (collection is ICollection<T> c)
    {
        int count = c.Count;
        if (count > length)
        {
            length = Math.Min(ConcurrentQueueSegment<T>.RoundUpToPowerOf2(count), MaxSegmentLength);
        }
    }

    //根據前面計算出來的長度建立一個Segment,再把數據依次入隊
    _tail = _head = new ConcurrentQueueSegment<T>(length);
    foreach (T item in collection)
    {
        Enqueue(item);
    }
}

能夠看到,第一段的大小是根據初始集合的大小肯定的,若是集合大小count大於32就對count進行向上取2的N次冪(RoundUpToPowerOf2)獲得實際大小(可是不能超過最大值),不然就按默認值32來初始化。

向上取2的N次冪究竟是啥意思??例如count是5,那獲得的結果就是8(2×2×2);若是count是9,那結果就是16(2×2×2×2);若是恰好count是8那結果就是8(2×2×2),具體算法是經過位運算實現的頗有意思。至於爲何必定要是2的N次冪,中間的玄機我也沒搞明白。。

順藤摸瓜,再看看進隊操做如何實現。

元素進隊

/// <summary>在隊尾追加一個元素</summary>
public void Enqueue(T item)
{
    // 先嚐試在尾段插入一個元素
    if (!_tail.TryEnqueue(item))
    {
        // 若是插入失敗,就意味着尾段已經填滿,須要日後擴容
        EnqueueSlow(item);
    }
}

private void EnqueueSlow(T item)
{
    while (true)
    {
        ConcurrentQueueSegment<T> tail = _tail;

        // 先嚐試再隊尾插入元素,若是擴容完成了就會成功
        if (tail.TryEnqueue(item))
        {
            return;
        }
        // 得到一把鎖,避免多個線程同時進行擴容
        lock (_crossSegmentLock)
        {
            //檢查是否擴容過了
            if (tail == _tail)
            {
                // 尾段凍結
                tail.EnsureFrozenForEnqueues();
                // 計算下一段的長度
                int nextSize = tail._preservedForObservation ? InitialSegmentLength : Math.Min(tail.Capacity * 2, MaxSegmentLength);
                var newTail = new ConcurrentQueueSegment<T>(nextSize);

                // 改變隊尾指向
                tail._nextSegment = newTail;
                // 指針交換
                _tail = newTail;
            }
        }
    }
}

從以上流程能夠看到,擴容的主動權再也不由Segment去控制,而是交給了隊列。正由於如此,因此在跨段操做時要先加鎖,在Framework版本中是在原子操做得到指針後進行的擴容因此不會有這個問題,後面的出隊操做也是同樣的道理。擴容過程當中有兩個細節須要重點關注,那就是SegmentFrozen和下一段的長度計算。
從前面Segment的定義中咱們看到它維護了一個_frozenForEnqueues標記字段,表示當前段是否被凍結鎖定,在被鎖住的狀況下會讓其餘入隊操做失敗,看一下實現過程:

// must only be called while queue's segment lock is held
internal void EnsureFrozenForEnqueues() 
{
    // flag used to ensure we don't increase the Tail more than once if frozen more than once
    if (!_frozenForEnqueues) 
    {
        _frozenForEnqueues = true;
        Interlocked.Add(ref _headAndTail.Tail, FreezeOffset);
    }
}

首先判斷當前凍結狀態,而後把它設置爲true,再使用原子操做把尾指針增長了2倍段長的偏移量,這個尾指針纔是真正限制當前段不可新增元素的關鍵點,後面講段的元素追加再關聯起來詳細介紹。而爲何要指定2倍段長這麼一個特殊值呢,目的是爲了把尾指針和mask作運算後落在同一個slot上,也就是說雖然兩個指針位置不同可是都指向的是同一個槽。

再說說下一段長度的計算問題,它主要是受_preservedForObservation這個字段影響,正常狀況下一段的長度是尾段的2倍,但若是尾段正好被標記爲觀察保留(相似於上一篇的截取快照),那麼下一段的長度依然是初始值32,原做者認爲入隊操做不是很頻繁,這樣作主要是爲了不浪費空間。

接着是重頭戲,看一下如何給段追加元素:

public bool TryEnqueue(T item)
{
    Slot[] slots = _slots;

    // 若是發生競爭就自旋等待
    SpinWait spinner = default;
    while (true)
    {
        // 獲取當前段的尾指針
        int currentTail = Volatile.Read(ref _headAndTail.Tail);
        // 計算槽點
        int slotsIndex = currentTail & _slotsMask;
        // 讀取對應槽的序列號
        int sequenceNumber = Volatile.Read(ref slots[slotsIndex].SequenceNumber);

        // 判斷槽點序列號和指針是否匹配
        int diff = sequenceNumber - currentTail;
        if (diff == 0)
        {
            // 經過原子操做比較交換,保證了只有一個入隊者得到可用空間
            if (Interlocked.CompareExchange(ref _headAndTail.Tail, currentTail + 1, currentTail) == currentTail)
            {
                // 把數據存入對應的槽點,以及更新序列號
                slots[slotsIndex].Item = item;
                Volatile.Write(ref slots[slotsIndex].SequenceNumber, currentTail + 1);
                return true;
            }
        }
        else if (diff < 0)
        {
            // 序列號小於指針就說明該段已經裝滿了,直接返回false
            return false;
        }

        // 此次競爭失敗了,只好等下去
        spinner.SpinOnce(sleep1Threshold: -1);
    }
}

整個流程的核心就是藉助槽點序列號和尾指針的匹配狀況判斷是否有可用空間,由於在初始化的時候序列號是從0遞增,正常狀況下尾指針和序列號確定是匹配的,只有在整個段被裝滿時尾指針纔會大於序列號,由於前面的凍結操做會給尾指針追加2倍段長的偏移量。要重點提出的是,只有在數據被寫入而且序列號更新完成後才表示整個位置的元素有效,纔能有出隊的機會,在Framework是經過維護一個狀態位來實現這個功能。整個設計頗有意思,要慢慢品。

這裏咱們能夠總結一下序列號的核心做用:假設一個槽點N,對應序列號是Q,它能容許入隊的必要條件之一就是N==Q,因爲入隊操做把位置N的序列號修改爲N+1,那麼能夠猜想出在出隊時的必要條件之一就是知足Q==N+1

代碼中的CompareExchange在上一篇中有介紹,這裏再也不重複。另外關於Volatile相關的稍微提一下,它的核心做用是避免內存與CPU之間的高速緩存帶來的數據不一致問題,告訴編譯器直接讀寫原始數據,有興趣的能夠找資料瞭解,限於篇幅不過多介紹。

元素出隊

能夠猜想到,入隊的時候要根據容量大小進行擴容,那麼與之對應的,出隊的時候就須要對它進行壓縮,也就是丟棄沒有數據的段。

/// <summary>從隊首移除一個元素</summary>
public bool TryDequeue([MaybeNullWhen(false)] out T result) =>
    _head.TryDequeue(out result) || 
    TryDequeueSlow(out result); 

private bool TryDequeueSlow([MaybeNullWhen(false)] out T item)
{
    // 不斷循環嘗試出隊,直到成功或失敗爲止
    while (true)
    {
        ConcurrentQueueSegment<T> head = _head;

        // 嘗試從隊首移除,若是成功就直接返回了
        if (head.TryDequeue(out item))
        {
            return true;
        }

        // 若是首段爲空而且沒有下一段了,則說明整個隊列都沒有數據了,返回失敗
        if (head._nextSegment == null)
        {
            item = default!;
            return false;
        }

        // 既然下一段不爲空,那就再次確認本段是否還能出隊成功,不然就要把它給移除了,等待下次循環從下一段出隊
        if (head.TryDequeue(out item))
        {
            return true;
        }

        // 首段指針要日後移動,表示當前首段已丟棄,跨段操做要先加鎖
        lock (_crossSegmentLock)
        {
            if (head == _head)
            {
                _head = head._nextSegment;
            }
        }
    }
}

總體流程基本和入隊同樣,外層經過一個死循環不斷嘗試操做,直到出隊成功或者隊列爲空返回失敗爲止。釋放空間的操做也從Segment轉移到隊列上,因此要加鎖保證線程安全。這一步我在代碼註釋中寫的很詳細就很少解釋了,再看一下核心操做Segment是如何移除元素的:

public bool TryDequeue([MaybeNullWhen(false)] out T item)
{
    Slot[] slots = _slots;

    // 遇到競爭時自旋等待
    SpinWait spinner = default;
    while (true)
    {
        // 獲取頭指針地址
        int currentHead = Volatile.Read(ref _headAndTail.Head);
        // 計算槽點
        int slotsIndex = currentHead & _slotsMask;

        // 獲取槽點對應的序列號
        int sequenceNumber = Volatile.Read(ref slots[slotsIndex].SequenceNumber);

        // 比較序列號是否和指望值同樣,爲何要加1的緣由前面入隊時說過
        int diff = sequenceNumber - (currentHead + 1);
        if (diff == 0)
        {
            // 經過原子操做比較交換獲得能夠出隊的槽點,並把頭指針日後移動一位
            if (Interlocked.CompareExchange(ref _headAndTail.Head, currentHead + 1, currentHead) == currentHead)
            {
                // 取出數據
                item = slots[slotsIndex].Item!;
                // 此時若是該段沒有被標記觀察保護,要把這個槽點的數據清空
                if (!Volatile.Read(ref _preservedForObservation))
                {
                    slots[slotsIndex].Item = default;
                    Volatile.Write(ref slots[slotsIndex].SequenceNumber, currentHead + slots.Length);
                }
                return true;
            }
        }
        else if (diff < 0)
        {
            // 這種狀況說明該段已經沒有有效數據了,直接返回失敗。
            bool frozen = _frozenForEnqueues;
            int currentTail = Volatile.Read(ref _headAndTail.Tail);
            if (currentTail - currentHead <= 0 || (frozen && (currentTail - FreezeOffset - currentHead <= 0)))
            {
                item = default!;
                return false;
            }
        }

        // 競爭失敗進入下一輪等待
        spinner.SpinOnce(sleep1Threshold: -1);
    }
}

流程和追加元素相似,大部分都寫在備註裏面了,這裏只額外提一下爲空的狀況。Segment爲空只有一種狀況,那就是頭尾指針落在了同一個槽點,但這是會出現兩種可能性:

  • 第一種是都落在了非最後一個槽點,意味着該段沒有被裝滿,拿首尾指針相減便可判斷。
  • 第二種是都落在了最後一個槽點,意味着該段已經被裝滿了,若是此時正在進行擴容(frozen),那麼必需要在尾指針的基礎上減去FreezeOffset再去和頭指針判斷,緣由前面有說過;

是否是感受環環相扣、相輔相成、如膠似漆、balabala.....😜

統計元素數量

前面也預告過,由於隊列再也不維護段索引,這樣會致使計算元素數量變得很是複雜,複雜到我都不想說這一部分了😭。簡單描述一下就跳過了:核心思路就是一段一段來遍歷,而後計算出每段的大小最後把結果累加,若是涉及多個段還得加鎖,具體到段內部就要根據首尾指針計算槽點得出實際數量等等等等,代碼很長就不貼出來了。

這裏也嚴重提醒一句,非必要狀況下不要調用Count不要調用Count不要調用Count。

接下來重點說一下隊列的IsEmpty。因爲Segment再也不維護IsEmpty信息,因此實現方式就有點曲線救國了,經過嘗試可否從隊首位置獲取一個元素來判斷是否隊列爲空,也就是常說的TryPeek操做,但細節上稍有不一樣。

/// <summary>
/// 判斷隊列是否爲空,千萬不要使用Count==0來判斷,也不要直接TryPeek
/// </summary>
public bool IsEmpty => !TryPeek(out _, resultUsed: false);

private bool TryPeek([MaybeNullWhen(false)] out T result, bool resultUsed)
{
    ConcurrentQueueSegment<T> s = _head;
    while (true)
    {
        ConcurrentQueueSegment<T>? next = Volatile.Read(ref s._nextSegment);

        // 從首段中獲取頭部元素,成功的話直接返回true,獲取失敗就意味着首段爲空了
        if (s.TryPeek(out result, resultUsed))
        {
            return true;
        }

        // 若是下一段不爲空那就再嘗試從下一段從新獲取
        if (next != null)
        {
            s = next;
        }
        //若是下一段爲空就說明整個隊列爲空,跳出循環直接返回false了
        else if (Volatile.Read(ref s._nextSegment) == null)
        {
            break;
        }
    }
    result = default!;
    return false;
}

上面的代碼能夠看到有一個特殊的參數resultUsed,它具體會有什麼影響呢,那就得看看Segment是如何peek的:

public bool TryPeek([MaybeNullWhen(false)] out T result, bool resultUsed)
{
    // 實際上隊列的TryPeek是一個觀察保護操做,這時resultUsed會標記成true,若是是IsEmpty操做的話就爲false,由於並不關心這個元素是否被釋放了
    if (resultUsed)
    {
        _preservedForObservation = true;
        Interlocked.MemoryBarrier();
    }

    Slot[] slots = _slots;

    SpinWait spinner = default;
    while (true)
    {
        int currentHead = Volatile.Read(ref _headAndTail.Head);
        int slotsIndex = currentHead & _slotsMask;

        int sequenceNumber = Volatile.Read(ref slots[slotsIndex].SequenceNumber);

        int diff = sequenceNumber - (currentHead + 1);
        if (diff == 0)
        {
            result = resultUsed ? slots[slotsIndex].Item! : default!;
            return true;
        }
        else if (diff < 0)
        {
            bool frozen = _frozenForEnqueues;
            int currentTail = Volatile.Read(ref _headAndTail.Tail);
            if (currentTail - currentHead <= 0 || (frozen && (currentTail - FreezeOffset - currentHead <= 0)))
            {
                result = default!;
                return false;
            }
        }
        spinner.SpinOnce(sleep1Threshold: -1);
    }
}

除了最開始的resultUsed判斷,其餘的基本和出隊的邏輯一致,前面說的很詳細,這裏很少介紹了。

枚舉轉換數據

前面反覆的提到觀察保護,這到底是個啥意思??爲何要有這個操做??

其實看過上一篇文章的話就比較好理解一點,這裏稍微回顧一下方便對比。在Framework中會有截取快照的操做,也就是相似ToArray\ToList\GetEnumerator這種要作數據迭代,它是經過原子操做維護一個m_numSnapshotTakers字段來實現對數據的保護,目的是爲了告訴其餘出隊的線程我正在遍歷數據,大家執行出隊的時候不要把數據給刪了我要用的。在Core中也是爲了實現一樣的功能才引入了觀察保護的概念,換了一種實現方式而已。

那麼就以ToArray爲例是怎麼和其餘操做交互的:

public T[] ToArray()
{
    // 這一步能夠理解爲保護現場
    SnapForObservation(out ConcurrentQueueSegment<T> head, out int headHead, out ConcurrentQueueSegment<T> tail, out int tailTail);

    // 計算隊列長度,這也是要返回的數組大小
    long count = GetCount(head, headHead, tail, tailTail);
    T[] arr = new T[count];

    // 開始迭代數據塞到目標數組中
    using (IEnumerator<T> e = Enumerate(head, headHead, tail, tailTail))
    {
        int i = 0;
        while (e.MoveNext())
        {
            arr[i++] = e.Current;
        }
        Debug.Assert(count == i);
    }
    return arr;
}

上面的代碼中,有一次獲取隊列長度的操做,還有一次獲取迭代數據的操做,這兩步邏輯比較類似都是對整個隊列進行遍歷,因此作一次數據轉換的開銷很是很是大,使用的時候必定要謹慎。別的很少說,重點介紹一下如何實現保護現場的過程:

private void SnapForObservation(out ConcurrentQueueSegment<T> head, out int headHead, out ConcurrentQueueSegment<T> tail, out int tailTail)
{
    // 要保護現場確定要先來一把鎖
    lock (_crossSegmentLock) 
    {
        head = _head;
        tail = _tail;

        // 一段一段進行遍歷
        for (ConcurrentQueueSegment<T> s = head; ; s = s._nextSegment!)
        {
            // 把每一段的觀察保護標記設置成true
            s._preservedForObservation = true;
            // 遍歷到最後一段了就結束
            if (s == tail) break;
        }
        // 尾段凍結,這樣就不能新增元素
        tail.EnsureFrozenForEnqueues(); 

        // 返回兩個指針地址用來對每個元素進行遍歷
        headHead = Volatile.Read(ref head._headAndTail.Head);
        tailTail = Volatile.Read(ref tail._headAndTail.Tail);
    }
}

能夠看到上來就是一把鎖,若是此時正在進行擴容或者收容的操做會直接阻塞掉,運氣好沒有阻塞的話你也不能有新元素入隊了,由於尾段已經凍結鎖死只能自旋等待,而出隊也不能釋放空間了。原話是:

At this point, any dequeues from any segment won't overwrite the value, and none of the existing segments can have new items enqueued.

有人就要問,這裏把尾段鎖死那等ToArray()完成後豈不是也不能有新元素入隊了?不用擔憂,前面入隊邏輯提到過若是該段被鎖住隊列會新建立一個段而後再嘗試入隊,這樣就能成功了。可是問題又來了,假如前面的段還有不少空位,那豈不是有浪費空間的嫌疑?咱們知道沒有觀察保護的時候每段會以2倍長度遞增,這樣的話空間浪費率仍是挺高的。帶着疑問提了個Issue問一下:
https://github.com/dotnet/runtime/issues/35094

到這裏就基本把.NET Core ConcurrentQueue說完了。


總結

對比Framework下的併發隊列,Core裏面的改動仍是不小的,儘管保留了SpinWaitInterlocked相關操做,可是也加入了lock,邏輯上也複雜了不少,我一步步分析和寫文章搞了好幾天。

至於性能對比,我找到一個官方給出的測試結果,有興趣的能夠看看:

https://github.com/dotnet/runtime/issues/27458#issuecomment-423964046

最後強行打個廣告,基於.NET Core平臺的開源分佈式任務調度系統ScheduleMaster有興趣的star支持一下,2.0版本即將上線:

相關文章
相關標籤/搜索