【Parallel】.Net 並行執行程序的使用心得

1、摘要

官方介紹:提供對並行循環和區域的支持。算法

命名空間:using System.Threading.Tasks併發

三個靜態方法:Parallel.Invoke,Parallel.For,Parallel.ForEach函數

經常使用到的參數類型:ParallelLoopResult,ParallelOptions,ParallelLoopStateoop

2、參數

咱們先來介紹參數,明白了參數的做用,在選擇和調用三個靜態方法及其重載,就遊刃有餘了。spa

一、ParallelLoopResult:提供執行 Parallel 循環的完成狀態

屬性:pwa

1)public bool IsCompleted { get; } 線程

   若是該循環已運行完成(該循環的全部迭代均已執行,而且該循環沒有收到提早結束的請求),則爲 true;不然爲 false。code

2)public long? LowestBreakIteration { get; } blog

  返回一個表示從中調用 Break 語句的最低迭代的整數繼承

用途:判斷當並行循環結束時,是否因調用了break方法或stop方法而提早退出並行循環,或全部迭代均已執行。

判斷依據:

條件 結果
IsCompleted 運行完成

!IsCompleted &&

LowestBreakIteration==null

使用了Stop語句而提早終止

!IsCompleted &&

LowestBreakIteration!=null

 使用了Break語句而提早終止

 

 

 

 

 

 

 

 

 

 

二、ParallelLoopState:可用來使 Parallel 循環的迭代與其餘迭代交互。此類的實例由 Parallel 類提供給每一個循環;不能在您的用戶代碼中建立實例。

屬性:

1)public bool ShouldExitCurrentIteration { get; }

  獲取循環的當前迭代是否應基於此迭代或其餘迭代發出的請求退出。若是當前迭代應退出,則爲 true;不然爲 false。

2) public bool IsStopped { get; } 

  獲取循環的任何迭代是否已調用 System.Threading.Tasks.ParallelLoopState.Stop。若是任何迭代已中止循環,則爲 true;不然爲 false。

3) public bool IsExceptional { get; }

  獲取循環的任何迭代是否已引起相應迭代未處理的異常。若是引起了未經處理的異常,則爲 true;不然爲 false。

4) public long? LowestBreakIteration { get; }

  獲取從中調用 System.Threading.Tasks.ParallelLoopState.Break 的最低循環迭代。一個表示從中調用 Break 的最低迭代的整數。

方法:(在下面方法介紹時,有進一步的介紹)

1)Break():通知並行循環在執行完當前迭代以後儘快中止執行,可確保低索引步驟完成。且可確保正在執行的迭代繼續運行直到完成。

2)Stop():通知並行循環儘快中止執行。對於還沒有運行的迭代不能會嘗試執行低索引迭代。不保證全部已運行的迭代都執行完。

用途:提前退出並行循環。

說明:

1)不能同時在同一個並行循環中同時使用Break和Stop。

2)Stop比Break更經常使用。break語句用在並行循環中的效果和用在串行循環中不一樣。Break用在並行循環中,委託的主體方法在每次迭代的時候被調用,退出委託的主體方法對並行循環的執行沒有影響。Stop中止循環比Break快。

三、ParallelOptions:存儲用於配置  Parallel 類的方法的操做的選項。

屬性

1)public CancellationToken CancellationToken { get; set; }

  獲取或設置傳播有關應取消操做的通知。

2)public int MaxDegreeOfParallelism { get; set; }

  獲取或設置此 ParallelOptions 實例所容許的最大並行度。

3)public TaskScheduler TaskScheduler { get; set; } [沒用過,不知道功效]

  獲取或設置與此 System.Threading.Tasks.ParallelOptions 實例關聯的 System.Threading.Tasks.TaskScheduler

說明:

1)經過設置CancellationToken來取消並行循環,當前正在運行的迭代會執行完,而後拋出System.OperationCanceledException類型的異常。

