C# ConcurrentBag的實現原理



1、前言

筆者最近在作一個項目,項目中爲了提高吞吐量,使用了消息隊列,中間實現了生產消費模式,在生產消費者模式中須要有一個集合,來存儲生產者所生產的物品,筆者使用了最多見的List<T>集合類型。c#

因爲生產者線程有不少個,消費者線程也有不少個,因此不可避免的就產生了線程同步的問題。開始筆者是使用lock關鍵字,進行線程同步,可是性能並非特別理想,而後有網友說可使用SynchronizedList<T>來代替使用List<T>達到線程安全的目的。因而筆者就替換成了SynchronizedList<T>,可是發現性能依舊糟糕,因而查看了SynchronizedList<T>的源代碼,發現它就是簡單的在List<T>提供的API的基礎上加了lock,因此性能基本與筆者實現方式相差無幾。數組

最後筆者找到了解決的方案,使用ConcurrentBag<T>類來實現,性能有很大的改觀,因而筆者查看了ConcurrentBag<T>的源代碼,實現很是精妙,特此在這記錄一下。緩存

2、ConcurrentBag類

ConcurrentBag<T>實現了IProducerConsumerCollection<T>接口,該接口主要用於生產者消費者模式下,可見該類基本就是爲生產消費者模式定製的。而後還實現了常規的IReadOnlyCollection<T>類,實現了該類就須要實現IEnumerable<T>、IEnumerable、 ICollection類。安全

ConcurrentBag<T>對外提供的方法沒有List<T>那麼多,可是一樣有Enumerable實現的擴展方法。類自己提供的方法以下所示。多線程

名稱 說明
Add 將對象添加到 ConcurrentBag 中。
CopyTo 從指定數組索引開始,將 ConcurrentBag 元素複製到現有的一維 Array 中。
Equals(Object) 肯定指定的 Object 是否等於當前的 Object。 (繼承自 Object。)
Finalize 容許對象在「垃圾回收」回收以前嘗試釋放資源並執行其餘清理操做。 (繼承自 Object。)
GetEnumerator 返回循環訪問 ConcurrentBag 的枚舉器。
GetHashCode 用做特定類型的哈希函數。 (繼承自 Object。)
GetType 獲取當前實例的 Type。 (繼承自 Object。)
MemberwiseClone 建立當前 Object 的淺表副本。 (繼承自 Object。)
ToArray 將 ConcurrentBag 元素複製到新數組。
ToString 返回表示當前對象的字符串。 (繼承自 Object。)
TryPeek 嘗試從 ConcurrentBag 返回一個對象但不移除該對象。
TryTake 嘗試從 ConcurrentBag 中移除並返回對象。

3、 ConcurrentBag線程安全實現原理

1. ConcurrentBag的私有字段

ConcurrentBag線程安全實現主要是經過它的數據存儲的結構和細顆粒度的鎖。函數

public class ConcurrentBag<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T>
    {
        // ThreadLocalList對象包含每一個線程的數據
        ThreadLocal<ThreadLocalList> m_locals;
 
        // 這個頭指針和尾指針指向中的第一個和最後一個本地列表,這些本地列表分散在不一樣線程中
        // 容許在線程局部對象上枚舉
        volatile ThreadLocalList m_headList, m_tailList;
 
        // 這個標誌是告知操做線程必須同步操做
        // 在GlobalListsLock 鎖中 設置
        bool m_needSync;

}

首選咱們來看它聲明的私有字段,其中須要注意的是集合的數據是存放在ThreadLocal線程本地存儲中的。也就是說訪問它的每一個線程會維護一個本身的集合數據列表,一個集合中的數據可能會存放在不一樣線程的本地存儲空間中,因此若是線程訪問本身本地存儲的對象,那麼是沒有問題的,這就是實現線程安全的第一層,使用線程本地存儲數據性能

而後能夠看到ThreadLocalList m_headList, m_tailList;這個是存放着本地列表對象的頭指針和尾指針,經過這兩個指針,咱們就能夠經過遍歷的方式來訪問全部本地列表。它使用volatile修飾,不容許線程進行本地緩存,每一個線程的讀寫都是直接操做在共享內存上,這就保證了變量始終具備一致性。任何線程在任什麼時候間進行讀寫操做均是最新值。對於volatile修飾符,感謝我是攻城獅指出描述錯誤。ui

最後又定義了一個標誌,這個標誌告知操做線程必須進行同步操做,這是實現了一個細顆粒度的鎖,由於只有在幾個條件知足的狀況下才須要進行線程同步。this

