沿用微軟的寫法,System.Threading.Tasks.Parallel類,提供對並行循環和區域的支持。 咱們會用到的方法有For,ForEach,Invoke。html
Program.Data = new List<int>(); for (int i = 0; i < 10; i++) { Data.Add(i); }
下面咱們定義4個方法,分別爲for,foreach,並行For,並行ForEach。並測試他們的運行時長。算法
/// <summary> /// 是否顯示執行過程 /// </summary> public bool ShowProcessExecution = false; /// <summary> /// 這是普通循環for /// </summary> private void Demo1() { List<int> data = Program.Data; DateTime dt1 = DateTime.Now; for (int i = 0; i < data.Count; i++) { Thread.Sleep(500); if (ShowProcessExecution) Console.WriteLine(data[i]); } DateTime dt2 = DateTime.Now; Console.WriteLine("普通循環For運行時長:{0}毫秒。", (dt2 - dt1).TotalMilliseconds); } /// <summary> /// 這是普通循環foreach /// </summary> private void Demo2() { List<int> data = Program.Data; DateTime dt1 = DateTime.Now; foreach (var i in data) { Thread.Sleep(500); if (ShowProcessExecution) Console.WriteLine(i); } DateTime dt2 = DateTime.Now; Console.WriteLine("普通循環For運行時長:{0}毫秒。", (dt2 - dt1).TotalMilliseconds); } /// <summary> /// 這是並行計算For /// </summary> private void Demo3() { List<int> data = Program.Data; DateTime dt1 = DateTime.Now; Parallel.For(0, data.Count, (i) => { Thread.Sleep(500); if (ShowProcessExecution) Console.WriteLine(data[i]); }); DateTime dt2 = DateTime.Now; Console.WriteLine("並行運算For運行時長:{0}毫秒。", (dt2 - dt1).TotalMilliseconds); } /// <summary> /// 這是並行計算ForEach /// </summary> private void Demo4() { List<int> data = Program.Data; DateTime dt1 = DateTime.Now; Parallel.ForEach(data, (i) => { Thread.Sleep(500); if (ShowProcessExecution) Console.WriteLine(i); }); DateTime dt2 = DateTime.Now; Console.WriteLine("並行運算ForEach運行時長:{0}毫秒。", (dt2 - dt1).TotalMilliseconds); }
下面是運行結果:數組
這裏咱們能夠看出並行循環在執行效率上的優點了。安全
結論1:在對一個數組內的每個項作單獨處理時,徹底能夠選擇並行循環的方式來提高執行效率。多線程
原理1:並行計算的線程開啓是緩步開啓的,線程數量1,2,4,8緩步提高。(不詳,PLinq最多64個線程,可能這也是64)函數
當在進行循環時,偶爾會須要中斷循環或跳出循環。下面是兩種跳出循環的方法Stop和Break,LoopState是循環狀態的參數。oop
/// <summary> /// 中斷Stop /// </summary> private void Demo5() { List<int> data = Program.Data; Parallel.For(0, data.Count, (i, LoopState) => { if (data[i] > 5) LoopState.Stop(); Thread.Sleep(500); Console.WriteLine(data[i]); }); Console.WriteLine("Stop執行結束。"); } /// <summary> /// 中斷Break /// </summary> private void Demo6() { List<int> data = Program.Data; Parallel.ForEach(data, (i, LoopState) => { if (i > 5) LoopState.Break(); Thread.Sleep(500); Console.WriteLine(i); }); Console.WriteLine("Break執行結束。"); }
執行結果以下:測試
結論2:使用Stop會當即中止循環,使用Break會執行完畢全部符合條件的項。spa
上面的應用場景其實並非很是多見,畢竟只是爲了遍歷一個數組內的資源,咱們更多的時候是爲了遍歷資源,找到咱們所須要的。那麼請繼續看。線程
下面是咱們通常會想到的寫法:
private void Demo7() { List<int> data = new List<int>(); Parallel.For(0, Program.Data.Count, (i) => { if (Program.Data[i] % 2 == 0) data.Add(Program.Data[i]); }); Console.WriteLine("執行完成For."); } private void Demo8() { List<int> data = new List<int>(); Parallel.ForEach(Program.Data, (i) => { if (Program.Data[i] % 2 == 0) data.Add(Program.Data[i]); }); Console.WriteLine("執行完成ForEach."); }
看起來應該是沒有問題的,可是咱們屢次運行後會發現,偶爾會出現錯誤以下:
這是由於List是非線程安全的類,咱們須要使用System.Collections.Concurrent命名空間下的類型來用於並行循環體內。
類 | 說明 |
BlockingCollection<T> | 爲實現 IProducerConsumerCollection<T> 的線程安全集合提供阻止和限制功能。 |
ConcurrentBag<T> | 表示對象的線程安全的無序集合。 |
ConcurrentDictionary<TKey, TValue> | 表示可由多個線程同時訪問的鍵值對的線程安全集合。 |
ConcurrentQueue<T> | 表示線程安全的先進先出 (FIFO) 集合。 |
ConcurrentStack<T> | 表示線程安全的後進先出 (LIFO) 集合。 |
OrderablePartitioner<TSource> | 表示將一個可排序數據源拆分紅多個分區的特定方式。 |
Partitioner | 提供針對數組、列表和可枚舉項的常見分區策略。 |
Partitioner<TSource> | 表示將一個數據源拆分紅多個分區的特定方式。 |
那麼咱們上面的代碼能夠修改成,加了了ConcurrentQueue和ConcurrentStack的最基本的操做。
/// <summary> /// 並行循環操做集合類,集合內只取5個對象 /// </summary> private void Demo7() { ConcurrentQueue<int> data = new ConcurrentQueue<int>(); Parallel.For(0, Program.Data.Count, (i) => { if (Program.Data[i] % 2 == 0) data.Enqueue(Program.Data[i]);//將對象加入到隊列末尾 }); int R; while (data.TryDequeue(out R))//返回隊列中開始處的對象 { Console.WriteLine(R); } Console.WriteLine("執行完成For."); } /// <summary> /// 並行循環操做集合類 /// </summary> private void Demo8() { ConcurrentStack<int> data = new ConcurrentStack<int>(); Parallel.ForEach(Program.Data, (i) => { if (Program.Data[i] % 2 == 0) data.Push(Program.Data[i]);//將對象壓入棧中 }); int R; while (data.TryPop(out R))//彈出棧頂對象 { Console.WriteLine(R); } Console.WriteLine("執行完成ForEach."); }
ok,這裏返回一個序列的問題也解決了。
結論3:在並行循環內重複操做的對象,必需要是thread-safe(線程安全)的。集合類的線程安全對象所有在System.Collections.Concurrent命名空間下。
使用循環的時候常常也會用到迭代,那麼在並行循環中叫作 含有局部變量的循環 。下面的代碼中詳細的解釋。
/// <summary> /// 具備線程局部變量的For循環 /// </summary> private void Demo9() { List<int> data = Program.Data; long total = 0; //這裏定義返回值爲long類型方便下面各個參數的解釋 Parallel.For<long>(0, // For循環的起點 data.Count, // For循環的終點 () => 0, // 初始化局部變量的方法(long),既爲下面的subtotal的初值 (i, LoopState, subtotal) => // 爲每一個迭代調用一次的委託,i是當前索引,LoopState是循環狀態,subtotal爲局部變量名 { subtotal += data[i]; // 修改局部變量 return subtotal; // 傳遞參數給下一個迭代 }, (finalResult) => Interlocked.Add(ref total, finalResult) //對每一個線程結果執行的最後操做,這裏是將全部的結果相加 ); Console.WriteLine(total); } /// <summary> /// 具備線程局部變量的ForEach循環 /// </summary> private void Demo10() { List<int> data = Program.Data; long total = 0; Parallel.ForEach<int, long>(data, // 要循環的集合對象 () => 0, // 初始化局部變量的方法(long),既爲下面的subtotal的初值 (i, LoopState, subtotal) => // 爲每一個迭代調用一次的委託,i是當前元素,LoopState是循環狀態,subtotal爲局部變量名 { subtotal += i; // 修改局部變量 return subtotal; // 傳遞參數給下一個迭代 }, (finalResult) => Interlocked.Add(ref total, finalResult) //對每一個線程結果執行的最後操做,這裏是將全部的結果相加 ); Console.WriteLine(total); }
結論4:並行循環中的迭代,確實很傷人。代碼太難理解了。
上面介紹完了For和ForEach的並行計算盛宴,微軟也沒忘記在Linq中加入並行計算。下面介紹Linq中的並行計算。
4.0中在System.Linq命名空間下加入了下面幾個新的類:
類 | 說明 |
ParallelEnumerable | 提供一組用於查詢實現 ParallelQuery{TSource} 的對象的方法。這是 Enumerable 的並行等效項。 |
ParallelQuery | 表示並行序列。 |
ParallelQuery<TSource> | 表示並行序列。 |
原理2:PLinq最多會開啓64個線程
原理3:PLinq會本身判斷是否能夠進行並行計算,若是不行則會以順序模式運行。
原理4:PLinq會在昂貴的並行算法或成本較低的順序算法之間進行選擇,默認狀況下它選擇順序算法。
在ParallelEnumerable中提供的並行化的方法
ParallelEnumerable 運算符 | 說明 |
AsParallel() | PLINQ 的入口點。指定若是可能,應並行化查詢的其他部分。 |
AsSequential() | 指定查詢的其他部分應像非並行 LINQ 查詢同樣按順序運行。 |
AsOrdered() | 指定 PLINQ 應保留查詢的其他部分的源序列排序,直到例如經過使用 orderby 子句更改排序爲止。 |
AsUnordered() | 指定查詢的其他部分的 PLINQ 不須要保留源序列的排序。 |
WithCancellation() | 指定 PLINQ 應按期監視請求取消時提供的取消標記和取消執行的狀態。 |
WithDegreeOfParallelism() | 指定 PLINQ 應當用來並行化查詢的處理器的最大數目。 |
WithMergeOptions() | 提供有關 PLINQ 應當如何(若是可能)將並行結果合併回到使用線程上的一個序列的提示。 |
WithExecutionMode() | 指定 PLINQ 應當如何並行化查詢(即便默認行爲是按順序運行查詢)。 |
ForAll() | 多線程枚舉方法,與循環訪問查詢結果不一樣,它容許在不首先合併回到使用者線程的狀況下並行處理結果。 |
Aggregate() 重載 | 對於 PLINQ 惟一的重載,它啓用對線程本地分區的中間聚合以及一個用於合併全部分區結果的最終聚合函數。 |
下面是PLinq的簡單代碼
/// <summary> /// PLinq簡介 /// </summary> private void Demo11() { var source = Enumerable.Range(1, 10000); //查詢結果按source中的順序排序 var evenNums = from num in source.AsParallel().AsOrdered() where num % 2 == 0 select num; //ForAll的使用 ConcurrentBag<int> concurrentBag = new ConcurrentBag<int>(); var query = from num in source.AsParallel() where num % 10 == 0 select num; query.ForAll((e) => concurrentBag.Add(e * e)); }
上面代碼中使用了ForAll,ForAll和foreach的區別以下:
PLinq的東西很繁雜,可是都只是幾個簡單的方法,熟悉下方法就行了。
原文地址:http://www.cnblogs.com/sorex/archive/2010/09/16/1828214.html