C#並行編程-Task

原文: C#並行編程-Task

菜鳥學習並行編程,參考《C#並行編程高級教程.PDF》,若有錯誤,歡迎指正。html

任務簡介編程

TPL引入新的基於任務的編程模型,經過這種編程模型能夠發揮多核的功效,提高應用程序的性能,不須要編寫底層複雜且重量級的線程代碼。負載均衡

但須要注意:任務並非線程(任務運行的時候須要使用線程,但並非說任務取代了線程,任務代碼是使用底層的線程(軟件線程,調度在特定的硬件線程或邏輯內核上)運行的,任務與線程之間並無一對一的關係。)異步

建立一個新的任務時,調度器(調度器依賴於底層的線程池引擎)會使用工做竊取隊列找到一個最合適的線程,而後將任務加入隊列,任務所包含的代碼會在一個線程中運行。如圖:ide

 

System.Threading.Tasks.Taskpost

一個Task表示一個異步操做,Task提供了不少方法和屬性,經過這些方法和屬性可以對Task的執行進行控制,而且可以得到其狀態信息。性能

Task的建立和執行都是獨立的,所以能夠對關聯操做的執行擁有徹底的控制權。學習

使用Parallel.For、Parallel.ForEach的循環迭代的並行執行,TPL會在後臺建立System.Threading.Tasks.Task的實例。編碼

使用Parallel.Invoke時,TPL也會建立與調用的委託數目一致的System.Threading.Tasks.Task的實例。url

 

注意項

程序中添加不少異步的操做做爲Task實例加載的時候,爲了充分利用運行時全部可用的邏輯內核,任務調度器會嘗試的並行的運行這些任務,也會嘗試在全部的可用內核上對工做進行負載均衡。

但在實際的編碼過程中,並非全部的代碼片斷都可以方便的用任務來運行,由於任務會帶來額外的開銷,儘管這種開銷比添加線程所帶來的開銷要小,可是仍然須要將這個開銷考慮在內。

Task狀態與生命週期

一個Task實例只會完成其生命週期一次,當Task到達它的3種肯呢過的最終狀態之一是,就沒法回到以前的任何狀態

下面貼代碼,詳解見註釋,方便你們理解Task的狀態:

    class Program
    {
        /*  coder:釋迦苦僧    */
        static void Main(string[] args)
        {
            /*  建立一個任務 不調用 不執行  狀態爲Created */
            Task tk = new Task(() =>
            {
            });
            Console.WriteLine(tk.Status.ToString());

            /*  建立一個任務 執行  狀態爲 WaitingToRun */
            Task tk1 = new Task(() =>
            {
            });
            tk1.Start();/*對於安排好的任務,就算調用Start方法也不會立馬啓動 此時任務的狀態爲WaitingToRun*/
            Console.WriteLine(tk1.Status.ToString());

            /*  建立一個主任務 */
            Task mainTask = new Task(() =>
            {
                SpinWait.SpinUntil(() =>
                {
                    return false;
                }, 30000);
            });
            /*  將子任務加入到主任務完成以後執行 */
            Task subTask = mainTask.ContinueWith((t1) =>
            {
            });
            /*  啓動主任務 */
            mainTask.Start();
            /*  此時子任務狀態爲 WaitingForActivation */
            Console.WriteLine(subTask.Status.ToString());


            /*  建立一個任務 執行 後 等待一段時間 並行未結束的狀況下 狀態爲 Running */
            Task tk2 = new Task(() =>
            {
                SpinWait.SpinUntil(() => false, 30000);
            });
            tk2.Start();/*對於安排好的任務,就算調用Start方法也不會立馬啓動*/
            SpinWait.SpinUntil(() => false, 300);
            Console.WriteLine(tk2.Status.ToString());


            /*  建立一個任務 而後取消該任務 狀態爲Canceled */
            CancellationTokenSource cts = new CancellationTokenSource();
            Task tk3 = new Task(() =>
            {
                for (int i = 0; i < int.MaxValue; i++)
                {
                    if (!cts.Token.IsCancellationRequested)
                    {
                        cts.Token.ThrowIfCancellationRequested();
                    }
                }
            }, cts.Token);
            tk3.Start();/*啓動任務*/
            SpinWait.SpinUntil(() => false, 100);
            cts.Cancel();/*取消該任務執行 但並不是立馬取消 因此對於Canceled狀態也不會立馬生效*/
            SpinWait.SpinUntil(() => false, 1000);
            Console.WriteLine(tk3.Status.ToString() + " " + tk3.IsCanceled);
            SpinWait.SpinUntil(() => false, 1000);
            Console.WriteLine(tk3.Status.ToString() + " " + tk3.IsCanceled);
            SpinWait.SpinUntil(() => false, 1000);
            Console.WriteLine(tk3.Status.ToString() + " " + tk3.IsCanceled);

            /*建立一個任務 讓它成功的運行完成 會獲得 RanToCompletion 狀態*/
            Task tk4 = new Task(() =>
            {
                SpinWait.SpinUntil(() => false, 10);
            });
            tk4.Start();
            SpinWait.SpinUntil(() => false, 300);
            Console.WriteLine(tk4.Status.ToString());

            /*建立一個任務 讓它運行失敗 會獲得 Faulted 狀態*/
            Task tk5 = new Task(() =>
            {
                throw new Exception();
            });
            tk5.Start();
            SpinWait.SpinUntil(() => false, 300);
            Console.WriteLine(tk5.Status.ToString());

            Console.ReadLine();
        }
    }

    class Product
    {
        public string Name { get; set; }
        public string Category { get; set; }
        public int SellPrice { get; set; }
    }