2. 用於數據存儲的TrehadLocalList類

接下來咱們來看一下ThreadLocalList類的構造,該類就是實際存儲了數據的位置。實際上它是使用雙向鏈表這種結構進行數據存儲。

[Serializable]
// 構造了雙向鏈表的節點
internal class Node
{
    public Node(T value)
    {
        m_value = value;
    }
    public readonly T m_value;
    public Node m_next;
    public Node m_prev;
}

/// <summary>
/// 集合操做類型
/// </summary>
internal enum ListOperation
{
    None,
    Add,
    Take
};

/// <summary>
/// 線程鎖定的類
/// </summary>
internal class ThreadLocalList
{
    // 雙向鏈表的頭結點 若是爲null那麼表示鏈表爲空
    internal volatile Node m_head;

    // 雙向鏈表的尾節點
    private volatile Node m_tail;

    // 定義當前對List進行操做的種類 
    // 與前面的 ListOperation 相對應
    internal volatile int m_currentOp;

    // 這個列表元素的計數
    private int m_count;

    // The stealing count
    // 這個不是特別理解 好像是在本地列表中 刪除某個Node 之後的計數
    internal int m_stealCount;

    // 下一個列表 可能會在其它線程中
    internal volatile ThreadLocalList m_nextList;

    // 設定鎖定是否已進行
    internal bool m_lockTaken;

    // The owner thread for this list
    internal Thread m_ownerThread;

    // 列表的版本,只有當列表從空變爲非空統計是底層
    internal volatile int m_version;

    /// <summary>
    /// ThreadLocalList 構造器
    /// </summary>
    /// <param name="ownerThread">擁有這個集合的線程</param>
    internal ThreadLocalList(Thread ownerThread)
    {
        m_ownerThread = ownerThread;
    }
    /// <summary>
    /// 添加一個新的item到鏈表首部
    /// </summary>
    /// <param name="item">The item to add.</param>
    /// <param name="updateCount">是否更新計數.</param>
    internal void Add(T item, bool updateCount)
    {
        checked
        {
            m_count++;
        }
        Node node = new Node(item);
        if (m_head == null)
        {
            Debug.Assert(m_tail == null);
            m_head = node;
            m_tail = node;
            m_version++; // 由於進行初始化了,因此將空狀態改成非空狀態
        }
        else
        {
            // 使用頭插法 將新的元素插入鏈表
            node.m_next = m_head;
            m_head.m_prev = node;
            m_head = node;
        }
        if (updateCount) // 更新計數以免此添加同步時溢出
        {
            m_count = m_count - m_stealCount;
            m_stealCount = 0;
        }
    }

    /// <summary>
    /// 從列表的頭部刪除一個item
    /// </summary>
    /// <param name="result">The removed item</param>
    internal void Remove(out T result)
    {
        // 雙向鏈表刪除頭結點數據的流程
        Debug.Assert(m_head != null);
        Node head = m_head;
        m_head = m_head.m_next;
        if (m_head != null)
        {
            m_head.m_prev = null;
        }
        else
        {
            m_tail = null;
        }
        m_count--;
        result = head.m_value;

    }

    /// <summary>
    /// 返回列表頭部的元素
    /// </summary>
    /// <param name="result">the peeked item</param>
    /// <returns>True if succeeded, false otherwise</returns>
    internal bool Peek(out T result)
    {
        Node head = m_head;
        if (head != null)
        {
            result = head.m_value;
            return true;
        }
        result = default(T);
        return false;
    }

    /// <summary>
    /// 從列表的尾部獲取一個item
    /// </summary>
    /// <param name="result">the removed item</param>
    /// <param name="remove">remove or peek flag</param>
    internal void Steal(out T result, bool remove)
    {
        Node tail = m_tail;
        Debug.Assert(tail != null);
        if (remove) // Take operation
        {
            m_tail = m_tail.m_prev;
            if (m_tail != null)
            {
                m_tail.m_next = null;
            }
            else
            {
                m_head = null;
            }
            // Increment the steal count
            m_stealCount++;
        }
        result = tail.m_value;
    }


    /// <summary>
    /// 獲取總計列表計數, 它不是線程安全的, 若是同時調用它, 則可能提供不正確的計數
    /// </summary>
    internal int Count
    {
        get
        {
            return m_count - m_stealCount;
        }
    }
}

從上面的代碼中咱們能夠更加驗證以前的觀點,就是ConcurentBag<T>在一個線程中存儲數據時,使用的是雙向鏈表ThreadLocalList實現了一組對鏈表增刪改查的方法。

