做爲多線程和並行計算不得不考慮的問題就是臨界資源的訪問問題,解決臨界資源的訪問一般是加鎖或者是使用信號量,這個你們應該很熟悉了。算法
而集合做爲一種重要的臨界資源,通用性更廣,爲了讓你們更安全的使用它們,微軟爲咱們帶來了強大的並行集合:System.Collections.Concurrent裏面的各位仁兄們。數組
首先,我們從一個經典的問題談起。安全
生產者消費者問題數據結構
這個問題是最爲經典的多線程應用問題,簡單的表述這個問題就是:有一個或多個線程(生產者線程)產生一些數據,同時,還有一個或者多個線程(消費者線程)要取出這些數據並執行一些相應的工做。以下圖所示:多線程
下面就是使用程序去描述這個問題了。函數
最直接的想法多是這樣:spa
static void Main(string[] args) { int count = 0; // 臨界資源區 var queue = new Queue<string>(); // 生產者線程 Task.Factory.StartNew(() => { while (true) { queue.Enqueue("value" + count); count++; } }); // 消費者線程1 Task.Factory.StartNew(() => { while (true) { if (queue.Count > 0) { string value = queue.Dequeue(); Console.WriteLine("Worker 1: " + value); } } }); // 消費者線程2 Task.Factory.StartNew(() => { while (true) { if (queue.Count > 0) { string value = queue.Dequeue(); Console.WriteLine("Worker 2: " + value); } } }); Thread.Sleep(50000); }
使用Queue<string>模擬了一個簡單的資源池,一個生產者放數據,兩個消費者消費數據。上面這個程序運行之後會產生異常,異常的緣由很簡單,當某個時刻,第一個消費者判斷queue.Count > 0爲true時,就會到Queue中取數據,可是這個時候數據可能會被第二個消費者拿走了,由於第二個消費者也判斷出此時有數據可取。這是一個簡單的臨界資源線程安全問題。線程
知道問題了,那麼如何解決呢?
第一種方案是加鎖,這個方案是可行的,不少時候咱們也是這麼作的,包括微軟早期實現線程安全的ArrayList和Hashtable內部(Synchronized方法)也是這麼實現的。這個方案適用於只有少許的消費者,而且每一個消費者都會執行大量操做的時候,這時lock並沒什麼太大問題,可是,若是是大批量短小精悍的消費者存在的話,lock會嚴重影響代碼的執行效率。
第二種方案就是咱們直接用新的線程安全的集合區解決這個問題。新的線程安全的這些集合內部再也不使用lock機制這種比較低效的方式去實現線程安全,而是轉而使用SpinWait和Interlocked等機制,間接實現了線程安全,這種方式的效率要高於使用lock的方式。看一下實現代碼:3d
var queue = new ConcurrentQueue<string>(); Task.Factory.StartNew(() => { while (true) { queue.Enqueue("value" + count); count++; } }); Task.Factory.StartNew(() => { while (true) { string value; if (queue.TryDequeue(out value)) { Console.WriteLine("Worker 1: " + value); } } }); Task.Factory.StartNew(() => { while (true) { string value; if (queue.TryDequeue(out value)) { Console.WriteLine("Worker 2: " + value); } } });
執行這段代碼,能夠工做,可是有點不太優雅,能不能不要去判斷集合是否爲空?集合當本身沒有元素的時候本身Block一下能夠嗎?答案固然是能夠的,使用BlockingCollection便可:code
var blockingCollection = new BlockingCollection<string>(); Task.Factory.StartNew(() => { while (true) { blockingCollection.Add("value" + count); count++; } }); Task.Factory.StartNew(() => { while (true) { Console.WriteLine("Worker 1: " + blockingCollection.Take()); } }); Task.Factory.StartNew(() => { while (true) { Console.WriteLine("Worker 2: " + blockingCollection.Take()); } });
BlockingCollection集合是一個擁有阻塞功能的集合,它就是完成了經典生產者消費者的算法功能。它沒有實現底層的存儲結構,而是使用了實現IProducerConsumerCollection接口的幾個集合做爲底層的數據結構,例如ConcurrentBag, ConcurrentStack或者是ConcurrentQueue。你能夠在構造BlockingCollection實例的時候傳入這個參數,若是不指定的話,則默認使用ConcurrentQueue做爲存儲結構。
而對於生產者來講,只須要經過調用其Add方法放數據,消費者只須要調用Take方法來取數據就能夠了。
固然了上面的消費者代碼中還有一點是讓人不爽的,那就是while語句,能夠更優雅一點嗎?答案仍是確定的:
Task.Factory.StartNew(() => { foreach (string value in blockingCollection.GetConsumingEnumerable()) { Console.WriteLine("Worker 1: " + value); } });
GetConsumingEnumerable()方法是關鍵,這個方法會遍歷集合取出數據,一旦發現集合空了,則阻塞本身,直到集合中又有元素了再開始遍歷,神奇吧。
好了,到此完美瞭解決了生產者消費者問題。然而一般來講,還有兩個問題咱們有時須要去控制:
第一個問題:控制集合中數據的最大數量。
這個問題由BlockingCollection構造函數解決,構造該對象實例的時候,構造函數中的BoundedCapacity決定了集合最大的可容納數據數量,這個比較簡單,很少說了。
第二個問題:什麼時候中止的問題。
這個問題由CompleteAdding和IsCompleted兩個配合解決。
CompleteAdding方法是直接不容許任何元素被加入集合;當使用了CompleteAdding方法後且集合內沒有元素的時候,另外一個屬性IsCompleted此時會爲True,這個屬性能夠用來判斷是否當前集合內的全部元素都被處理完。看一下生產者修改後的代碼:
Task.Factory.StartNew(() => { for (int count = 0; count < 10; count++) { blockingCollection.Add("value" + count); } blockingCollection.CompleteAdding(); });
當使用了CompleteAdding方法後,對象中止往集合中添加數據,這時若是是使用GetConsumingEnumerable枚舉的,那麼這種枚舉會天然結束,不會再Block住集合,這種方式最優雅,也是推薦的寫法。可是若是是使用TryTake訪問元素的,則須要使用IsCompleted判斷一下,由於這個時候使用TryTake會拋InvalidOperationException異常。
看一下最終的代碼形式:
static void Main(string[] args) { var blockingCollection = new BlockingCollection<string>(); var producer = Task.Factory.StartNew(() => { for (int count = 0; count < 10; count++) { blockingCollection.Add("value" + count); Thread.Sleep(300); } blockingCollection.CompleteAdding(); }); var consumer1 = Task.Factory.StartNew(() => { foreach (string value in blockingCollection.GetConsumingEnumerable()) { Console.WriteLine("Worker 1: " + value); } }); var consumer2 = Task.Factory.StartNew(() => { foreach (string value in blockingCollection.GetConsumingEnumerable()) { Console.WriteLine("Worker 2: " + value); } }); Task.WaitAll(producer, consumer1, consumer2); }
BlockingCollection的枚舉
此外,須要注意BlockingCollection有兩種枚舉方法,首先BlockingCollection自己繼承自IEnumerable<T>,因此它本身就能夠被foreach枚舉,首先BlockingCollection包裝了一個線程安全集合,那麼它本身也是線程安全的,而當多個線程在同時修改或訪問線程安全容器時,BlockingCollection本身做爲IEnumerable會返回一個必定時間內的集合片斷,也就是隻會枚舉在那個時間點上內部集合的元素。使用這種方式枚舉的時候,不會有Block效果。
另一種方式就是咱們上面使用的GetConsumingEnumerable方式的枚舉,這種方式會有Block效果,直到CompleteAdding被調用爲止。
最後提一下實現IProducerConsumerCollection接口的幾個集合:ConcurrentBag(線程安全的無序的元素集合), ConcurrentStack(線程安全的堆棧)和ConcurrentQueue(線程安全的隊列)。這些都很簡單,功能與非線程安全的那些集合都同樣,只很少是多了TryXXX方法,多線程環境下使用這些方法就行了,其餘就很少說了。
到今生產者和消費者這個經典的問題告一段落了。
System.Collections.Concurrent下面的集合除了解決生產者消費者問題外,還有一些與多線程相關的集合,例如:
1. ConcurrentDictionary,這個是鍵/值對字典的線程安全實現,這個類在原來的基礎上也添加了一下新的方法,例如:AddOrUpdate,GetOrAdd,TryXXX等等,都很容易理解。
2. 各類Partitioner 類,提供針對數組、列表和可枚舉項的常見分區策略。
若要對數據源操做進行並行化,其中一個必要步驟是將源分區爲可由多個線程同時訪問的多個部分。 PLINQ 和任務並行庫 (TPL) 提供了默認的分區程序,當編寫並行查詢或ForEach循環時,默認的分區程序以透明方式工做。 可是毫無疑問,對於一些複雜的狀況,咱們是能夠插入本身的分區程序的,這就是微軟爲咱們提供的各類Partitioner類,這個很少說了,感興趣的同窗請本身參考一下MSDN。