【讀書筆記】.Net並行編程高級教程--Parallel

一直以爲本身對併發瞭解不夠深刻,特別是看了《代碼整潔之道》以爲本身有必要好好學學併發編程,由於性能也是衡量代碼整潔的一大標準。並且在《失控》這本書中也屢次提到併發,不論是計算機仍是生物都併發處理着各類事物。人真是奇怪,當你關注一個事情的時候,你會發現周圍的事物中就常出現那個事情。因此好奇心驅使下學習併發。便有了此文。html

1、理解硬件線程和軟件線程算法

     多核處理器帶有一個以上的物理內核--物理內核是真正的獨立處理單元,多個物理內核使得多條指令可以同時並行運行。硬件線程也稱爲邏輯內核,一個物理內核可使用超線程技術提供多個硬件線程。因此一個硬件線程並不表明一個物理內核;Windows中每一個運行的程序都是一個進程,每個進程都會建立並運行一個或多個線程,這些線程稱爲軟件線程。硬件線程就像是一條泳道,而軟件線程就是在其中游泳的人。編程

2、並行場合數組

    .Net Framework4 引入了新的Task Parallel Library(任務並行庫,TPL),它支持數據並行、任務並行和流水線。讓開發人員應付不一樣的並行場合。併發

  • 數據並行:有大量數據須要處理,而且必須對每一份數據執行一樣的操做。好比經過256bit的密鑰對100個Unicode字符串進行AES算法加密。
  • 任務並行:經過任務併發運行不一樣的操做。例如生成文件散列碼,加密字符串,建立縮略圖。
  • 流水線:這是任務並行和數據並行的結合體。

    TPL引入了System.Threading.Tasks ,主類是Task,這個類表示一個異步的併發的操做,然而咱們不必定要使用Task類的實例,可使用Parallel靜態類。它提供了Parallel.Invoke, Parallel.For Parallel.Forecah 三個方法。負載均衡

3、Parallel.Invoke異步

     試圖讓不少方法並行運行的最簡單的方法就是使用Parallel類的Invoke方法。例若有四個方法:ide

  • WatchMovie
  • HaveDinner
  • ReadBook
  • WriteBlog

    經過下面的代碼就可使用並行。oop

 System.Threading.Tasks.Parallel.Invoke(WatchMovie, HaveDinner, ReadBook, WriteBlog);

  這段代碼會建立指向每個方法的委託。Invoke方法接受一個Action的參數組。性能

public static void Invoke(params Action[] actions);

  用lambda表達式或匿名委託能夠達到一樣的效果。

System.Threading.Tasks.Parallel.Invoke(() => WatchMovie(), () => HaveDinner(), () => ReadBook(), delegate() { WriteBlog(); });

 1.沒有特定的執行順序。

   Parallel.Invoke方法只有在4個方法所有完成以後纔會返回。它至少須要4個硬件線程才足以讓這4個方法併發運行。但並不保證這4個方法可以同時啓動運行,若是一個或者多個內核處於繁忙狀態,那麼底層的調度邏輯可能會延遲某些方法的初始化執行。

    

給方法加上延時,就能夠看到必須等待最長的方法執行完成纔回到主方法。

 static void Main(string[] args)
        {
            System.Threading.Tasks.Parallel.Invoke(WatchMovie, HaveDinner, ReadBook,
                WriteBlog);
            Console.WriteLine("執行完成");
            Console.ReadKey();
        }

        static void WatchMovie()
        {
            Thread.Sleep(5000);
            Console.WriteLine("看電影");
        }
        static void HaveDinner()
        {
            Thread.Sleep(1000);
            Console.WriteLine("吃晚飯");
        }
        static void ReadBook()
        {
            Thread.Sleep(2000);
            Console.WriteLine("讀書");
        }
        static void WriteBlog()
        {
            Thread.Sleep(3000);
            Console.WriteLine("寫博客");
        }
View Code

這樣會形成不少邏輯內核處於長時間閒置狀態。

4、Parallel.For

Parallel.For爲固定數目的獨立For循環迭代提供了負載均衡 (即將工做分發到不一樣的任務中執行,這樣全部的任務在大部分時間均可以保持繁忙) 的並行執行。從而能儘量地充分利用全部的可用的內核。