3. ConcurrentBag實現新增元素

接下來咱們看一看ConcurentBag<T>是如何新增元素的。

/// <summary>
/// 嘗試獲取無主列表,無主列表是指線程已經被暫停或者終止,可是集合中的部分數據還存儲在那裏
/// 這是避免內存泄漏的方法
/// </summary>
/// <returns></returns>
private ThreadLocalList GetUnownedList()
{
    //此時必須持有全局鎖
    Contract.Assert(Monitor.IsEntered(GlobalListsLock));

    // 從頭線程列表開始枚舉 找到那些已經被關閉的線程
    // 將它所在的列表對象 返回
    ThreadLocalList currentList = m_headList;
    while (currentList != null)
    {
        if (currentList.m_ownerThread.ThreadState == System.Threading.ThreadState.Stopped)
        {
            currentList.m_ownerThread = Thread.CurrentThread; // the caller should acquire a lock to make this line thread safe
            return currentList;
        }
        currentList = currentList.m_nextList;
    }
    return null;
}
/// <summary>
/// 本地幫助方法,經過線程對象檢索線程線程本地列表
/// </summary>
/// <param name="forceCreate">若是列表不存在,那麼建立新列表</param>
/// <returns>The local list object</returns>
private ThreadLocalList GetThreadList(bool forceCreate)
{
    ThreadLocalList list = m_locals.Value;

    if (list != null)
    {
        return list;
    }
    else if (forceCreate)
    {
        // 獲取用於更新操做的 m_tailList 鎖
        lock (GlobalListsLock)
        {
            // 若是頭列表等於空,那麼說明集合中尚未元素
            // 直接建立一個新的
            if (m_headList == null)
            {
                list = new ThreadLocalList(Thread.CurrentThread);
                m_headList = list;
                m_tailList = list;
            }
            else
            {
               // ConcurrentBag內的數據是以雙向鏈表的形式分散存儲在各個線程的本地區域中
                // 經過下面這個方法 能夠找到那些存儲有數據 可是已經被中止的線程
                // 而後將已中止線程的數據 移交到當前線程管理
                list = GetUnownedList();
                // 若是沒有 那麼就新建一個列表 而後更新尾指針的位置
                if (list == null)
                {
                    list = new ThreadLocalList(Thread.CurrentThread);
                    m_tailList.m_nextList = list;
                    m_tailList = list;
                }
            }
            m_locals.Value = list;
        }
    }
    else
    {
        return null;
    }
    Debug.Assert(list != null);
    return list;
}
/// <summary>
/// Adds an object to the <see cref="ConcurrentBag{T}"/>.
/// </summary>
/// <param name="item">The object to be added to the
/// <see cref="ConcurrentBag{T}"/>. The value can be a null reference
/// (Nothing in Visual Basic) for reference types.</param>
public void Add(T item)
{
    // 獲取該線程的本地列表, 若是此線程不存在, 則建立一個新列表 (第一次調用 add)
    ThreadLocalList list = GetThreadList(true);
    // 實際的數據添加操做 在AddInternal中執行
    AddInternal(list, item);
}

/// <summary>
/// </summary>
/// <param name="list"></param>
/// <param name="item"></param>
private void AddInternal(ThreadLocalList list, T item)
{
    bool lockTaken = false;
    try
    {
        #pragma warning disable 0420
        Interlocked.Exchange(ref list.m_currentOp, (int)ListOperation.Add);
        #pragma warning restore 0420
        // 同步案例:
        // 若是列表計數小於兩個, 由於是雙向鏈表的關係 爲了不與任何竊取線程發生衝突 必須獲取鎖
        // 若是設置了 m_needSync, 這意味着有一個線程須要凍結包 也必須獲取鎖
        if (list.Count < 2 || m_needSync)
        {
            // 將其重置爲None 以免與竊取線程的死鎖
            list.m_currentOp = (int)ListOperation.None;
            // 鎖定當前對象
            Monitor.Enter(list, ref lockTaken);
        }
        // 調用 ThreadLocalList.Add方法 將數據添加到雙向鏈表中
        // 若是已經鎖定 那麼說明線程安全  能夠更新Count 計數
        list.Add(item, lockTaken);
    }
    finally
    {
        list.m_currentOp = (int)ListOperation.None;
        if (lockTaken)
        {
            Monitor.Exit(list);
        }
    }
}

