目錄node
筆者最近在作一個項目,項目中爲了提高吞吐量,使用了消息隊列,中間實現了生產消費模式,在生產消費者模式中須要有一個集合,來存儲生產者所生產的物品,筆者使用了最多見的List<T>
集合類型。c#
因爲生產者線程有不少個,消費者線程也有不少個,因此不可避免的就產生了線程同步的問題。開始筆者是使用lock
關鍵字,進行線程同步,可是性能並非特別理想,而後有網友說可使用SynchronizedList<T>
來代替使用List<T>
達到線程安全的目的。因而筆者就替換成了SynchronizedList<T>
,可是發現性能依舊糟糕,因而查看了SynchronizedList<T>
的源代碼,發現它就是簡單的在List<T>
提供的API的基礎上加了lock
,因此性能基本與筆者實現方式相差無幾。數組
最後筆者找到了解決的方案,使用ConcurrentBag<T>
類來實現,性能有很大的改觀,因而筆者查看了ConcurrentBag<T>
的源代碼,實現很是精妙,特此在這記錄一下。緩存
ConcurrentBag<T>
實現了IProducerConsumerCollection<T>
接口,該接口主要用於生產者消費者模式下,可見該類基本就是爲生產消費者模式定製的。而後還實現了常規的IReadOnlyCollection<T>
類,實現了該類就須要實現IEnumerable<T>、IEnumerable、 ICollection
類。安全
ConcurrentBag<T>
對外提供的方法沒有List<T>
那麼多,可是一樣有Enumerable
實現的擴展方法。類自己提供的方法以下所示。多線程
名稱 | 說明 |
---|---|
Add | 將對象添加到 ConcurrentBag
|
CopyTo | 從指定數組索引開始,將 ConcurrentBag
|
Equals(Object) | 肯定指定的 Object 是否等於當前的 Object。 (繼承自 Object。) |
Finalize | 容許對象在「垃圾回收」回收以前嘗試釋放資源並執行其餘清理操做。 (繼承自 Object。) |
GetEnumerator | 返回循環訪問 ConcurrentBag
|
GetHashCode | 用做特定類型的哈希函數。 (繼承自 Object。) |
GetType | 獲取當前實例的 Type。 (繼承自 Object。) |
MemberwiseClone | 建立當前 Object 的淺表副本。 (繼承自 Object。) |
ToArray | 將 ConcurrentBag
|
ToString | 返回表示當前對象的字符串。 (繼承自 Object。) |
TryPeek | 嘗試從 ConcurrentBag
|
TryTake | 嘗試從 ConcurrentBag
|
ConcurrentBag
線程安全實現主要是經過它的數據存儲的結構和細顆粒度的鎖。函數
public class ConcurrentBag<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T> { // ThreadLocalList對象包含每一個線程的數據 ThreadLocal<ThreadLocalList> m_locals; // 這個頭指針和尾指針指向中的第一個和最後一個本地列表,這些本地列表分散在不一樣線程中 // 容許在線程局部對象上枚舉 volatile ThreadLocalList m_headList, m_tailList; // 這個標誌是告知操做線程必須同步操做 // 在GlobalListsLock 鎖中 設置 bool m_needSync; }
首選咱們來看它聲明的私有字段,其中須要注意的是集合的數據是存放在ThreadLocal
線程本地存儲中的。也就是說訪問它的每一個線程會維護一個本身的集合數據列表,一個集合中的數據可能會存放在不一樣線程的本地存儲空間中,因此若是線程訪問本身本地存儲的對象,那麼是沒有問題的,這就是實現線程安全的第一層,使用線程本地存儲數據。性能
而後能夠看到ThreadLocalList m_headList, m_tailList;
這個是存放着本地列表對象的頭指針和尾指針,經過這兩個指針,咱們就能夠經過遍歷的方式來訪問全部本地列表。它使用volatile
修飾,不容許線程進行本地緩存,每一個線程的讀寫都是直接操做在共享內存上,這就保證了變量始終具備一致性。任何線程在任什麼時候間進行讀寫操做均是最新值。對於volatile
修飾符,感謝我是攻城獅指出描述錯誤。ui
最後又定義了一個標誌,這個標誌告知操做線程必須進行同步操做,這是實現了一個細顆粒度的鎖,由於只有在幾個條件知足的狀況下才須要進行線程同步。this
接下來咱們來看一下ThreadLocalList
類的構造,該類就是實際存儲了數據的位置。實際上它是使用雙向鏈表這種結構進行數據存儲。
[Serializable] // 構造了雙向鏈表的節點 internal class Node { public Node(T value) { m_value = value; } public readonly T m_value; public Node m_next; public Node m_prev; } /// <summary> /// 集合操做類型 /// </summary> internal enum ListOperation { None, Add, Take }; /// <summary> /// 線程鎖定的類 /// </summary> internal class ThreadLocalList { // 雙向鏈表的頭結點 若是爲null那麼表示鏈表爲空 internal volatile Node m_head; // 雙向鏈表的尾節點 private volatile Node m_tail; // 定義當前對List進行操做的種類 // 與前面的 ListOperation 相對應 internal volatile int m_currentOp; // 這個列表元素的計數 private int m_count; // The stealing count // 這個不是特別理解 好像是在本地列表中 刪除某個Node 之後的計數 internal int m_stealCount; // 下一個列表 可能會在其它線程中 internal volatile ThreadLocalList m_nextList; // 設定鎖定是否已進行 internal bool m_lockTaken; // The owner thread for this list internal Thread m_ownerThread; // 列表的版本,只有當列表從空變爲非空統計是底層 internal volatile int m_version; /// <summary> /// ThreadLocalList 構造器 /// </summary> /// <param name="ownerThread">擁有這個集合的線程</param> internal ThreadLocalList(Thread ownerThread) { m_ownerThread = ownerThread; } /// <summary> /// 添加一個新的item到鏈表首部 /// </summary> /// <param name="item">The item to add.</param> /// <param name="updateCount">是否更新計數.</param> internal void Add(T item, bool updateCount) { checked { m_count++; } Node node = new Node(item); if (m_head == null) { Debug.Assert(m_tail == null); m_head = node; m_tail = node; m_version++; // 由於進行初始化了,因此將空狀態改成非空狀態 } else { // 使用頭插法 將新的元素插入鏈表 node.m_next = m_head; m_head.m_prev = node; m_head = node; } if (updateCount) // 更新計數以免此添加同步時溢出 { m_count = m_count - m_stealCount; m_stealCount = 0; } } /// <summary> /// 從列表的頭部刪除一個item /// </summary> /// <param name="result">The removed item</param> internal void Remove(out T result) { // 雙向鏈表刪除頭結點數據的流程 Debug.Assert(m_head != null); Node head = m_head; m_head = m_head.m_next; if (m_head != null) { m_head.m_prev = null; } else { m_tail = null; } m_count--; result = head.m_value; } /// <summary> /// 返回列表頭部的元素 /// </summary> /// <param name="result">the peeked item</param> /// <returns>True if succeeded, false otherwise</returns> internal bool Peek(out T result) { Node head = m_head; if (head != null) { result = head.m_value; return true; } result = default(T); return false; } /// <summary> /// 從列表的尾部獲取一個item /// </summary> /// <param name="result">the removed item</param> /// <param name="remove">remove or peek flag</param> internal void Steal(out T result, bool remove) { Node tail = m_tail; Debug.Assert(tail != null); if (remove) // Take operation { m_tail = m_tail.m_prev; if (m_tail != null) { m_tail.m_next = null; } else { m_head = null; } // Increment the steal count m_stealCount++; } result = tail.m_value; } /// <summary> /// 獲取總計列表計數, 它不是線程安全的, 若是同時調用它, 則可能提供不正確的計數 /// </summary> internal int Count { get { return m_count - m_stealCount; } } }
從上面的代碼中咱們能夠更加驗證以前的觀點,就是ConcurentBag<T>
在一個線程中存儲數據時,使用的是雙向鏈表,ThreadLocalList
實現了一組對鏈表增刪改查的方法。
接下來咱們看一看ConcurentBag<T>
是如何新增元素的。
/// <summary> /// 嘗試獲取無主列表,無主列表是指線程已經被暫停或者終止,可是集合中的部分數據還存儲在那裏 /// 這是避免內存泄漏的方法 /// </summary> /// <returns></returns> private ThreadLocalList GetUnownedList() { //此時必須持有全局鎖 Contract.Assert(Monitor.IsEntered(GlobalListsLock)); // 從頭線程列表開始枚舉 找到那些已經被關閉的線程 // 將它所在的列表對象 返回 ThreadLocalList currentList = m_headList; while (currentList != null) { if (currentList.m_ownerThread.ThreadState == System.Threading.ThreadState.Stopped) { currentList.m_ownerThread = Thread.CurrentThread; // the caller should acquire a lock to make this line thread safe return currentList; } currentList = currentList.m_nextList; } return null; } /// <summary> /// 本地幫助方法,經過線程對象檢索線程線程本地列表 /// </summary> /// <param name="forceCreate">若是列表不存在,那麼建立新列表</param> /// <returns>The local list object</returns> private ThreadLocalList GetThreadList(bool forceCreate) { ThreadLocalList list = m_locals.Value; if (list != null) { return list; } else if (forceCreate) { // 獲取用於更新操做的 m_tailList 鎖 lock (GlobalListsLock) { // 若是頭列表等於空,那麼說明集合中尚未元素 // 直接建立一個新的 if (m_headList == null) { list = new ThreadLocalList(Thread.CurrentThread); m_headList = list; m_tailList = list; } else { // ConcurrentBag內的數據是以雙向鏈表的形式分散存儲在各個線程的本地區域中 // 經過下面這個方法 能夠找到那些存儲有數據 可是已經被中止的線程 // 而後將已中止線程的數據 移交到當前線程管理 list = GetUnownedList(); // 若是沒有 那麼就新建一個列表 而後更新尾指針的位置 if (list == null) { list = new ThreadLocalList(Thread.CurrentThread); m_tailList.m_nextList = list; m_tailList = list; } } m_locals.Value = list; } } else { return null; } Debug.Assert(list != null); return list; } /// <summary> /// Adds an object to the <see cref="ConcurrentBag{T}"/>. /// </summary> /// <param name="item">The object to be added to the /// <see cref="ConcurrentBag{T}"/>. The value can be a null reference /// (Nothing in Visual Basic) for reference types.</param> public void Add(T item) { // 獲取該線程的本地列表, 若是此線程不存在, 則建立一個新列表 (第一次調用 add) ThreadLocalList list = GetThreadList(true); // 實際的數據添加操做 在AddInternal中執行 AddInternal(list, item); } /// <summary> /// </summary> /// <param name="list"></param> /// <param name="item"></param> private void AddInternal(ThreadLocalList list, T item) { bool lockTaken = false; try { #pragma warning disable 0420 Interlocked.Exchange(ref list.m_currentOp, (int)ListOperation.Add); #pragma warning restore 0420 // 同步案例: // 若是列表計數小於兩個, 由於是雙向鏈表的關係 爲了不與任何竊取線程發生衝突 必須獲取鎖 // 若是設置了 m_needSync, 這意味着有一個線程須要凍結包 也必須獲取鎖 if (list.Count < 2 || m_needSync) { // 將其重置爲None 以免與竊取線程的死鎖 list.m_currentOp = (int)ListOperation.None; // 鎖定當前對象 Monitor.Enter(list, ref lockTaken); } // 調用 ThreadLocalList.Add方法 將數據添加到雙向鏈表中 // 若是已經鎖定 那麼說明線程安全 能夠更新Count 計數 list.Add(item, lockTaken); } finally { list.m_currentOp = (int)ListOperation.None; if (lockTaken) { Monitor.Exit(list); } } }
從上面代碼中,咱們能夠很清楚的知道Add()
方法是如何運行的,其中的關鍵就是GetThreadList()
方法,經過該方法能夠獲取當前線程的數據存儲列表對象,假如不存在數據存儲列表,它會自動建立或者經過GetUnownedList()
方法來尋找那些被中止可是還存儲有數據列表的線程,而後將數據列表返回給當前線程中,防止了內存泄漏。
在數據添加的過程當中,實現了細顆粒度的lock
同步鎖,因此性能會很高。刪除和其它操做與新增相似,本文再也不贅述。
看完上面的代碼後,我很好奇ConcurrentBag<T>
是如何實現IEnumerator
來實現迭代訪問的,由於ConcurrentBag<T>
是經過分散在不一樣線程中的ThreadLocalList
來存儲數據的,那麼在實現迭代器模式時,過程會比較複雜。
後面再查看了源碼以後,發現ConcurrentBag<T>
爲了實現迭代器模式,將分在不一樣線程中的數據全都存到一個List<T>
集合中,而後返回了該副本的迭代器。因此每次訪問迭代器,它都會新建一個List<T>
的副本,這樣雖然浪費了必定的存儲空間,可是邏輯上更加簡單了。
/// <summary> /// 本地幫助器方法釋放全部本地列表鎖 /// </summary> private void ReleaseAllLocks() { // 該方法用於在執行線程同步之後 釋放掉全部本地鎖 // 經過遍歷每一個線程中存儲的 ThreadLocalList對象 釋放所佔用的鎖 ThreadLocalList currentList = m_headList; while (currentList != null) { if (currentList.m_lockTaken) { currentList.m_lockTaken = false; Monitor.Exit(currentList); } currentList = currentList.m_nextList; } } /// <summary> /// 從凍結狀態解凍包的本地幫助器方法 /// </summary> /// <param name="lockTaken">The lock taken result from the Freeze method</param> private void UnfreezeBag(bool lockTaken) { // 首先釋放掉 每一個線程中 本地變量的鎖 // 而後釋放全局鎖 ReleaseAllLocks(); m_needSync = false; if (lockTaken) { Monitor.Exit(GlobalListsLock); } } /// <summary> /// 本地幫助器函數等待全部未同步的操做 /// </summary> private void WaitAllOperations() { Contract.Assert(Monitor.IsEntered(GlobalListsLock)); ThreadLocalList currentList = m_headList; // 自旋等待 等待其它操做完成 while (currentList != null) { if (currentList.m_currentOp != (int)ListOperation.None) { SpinWait spinner = new SpinWait(); // 有其它線程進行操做時,會將cuurentOp 設置成 正在操做的枚舉 while (currentList.m_currentOp != (int)ListOperation.None) { spinner.SpinOnce(); } } currentList = currentList.m_nextList; } } /// <summary> /// 本地幫助器方法獲取全部本地列表鎖 /// </summary> private void AcquireAllLocks() { Contract.Assert(Monitor.IsEntered(GlobalListsLock)); bool lockTaken = false; ThreadLocalList currentList = m_headList; // 遍歷每一個線程的ThreadLocalList 而後獲取對應ThreadLocalList的鎖 while (currentList != null) { // 嘗試/最後 bllock 以免在獲取鎖和設置所採起的標誌之間的線程港口 try { Monitor.Enter(currentList, ref lockTaken); } finally { if (lockTaken) { currentList.m_lockTaken = true; lockTaken = false; } } currentList = currentList.m_nextList; } } /// <summary> /// Local helper method to freeze all bag operations, it /// 1- Acquire the global lock to prevent any other thread to freeze the bag, and also new new thread can be added /// to the dictionary /// 2- Then Acquire all local lists locks to prevent steal and synchronized operations /// 3- Wait for all un-synchronized operations to be done /// </summary> /// <param name="lockTaken">Retrieve the lock taken result for the global lock, to be passed to Unfreeze method</param> private void FreezeBag(ref bool lockTaken) { Contract.Assert(!Monitor.IsEntered(GlobalListsLock)); // 全局鎖定可安全地防止多線程調用計數和損壞 m_needSync Monitor.Enter(GlobalListsLock, ref lockTaken); // 這將強制同步任何未來的添加/執行操做 m_needSync = true; // 獲取全部列表的鎖 AcquireAllLocks(); // 等待全部操做完成 WaitAllOperations(); } /// <summary> /// 本地幫助器函數返回列表中的包項, 這主要由 CopyTo 和 ToArray 使用。 /// 這不是線程安全, 應該被稱爲凍結/解凍袋塊 /// 本方法是私有的 只有使用 Freeze/UnFreeze以後纔是安全的 /// </summary> /// <returns>List the contains the bag items</returns> private List<T> ToList() { Contract.Assert(Monitor.IsEntered(GlobalListsLock)); // 建立一個新的List List<T> list = new List<T>(); ThreadLocalList currentList = m_headList; // 遍歷每一個線程中的ThreadLocalList 將裏面的Node的數據 添加到list中 while (currentList != null) { Node currentNode = currentList.m_head; while (currentNode != null) { list.Add(currentNode.m_value); currentNode = currentNode.m_next; } currentList = currentList.m_nextList; } return list; } /// <summary> /// Returns an enumerator that iterates through the <see /// cref="ConcurrentBag{T}"/>. /// </summary> /// <returns>An enumerator for the contents of the <see /// cref="ConcurrentBag{T}"/>.</returns> /// <remarks> /// The enumeration represents a moment-in-time snapshot of the contents /// of the bag. It does not reflect any updates to the collection after /// <see cref="GetEnumerator"/> was called. The enumerator is safe to use /// concurrently with reads from and writes to the bag. /// </remarks> public IEnumerator<T> GetEnumerator() { // Short path if the bag is empty if (m_headList == null) return new List<T>().GetEnumerator(); // empty list bool lockTaken = false; try { // 首先凍結整個 ConcurrentBag集合 FreezeBag(ref lockTaken); // 而後ToList 再拿到 List的 IEnumerator return ToList().GetEnumerator(); } finally { UnfreezeBag(lockTaken); } }
由上面的代碼可知道,爲了獲取迭代器對象,總共進行了三步主要的操做。
- 使用
FreezeBag()
方法,凍結整個ConcurrentBag<T>
集合。由於須要生成集合的List<T>
副本,生成副本期間不能有其它線程更改損壞數據。- 將
ConcurrrentBag<T>
生成List<T>
副本。由於ConcurrentBag<T>
存儲數據的方式比較特殊,直接實現迭代器模式困難,考慮到線程安全和邏輯,最佳的辦法是生成一個副本。- 完成以上操做之後,就可使用
UnfreezeBag()
方法解凍整個集合。
那麼FreezeBag()
方法是如何來凍結整個集合的呢?也是分爲三步走。
- 首先獲取全局鎖,經過
Monitor.Enter(GlobalListsLock, ref lockTaken);
這樣一條語句,這樣其它線程就不能凍結集合。- 而後獲取全部線程中
ThreadLocalList
的鎖,經過`AcquireAllLocks()方法來遍歷獲取。這樣其它線程就不能對它進行操做損壞數據。- 等待已經進入了操做流程線程結束,經過
WaitAllOperations()
方法來實現,該方法會遍歷每個ThreadLocalList
對象的m_currentOp
屬性,確保所有處於None
操做。
完成以上流程後,那麼就是真正的凍結了整個ConcurrentBag<T>
集合,要解凍的話也相似。在此再也不贅述。
下面給出一張圖,描述了ConcurrentBag<T>
是如何存儲數據的。經過每一個線程中的ThreadLocal
來實現線程本地存儲,每一個線程中都有這樣的結構,互不干擾。而後每一個線程中的m_headList
老是指向ConcurrentBag<T>
的第一個列表,m_tailList
指向最後一個列表。列表與列表之間經過m_locals
下的 m_nextList
相連,構成一個單鏈表。
數據存儲在每一個線程的m_locals
中,經過Node
類構成一個雙向鏈表。
PS: 要注意m_tailList
和m_headList
並非存儲在ThreadLocal
中,而是全部的線程共享一份。
以上就是有關ConcurrentBag<T>
類的實現,筆者的一些記錄和解析。
附上ConcurrentBag<T>
源碼地址:戳一戳