C++11 併發編程教程 - Part 2 : 保護共享數據

上一篇文章咱們講到如何啓動一些線程去併發地執行某些操做,雖然那些在線程裏執行的代碼都是獨立的,但一般狀況下,你都會在這些線程之間使用到共享數據。一旦你這麼作了,就面臨着一個新的問題 —— 同步。 編程

   下面讓咱們用示例來闡釋「同步」是個什麼問題。
安全


同步問題
多線程

   咱們就拿一個簡單的計數器做爲示例吧。這個計數器是一個結構體,他擁有一個計數變量,以及增長或減小計數的函數,看起來像這個樣子: 併發

   [譯註:原文 Counter  value 並未初始化,其初始值隨機,讀者可自行初始化爲 0 ]
app

1
2
3
4
5
6
struct  Counter {
    int  value;
    void  increment(){
        ++value;
    }
};

   這並沒什麼稀奇的,下面讓咱們來啓動一些線程來增長計數器的計數吧。 less

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
int  main(){
    Counter counter;
    std::vector<std::thread> threads;
    for(int  i = 0; i < 5; ++i){
        threads.push_back(std::thread([&counter](){
            for(int  i = 0; i < 100; ++i){
                counter.increment();
            }
        }));
    }
    for(auto&  thread  : threads){
        thread.join();
    }
    std::cout << counter.value << std::endl;
    return  0;
}


   [譯註:bill的測試環境下,上述代碼始終輸出 500,讀者可將外層 for 循環條件改成 i < 100,內層for 循環條件改成 i < 99999 以觀察實驗結果] 函數

   一樣的,也沒什麼新花樣,咱們只是啓動了 5 個線程,每一個線程都讓計數器增長 100 次而已。等這一工做結束,咱們就打印計數器最後的數值。 學習

   若是運行這一程序,咱們理所固然的指望運行結果是 500,但事與願違,沒人能保證這個程序最終輸出什麼。下面是在個人機器上獲得的一些結果:
測試

1
2
3
4
5
6
442
500
477
400
422
487

   問題的根源在於計數器的 increment() 並不是原子操做,而是由 3 個獨立的操做組成的: spa

       1. 讀取 value 變量的當前值。

       2. 將讀取的當前值加 1

       3. 將加 1 後的值寫回 value 變量。

   當你以單線程運行上述代碼時,就不會出現任何問題,上述三個步驟會按照順序依次執行。可是一旦你身處多線程環境,狀況就會變得糟糕起來,考慮以下執行順序:


       1. 線程a:讀取 value 的當前值,獲得值爲 0。加1。所以 value = 1。[譯註:此時 1 並無寫回value 內存,原文「value = 1」僅做邏輯意義,下同]

       2. 線程b讀取 value 的當前值,獲得值爲 0。加1。所以 value = 1

       3. 線程a:將 1 寫回 value 內存並返回 1

       4. 線程b:將 1 寫回 value 內存並返回 1


   這種狀況源於線程間的 interleavingInterleaving 描述了多線程同時執行幾句代碼的各類狀況。就算僅僅只有兩個線程同時執行這三個操做,也會存在不少可能的 interleaving。當你有許多線程同時執行多個操做時,要想枚舉出全部 interleaving,幾乎是不可能的。並且若是線程在執行單個操做的不一樣指令之間被搶佔,也會致使 interleaving 的發生。

   目前有許多能夠解決這一問題的方案:

  • Semaphores

  • Atomic references

  • Monitors

  • Condition codes

  • Compare and swap

  • etc.

   就本文而言,咱們將學習如何使用 Semaphores 去解決這一問題。事實上,咱們僅僅使用了Semaphores 中比較特殊的一種 —— 互斥量。互斥量是一個特殊的對象,在同一時刻只有一個線程可以獲得該對象上的鎖。藉助互斥量這種簡而有力的性質,咱們即可以解決線程同步問題。


使用互斥量保證 Counter 的線程安全

   在 C++11 的線程庫中,互斥量被放置於頭文件 <mutex>,並以 std::mutex 類加以實現。互斥量有兩個重要的函數:lock()  unlock()。顧名思義,前者使當前線程嘗試獲取互斥量的鎖,後者則釋放已經獲取的鎖。lock() 函數是阻塞式的,線程一旦調用 lock(),就會一直阻塞直到該線程得到對應的鎖。

   爲了使咱們的計數器具有線程安全性,咱們須要對其添加 std::mutex 成員,並在成員函數中對互斥量進行 lock()/unlock() 調用。

