併發集合數組
1 爲何使用併發集合?安全
緣由主要有如下幾點:併發
- System.Collections和System.Collections.Generic名稱空間中所提供的經典列表、集合和數組都不是線程安全的,若無同步機制,他們不適合於接受併發的指令來添加和刪除元素。
- 在併發代碼中使用上述經典集合須要複雜的同步管理,使用起來很不方便。
- 使用複雜的同步機制會大大下降性能。
- NET Framework 4所提供的新的集合儘量地減小須要使用鎖的次數。這些新的集合經過使用比較並交換(compare-and-swap,CAS)指令和內存屏障,避免使用互斥的重量級鎖。這對性能有保障。
注意:函數
與經典集合相比,併發集合會有更大的開銷,所以在串行代碼中使用併發集合無心義,只會增長額外的開銷且運行速度比訪問經典集合慢。性能
2 併發集合spa
1)ConcurrentQueue:線程安全的先進先出 (FIFO) 集合線程
主要方法:3d
- Enqueue(T item);將對象添加到集合結尾。
- TryDequeue(out T result); 嘗試移除並返回位於集合開始處的對象,返回值表示操做是否成功。
- TryPeek(out T result);嘗試返回集合開始處的對象,但不將其移除,返回值表示操做是否成功。
說明:code
- ConcurrentQueue是徹底無鎖的,但當CAS操做失敗且面臨資源爭用時,它可能會自旋而且重試操做。
- ConcurrentQueue是FIFO集合,某些和出入順序無關的場合,儘可能不要用ConcurrentQueue。
2)ConcurrentStack:線程安全的後進先出 (LIFO) 集合orm
主要方法及屬性:
- Push(T item);將對象插入集合的頂部。
- TryPop(out T result);嘗試彈出並返回集合頂部的對象,返回值表示操做是否成功。
- TryPeek(out T result);嘗試返回集合開始處的對象,但不將其移除,返回值表示操做是否成功。
- IsEmpty { get; }指示集合是否爲空。
- PushRange(T[] items);將多個對象插入集合的頂部。
- TryPopRange(T[] items);彈出頂部多個元素,返回結果爲彈出元素個數。
說明:
- 與ConcurrentQueue類似地,ConcurrentStack徹底無鎖的,但當CAS操做失敗且面臨資源爭用時,它可能會自旋而且重試操做。
- 獲取集合是否包含元素使用IsEmpty屬性,而不是經過判斷Count屬性是否大於零。調用Count比調用IsEmpty開銷大。
- 使用PushRange(T[] items)和TryPopRange(T[] items)時注意緩衝引發的額外開銷和額外的內存消耗。
3) ConcurrentBag:元素可重複的無序集合
主要方法及屬性:
- TryPeek(out T result);嘗試從集合返回一個對象,但不移除該對象,返回值表示是否成功得到該對象。
- TryTake(out T result);嘗試從集合返回一個對象並移除該對象,返回值表示是否成功得到該對象。
- Add(T item);將對象添加到集合中。
- IsEmpty { get; }解釋同ConcurrentStack
說明:
- ConcurrentBag爲每個訪問集合的線程維護了一個本地隊列,在可能的狀況下,它會以無鎖的方式訪問本地隊列。
- ConcurrentBag在同一個線程添加和刪除元素的場合下效率很是高。
- 由於ConcurrentBag有時會須要鎖,在生產者線程和消費者線程徹底分開的場景下效率很是低。
- ConcurrentBag調用IsEmpty的開銷很是大,由於這須要臨時得到這個無序組的全部鎖。
4)BlockingCollection:實現
System.Collections.Concurrent.IProducerConsumerCollection<T> 的線程安全集合,提供阻塞和限制功能
主要方法及屬性:
- BlockingCollection(int boundedCapacity);boundedCapacity表示集合限制大小。
- CompleteAdding();將BlockingCollection實例標記爲再也不接受任何添加。
- IsCompleted { get; }此集合是否已標記爲已完成添加而且爲空。
- GetConsumingEnumerable();從集合中移除並返回移除的元素
- Add(T item);添加元素到集合。
- TryTake(T item, int millisecondsTimeout, CancellationToken cancellationToken);
說明:
- 使用BlockingCollection()構造函數實例化BlockingCollection,意味着不設置boundedCapacity,那麼boundedCapacity爲默認值: int.MaxValue。
- 限界:使用BlockingCollection(int boundedCapacity),設置boundedCapacity的值,當集合容量達到這個值得時候,向BlockingCollection添加元素的線程將會被阻塞,直到有元素被刪除。
限界功能可控制內存中集合最大大小,這對於須要處理大量元素的時候很是有用。
- 默認狀況下,BlockingCollection封裝了一個ConcurrentQueue。能夠在構造函數中指定一個實現了IProducerConsumerCollection接口的併發集合,包括:ConcurrentStack、ConcurrentBag。
- 使用此集合包含易於無限制等待的風險,因此使用TryTake更加,由於TryTake提供了超時控制,指定的時間內能夠從集合中移除某個項,則爲 true;不然爲 false。
5)ConcurrentDictionary:可由多個線程同時訪問的鍵值對的線程安全集合。
主要方法
- AddOrUpdate(TKey key, TValue addValue, Func<TKey, TValue, TValue> updateValueFactory);若是指定的鍵尚不存在,則將鍵/值對添加到 字典中;若是指定的鍵已存在,則更新字典中的鍵/值對。
- GetOrAdd(TKey key, TValue value);若是指定的鍵尚不存在,則將鍵/值對添加到字典中。
- TryRemove(TKey key, out TValue value);嘗試從字典中移除並返回具備指定鍵的值。
- TryUpdate(TKey key, TValue newValue, TValue comparisonValue);將指定鍵的現有值與指定值進行比較,若是相等,則用第三個值更新該鍵。
說明:
- ConcurrentDictionary對於讀操做是徹底無鎖的。當多個任務或線程向其中添加元素或修改數據的時候,ConcurrentDictionary使用細粒度的鎖。使用細粒度的鎖只會鎖定真正須要鎖定的部分,而不是整個字典。
6)IProducerConsumerCollection:定義供生產者/消費者用來操做線程安全集合的方法。 此接口提供一個統一的表示(爲生產者/消費者集合),從而更高級別抽象如 System.Collections.Concurrent.BlockingCollection<T>可使用集合做爲基礎的存儲機制。
3.經常使用模式
1)並行的生產者-消費者模式
定義:
生成者和消費者是此模式中的兩類對象模型,消費者依賴於生產者的結果,生產者生成結果的同時,消費者使用結果。
圖1 並行的生產者-消費者模式
說明:
- 併發集合用在此模式下很是合適,由於併發集合支持此模式中對象的並行操做。
- 若不使用併發集合,那麼就要加入同步機制,從而使程序變得比較複雜,難於維護和理解,同時大大下降性能。
- 上圖爲生產者消費者模式示意圖,縱軸爲時間軸,生成者與消費者的並不在一條時間線上,但兩者有交叉,意在代表生成者先產生結果,然後消費者才真正使用了生成者產生的數據。
2)流水線模式
定義:
流水線由多個階段構成,每一個階段由一系列的生產者和消費者構成。通常來說前一個階段是後一個階段的生成者;依靠相鄰兩個階段之間的緩衝區隊列,每一個階段能夠併發執行。
圖2 並行的流水線模式
說明:
- 常使用BlockingCollection<T>做爲緩衝罐區隊列。
- 流水線的速度近似等於流水線最慢階段的速度。
- 上圖爲流水線模式示意圖,前一階段爲後一階段的生成者,這裏展現了最爲簡單和基本的流水線模式,更復雜的模式能夠認爲是每一個階段都包括了對數據更多的處理過程。
4 使用方式
僅以ConcurrentBag和BlockingCollection爲例,其餘的併發集合與之類似。
ConcurrentBag
1 List<string> list = ......
2 ConcurrentBag<string> bags = new ConcurrentBag<string>();
3 Parallel.ForEach(list, (item) =>
4 {
5 //對list中的每一個元素進行處理而後,加入bags中
6 bags.Add(itemAfter);
7 });
BlockingCollection—生產者消費者模式
1 public static void Execute()
2 {
3 //調用Invoke,使得生產者任務和消費者任務並行執行
4 //Producer方法和Customer方法在Invoke中的參數順序任意,不論何種順序都會得到正確的結果
5 Parallel.Invoke(()=>Customer(),()=>Producer());
6 Console.WriteLine(string.Join(",",customerColl));
7 }
8
9 //生產者集合
10 private static BlockingCollection<int> producerColl = new BlockingCollection<int>();
11 //消費者集合
12 private static BlockingCollection<string> customerColl = new BlockingCollection<string>();
13
14 public static void Producer()
15 {
16 //循環將數據加入生成者集合
17 for (int i = 0; i < 100; i++)
18 {
19 producerColl.Add(i);
20 }
21
22 //設置信號,代表不在向生產者集合中加入新數據
23 //能夠設置更加複雜的通知形式,好比數據量達到必定值且其中的數據知足某一條件時就設置完成添加
24 producerColl.CompleteAdding();
25 }
26
27 public static void Customer()
28 {
29 //調用IsCompleted方法,判斷生產者集合是否在添加數據,是否還有未"消費"的數據
30 //注意不要使用IsAddingCompleted,IsAddingCompleted只代表集合標記爲已完成添加,而不能說明其爲空
31 //而IsCompleted爲ture時,那麼IsAddingCompleted爲ture且集合爲空
32 while (!producerColl.IsCompleted)
33 {
34 //調用Take或TryTake "消費"數據,消費一個,移除一個
35 //TryAdd的好處是提供超時機制
36 customerColl.Add(string.Format("消費:{0}", producerColl.Take()));
37 }
38 }
-----------------------------------------------------------------------------------------
轉載與引用請註明出處。
時間倉促,水平有限,若有不當之處,歡迎指正。