2)TPL的方法老是會試圖利用全部可用內核以達到最好的效果,可是極可能.NET Framework內部使用的啓發式算法所獲得的注入和使用的線程數比實際須要的多(一般都會高於硬件線程數,這樣會更好地支持CPU和I/O混合型的工做負載)。

   一般將最大並行度設置爲小於等於邏輯內核數。若是設置爲等於邏輯內核數,那麼要確保不會影響其餘程序的執行。設置爲小於邏輯內核數是爲了有空閒內核來處理其餘緊急的任務。

用途:

1)從循環外部取消並行循環

2)指定並行度

 3、方法介紹

一、Parallel.Invoke

1)public static void Invoke(params Action[] actions);儘量並行執行提供的每一個操做。

    public class Test
    {
        private void Action()
        {
            Thread.Sleep(1000);
            Console.WriteLine("Action :ThreadID-{0}", Thread.CurrentThread.ManagedThreadId);
        }
        private void Action1()
        {
            Thread.Sleep(2000);
            Console.WriteLine("Action1:ThreadID-{0}", Thread.CurrentThread.ManagedThreadId);
        }
        public void Parallel_Invoke()
        {
            Console.WriteLine("開始:********");
            Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start();
            try
            {
                Parallel.Invoke(Action, Action1);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
            stopwatch.Stop();
            Console.WriteLine("結束:***{0}***", stopwatch.ElapsedMilliseconds);
        }
    }

運行結果:

注:Action()休眠1s,Action1()休眠2s,執行總耗時爲2043ms,能夠看作 nvoke方法只有在actions所有執行完纔會返回,而且耗時取決於最大耗時的方法。

2)public static void Invoke(ParallelOptions parallelOptions, params Action[] actions);執行所提供的每一個操做,並且儘量並行運行,除非用戶取消了操做。

    public class Test
    {
        ParallelOptions parallelOptions = new ParallelOptions();
        private void Action()
        {
            Thread.Sleep(1000);
            //標記取消並行操做
            parallelOptions.CancellationToken = new CancellationToken(true);
            Console.WriteLine("Action :ThreadID-{0}", Thread.CurrentThread.ManagedThreadId);
        }
        private void Action1()
        {
            Thread.Sleep(2000);
            Console.WriteLine("Action1:ThreadID-{0}", Thread.CurrentThread.ManagedThreadId);
        }
        public void Parallel_Invoke()
        {
            Console.WriteLine("開始:********");
            Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start();
            try
            {
                Parallel.Invoke(parallelOptions, Action1, Action, Action1, Action1, Action1);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
            stopwatch.Stop();
            Console.WriteLine("結束:***{0}***", stopwatch.ElapsedMilliseconds);
        }
    }

說明:

1)Invoke方法只有在actions所有執行完纔會返回。

2)不能保證actions中的全部操做同時執行。好比actions大小爲4,但硬件線程數爲2,那麼同時運行的操做數最多爲2。

3)actions中的操做並行的運行且與順序無關,若編寫與運行順序有關的併發代碼,應選擇其餘方法。

4)若是使用Invoke加載多個操做,多個操做運行時間迥異,總的運行時間以消耗時間最長操做爲基準,這會致使不少邏輯內核長時間處於空閒狀態。

 二、Parallel.For

 1)public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int> body); 

    public class Test
    {
        private void Action(int i)
        {
            Console.WriteLine("Action :ThreadID-{0}|i={1}", Thread.CurrentThread.ManagedThreadId, i);
        }
        public void Parallel_For()
        {
            Console.WriteLine("開始:********");
            Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start();
            try
            {
                //fromInclusive: 開始索引(含)。
                //toExclusive: 結束索引(不含)。
                //body: 將爲每一個迭代調用一次的委託。
                Parallel.For(0, 5, Action);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
            stopwatch.Stop();
            Console.WriteLine("結束:***{0}***", stopwatch.ElapsedMilliseconds);
        }
    }

運行結果:

注:能夠看出,方法並非順序執行

2)public static ParallelLoopResult For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int, ParallelLoopState> body);

