NetCore併發編程
示例代碼:<a href="https://github.com/lotapp/BaseCode/tree/master/netcore/4_Concurrency" target="_blank">https://github.com/lotapp/BaseCode/tree/master/netcore/4_Concurrency</a>git
先簡單說下概念(其實以前也有說,因此簡說下):github
- 併發:同時作多件事情
- 多線程:併發的一種形式
- 並行處理:多線程的一種(線程池產生的一種併發類型,eg:異步編程)
- 響應式編程:一種編程模式,對事件進行響應(有點相似於JQ的事件)
Net裏面不多用進程,在之前基本上都是線程+池+異步+並行+協程
編程
我這邊簡單引入一下,畢竟主要是寫Python的教程,Net只是幫大家回顧一下,若是你發現還沒聽過這些概念,或者你的項目中還充斥着各類Thread
和ThreadPool
的話,真的得系統的學習一下了,如今官網的文檔已經很完善了,記得早幾年啥都沒有,也只能挖那些外國開源項目:安全
<a href="https://docs.microsoft.com/zh-cn/dotnet/standard/parallel-processing-and-concurrency" target="_blank">https://docs.microsoft.com/zh-cn/dotnet/standard/parallel-processing-and-concurrency</a>服務器
1.異步編程(Task)
Task的目的其實就是爲了簡化Thread
和ThreadPool
的代碼,下面一塊兒看看吧:網絡
異步用起來比較簡單,通常IO,DB,Net用的比較多,不少時候都會採用重試機制,舉個簡單的例子:多線程
/// <summary> /// 模擬一個網絡操做(別忘了重試機制) /// </summary> /// <param name="url">url</param> /// <returns></returns> private async static Task<string> DownloadStringAsync(string url) { using (var client = new HttpClient()) { // 設置第一次重試時間 var nextDelay = TimeSpan.FromSeconds(1); for (int i = 0; i < 3; i++) { try { return await client.GetStringAsync(url); } catch { } await Task.Delay(nextDelay); // 用異步阻塞的方式防止服務器被太多重試給阻塞了 nextDelay *= 2; // 3次重試機會,第一次1s,第二次2s,第三次4s } // 最後一次嘗試,錯誤就拋出 return await client.GetStringAsync(url); } }
而後補充說下Task異常的問題,當你await的時候若是有異常會拋出,在第一個await處捕獲處理便可併發
若是async
和await
就是理解不了的能夠這樣想:async
就是爲了讓await
生效(爲了向後兼容)app
對了,若是返回的是void,你設置成Task就好了,觸發是相似於事件之類的方法才使用void,否則沒有返回值都是使用Task框架
項目裏常常有這麼一個場景:等待一組任務完成後再執行某個操做,看個引入案例:
/// <summary> /// 1.批量任務 /// </summary> /// <param name="list"></param> /// <returns></returns> private async static Task<string[]> DownloadStringAsync(IEnumerable<string> list) { using (var client = new HttpClient()) { var tasks = list.Select(url => client.GetStringAsync(url)).ToArray(); return await Task.WhenAll(tasks); } }
再舉一個場景:同時調用多個同效果的API,有一個返回就行了,其餘的忽略
/// <summary> /// 2.返回首先完成的Task /// </summary> /// <param name="list"></param> /// <returns></returns> private static async Task<string> GetIPAsync(IEnumerable<string> list) { using (var client = new HttpClient()) { var tasks = list.Select(url => client.GetStringAsync(url)).ToArray(); var task = await Task.WhenAny(tasks); // 返回第一個完成的Task return await task; } }
一個async方法被await調用後,當它恢復運行時就會回到原來的上下文中運行。
若是你的Task再也不須要上下文了可使用:task.ConfigureAwait(false)
,eg:寫個日記還要啥上下文?
逆天的建議是:在覈心代碼裏面一種使用ConfigureAwait
,用戶頁面相關代碼,不須要上下文的加上
其實若是有太多await在上下文裏恢復那也是比較卡的,使用ConfigureAwait
以後,被暫停後會在線程池裏面繼續運行
再看一個場景:好比一個耗時操做,我須要指定它的超時時間:
/// <summary> /// 3.超時取消 /// </summary> /// <returns></returns> private static async Task<string> CancellMethod() { //實例化取消任務 var cts = new CancellationTokenSource(); cts.CancelAfter(TimeSpan.FromSeconds(3)); // 設置失效時間爲3s try { return await DoSomethingAsync(cts.Token); } // 任務已經取消會引起TaskCanceledException catch (TaskCanceledException ex) { return "false"; } } /// <summary> /// 模仿一個耗時操做 /// </summary> /// <returns></returns> private static async Task<string> DoSomethingAsync(CancellationToken token) { await Task.Delay(TimeSpan.FromSeconds(5), token); return "ok"; }
異步這塊簡單回顧就不說了,留兩個擴展,大家自行探討:
- 進度方面的可使用
IProgress<T>
,就當留個做業本身摸索下吧~ - 使用了異步以後儘可能避免使用
task.Wait
ortask.Result
,這樣能夠避免死鎖
Task其餘新特徵去官網看看吧,引入到此爲止了。
2.並行編程(Parallel)
這個其實出來好久了,如今基本上都是用PLinq
比較多點,主要就是:
- 數據並行:重點在處理數據(eg:聚合)
- 任務並行:重點在執行任務(每一個任務塊儘量獨立,越獨立效率越高)
數據並行
之前都是Parallel.ForEach
這麼用,如今和Linq結合以後很是方便.AsParallel()
就OK了
說很抽象看個簡單案例:
static void Main(string[] args) { IEnumerable<int> list = new List<int>() { 1, 2, 3, 4, 5, 7, 8, 9 }; foreach (var item in ParallelMethod(list)) { Console.WriteLine(item); } } /// <summary> /// 舉個例子 /// </summary> private static IEnumerable<int> ParallelMethod(IEnumerable<int> list) { return list.AsParallel().Select(x => x * x); }
正常執行的結果應該是:
1 4 9 25 64 16 49 81
並行以後就是這樣了(無論順序了):
25 64 1 9 49 81 4 16
固然了,若是你就是對順序有要求可使用:.AsOrdered()
/// <summary> /// 舉個例子 /// </summary> private static IEnumerable<int> ParallelMethod(IEnumerable<int> list) { return list.AsParallel().AsOrdered().Select(x => x * x); }
其實實際項目中,使用並行的時候:任務時間適中,太長不適合,過短也不適合
記得你們在項目裏常常會用到如Sum
,Count
等聚合函數,其實這時候使用並行就很合適
var list = new List<long>(); for (long i = 0; i < 1000000; i++) { list.Add(i); } Console.WriteLine(GetSumParallel(list));
private static long GetSumParallel(IEnumerable<long> list) { return list.AsParallel().Sum(); }
time dotnet PLINQ.dll
499999500000 real 0m0.096s user 0m0.081s sys 0m0.025s
不使用並行:(稍微多了點,CPU越密集差距越大)
499999500000 real 0m0.103s user 0m0.092s sys 0m0.021s
其實聚合有一個通用方法,能夠支持複雜的聚合:(以上面sum爲例)
.Aggregate( seed:0, func:(sum,item)=>sum+item );
稍微擴展一下,PLinq也是支持取消的,.WithCancellation(CancellationToken)
Token的用法和上面同樣,就不復述了,若是須要和異步結合,一個Task.Run
就能夠把並行任務交給線程池了
也可使用Task的異步方法,設置超時時間,這樣PLinq超時了也就終止了
PLinq這麼方便,其實也是有一些小弊端的,好比它會直接最大程度的佔用系統資源,可能會影響其餘的任務,而傳統的Parallel則會動態調整
任務並行(並行調用)
這個PLinq好像沒有對應的方法,有新語法你能夠說下,來舉個例子:
await Task.Run(() => Parallel.Invoke( () => Task.Delay(TimeSpan.FromSeconds(3)), () => Task.Delay(TimeSpan.FromSeconds(2)) ));
取消也支持:
Parallel.Invoke(new ParallelOptions() { CancellationToken = token }, actions);
擴充說明
其實還有一些好比數據流和響應編程沒說,這個以前都是用第三方庫,剛纔看官網文檔,好像已經支持了,因此就不賣弄了,感興趣的能夠去看看,其實項目裏面有流數據相關的框架,eg:Spark
,都是比較成熟的解決方案了基本上也不太使用這些了。
而後還有一些沒說,好比NetCore裏面不可變類型(列表、字典、集合、隊列、棧、線程安全字典等等)以及限流、任務調度等,這些關鍵詞我提一下,也方便你去搜索本身學習拓展
先到這吧,其餘的本身探索一下吧,最後貼一些Nuget庫,你能夠針對性的使用:
數據流:Microsoft.Tpl.Dataflow
響應編程(Linq的Rx操做):Rx-Main
不可變類型:Microsoft.Bcl.Immutable