.NET Core 中的併發編程

今天咱們購買的每臺電腦都有一個多核心的 CPU,容許它並行執行多個指令。操做系統經過將進程調度到不一樣的內核來發揮這個結構的優勢。 然而,還能夠經過異步 I/O 操做和並行處理來幫助咱們提升單個應用程序的性能。 在.NET Core中,任務 (tasks) 是併發編程的主要抽象表述,但還有其餘支撐類可使咱們的工做更容易。html

併發編程 - 異步 vs. 多線程代碼

並行編程是一個普遍的術語,咱們應該經過觀察異步方法和實際的多線程之間的差別展開探討。 儘管 .NET Core 使用了任務來表達一樣的概念,一個關鍵的差別是內部處理的不一樣。 調用線程在作其餘事情時,異步方法在後臺運行。這意味着這些方法是 I/O 密集型的,即他們大部分時間用於輸入和輸出操做,例如文件或網絡訪問。 只要有可能,使用異步 I/O 方法代替同步操做頗有意義。相同的時間,調用線程能夠在處理桌面應用程序中的用戶交互或處理服務器應用程序中的同時處理其餘請求,而不只僅是等待操做完成。算法

你能夠在個人文章 Asynchronous Programming in C# using Async Await – Best Practices 中閱讀更多關於使用 async 和 await 調用異步方法。該文章來自 DNC Magazine (9月刊) 。編程

計算密集型的方法要求 CPU 週期工做,而且只能運行在他們專用的後臺線程中。CPU 的核心數限制了並行運行時的可用線程數量。操做系統負責在剩餘的線程之間切換,使他們有機會執行代碼。 這些方法仍然被併發地執行,卻沒必要被並行地執行。儘管這意味着方法不是同時執行,卻能夠在其餘方法暫停的時候執行。數組

並行 vs 併發

本文將在最後一段中重點介紹 在 .NET Core中多線程併發編程。安全

任務並行庫

.NET Framework 4 引入了任務並行庫 (TPL) 做爲編寫併發代碼的首選 API。.NET Core採用相同的編程模式。 要在後臺運行一段代碼,須要將其包裝成一個 任務服務器

?
1
2
3
var backgroundTask = Task.Run(() => DoComplexCalculation(42));
// do other work
var result = backgroundTask.Result;

當須要返回結果時,Task.Run 方法接收一個 函數 (Func) ;當不須要返回結果時,方法 Task.Run 接收一個 動做 (Action) 。固然,全部的狀況下均可以使用 lambda 表達式,就像我上面例子中調用帶一個參數的長時間方法。 線程池中的某個線程將會處理任務。.NET Core 的運行時包含一個默認調度程序,使用線程池來處理隊列並執行任務。您能夠經過派生 TaskScheduler 類實現本身的調度算法,代替默認的,但這超過本文的討論範圍。 正如咱們以前所見,我使用 Result 屬性來合併被調用的後臺線程。對於不須要返回結果的線程,我能夠調用 Wait() 來代替。這兩種方式都將被堵塞到後臺任務完成。 爲了不堵塞調用線程 ( 如在ASP.NET Core應用程序中) ,可使用 await 關鍵字:網絡

?
1
2
3
var backgroundTask = Task.Run(() => DoComplexCalculation(42));
// do other work
var result = await backgroundTask;

這樣被調用的線程將被釋放以便處理其餘傳入請求。一旦任務完成,一個可用的工做線程將會繼續處理請求。固然,控制器動做方法必須是異步的:數據結構

