目錄
-
C#並行編程-相關概念html
背景ide
經過LINQ能夠方便的查詢並處理不一樣的數據源,使用Parallel LINQ (PLINQ)來充分得到並行化所帶來的優點。函數
PLINQ不只實現了完整的LINQ操做符,並且還添加了一些用於執行並行的操做符,與對應的LINQ相比,經過PLINQ能夠得到明顯的加速,可是具體的加速效果還要取決於具體的場景,不過在並行化的狀況下一段會加速。post
若是一個查詢涉及到大量的計算和內存密集型操做,並且順序並不重要,那麼加速會很是明顯,然而,若是順序很重要,那麼加速就會受到影響。性能
AsParallel() 啓用查詢的並行化
下面貼代碼,看下效果,詳情見註釋:
class MRESDemo { /*code:釋迦苦僧*/ static void Main() { ConcurrentQueue<Product> products = new ConcurrentQueue<Product>(); /*向集合中添加多條數據 能夠修改數據量查看Linq和Plinq的性能*/ Parallel.For(0, 600000, (num) => { products.Enqueue(new Product() { Category = "Category" + num, Name = "Name" + num, SellPrice = num }); }); /*採用LINQ查詢符合條件的數據*/ Stopwatch sw = new Stopwatch(); sw.Restart(); var productListLinq = from product in products where (product.Name.Contains("1") && product.Name.Contains("2") && product.Category.Contains("1") && product.Category.Contains("2")) select product; Console.WriteLine("採用Linq 查詢得出數量爲:{0}", productListLinq.Count()); sw.Stop(); Console.WriteLine("採用Linq 耗時:{0}", sw.ElapsedMilliseconds); /*採用PLINQ查詢符合條件的數據*/ sw.Restart(); var productListPLinq = from product in products.AsParallel() /*AsParallel 試圖利用運行時全部可用的邏輯內核,從而使運行的速度比串行的版本要快 可是須要注意開銷所帶來的性能損耗*/ where (product.Name.Contains("1") && product.Name.Contains("2") && product.Category.Contains("1") && product.Category.Contains("2")) select product; Console.WriteLine("採用PLinq 查詢得出數量爲:{0}", productListPLinq.Count()); sw.Stop(); Console.WriteLine("採用PLinq 耗時:{0}", sw.ElapsedMilliseconds); Console.ReadLine(); } } class Product { public string Name { get; set; } public string Category { get; set; } public int SellPrice { get; set; } }
當前模擬的數據量比較少,數據量越多,採用並行化查詢的效果越明顯
AsOrdered()與orderby
AsOrdered:保留查詢的結果按源序列排序,在並行查詢中,多條數據會被分在多個區域中進行查詢,查詢後再將多個區的數據結果合併到一個結果集中並按源序列順序返回。
orderby:將返回的結果集按指定順序進行排序
下面貼代碼方便你們理解:
class MRESDemo { /*code:釋迦苦僧*/ static void Main() { ConcurrentQueue<string> products = new ConcurrentQueue<string>(); products.Enqueue("E"); products.Enqueue("F"); products.Enqueue("B"); products.Enqueue("G"); products.Enqueue("A"); products.Enqueue("C"); products.Enqueue("SS"); products.Enqueue("D"); /*不採用並行化 其數據輸出結果 不作任何處理 */ var productListLinq = from product in products where (product.Length == 1) select product; string appendStr = string.Empty; foreach (string str in productListLinq) { appendStr += str + " "; } Console.WriteLine("不採用並行化 輸出:{0}", appendStr); /*不採用任何排序策略 其數據輸出結果 是直接將分區數據結果合併起來 不作任何處理 */ var productListPLinq = from product in products.AsParallel() where (product.Length == 1) select product; appendStr = string.Empty; foreach (string str in productListPLinq) { appendStr += str + " "; } Console.WriteLine("不採用AsOrdered 輸出:{0}", appendStr); /*採用 AsOrdered 排序策略 其數據輸出結果 是直接將分區數據結果合併起來 並按原始數據順序排序*/ var productListPLinq1 = from product in products.AsParallel().AsOrdered() where (product.Length == 1) select product; appendStr = string.Empty; foreach (string str in productListPLinq1) { appendStr += str + " "; } Console.WriteLine("採用AsOrdered 輸出:{0}", appendStr); /*採用 orderby 排序策略 其數據輸出結果 是直接將分區數據結果合併起來 並按orderby要求進行排序*/ var productListPLinq2 = from product in products.AsParallel() where (product.Length == 1) orderby product select product; appendStr = string.Empty; foreach (string str in productListPLinq2) { appendStr += str + " "; } Console.WriteLine("採用orderby 輸出:{0}", appendStr); Console.ReadLine(); } }
在PLINQ查詢中,AsOrdered()和orderby子句都會下降運行速度,因此若是順序並非必須的,那麼在請求特定順序的結果以前,將加速效果與串行執行的性能進行比較是很是重要的。
指定執行模式 WithExecutionMode
對串行化代碼進行並行化,會帶來必定的額外開銷,Plinq查詢執行並行化也是如此,在默認狀況下,執行PLINQ查詢的時候,.NET機制會盡可能避免高開銷的並行化算法,這些算法有可能會將執行的性能下降到地獄串行執行的性能。
.NET會根據查詢的形態作出決策,並不開了數據集大小和委託執行的時間,不過也能夠強制並行執行,而不用考慮執行引擎分析的結果,能夠調用WithExecutionMode方法來進行設置。、
下面貼代碼,方便你們理解
class MRESDemo { /*code:釋迦苦僧*/ static void Main() { ConcurrentQueue<Product> products = new ConcurrentQueue<Product>(); /*向集合中添加多條數據*/ Parallel.For(0, 6000000, (num) => { products.Enqueue(new Product() { Category = "Category" + num, Name = "Name" + num, SellPrice = num }); }); /*採用並行化整個查詢 查詢符合條件的數據*/ Stopwatch sw = new Stopwatch(); sw.Restart(); var productListLinq = from product in products.AsParallel().WithExecutionMode(ParallelExecutionMode.ForceParallelism) where (product.Name.Contains("1") && product.Name.Contains("2") && product.Category.Contains("1") && product.Category.Contains("2")) select product; Console.WriteLine("採用並行化整個查詢 查詢得出數量爲:{0}", productListLinq.Count()); sw.Stop(); Console.WriteLine("採用並行化整個查詢 耗時:{0}", sw.ElapsedMilliseconds); /*採用默認設置 由.NET進行決策 查詢符合條件的數據*/ sw.Restart(); var productListPLinq = from product in products.AsParallel().WithExecutionMode(ParallelExecutionMode.Default) where (product.Name.Contains("1") && product.Name.Contains("2") && product.Category.Contains("1") && product.Category.Contains("2")) select product; Console.WriteLine("採用默認設置 由.NET進行決策 查詢得出數量爲:{0}", productListPLinq.Count()); sw.Stop(); Console.WriteLine("採用默認設置 由.NET進行決策 耗時:{0}", sw.ElapsedMilliseconds); Console.ReadLine(); } } class Product { public string Name { get; set; } public string Category { get; set; } public int SellPrice { get; set; } }
經過PLINQ執行歸約操做
PLINQ能夠簡化對一個序列或者一個組中全部成員應用一個函數的過程,這個過程稱之爲歸約操做,如在PLINQ查詢中使用相似於Average,Max,Min,Sum之類的聚合函數就能夠充分利用並行所帶來好處。
並行執行的規約和串行執行的規約的執行結果可能會不一樣,由於在操做不能同時知足可交換和可傳遞的狀況下產生攝入,在每次執行的時候,序列或組中的元素在不一樣並行任務中分佈可能也會有區別,於是在這種操做的狀況下可能會產生不一樣的最終結果,所以,必定要經過對於的串行版原本興義原始的數據源,這樣才能幫助PLINQ得到最優的執行結果。
下面貼代碼:
class MRESDemo { /*code:釋迦苦僧*/ static void Main() { ConcurrentQueue<int> products = new ConcurrentQueue<int>(); /*向集合中添加多條數據*/ Parallel.For(0, 6000000, (num) => { products.Enqueue(num); }); /*採用LINQ 返回 IEumerable<int>*/ var productListLinq = (from product in products select product).Average(); Console.WriteLine("採用Average計算平均值:{0}", productListLinq); /*採用PLINQ 返回 ParallelQuery<int>*/ var productListPLinq = (from product in products.AsParallel() select product).Average(); Console.WriteLine("採用Average計算平均值:{0}", productListPLinq); Console.ReadLine(); } }
如上述代碼所示
在LINQ版本中,該方法會返回一個 IEumerable<int>,即調用 Eumerable.Range方法生成指定範圍整數序列的結果,
在PLINQ版本中,該方法會返回一個 ParallelQuery<int>,即調用並行版本中System.Linq.ParallelEumerable的ParallelEumerable.Range方法,經過這種方法獲得的結果序列也是並行序列,能夠再PLINQ中並行運行。
若是想對特定數據源進行LINQ查詢時,能夠定義爲 private IEquatable<int> products
若是想對特定數據源進行PLINQ查詢時,能夠定義爲 private ParallelQuery<int> products
併發PLINQ任務
class MRESDemo { /*code:釋迦苦僧*/ static void Main() { ConcurrentQueue<Product> products = new ConcurrentQueue<Product>(); /*向集合中添加多條數據*/ Parallel.For(0, 600000, (num) => { products.Enqueue(new Product() { Category = "Category" + num, Name = "Name" + num, SellPrice = num }); }); CancellationTokenSource cts = new CancellationTokenSource(); /*建立tk1 任務 查詢 符合 條件的數據*/ Task<ParallelQuery<Product>> tk1 = new Task<ParallelQuery<Product>>((ct) => { Console.WriteLine("開始執行 tk1 任務", products.Count); Console.WriteLine("tk1 任務中 數據結果集數量爲:{0}", products.Count); var result = products.AsParallel().Where(p => p.Name.Contains("1") && p.Name.Contains("2")); return result; }, cts.Token); /*建立tk2 任務,在執行tk1任務完成 基於tk1的結果查詢 符合 條件的數據*/ Task<ParallelQuery<Product>> tk2 = tk1.ContinueWith<ParallelQuery<Product>>((tk) => { Console.WriteLine("開始執行 tk2 任務", products.Count); Console.WriteLine("tk2 任務中 數據結果集數量爲:{0}", tk.Result.Count()); var result = tk.Result.Where(p => p.Category.Contains("1") && p.Category.Contains("2")); return result; }, TaskContinuationOptions.OnlyOnRanToCompletion); /*建立tk3 任務,在執行tk1任務完成 基於tk1的結果查詢 符合 條件的數據*/ Task<ParallelQuery<Product>> tk3 = tk1.ContinueWith<ParallelQuery<Product>>((tk) => { Console.WriteLine("開始執行 tk3 任務", products.Count); Console.WriteLine("tk3 任務中 數據結果集數量爲:{0}", tk.Result.Count()); var result = tk.Result.Where(p => p.SellPrice > 1111 && p.SellPrice < 222222); return result; }, TaskContinuationOptions.OnlyOnRanToCompletion); tk1.Start(); Task.WaitAll(tk1, tk2, tk3); Console.WriteLine("tk2任務結果輸出,篩選後記錄總數爲:{0}", tk2.Result.Count()); Console.WriteLine("tk3任務結果輸出,篩選後記錄總數爲:{0}", tk3.Result.Count()); tk1.Dispose(); tk2.Dispose(); tk3.Dispose(); cts.Dispose(); Console.ReadLine(); } } class Product { public string Name { get; set; } public string Category { get; set; } public int SellPrice { get; set; } }
如代碼所示tk1,tk2,tk3三個任務,tk2,tk3任務的運行須要基於tk1任務的結果,所以,參數中指定了TaskContinuationOptions.OnlyOnRanToCompletion,經過這種方式,每一個被串聯的任務都會等待以前的任務完成以後纔開始執行,tk2,tk3在tk1執行完成後,這兩個任務的PLINQ查詢能夠並行運行,並將會可能地使用多個邏輯內核。
取消PLINQ WithCancellation
經過WithCancellation取消當前PLINQ正在執行的查詢操做,代碼以下:
class MRESDemo { /*code:釋迦苦僧*/ static void Main() { ConcurrentQueue<Product> products = new ConcurrentQueue<Product>(); /*向集合中添加多條數據*/ Parallel.For(0, 600000, (num) => { products.Enqueue(new Product() { Category = "Category" + num, Name = "Name" + num, SellPrice = num }); }); CancellationTokenSource cts = new CancellationTokenSource(); CancellationToken token = cts.Token; /*建立tk1 任務 查詢 符合 條件的數據*/ Task<ParallelQuery<Product>> tk1 = new Task<ParallelQuery<Product>>((ct) => { var result = products.AsParallel(); try { Console.WriteLine("開始執行 tk1 任務", products.Count); Console.WriteLine("tk1 任務中 數據結果集數量爲:{0}", products.Count); result = products.AsParallel().WithCancellation(token).Where(p => p.Name.Contains("1") && p.Name.Contains("2")); } catch (AggregateException ex) { foreach (Exception e in ex.InnerExceptions) { Console.WriteLine("tk3 錯誤:{0}", e.Message); } } return result; }, cts.Token); /*建立tk2 任務,在執行tk1任務完成 基於tk1的結果查詢 符合 條件的數據*/ Task<ParallelQuery<Product>> tk2 = tk1.ContinueWith<ParallelQuery<Product>>((tk) => { var result = tk.Result; try { Console.WriteLine("開始執行 tk2 任務", products.Count); Console.WriteLine("tk2 任務中 數據結果集數量爲:{0}", tk.Result.Count()); result = tk.Result.WithCancellation(token).Where(p => p.Category.Contains("1") && p.Category.Contains("2")); } catch (AggregateException ex) { foreach (Exception e in ex.InnerExceptions) { Console.WriteLine("tk3 錯誤:{0}", e.Message); } } return result; }, TaskContinuationOptions.OnlyOnRanToCompletion); /*建立tk3 任務,在執行tk1任務完成 基於tk1的結果查詢 符合 條件的數據*/ Task<ParallelQuery<Product>> tk3 = tk1.ContinueWith<ParallelQuery<Product>>((tk) => { var result = tk.Result; try { Console.WriteLine("開始執行 tk3 任務", products.Count); Console.WriteLine("tk3 任務中 數據結果集數量爲:{0}", tk.Result.Count()); result = tk.Result.WithCancellation(token).Where(p => p.SellPrice > 1111 && p.SellPrice < 222222); } catch (AggregateException ex) { foreach (Exception e in ex.InnerExceptions) { Console.WriteLine("tk3 錯誤:{0}", e.Message); } } return result; }, TaskContinuationOptions.OnlyOnRanToCompletion); tk1.Start(); try { Thread.Sleep(10); cts.Cancel();//取消任務 Task.WaitAll(tk1, tk2, tk3); Console.WriteLine("tk2任務結果輸出,篩選後記錄總數爲:{0}", tk2.Result.Count()); Console.WriteLine("tk3任務結果輸出,篩選後記錄總數爲:{0}", tk3.Result.Count()); } catch (AggregateException ex) { foreach (Exception e in ex.InnerExceptions) { Console.WriteLine("錯誤:{0}", e.Message); } } tk1.Dispose(); tk2.Dispose(); tk3.Dispose(); cts.Dispose(); Console.ReadLine(); } } class Product { public string Name { get; set; } public string Category { get; set; } public int SellPrice { get; set; } }
指定查詢時所需的並行度 WithDegreeOfParallelism
默認狀況下,PLINQ老是會試圖利用全部的可用邏輯內核達到最佳性能,在程序中咱們能夠利用WithDegreeOfParallelism方法指定一個不一樣最大並行度。
下面貼代碼:
/*tk1任務 採用全部可用處理器*/ result = products.AsParallel().WithCancellation(token).WithDegreeOfParallelism(Environment.ProcessorCount).Where(p => p.Name.Contains("1") && p.Name.Contains("2") && p.Category.Contains("1") && p.Category.Contains("2")); /*tk1任務 採用1個可用處理器*/ result = products.AsParallel().WithCancellation(token).WithDegreeOfParallelism(1).Where(p => p.Name.Contains("1") && p.Name.Contains("2") && p.Category.Contains("1") && p.Category.Contains("2"));
好處:若是計算機有8個可用的邏輯內核,PLINQ查詢最多運行4個併發任務,這樣可用使用Parallel.Invoke 加載多個帶有不一樣並行度的PLINQ查詢,有一些PLINQ查詢的可擴展性有限,所以這些選項可用讓您充分利用額外的內核。
使用ForAll 並行遍歷結果
下面貼代碼:
class MRESDemo { /*code:釋迦苦僧*/ static void Main() { ConcurrentQueue<Product> products = new ConcurrentQueue<Product>(); /*向集合中添加多條數據*/ Parallel.For(0, 1000, (num) => { products.Enqueue(new Product() { Category = "Category" + num, Name = "Name" + num, SellPrice = num }); }); products.AsParallel().Where(P => P.Name.Contains("1") && P.Name.Contains("2") && P.Name.Contains("3")).ForAll(product => { Console.WriteLine("Name:{0}", product.Name); }); Console.ReadLine(); } } class Product { public string Name { get; set; } public string Category { get; set; } public int SellPrice { get; set; } }
ForAll是並行,foreach是串行,若是須要以特定的順序處理數據,那麼必須使用上述串行循環或方法。
WithMergeOptions
經過WithMergeOptions擴展方法提示PLINQ應該優先使用哪一種方式合併並行結果片斷,以下:
下面貼代碼查看下差別:
class MRESDemo { /*code:釋迦苦僧*/ static void Main() { Console.WriteLine("當前計算機處理器數:{0}", Environment.ProcessorCount); ConcurrentQueue<Product> products = new ConcurrentQueue<Product>(); /*向集合中添加多條數據*/ Parallel.For(0, 600000, (num) => { products.Enqueue(new Product() { Category = "Category" + num, Name = "Name" + num, SellPrice = num }); }); Stopwatch sw = new Stopwatch(); Thread.Sleep(1000); sw.Restart(); int count = 0; Task tk1 = Task.Factory.StartNew(() => { var result = products.AsParallel().WithMergeOptions(ParallelMergeOptions.AutoBuffered).Where(p => p.Name.Contains("1") && p.Name.Contains("2") && p.Category.Contains("1") && p.Category.Contains("2")); count = result.Count(); }); Task.WaitAll(tk1); sw.Stop(); Console.WriteLine("ParallelMergeOptions.AutoBuffered 耗時:{0},數量:{1}", sw.ElapsedMilliseconds, count); sw.Restart(); int count1 = 0; Task tk2 = Task.Factory.StartNew(() => { var result = products.AsParallel().WithMergeOptions(ParallelMergeOptions.Default).Where(p => p.Name.Contains("1") && p.Name.Contains("2") && p.Category.Contains("1") && p.Category.Contains("2")); count1 = result.Count(); }); Task.WaitAll(tk2); sw.Stop(); Console.WriteLine("ParallelMergeOptions.Default 耗時:{0},數量:{1}", sw.ElapsedMilliseconds, count1); sw.Restart(); int count2 = 0; Task tk3 = Task.Factory.StartNew(() => { var result = products.AsParallel().WithMergeOptions(ParallelMergeOptions.FullyBuffered).Where(p => p.Name.Contains("1") && p.Name.Contains("2") && p.Category.Contains("1") && p.Category.Contains("2")); count2 = result.Count(); }); Task.WaitAll(tk3); sw.Stop(); Console.WriteLine("ParallelMergeOptions.FullyBuffered 耗時:{0},數量:{1}", sw.ElapsedMilliseconds, count2); sw.Restart(); int count3 = 0; Task tk4 = Task.Factory.StartNew(() => { var result = products.AsParallel().WithMergeOptions(ParallelMergeOptions.NotBuffered).Where(p => p.Name.Contains("1") && p.Name.Contains("2") && p.Category.Contains("1") && p.Category.Contains("2")); count3 = result.Count(); }); Task.WaitAll(tk4); sw.Stop(); Console.WriteLine("ParallelMergeOptions.NotBuffered 耗時:{0},數量:{1}", sw.ElapsedMilliseconds, count3); tk4.Dispose(); tk3.Dispose(); tk2.Dispose(); tk1.Dispose(); Console.ReadLine(); } } class Product { public string Name { get; set; } public string Category { get; set; } public int SellPrice { get; set; } }
須要注意的是:每個選項都有其優勢和缺點,所以必定奧測量顯示第一個結果的時間以及完成整個查詢所須要的時間,這點很重要 。
使用PLINQ執行MapReduce算法 ILookup IGrouping
mapreduce ,也稱爲Map/reduce 或者Map&Reduce ,是一種很是流行的框架,可以充分利用並行化處理巨大的數據集,MapReduce的基本思想很是簡單:將數據處理問題分解爲如下兩個獨立且能夠並行執行的操做:
映射(Map)-對數據源進行操做,爲每個數據項計算出一個鍵值對。運行的結果是一個鍵值對的集合,根據鍵進行分組。
規約(Reduce)-對映射操做產生的根據鍵進行分組的全部鍵值對進行操做,對每個組執行歸約操做,這個操做能夠返回一個或多個值。
下面貼代碼,方便你們理解,可是該案列所展現的並非一個純粹的MapReduce算法實現:
class MRESDemo { /*code:釋迦苦僧*/ static void Main() { ConcurrentQueue<string> list = new ConcurrentQueue<string>(); list.Enqueue("A"); list.Enqueue("B"); list.Enqueue("C"); list.Enqueue("D"); list.Enqueue("A"); list.Enqueue("D"); Console.WriteLine("Select......."); list.AsParallel().Select(p => new { Name = p, Count = 1 }).ForAll((p) => { Console.WriteLine("{0}\t{1}", p.Name, p.Count); }); Console.WriteLine("ILookup......."); /*map操做生成的鍵值對由一個單詞和數量1組成,該代碼意在將每一個單詞做爲鍵並將1做爲值加入*/ ILookup<string, int> map = list.AsParallel().ToLookup(p => p, k => 1); foreach (var v in map) { Console.Write(v.Key); foreach (int val in v) Console.WriteLine("\t{0}", val); } /*reduce操做單詞出現的次數*/ var reduce = from IGrouping<string, int> reduceM in map.AsQueryable() select new { key = reduceM.Key, count = reduceM.Count() }; Console.WriteLine("IGrouping......."); foreach (var v in reduce) { Console.Write(v.key); Console.WriteLine("\t{0}", v.count); } Console.ReadLine(); } }
關於PLINQ:聲明式數據並行就寫到這,主要是PLINQ下的查詢注意項和查詢調優的一些擴展方法。若有問題,歡迎指正。