1、並行編程 - 數據並行 System.Threading.Tasks.Parallel 類

1、並行概念

一、並行編程

      在.NET 4中的並行編程是依賴Task Parallel Library(後面簡稱爲TPL) 實現的。在TPL中,最基本的執行單元是task(中文能夠理解爲"任務"),一個task就表明了你要執行的一個操做。你能夠爲你所要執行的每個操做定義一個task,TPL就負責建立線程來執行你所定義的task,而且管理線程。TPL是面向task的,自動的;而傳統的多線程是以人工爲導向的。html

如今已經進入了多核的時代,咱們的程序如何更多的利用好硬件cpu,答案是並行處理。在.net4.0以前咱們要開發並行的程序是很是的困難,在.net4.0中,在命名空間System.Threading.Tasks提供了方便的並行開發的類庫。spring

二、數據並行

      數據並行指的是對源集合或數組的元素同時(即,並行)執行相同操做的場景。 在數據並行操做中,對源集合進行分區,以便多個線程可以同時在不一樣的網段上操做。編程

      任務並行庫 (TPL) 支持經過 System.Threading.Tasks.Parallel 類實現的數據並行。 此類對 for 循環和 foreach 循環提供了基於方法的並行執行。你爲Parallel.For 或 Parallel.ForEach 循環編寫的循環邏輯與編寫連續循環的類似。 無需建立線程或列工做項。 在基本循環中,不須要加鎖。TPL 爲你處理全部低級別的工做。數組

      Parallel.For()和Parallel.ForEach()方法屢次調用同一個方法,而Parallel.Invoke()方法容許同時調用不一樣的方法。多線程

2、Parallel.Invoke():並行調用多個任務 。

例1:同時調用2個任務oop

static void Main(string[] args)
{
    var watch = Stopwatch.StartNew();
 Parallel.Invoke(Run1, Run2);
    watch.Stop();
    Console.WriteLine("我是並行開發,總共耗時:{0}", watch.ElapsedMilliseconds)
}

static void Run1()
{
    Console.WriteLine("我是任務一,我跑了3s");
    Thread.Sleep(3000);
}

static void Run2()
{
    Console.WriteLine("我是任務二,我跑了5s");
    Thread.Sleep(5000);
}

例2:說明並非每一個任務一個線程。測試

// 定義一個線程局部變量,返回其線程名
ThreadLocal<string> ThreadName = new ThreadLocal<string>(() =>
{
    return "Thread" + Thread.CurrentThread.ManagedThreadId;
});

//  打印出當前線程名的方法。 
Action action = () =>
{
    // 若是 ThreadName.IsValueCreated 爲true,在這個線程上不是第一次運行這個方法。
    bool repeat = ThreadName.IsValueCreated;
    Console.WriteLine("ThreadName = {0} {1}", ThreadName.Value, repeat ? "(repeat)" : "");
};

// 調用8個方法,你應該會看到一些重複的線程名
Parallel.Invoke(action, action, action, action, action, action, action, action);
ThreadName.Dispose();

image

3、Parallel.For(): for 循環的並行運算 

      咱們知道串行代碼中也有一個for,可是那個for並無用到多核,而Paraller.for它會在底層根據硬件線程的運行情況來充分的使用全部的可利用的硬件線程,注意這裏的Parallel.for的步行是1。this

      在For()方法中,前兩個參數定義了循環的開頭和結束。示例從0迭代到9。第3個參數是一個 Action<int>委託。整數參數是循環的迭代次數,該參數被傳遞給Action < int >委託引用的方法。 Parallel.For方法的返回類型是ParallelLoopResult結構,它提供了循環是否結束的信息。spa

ParallelLoopResult result = Parallel.For(0, 10, i =>
 {
     Console.WriteLine("{0}, task: {1}, thread: {2}", i, Task.CurrentId, Thread.CurrentThread.ManagedThreadId);
     Thread.Sleep(10);
 });
Console.WriteLine(result.IsCompleted);


首先先寫一個普通的循環:.net

private void NormalFor()
{
    for (var i = 0; i < 10000; i++)
    {
        for (var j = 0; j < 1000; j++)
        {
            for (var k = 0; k < 100; k++)
            {
                DoSomething();
            }
        }
    }
}

再看一個並行的For語句:

private void ParallelFor()
{
    Parallel.For(0, 10000, i => { for (int j = 0; j < 1000; j++) { for (var k = 0; k < 100; k++) { DoSomething(); } } }); }

