C#多線程編程系列(三)- 線程同步



1.1 簡介

本章介紹在C#中實現線程同步的幾種方法。由於多個線程同時訪問共享數據時,可能會形成共享數據的損壞,從而致使與預期的結果不相符。爲了解決這個問題,因此須要用到線程同步,也被俗稱爲「加鎖」。可是加鎖絕對不對提升性能,最多也就是不增不減,要實現性能不增不減還得靠高質量的同步源語(Synchronization Primitive)。可是由於正確永遠比速度更重要,因此線程同步在某些場景下是必須的。數據庫

線程同步有兩種源語(Primitive)構造:用戶模式(user - mode)內核模式(kernel - mode),當資源可用時間短的狀況下,用戶模式要優於內核模式,可是若是長時間不能得到資源,或者說長時間處於「自旋」,那麼內核模式是相對來講好的選擇。c#

可是咱們但願兼具用戶模式和內核模式的優勢,咱們把它稱爲混合構造(hybrid construct),它兼具了兩種模式的優勢。併發

在C#中有多種線程同步的機制,一般能夠按照如下順序進行選擇。dom

  1. 若是代碼能經過優化能夠不進行同步,那麼就不要作同步。
  2. 使用原子性的Interlocked方法。
  3. 使用lock/Monitor類。
  4. 使用異步鎖,如SemaphoreSlim.WaitAsync()
  5. 使用其它加鎖機制,如ReaderWriterLockSlim、Mutex、Semaphore等。
  6. 若是系統提供了*Slim版本的異步對象,那麼請選用它,由於*Slim版本所有都是混合鎖,在進入內核模式前實現了某種形式的自旋。

在同步中,必定要注意避免死鎖的發生,死鎖的發生必須知足如下4個基本條件,因此只須要破壞任意一個條件,就可避免發生死鎖。異步

  1. 排他或互斥(Mutual exclusion):一個線程(ThreadA)獨佔一個資源,沒有其它線程(ThreadB)能獲取相同的資源。
  2. 佔有並等待(Hold and wait):互斥的一個線程(ThreadA)請求獲取另外一個線程(ThreadB)佔有的資源.
  3. 不可搶先(No preemption):一個線程(ThreadA)佔有資源不能被強制拿走(只能等待ThreadA主動釋放它的資源)。
  4. 循環等待條件(Circular wait condition):兩個或多個線程構成一個循環等待鏈,它們鎖定兩個或多個相同的資源,每一個線程都在等待鏈中的下一個線程佔有的資源。

1.2 執行基本原子操做

CLR保證了對這些數據類型的讀寫是原子性的:Boolean、Char、(S)Byte、(U)Int1六、(U)Int3二、(U)IntPtr和Single。可是若是讀寫Int64可能會發生讀取撕裂(torn read)的問題,由於在32位操做系統中,它須要執行兩次Mov操做,沒法在一個時間內執行完成。ide

那麼在本節中,就會着重的介紹System.Threading.Interlocked類提供的方法,Interlocked類中的每一個方法都是執行一次的讀取以及寫入操做。更多與Interlocked類相關的資料請參考連接,戳一戳本文不在贅述。函數

演示代碼以下所示,分別使用了三種方式進行計數:錯誤計數方式、lock鎖方式和Interlocked原子方式。性能

private static void Main(string[] args)
{
    Console.WriteLine("錯誤的計數");

    var c = new Counter();
    Execute(c);

    Console.WriteLine("--------------------------");


    Console.WriteLine("正確的計數 - 有鎖");

    var c2 = new CounterWithLock();
    Execute(c2);

    Console.WriteLine("--------------------------");


    Console.WriteLine("正確的計數 - 無鎖");

    var c3 = new CounterNoLock();
    Execute(c3);

    Console.ReadLine();
}

static void Execute(CounterBase c)
{
    // 統計耗時
    var sw = new Stopwatch();
    sw.Start();

    var t1 = new Thread(() => TestCounter(c));
    var t2 = new Thread(() => TestCounter(c));
    var t3 = new Thread(() => TestCounter(c));
    t1.Start();
    t2.Start();
    t3.Start();
    t1.Join();
    t2.Join();
    t3.Join();

    sw.Stop();
    Console.WriteLine($"Total count: {c.Count} Time:{sw.ElapsedMilliseconds} ms");
}

