ConcurrentBag根據操做線程,對不一樣線程分配不一樣的隊列進行數據操做。這樣,每一個隊列只有一個線程在操做,不會發生併發問題。其內部實現運用了net4.0新加入的ThreadLocal線程本地存儲功能。各個隊列間經過鏈表維護。node
其內部結構以下:數組
一、獲取線程本地隊列:安全
1 /// <summary> 2 /// 獲取當前線程的隊列 3 /// </summary> 4 /// <param name="forceCreate">若是線程沒有持有隊列,是否新建</param> 5 /// <returns></returns> 6 private ThreadLocalList<T> GetThreadList(bool forceCreate) 7 { 8 //嘗試獲取線程本地隊列列表(參考ThreadLocal),此處的m_locals不一樣線程持有不一樣實例 9 //若是獲取爲空,則說明線程是第一次執行此函數,須要分配一個隊列 10 ThreadLocalList<T> unownedList = this.m_locals.Value; 11 if (unownedList != null) 12 { 13 return unownedList; 14 } 15 if (forceCreate) 16 { 17 //獲取當前本地隊列鎖,防止在凍結隊列時產生衝突(參考FreezeBag函數) 18 object globalListsLock = this.GlobalListsLock; 19 lock (globalListsLock) 20 { 21 //獲取本地隊列 22 //若是沒有建立過隊列,則建立一個新的隊列;不然儘可能分配已有的線程終止的隊列 23 if (this.m_headList == null) 24 { 25 unownedList = new ThreadLocalList<T>(Thread.CurrentThread); 26 this.m_headList = unownedList; 27 this.m_tailList = unownedList; 28 } 29 else 30 { 31 //獲取無主隊列,不分配新隊列 32 unownedList = this.GetUnownedList(); 33 if (unownedList == null) 34 { 35 unownedList = new ThreadLocalList<T>(Thread.CurrentThread); 36 this.m_tailList.m_nextList = unownedList; 37 this.m_tailList = unownedList; 38 } 39 } 40 this.m_locals.Value = unownedList; 41 return unownedList; 42 } 43 } 44 return null; 45 }
二、獲取無主隊列併發
1 /// <summary> 2 /// 獲取無主隊列 3 /// 若是當前隊列的持有線程已經終止,則爲無主隊列 4 /// </summary> 5 /// <returns></returns> 6 private ThreadLocalList<T> GetUnownedList() 7 { 8 for (ThreadLocalList<T> list = this.m_headList; list != null; list = list.m_nextList) 9 { 10 if (list.m_ownerThread.ThreadState == System.Threading.ThreadState.Stopped) 11 { 12 list.m_ownerThread = Thread.CurrentThread; 13 return list; 14 } 15 } 16 return null; 17 }
三、插入操做代碼分析ide
1 /// <summary> 2 /// 向Bag添加元素 3 /// </summary> 4 /// <param name="item"></param> 5 6 [__DynamicallyInvokable] 7 public void Add(T item) 8 { 9 //獲取當前線程持有的隊列 10 ThreadLocalList<T> threadList = this.GetThreadList(true); 11 //向當前持有隊列添加數據 12 this.AddInternal(threadList, item); 13 } 14 15 /// <summary> 16 /// 向隊列添加數據 17 /// </summary> 18 /// <param name="list"></param> 19 /// <param name="item"></param> 20 private void AddInternal(ThreadLocalList<T> list, T item) 21 { 22 bool lockTaken = false; 23 try 24 { 25 //CAS原子操做,設置標誌位,與Steal和Freeze實現互斥 26 Interlocked.Exchange(ref list.m_currentOp, 1); 27 //若是m_needSync,則說明已經發起凍結操做,須要加鎖保證線程安全 28 if ((list.Count < 2) || this.m_needSync) 29 { 30 list.m_currentOp = 0; 31 Monitor.Enter(list, ref lockTaken); 32 } 33 list.Add(item, lockTaken); 34 } 35 finally 36 { 37 list.m_currentOp = 0; 38 if (lockTaken) 39 { 40 Monitor.Exit(list); 41 } 42 } 43 }
四、凍結Bag函數函數
1 /// <summary> 2 /// 凍結Bag,不能進行增,刪,獲取操做 3 /// </summary> 4 /// <param name="lockTaken"></param> 5 private void FreezeBag(ref bool lockTaken) 6 { 7 //獲取當前線程list鎖 8 Monitor.Enter(this.GlobalListsLock, ref lockTaken); 9 //設置同步標誌位,增,刪,獲取操做識別此標誌位,只有獲取鎖才能執行 10 this.m_needSync = true; 11 //獲取全部list的鎖 12 this.AcquireAllLocks(); 13 //等待全部操做執行完成 14 this.WaitAllOperations(); 15 }
五、轉化成數組ui
1 /// <summary> 2 /// 轉化爲數組 3 /// </summary> 4 /// <returns></returns> 5 [__DynamicallyInvokable] 6 public T[] ToArray() 7 { 8 T[] localArray; 9 //沒有數據返回空數組 10 if (this.m_headList == null) 11 { 12 return new T[0]; 13 } 14 bool lockTaken = false; 15 try 16 { 17 //凍結bag 18 this.FreezeBag(ref lockTaken); 19 //轉化成List後直接轉成Array 20 localArray = this.ToList().ToArray(); 21 } 22 finally 23 { 24 this.UnfreezeBag(lockTaken); 25 } 26 return localArray; 27 } 28 29 /// <summary> 30 /// 轉化成list 31 /// </summary> 32 /// <returns></returns> 33 private List<T> ToList() 34 { 35 List<T> list = new List<T>(); 36 //獲取全部list,遍歷生成副本 37 for (ThreadLocalList<T> list2 = this.m_headList; list2 != null; list2 = list2.m_nextList) 38 { 39 for (Node<T> node = list2.m_head; node != null; node = node.m_next) 40 { 41 list.Add(node.m_value); 42 } 43 } 44 return list; 45 }