上面的例子中,只是將最外層的For語句替換成了Parallel.For,Parallel執行速度能夠提升近一倍。

4、Parallel.ForEach():foreach 循環的並行運算 

private void NormalForeach() 
{
    foreach (var file in GetFiles())
    {
        DoSomething();
    }
     
}
 
private void ParallelForeach()
{
    Parallel.ForEach(GetFiles(), file => {
        DoSomething();
    });
}

ForEach的使用跟For使用幾乎是差很少了,只是在對非泛型的Collection進行操做的時候,須要經過Cast方法進行轉換。

ForEach的獨到之處就是能夠將數據進行分區,每個小區內實現串行計算,分區採用Partitioner.Create實現。

for (int j = 1; j < 4; j++)
{
    Console.WriteLine("\n第{0}次比較", j);
    ConcurrentBag<int> bag = new ConcurrentBag<int>();
    var watch = Stopwatch.StartNew();
    watch.Start();
    for (int i = 0; i < 3000000; i++)
    {
        bag.Add(i);
    }
    Console.WriteLine("串行計算:集合有:{0},總共耗時:{1}", bag.Count, watch.ElapsedMilliseconds);

    GC.Collect();
    bag = new ConcurrentBag<int>();
    watch = Stopwatch.StartNew();
    watch.Start();
    Parallel.ForEach(Partitioner.Create(0, 3000000), i =>
    {
        for (int m = i.Item1; m < i.Item2; m++)
        {
            bag.Add(m);
        }
    });
    Console.WriteLine("並行計算:集合有:{0},總共耗時:{1}", bag.Count, watch.ElapsedMilliseconds);
    GC.Collect();
}

5、線程局部變量

下面這段代碼屢次運行每次的結果都不同,由於total變量是公共的,而咱們的程序是多個線程的加,而多個線程之間是不能把數據共享的。

public void NormalParallelTest()
{
    int[] nums = Enumerable.Range(0, 1000000).ToArray();
    long total = 0;
    Parallel.For(0,nums.Length,i=>
    {
        total += nums[i];
    });
    Console.WriteLine("The total is {0}", total);
}

其實咱們須要的是在每一個線程中計算出一個和值,而後再進行累加。咱們來看看線程局部變量:

泛型方法Parallel.For<T>的原型:

public static ParallelLoopResult For<TLocal>
(int fromInclusive, int toExclusive, 
 Func<TLocal> localInit, 
 Func<int, ParallelLoopState, TLocal, TLocal> body,
 Action<TLocal> localFinally);
  • TLocal:線程變量的類型;第一個、第二個參數就沒必要多說了,就是起始值跟結束值。
  • localInit:每一個線程的線程局部變量初始值的設置;
  • body:每次循環執行的方法,其中方法的最後一個參數就是線程局部變量;
  • localFinally:每一個線程以後執行的方法。

一、Parallel.For中定義局部變量:

從2開始,累加2個,得49.

int[] nums = Enumerable.Range(0, 10).ToArray();
long total = 0;

Parallel.For<long>(0, nums.Length, () => { return 2; }, 

    (j, loop, subtotal) =>//一、每次循環執行的方法
    {
        subtotal += nums[j];
        Console.WriteLine("主體: thread {1}, task {2},結果:{0}", j+ ":" +nums[j] + "-" + subtotal,     Thread.CurrentThread.ManagedThreadId, Task.CurrentId);
    
        return subtotal;
    },

    (x) =>//二、每一個線程執行以後執行的方法
    {

        Console.WriteLine(" 最終執行:thread {1}, task {2},結果:{0} ", x, Thread.CurrentThread.ManagedThreadId, Task.CurrentId);
        Interlocked.Add(ref total, x);
    });
Console.WriteLine("The total is {0}", total);

image

二、Parallel.Each中定義局部變量:

要注意的是,咱們必需要使用ForEach<TSource, TLocal>,由於第一個參數表示的是迭代源的類型,第二個表示的是線程局部變量的類型,其方法的參數跟For是差很少的。

public void ForeachThreadLocalTest()
{
    int[] nums = Enumerable.Range(0, 1000000).ToArray();
    long total = 0;
    Parallel.ForEach<int,long>(nums,()=>0,

 (member,loopState,subTotal)=>//一、每次循環執行的方法 { subTotal += member; return subTotal; },
 (perLocal)=>//二、每一個線程執行以後執行的方法 
 Interlocked.Add(ref total,perLocal) ); Console.WriteLine("The total is {0}", total); }