static void TestCounter(CounterBase c)
{
    for (int i = 0; i < 100000; i++)
    {
        c.Increment();
        c.Decrement();
    }
}

class Counter : CounterBase
{
    public override void Increment()
    {
        _count++;
    }

    public override void Decrement()
    {
        _count--;
    }
}

class CounterNoLock : CounterBase
{
    public override void Increment()
    {
        // 使用Interlocked執行原子操做
        Interlocked.Increment(ref _count);
    }

    public override void Decrement()
    {
        Interlocked.Decrement(ref _count);
    }
}

class CounterWithLock : CounterBase
{
    private readonly object _syncRoot = new Object();

    public override void Increment()
    {
        // 使用Lock關鍵字 鎖定私有變量
        lock (_syncRoot)
        {
            // 同步塊
            Count++;
        }
    }

    public override void Decrement()
    {
        lock (_syncRoot)
        {
            Count--;
        }
    }
}


abstract class CounterBase
{
    protected int _count;

    public int Count
    {
        get
        {
            return _count;
        }
        set
        {
            _count = value;
        }
    }

    public abstract void Increment();

    public abstract void Decrement();
}

運行結果以下所示,與預期結果基本相符。優化

1533267651508

1.3 使用Mutex類

System.Threading.Mutex在概念上和System.Threading.Monitor幾乎同樣,可是Mutex同步對文件或者其餘跨進程的資源進行訪問,也就是說Mutex是可跨進程的。由於其特性,它的一個用途是限制應用程序不能同時運行多個實例。

Mutex對象支持遞歸,也就是說同一個線程可屢次獲取同一個鎖,這在後面演示代碼中可觀察到。因爲Mutex的基類System.Theading.WaitHandle實現了IDisposable接口,因此當不須要在使用它時要注意進行資源的釋放。更多資料:戳一戳

演示代碼以下所示,簡單的演示瞭如何建立單實例的應用程序和Mutex遞歸獲取鎖的實現。

const string MutexName = "CSharpThreadingCookbook";

static void Main(string[] args)
{
    // 使用using 及時釋放資源
    using (var m = new Mutex(false, MutexName))
    {
        if (!m.WaitOne(TimeSpan.FromSeconds(5), false))
        {
            Console.WriteLine("已經有實例正在運行!");
        }
        else
        {

            Console.WriteLine("運行中...");

            // 演示遞歸獲取鎖
            Recursion();

            Console.ReadLine();
            m.ReleaseMutex();
        }
    }

    Console.ReadLine();
}

static void Recursion()
{
    using (var m = new Mutex(false, MutexName))
    {
        if (!m.WaitOne(TimeSpan.FromSeconds(2), false))
        {
            // 由於Mutex支持遞歸獲取鎖 因此永遠不會執行到這裏
            Console.WriteLine("遞歸獲取鎖失敗!");
        }
        else
        {
            Console.WriteLine("遞歸獲取鎖成功!");
        }
    }
}

運行結果以下圖所示,打開了兩個應用程序,由於使用Mutex實現了單實例,因此第二個應用程序沒法獲取鎖,就會顯示已有實例正在運行

1533278259064

1.4 使用SemaphoreSlim類

SemaphoreSlim類與以前提到的同步類有鎖不一樣,以前提到的同步類都是互斥的,也就是說只容許一個線程進行訪問資源,而SemaphoreSlim是能夠容許多個訪問。

在以前的部分有提到,以*Slim結尾的線程同步類,都是工做在混合模式下的,也就是說開始它們都是在用戶模式下"自旋",等發生第一次競爭時,才切換到內核模式。可是SemaphoreSlim不一樣於Semaphore類,它不支持系統信號量,因此它不能用於進程之間的同步

該類使用比較簡單,演示代碼演示了6個線程競爭訪問只容許4個線程同時訪問的數據庫,以下所示。