View Code

 

使用任務來對代碼進行並行化

使用Parallel.Invoke能夠並行加載多個方法,使用Task實例也能完成一樣的工做,下面貼代碼:

    class Program
    {
        private static ConcurrentQueue<Product> queue = null;
        /*  coder:釋迦苦僧    */
        static void Main(string[] args)
        {
            queue = new ConcurrentQueue<Product>();
            Task tk1 = new Task(() => { SetProduct(1); SetProduct(3);});
            Task tk2 = new Task(() => SetProduct(2));
            tk1.Start();
            tk2.Start();
          
          
            Console.ReadLine();
        }
        static void SetProduct(int index)
        {
            Parallel.For(0, 10000, (i) =>
            {
                Product model = new Product();
                model.Name = "Name" + i;
                model.SellPrice = i;
                model.Category = "Category" + i;
                queue.Enqueue(model);
            });
            Console.WriteLine("SetProduct {0} 執行完成", index);
        }
    } 
    class Product
    {
        public string Name { get; set; }
        public string Category { get; set; }
        public int SellPrice { get; set; }
    }
View Code

等待任務完成Task.WaitAll
Task.WaitAll 方法,這個方法是同步執行的,在Task做爲參數被接受,全部Task結束其執行前,主線程不會繼續執行下一條指令,下面貼代碼

    class Program
    {
        private static ConcurrentQueue<Product> queue = null;
        /*  coder:釋迦苦僧    */
        static void Main(string[] args)
        {
            queue = new ConcurrentQueue<Product>();
            Task tk1 = new Task(() => { SetProduct(1); SetProduct(3); });
            Task tk2 = new Task(() => SetProduct(2));
            tk1.Start();
            tk2.Start();
            /*等待任務執行完成後再輸出 ====== */
            Task.WaitAll(tk1, tk2);
            Console.WriteLine("等待任務執行完成後再輸出 ======");

            Task tk3 = new Task(() => { SetProduct(1); SetProduct(3); });
            Task tk4 = new Task(() => SetProduct(2));
            tk3.Start();
            tk4.Start();
            /*等待任務執行前輸出 ====== */
            Console.WriteLine("等待任務執行前輸出 ======");
            Task.WaitAll(tk3, tk4);


            Console.ReadLine();
        }
        static void SetProduct(int index)
        {
            Parallel.For(0, 10000, (i) =>
            {
                Product model = new Product();
                model.Name = "Name" + i;
                model.SellPrice = i;
                model.Category = "Category" + i;
                queue.Enqueue(model);
            });
            Console.WriteLine("SetProduct {0} 執行完成", index);
        }
    }
