【讀書筆記】.Net並行編程(三)---並行集合

     爲了讓共享的數組,集合可以被多線程更新,咱們如今(.net4.0以後)可使用併發集合來實現這個功能。而System.Collections和System.Collections.Generic命名空間中所提供的經典列表,集合和數組都不是線程安全的,若是要使用,還須要添加代碼來同步。編程

    先看一個例子,經過並行循環向一個List<string>集合添加元素。由於List不是線程安全的,因此必須對Add方法加鎖來串行化。api

    任務開始:數組

   private static int NUM_AES_KEYS =80000;
        static void Main(string[] args)
        {
            Console.WriteLine("任務開始...");
            var sw = Stopwatch.StartNew();
            for (int i = 0; i < 4; i++)
            {
                ParallelGennerateMD5Keys();
                Console.WriteLine(_keyList.Count);
            }
            Console.WriteLine("結束時間:" + sw.Elapsed);

            Console.ReadKey();
        }
View Code
        private static List<string> _keyList;

        private static void ParallelGennerateMD5Keys()
        {
            _keyList=new List<string>(NUM_AES_KEYS);
            Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS + 1), range =>
            {
                var md5M = MD5.Create();
                for (int i = range.Item1; i < range.Item2; i++)
                {
                    byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + i);
                    byte[] result = md5M.ComputeHash(data);
                    string hexString = ConverToHexString(result);
                    lock (_keyList)
                    {
                        _keyList.Add(hexString);
                    }
                }
            });
        }

但若是咱們去掉lock,獲得的結果以下: 安全

沒有一次是滿80000的。lock關鍵字建立了一個臨界代碼區,當一個任務進入以後,其餘任務會被阻塞並等待進入。lock關鍵字引入了必定的開銷,並且會下降可擴展性。對於這個問題,.Net4.0提供了System.Collections.Concurrent命名空間用於解決線程安全問題,它包含了5個集合:ConcurrentQueue<T>,ConcurrentStack<T>,ConcurrentBag<T>,BlockingCollection<T>,ConcurrentDictionary<TKey,TValue>。這些集合都在某種程度上使用了無鎖技術,性能獲得了提高。數據結構

ConcurrentQueue多線程

一個FIFO(先進先出)的集合。支持多任務進併發行入隊和出隊操做。 併發

 ConcurrentQueue是徹底無鎖的,它是System.Collections.Queue的併發版本。提供三個主要的方法:dom

  •  Enqueue--將元素加入到隊列尾部。
  • TryDequeue--嘗試刪除隊列頭部元素。並將元素經過out參數返回。返回值爲bool型,表示是否執行成功。
  • TryPeek--嘗試將隊列頭部元素經過out參數返回,但不會刪除這個元素。返回值bool型,表示操做是否成功。

 修改上面的代碼:ide

        private static ConcurrentQueue<string> _keyQueue;
        private static void ParallelGennerateMD5Keys()
        {
            _keyQueue = new ConcurrentQueue<string>();
            Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS + 1), range =>
            {
                var md5M = MD5.Create();
                for (int i = range.Item1; i < range.Item2; i++)
                {
                    byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + i);
                    byte[] result = md5M.ComputeHash(data);
                    string hexString = ConverToHexString(result);
                    _keyQueue.Enqueue(hexString);
                }
            });
        }

結果以下:函數

能夠看見,它的使用很簡單,不用擔憂同步問題。接下咱們經過生產者-消費者模式,對上面的問題進行改造,分解成兩個任務。使用兩個共享的ConcurrentQueue實例。_byteArraysQueue 和 _keyQueue ,ParallelGennerateMD5Keys 方法生產byte[],ConverKeysToHex方法去消費併產生key。

        private static ConcurrentQueue<string> _keyQueue;
        private static ConcurrentQueue<byte[]> _byteArraysQueue; 
        private static void ParallelGennerateMD5Keys(int maxDegree)
        {
            var parallelOptions = new ParallelOptions{MaxDegreeOfParallelism = maxDegree};
            var sw = Stopwatch.StartNew();
            _keyQueue = new ConcurrentQueue<string>();
            Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS + 1),parallelOptions, range =>
            {
                var md5M = MD5.Create();
                for (int i = range.Item1; i < range.Item2; i++)
                {
                    byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + i);
                    byte[] result = md5M.ComputeHash(data);
                    _byteArraysQueue.Enqueue(result);
                }
            });
            Console.WriteLine("MD5結束時間:" + sw.Elapsed);
        }

        private static void ConverKeysToHex(Task taskProducer)
        {
            var sw = Stopwatch.StartNew();
            while (taskProducer.Status == TaskStatus.Running || taskProducer.Status == TaskStatus.WaitingToRun || _byteArraysQueue.Count > 0)
            {
                byte[] result;
                if (_byteArraysQueue.TryDequeue(out result))
                {
                    string hexString = ConverToHexString(result);
                    _keyQueue.Enqueue(hexString);
                }
            }
            Console.WriteLine("key結束時間:" + sw.Elapsed);
        }