static void Main(string[] args)
{
    // 建立6個線程 競爭訪問AccessDatabase
    for (int i = 1; i <= 6; i++)
    {
        string threadName = "線程 " + i;
        // 越後面的線程,訪問時間越久 方便查看效果
        int secondsToWait = 2 + 2 * i;
        var t = new Thread(() => AccessDatabase(threadName, secondsToWait));
        t.Start();
    }

    Console.ReadLine();
}

// 同時容許4個線程訪問
static SemaphoreSlim _semaphore = new SemaphoreSlim(4);

static void AccessDatabase(string name, int seconds)
{
    Console.WriteLine($"{name} 等待訪問數據庫.... {DateTime.Now.ToString("HH:mm:ss.ffff")}");

    // 等待獲取鎖 進入臨界區
    _semaphore.Wait();

    Console.WriteLine($"{name} 已獲取對數據庫的訪問權限 {DateTime.Now.ToString("HH:mm:ss.ffff")}");
    // Do something
    Thread.Sleep(TimeSpan.FromSeconds(seconds));

    Console.WriteLine($"{name} 訪問完成... {DateTime.Now.ToString("HH:mm:ss.ffff")}");
    // 釋放鎖
    _semaphore.Release();
}

運行結果以下所示,可見前4個線程立刻就獲取到了鎖,進入了臨界區,而另外兩個線程在等待;等有鎖被釋放時,才能進入臨界區。1533281733322

1.5 使用AutoResetEvent類

AutoResetEvent叫自動重置事件,雖然名稱中有事件一詞,可是重置事件和C#中的委託沒有任何關係,這裏的事件只是由內核維護的Boolean變量,當事件爲false,那麼在事件上等待的線程就阻塞;事件變爲true,那麼阻塞解除。

在.Net中有兩種此類事件,即AutoResetEvent(自動重置事件)ManualResetEvent(手動重置事件)。這二者均是採用內核模式,它的區別在於當重置事件爲true時,自動重置事件它只喚醒一個阻塞的線程,會自動將事件重置回false,形成其它線程繼續阻塞。而手動重置事件不會自動重置,必須經過代碼手動重置回false

由於以上的緣由,因此在不少文章和書籍中不推薦使用AutoResetEvent(自動重置事件),由於它很容易在編寫生產者線程時發生失誤,形成它的迭代次數多餘消費者線程。

演示代碼以下所示,該代碼演示了經過AutoResetEvent實現兩個線程的互相同步。

static void Main(string[] args)
{
    var t = new Thread(() => Process(10));
    t.Start();

    Console.WriteLine("等待另外一個線程完成工做!");
    // 等待工做線程通知 主線程阻塞
    _workerEvent.WaitOne();
    Console.WriteLine("第一個操做已經完成!");
    Console.WriteLine("在主線程上執行操做");
    Thread.Sleep(TimeSpan.FromSeconds(5));

    // 發送通知 工做線程繼續運行
    _mainEvent.Set();
    Console.WriteLine("如今在第二個線程上運行第二個操做");

    // 等待工做線程通知 主線程阻塞
    _workerEvent.WaitOne();
    Console.WriteLine("第二次操做完成!");

    Console.ReadLine();
}

// 工做線程Event
private static AutoResetEvent _workerEvent = new AutoResetEvent(false);
// 主線程Event
private static AutoResetEvent _mainEvent = new AutoResetEvent(false);

static void Process(int seconds)
{
    Console.WriteLine("開始長時間的工做...");
    Thread.Sleep(TimeSpan.FromSeconds(seconds));
    Console.WriteLine("工做完成!");

    // 發送通知 主線程繼續運行
    _workerEvent.Set();
    Console.WriteLine("等待主線程完成其它工做");

    // 等待主線程通知 工做線程阻塞
    _mainEvent.WaitOne();
    Console.WriteLine("啓動第二次操做...");
    Thread.Sleep(TimeSpan.FromSeconds(seconds));
    Console.WriteLine("工做完成!");

    // 發送通知 主線程繼續運行
    _workerEvent.Set();
}

