上一篇文章咱們講到如何啓動一些線程去併發地執行某些操做,雖然那些在線程裏執行的代碼都是獨立的,但一般狀況下,你都會在這些線程之間使用到共享數據。一旦你這麼作了,就面臨着一個新的問題 —— 同步。 編程
下面讓咱們用示例來闡釋「同步」是個什麼問題。
安全
同步問題
多線程
咱們就拿一個簡單的計數器做爲示例吧。這個計數器是一個結構體,他擁有一個計數變量,以及增長或減小計數的函數,看起來像這個樣子: 併發
[譯註:原文 Counter 的 value 並未初始化,其初始值隨機,讀者可自行初始化爲 0 ]
app
1
2
3
4
5
6
|
struct
Counter {
int
value;
void
increment(){
++value;
}
};
|
這並沒什麼稀奇的,下面讓咱們來啓動一些線程來增長計數器的計數吧。 less
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
int
main(){
Counter counter;
std::vector<std::thread> threads;
for(int
i = 0; i < 5; ++i){
threads.push_back(std::thread([&counter](){
for(int
i = 0; i < 100; ++i){
counter.increment();
}
}));
}
for(auto&
thread
: threads){
thread.join();
}
std::cout << counter.value << std::endl;
return
0;
}
|
[譯註:bill的測試環境下,上述代碼始終輸出 500,讀者可將外層 for 循環條件改成 i < 100,內層for 循環條件改成 i < 99999 以觀察實驗結果] 函數
一樣的,也沒什麼新花樣,咱們只是啓動了 5 個線程,每一個線程都讓計數器增長 100 次而已。等這一工做結束,咱們就打印計數器最後的數值。 學習
若是運行這一程序,咱們理所固然的指望運行結果是 500,但事與願違,沒人能保證這個程序最終輸出什麼。下面是在個人機器上獲得的一些結果:
測試
1
2
3
4
5
6
|
442
500
477
400
422
487
|
問題的根源在於計數器的 increment() 並不是原子操做,而是由 3 個獨立的操做組成的: spa
1. 讀取 value 變量的當前值。
2. 將讀取的當前值加 1。
3. 將加 1 後的值寫回 value 變量。
當你以單線程運行上述代碼時,就不會出現任何問題,上述三個步驟會按照順序依次執行。可是一旦你身處多線程環境,狀況就會變得糟糕起來,考慮以下執行順序:
1. 線程a:讀取 value 的當前值,獲得值爲 0。加1。所以 value = 1。[譯註:此時 1 並無寫回value 內存,原文「value = 1」僅做邏輯意義,下同]
2. 線程b:讀取 value 的當前值,獲得值爲 0。加1。所以 value = 1。
3. 線程a:將 1 寫回 value 內存並返回 1。
4. 線程b:將 1 寫回 value 內存並返回 1。
這種狀況源於線程間的 interleaving。Interleaving 描述了多線程同時執行幾句代碼的各類狀況。就算僅僅只有兩個線程同時執行這三個操做,也會存在不少可能的 interleaving。當你有許多線程同時執行多個操做時,要想枚舉出全部 interleaving,幾乎是不可能的。並且若是線程在執行單個操做的不一樣指令之間被搶佔,也會致使 interleaving 的發生。
目前有許多能夠解決這一問題的方案:
Semaphores
Atomic references
Monitors
Condition codes
Compare and swap
etc.
就本文而言,咱們將學習如何使用 Semaphores 去解決這一問題。事實上,咱們僅僅使用了Semaphores 中比較特殊的一種 —— 互斥量。互斥量是一個特殊的對象,在同一時刻只有一個線程可以獲得該對象上的鎖。藉助互斥量這種簡而有力的性質,咱們即可以解決線程同步問題。
使用互斥量保證 Counter 的線程安全
在 C++11 的線程庫中,互斥量被放置於頭文件 <mutex>,並以 std::mutex 類加以實現。互斥量有兩個重要的函數:lock() 和 unlock()。顧名思義,前者使當前線程嘗試獲取互斥量的鎖,後者則釋放已經獲取的鎖。lock() 函數是阻塞式的,線程一旦調用 lock(),就會一直阻塞直到該線程得到對應的鎖。
爲了使咱們的計數器具有線程安全性,咱們須要對其添加 std::mutex 成員,並在成員函數中對互斥量進行 lock()/unlock() 調用。
1
2
3
4
5
6
7
8
9
10
|
struct
Counter {
std::mutex mutex;
int
value;
Counter() : value(0) {}
void
increment(){
mutex.lock();
++value;
mutex.unlock();
}
};
|
若是咱們如今再次運行以前的測試程序,咱們將始終獲得正確的輸出:500。
異常與鎖
如今讓咱們來看看另一種狀況會發生什麼。假設如今咱們的計數器擁有一個 derement() 操做,當 value 被減爲 0 時拋出一個異常:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
struct
Counter {
int
value;
Counter() : value(0) {}
void
increment(){
++value;
}
void
decrement(){
if(value == 0){
throw
"Value cannot be less than 0";
}
--value;
}
};
|
假設你想在不更改上述代碼的前提下爲其提供線程安全性,那麼你須要爲其建立一個 Wrapper 類:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
struct
ConcurrentCounter {
std::mutex mutex;
Counter counter;
void
increment(){
mutex.lock();
counter.increment();
mutex.unlock();
}
void
decrement(){
mutex.lock();
counter.decrement();
mutex.unlock();
}
};
|
這個 Wrapper 將在大多數狀況下正常工做,然而一旦 decrement() 拋出異常,你就遇到大麻煩了,當異常被拋出時,unlock() 函數將不會被調用,這將致使本線程得到的鎖不被釋放,你的程序也就瓜熟蒂落的被永久阻塞了。爲了修復這一問題,你須要使用 try/catch 塊以保證在拋出任何異常以前釋放得到的鎖。
1
2
3
4
5
6
7
8
9
10
|
void
decrement(){
mutex.lock();
try
{
counter.decrement();
}
catch
(std::string e){
mutex.unlock();
throw
e;
}
mutex.unlock();
}
|
代碼並不複雜,可是看起來卻很醜陋。試想一下,你如今的函數擁有 10 個返回點,那麼你就須要在每一個返回點前調用 unlock() 函數,而忘掉其中的某一個的可能性是很是大的。更大的風險在於你又添加了新的函數返回點,卻沒有對應地添加 unlock()。下一節將給出解決此問題的好辦法。
鎖的自動管理
當你想保護整個代碼段(就本文而言是一個函數,但也能夠是某個循環體或其餘控制結構[譯註:即一個做用域])免受多線程的侵害時,有一個辦法將有助於防止忘記釋放鎖:std::lock_guard。
這個類是一個簡單、智能的鎖管理器。當 std::lock_guard 實例被建立時,它自動地調用互斥量的lock() 函數,當該實例被銷燬時,它也順帶釋放掉得到的鎖。你能夠像這樣使用它:
1
2
3
4
5
6
7
8
9
10
11
12
|
struct
ConcurrentSafeCounter {
std::mutex mutex;
Counter counter;
void
increment(){
std::lock_guard<std::mutex> guard(mutex);
counter.increment();
}
void
decrement(){
std::lock_guard<std::mutex> guard(mutex);
counter.decrement();
}
};
|
代碼變得更整潔了不是嗎?
使用這種方法,你無須繃緊神經關注每個函數返回點是否釋放了鎖,由於這個操做已經被std::lock_guard 實例的析構函數接管了。
總結
如今咱們結束了短暫的 Semaphores 之旅。在本章中你學習瞭如何使用 C++ 線程庫中的互斥量來保護你的共享數據。
但有一點請牢記:鎖機制會帶來效率的下降。的確,一旦使用鎖,你的部分代碼就變得有序[譯註:非併發]了。若是你想要設計一個高度併發的應用程序,你將會用到其餘一些比鎖更好的機制,但他們已不屬於本文的討論範疇。
下篇
在本系列的下一篇文章中,我將談及關於互斥量的一些進階概念,並介紹如何使用條件變量去解決一些併發編程問題。