View Code

Task.WaitAll 限定等待時長

            queue = new ConcurrentQueue<Product>(); Task tk1 = new Task(() => { SetProduct(1); SetProduct(3);}); Task tk2 = new Task(() => SetProduct(2)); tk1.Start(); tk2.Start(); /*若是tk1 tk2 沒能在10毫秒內完成 則輸出 ***** */ if (!Task.WaitAll(new Task[] { tk1, tk2 }, 10)) { Console.WriteLine("******"); } Console.ReadLine();
View Code

如圖10毫秒沒有完成任務,則輸出了****

經過取消標記取消任務

經過取消標記來中斷Task實例的執行。 CancellationTokenSource,CancellationToken下的IsCanceled屬性標誌當前是否已經被取消,取消任務,任務也不必定會立刻取消,下面貼代碼:

    class Program
    {
        private static ConcurrentQueue<Product> queue = null;
        /*  coder:釋迦苦僧    */
        static void Main(string[] args)
        {
            queue = new ConcurrentQueue<Product>();
            System.Threading.CancellationTokenSource token = new CancellationTokenSource();
            Task tk1 = Task.Factory.StartNew(() => SetProduct(token.Token));
            Task tk2 = Task.Factory.StartNew(() => SetProduct(token.Token));
            Thread.Sleep(10);
            /*取消任務操做*/
            token.Cancel();
            try
            {
                /*等待完成*/
                Task.WaitAll(new Task[] { tk1, tk2 });
            }
            catch (AggregateException ex)
            {
                /*若是當前的任務正在被取消,那麼還會拋出一個TaskCanceledException異常,這個異常包含在AggregateException異常中*/
                Console.WriteLine("tk1 Canceled:{0}", tk1.IsCanceled);
                Console.WriteLine("tk1 Canceled:{0}", tk2.IsCanceled);
            }

            Thread.Sleep(2000);
            Console.WriteLine("tk1 Canceled:{0}", tk1.IsCanceled);
            Console.WriteLine("tk1 Canceled:{0}", tk2.IsCanceled);
            Console.ReadLine();
        }
        static void SetProduct(System.Threading.CancellationToken ct)
        {
            /* 每一次循環迭代,都會有新的代碼調用 ThrowIfCancellationRequested 
             * 這行代碼可以對 OpreationCanceledException 異常進行觀察
             * 而且這個異常的標記與Task實例關聯的那個標記進行比較,若是二者相同 ,並且IsCancelled屬性爲True,那麼Task實例就知道存在一個要求取消的請求,而且會將狀態轉變爲Canceled狀態,中斷任務執行。  
             * 若是當前的任務正在被取消,那麼還會拋出一個TaskCanceledException異常,這個異常包含在AggregateException異常中
            /*檢查取消標記*/
            ct.ThrowIfCancellationRequested();
            for (int i = 0; i < 50000; i++)
            {
                Product model = new Product();
                model.Name = "Name" + i;
                model.SellPrice = i;
                model.Category = "Category" + i;
                queue.Enqueue(model);
            
                ct.ThrowIfCancellationRequested();
            }
            Console.WriteLine("SetProduct   執行完成");
        }
    }
    class Product
    {
        public string Name { get; set; }
        public string Category { get; set; }
        public int SellPrice { get; set; }
    }
View Code

Task異常處理 當不少任務並行運行的時候,可能會並行發生不少異常。Task實例可以處理一組一組的異常,這些異常有System.AggregateException類處理

    class Program
    {
        private static ConcurrentQueue<Product> queue = null;
        /*  coder:釋迦苦僧    */
        static void Main(string[] args)
        {
            queue = new ConcurrentQueue<Product>();
            System.Threading.CancellationTokenSource token = new CancellationTokenSource();
            Task tk1 = Task.Factory.StartNew(() => SetProduct(token.Token));
            Thread.Sleep(2000);
            if (tk1.IsFaulted)
            {
                /*  循環輸出異常    */
                foreach (Exception ex in tk1.Exception.InnerExceptions)
                {
                    Console.WriteLine("tk1 Exception:{0}", ex.Message);
                }
            }
            Console.ReadLine();
        }

        static void SetProduct(System.Threading.CancellationToken ct)
        {
            for (int i = 0; i < 5; i++)
            {
                throw new Exception(string.Format("Exception Index {0}", i));
            }
            Console.WriteLine("SetProduct   執行完成");
        }
    }
