Parallel類是對線程的抽象,提供數據與任務的並行性。類定義了靜態方法For和ForEach,使用多個任務來完成多個做業。Parallel.For和Parallel.ForEach方法在每次迭代的時候調用相同的代碼,而Parallel.Invoke()方法容許同時調用不一樣的方法。Parallel.ForEach()方法用於數據的並行性,Parallel.Invoke()方法用於任務的並行性。緩存
For()方法用於屢次執行一個任務,能夠並行運行迭代,但迭代的順序並沒指定。For()方法前兩個參數爲定義循環的開始和結束,第三個參數爲Action<int>委託。方法的返回值是ParallelLoopResult結構,它提供了是否結束的信息。如如下循環方法,不能保證輸出順序: 多線程
static void ParallelFor() { ParallelLoopResult result = Parallel.For(0, 10, async i => { Console.WriteLine("{0}, task: {1}, thread: {2}", i, Task.CurrentId, Thread.CurrentThread.ManagedThreadId); await Task.Delay(10);//異步方法,用於釋放線程供其餘任務使用。完成後,可能看不到方法的輸出,由於主(前臺線)程結束,全部的後臺線程也將結束 Console.WriteLine("{0}, task: {1}, thread: {2}", i, Task.CurrentId, Thread.CurrentThread.ManagedThreadId); }); Console.WriteLine("Is completed: {0}", result.IsCompleted); }
異步功能雖然方便,可是知道後臺發生了什麼仍然重要,必須留意。異步
能夠根據條件提早中止For()方法,而沒必要完成所有的迭代。,傳入參數ParallelLoopState的對象,調用Break()方法或者Stop()方法。如調用Break()方法,當迭代值大於15的時候中斷(當前線程結束,相似於普通for的Continue),但其餘任務能夠同時運行,有其餘值的任務也能夠運行(若是當前線程是主線程,那麼就等同於Stop(),結束全部線程)。Stop()方法結束的是全部操做(相似於普通for的Break)。利用LowestBreakIteration屬性能夠忽略其餘任務的結果:async
static void ParallelFor() { ParallelLoopResult result = Parallel.For(10, 40, (int i, ParallelLoopState pls) => { Console.WriteLine("i: {0} task {1}", i, Task.CurrentId); Thread.Sleep(10); if (i > 15) pls.Break(); }); Console.WriteLine("Is completed: {0}", result.IsCompleted); if (!result.IsCompleted) Console.WriteLine("lowest break iteration: {0}", result.LowestBreakIteration); }
For()方法可使用幾個線程執行循環。若是要對每一個線程進行初始化,就須要使用到For<TLocal>(int, int, Func<TLocal>, Func<int, ParallelLoopState, TLocal, TLocal> , Action<TLocal>)方法。oop
static void ParallelFor() { Parallel.For<string>(0, 20, () => { // invoked once for each thread Console.WriteLine("init thread {0}, task {1}", Thread.CurrentThread.ManagedThreadId, Task.CurrentId); return String.Format("t{0}", Thread.CurrentThread.ManagedThreadId); }, (i, pls, str1) => { // invoked for each member Console.WriteLine("body i {0} str1 {1} thread {2} task {3}", i, str1, Thread.CurrentThread.ManagedThreadId, Task.CurrentId); Thread.Sleep(10); return String.Format("i {0}", i); }, (str1) => { // final action on each thread Console.WriteLine("finally {0}", str1); }); }
ForEach()方法遍歷實現了IEnumerable的集合,其方式相似於foreach語句,可是以異步方式遍歷,沒有肯定的順序。若是要中斷循環,一樣能夠採用ParallelLoopState參數。ForEach<TSource>有許多泛型的重載方法。spa
static void ParallelForeach() { string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten", "eleven", "twelve" }; ParallelLoopResult result = Parallel.ForEach<string>(data, s => { Console.WriteLine(s); }); Parallel.ForEach<string>(data, (s, pls, l) => { Console.WriteLine("{0} {1}", s, l); }); }
若是有多個任務並行,可使用Parallel.Invoke()方法,它提供任務的並行性模式:.net
static void ParallelInvoke() { Parallel.Invoke(Foo, Bar); } static void Foo() { Console.WriteLine("foo"); } static void Bar() { Console.WriteLine("bar"); }
在For()方法的重載方法中,能夠傳遞一個ParallelOptions類型的參數,利用此參數能夠傳遞一個CancellationToken參數。使用CancellationTokenSource對象用於註冊CancellationToken,並容許調用Cancel方法用於取消操做。pwa
一旦取消操做,For()方法就拋出一個OperationCanceledException類型的異常,使用CancellationToken能夠註冊取消操做時的信息。調用Register方法,傳遞一個在取消操做時調用的委託。經過取消操做,能夠將其餘的迭代操做在啓動以前取消,但已經啓動的迭代操做容許完成。取消操做是以協做方式進行的,以免在取消迭代操做的中間泄露資源。線程
static void CancelParallelLoop() { var cts = new CancellationTokenSource(); cts.Token.ThrowIfCancellationRequested(); cts.Token.Register(() => Console.WriteLine("** token cancelled")); // 在500ms後取消標記 cts.CancelAfter(500); try { ParallelLoopResult result = Parallel.For(0, 100, new ParallelOptions() { CancellationToken = cts.Token }, x => { Console.WriteLine("loop {0} started", x); int sum = 0; for (int i = 0; i < 100; i++) { Thread.Sleep(2); sum += i; } Console.WriteLine("loop {0} finished", x); }); } catch (OperationCanceledException ex) { Console.WriteLine(ex.Message); } }
使用並行循環時,若出現如下兩個問題,須要使用Partitioner(命名空間 System.Collections.Concurrent中)解決。設計
示例1中,求1000000000之內全部天然數開方的和。第一部分採用直接計算的方式,第二部分採用分區計算。第二部分的Partitioner 會把須要迭代的區間分拆爲多個不一樣的空間,並存入Tuple對象中。
/* 示例1 */
public static void PartitionerTest() { //使用計時器 System.Diagnostics.Stopwatch stopwatch = new System.Diagnostics.Stopwatch(); const int maxValue = 1000000000; long sum = 0; stopwatch.Restart();//開始計時 Parallel.For(0, maxValue, (i) => { Interlocked.Add(ref sum, (long )Math.Sqrt(i));//Interlocked是原子操做,多線程訪問時的線程互斥操做 }); stopwatch.Stop(); Console.WriteLine($"Parallel.For:{stopwatch.Elapsed}");//個人機器運行出的時間是:00:01:37.0391204 var partitioner = System.Collections.Concurrent.Partitioner.Create(0, maxValue);//拆分區間 sum = 0; stopwatch.Restart(); Parallel.ForEach(partitioner, (rang) => { long partialSum = 0; //迭代區間的數據 for(int i=rang.Item1;i<rang.Item2;i++) { partialSum += (long)Math.Sqrt(i); } Interlocked.Add(ref sum, partialSum);//原子操做 }); stopwatch.Stop(); Console.WriteLine($"Parallel.ForEach:{stopwatch.Elapsed}"); //個人機器運行出的時間是:00:00:02.7111666 }
Partitioner的分區是靜態的,只要迭代分區劃分完成,每一個分區上都會運行一個委託。若是某一段區間的迭代次數提早完成,也不會嘗試從新分區並讓處理器分擔工做。 對於任意IEnumerable<T>類型均可以建立不指定區間的分區,但這樣就會讓每一個迭代項目都建立一個委託,而不是對每一個區間建立委託。建立自定義的Partitioner能夠解決這個問題,代碼比較複雜。請自行參閱:http://www.writinghighperf.net/go/20
示例2中,採用一個委託方法來計算兩個數之間的關係值。前一種是每次運行都從新構造委託,後一種是先構造出委託的方法然後每一次調用。
//聲明一個委託 private delegate int MathOp(int x, int y); private int Add(int x,int y) { return x + y; } private int DoOperation(MathOp op,int x,int y) { return op(x, y); } /* * 委託會設計兩類開銷:構造開銷和調用開銷。大多數調用開銷和普通方法的調用差很少。 但委託是一種對象,構造開銷可能至關大,最好是隻作一次構造,而後把對象緩存起來。 */ public void Test() { System.Diagnostics.Stopwatch stopwatch = new System.Diagnostics.Stopwatch(); stopwatch.Restart(); for(int i=0;i<10;i++) { //每一次遍歷循環,都會產生一次構造和調用開銷 DoOperation(Add, 1, 2); } stopwatch.Stop(); Console.WriteLine("Construction and invocation: {0}", stopwatch.Elapsed);//00:00:00.0003812 stopwatch.Restart(); MathOp op = Add;//只產生一次構造開銷 for(int i=0;i<10;i++) { DoOperation(op, 1, 2);//每一次遍歷都只產生遍歷開銷 } stopwatch.Stop(); Console.WriteLine("Once Construction and invocation: {0}", stopwatch.Elapsed);//00:00:00.0000011 }