使用ParallelLoopState.Break() 退出迭代:

    public class Test
    {
        ParallelOptions parallelOptions = new ParallelOptions();

        private void Action(int i, ParallelLoopState parallelLoopState)
        {
            //當執行到 索引等於5時,咱們調用Break()
            if (i > 5)
            {
                parallelLoopState.Break();
            }
            Console.WriteLine("Action :ThreadID-{0}|i={1}", Thread.CurrentThread.ManagedThreadId, i);
        }
        public void Parallel_For()
        {
            Console.WriteLine("開始:********");
            Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start();
            try
            {
                //設置最大並行數爲3
                parallelOptions.MaxDegreeOfParallelism = 3;
                Parallel.For(0, 10, parallelOptions, Action);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
            stopwatch.Stop();
            Console.WriteLine("結束:***{0}***", stopwatch.ElapsedMilliseconds);
        }
    }

 運行結果:

使用ParallelLoopState.Stop() 退出迭代:

    public class Test
    {
        ParallelOptions parallelOptions = new ParallelOptions();

        private void Action(int i, ParallelLoopState parallelLoopState)
        {
            //當執行到 索引等於5時,咱們調用Stop()
            if (i > 5)
            {
                parallelLoopState.Stop();
            }
            Console.WriteLine("Action :ThreadID-{0}|i={1}", Thread.CurrentThread.ManagedThreadId, i);
        }
        public void Parallel_For()
        {
            Console.WriteLine("開始:********");
            Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start();
            try
            {
                //設置最大並行數爲3
                parallelOptions.MaxDegreeOfParallelism = 3;
                Parallel.For(0, 10, parallelOptions, Action);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
            stopwatch.Stop();
            Console.WriteLine("結束:***{0}***", stopwatch.ElapsedMilliseconds);
        }
    }

 

運行結果:

注:當使用Break()退出迭代時,程序保證了索引小於5的方法都執行完成,便可確保低索引步驟完成;當使用Stop()退出迭代時,程序執行到索引爲5時,就當即退出了(正在進行的迭代方法,會執行完成),不確保低索引執行完成。

3)public static ParallelLoopResult For<TLocal>(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Func<TLocal> localInit, Func<int, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally);

    public class Test
    {
        ParallelOptions parallelOptions = new ParallelOptions();

        private void Action(int i, ParallelLoopState parallelLoopState)
        {
            //當執行到 索引等於5時,咱們調用Stop()
            if (i > 5)
            {
                parallelLoopState.Stop();
            }
            Console.WriteLine("Action :ThreadID-{0}|i={1}", Thread.CurrentThread.ManagedThreadId, i);
        }
        private string LocalInit()
        {
            var init = "go";
            Console.WriteLine("LocalInit:ThreadID-{0}|init={1}", Thread.CurrentThread.ManagedThreadId, init);
            return init;
        }
        private void LocalFinally(string x)
        {
            Console.WriteLine("LocalFinally:ThreadID-{0}|result={1}", Thread.CurrentThread.ManagedThreadId, x);
        }
        private string Body(int i, ParallelLoopState parallelLoopState, string x)
        {
            x = x + "_" + i;
            Console.WriteLine("Body:ThreadID-{0}|i={1}|x={2}", Thread.CurrentThread.ManagedThreadId, i, x);
            return x;
        }
        public void Parallel_For()
        {
            Console.WriteLine("開始:********");
            Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start();
            try
            {
                //設置最大並行數爲3
                parallelOptions.MaxDegreeOfParallelism = 3;
                //LocalInit: 用於返回每一個任務的本地數據的初始狀態的函數委託。
                //Body: 將爲每一個迭代調用一次的委託。
                //LocalFinally: 用於對每一個任務的本地狀態執行一個最終操做的委託。
                //<TLocal>: 線程本地數據的類型。
                Parallel.For(0, 5, parallelOptions, LocalInit, Body, LocalFinally);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
            stopwatch.Stop();
            Console.WriteLine("結束:***{0}***", stopwatch.ElapsedMilliseconds);
        }
    }

運行結果:

注:LocalInit()執行了3次,Body()執行了5次,LocalFinally()執行了3次(這只是執行的一種狀況),能夠看出在同一線程中,init參數是共享傳遞的,即LocalInit()=>Body()..n次迭代..Body()=>LoaclFinally()

* localInit只是在每一個 Task/Thread 開始參與到對集合元素的處理時執行一次, 【而不是針對每一個集合元素都執行一次】相似的, localFinally只有在 Task/Thread 完成全部分配給它的任務以後,才被執行一次。
 CLR會爲每一個 Task/Thread 維護一個thread - local storage,能夠理解爲 Task/Thread 在整個執行過程當中的狀態。 當一個 Task/Thread 參與到執行中時,localInit中返回的TLocal類型值會被做爲這個狀態的初始值,隨着body的執行,
 這個狀態值會被改變,而body的返回類型也是TLocal,意味着每一次body執行結束,會把最新的TLocal值返回給CLR, 而CLR會把這個值設置到 Task/Thread 的thread - local storage上去,從而實現 Task/Thread 狀態的更新。
 最後,localFinally能夠返回這個狀態值,做爲 Task/Thread 完成它所負責的全部處理任務後的最終結果。

說明:

1)不支持浮點。

2)沒法保證迭代的執行順序。

