目錄git
本章介紹在C#中實現線程同步的幾種方法。由於多個線程同時訪問共享數據時,可能會形成共享數據的損壞,從而致使與預期的結果不相符。爲了解決這個問題,因此須要用到線程同步,也被俗稱爲「加鎖」。可是加鎖絕對不對提升性能,最多也就是不增不減,要實現性能不增不減還得靠高質量的同步源語(Synchronization Primitive)。可是由於正確永遠比速度更重要,因此線程同步在某些場景下是必須的。數據庫
線程同步有兩種源語(Primitive)構造:用戶模式(user - mode)和內核模式(kernel - mode),當資源可用時間短的狀況下,用戶模式要優於內核模式,可是若是長時間不能得到資源,或者說長時間處於「自旋」,那麼內核模式是相對來講好的選擇。c#
可是咱們但願兼具用戶模式和內核模式的優勢,咱們把它稱爲混合構造(hybrid construct),它兼具了兩種模式的優勢。併發
在C#中有多種線程同步的機制,一般能夠按照如下順序進行選擇。dom
- 若是代碼能經過優化能夠不進行同步,那麼就不要作同步。
- 使用原子性的
Interlocked
方法。- 使用
lock/Monitor
類。- 使用異步鎖,如
SemaphoreSlim.WaitAsync()
。- 使用其它加鎖機制,如
ReaderWriterLockSlim、Mutex、Semaphore
等。- 若是系統提供了
*Slim
版本的異步對象,那麼請選用它,由於*Slim
版本所有都是混合鎖,在進入內核模式前實現了某種形式的自旋。
在同步中,必定要注意避免死鎖的發生,死鎖的發生必須知足如下4個基本條件,因此只須要破壞任意一個條件,就可避免發生死鎖。異步
- 排他或互斥(Mutual exclusion):一個線程(ThreadA)獨佔一個資源,沒有其它線程(ThreadB)能獲取相同的資源。
- 佔有並等待(Hold and wait):互斥的一個線程(ThreadA)請求獲取另外一個線程(ThreadB)佔有的資源.
- 不可搶先(No preemption):一個線程(ThreadA)佔有資源不能被強制拿走(只能等待ThreadA主動釋放它的資源)。
- 循環等待條件(Circular wait condition):兩個或多個線程構成一個循環等待鏈,它們鎖定兩個或多個相同的資源,每一個線程都在等待鏈中的下一個線程佔有的資源。
CLR保證了對這些數據類型的讀寫是原子性的:Boolean、Char、(S)Byte、(U)Int1六、(U)Int3二、(U)IntPtr和Single
。可是若是讀寫Int64
可能會發生讀取撕裂(torn read)的問題,由於在32位操做系統中,它須要執行兩次Mov
操做,沒法在一個時間內執行完成。ide
那麼在本節中,就會着重的介紹System.Threading.Interlocked
類提供的方法,Interlocked
類中的每一個方法都是執行一次的讀取以及寫入操做。更多與Interlocked
類相關的資料請參考連接,戳一戳本文不在贅述。函數
演示代碼以下所示,分別使用了三種方式進行計數:錯誤計數方式、lock
鎖方式和Interlocked
原子方式。性能
private static void Main(string[] args) { Console.WriteLine("錯誤的計數"); var c = new Counter(); Execute(c); Console.WriteLine("--------------------------"); Console.WriteLine("正確的計數 - 有鎖"); var c2 = new CounterWithLock(); Execute(c2); Console.WriteLine("--------------------------"); Console.WriteLine("正確的計數 - 無鎖"); var c3 = new CounterNoLock(); Execute(c3); Console.ReadLine(); } static void Execute(CounterBase c) { // 統計耗時 var sw = new Stopwatch(); sw.Start(); var t1 = new Thread(() => TestCounter(c)); var t2 = new Thread(() => TestCounter(c)); var t3 = new Thread(() => TestCounter(c)); t1.Start(); t2.Start(); t3.Start(); t1.Join(); t2.Join(); t3.Join(); sw.Stop(); Console.WriteLine($"Total count: {c.Count} Time:{sw.ElapsedMilliseconds} ms"); } static void TestCounter(CounterBase c) { for (int i = 0; i < 100000; i++) { c.Increment(); c.Decrement(); } } class Counter : CounterBase { public override void Increment() { _count++; } public override void Decrement() { _count--; } } class CounterNoLock : CounterBase { public override void Increment() { // 使用Interlocked執行原子操做 Interlocked.Increment(ref _count); } public override void Decrement() { Interlocked.Decrement(ref _count); } } class CounterWithLock : CounterBase { private readonly object _syncRoot = new Object(); public override void Increment() { // 使用Lock關鍵字 鎖定私有變量 lock (_syncRoot) { // 同步塊 Count++; } } public override void Decrement() { lock (_syncRoot) { Count--; } } } abstract class CounterBase { protected int _count; public int Count { get { return _count; } set { _count = value; } } public abstract void Increment(); public abstract void Decrement(); }
運行結果以下所示,與預期結果基本相符。優化
System.Threading.Mutex
在概念上和System.Threading.Monitor
幾乎同樣,可是Mutex
同步對文件或者其餘跨進程的資源進行訪問,也就是說Mutex
是可跨進程的。由於其特性,它的一個用途是限制應用程序不能同時運行多個實例。
Mutex
對象支持遞歸,也就是說同一個線程可屢次獲取同一個鎖,這在後面演示代碼中可觀察到。因爲Mutex
的基類System.Theading.WaitHandle
實現了IDisposable
接口,因此當不須要在使用它時要注意進行資源的釋放。更多資料:戳一戳
演示代碼以下所示,簡單的演示瞭如何建立單實例的應用程序和Mutex
遞歸獲取鎖的實現。
const string MutexName = "CSharpThreadingCookbook"; static void Main(string[] args) { // 使用using 及時釋放資源 using (var m = new Mutex(false, MutexName)) { if (!m.WaitOne(TimeSpan.FromSeconds(5), false)) { Console.WriteLine("已經有實例正在運行!"); } else { Console.WriteLine("運行中..."); // 演示遞歸獲取鎖 Recursion(); Console.ReadLine(); m.ReleaseMutex(); } } Console.ReadLine(); } static void Recursion() { using (var m = new Mutex(false, MutexName)) { if (!m.WaitOne(TimeSpan.FromSeconds(2), false)) { // 由於Mutex支持遞歸獲取鎖 因此永遠不會執行到這裏 Console.WriteLine("遞歸獲取鎖失敗!"); } else { Console.WriteLine("遞歸獲取鎖成功!"); } } }
運行結果以下圖所示,打開了兩個應用程序,由於使用Mutex
實現了單實例,因此第二個應用程序沒法獲取鎖,就會顯示已有實例正在運行。
SemaphoreSlim
類與以前提到的同步類有鎖不一樣,以前提到的同步類都是互斥的,也就是說只容許一個線程進行訪問資源,而SemaphoreSlim
是能夠容許多個訪問。
在以前的部分有提到,以*Slim
結尾的線程同步類,都是工做在混合模式下的,也就是說開始它們都是在用戶模式下"自旋",等發生第一次競爭時,才切換到內核模式。可是SemaphoreSlim
不一樣於Semaphore
類,它不支持系統信號量,因此它不能用於進程之間的同步。
該類使用比較簡單,演示代碼演示了6個線程競爭訪問只容許4個線程同時訪問的數據庫,以下所示。
static void Main(string[] args) { // 建立6個線程 競爭訪問AccessDatabase for (int i = 1; i <= 6; i++) { string threadName = "線程 " + i; // 越後面的線程,訪問時間越久 方便查看效果 int secondsToWait = 2 + 2 * i; var t = new Thread(() => AccessDatabase(threadName, secondsToWait)); t.Start(); } Console.ReadLine(); } // 同時容許4個線程訪問 static SemaphoreSlim _semaphore = new SemaphoreSlim(4); static void AccessDatabase(string name, int seconds) { Console.WriteLine($"{name} 等待訪問數據庫.... {DateTime.Now.ToString("HH:mm:ss.ffff")}"); // 等待獲取鎖 進入臨界區 _semaphore.Wait(); Console.WriteLine($"{name} 已獲取對數據庫的訪問權限 {DateTime.Now.ToString("HH:mm:ss.ffff")}"); // Do something Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine($"{name} 訪問完成... {DateTime.Now.ToString("HH:mm:ss.ffff")}"); // 釋放鎖 _semaphore.Release(); }
運行結果以下所示,可見前4個線程立刻就獲取到了鎖,進入了臨界區,而另外兩個線程在等待;等有鎖被釋放時,才能進入臨界區。
AutoResetEvent
叫自動重置事件,雖然名稱中有事件一詞,可是重置事件和C#中的委託沒有任何關係,這裏的事件只是由內核維護的Boolean
變量,當事件爲false
,那麼在事件上等待的線程就阻塞;事件變爲true
,那麼阻塞解除。
在.Net中有兩種此類事件,即AutoResetEvent(自動重置事件)
和ManualResetEvent(手動重置事件)
。這二者均是採用內核模式,它的區別在於當重置事件爲true
時,自動重置事件它只喚醒一個阻塞的線程,會自動將事件重置回false,形成其它線程繼續阻塞。而手動重置事件不會自動重置,必須經過代碼手動重置回false。
由於以上的緣由,因此在不少文章和書籍中不推薦使用AutoResetEvent(自動重置事件)
,由於它很容易在編寫生產者線程時發生失誤,形成它的迭代次數多餘消費者線程。
演示代碼以下所示,該代碼演示了經過AutoResetEvent
實現兩個線程的互相同步。
static void Main(string[] args) { var t = new Thread(() => Process(10)); t.Start(); Console.WriteLine("等待另外一個線程完成工做!"); // 等待工做線程通知 主線程阻塞 _workerEvent.WaitOne(); Console.WriteLine("第一個操做已經完成!"); Console.WriteLine("在主線程上執行操做"); Thread.Sleep(TimeSpan.FromSeconds(5)); // 發送通知 工做線程繼續運行 _mainEvent.Set(); Console.WriteLine("如今在第二個線程上運行第二個操做"); // 等待工做線程通知 主線程阻塞 _workerEvent.WaitOne(); Console.WriteLine("第二次操做完成!"); Console.ReadLine(); } // 工做線程Event private static AutoResetEvent _workerEvent = new AutoResetEvent(false); // 主線程Event private static AutoResetEvent _mainEvent = new AutoResetEvent(false); static void Process(int seconds) { Console.WriteLine("開始長時間的工做..."); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine("工做完成!"); // 發送通知 主線程繼續運行 _workerEvent.Set(); Console.WriteLine("等待主線程完成其它工做"); // 等待主線程通知 工做線程阻塞 _mainEvent.WaitOne(); Console.WriteLine("啓動第二次操做..."); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine("工做完成!"); // 發送通知 主線程繼續運行 _workerEvent.Set(); }
運行結果以下圖所示,與預期結果符合。
ManualResetEventSlim
使用和ManualResetEvent
類基本一致,只是ManualResetEventSlim
工做在混合模式下,而它與AutoResetEventSlim
不一樣的地方就是須要手動重置事件,也就是調用Reset()
才能將事件重置爲false
。
演示代碼以下,形象的將ManualResetEventSlim
比喻成大門,當事件爲true
時大門打開,線程解除阻塞;而事件爲false
時大門關閉,線程阻塞。
static void Main(string[] args) { var t1 = new Thread(() => TravelThroughGates("Thread 1", 5)); var t2 = new Thread(() => TravelThroughGates("Thread 2", 6)); var t3 = new Thread(() => TravelThroughGates("Thread 3", 12)); t1.Start(); t2.Start(); t3.Start(); // 休眠6秒鐘 只有Thread 1小於 6秒鐘,因此事件重置時 Thread 1 確定能進入大門 而 Thread 2 可能能夠進入大門 Thread.Sleep(TimeSpan.FromSeconds(6)); Console.WriteLine($"大門如今打開了! 時間:{DateTime.Now.ToString("mm:ss.ffff")}"); _mainEvent.Set(); // 休眠2秒鐘 此時 Thread 2 確定能夠進入大門 Thread.Sleep(TimeSpan.FromSeconds(2)); _mainEvent.Reset(); Console.WriteLine($"大門如今關閉了! 時間:{DateTime.Now.ToString("mm: ss.ffff")}"); // 休眠10秒鐘 Thread 3 能夠進入大門 Thread.Sleep(TimeSpan.FromSeconds(10)); Console.WriteLine($"大門如今第二次打開! 時間:{DateTime.Now.ToString("mm: ss.ffff")}"); _mainEvent.Set(); Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine($"大門如今關閉了! 時間:{DateTime.Now.ToString("mm: ss.ffff")}"); _mainEvent.Reset(); Console.ReadLine(); } static void TravelThroughGates(string threadName, int seconds) { Console.WriteLine($"{threadName} 進入睡眠 時間:{DateTime.Now.ToString("mm:ss.ffff")}"); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine($"{threadName} 等待大門打開! 時間:{DateTime.Now.ToString("mm:ss.ffff")}"); _mainEvent.Wait(); Console.WriteLine($"{threadName} 進入大門! 時間:{DateTime.Now.ToString("mm:ss.ffff")}"); } static ManualResetEventSlim _mainEvent = new ManualResetEventSlim(false);
運行結果以下,與預期結果相符。
CountDownEvent
類內部構造使用了一個ManualResetEventSlim
對象。這個構造阻塞一個線程,直到它內部計數器(CurrentCount)
變爲0
時,才解除阻塞。也就是說它並非阻止對已經枯竭的資源池的訪問,而是隻有當計數爲0
時才容許訪問。
這裏須要注意的是,當CurrentCount
變爲0
時,那麼它就不能被更改了。爲0
之後,Wait()
方法的阻塞被解除。
演示代碼以下所示,只有當Signal()
方法被調用2次之後,Wait()
方法的阻塞才被解除。
static void Main(string[] args) { Console.WriteLine($"開始兩個操做 {DateTime.Now.ToString("mm:ss.ffff")}"); var t1 = new Thread(() => PerformOperation("操做 1 完成!", 4)); var t2 = new Thread(() => PerformOperation("操做 2 完成!", 8)); t1.Start(); t2.Start(); // 等待操做完成 _countdown.Wait(); Console.WriteLine($"全部操做都完成 {DateTime.Now.ToString("mm: ss.ffff")}"); _countdown.Dispose(); Console.ReadLine(); } // 構造函數的參數爲2 表示只有調用了兩次 Signal方法 CurrentCount 爲 0時 Wait的阻塞才解除 static CountdownEvent _countdown = new CountdownEvent(2); static void PerformOperation(string message, int seconds) { Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine($"{message} {DateTime.Now.ToString("mm:ss.ffff")}"); // CurrentCount 遞減 1 _countdown.Signal(); }
運行結果以下圖所示,可見只有當操做1和操做2都完成之後,才執行輸出全部操做都完成。
Barrier
類用於解決一個很是稀有的問題,平時通常用不上。Barrier
類控制一系列線程進行階段性的並行工做。
假設如今並行工做分爲2個階段,每一個線程在完成它本身那部分階段1的工做後,必須停下來等待其它線程完成階段1的工做;等全部線程均完成階段1工做後,每一個線程又開始運行,完成階段2工做,等待其它線程所有完成階段2工做後,整個流程才結束。
演示代碼以下所示,該代碼演示了兩個線程分階段的完成工做。
static void Main(string[] args) { var t1 = new Thread(() => PlayMusic("鋼琴家", "演奏一首使人驚歎的獨奏曲", 5)); var t2 = new Thread(() => PlayMusic("歌手", "唱着他的歌", 2)); t1.Start(); t2.Start(); Console.ReadLine(); } static Barrier _barrier = new Barrier(2, Console.WriteLine($"第 {b.CurrentPhaseNumber + 1} 階段結束")); static void PlayMusic(string name, string message, int seconds) { for (int i = 1; i < 3; i++) { Console.WriteLine("----------------------------------------------"); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine($"{name} 開始 {message}"); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine($"{name} 結束 {message}"); _barrier.SignalAndWait(); } }
運行結果以下所示,當「歌手」線程完成後,並無立刻結束,而是等待「鋼琴家」線程結束,當"鋼琴家"線程結束後,纔開始第2階段的工做。
ReaderWriterLockSlim
類主要是解決在某些場景下,讀操做多於寫操做而使用某些互斥鎖當多個線程同時訪問資源時,只有一個線程能訪問,致使性能急劇降低。
若是全部線程都但願以只讀的方式訪問數據,就根本沒有必要阻塞它們;若是一個線程但願修改數據,那麼這個線程才須要獨佔訪問,這就是ReaderWriterLockSlim
的典型應用場景。這個類就像下面這樣來控制線程。
- 一個線程向數據寫入是,請求訪問的其餘全部線程都被阻塞。
- 一個線程讀取數據時,請求讀取的線程容許讀取,而請求寫入的線程被阻塞。
- 寫入線程結束後,要麼解除一個寫入線程的阻塞,使寫入線程能向數據接入,要麼解除全部讀取線程的阻塞,使它們能併發讀取數據。若是線程沒有被阻塞,鎖就能夠進入自由使用的狀態,可供下一個讀線程或寫線程獲取。
- 從數據讀取的全部線程結束後,一個寫線程被解除阻塞,使它能向數據寫入。若是線程沒有被阻塞,鎖就能夠進入自由使用的狀態,可供下一個讀線程或寫線程獲取。
ReaderWriterLockSlim
還支持從讀線程升級爲寫線程的操做,詳情請戳一戳。文本不做介紹。ReaderWriterLock
類已通過時,並且存在許多問題,沒有必要去使用。
示例代碼以下所示,建立了3個讀線程,2個寫線程,讀線程和寫線程競爭獲取鎖。
static void Main(string[] args) { // 建立3個 讀線程 new Thread(() => Read("Reader 1")) { IsBackground = true }.Start(); new Thread(() => Read("Reader 2")) { IsBackground = true }.Start(); new Thread(() => Read("Reader 3")) { IsBackground = true }.Start(); // 建立兩個寫線程 new Thread(() => Write("Writer 1")) { IsBackground = true }.Start(); new Thread(() => Write("Writer 2")) { IsBackground = true }.Start(); // 使程序運行30S Thread.Sleep(TimeSpan.FromSeconds(30)); Console.ReadLine(); } static ReaderWriterLockSlim _rw = new ReaderWriterLockSlim(); static Dictionary<int, int> _items = new Dictionary<int, int>(); static void Read(string threadName) { while (true) { try { // 獲取讀鎖定 _rw.EnterReadLock(); Console.WriteLine($"{threadName} 從字典中讀取內容 {DateTime.Now.ToString("mm:ss.ffff")}"); foreach (var key in _items.Keys) { Thread.Sleep(TimeSpan.FromSeconds(0.1)); } } finally { // 釋放讀鎖定 _rw.ExitReadLock(); } } } static void Write(string threadName) { while (true) { try { int newKey = new Random().Next(250); // 嘗試進入可升級鎖模式狀態 _rw.EnterUpgradeableReadLock(); if (!_items.ContainsKey(newKey)) { try { // 獲取寫鎖定 _rw.EnterWriteLock(); _items[newKey] = 1; Console.WriteLine($"{threadName} 將新的鍵 {newKey} 添加進入字典中 {DateTime.Now.ToString("mm:ss.ffff")}"); } finally { // 釋放寫鎖定 _rw.ExitWriteLock(); } } Thread.Sleep(TimeSpan.FromSeconds(0.1)); } finally { // 減小可升級模式遞歸計數,並在計數爲0時 推出可升級模式 _rw.ExitUpgradeableReadLock(); } } }
運行結果以下所示,與預期結果相符。
SpinWait
是一個經常使用的混合模式的類,它被設計成使用用戶模式等待一段時間,人後切換至內核模式以節省CPU時間。
它的使用很是簡單,演示代碼以下所示。
static void Main(string[] args) { var t1 = new Thread(UserModeWait); var t2 = new Thread(HybridSpinWait); Console.WriteLine("運行在用戶模式下"); t1.Start(); Thread.Sleep(20); _isCompleted = true; Thread.Sleep(TimeSpan.FromSeconds(1)); _isCompleted = false; Console.WriteLine("運行在混合模式下"); t2.Start(); Thread.Sleep(5); _isCompleted = true; Console.ReadLine(); } static volatile bool _isCompleted = false; static void UserModeWait() { while (!_isCompleted) { Console.Write("."); } Console.WriteLine(); Console.WriteLine("等待結束"); } static void HybridSpinWait() { var w = new SpinWait(); while (!_isCompleted) { w.SpinOnce(); Console.WriteLine(w.NextSpinWillYield); } Console.WriteLine("等待結束"); }
運行結果以下兩圖所示,首先程序運行在模擬的用戶模式下,使CPU有一個短暫的峯值。而後使用SpinWait
工做在混合模式下,首先標誌變量爲False
處於用戶模式自旋中,等待之後進入內核模式。
本文主要參考瞭如下幾本書,在此對這些做者表示由衷的感謝大家提供了這麼好的資料。
- 《CLR via C#》
- 《C# in Depth Third Edition》
- 《Essential C# 6.0》
- 《Multithreading with C# Cookbook Second Edition》
源碼下載點擊連接 示例源碼下載