View Code

Task返回值  Task<TResult>

    class Program
    {
        /*  coder:釋迦苦僧    */
        static void Main(string[] args)
        {
            Task<List<Product>> tk1 = Task<List<Product>>.Factory.StartNew(() => SetProduct());
            Task.WaitAll(tk1);
            Console.WriteLine(tk1.Result.Count);
            Console.WriteLine(tk1.Result[0].Name);
            Console.ReadLine();
        }
        static List<Product> SetProduct()
        {
            List<Product> result = new List<Product>();
            for (int i = 0; i < 500; i++)
            {
                Product model = new Product();
                model.Name = "Name" + i;
                model.SellPrice = i;
                model.Category = "Category" + i;
                result.Add(model);
            }
            Console.WriteLine("SetProduct   執行完成");
            return result;
        }
    }
View Code

經過延續串聯多個任務

ContinueWith:建立一個目標Task完成時,異步執行的延續程序,await,如代碼所示:

    class Program
    {
        /*  coder:釋迦苦僧    */
        static void Main(string[] args)
        {
            /*建立任務t1*/
            Task t1 = Task.Factory.StartNew(() =>
            {
                Console.WriteLine("執行 t1 任務");
                SpinWait.SpinUntil(() =>
                {
                    return false;
                }, 2000);

            });
            /*建立任務t2   t2任務的執行 依賴與t1任務的執行完成*/
            Task t2 = t1.ContinueWith((t) =>
            {
                Console.WriteLine("執行 t2 任務"); 
                SpinWait.SpinUntil(() =>
                {
                    return false;
                }, 2000);

            });    
            /*建立任務t3   t3任務的執行 依賴與t2任務的執行完成*/
            Task t3 = t2.ContinueWith((t) =>
            {
                Console.WriteLine("執行 t3 任務");
            });
            Console.ReadLine();
        }
    }
View Code

TaskContinuationOptions

TaskContinuationOptions參數,能夠控制延續另外一個任的任務調度和執行的可選行爲。下面看代碼:

    class Program
    {
        /*  coder:釋迦苦僧    */
        static void Main(string[] args)
        {
            /*建立任務t1*/
            Task t1 = Task.Factory.StartNew(() =>
            {
                Console.WriteLine("執行 t1 任務");
                SpinWait.SpinUntil(() =>
                {
                    return false;
                }, 2000);
                throw new Exception("異常");
            });

            /*建立任務t2   t2任務的執行 依賴與t1任務的執行完成*/
            Task t2 = t1.ContinueWith((t) =>
            {
                Console.WriteLine(t.Status);
                Console.WriteLine("執行 t2 任務");
                SpinWait.SpinUntil(() =>
                {
                    return false;
                }, 2000);

                /*定義 TaskContinuationOptions 行爲爲 NotOnFaulted 在 t1 任務拋出異常後,t1 的任務狀態爲 Faulted , 則t2 不會執行裏面的方法 可是須要注意的是t3任務*/
                /*t2在不符合條件時 返回Canceled狀態狀態讓t3任務執行*/
            }, TaskContinuationOptions.NotOnFaulted);
            /*建立任務t3   t3任務的執行 依賴與t2任務的執行完成*/

            /*t2在不符合條件時 返回Canceled狀態狀態讓t3任務執行*/
            Task t3 = t2.ContinueWith((t) =>
            {
                Console.WriteLine(t.Status);
                Console.WriteLine("執行 t3 任務");
            });

            Console.ReadLine();
        }
    }
View Code

TaskContinuationOptions 屬性有不少,以下所示

 關於並行編程中的Task就寫到這,若有問題,請指正。


做者:釋迦苦僧 出處:http://www.cnblogs.com/woxpp/p/3928788.html本文版權歸做者和博客園共有,歡迎轉載,但未經做者贊成必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接。

相關文章
相關標籤/搜索