咱們比較下下面兩個方法,一個使用For循環,一個使用Parallel.For  都是生成密鑰在轉換爲十六進制字符串。

 private static void GenerateAESKeys()
        {
            var sw = Stopwatch.StartNew();
            for (int i = 0; i < NUM_AES_KEYS; i++)
            {
                var aesM = new AesManaged();
                aesM.GenerateKey();
                byte[] result = aesM.Key;
                string hexStr = ConverToHexString(result);
            }
            Console.WriteLine("AES:"+sw.Elapsed.ToString());
        }

 private static void ParallelGenerateAESKeys()
        {
            var sw = Stopwatch.StartNew();
            System.Threading.Tasks.Parallel.For(1, NUM_AES_KEYS + 1, (int i) =>
            {
                var aesM = new AesManaged();
                aesM.GenerateKey();
                byte[] result = aesM.Key;
                string hexStr = ConverToHexString(result);
            });

            Console.WriteLine("Parallel_AES:" + sw.Elapsed.ToString());
        }
  private static int NUM_AES_KEYS = 100000;
        static void Main(string[] args)
        {
            Console.WriteLine("執行"+NUM_AES_KEYS+"次:");
            GenerateAESKeys(); ParallelGenerateAESKeys();
            Console.ReadKey();
        }

執行1000000次

這裏並行的時間是串行的一半。

 

5、Parallel.ForEach

在Parallel.For中,有時候對既有循環進行優化可能會是一個很是複雜的任務。Parallel.ForEach爲固定數目的獨立For Each循環迭代提供了負載均衡的並行執行,且支持自定義分區器,讓使用者能夠徹底掌握數據分發。實質就是將全部要處理的數據區分爲多個部分,而後並行運行這些串行循環。

修改上面的代碼:

  System.Threading.Tasks.Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS + 1), range =>
            {
                var aesM = new AesManaged();
                Console.WriteLine("AES Range({0},{1} 循環開始時間:{2})",range.Item1,range.Item2,DateTime.Now.TimeOfDay);

                for (int i = range.Item1; i < range.Item2; i++)
                {
                    aesM.GenerateKey();
                    byte[] result = aesM.Key;
                    string hexStr = ConverToHexString(result);
                }
                Console.WriteLine("AES:"+sw.Elapsed.ToString());
            });

從執行結果能夠看出,分了13個段執行的。

 

第二次執行仍是13個段。速度上稍微有差別。開始沒有指定分區數,Partitioner.Create使用的是內置默認值。

並且咱們發現這些分區並非同時執行的,大體是分了三個時間段執行。並且執行順序是不一樣的。總的時間和Parallel.For的方法差很少。

 public static ParallelLoopResult ForEach<TSource>(Partitioner<TSource> source, Action<TSource> body)

Parallel.ForEach方法定義了source和Body兩個參數。source是指分區器。提供了分解爲多個分區的數據源。body是要調用的委託。它接受每個已定義的分區做爲參數。一共有20多個重載,在上面的例子中,分區的類型爲Tuple<int,int>,是一個二元組類型。此外,返回一個ParallelLoopResult的值。

Partitioner.Create 建立分區是根據邏輯內核數及其餘因素決定。

 public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive)
    {
      int num = 3;
      if (toExclusive <= fromInclusive)
        throw new ArgumentOutOfRangeException("toExclusive");
      int rangeSize = (toExclusive - fromInclusive) / (PlatformHelper.ProcessorCount * num); if (rangeSize == 0)
        rangeSize = 1;
      return Partitioner.Create<Tuple<int, int>>(Partitioner.CreateRanges(fromInclusive, toExclusive, rangeSize), EnumerablePartitionerOptions.NoBuffering);
    }

所以咱們能夠修改分區數目,rangesize大體爲250000左右。也就是說個人邏輯內核是4.

   var rangesize = (int) (NUM_AES_KEYS/Environment.ProcessorCount) + 1;
   System.Threading.Tasks.Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS + 1,rangesize), range =>

再次執行:

分區變成了四個,時間上沒有多大差異(第一個時間是串行時間)。咱們看見這四個分區幾乎是同時執行的。大部分狀況下,TPL在幕後使用的負載均衡機制都是很是高效的,然而對分區的控制便於使用者對本身的工做負載進行分析,來改進總體的性能。

Parallel.ForEach也能對IEnumerable<int>集合進行重構。Enumerable.Range生產了序列化的數目。但這樣就沒有上面的分區效果。

 private static void ParallelForEachGenerateMD5HasHes()
        {
            var sw = Stopwatch.StartNew();
            System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, NUM_AES_KEYS), number =>
            {
                var md5M = MD5.Create();
                byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + number);
                byte[] result = md5M.ComputeHash(data);
                string hexString = ConverToHexString(result);
            });
            Console.WriteLine("MD5:"+sw.Elapsed.ToString());
        }

 

6、從循環中退出