?
1
public async Task<iactionresult> Index() {     // method body }

處理異常

將兩個線程合併在一塊兒的時候,任務拋出的任何異常將被傳遞到調用線程中:多線程

  • 若是使用 Result 或 Wait() ,它們將被打包到 AggregateException 中。實際的異常將被拋出並存儲在其 InnerException 屬性中。
  • 若是您使用 await,原來的異常將不會被打包。

在這兩種狀況下,調用堆棧的信息將保持不變。併發

取消任務

因爲任務是能夠長時間運行的,因此你可能想要有一個能夠提早取消任務的選項。實現這個選項,須要在任務建立的時候傳入取消的令牌 (token),以後再使用令牌觸發取消任務:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
var tokenSource = new CancellationTokenSource();
var cancellableTask = Task.Run(() =>
{
     for ( int i = 0; i < 100; i++)
     {
         if (tokenSource.Token.IsCancellationRequested)
         {
             // clean up before exiting
             tokenSource.Token.ThrowIfCancellationRequested();
         }
         // do long-running processing
     }
     return 42;
}, tokenSource.Token);
// cancel the task
tokenSource.Cancel();
try
{
     await cancellableTask;
}
catch (OperationCanceledException e)
{
     // handle the exception
} 

實際上,爲了提早取消任務,你須要檢查任務中的取消令牌,並在須要取消的時候做出反應:在執行必要的清理操做後,調用 ThrowIfCancellationRequested() 退出任務。這個方法將會拋出 OperationCanceledException,以便在調用線程中執行相應的處理。

協調多任務

若是你須要運行多個後臺任務,這裏有些方法能夠幫助到你。 要同時運行多個任務,只需連續啓動它們並收集它們的引用,例如在數組中:

?
1
2
3
4
5
6
var backgroundTasks = new []
{
     Task.Run(() => DoComplexCalculation(1)),
     Task.Run(() => DoComplexCalculation(2)),
     Task.Run(() => DoComplexCalculation(3))
};

如今你可使用 Task 類的靜態方法,等待他們被異步或者同步執行完畢。

?
1
2
3
4
5
6
// wait synchronously
Task.WaitAny(backgroundTasks);
Task.WaitAll(backgroundTasks);
// wait asynchronously
await Task.WhenAny(backgroundTasks);
await Task.WhenAll(backgroundTasks);

實際上,這兩個方法最終都會返回全部任務的自身,能夠像任何其餘任務同樣再次操做。爲了獲取對應任務的結果,你能夠檢查該任務的 Result 屬性。 處理多任務的異常有點棘手。方法 WaitAll 和 WhenAll 無論哪一個任務被收集到異常時都會拋出異常。不過,對於 WaitAll ,將會收集全部的異常到對應的 InnerExceptions 屬性;對於 WhenAll ,只會拋出第一個異常。爲了確認哪一個任務拋出了哪一個異常,您須要單獨檢查每一個任務的 Status 和 Exception 屬性。 在使用 WaitAny 和 WhenAny 時必須足夠當心。他們會等到第一個任務完成 (成功或失敗),即便某個任務出現異常時也不會拋出任何異常。他們只會返回已完成任務的索引或者分別返回已完成的任務。你必須等到任務完成或訪問其 result 屬性時捕獲異常,例如:

?
1
2
3
4
5
6
7
8
9
var completedTask = await Task.WhenAny(backgroundTasks);
try
{
     var result = await completedTask;
}
catch (Exception e)
{
     // handle exception
}

若是你想連續運行多個任務,代替併發任務,可使用延續 (continuations)的方式:

?
1
2
3
var compositeTask = Task.Run(() => DoComplexCalculation(42))
     .ContinueWith(previous => DoAnotherComplexCalculation(previous.Result),
         TaskContinuationOptions.OnlyOnRanToCompletion)

ContinueWith() 方法容許你把多個任務一個接着一個執行。這個延續的任務將獲取到前面任務的結果或狀態的引用。 你仍然能夠增長條件判斷是否執行延續任務,例如只有在前面任務成功執行或者拋出異常時。對比連續等待多個任務,提升了靈活性。 固然,您能夠將延續任務與以前討論的全部功能相結合:異常處理、取消和並行運行任務。這就有了很大的表演空間,以不一樣的方式進行組合:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
var multipleTasks = new []
{
     Task.Run(() => DoComplexCalculation(1)),
     Task.Run(() => DoComplexCalculation(2)),
     Task.Run(() => DoComplexCalculation(3))
};
var combinedTask = Task.WhenAll(multipleTasks);
 
var successfulContinuation = combinedTask.ContinueWith(task =>
         CombineResults(task.Result), TaskContinuationOptions.OnlyOnRanToCompletion);
var failedContinuation = combinedTask.ContinueWith(task =>
         HandleError(task.Exception), TaskContinuationOptions.NotOnRanToCompletion);
 
await Task.WhenAny(successfulContinuation, failedContinuation);

任務同步

若是任務是徹底獨立的,那麼咱們剛纔看到的協調方法就已足夠。然而,一旦須要同時共享數據,爲了防止數據損壞,就必需要有額外的同步。 兩個以及更多的線程同時更新一個數據結構時,數據很快就會變得不一致。就好像下面這個示例代碼同樣:

?
1
2
3
4
5
6
7
8
9
10
var counters = new Dictionary< int , int >();
 
if (counters.ContainsKey(key))
{
     counters[key] ++;
}
else
{
     counters[key] = 1;
}

當多個線程同時執行上述代碼時,不一樣線程中的特定順序執行指令可能致使數據不正確,例如:

  • 全部線程將會檢查集合中是否存在同一個 key
  • 結果,他們都會進入 else 分支,並將這個 key 的值設爲1
  • 最後結果將會是1,而不是2。若是是接連着執行代碼的話,將會是預期的結果。

上述代碼中,臨界區 (critical section) 一次只容許一個線程能夠進入。在C# 中,可使用 lock 語句來實現:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
var counters = new Dictionary< int , int >();
 
lock (syncObject)
{
     if (counters.ContainsKey(key))
     {
         counters[key]++;
     }
     else
     {
         counters[key] = 1;
     }
}

在這個方法中,全部線程都必須共享相同的的 syncObject 。做爲最佳作法,syncObject 應該是一個專用的 Object 實例,專門用於保護對一個獨立的臨界區的訪問,避免從外部訪問。 在 lock 語句中,只容許一個線程訪問裏面的代碼塊。它將阻止下一個嘗試訪問它的線程,直到前一個線程退出。這將確保線程完整執行臨界區代碼,而不會被另外一個線程中斷。固然,這將減小並行性並減慢代碼的總體執行速度,所以您最好最小化臨界區的數量並使其儘量的短。

使用 Monitor 類來簡化 lock 聲明:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
var lockWasTaken = false ;
var temp = syncObject;
try
{
     Monitor.Enter(temp, ref lockWasTaken);
     // lock statement body
}
finally
{
     if (lockWasTaken)
     {
         Monitor.Exit(temp);
     }
}

儘管大部分時間您都但願使用 lock 語句,但 Monitor 類能夠在須要時給予額外的控制。例如,您可使用 TryEnter() 而不是 Enter(),並指定一個限定時間,避免無止境地等待鎖釋放。

其餘同步基元

Monitor 只是 .NET Core 中衆多同步基元的一員。根據實際狀況,其餘基元可能更適合。

Mutex 是 Monitor 更重量級的版本,依賴於底層的操做系統,提供跨多個進程同步訪問資源[1], 是針對 Mutex 進行同步的推薦替代方案。

SemaphoreSlim 和 Semaphore 能夠限制同時訪問資源的最大線程數量,而不是像 Monitor 同樣只能限制一個線程。 SemaphoreSlim 比 Semaphore 更輕量,但僅限於單個進程。若是可能,您最好使用 SemaphoreSlim 而不是 Semaphore。

ReaderWriterLockSlim 能夠區分兩種對訪問資源的方式。它容許無限數量的讀取器 (readers) 同時訪問資源,而且限制同時只容許一個寫入器 (writers) 訪問鎖定資源。讀取時線程安全,但修改數據時須要獨佔資源,很好地保護了資源。

AutoResetEvent、ManualResetEvent 和 ManualResetEventSlim 將堵塞傳入的線程,直到它們接收到一個信號 (即調用 Set() )。而後等待中的線程將繼續執行。AutoResetEvent 在下一次調用 Set() 以前,將一直阻塞,並只容許一個線程繼續執行。ManualResetEvent 和 ManualResetEventSlim 不會堵塞線程,除非 Reset() 被調用。ManualResetEventSlim 比前二者更輕量,更值得推薦。

Interlocked 提供一種選擇——原子操做,這是替代 locking 和其餘同步基元更好的選擇(若是適用):

?
1
2
3
4
5
6
7
// non-atomic operation with a lock
lock (syncObject)
{
     counter++;
}
// equivalent atomic operation that doesn't require a lock
Interlocked.Increment( ref counter);

併發集合

當一個臨界區須要確保對數據結構的原子訪問時,用於併發訪問的專用數據結構多是更好和更有效的替代方案。例如,使用 ConcurrentDictionary 而不是 Dictionary,能夠簡化 lock 語句示例:

?
1
2
3
4
5
6
7
var counters = new ConcurrentDictionary< int , int >();
 
counters.TryAdd(key, 0);
lock (syncObject)
{
     counters[key]++;
}

天然地,也有可能像下面同樣:

?
1
counters.AddOrUpdate(key, 1, (oldKey, oldValue) => oldValue + 1);

由於 update 的委託是臨界區外面的方法,所以,第二個線程可能在第一個線程更新值以前,讀取到一樣的舊值,使用本身的值有效地覆蓋了第一個線程的更新值,這就丟失了一個增量。錯誤使用併發集合也是沒法避免多線程帶來的問題。 併發集合的另外一個替代方案是 不變的集合 (immutable collections)。 相似於併發集合,一樣是線程安全的,可是底層實現是不同的。任何關改變數據結構的操做將不會改變原來的實例。相反,它們返回一個更改後的副本,並保持原始實例不變:

?
1
2
var original = new Dictionary< int , int >().ToImmutableDictionary();
var modified = original.Add(key, value);

所以在一個線程中對集合任何更改對於其餘線程來講都是不可見的。由於它們仍然引用原來的未修改的集合,這就是不變的集合本質上是線程安全的緣由。 固然,這使得它們對於解決不一樣集合的問題頗有效。最好的狀況是多個線程在同一個輸入集合的狀況下,獨立地修改數據,在最後一步可能爲全部線程合併變動。而使用常規集合,須要提早爲每一個線程建立集合的副本。

並行LINQ (PLINQ)

並行LINQ (PLINQ) 是 Task Parallel Library 的替代方案。顧名思義,它很大程度上依賴於 LINQ(語言集成查詢)功能。對於在大集合中執行相同的昂貴操做的場景是頗有用的。與全部操做都是順序執行的普通 LINQ to Objects 不一樣的是,PLINQ能夠在多個CPU上並行執行這些操做。 發揮優點所須要的代碼改動也是極小的:

?
1
2
3
4
5
6
7
8
9
10
// sequential execution
var sequential = Enumerable.Range(0, 40)
     .Select(n => ExpensiveOperation(n))
     .ToArray();
 
// parallel execution
var parallel = Enumerable.Range(0, 40)
     .AsParallel()
     .Select(n => ExpensiveOperation(n))
     .ToArray();

如你所見,這兩個代碼片斷的不一樣僅僅是調用 AsParallel()。這將IEnumerable 轉換爲 ParallelQuery,致使查詢的部分並行運行。要切換爲回順序執行,您能夠調用 AsSequential(),它將再次返回一個IEnumerable。 默認狀況下,PLINQ 不保留集合中的順序,以便讓進程更有效率。可是當順序很重要時,能夠調用 AsOrdered():

?
1
2
3
4
5
var parallel = Enumerable.Range(0, 40)
     .AsParallel()
     .AsOrdered()
     .Select(n => ExpensiveOperation(n))
     .ToArray();

同理,你能夠經過調用 AsUnordered() 切換回來。

在完整的 .NET Framework 中併發編程

因爲 .NET Core 是完整的 .NET Framework 的簡化實現,因此 .NET Framework 中全部並行編程方法也能夠在.NET Core 中使用。惟一的例外是不變的集合,它們不是完整的 .NET Framework 的組成部分。它們做爲單獨的 NuGet 軟件包(System.Collections.Immutable)分發,您須要在項目中安裝使用。

結論:

每當應用程序包含能夠並行運行的 CPU 密集型代碼時,利用併發編程來提升性能並提升硬件利用率是頗有意義的。 .NET Core 中的 API 抽象了許多細節,使編寫併發代碼更容易。然而須要注意某些潛在的問題, 其中大部分涉及從多個線程訪問共享數據。 若是能夠的話,你應該徹底避免這種狀況。若是不行,請確保選擇最合適的同步方法或數據結構。

相關文章
相關標籤/搜索