1
2
3
4
5
6
7
8
9
10
struct  Counter {
    std::mutex mutex;
    int  value;
    Counter() : value(0) {}
    void  increment(){
        mutex.lock();
        ++value;
        mutex.unlock();
    }
};

   若是咱們如今再次運行以前的測試程序,咱們將始終獲得正確的輸出:500。


異常與鎖

如今讓咱們來看看另一種狀況會發生什麼。假設如今咱們的計數器擁有一個 derement() 操做,當  value 被減爲 0 時拋出一個異常: 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
struct  Counter {
    int  value;
                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           
    Counter() : value(0) {}
    void  increment(){
        ++value;
    }
    void  decrement(){
        if(value == 0){
            throw  "Value cannot be less than 0";
        }
        --value;
    }
};

   假設你想在不更改上述代碼的前提下爲其提供線程安全性,那麼你須要爲其建立一個 Wrapper 類:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
struct  ConcurrentCounter {
    std::mutex mutex;
    Counter counter;
    void  increment(){
        mutex.lock();
        counter.increment();
        mutex.unlock();
    }
    void  decrement(){
        mutex.lock();
        counter.decrement();  
        mutex.unlock();
    }
};

   這個 Wrapper 將在大多數狀況下正常工做,然而一旦 decrement() 拋出異常,你就遇到大麻煩了,當異常被拋出時,unlock() 函數將不會被調用,這將致使本線程得到的鎖不被釋放,你的程序也就瓜熟蒂落的被永久阻塞了。爲了修復這一問題,你須要使用 try/catch 塊以保證在拋出任何異常以前釋放得到的鎖。

1
2
3
4
5
6
7
8
9
10
void  decrement(){
    mutex.lock();
    try  {
        counter.decrement();
    }  catch  (std::string e){
        mutex.unlock();
        throw  e;
    }
    mutex.unlock();
}

   代碼並不複雜,可是看起來卻很醜陋。試想一下,你如今的函數擁有 10 個返回點,那麼你就須要在每一個返回點前調用 unlock() 函數,而忘掉其中的某一個的可能性是很是大的。更大的風險在於你又添加了新的函數返回點,卻沒有對應地添加 unlock()。下一節將給出解決此問題的好辦法。


鎖的自動管理

   當你想保護整個代碼段(就本文而言是一個函數,但也能夠是某個循環體或其餘控制結構[譯註:即一個做用域])免受多線程的侵害時,有一個辦法將有助於防止忘記釋放鎖:std::lock_guard

   這個類是一個簡單、智能的鎖管理器。當 std::lock_guard 實例被建立時,它自動地調用互斥量的lock() 函數,當該實例被銷燬時,它也順帶釋放掉得到的鎖。你能夠像這樣使用它:

1
2
3
4
5
6
7
8
9
10
11
12
struct  ConcurrentSafeCounter {
    std::mutex mutex;
    Counter counter;
    void  increment(){
        std::lock_guard<std::mutex> guard(mutex);
        counter.increment();
    }
    void  decrement(){
        std::lock_guard<std::mutex> guard(mutex);
        counter.decrement();
    }
};

   代碼變得更整潔了不是嗎?

   使用這種方法,你無須繃緊神經關注每個函數返回點是否釋放了鎖,由於這個操做已經被std::lock_guard 實例的析構函數接管了。


總結

   如今咱們結束了短暫的 Semaphores 之旅。在本章中你學習瞭如何使用 C++ 線程庫中的互斥量來保護你的共享數據。

   但有一點請牢記:鎖機制會帶來效率的下降。的確,一旦使用鎖,你的部分代碼就變得有序[譯註:非併發]了。若是你想要設計一個高度併發的應用程序,你將會用到其餘一些比鎖更好的機制,但他們已不屬於本文的討論範疇。


下篇

   在本系列的下一篇文章中,我將談及關於互斥量的一些進階概念,並介紹如何使用條件變量去解決一些併發編程問題。

相關文章
相關標籤/搜索