此次我修改了執行次數爲180000

         private static int NUM_AES_KEYS =180000;
        static void Main(string[] args)
        {
            Console.WriteLine("任務開始...");
            var sw = Stopwatch.StartNew();
            _byteArraysQueue=new ConcurrentQueue<byte[]>();
            _keyQueue=new ConcurrentQueue<string>();

//生產key 和 消費key的兩個任務
var taskKeys = Task.Factory.StartNew(()=>ParallelGennerateMD5Keys(Environment.ProcessorCount - 1)); var taskHexString = Task.Factory.StartNew(()=>ConverKeysToHex(taskKeys)); string lastKey;
//隔半秒去看一次。
while (taskHexString.Status == TaskStatus.Running || taskHexString.Status == TaskStatus.WaitingToRun) { Console.WriteLine("_keyqueue的個數是{0},_byteArraysQueue的個數是{1}", _keyQueue.Count,_byteArraysQueue.Count); if (_keyQueue.TryPeek(out lastKey)) { // Console.WriteLine("第一個Key是{0}",lastKey); } Thread.Sleep(500); } //等待兩個任務結束 Task.WaitAll(taskKeys, taskHexString); Console.WriteLine("結束時間:" + sw.Elapsed); Console.WriteLine("key的總數是{0}" , _keyQueue.Count); Console.ReadKey(); }

從結果能夠發現,_bytaArraysQueue裏面的byte[] 幾乎是生產一個,就被消費一個。

理解生產者和消費者

 使用ConcurrentQueue能夠很容易的實現並行的生產者-消費者模式或多階段的線性流水線。以下:

  

 咱們能夠改造上面的main方法,讓一半的線程用於生產,一半的線程用於消費。

  static void Main(string[] args)
        {
            Console.WriteLine("任務開始...");
            var sw = Stopwatch.StartNew();
            _byteArraysQueue=new ConcurrentQueue<byte[]>();
            _keyQueue=new ConcurrentQueue<string>();

            var taskKeyMax = Environment.ProcessorCount/2;

            var taskKeys = Task.Factory.StartNew(() => ParallelGennerateMD5Keys(taskKeyMax));

            var taskHexMax = Environment.ProcessorCount - taskKeyMax;
            var taskHexStrings=new Task[taskHexMax];
            for (int i = 0; i < taskHexMax; i++) { taskHexStrings[i] = Task.Factory.StartNew(() => ConverKeysToHex(taskKeys)); }

            Task.WaitAll(taskHexStrings);

            Console.WriteLine("結束時間:" + sw.Elapsed);
            Console.WriteLine("key的總數是{0}" , _keyQueue.Count);
            Console.ReadKey();
        }

而這些消費者的結果又能夠繼續做爲生產者,繼續串聯下去。

ConcurrentStack

一個LIFO(後進先出)的集合,支持多任務併發進行壓入和彈出操做。它是徹底無鎖的。是System.Collections.Stack的併發版本。

它和ConcurrentQueue很是類似,區別在於使用了不一樣的方法名,更好的表示一個棧。ConcurrentStack主要提供了下面五個重要方法。

  • Push:將元素添加到棧頂。
  • TryPop:嘗試刪除棧頂部的元素,並經過out返回。返回值爲bool,表示操做是否成功。
  • TryPeek:嘗試經過out返回棧頂部的元素,返回值爲bool,表示是否成功。
  • PushRange:一次將多個元素插入棧頂。
  • TryPopRange:一次將多個元素從棧頂移除。

 爲了判斷棧是否包含任意項,可使用IsEmpty屬性判斷。

if(!_byteArraysStack.IsEmpty)

而使用Count方法,開銷相對較大。另外咱們能夠將不安全的集合或數組轉化爲併發集合。下例將數組做爲參數傳入。操做上和List同樣。

   private static string[] _HexValues = {"AF", "BD", "CF", "DF", "DA", "FE", "FF", "FA"};
        static void Main(string[] args)
        {
            var invalidHexStack = new ConcurrentStack<string>(_HexValues);

            while (!invalidHexStack.IsEmpty)
            {
                string value;
                invalidHexStack.TryPop(out value);
                Console.WriteLine(value);
            }
        }

反之,能夠用CopyTo和ToArray方法將併發集合建立一個不安全集合。

ConcurrentBag

一個無序對象集合,在同一個線程添加元素(生產)和刪除元素(消費)的場合下效率特別高,ConcurrentBag最大程度上減小了同步的需求以及同步帶來的開銷。然而它在生產線程和消費線程徹底分開的狀況下,效率低下。  

它提供了3個重要方法

  • Add--添加元素到無序組
  • TryTake--嘗試從無序組中刪除一個元素,out返回。返回值bool 表示操做是否成功。
  • TryPeek--嘗試經過out返回一個參數。返回值bool 表示操做是否成功。

下面的實例中Main方法經過Parallel.Invoke併發的加載三個方法。有多個生產者和消費者。對應三個ConcurrentBag<string>:_sentencesBag,_capWrodsInSentenceBag和_finalSentencesBag。

  • ProduceSentences 隨機生產句子 (消費者)
  • CapitalizeWordsInSentence  改造句子 (消費者/生產者)
  • RemoveLettersInSentence  刪除句子 (消費者)
    static void Main(string[] args)
        {
            Console.WriteLine("任務開始...");
            var sw = Stopwatch.StartNew();

            _sentencesBag=new ConcurrentBag<string>();
            _capWrodsInSentenceBag=new ConcurrentBag<string>();
            _finalSentencesBag=new ConcurrentBag<string>();
            
            _producingSentences = true;
           
            Parallel.Invoke(ProduceSentences,CapitalizeWordsInSentence,RemoveLettersInSentence);

            Console.WriteLine("_sentencesBag的總數是{0}", _sentencesBag.Count);
            Console.WriteLine("_capWrodsInSentenceBag的總數是{0}", _capWrodsInSentenceBag.Count);
            Console.WriteLine("_finalSentencesBag的總數是{0}", _finalSentencesBag.Count);
            Console.WriteLine("總時間:{0}",sw.Elapsed);
            Console.ReadKey();
        }
 private static ConcurrentBag<string> _sentencesBag;
        private static ConcurrentBag<string> _capWrodsInSentenceBag;
        private static ConcurrentBag<string> _finalSentencesBag;

        private static volatile bool _producingSentences = false;
        private static volatile bool _capitalWords = false;

        private static void ProduceSentences()
        {
            string[] rawSentences =
            {
                "併發集合你可知",
                "ConcurrentBag 你值得擁有",
                "stoneniqiu",
                "博客園",
                ".Net併發編程學習",
                "Reading for you",
                "ConcurrentBag 是個無序集合"
            };
            try
            {
                Console.WriteLine("ProduceSentences...");
                _sentencesBag = new ConcurrentBag<string>();
                var random = new Random();
                for (int i = 0; i < NUM_AES_KEYS; i++)
                {
                    var sb = new StringBuilder();
                    sb.Append(rawSentences[random.Next(rawSentences.Length)]);
                    sb.Append(' ');
                    _sentencesBag.Add(sb.ToString());
                }

            }
            finally
            {
                _producingSentences = false;
            }
        }

        private static void CapitalizeWordsInSentence()
        {
            SpinWait.SpinUntil(() => _producingSentences);

            try
            {
                Console.WriteLine("CapitalizeWordsInSentence...");
                _capitalWords = true;
                while ((!_sentencesBag.IsEmpty)||_producingSentences)
                {
                    string sentence;
                    if (_sentencesBag.TryTake(out sentence))
                    {
                        _capWrodsInSentenceBag.Add(sentence.ToUpper()+"stoneniqiu");
                    }
                }
            }
            finally 
            {
                _capitalWords = false;
            }
        }

        private static void RemoveLettersInSentence()
        {
            SpinWait.SpinUntil(() => _capitalWords);
            Console.WriteLine("RemoveLettersInSentence...");
            while (!_capWrodsInSentenceBag.IsEmpty || _capitalWords)
            {
                string sentence;
                if (_capWrodsInSentenceBag.TryTake(out sentence))
                {
                    _finalSentencesBag.Add(sentence.Replace("stonenqiu",""));
                }
            }
        }
View Code

在CapitalizeWordsInSentence 方法中,使用SpinUntil方法並傳入共享bool變量_producingSentences,當其爲true的時候,SpinUnit方法會中止自旋。但協調多個生產者和消費者自旋並不是最好的解決方案,咱們可使用BlockingCollection(下面會講)來提高性能。

 SpinWait.SpinUntil(() => _producingSentences);

另外兩個用做標誌的共享bool變量在聲明的時候使用了volatile關鍵字。這樣能夠確保在不一樣的線程中進行訪問的時候,能夠獲得這些變量的最新值。

   private static volatile bool _producingSentences = false;
   private static volatile bool _capitalWords = false;

 

BlockingCollection

與經典的阻塞隊列數據結構相似,適用於多個任務添加和刪除數據的生產者-消費者的情形。提供了阻塞和界限的能力。

 BlockingCollection是對IProducerConsumerCollection<T>實例的一個包裝。而這個接口繼承於ICollection,IEnumerable<T>。前面的併發集合都繼承了這個接口。所以這些集合均可以封裝在BlockingCollection中。

將上面的例子換成BlockingCollection

 static void Main(string[] args)
        {
            Console.WriteLine("任務開始...");
            var sw = Stopwatch.StartNew();

            _sentencesBC = new BlockingCollection<string>(NUM_SENTENCE);
            _capWrodsInSentenceBC = new BlockingCollection<string>(NUM_SENTENCE);
            _finalSentencesBC = new BlockingCollection<string>(NUM_SENTENCE);
            
            Parallel.Invoke(ProduceSentences,CapitalizeWordsInSentence,RemoveLettersInSentence);

            Console.WriteLine("_sentencesBag的總數是{0}", _sentencesBC.Count);
            Console.WriteLine("_capWrodsInSentenceBag的總數是{0}", _capWrodsInSentenceBC.Count);
            Console.WriteLine("_finalSentencesBag的總數是{0}", _finalSentencesBC.Count);
            Console.WriteLine("總時間:{0}",sw.Elapsed);
            Console.ReadKey();
        }


        private static int NUM_SENTENCE = 2000000;
        private static BlockingCollection<string> _sentencesBC;
        private static BlockingCollection<string> _capWrodsInSentenceBC;
        private static BlockingCollection<string> _finalSentencesBC;

        private static volatile bool _producingSentences = false;
        private static volatile bool _capitalWords = false;

        private static void ProduceSentences()
        {
            string[] rawSentences =
            {
                "併發集合你可知",
                "ConcurrentBag 你值得擁有",
                "stoneniqiu",
                "博客園",
                ".Net併發編程學習",
                "Reading for you",
                "ConcurrentBag 是個無序集合"
            };
       
                Console.WriteLine("ProduceSentences...");
                _sentencesBC = new BlockingCollection<string>();
                var random = new Random();
                for (int i = 0; i < NUM_SENTENCE; i++)
                {
                    var sb = new StringBuilder();
                    sb.Append(rawSentences[random.Next(rawSentences.Length)]);
                    sb.Append(' ');
                    _sentencesBC.Add(sb.ToString());
                }
                //讓消費者知道,生產過程已經完成
               _sentencesBC.CompleteAdding();
            
        }

        private static void CapitalizeWordsInSentence()
        {
            Console.WriteLine("CapitalizeWordsInSentence...");
            //生產者是否完成
            while (!_sentencesBC.IsCompleted)
            {
                string sentence;
                if (_sentencesBC.TryTake(out sentence))
                {
                    _capWrodsInSentenceBC.Add(sentence.ToUpper() + "stoneniqiu");
                }
            }
            //讓消費者知道,生產過程已經完成
            _capWrodsInSentenceBC.CompleteAdding();
        }

        private static void RemoveLettersInSentence()
        {
            //SpinWait.SpinUntil(() => _capitalWords);
            Console.WriteLine("RemoveLettersInSentence...");
            while (!_capWrodsInSentenceBC.IsCompleted)
            {
                string sentence;
                if (_capWrodsInSentenceBC.TryTake(out sentence))
                {
                    _finalSentencesBC.Add(sentence.Replace("stonenqiu",""));
                }
            }
        }
View Code

無需再使用共享的bool變量來同步。在操做結束後,調用CompeteAdding方法來告之下游的消費者。這個時候IsAddingComplete屬性爲true。

_sentencesBC.CompleteAdding();

而在生產者中也無需使用自旋了。能夠判斷IsCompleted屬性。而當IsAddingComplete屬性爲true且集合爲空的時候,IsCompleted才爲true。這個時候就表示,生產者的元素已經被使用完了。這樣代碼也更簡潔了。

  while (!_sentencesBC.IsCompleted)

最後的結果要比使用ConcurrentBag快了0.8秒。一共是200w條數據,處理三次。

ConcurrentDictionary

與經典字典相似,提供了併發的鍵值訪問。它對讀操做是徹底無鎖的,在添加和修改的時候使用了細粒度的鎖。是IDictionary的併發版本。

它提供最重要方法以下:

  • AddOrUpdate--若是鍵不存在就添加一個鍵值對。若是鍵已經存在,就更新鍵值對。可使用函數來生成或者更新鍵值對。須要在委託內添加同步代碼來確保線程安全。
  • GetEnumerator--返回遍歷整個ConcurrentDictionary的枚舉器,並且是線程安全的。
  • GetOrAdd--若是鍵不存在就添加一個新鍵值對,若是存在就返回這個鍵如今的值,而不添加新值。
  • TryAdd
  • TryOrGetVaule
  • TryRemove
  • TryUpdate

下面的例子建立一個ConcurrentDictionary,而後不斷的更新。lock關鍵字確保一次只有一個線程運行Update方法。

 static void Main(string[] args)
        {
            Console.WriteLine("任務開始...");
            var sw = Stopwatch.StartNew();

            rectangInfoDic=new ConcurrentDictionary<string, RectangInfo>();
            GenerateRectangles();
            foreach (var keyValue in rectangInfoDic)
            {
                Console.WriteLine("{0},{1},更新次數{2}",keyValue.Key,keyValue.Value.Size,keyValue.Value.UpdateTimes);
            }

            Console.WriteLine("總時間:{0}",sw.Elapsed);
            Console.ReadKey();
        }

        private static ConcurrentDictionary<string, RectangInfo> rectangInfoDic;
        private const int MAX_RECTANGLES = 2000;
        private static void GenerateRectangles()
        {
            Parallel.For(1, MAX_RECTANGLES + 1, (i) =>
            {
                for (int j = 0; j < 50; j++)
                {
                    var newkey = string.Format("Rectangle{0}", i%5000);
                    var rect = new RectangInfo(newkey, i, j);
                    rectangInfoDic.AddOrUpdate(newkey, rect, (key, existRect) =>
                    {
                        if (existRect != rect)
                        {
                            lock (existRect)
                            {
                                existRect.Update(rect.X,rect.Y);
                            }
                            return existRect;
                        }
                        return existRect;
                    });
                }

            });

        }

Rectangle: 

    public class RectangInfo:IEqualityComparer<RectangInfo>
        {
            public string Name { get; set; }
            public int X { get; set; }
            public int Y { get; set; }
            public int UpdateTimes { get; set; }

            public int Size
            {
                get { return X*Y; }
            }

            public DateTime LastUpdate { get; set; }

            public RectangInfo(string name,int x,int y)
            {
                Name = name;
                X = x;
                Y = y;
                LastUpdate = DateTime.Now;
            }

            public RectangInfo(string key) : this(key, 0, 0)
            {
            }

            public void Update(int x,int y)
            {
                X = x;
                Y = y;
                UpdateTimes++;
            }


            public bool Equals(RectangInfo x, RectangInfo y)
            {
                return (x.Name == y.Name && x.Size == y.Size);
            }

            public int GetHashCode(RectangInfo obj)
            {
                return obj.Name.GetHashCode();
            }
        }
View Code

 

本章學習了五種併發集合,熟悉了生產者-消費者的併發模型,咱們可使用併發集合來設計並優化流水線。但願本文對你有幫助。

閱讀書籍:《C#並行編程高級教程 。

 C#並行編程高級教程

相關文章
相關標籤/搜索