6、Break、Stop中斷與中止線程

      在並行循環的委託參數中提供了一個ParallelLoopState,該實例提供了Break和Stop方法來幫咱們實現。

  • Break「中斷」:表示完成當前線程上當前迭代以前的全部線程上的全部迭代,而後退出循環。(好比並行計算正在迭代100,那麼break後程序還會迭代全部小於100的。)
  • Stop「中止」:表示在方便的狀況下儘快中止全部迭代。(好比正在迭代100忽然遇到stop,那它啥也無論了,直接退出。)

首先咱們能夠看到在Parallel.For的一個重載方法:

public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int, ParallelLoopState > body)

在委託的最後一個參數類型爲ParallelLoopState,而ParallelLoopState裏面提供給咱們兩個方法:Break、Stop來終止迭代。

private void StopLoop()
{
    var Stack = new ConcurrentStack<string>();
    Parallel.For(0, 10000, (i, loopState
) =>
    {
        if (i < 1000)
            Stack.Push(i.ToString());
        else
        {
            loopState.Stop();
            return;
        }
    });
    Console.WriteLine("Stop Loop Info:\n elements count:{0}", Stack.Count);    
}

7、Cancel取消循環

      在並行的循環中支持經過傳遞ParallelOptions參數中的CancellationToken進行取消循環的控制,咱們能夠CancellationTokenSource實例化以後傳遞給ParallelOptions對象Cancellation值。下面來看個示例:

      在For循環的實現代碼內部,Parallel類驗證CancellationToken 的結果,並取消操做。一旦取消操做,For()方法就拋出個OperationCanceledException類型的異常,這是本例捕獲的異常。使用 CancellationTokeri能夠註冊取消操做時的信息。爲此,須要調用Register方法,並傳遞一個在取消 操做時調用的委託。

var cts = new CancellationTokenSource();
cts.Token.Register(() =>Console.WriteLine("*** token canceled"));

// start a task that sends a cancel after 500 ms
new Task(() =>
{
    Thread.Sleep(500);
    cts.Cancel(false);
}).Start();

try
{
    ParallelLoopResult result =
    Parallel.For(0, 100,
    new ParallelOptions()
    {
        CancellationToken = cts.Token,
    },
    x =>
    {
        Console.WriteLine("loop {0} started", x);
        int sum = 0;
        for (int i = 0; i < 100; i++)
        {
            Thread.Sleep(2);
            sum += i;
        }
        Console.WriteLine("loop {0} finished", x);
    });
}
catch (OperationCanceledException ex)
{
    Console.WriteLine(ex.Message);
}

8、Handel Exceptions異常處理

      在處理並行循環的異常的與順序循環異常的處理是有所不一樣的,並行循環裏面可能會一個異常在多個循環中出現,或則一個線程上的異常致使另一個線程上也出現異常。比較好的處理方式就是,首先獲取全部的異常最後經過AggregateException來包裝全部的循環的異常,循環結束後進行throw。看一段示例代碼:

private void HandleNumbers(int[] numbers)
{
    var exceptions = new ConcurrentQueue<Exception>();
    Parallel.For(0, numbers.Length, i => 
    {
        try
        {
            if (numbers[i] > 10 && numbers[i] < 20)
            {
                throw new Exception(String.Format("numbers[{0}] betwewn 10 to 20",i));
            }
        }
        catch (Exception e)
        {
            exceptions.Enqueue(e);
        }
    });
   
       if (exceptions.Count > 0 ) throw new AggregateException(exceptions); }

測試方法:

public void HandleExceptions()
{
    var numbers = Enumerable.Range(0, 10000).ToArray();
    try
    {
        this.HandleNumbers(numbers);
    }
    catch(AggregateException exceptions)
    {
        foreach (var ex in exceptions.InnerExceptions)
        {
            Console.WriteLine(ex.Message);
        }
    }
}

對上面的方法說明下,在HandleNumbers方法中,就是一個小的demo若是元素的值出如今10-20之間就拋出異常。在上面咱們的處理方法就是:在循環時經過隊列將全部的異常都集中起來,循環結束後來拋出一個AggregateException。

相關文章
相關標籤/搜索