運行結果以下圖所示,與預期結果符合。

1533286221209

1.6 使用ManualResetEventSlim類

ManualResetEventSlim使用和ManualResetEvent類基本一致,只是ManualResetEventSlim工做在混合模式下,而它與AutoResetEventSlim不一樣的地方就是須要手動重置事件,也就是調用Reset()才能將事件重置爲false

演示代碼以下,形象的將ManualResetEventSlim比喻成大門,當事件爲true時大門打開,線程解除阻塞;而事件爲false時大門關閉,線程阻塞。

static void Main(string[] args)
        {
            var t1 = new Thread(() => TravelThroughGates("Thread 1", 5));
            var t2 = new Thread(() => TravelThroughGates("Thread 2", 6));
            var t3 = new Thread(() => TravelThroughGates("Thread 3", 12));
            t1.Start();
            t2.Start();
            t3.Start();

            // 休眠6秒鐘  只有Thread 1小於 6秒鐘,因此事件重置時 Thread 1 確定能進入大門  而 Thread 2 可能能夠進入大門
            Thread.Sleep(TimeSpan.FromSeconds(6));
            Console.WriteLine($"大門如今打開了!  時間:{DateTime.Now.ToString("mm:ss.ffff")}");
            _mainEvent.Set();

            // 休眠2秒鐘 此時 Thread 2 確定能夠進入大門
            Thread.Sleep(TimeSpan.FromSeconds(2));
            _mainEvent.Reset();
            Console.WriteLine($"大門如今關閉了! 時間:{DateTime.Now.ToString("mm: ss.ffff")}");

            // 休眠10秒鐘 Thread 3 能夠進入大門
            Thread.Sleep(TimeSpan.FromSeconds(10));
            Console.WriteLine($"大門如今第二次打開! 時間:{DateTime.Now.ToString("mm: ss.ffff")}");
            _mainEvent.Set();
            Thread.Sleep(TimeSpan.FromSeconds(2));

            Console.WriteLine($"大門如今關閉了! 時間:{DateTime.Now.ToString("mm: ss.ffff")}");
            _mainEvent.Reset();

            Console.ReadLine();
        }

        static void TravelThroughGates(string threadName, int seconds)
        {
            Console.WriteLine($"{threadName} 進入睡眠 時間:{DateTime.Now.ToString("mm:ss.ffff")}");
            Thread.Sleep(TimeSpan.FromSeconds(seconds));

            Console.WriteLine($"{threadName} 等待大門打開! 時間:{DateTime.Now.ToString("mm:ss.ffff")}");
            _mainEvent.Wait();

            Console.WriteLine($"{threadName} 進入大門! 時間:{DateTime.Now.ToString("mm:ss.ffff")}");
        }

        static ManualResetEventSlim _mainEvent = new ManualResetEventSlim(false);

運行結果以下,與預期結果相符。

1533287222335

1.7 使用CountDownEvent類

CountDownEvent類內部構造使用了一個ManualResetEventSlim對象。這個構造阻塞一個線程,直到它內部計數器(CurrentCount)變爲0時,才解除阻塞。也就是說它並非阻止對已經枯竭的資源池的訪問,而是隻有當計數爲0時才容許訪問。

這裏須要注意的是,當CurrentCount變爲0時,那麼它就不能被更改了。爲0之後,Wait()方法的阻塞被解除。

演示代碼以下所示,只有當Signal()方法被調用2次之後,Wait()方法的阻塞才被解除。

static void Main(string[] args)
{
    Console.WriteLine($"開始兩個操做  {DateTime.Now.ToString("mm:ss.ffff")}");
    var t1 = new Thread(() => PerformOperation("操做 1 完成!", 4));
    var t2 = new Thread(() => PerformOperation("操做 2 完成!", 8));
    t1.Start();
    t2.Start();

    // 等待操做完成
    _countdown.Wait();
    Console.WriteLine($"全部操做都完成  {DateTime.Now.ToString("mm: ss.ffff")}");
    _countdown.Dispose();

    Console.ReadLine();
}