和串行運行中的break不一樣,ParallelLoopState 提供了兩個方法用於中止Parallel.For 和 Parallel.ForEach的執行。

  • Break:讓循環在執行了當前迭代後儘快中止執行。好比執行到100了,那麼循環會處理掉全部小於100的迭代。
  • Stop:讓循環儘快中止執行。若是執行到了100的迭代,那不能保證處理完全部小於100的迭代。

修改上面的方法:執行3秒後退出。

  private static void ParallelLoopResult(ParallelLoopResult loopResult)
        {
            string text;
            if (loopResult.IsCompleted)
            {
                text = "循環完成";
            }
            else
            {
                if (loopResult.LowestBreakIteration.HasValue)
                {
                    text = "Break終止";
                }
                else
                {
                    text = "Stop 終止";
                }
            }
            Console.WriteLine(text);
        }


        private static void ParallelForEachGenerateMD5HasHesBreak()
        {
            var sw = Stopwatch.StartNew();
            var loopresult= System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, NUM_AES_KEYS), (int number,ParallelLoopState loopState) =>
            {
                var md5M = MD5.Create();
                byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + number);
                byte[] result = md5M.ComputeHash(data);
                string hexString = ConverToHexString(result);
                if (sw.Elapsed.Seconds > 3)
                {
                   loopState.Stop();
                }
            });
            ParallelLoopResult(loopresult);
            Console.WriteLine("MD5:" + sw.Elapsed);
        }

 

7、捕捉並行循環中發生的異常。

  當並行迭代中調用的委託拋出異常,這個異常沒有在委託中被捕獲到時,就會變成一組異常,新的System.AggregateException負責處理這一組異常。

 private static void ParallelForEachGenerateMD5HasHesException()
        {
            var sw = Stopwatch.StartNew();
            var loopresult = new ParallelLoopResult();
            try
            {
                loopresult = System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, NUM_AES_KEYS), (number, loopState) =>
                {
                    var md5M = MD5.Create();
                    byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + number);
                    byte[] result = md5M.ComputeHash(data);
                    string hexString = ConverToHexString(result);
                    if (sw.Elapsed.Seconds > 3)
                    {
                        throw new TimeoutException("執行超過三秒");
                    }
                });
            }
            catch (AggregateException ex)
            {
                foreach (var innerEx in  ex.InnerExceptions)
                {
                    Console.WriteLine(innerEx.ToString());
                }
            }
           
            ParallelLoopResult(loopresult);
            Console.WriteLine("MD5:" + sw.Elapsed);
        }

結果:

 異常出現了好幾回。

 8、指定並行度。

TPL的方法總會試圖利用全部可用的邏輯內核來實現最好的結果,但有時候你並不但願在並行循環中使用全部的內核。好比你須要留出一個不參與並行計算的內核,來建立可以響應用戶的應用程序,並且這個內核須要幫助你運行代碼中的其餘部分。這個時候一種好的解決方法就是指定最大並行度。

這須要建立一個ParallelOptions的實例,設置MaxDegreeOfParallelism的值。

 private static void ParallelMaxDegree(int maxDegree)
        {
            var parallelOptions = new ParallelOptions();
            parallelOptions.MaxDegreeOfParallelism = maxDegree;

            var sw = Stopwatch.StartNew();
            System.Threading.Tasks.Parallel.For(1, NUM_AES_KEYS + 1, parallelOptions, (int i) =>
            {
                var aesM = new AesManaged();
                aesM.GenerateKey();
                byte[] result = aesM.Key;
                string hexStr = ConverToHexString(result);
            });
            Console.WriteLine("AES:" + sw.Elapsed.ToString());
        }

調用:若是在四核微處理器上運行,那麼將使用3個內核。

 ParallelMaxDegree(Environment.ProcessorCount - 1);

時間上大體慢了點(第一次Parallel.For 3.18s),但能夠騰出一個內核來處理其餘的事情。

 

小結:此次學習了Parallel相關方法以及如何退出並行循環和捕獲異常、設置並行度,還有並行相關的知識。園子裏也有相似的博客。但做爲本身知識的管理,在這裏梳理一遍。

園友的博客:8天玩轉併發 

閱讀書籍:《C#並行編程高級教程 

C#並行編程高級教程

 

喜歡看書,也喜歡分享書籍(不限技術書籍)的朋友,  誠邀加入書山有路羣q:452450927 。你們推薦的書籍太多,喊你來讀。

        

     人的核心競爭力超過一半來自不緊不慢的事——讀書、鍛鍊身體、與智者交友,以及業餘愛好。

相關文章
相關標籤/搜索