C#並行編程(3):並行循環

初識並行循環

並行循環主要用來處理數據並行的,如,同時對數組或列表中的多個數據執行相同的操做。編程

在C#編程中,咱們使用並行類System.Threading.Tasks.Parallel提供的靜態方法Parallel.ForParallel.ForEach來實現並行循環。從方法名能夠看出,這兩個方法是對常規循環forforeach的並行化。數組

簡單用法

使用並行循環時須要傳入循環範圍(集合)和操做數據的委託Action<T>多線程

Parallel.For(0, 100, i => { Console.WriteLine(i); });

Parallel.ForEach(Enumerable.Range(0, 100), i => { Console.WriteLine(i); });

使用場景

對於數據的處理須要耗費較長時間的循環適宜使用並行循環,利用多線程加快執行速度。oop

對於簡單的迭代操做,且迭代範圍較小,使用常規循環更好好,由於並行循環涉及到線程的建立、上下文切換和銷燬,使用並行循環反而影響執行效率。性能

對於迭代操做簡單但迭代範圍很大的狀況,咱們能夠對數據進行分區,再執行並行循環,減小線程數量。測試

循環結果

Parallel.ForParallel.ForEach方法的全部重載有着一樣的返回值類型ParallelLoopResult,並行循環結果包含循環是否完成以及最低迭代次數兩項信息。優化

下面的例子使用Parallel.ForEach展現了並行循環的結果。pwa

ParallelLoopResult result = Parallel.ForEach(Enumerable.Range(0, 100), (i,loop) =>
{// 委託傳入ParallelLoopState,用來控制循環執行
    Console.WriteLine(i + 1);
    Thread.Sleep(100);
    if (i == 30) // 此處設置循環中止的確切條件
    {
        loop.Break();
        //loop.Stop();
    }
});
Console.WriteLine($"{result.IsCompleted}-{result.LowestBreakIteration}");

值得一提的是,循環的Break()Stop()只能儘早地跳出或者中止循環,而不能當即中止。線程

取消循環操做

有時候,咱們須要在中途取消循環操做,但又不知道確切條件是什麼,好比用戶觸發的取消。這時候,能夠利用循環的ParallelOptions傳入一個CancellationToken,同時使用異常處理捕獲OperationCanceledException以進行取消後的處理。下面是一個簡單的例子。調試

/// <summary>
/// 取消通知者
/// </summary>
public static CancellationTokenSource CTSource { get; set; } = new CancellationTokenSource();

/// <summary>
/// 取消並行循環
/// </summary>
public static void CancelParallelLoop()
{
    Task.Factory.StartNew(() =>
    {
        try
        {
            Parallel.ForEach(Enumerable.Range(0, 100), new ParallelOptions { CancellationToken = CTSource.Token },
                i =>
                {
                    Console.WriteLine(i + 1);
                    Thread.Sleep(1000);
                });
        }
        catch (OperationCanceledException oce)
        {
            Console.WriteLine(oce.Message);
        }
    });
}
static void Main(string[] args)
{
    ParallelDemo.CancelParallelLoop();
    Thread.Sleep(3000);
    ParallelDemo.CTSource.Cancel();

    Console.ReadKey();
}

循環異常收集

並行循環執行過程當中,能夠捕獲並收集迭代操做引起的異常,循環結束時拋出一個AggregateException異常,並將收集到的異常賦給它的內部異常集合InnerExceptions。外部使用時,捕獲AggregateException,便可進行並行循環的異常處理。

下面的例子模擬了並行循環的異常拋出、收集及處理的過程。