從上面代碼中,咱們能夠很清楚的知道Add()方法是如何運行的,其中的關鍵就是GetThreadList()方法,經過該方法能夠獲取當前線程的數據存儲列表對象,假如不存在數據存儲列表,它會自動建立或者經過GetUnownedList()方法來尋找那些被中止可是還存儲有數據列表的線程,而後將數據列表返回給當前線程中,防止了內存泄漏。

在數據添加的過程當中,實現了細顆粒度的lock同步鎖,因此性能會很高。刪除和其它操做與新增相似,本文再也不贅述。

4. ConcurrentBag 如何實現迭代器模式

看完上面的代碼後,我很好奇ConcurrentBag<T>是如何實現IEnumerator來實現迭代訪問的,由於ConcurrentBag<T>是經過分散在不一樣線程中的ThreadLocalList來存儲數據的,那麼在實現迭代器模式時,過程會比較複雜。

後面再查看了源碼以後,發現ConcurrentBag<T>爲了實現迭代器模式,將分在不一樣線程中的數據全都存到一個List<T>集合中,而後返回了該副本的迭代器。因此每次訪問迭代器,它都會新建一個List<T>的副本,這樣雖然浪費了必定的存儲空間,可是邏輯上更加簡單了。

/// <summary>
/// 本地幫助器方法釋放全部本地列表鎖
/// </summary>
private void ReleaseAllLocks()
{
    // 該方法用於在執行線程同步之後 釋放掉全部本地鎖
    // 經過遍歷每一個線程中存儲的 ThreadLocalList對象 釋放所佔用的鎖
    ThreadLocalList currentList = m_headList;
    while (currentList != null)
    {

        if (currentList.m_lockTaken)
        {
            currentList.m_lockTaken = false;
            Monitor.Exit(currentList);
        }
        currentList = currentList.m_nextList;
    }
}

/// <summary>
/// 從凍結狀態解凍包的本地幫助器方法
/// </summary>
/// <param name="lockTaken">The lock taken result from the Freeze method</param>
private void UnfreezeBag(bool lockTaken)
{
    // 首先釋放掉 每一個線程中 本地變量的鎖
    // 而後釋放全局鎖
    ReleaseAllLocks();
    m_needSync = false;
    if (lockTaken)
    {
        Monitor.Exit(GlobalListsLock);
    }
}

/// <summary>
/// 本地幫助器函數等待全部未同步的操做
/// </summary>
private void WaitAllOperations()
{
    Contract.Assert(Monitor.IsEntered(GlobalListsLock));

    ThreadLocalList currentList = m_headList;
    // 自旋等待 等待其它操做完成
    while (currentList != null)
    {
        if (currentList.m_currentOp != (int)ListOperation.None)
        {
            SpinWait spinner = new SpinWait();
            // 有其它線程進行操做時,會將cuurentOp 設置成 正在操做的枚舉
            while (currentList.m_currentOp != (int)ListOperation.None)
            {
                spinner.SpinOnce();
            }
        }
        currentList = currentList.m_nextList;
    }
}

/// <summary>
/// 本地幫助器方法獲取全部本地列表鎖
/// </summary>
private void AcquireAllLocks()
{
    Contract.Assert(Monitor.IsEntered(GlobalListsLock));

    bool lockTaken = false;
    ThreadLocalList currentList = m_headList;
    
    // 遍歷每一個線程的ThreadLocalList 而後獲取對應ThreadLocalList的鎖
    while (currentList != null)
    {
        // 嘗試/最後 bllock 以免在獲取鎖和設置所採起的標誌之間的線程港口
        try
        {
            Monitor.Enter(currentList, ref lockTaken);
        }
        finally
        {
            if (lockTaken)
            {
                currentList.m_lockTaken = true;
                lockTaken = false;
            }
        }
        currentList = currentList.m_nextList;
    }
}

/// <summary>
/// Local helper method to freeze all bag operations, it
/// 1- Acquire the global lock to prevent any other thread to freeze the bag, and also new new thread can be added
/// to the dictionary
/// 2- Then Acquire all local lists locks to prevent steal and synchronized operations
/// 3- Wait for all un-synchronized operations to be done
/// </summary>
/// <param name="lockTaken">Retrieve the lock taken result for the global lock, to be passed to Unfreeze method</param>
private void FreezeBag(ref bool lockTaken)
{
    Contract.Assert(!Monitor.IsEntered(GlobalListsLock));

    // 全局鎖定可安全地防止多線程調用計數和損壞 m_needSync
    Monitor.Enter(GlobalListsLock, ref lockTaken);

    // 這將強制同步任何未來的添加/執行操做
    m_needSync = true;

    // 獲取全部列表的鎖
    AcquireAllLocks();

    // 等待全部操做完成
    WaitAllOperations();
}

