C#集合之併發集合

  .NET 4 開始,在System.Collection.Concurrent中提供了幾個線程安全的集合類。線程安全的集合可防止多個線程以相互衝突的方式訪問集合。
  爲了對集合進行線程安全的訪問,定義了IProducerConsumerCollection<T>接口。這個接口中最重要的方法是TryAdd()和TryTake()。TryAdd()方法嘗試給集合添加一項,但若是集合禁止添加項,這個操做就可能失敗。TryAdd()方法返回一個布爾值,以說明操做成功仍是失敗。TryTake()一樣,在成功時返回集合中的項。
    *ConcurrentQueue<T>————這個集合類用一種免鎖定的算法實現,使用在內部合併到一個鏈表中的32項數組。該類有Enqueue(),TryDequeue()和TryPeek()方法。由於這個類實現了              IProducerConsumerCollection<T>接口,因此TryAdd()和TryTake()方法僅調用Enqueue和TryDequeue方法。
    *ConcurrentStack<T>————相似於ConcurrentQueue<T>。該類定義了Push(),PushRange(),TryPeek(),TryPop()和TryPopRange()方法。在內部這個類使用其元素的鏈表。
    *ConcurrentBag<T>————該類沒有定義添加或提取項的任何順序。這個類使用一個線程映射到內部使用的數組上的概念,所以嘗試減小鎖定。方法:Add(),TryPeek(),TryTake()。
    *ConcurrentDictionary<TKey,TValue>————這是一個線程安全的鍵值集合。TryAdd(),TryGetValue(),TryRemove()和TryUpdate()方法以非阻塞的方式訪問成員。由於元素基於鍵和值,因此  ConcurrentDictionary<TKey,TValue>沒有實現IProducerConsumerCollection<T>接口。
    *BlockingCollection<T>————這個集合在能夠添加或提取元素以前,會阻塞線程並一直等待。BlockingCollection<T>集合提供了一個接口,以使用Add()和Take()方法來刪除和添加元素。這些方法會阻塞線程。Add()方法有一個重載版本,其中能夠給該重載版本傳遞一個CancellationToken令牌。這個令牌容許取消被阻塞的調用。若是不但願無限的等待下去,且不但願從外部取消調用,就可使用TryAdd()和TryTake()方法,在這些方法中,也能夠指定一個超時值。
  BlockingCollection<T>是對實現了 IProducerConsumerCollection<T>接口的任意類的修飾器,它默認使用ConcurrentQueue<T>類。還能夠給構造函數傳遞任何實現了 IProducerConsumerCollection<T>接口的類。

  下面使用一個例子演示BlockingCollection<T>的使用,一個任務向一個集合寫入,同時另外一個任務從這個集合讀取。算法

    static void Main(string[] args)
        {
          StartPipeline();
          Console.ReadLine();
        }

        private static async void StartPipeline()
        {
            //存儲文件名
          var fileNames = new BlockingCollection<string>();
          //存儲文件的每一行內容
          var lines = new BlockingCollection<string>();
          //存儲每一行的每一個單詞,單詞爲鍵,單詞個數爲值
          var words = new ConcurrentDictionary<string, int>();
          //存儲words信息
          var items = new BlockingCollection<Info>();
          var coloredItems = new BlockingCollection<Info>();
            
          Task t1 = PipelineStages.ReadFilenamesAsync(@"../../..", fileNames);
          ConsoleHelper.WriteLine("started stage 1");
          Task t2 = PipelineStages.LoadContentAsync(fileNames, lines);
          ConsoleHelper.WriteLine("started stage 2");
          Task t3 = PipelineStages.ProcessContentAsync(lines, words);
          await Task.WhenAll(t1, t2, t3);
          ConsoleHelper.WriteLine("stages 1, 2, 3 completed");
        
          //當上面三個任務完成時,才執行下面的任務
          Task t4 = PipelineStages.TransferContentAsync(words, items);
          Task t5 = PipelineStages.AddColorAsync(items, coloredItems);
          Task t6 = PipelineStages.ShowContentAsync(coloredItems);
          ConsoleHelper.WriteLine("stages 4, 5, 6 started");

          await Task.WhenAll(t4, t5, t6);

          ConsoleHelper.WriteLine("all stages finished");
        }
        
        
          public static class PipelineStages
          {
            public static Task ReadFilenamesAsync(string path, BlockingCollection<string> output)
            {
              return Task.Run(() =>
                {
                  foreach (string filename in Directory.EnumerateFiles(path, "*.cs", SearchOption.AllDirectories))
                  {
                    output.Add(filename);
                    ConsoleHelper.WriteLine(string.Format("stage 1: added {0}", filename));
                  }
                  //調用CompleteAdding,通知全部讀取器不該再等待集合中的任何額外項
                    //若是不調用該方法,讀取器會在foreach循環中等待更多的項被添加
                  output.CompleteAdding();
                });
            }

            public static async Task LoadContentAsync(BlockingCollection<string> input, BlockingCollection<string> output)
            {
                //使用讀取器讀取集合時,須要使用GetConsumingEnumerable獲取阻塞集合的枚舉器,
            //若是直接使用input迭代集合,這隻會迭代當前狀態的集合,不會迭代之後添加的項
              foreach (var filename in input.GetConsumingEnumerable())
              {
                using (FileStream stream = File.OpenRead(filename))
                {
                  var reader = new StreamReader(stream);
                  string line = null;
                  while ((line = await reader.ReadLineAsync()) != null)
                  {
                    output.Add(line);
                    ConsoleHelper.WriteLine(string.Format("stage 2: added {0}", line));
                  }
                }
              }
              output.CompleteAdding();
            }

            public static Task ProcessContentAsync(BlockingCollection<string> input, ConcurrentDictionary<string, int> output)
            {
              return Task.Run(() =>
                {
                  foreach (var line in input.GetConsumingEnumerable())
                  {
                    string[] words = line.Split(' ', ';', '\t', '{', '}', '(', ')', ':', ',', '"');
                    foreach (var word in words.Where(w => !string.IsNullOrEmpty(w)))
                    {
                    //這裏使用了字典的一個擴展方法
                      output.AddOrIncrementValue(word);
                      ConsoleHelper.WriteLine(string.Format("stage 3: added {0}", word));
                    }
                  }
                });
            }

            public static Task TransferContentAsync(ConcurrentDictionary<string, int> input, BlockingCollection<Info> output)
            {
              return Task.Run(() =>
                {
                  foreach (var word in input.Keys)
                  {
                    int value;
                    if (input.TryGetValue(word, out value))
                    {
                      var info = new Info { Word = word, Count = value };
                      output.Add(info);
                      ConsoleHelper.WriteLine(string.Format("stage 4: added {0}", info));
                    }
                  }
                  output.CompleteAdding();
                });
            }

            public static Task AddColorAsync(BlockingCollection<Info> input, BlockingCollection<Info> output)
            {
              return Task.Run(() =>
                {
                  foreach (var item in input.GetConsumingEnumerable())
                  {
                    if (item.Count > 40)
                    {
                      item.Color = "Red";
                    }
                    else if (item.Count > 20)
                    {
                      item.Color = "Yellow";
                    }
                    else
                    {
                      item.Color = "Green";
                    }
                    output.Add(item);
                    ConsoleHelper.WriteLine(string.Format("stage 5: added color {1} to {0}", item, item.Color));
                  }
                  output.CompleteAdding();
                });
            }

            public static Task ShowContentAsync(BlockingCollection<Info> input)
            {
              return Task.Run(() =>
                {
                  foreach (var item in input.GetConsumingEnumerable())
                  {
                    ConsoleHelper.WriteLine(string.Format("stage 6: {0}", item), item.Color);
                  }
                });
            }
          }
          
          
          //建立一個字典的擴展方法
         public static class ConcurrentDictionaryExtension
          {
            public static void AddOrIncrementValue(this ConcurrentDictionary<string, int> dict, string key)
            {
              bool success = false;
              while (!success)
              {
                int value;
                if (dict.TryGetValue(key, out value))
                {
                  if (dict.TryUpdate(key, value + 1, value))
                  {
                    success = true;
                  }
                }
                else
                {
                  if (dict.TryAdd(key, 1))
                  {
                    success = true;
                  }
                }
              }
            }
          }

 

這裏使用了一個管道模型的編程模式,上面的添加內容,下面處理內容
   編程

相關文章
相關標籤/搜索