/// <summary>
/// 捕獲循環異常
/// </summary>
public static void CaptureTheLoopExceptions()
{
    ConcurrentQueue<Exception> exceptions = new ConcurrentQueue<Exception>();
    Parallel.ForEach(Enumerable.Range(0, 100), i =>
    {
        try
        {
            if (i % 10 == 0)
            {//模擬拋出異常
                throw new Exception($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] had thrown a exception. [{i}]");
            }
            Console.WriteLine(i + 1);
            Thread.Sleep(100);
        }
        catch (Exception ex)
        {//捕獲並收集異常
            exceptions.Enqueue(ex);
        }
    });

    if (!exceptions.IsEmpty)
    {// 方法內部可直接進行異常處理,若需外部處理,將收集到的循環異常拋出
        throw new AggregateException(exceptions);
    }
}

外部處理方式

try
{
    ParallelDemo.CaptureTheLoopExceptions();
}
catch (AggregateException aex)
{
    foreach (Exception ex in aex.InnerExceptions)
    {// 模擬異常處理
        Console.WriteLine(ex.Message);
    }
}

分區並行處理

當循環操做很簡單,迭代範圍很大的時候,ParallelLoop提供一種分區的方式來優化循環性能。下面的例子展現了分區循環的使用,同時也能比較幾種循環方式的執行效率。

/// <summary>
/// 分區並行處理,順便比較各類循環的效率
/// </summary>
/// <param name="rangeSize">迭代範圍</param>
/// <param name="opDuration">操做耗時</param>
public static void PartationParallelLoop(int rangeSize = 10000, int opDuration = 1)
{
    //PartationParallelLoopWithBuffer
    Stopwatch watch0 = Stopwatch.StartNew();
    Parallel.ForEach(Partitioner.Create(Enumerable.Range(0, rangeSize), EnumerablePartitionerOptions.None),
        i =>
        {//模擬操做
            Console.WriteLine($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] was running. [{i}]");
            Thread.Sleep(opDuration);
        });
    watch0.Stop();

    //PartationParallelLoopWithoutBuffer
    Stopwatch watch1 = Stopwatch.StartNew();
    Parallel.ForEach(Partitioner.Create(Enumerable.Range(0, rangeSize),EnumerablePartitionerOptions.NoBuffering),
        i =>
        {//模擬操做
            Console.WriteLine($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] was running. [{i}]");
            Thread.Sleep(opDuration);
        });
    watch1.Stop();

    //NormalParallelLoop
    Stopwatch watch2 = Stopwatch.StartNew();
    Parallel.ForEach(Enumerable.Range(0, rangeSize),
        i =>
        {//模擬操做
            Console.WriteLine($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] was running. [{i}]");
            Thread.Sleep(opDuration);
        });
    watch2.Stop();

    //NormalLoop
    Stopwatch watch3 = Stopwatch.StartNew();
    foreach (int i in Enumerable.Range(0, rangeSize))
    {//模擬操做
        Console.WriteLine($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] was running. [{i}]");
        Thread.Sleep(opDuration);
    }
    watch2.Stop();
            
    Console.WriteLine();
    Console.WriteLine($"PartationParallelLoopWithBuffer    => {watch0.ElapsedMilliseconds}ms");
    Console.WriteLine($"PartationParallelLoopWithoutBuffer => {watch1.ElapsedMilliseconds}ms");
    Console.WriteLine($"NormalParallelLoop                 => {watch2.ElapsedMilliseconds}ms");
    Console.WriteLine($"NormalLoop                         => {watch3.ElapsedMilliseconds}ms");
}

在 I7-7700HQ + 16GB 配置 VS調試模式下獲得下面一組測試結果。

Loop Condition PartationParallelLoop WithBuffer PartationParallelLoop WithoutBuffer Normal ParallelLoop Normal Loop
10000,1 10527 11799 11155 19434
10000,1 9513 11442 11048 19354
10000,1 9871 11391 14782 19154
100,1000 9107 5951 5081 100363
100,1000 9086 5974 5187 100162
100,1000 9208 5125 5255 100239
100,1 350 439 243 200
100,1 390 227 166 198
100,1 466 225 84 197

應該根據不一樣的應用場景選擇合適的循環策略,具體如何選擇,朋友們可自行體會~

相關文章
相關標籤/搜索