菜鳥學習並行編程,參考《C#並行編程高級教程.PDF》,若有錯誤,歡迎指正。html
任務簡介編程
TPL引入新的基於任務的編程模型,經過這種編程模型能夠發揮多核的功效,提高應用程序的性能,不須要編寫底層複雜且重量級的線程代碼。負載均衡
但須要注意:任務並非線程(任務運行的時候須要使用線程,但並非說任務取代了線程,任務代碼是使用底層的線程(軟件線程,調度在特定的硬件線程或邏輯內核上)運行的,任務與線程之間並無一對一的關係。)異步
建立一個新的任務時,調度器(調度器依賴於底層的線程池引擎)會使用工做竊取隊列找到一個最合適的線程,而後將任務加入隊列,任務所包含的代碼會在一個線程中運行。如圖:ide
System.Threading.Tasks.Taskpost
一個Task表示一個異步操做,Task提供了不少方法和屬性,經過這些方法和屬性可以對Task的執行進行控制,而且可以得到其狀態信息。性能
Task的建立和執行都是獨立的,所以能夠對關聯操做的執行擁有徹底的控制權。學習
使用Parallel.For、Parallel.ForEach的循環迭代的並行執行,TPL會在後臺建立System.Threading.Tasks.Task的實例。編碼
使用Parallel.Invoke時,TPL也會建立與調用的委託數目一致的System.Threading.Tasks.Task的實例。url
注意項
程序中添加不少異步的操做做爲Task實例加載的時候,爲了充分利用運行時全部可用的邏輯內核,任務調度器會嘗試的並行的運行這些任務,也會嘗試在全部的可用內核上對工做進行負載均衡。
但在實際的編碼過程中,並非全部的代碼片斷都可以方便的用任務來運行,由於任務會帶來額外的開銷,儘管這種開銷比添加線程所帶來的開銷要小,可是仍然須要將這個開銷考慮在內。
Task狀態與生命週期
一個Task實例只會完成其生命週期一次,當Task到達它的3種肯呢過的最終狀態之一是,就沒法回到以前的任何狀態
下面貼代碼,詳解見註釋,方便你們理解Task的狀態:
class Program { /* coder:釋迦苦僧 */ static void Main(string[] args) { /* 建立一個任務 不調用 不執行 狀態爲Created */ Task tk = new Task(() => { }); Console.WriteLine(tk.Status.ToString()); /* 建立一個任務 執行 狀態爲 WaitingToRun */ Task tk1 = new Task(() => { }); tk1.Start();/*對於安排好的任務,就算調用Start方法也不會立馬啓動 此時任務的狀態爲WaitingToRun*/ Console.WriteLine(tk1.Status.ToString()); /* 建立一個主任務 */ Task mainTask = new Task(() => { SpinWait.SpinUntil(() => { return false; }, 30000); }); /* 將子任務加入到主任務完成以後執行 */ Task subTask = mainTask.ContinueWith((t1) => { }); /* 啓動主任務 */ mainTask.Start(); /* 此時子任務狀態爲 WaitingForActivation */ Console.WriteLine(subTask.Status.ToString()); /* 建立一個任務 執行 後 等待一段時間 並行未結束的狀況下 狀態爲 Running */ Task tk2 = new Task(() => { SpinWait.SpinUntil(() => false, 30000); }); tk2.Start();/*對於安排好的任務,就算調用Start方法也不會立馬啓動*/ SpinWait.SpinUntil(() => false, 300); Console.WriteLine(tk2.Status.ToString()); /* 建立一個任務 而後取消該任務 狀態爲Canceled */ CancellationTokenSource cts = new CancellationTokenSource(); Task tk3 = new Task(() => { for (int i = 0; i < int.MaxValue; i++) { if (!cts.Token.IsCancellationRequested) { cts.Token.ThrowIfCancellationRequested(); } } }, cts.Token); tk3.Start();/*啓動任務*/ SpinWait.SpinUntil(() => false, 100); cts.Cancel();/*取消該任務執行 但並不是立馬取消 因此對於Canceled狀態也不會立馬生效*/ SpinWait.SpinUntil(() => false, 1000); Console.WriteLine(tk3.Status.ToString() + " " + tk3.IsCanceled); SpinWait.SpinUntil(() => false, 1000); Console.WriteLine(tk3.Status.ToString() + " " + tk3.IsCanceled); SpinWait.SpinUntil(() => false, 1000); Console.WriteLine(tk3.Status.ToString() + " " + tk3.IsCanceled); /*建立一個任務 讓它成功的運行完成 會獲得 RanToCompletion 狀態*/ Task tk4 = new Task(() => { SpinWait.SpinUntil(() => false, 10); }); tk4.Start(); SpinWait.SpinUntil(() => false, 300); Console.WriteLine(tk4.Status.ToString()); /*建立一個任務 讓它運行失敗 會獲得 Faulted 狀態*/ Task tk5 = new Task(() => { throw new Exception(); }); tk5.Start(); SpinWait.SpinUntil(() => false, 300); Console.WriteLine(tk5.Status.ToString()); Console.ReadLine(); } } class Product { public string Name { get; set; } public string Category { get; set; } public int SellPrice { get; set; } }
使用任務來對代碼進行並行化
使用Parallel.Invoke能夠並行加載多個方法,使用Task實例也能完成一樣的工做,下面貼代碼:
class Program { private static ConcurrentQueue<Product> queue = null; /* coder:釋迦苦僧 */ static void Main(string[] args) { queue = new ConcurrentQueue<Product>(); Task tk1 = new Task(() => { SetProduct(1); SetProduct(3);}); Task tk2 = new Task(() => SetProduct(2)); tk1.Start(); tk2.Start(); Console.ReadLine(); } static void SetProduct(int index) { Parallel.For(0, 10000, (i) => { Product model = new Product(); model.Name = "Name" + i; model.SellPrice = i; model.Category = "Category" + i; queue.Enqueue(model); }); Console.WriteLine("SetProduct {0} 執行完成", index); } } class Product { public string Name { get; set; } public string Category { get; set; } public int SellPrice { get; set; } }
等待任務完成Task.WaitAll
Task.WaitAll 方法,這個方法是同步執行的,在Task做爲參數被接受,全部Task結束其執行前,主線程不會繼續執行下一條指令,下面貼代碼
class Program { private static ConcurrentQueue<Product> queue = null; /* coder:釋迦苦僧 */ static void Main(string[] args) { queue = new ConcurrentQueue<Product>(); Task tk1 = new Task(() => { SetProduct(1); SetProduct(3); }); Task tk2 = new Task(() => SetProduct(2)); tk1.Start(); tk2.Start(); /*等待任務執行完成後再輸出 ====== */ Task.WaitAll(tk1, tk2); Console.WriteLine("等待任務執行完成後再輸出 ======"); Task tk3 = new Task(() => { SetProduct(1); SetProduct(3); }); Task tk4 = new Task(() => SetProduct(2)); tk3.Start(); tk4.Start(); /*等待任務執行前輸出 ====== */ Console.WriteLine("等待任務執行前輸出 ======"); Task.WaitAll(tk3, tk4); Console.ReadLine(); } static void SetProduct(int index) { Parallel.For(0, 10000, (i) => { Product model = new Product(); model.Name = "Name" + i; model.SellPrice = i; model.Category = "Category" + i; queue.Enqueue(model); }); Console.WriteLine("SetProduct {0} 執行完成", index); } }
Task.WaitAll 限定等待時長
queue = new ConcurrentQueue<Product>(); Task tk1 = new Task(() => { SetProduct(1); SetProduct(3);}); Task tk2 = new Task(() => SetProduct(2)); tk1.Start(); tk2.Start(); /*若是tk1 tk2 沒能在10毫秒內完成 則輸出 ***** */ if (!Task.WaitAll(new Task[] { tk1, tk2 }, 10)) { Console.WriteLine("******"); } Console.ReadLine();
如圖10毫秒沒有完成任務,則輸出了****
經過取消標記取消任務
經過取消標記來中斷Task實例的執行。 CancellationTokenSource,CancellationToken下的IsCanceled屬性標誌當前是否已經被取消,取消任務,任務也不必定會立刻取消,下面貼代碼:
class Program { private static ConcurrentQueue<Product> queue = null; /* coder:釋迦苦僧 */ static void Main(string[] args) { queue = new ConcurrentQueue<Product>(); System.Threading.CancellationTokenSource token = new CancellationTokenSource(); Task tk1 = Task.Factory.StartNew(() => SetProduct(token.Token)); Task tk2 = Task.Factory.StartNew(() => SetProduct(token.Token)); Thread.Sleep(10); /*取消任務操做*/ token.Cancel(); try { /*等待完成*/ Task.WaitAll(new Task[] { tk1, tk2 }); } catch (AggregateException ex) { /*若是當前的任務正在被取消,那麼還會拋出一個TaskCanceledException異常,這個異常包含在AggregateException異常中*/ Console.WriteLine("tk1 Canceled:{0}", tk1.IsCanceled); Console.WriteLine("tk1 Canceled:{0}", tk2.IsCanceled); } Thread.Sleep(2000); Console.WriteLine("tk1 Canceled:{0}", tk1.IsCanceled); Console.WriteLine("tk1 Canceled:{0}", tk2.IsCanceled); Console.ReadLine(); } static void SetProduct(System.Threading.CancellationToken ct) { /* 每一次循環迭代,都會有新的代碼調用 ThrowIfCancellationRequested * 這行代碼可以對 OpreationCanceledException 異常進行觀察 * 而且這個異常的標記與Task實例關聯的那個標記進行比較,若是二者相同 ,並且IsCancelled屬性爲True,那麼Task實例就知道存在一個要求取消的請求,而且會將狀態轉變爲Canceled狀態,中斷任務執行。 * 若是當前的任務正在被取消,那麼還會拋出一個TaskCanceledException異常,這個異常包含在AggregateException異常中 /*檢查取消標記*/ ct.ThrowIfCancellationRequested(); for (int i = 0; i < 50000; i++) { Product model = new Product(); model.Name = "Name" + i; model.SellPrice = i; model.Category = "Category" + i; queue.Enqueue(model); ct.ThrowIfCancellationRequested(); } Console.WriteLine("SetProduct 執行完成"); } } class Product { public string Name { get; set; } public string Category { get; set; } public int SellPrice { get; set; } }
Task異常處理 當不少任務並行運行的時候,可能會並行發生不少異常。Task實例可以處理一組一組的異常,這些異常有System.AggregateException類處理
class Program { private static ConcurrentQueue<Product> queue = null; /* coder:釋迦苦僧 */ static void Main(string[] args) { queue = new ConcurrentQueue<Product>(); System.Threading.CancellationTokenSource token = new CancellationTokenSource(); Task tk1 = Task.Factory.StartNew(() => SetProduct(token.Token)); Thread.Sleep(2000); if (tk1.IsFaulted) { /* 循環輸出異常 */ foreach (Exception ex in tk1.Exception.InnerExceptions) { Console.WriteLine("tk1 Exception:{0}", ex.Message); } } Console.ReadLine(); } static void SetProduct(System.Threading.CancellationToken ct) { for (int i = 0; i < 5; i++) { throw new Exception(string.Format("Exception Index {0}", i)); } Console.WriteLine("SetProduct 執行完成"); } }
Task返回值 Task<TResult>
class Program { /* coder:釋迦苦僧 */ static void Main(string[] args) { Task<List<Product>> tk1 = Task<List<Product>>.Factory.StartNew(() => SetProduct()); Task.WaitAll(tk1); Console.WriteLine(tk1.Result.Count); Console.WriteLine(tk1.Result[0].Name); Console.ReadLine(); } static List<Product> SetProduct() { List<Product> result = new List<Product>(); for (int i = 0; i < 500; i++) { Product model = new Product(); model.Name = "Name" + i; model.SellPrice = i; model.Category = "Category" + i; result.Add(model); } Console.WriteLine("SetProduct 執行完成"); return result; } }
經過延續串聯多個任務
ContinueWith:建立一個目標Task完成時,異步執行的延續程序,await,如代碼所示:
class Program { /* coder:釋迦苦僧 */ static void Main(string[] args) { /*建立任務t1*/ Task t1 = Task.Factory.StartNew(() => { Console.WriteLine("執行 t1 任務"); SpinWait.SpinUntil(() => { return false; }, 2000); }); /*建立任務t2 t2任務的執行 依賴與t1任務的執行完成*/ Task t2 = t1.ContinueWith((t) => { Console.WriteLine("執行 t2 任務"); SpinWait.SpinUntil(() => { return false; }, 2000); }); /*建立任務t3 t3任務的執行 依賴與t2任務的執行完成*/ Task t3 = t2.ContinueWith((t) => { Console.WriteLine("執行 t3 任務"); }); Console.ReadLine(); } }
TaskContinuationOptions
TaskContinuationOptions參數,能夠控制延續另外一個任的任務調度和執行的可選行爲。下面看代碼:
class Program { /* coder:釋迦苦僧 */ static void Main(string[] args) { /*建立任務t1*/ Task t1 = Task.Factory.StartNew(() => { Console.WriteLine("執行 t1 任務"); SpinWait.SpinUntil(() => { return false; }, 2000); throw new Exception("異常"); }); /*建立任務t2 t2任務的執行 依賴與t1任務的執行完成*/ Task t2 = t1.ContinueWith((t) => { Console.WriteLine(t.Status); Console.WriteLine("執行 t2 任務"); SpinWait.SpinUntil(() => { return false; }, 2000); /*定義 TaskContinuationOptions 行爲爲 NotOnFaulted 在 t1 任務拋出異常後,t1 的任務狀態爲 Faulted , 則t2 不會執行裏面的方法 可是須要注意的是t3任務*/ /*t2在不符合條件時 返回Canceled狀態狀態讓t3任務執行*/ }, TaskContinuationOptions.NotOnFaulted); /*建立任務t3 t3任務的執行 依賴與t2任務的執行完成*/ /*t2在不符合條件時 返回Canceled狀態狀態讓t3任務執行*/ Task t3 = t2.ContinueWith((t) => { Console.WriteLine(t.Status); Console.WriteLine("執行 t3 任務"); }); Console.ReadLine(); } }
TaskContinuationOptions 屬性有不少,以下所示
關於並行編程中的Task就寫到這,若有問題,請指正。
做者:釋迦苦僧 出處:http://www.cnblogs.com/woxpp/p/3928788.html本文版權歸做者和博客園共有,歡迎轉載,但未經做者贊成必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接。