目錄git
System.Threading.Tasks
中的類型被稱爲任務並行庫(Task Parallel Library,TPL)。github
System.Thread.Tasks
命名空間是.NET Framework4.0所提供,編程
「TPL使用CLR線程池自動將應用程序的工做動態分配到可用的CPU中。TPL還處理工做分區、線程調度、狀態管理和其餘低級別的細節操做。最終結果是,你能夠最大限度地提高.NET應用程序的性能,而且避免直接操做線程所帶來的複雜性」 --《精通C#》api
在System.Threading.Tasks
命名空間下有一個靜態類:Parallel類數組
Parallel能夠實現對實現了IEnumerable接口的數據集合的每個元素並行操做瀏覽器
有一點要說明的:並行操做會帶來必定的成本,若是任務自己能很快完成,或是循環次數不多,那麼並行處理的速度也許會比非並行處理還慢。安全
Parallel類就只有有三個方法:Parallel.For()
、Parallel.ForEach()
和Parallel.Invoke()
多線程
可是呢,這每一個方法都有大量的重載(F12-->自行查看Parallel定義)併發
使用Parallel.For()
能夠對數組中的每個元素進行並行操做異步
正常的遍歷數組是按照索引的順序執行的,可是並行操做,對數組的每個元素的操做不必定按照索引順序操做
Parallel.For(),第一個參數是循環開始的索引(包含),第二個參數是循環結束的索引(不含)
Parallel.For()的第三個參數是一個有參數無返回值的委託,其參數是數組的索引
其實就至關於:for (int i = 0; i < length; i++)
的異步版本,只是在這裏是並行操做,因此並不按照數組中元素的順序執行,具體的執行順序是不可控的。
示例
static void Main(string[] args) { int[] intArray = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; Console.WriteLine("------------常規,對數組進行循環遍歷------------"); Array.ForEach(intArray, n => Console.WriteLine($"當前操做的數組元素是{n}"));//注意這裏的參數n是元素而不是索引 Console.WriteLine("------------並行操做 對數組進行循環遍歷------------"); Parallel.For(0, intArray.Length, (i) => Console.WriteLine($"當前循環次數{i},當前操做的數組元素是{intArray[i]}")); Console.ReadKey(); }
運行結果:能夠看出,對數組的元素的操做順序並非按照索引的順序,而是不肯定的。
Parallel.ForEach()
用於對泛型可枚舉對象的元素進行並行操做
其實就至關於:foreach (var item in collection)
的異步版本
Parallel.ForEach()有大量的重載,這裏展現一個簡單的操做
Parallel.ForEach()的第一個參數是待操做的可枚舉對象,第二個參數是一個有參數無返回值的委託,該委託參數是集合的元素(而不是索引)
示例
List<int> intList = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; Parallel.ForEach(intList, n => Console.WriteLine(n+100)); Console.ReadKey();
Parallel.Invoke()
對指定一系列操做並行運算
參數是一個Action委託數組(注意只能是Action[],即只能是無返回值的委託數組)
Parallel.Invoke()最多見用於併發請求接口
示例:
static void Main(string[] args) { Action action1=() => { for (int i = 0; i < 5; i++) { Console.WriteLine($"action-1-操做"); } }; Action action2 = () => { for (int i = 0; i < 5; i++) { Console.WriteLine($"action-2-操做"); } }; //Parallel.Invoke(action1, action2); Action[] actions = { action1, action2 }; Parallel.Invoke(actions); Console.ReadKey(); }
運行結果:
詳細能夠參考微軟的在線文檔
多線程對同一個數據集合同時讀寫操做,可能會形成數據的混亂
.NET4 引入了System.Collections.Concurrent
命名空間,其中包含多個線程安全的數據集合類型。
如今的新項目中,只要是對數據集合進行多線程的增刪操做,就應該使用併發集合類。
可是,若是僅從集合進行多線程的讀取,則可以使用通常的數據集合,即 System.Collections.Generic 命名空間中的類。
.net 中線程安全的數據集合有一下一些:
類型 | 描述 |
---|---|
BlockingCollection | 爲實現 IProducerConsumerCollection 的全部類型提供限制和阻止功能。 有關詳細信息,請參閱 BlockingCollection 概述。 |
ConcurrentDictionary | 鍵值對字典的線程安全實現。 |
ConcurrentQueue | FIFO(先進先出)隊列的線程安全實現。 |
ConcurrentStack | LIFO(後進先出)堆棧的線程安全實現。 |
ConcurrentBag | 無序元素集合的線程安全實現。 |
IProducerConsumerCollection | 類型必須實現以在 BlockingCollection 中使用的接口。 |
一個簡單的示例:給一個數據集合添加大批量的數據
List<int> list = new List<int>(); Parallel.For(0, 1000000, t => list.Add(t));
如果按照上面使用Parallel.For()
的並行方式給List添加數據,
則會報錯:「索引超出了數組界限。」或「 源數組長度不足。請檢查 srcIndex 和長度以及數組的下限。」
即便沒有報錯,list中的數據也是有問題的(比可能數量不足)
固然能夠經過加鎖的方式進行彌補:
List<int> list = new List<int>(); object locker = new object(); Parallel.For(0, 1000000, t => { lock(locker) { list.Add(t); } });
這樣經過對操做的線程枷鎖,徹底是沒有必要的,你可使用線程安全的集合類型,好比在這裏使用ConcurrentBag
ConcurrentBag<int> cBag = new ConcurrentBag<int>(); Parallel.For(0, 100000, t => cBag.Add(t));
固然由於是並行操做,因此插入集合中的數據並非按照0-100000的順序(僅僅是成段的有序)。
System.Threading.Tasks
命名空間中Task類,表示異步操做。
Task類能夠輕鬆地在次線程中調用方法,能夠做爲異步委託的簡單替代品。
同時在該命名空間還有一個泛型Task<TResul>
類,TResult 表示異步操做執行完成後返回值的類型。
建立一個Task操做,只須要使用靜態函數Task.Run()
便可,
Task.Run()是一個.net framework4.5及以上定義的一個默認異步操做,
Task.Run()參數是委託,即須要異步執行的方法,
注意做爲Task.Run()的參數的委託都是無參委託,
若Task.Run()參數是無返回值的委託Action
,則Task.Run()返回值是Task
類型
若Task.Run()參數是有返回值的委託Func<TResult>
,則Task.Run()返回值是Task<TResult>
泛型
注意:如果低於.net4.5,則可使用Task.Factory.StartNew()
,和Task.Run()靜態方法做用同樣
總而言之,言而總之,show you code ,一切皆明瞭!
示例:無返回值的Task
static void Main(string[] args) { //1.使用Task構造函數建立,必須顯式的使用.Start()才能開始執行 //Task task = new Task(() => { Thread.Sleep(10); Console.WriteLine("我是Task ,我結束了"); }); //task.Start(); //2.使用TaskFactory.StartNew(工廠建立) 方法 //Task task = Task.Factory.StartNew(() => { Thread.Sleep(10); Console.WriteLine("我是Task ,我結束了"); }); //3.使用Task.Run() Task task = Task.Run(() => { Thread.Sleep(10); Console.WriteLine("我是Task.Run ,我結束了"); }); if (!task.IsCompleted)//task.IsCompleted判斷當前的任務是否已完成 { Console.WriteLine("當前的Task.Run()還沒有執行完,可是由於異步,返回到調用函數,因此能夠先執行後續的代碼"); } Console.WriteLine("當前Task.Run尚未完成,咱們是在他以後的代碼可是先執行了"); task.Wait();//強行鎖定線程,等待task完成 Console.WriteLine("終於Task.Run完成了工做"); Console.ReadKey(); }
如果Task任務有返回值,返回值類型爲Task<T>
,使用返回值的Result
屬性查詢具體值
調試時注意查看,運行到 Console.WriteLine(task.Result)
的時候,其中Task任務仍是在執行Thread.Sleep(1000)
尚未出結果,咱們但願的異步執行也沒有發生,而是程序是在一直在等待,這是爲何呢?
是由於一但執行了task.Result,即便task任務尚未完成,主線程則停下等待,直到等待task.Result出結果
這種狀況和異步委託中調用EndInvoke()是同樣的:一旦運行EndInvoke,如果引用方法尚未完成,主線程則中止,直到引用函數運行結束。
因此能夠這樣理解:task.Result能夠看做是一個將來結果(必定有結果但還在運算中)
示例:有返回值的Task
static void Main(string[] args) { Console.WriteLine("SomeDoBeforeTask"); Func<int> Do = () => { Thread.Sleep(1000); Console.WriteLine("Task.Run結束"); return 2; }; Task<int> task = Task.Run(Do); Console.WriteLine(task.Status);//使用task.Status查看當前的Task的狀態:當前的狀態:WaitingToRun Console.WriteLine(task.Result);//使用task.result操做Task任務的返回值:返回值是:2 Console.WriteLine(task.Status);//使用task.Status查看當前的Task的狀態:當前的狀態:RanToComplation Console.WriteLine("SomeDoAfterTask"); Console.ReadKey(); }
運行結果:
說明:
其中咱們使用task.Result查看當前的task的狀態,其中Task的狀態(即其生命週期)只有三種:
Task任務是在後臺執行的同時,主線程的繼續執行後續程序
因此有時候須要在Task結束後,繼續執行某個特定的任務,即爲Task添加延續任務(也稱接續工做)
舉一個簡單的例子,
求解1-5000能求被3整除的個數,這個過程須要許多時間,我把它定義爲一個Task.Run()
咱們須要在求出結果後打印出結果,這裏怎麼操做呢?
如果直接使用task.Result
則會阻塞主線程,一直等待運算出結果,這顯然不是咱們想要的
如果使用while(!task.IsComplation){//後續操做}
,你沒法判斷Task什麼時候結束,並且一旦Task結束則會中斷後續操做
這裏就是須要爲Task加上接續工做
這裏你能夠明白,接續本質和異步委託中的回調模式是同樣的,回調方法就是接續工做
task1.ContinueWith(...task2..)
表示當task1結束後接着運行task2任務
注意這裏咱們使用Lambda表達式編寫接續工做,接續工做是有一個參數的,參數是Task類型,即上一個Task
即第一個Task完成後自動啓動下一個Task,實現Task的延續
注意:ContinueWith()的返回值亦是Task類型對象,即新建立的任務
能夠爲接續工做task2繼續添加接續工做task3
示例5 :
static void Main(string[] args) { Console.WriteLine("task執行前..."); Task<int> task1 = Task.Run(() => Enumerable.Range(1, 5000).Count(n => (n % 3) == 0)); Task task2 = task1.ContinueWith(t => Console.WriteLine($"當你看到這句話則task1結束了,1-5000中能被3整除的個數{t.Result}"));//這裏的t就是task1 Task task3 = task2.ContinueWith(t => Console.WriteLine($"當你看到這句話則task2也結束了")); Console.WriteLine($"task1及其接續工做正在執行中," + "\t\n" + "咱們如今正在執行其餘的後續代碼"); Console.ReadKey(); }
運行結果:
使用task.GetAwaiter()
爲相關的task建立一個等待者
示例:
static void Main(string[] args) { Console.WriteLine("task執行前..."); Task<int> task1 = Task.Run(() => Enumerable.Range(1, 5000).Count(n => (n % 3) == 0)); var awaiter = task1.GetAwaiter();//建立一個awaiter對象 //awaiter.OnCompleted(() => Console.WriteLine($"當你看到這句話則task1結束了,1-5000中能被3整除的個{task1.Result}")); awaiter.OnCompleted(() => Console.WriteLine($"當你看到這句話則task1結束了,1-5000中能被3整除的個{awaiter.GetResult()}")); Console.WriteLine($"task1及其接續工做正在執行中," + "\t\n" + "咱們如今正在執行其餘的後續代碼"); Console.ReadKey(); }
運行效果同上。
ContinueWith會返回Task對象,它很是適合用於增長更多的接續工做,不過,若是Task出錯,必須直接處理AggregateException。
使用task.GetAwaiter建立awaiter對象,是在.net4.5以後,其中C#5.0的異步功能就是使用這種方式。
使用awaiter也是可使用task.Result直接的查看任務的結果,可是使用awaiter.GetResult()能夠在Task出現異常的時候直接拋出,不會封裝在AggregateException中。
延時執行Task
其實就至關於實現Thread.Sleep()的異步版本
如果你使用Thread.Sleep(),則會程序一直在等待(即阻塞線程),直到等待結束纔會運行後續的代碼
而這裏就至關於給給Thread.Sleep()一個加了接續工做,且這個接續工做是異步的。
即便用Task.Delay()不會阻塞主線程,主線程能夠繼續執行後續代碼
示例:
//新建異步任務,30毫秒秒後執行 Task.Delay(30).ContinueWith(c => { for (int i = 0; i < 50; i++) { Console.WriteLine(i + "這是Task在運行"); } }); for (int i = 0; i < 100; i++) { Console.WriteLine(i + "這是Task以後的程序在運行"); }
調試的時候你會發現,剛開始的時候的時候是先顯示的"i這是Task以後的程序在運行"
以後在等帶了30毫秒,後就會開始顯示"i這是Task在運行"和"i這是Task以後的程序在運行"交叉顯示
運行結果以下:
示例:運行效果同上
Task.Delay(30).GetAwaiter().OnCompleted(() => { for (int i = 0; i < 50; i++) { Console.WriteLine(i + "這是Awaiter在運行行"); } }); for (int i = 0; i < 100; i++) { Console.WriteLine(i + "這是Awaiter以後的程序在運行行"); } Console.ReadKey();
方法名 | 說明 |
---|---|
Task.Wait | task1.Wait();就是等待任務執行(task1)完成,task1的狀態變爲Completed |
Task.WaitAll | 待全部的任務都執行完成 |
Task.WaitAny | 發同Task.WaitAll,就是等待任何一個任務完成就繼續向下執行 |
CancellationTokenSource | 經過cancellation的tokens來取消一個Task |
異步方法是能夠請求終止運行的,
System.Threading.Tasks命名空間中有兩個類是爲此目的而設計的:Cance1lationToken和CancellationTokenSource。
下面看使用CancellationTokenSource和CancellationToken來實現取消某個異步操做。
這裏使用Task.Run()爲例,其第一個參數是一個Action委託,第二個參數就是CancellationToken對象
static void Main(string[] args) { CancellationTokenSource cts = new CancellationTokenSource();//生成一個CancellationTokenSource對象,該對象能夠建立CancellationToken CancellationToken ct = cts.Token;//獲取一個令牌(token) Task.Run(() => { for (int i = 0; i < 20; i++) { if (ct.IsCancellationRequested) { return; } Thread.Sleep(1000); Console.WriteLine($"異步程序的的循環:{i}"); } }, ct);//注意Run()的第二個參數就是終止令牌token for (int i = 0; i < 4; i++) { Thread.Sleep(1000); Console.WriteLine($"主線程中循環:{i}"); } Console.WriteLine("立刻sts.Cancel(),即將要終止異步程序"); cts.Cancel();//含有該CancellationTokenSource的token的異步程序,終止! Console.ReadKey(); }
運行結果:能夠發現異步任務Task.Run()尚未完成,可是由於cst.Cancel()運行,token的屬性IsCancellationRequested變爲true,異步循環結束。
說明:取消一個異步操做的過程,注意,該過程是協同的。
即:調用CancellationTokenSource的Cancel時,它自己並不會執行取消操做。
而是會將CancellationToken的IsCancellationRequested屬性設置爲true。
包含CancellationToken的代碼負責檢查該屬性,並判斷是否須要中止執行並返回。
System.Linq名稱空間中有一個ParallelEnumerable
類,該類中的方法能夠分解Linq查詢的工做,使其分佈在多個線程上,即實現並行查詢。
爲並行運行而設計的LINQ查詢稱爲PLINQ查詢。
下面讓咱們先簡單的理一理:
首先咱們都知道Enumerable
類爲IEnumberable<T>
接口擴展了一系列的靜態方法。(就是咱們使用Linq方法語法的中用的哪些經常使用的靜態方法,自行F12)
正如MSDN中所說:「ParallelEnumberable是Enumberable的並行等效項」,ParallelEnumberable
類則是Enumerable
類的並行版本,
F12查看定義能夠看到ParallelEnumerable
類中幾乎全部的方法都是對ParallelQuery<TSource>
接口的擴展,
可是,在ParallelEnumberable
類有一個重要的例外,AsParallel()
方法還對IEnumerable<T>
接口的擴展,而且返回的是一個ParallelQuery<TSource>
類型的對象,
因此呢?凡是實現類IEnumberable<T>集合能夠經過調用靜態方法AsParallel()
,返回一個ParallelQuery
注意在運行PLinq的時候,PLinq會自動的判斷若是查詢能從並行化中受益,則將同時運行。而若是並行執行查詢會損害性能,PLINQ將按順序運行查詢。
示例:求1到50000000中能夠整除3的數,將所求的結果倒序存放在modThreeIsZero[]中
這是須要很是多的重複運算,因此咱們能夠對比按照通常Linq查詢下方式和PLinq查詢,對比一些須要的時間。
static void Main(string[] args) { int[] intArray = Enumerable.Range(1, 50000000).ToArray(); Stopwatch sw = new Stopwatch(); //順序查詢 sw.Start(); int[] modThreeIsZero1 = intArray.Select(n => n).Where(n => n % 3 == 0).OrderByDescending(n => n).ToArray(); sw.Stop(); Console.WriteLine($"順序查詢,運行時間:{sw.ElapsedMilliseconds}毫秒,能夠整除3的個數:{modThreeIsZero1.Count()}"); //使用AsParallel()實現並行查詢 //AsParallel()方法返回ParallelQuery<TSourc>類型對象。由於返回的類型,因此編譯器選擇的Select()、Where()等方法是ParallelEnumerable.Where(),而不是Enumerable.Where()。 sw.Restart(); int[] modThreeIsZero2 = intArray.AsParallel().Select(n => n).Where(n => n % 3 == 0).OrderByDescending(n => n).ToArray(); sw.Stop(); Console.WriteLine($"並行查詢,運行時間:{sw.ElapsedMilliseconds}毫秒,能夠整除3的個數:{modThreeIsZero2.Count()}"); Console.ReadKey(); }
說明:AsParallel()方法返回ParallelQuery<TSourc>類型對象。由於返回的類型,因此編譯器選擇的Select()、Where()等方法是ParallelEnumerable.Where(),而不是Enumerable.Where()。
運行結果:
能夠對比結果,在大規模的Linq查詢中,同步查詢和並行查詢二者的運行時間的差距仍是很大的!
可是小規模的Linq查詢兩者的效果其實並無很明顯。
在3.6取消異步操做中解釋瞭如何取消一個長時間的任務,
那麼對於長時間運行的PLinq也是能夠取消的
一樣是使用CancellationTokenSource
生成一個CancellationToken
對象做爲token
怎麼把token給PLinq呢?使用ParallelQuery<TSource>
中靜態方法WithCancellation(token)
在PLinq中,如果取消了並行操做,則會拋出OperationCanceledException
示例:
static void Main(string[] args) { //具體的做用和含義能夠看0030取消一個異步操做 CancellationTokenSource cts = new CancellationTokenSource(); CancellationToken ct = cts.Token; int[] intArray = Enumerable.Range(1, 50000000).ToArray(); Task<int[]> task = Task.Run(() => { try { int[] modThreeIsZero = intArray.AsParallel().WithCancellation(ct).Select(n => n).Where(n=> n% 3 == 0).OrderByDescending(n => n).ToArray(); return modThreeIsZero; } catch (OperationCanceledException ex)//一旦PLinq中取消查詢就會觸發OperationCanceledException異常 { Console.WriteLine(ex.Message);//注意:Message的內容就是:已取消該操做 return null; } }); Console.WriteLine("取消PLinq?Y/N"); string input = Console.ReadLine(); if (input.ToLower().Equals("y")) { cts.Cancel();//取消並行查詢 Console.WriteLine("取消了PLinq!");//undone:怎麼驗證已經真的取消了 } else { Console.WriteLine("Loading……"); Console.WriteLine(task.Result.Count()); } Console.ReadKey(); }
唉,書真是越看越厚,皆是淺嘗輒止,先到這裏吧!