// 構造函數的參數爲2 表示只有調用了兩次 Signal方法 CurrentCount 爲 0時  Wait的阻塞才解除
static CountdownEvent _countdown = new CountdownEvent(2);

static void PerformOperation(string message, int seconds)
{
    Thread.Sleep(TimeSpan.FromSeconds(seconds));
    Console.WriteLine($"{message}  {DateTime.Now.ToString("mm:ss.ffff")}");

    // CurrentCount 遞減 1
    _countdown.Signal();
}

運行結果以下圖所示,可見只有當操做1和操做2都完成之後,才執行輸出全部操做都完成。

1533296843834

1.8 使用Barrier類

Barrier類用於解決一個很是稀有的問題,平時通常用不上。Barrier類控制一系列線程進行階段性的並行工做。

假設如今並行工做分爲2個階段,每一個線程在完成它本身那部分階段1的工做後,必須停下來等待其它線程完成階段1的工做;等全部線程均完成階段1工做後,每一個線程又開始運行,完成階段2工做,等待其它線程所有完成階段2工做後,整個流程才結束。

演示代碼以下所示,該代碼演示了兩個線程分階段的完成工做。

static void Main(string[] args)
{
    var t1 = new Thread(() => PlayMusic("鋼琴家", "演奏一首使人驚歎的獨奏曲", 5));
    var t2 = new Thread(() => PlayMusic("歌手", "唱着他的歌", 2));

    t1.Start();
    t2.Start();

    Console.ReadLine();
}

static Barrier _barrier = new Barrier(2,
 Console.WriteLine($"第 {b.CurrentPhaseNumber + 1} 階段結束"));

static void PlayMusic(string name, string message, int seconds)
{
    for (int i = 1; i < 3; i++)
    {
        Console.WriteLine("----------------------------------------------");
        Thread.Sleep(TimeSpan.FromSeconds(seconds));
        Console.WriteLine($"{name} 開始 {message}");
        Thread.Sleep(TimeSpan.FromSeconds(seconds));
        Console.WriteLine($"{name} 結束 {message}");
        _barrier.SignalAndWait();
    }
}

運行結果以下所示,當「歌手」線程完成後,並無立刻結束,而是等待「鋼琴家」線程結束,當"鋼琴家"線程結束後,纔開始第2階段的工做。

1533297859508

1.9 使用ReaderWriterLockSlim類

ReaderWriterLockSlim類主要是解決在某些場景下,讀操做多於寫操做而使用某些互斥鎖當多個線程同時訪問資源時,只有一個線程能訪問,致使性能急劇降低。

若是全部線程都但願以只讀的方式訪問數據,就根本沒有必要阻塞它們;若是一個線程但願修改數據,那麼這個線程才須要獨佔訪問,這就是ReaderWriterLockSlim的典型應用場景。這個類就像下面這樣來控制線程。

  • 一個線程向數據寫入是,請求訪問的其餘全部線程都被阻塞。
  • 一個線程讀取數據時,請求讀取的線程容許讀取,而請求寫入的線程被阻塞。
  • 寫入線程結束後,要麼解除一個寫入線程的阻塞,使寫入線程能向數據接入,要麼解除全部讀取線程的阻塞,使它們能併發讀取數據。若是線程沒有被阻塞,鎖就能夠進入自由使用的狀態,可供下一個讀線程或寫線程獲取。
  • 從數據讀取的全部線程結束後,一個寫線程被解除阻塞,使它能向數據寫入。若是線程沒有被阻塞,鎖就能夠進入自由使用的狀態,可供下一個讀線程或寫線程獲取。

ReaderWriterLockSlim還支持從讀線程升級爲寫線程的操做,詳情請戳一戳。文本不做介紹。ReaderWriterLock類已通過時,並且存在許多問題,沒有必要去使用。

示例代碼以下所示,建立了3個讀線程,2個寫線程,讀線程和寫線程競爭獲取鎖。

