一直以爲本身對併發瞭解不夠深刻,特別是看了《代碼整潔之道》以爲本身有必要好好學學併發編程,由於性能也是衡量代碼整潔的一大標準。並且在《失控》這本書中也屢次提到併發,不論是計算機仍是生物都併發處理着各類事物。人真是奇怪,當你關注一個事情的時候,你會發現周圍的事物中就常出現那個事情。因此好奇心驅使下學習併發。便有了此文。html
1、理解硬件線程和軟件線程算法
多核處理器帶有一個以上的物理內核--物理內核是真正的獨立處理單元,多個物理內核使得多條指令可以同時並行運行。硬件線程也稱爲邏輯內核,一個物理內核可使用超線程技術提供多個硬件線程。因此一個硬件線程並不表明一個物理內核;Windows中每一個運行的程序都是一個進程,每個進程都會建立並運行一個或多個線程,這些線程稱爲軟件線程。硬件線程就像是一條泳道,而軟件線程就是在其中游泳的人。編程
2、並行場合數組
.Net Framework4 引入了新的Task Parallel Library(任務並行庫,TPL),它支持數據並行、任務並行和流水線。讓開發人員應付不一樣的並行場合。併發
TPL引入了System.Threading.Tasks ,主類是Task,這個類表示一個異步的併發的操做,然而咱們不必定要使用Task類的實例,可使用Parallel靜態類。它提供了Parallel.Invoke, Parallel.For Parallel.Forecah 三個方法。負載均衡
3、Parallel.Invoke異步
試圖讓不少方法並行運行的最簡單的方法就是使用Parallel類的Invoke方法。例若有四個方法:ide
經過下面的代碼就可使用並行。oop
System.Threading.Tasks.Parallel.Invoke(WatchMovie, HaveDinner, ReadBook, WriteBlog);
這段代碼會建立指向每個方法的委託。Invoke方法接受一個Action的參數組。性能
public static void Invoke(params Action[] actions);
用lambda表達式或匿名委託能夠達到一樣的效果。
System.Threading.Tasks.Parallel.Invoke(() => WatchMovie(), () => HaveDinner(), () => ReadBook(), delegate() { WriteBlog(); });
1.沒有特定的執行順序。
Parallel.Invoke方法只有在4個方法所有完成以後纔會返回。它至少須要4個硬件線程才足以讓這4個方法併發運行。但並不保證這4個方法可以同時啓動運行,若是一個或者多個內核處於繁忙狀態,那麼底層的調度邏輯可能會延遲某些方法的初始化執行。
給方法加上延時,就能夠看到必須等待最長的方法執行完成纔回到主方法。
static void Main(string[] args) { System.Threading.Tasks.Parallel.Invoke(WatchMovie, HaveDinner, ReadBook, WriteBlog); Console.WriteLine("執行完成"); Console.ReadKey(); } static void WatchMovie() { Thread.Sleep(5000); Console.WriteLine("看電影"); } static void HaveDinner() { Thread.Sleep(1000); Console.WriteLine("吃晚飯"); } static void ReadBook() { Thread.Sleep(2000); Console.WriteLine("讀書"); } static void WriteBlog() { Thread.Sleep(3000); Console.WriteLine("寫博客"); }
這樣會形成不少邏輯內核處於長時間閒置狀態。
4、Parallel.For
Parallel.For爲固定數目的獨立For循環迭代提供了負載均衡 (即將工做分發到不一樣的任務中執行,這樣全部的任務在大部分時間均可以保持繁忙) 的並行執行。從而能儘量地充分利用全部的可用的內核。
咱們比較下下面兩個方法,一個使用For循環,一個使用Parallel.For 都是生成密鑰在轉換爲十六進制字符串。
private static void GenerateAESKeys() { var sw = Stopwatch.StartNew(); for (int i = 0; i < NUM_AES_KEYS; i++) { var aesM = new AesManaged(); aesM.GenerateKey(); byte[] result = aesM.Key; string hexStr = ConverToHexString(result); } Console.WriteLine("AES:"+sw.Elapsed.ToString()); } private static void ParallelGenerateAESKeys() { var sw = Stopwatch.StartNew(); System.Threading.Tasks.Parallel.For(1, NUM_AES_KEYS + 1, (int i) => { var aesM = new AesManaged(); aesM.GenerateKey(); byte[] result = aesM.Key; string hexStr = ConverToHexString(result); }); Console.WriteLine("Parallel_AES:" + sw.Elapsed.ToString()); }
private static int NUM_AES_KEYS = 100000; static void Main(string[] args) { Console.WriteLine("執行"+NUM_AES_KEYS+"次:"); GenerateAESKeys(); ParallelGenerateAESKeys(); Console.ReadKey(); }
執行1000000次
這裏並行的時間是串行的一半。
5、Parallel.ForEach
在Parallel.For中,有時候對既有循環進行優化可能會是一個很是複雜的任務。Parallel.ForEach爲固定數目的獨立For Each循環迭代提供了負載均衡的並行執行,且支持自定義分區器,讓使用者能夠徹底掌握數據分發。實質就是將全部要處理的數據區分爲多個部分,而後並行運行這些串行循環。
修改上面的代碼:
System.Threading.Tasks.Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS + 1), range => { var aesM = new AesManaged(); Console.WriteLine("AES Range({0},{1} 循環開始時間:{2})",range.Item1,range.Item2,DateTime.Now.TimeOfDay); for (int i = range.Item1; i < range.Item2; i++) { aesM.GenerateKey(); byte[] result = aesM.Key; string hexStr = ConverToHexString(result); } Console.WriteLine("AES:"+sw.Elapsed.ToString()); });
從執行結果能夠看出,分了13個段執行的。
第二次執行仍是13個段。速度上稍微有差別。開始沒有指定分區數,Partitioner.Create使用的是內置默認值。
並且咱們發現這些分區並非同時執行的,大體是分了三個時間段執行。並且執行順序是不一樣的。總的時間和Parallel.For的方法差很少。
public static ParallelLoopResult ForEach<TSource>(Partitioner<TSource> source, Action<TSource> body)
Parallel.ForEach方法定義了source和Body兩個參數。source是指分區器。提供了分解爲多個分區的數據源。body是要調用的委託。它接受每個已定義的分區做爲參數。一共有20多個重載,在上面的例子中,分區的類型爲Tuple<int,int>,是一個二元組類型。此外,返回一個ParallelLoopResult的值。
Partitioner.Create 建立分區是根據邏輯內核數及其餘因素決定。
public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive) { int num = 3; if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive"); int rangeSize = (toExclusive - fromInclusive) / (PlatformHelper.ProcessorCount * num); if (rangeSize == 0) rangeSize = 1; return Partitioner.Create<Tuple<int, int>>(Partitioner.CreateRanges(fromInclusive, toExclusive, rangeSize), EnumerablePartitionerOptions.NoBuffering); }
所以咱們能夠修改分區數目,rangesize大體爲250000左右。也就是說個人邏輯內核是4.
var rangesize = (int) (NUM_AES_KEYS/Environment.ProcessorCount) + 1; System.Threading.Tasks.Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS + 1,rangesize), range =>
再次執行:
分區變成了四個,時間上沒有多大差異(第一個時間是串行時間)。咱們看見這四個分區幾乎是同時執行的。大部分狀況下,TPL在幕後使用的負載均衡機制都是很是高效的,然而對分區的控制便於使用者對本身的工做負載進行分析,來改進總體的性能。
Parallel.ForEach也能對IEnumerable<int>集合進行重構。Enumerable.Range生產了序列化的數目。但這樣就沒有上面的分區效果。
private static void ParallelForEachGenerateMD5HasHes() { var sw = Stopwatch.StartNew(); System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, NUM_AES_KEYS), number => { var md5M = MD5.Create(); byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + number); byte[] result = md5M.ComputeHash(data); string hexString = ConverToHexString(result); }); Console.WriteLine("MD5:"+sw.Elapsed.ToString()); }
6、從循環中退出
和串行運行中的break不一樣,ParallelLoopState 提供了兩個方法用於中止Parallel.For 和 Parallel.ForEach的執行。
修改上面的方法:執行3秒後退出。
private static void ParallelLoopResult(ParallelLoopResult loopResult) { string text; if (loopResult.IsCompleted) { text = "循環完成"; } else { if (loopResult.LowestBreakIteration.HasValue) { text = "Break終止"; } else { text = "Stop 終止"; } } Console.WriteLine(text); } private static void ParallelForEachGenerateMD5HasHesBreak() { var sw = Stopwatch.StartNew(); var loopresult= System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, NUM_AES_KEYS), (int number,ParallelLoopState loopState) => { var md5M = MD5.Create(); byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + number); byte[] result = md5M.ComputeHash(data); string hexString = ConverToHexString(result); if (sw.Elapsed.Seconds > 3) { loopState.Stop(); } }); ParallelLoopResult(loopresult); Console.WriteLine("MD5:" + sw.Elapsed); }
7、捕捉並行循環中發生的異常。
當並行迭代中調用的委託拋出異常,這個異常沒有在委託中被捕獲到時,就會變成一組異常,新的System.AggregateException負責處理這一組異常。
private static void ParallelForEachGenerateMD5HasHesException() { var sw = Stopwatch.StartNew(); var loopresult = new ParallelLoopResult(); try { loopresult = System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, NUM_AES_KEYS), (number, loopState) => { var md5M = MD5.Create(); byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + number); byte[] result = md5M.ComputeHash(data); string hexString = ConverToHexString(result); if (sw.Elapsed.Seconds > 3) { throw new TimeoutException("執行超過三秒"); } }); } catch (AggregateException ex) { foreach (var innerEx in ex.InnerExceptions) { Console.WriteLine(innerEx.ToString()); } } ParallelLoopResult(loopresult); Console.WriteLine("MD5:" + sw.Elapsed); }
結果:
異常出現了好幾回。
8、指定並行度。
TPL的方法總會試圖利用全部可用的邏輯內核來實現最好的結果,但有時候你並不但願在並行循環中使用全部的內核。好比你須要留出一個不參與並行計算的內核,來建立可以響應用戶的應用程序,並且這個內核須要幫助你運行代碼中的其餘部分。這個時候一種好的解決方法就是指定最大並行度。
這須要建立一個ParallelOptions的實例,設置MaxDegreeOfParallelism的值。
private static void ParallelMaxDegree(int maxDegree) { var parallelOptions = new ParallelOptions(); parallelOptions.MaxDegreeOfParallelism = maxDegree; var sw = Stopwatch.StartNew(); System.Threading.Tasks.Parallel.For(1, NUM_AES_KEYS + 1, parallelOptions, (int i) => { var aesM = new AesManaged(); aesM.GenerateKey(); byte[] result = aesM.Key; string hexStr = ConverToHexString(result); }); Console.WriteLine("AES:" + sw.Elapsed.ToString()); }
調用:若是在四核微處理器上運行,那麼將使用3個內核。
ParallelMaxDegree(Environment.ProcessorCount - 1);
時間上大體慢了點(第一次Parallel.For 3.18s),但能夠騰出一個內核來處理其餘的事情。
小結:此次學習了Parallel相關方法以及如何退出並行循環和捕獲異常、設置並行度,還有並行相關的知識。園子裏也有相似的博客。但做爲本身知識的管理,在這裏梳理一遍。
園友的博客:8天玩轉併發
閱讀書籍:《C#並行編程高級教程》
喜歡看書,也喜歡分享書籍(不限技術書籍)的朋友, 誠邀加入書山有路羣q:452450927 。你們推薦的書籍太多,喊你來讀。
人的核心競爭力超過一半來自不緊不慢的事——讀書、鍛鍊身體、與智者交友,以及業餘愛好。