/// <summary>
/// 本地幫助器函數返回列表中的包項, 這主要由 CopyTo 和 ToArray 使用。
/// 這不是線程安全, 應該被稱爲凍結/解凍袋塊
/// 本方法是私有的 只有使用 Freeze/UnFreeze以後纔是安全的 
/// </summary>
/// <returns>List the contains the bag items</returns>
private List<T> ToList()
{
    Contract.Assert(Monitor.IsEntered(GlobalListsLock));
    // 建立一個新的List
    List<T> list = new List<T>();
    ThreadLocalList currentList = m_headList;
    // 遍歷每一個線程中的ThreadLocalList 將裏面的Node的數據 添加到list中
    while (currentList != null)
    {
        Node currentNode = currentList.m_head;
        while (currentNode != null)
        {
            list.Add(currentNode.m_value);
            currentNode = currentNode.m_next;
        }
        currentList = currentList.m_nextList;
    }

    return list;
}

/// <summary>
/// Returns an enumerator that iterates through the <see
/// cref="ConcurrentBag{T}"/>.
/// </summary>
/// <returns>An enumerator for the contents of the <see
/// cref="ConcurrentBag{T}"/>.</returns>
/// <remarks>
/// The enumeration represents a moment-in-time snapshot of the contents
/// of the bag.  It does not reflect any updates to the collection after 
/// <see cref="GetEnumerator"/> was called.  The enumerator is safe to use
/// concurrently with reads from and writes to the bag.
/// </remarks>
public IEnumerator<T> GetEnumerator()
{
    // Short path if the bag is empty
    if (m_headList == null)
        return new List<T>().GetEnumerator(); // empty list

    bool lockTaken = false;
    try
    {
        // 首先凍結整個 ConcurrentBag集合
        FreezeBag(ref lockTaken);
        // 而後ToList 再拿到 List的 IEnumerator
        return ToList().GetEnumerator();
    }
    finally
    {
        UnfreezeBag(lockTaken);
    }
}

由上面的代碼可知道,爲了獲取迭代器對象,總共進行了三步主要的操做。

  1. 使用FreezeBag()方法,凍結整個ConcurrentBag<T>集合。由於須要生成集合的List<T>副本,生成副本期間不能有其它線程更改損壞數據。
  2. ConcurrrentBag<T>生成List<T>副本。由於ConcurrentBag<T>存儲數據的方式比較特殊,直接實現迭代器模式困難,考慮到線程安全和邏輯,最佳的辦法是生成一個副本。
  3. 完成以上操做之後,就可使用UnfreezeBag()方法解凍整個集合。

那麼FreezeBag()方法是如何來凍結整個集合的呢?也是分爲三步走。

  1. 首先獲取全局鎖,經過Monitor.Enter(GlobalListsLock, ref lockTaken);這樣一條語句,這樣其它線程就不能凍結集合。
  2. 而後獲取全部線程中ThreadLocalList的鎖,經過`AcquireAllLocks()方法來遍歷獲取。這樣其它線程就不能對它進行操做損壞數據。
  3. 等待已經進入了操做流程線程結束,經過WaitAllOperations()方法來實現,該方法會遍歷每個ThreadLocalList對象的m_currentOp屬性,確保所有處於None操做。

完成以上流程後,那麼就是真正的凍結了整個ConcurrentBag<T>集合,要解凍的話也相似。在此再也不贅述。

4、總結

下面給出一張圖,描述了ConcurrentBag<T>是如何存儲數據的。經過每一個線程中的ThreadLocal來實現線程本地存儲,每一個線程中都有這樣的結構,互不干擾。而後每一個線程中的m_headList老是指向ConcurrentBag<T>的第一個列表,m_tailList指向最後一個列表。列表與列表之間經過m_locals 下的 m_nextList相連,構成一個單鏈表。

數據存儲在每一個線程的m_locals中,經過Node類構成一個雙向鏈表。
PS: 要注意m_tailListm_headList並非存儲在ThreadLocal中,而是全部的線程共享一份。

1534581126728

以上就是有關ConcurrentBag<T>類的實現,筆者的一些記錄和解析。

筆者水平有限,若是錯誤歡迎各位批評指正!

附上ConcurrentBag<T>源碼地址:戳一戳

相關文章
相關標籤/搜索