static void Main(string[] args)
{
    // 建立3個 讀線程
    new Thread(() => Read("Reader 1")) { IsBackground = true }.Start();
    new Thread(() => Read("Reader 2")) { IsBackground = true }.Start();
    new Thread(() => Read("Reader 3")) { IsBackground = true }.Start();

    // 建立兩個寫線程
    new Thread(() => Write("Writer 1")) { IsBackground = true }.Start();
    new Thread(() => Write("Writer 2")) { IsBackground = true }.Start();

    // 使程序運行30S
    Thread.Sleep(TimeSpan.FromSeconds(30));

    Console.ReadLine();
}

static ReaderWriterLockSlim _rw = new ReaderWriterLockSlim();
static Dictionary<int, int> _items = new Dictionary<int, int>();

static void Read(string threadName)
{
    while (true)
    {
        try
        {
            // 獲取讀鎖定
            _rw.EnterReadLock();
            Console.WriteLine($"{threadName} 從字典中讀取內容  {DateTime.Now.ToString("mm:ss.ffff")}");
            foreach (var key in _items.Keys)
            {
                Thread.Sleep(TimeSpan.FromSeconds(0.1));
            }
        }
        finally
        {
            // 釋放讀鎖定
            _rw.ExitReadLock();
        }
    }
}

static void Write(string threadName)
{
    while (true)
    {
        try
        {
            int newKey = new Random().Next(250);
            // 嘗試進入可升級鎖模式狀態
            _rw.EnterUpgradeableReadLock();
            if (!_items.ContainsKey(newKey))
            {
                try
                {
                    // 獲取寫鎖定
                    _rw.EnterWriteLock();
                    _items[newKey] = 1;
                    Console.WriteLine($"{threadName} 將新的鍵 {newKey} 添加進入字典中  {DateTime.Now.ToString("mm:ss.ffff")}");
                }
                finally
                {
                    // 釋放寫鎖定
                    _rw.ExitWriteLock();
                }
            }
            Thread.Sleep(TimeSpan.FromSeconds(0.1));
        }
        finally
        {
            // 減小可升級模式遞歸計數,並在計數爲0時  推出可升級模式
            _rw.ExitUpgradeableReadLock();
        }
    }
}

運行結果以下所示,與預期結果相符。

1533301318169

1.10 使用SpinWait類

SpinWait是一個經常使用的混合模式的類,它被設計成使用用戶模式等待一段時間,人後切換至內核模式以節省CPU時間。

它的使用很是簡單,演示代碼以下所示。

static void Main(string[] args)
{
    var t1 = new Thread(UserModeWait);
    var t2 = new Thread(HybridSpinWait);

    Console.WriteLine("運行在用戶模式下");
    t1.Start();
    Thread.Sleep(20);
    _isCompleted = true;
    Thread.Sleep(TimeSpan.FromSeconds(1));
    _isCompleted = false;

    Console.WriteLine("運行在混合模式下");
    t2.Start();
    Thread.Sleep(5);
    _isCompleted = true;

    Console.ReadLine();
}

static volatile bool _isCompleted = false;

static void UserModeWait()
{
    while (!_isCompleted)
    {
        Console.Write(".");
    }
    Console.WriteLine();
    Console.WriteLine("等待結束");
}

static void HybridSpinWait()
{
    var w = new SpinWait();
    while (!_isCompleted)
    {
        w.SpinOnce();
        Console.WriteLine(w.NextSpinWillYield);
    }
    Console.WriteLine("等待結束");
}

運行結果以下兩圖所示,首先程序運行在模擬的用戶模式下,使CPU有一個短暫的峯值。而後使用SpinWait工做在混合模式下,首先標誌變量爲False處於用戶模式自旋中,等待之後進入內核模式。

1533302799590

1533302772242

參考書籍

本文主要參考瞭如下幾本書,在此對這些做者表示由衷的感謝大家提供了這麼好的資料。

  1. 《CLR via C#》
  2. 《C# in Depth Third Edition》
  3. 《Essential C# 6.0》
  4. 《Multithreading with C# Cookbook Second Edition》

源碼下載點擊連接 示例源碼下載

筆者水平有限,若是錯誤歡迎各位批評指正!

相關文章
相關標籤/搜索