3)若是fromInclusive大於或等於toExclusive,方法當即返回而不會執行任何迭代。

4)對於body參數中含有的ParallelLoopState實例,其做用爲提前中斷並行循環。

5)只有在迭代所有完成之後纔會返回結果,不然循環將一直阻塞。

三、Parallel.ForEach 

1)public static ParallelLoopResult ForEach(IEnumerable<TSource> source, Action<TSource> body);

2)public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Action<TSource, ParallelLoopState> body);

3)public static ParallelLoopResult ForEach<TSource>(Partitioner<TSource> source, Action<TSource> body);

用法及其重載和Parallel.For不盡相同(把fromInclusive-toExclusive 換成你想遍歷的集合List),就不在這裏贅述了(關鍵是如今好餓啊...要去吃飯了)

4、異常處理

1)異常優先於從循環外部取消和使用Break()方法或Stop()方法提早退出並行循環。

2)並行循環體拋出一個未處理的異常,並行循環就不能再開始新的迭代。

3)默認狀況下當某次迭代拋出一個未處理異常,那麼正在執行的迭代若是沒拋出異常,正在執行的迭代會執行完。
***當全部迭代都執行完(有可能其餘的迭代在執行的過程當中也拋出異常),並行循環將在調用它的線程中拋出異常。
***並行循環運行的過程當中,可能有多個迭代拋出異常,因此通常使用AggregateException來捕獲異常。AggregateException繼承自Exception。
***爲了防止僅使用AggregateException未能捕獲某些異常,使用AggregateException的同時還要使用Exception。

異常捕獲:

try
{
    //Do something
}
catch(AggregateException e)
{
    Foreach(Exception ex in e.InnerExceptions)
    {
        //Do something
    }
}
catch(Exception e)
{
    //Do something
}

5、總結

1.Parallel執行方法組,不是順序的,和方法位置前後,索引大小無關;

2.迭代所有完成之後纔會返回結果(前提沒有Break、Stop、Exception),不然循環將一直阻塞;

3.整體耗時通常取決於耗時最長的方法;

4.某一方法出現異常,程序將中止新的迭代,當前正在進行的方法,會繼續執行完成,後拋出異常。

5.能夠去吃一碗牛肉麪了...

相關文章
相關標籤/搜索