本章主要描述多線程之間共享數據的方法、存在問題、解決方案。c++
第一部分:mutex在保護共享數據中的使用編程
1、最簡單使用:數組
#include<mutex> std::mutex some_mutex; void func(){ some_mutex.lock(); //訪問共享數據 .... some_mutex.unlock(); }
2、向lock_guard推動:安全
可是不推薦直接使用lock、unlock,由於unlock必定要調用,若是因爲你的疏忽或前面的異常將會致使問題,再次利用RAII思想,用對象管理資源就有了標準庫的std::lock_guard,在構造函數中lock,析構函數中unlock。多線程
std::mutex some_mutex; void func(){ lock_guard<std::mutex> some_guard(some_mutex); //訪問共享數據 .... }
3、向封裝前進:dom
每次數據訪問都要記得加解鎖,若是能讓用戶從加解鎖中解脫就行了。將共享數據、mutex、對共享數據的訪問函數(接口)封裝到一塊兒,這樣用戶就能夠在多線程下安全使用該共享數據了。示例以下:函數
class EncapShareData{ std::mutex m_mutex; Data m_data; public: void Func(){ std::lock_guard<std::mutex> mutexGuard; //對數據訪問 ..... } //下述方法曝光了m_data,是危險的 //返回共享數據的引用 Data& DangerFunc1(); //返回共享數據的指針 Data* DangerFunc2(); //參數中返回了共享數據的引用 void DangerFunc3(Data&); //參數中返回了共享數據的指針 void DangerFunc4(Data*); //函數f作了什麼?將m_data的指針或引用保存到其餘地方了(糟糕)? template<typename Function> void DangerFunc5(Function f){ std::lock_guard<std::mutex> mutexGuard(m_mutex); f(m_data); } //friend!class、func... friend void DangerFunc6(); }
注意:示例中的DangerFunc*任意一種都會讓對共享數據的多線程安全訪問毀於一旦。性能
4、到此爲止?this
當你走完上述封裝之路,而且確保封裝類的每一個接口都是多線程安全的,是否是真的就多線程安全了呢?看下面例子所示:spa
template<typename T> class ThreadSafeStack{ private: std::stack<T> m_data; std::mutex m_mutex; public: ThreadSafeStack(){} ThreadSafeStack(const ThreadSafeStack& other){ std::lock_guard<std::mutex> lock(other.m_mutex); data = other.data; } ThreadSafeStack& operator=(const ThreadSafeStack&) = delete; //多線程安全的push void push(T v){ std::lock_guard<std::mutex> lock(m_mutex); data.push(v); } //多線程安全的pop void pop(){ std::lock_guard<std::mutex> lock(m_mutex); data.pop(); } //stack::top返回內部元素引用,這裏爲了安全返回拷貝 T top(){ return data.top(); } //empty/size是隻讀,是多線程安全的,只須要轉發 bool empty(){ return data.empty(); } size_t size(){ return data.size(); } };
ThreadSafeStack類的每個接口單獨拿出來都是線程安全的,咱們知道stl中stack的入棧只須要一個函數:stack::push()就能夠了,可是若是要出棧就須要一連串函數調用,先要判斷棧是否爲空(top並不進行是否爲空檢查,因此你要檢查)stack::empty(),而後取得棧頂元素stack::top(),最後才能stack::pop(),以下代碼所示:
if(!mStack.empty()){ //① T const v=mStack.top(); //② mStack.pop(); }
單線程下這是安全的胡庸置疑。多線程線考慮如下狀況:
a、棧中只有一個元素,兩個獨立要pop的線程可能都走到①處,而後在②處出如今空棧上pop,悲劇!
b、棧中有兩個元素,兩個獨立要pop的線程可能都走到①處,可是取得的是同一個元素,而後pop兩次,致使數據丟失!
函數中每一個元素訪問是多線程安全的,不表明這個函數是多線程安全的,一個類中每一個函數是多線程安全的,不表明將這些函的數組合是多線程安全的,若是這些組合是多線程安全的,並不表明更高層的組合是多線程安全的...。你要根據你的須要權衡提供哪個層次的多線程安全(即肯定mutex的做用範圍)。好比上述的問題:類中每一個單獨函數都是多線程安全的,組合到一塊兒再也不多線程安全,要求的多線程範圍是「函數組合」再也不是單獨的函數了,解決方法是將這個「函數組合」封裝起來,將mutex做用範圍擴展到這個組合,最簡單的封裝就是封裝成一個單獨的函數,將上面全部步驟封裝成一個新的pop以下:
bool pop(T& v){ std::lock_guard<std::mutex> lock(m_mutex); if(m_data.empty()) return false; value=data.top(); data.pop(); return true; }
mutex的做用範圍要設置合理,若是過小,只能保證小範圍的多線程安全,可是小範圍能確保足夠少的函數須要mutex同步,效率高;若是做用範圍太大,能夠保證大範圍的線程安全,可是大範圍內不少操做都是不必同步的自己是線程安全的,這樣會下降性能。
第二部分:多個mutex致使的死鎖:
1、最基本的最核心的解決方法(按順序加鎖):
寫字須要紙和筆,只有一張紙、一支筆(資源有限),兩我的同時決定去寫字,A拿到了筆,B拿到了紙(推動順序不當),若是他們誰都不妥協就會致使死鎖。教科書中死鎖產生緣由:資源有限、推動順序不當。若是規定先拿到筆才能去拿紙,在這種狀況下就不會產生死鎖。
由此可知在多個mutex狀況下規定加鎖順序能夠避免死鎖,PV操做以下:
P(pen);
P(paper);
寫字;
V(paper);
V(pen);
2、當按順序加鎖行不通時:
按規定順序加鎖能夠避免死鎖,可是有的狀況下這種順序並「沒法」肯定,例如Swap(C,D)須要訪問須要同時訪問C、D的內部數據,理所固然對C、D都加鎖再訪問,按常規思路首先對第一個參數加鎖,而後對第二個參數加鎖,以下Swap(C,D)所示,那另外一個線程同時執行Swap(D,C)將參數調用來了,結果如何呢?死鎖!
Swap(C,D):P(C);P(D);執行內部數據交換;V(D);V(C);
Swap(D,C):P(D);P(C);執行內部數據交換;V(C);V(D);
該如何解決呢?stl提供std::lock(mutex1,mutex2)將mutex1和mutex2的兩個加鎖操做當成原子一步操做(但你要記得對mutex1和mutex2進行unlock),這就不會致使死鎖,std::lock要麼將mutex1和mutex2都加鎖也麼一個都不鎖。以下代碼所示:
void Swap(T&C, T&D){ if (&T == &D) return; //對std::mutex兩次lock是未定義的,從效率、安全考慮這都很必要 std::lock(C.m_mutex, D.m_mutex); std::lock_guard<std::mutex> lock1(C.m_mutex, std::adopt_lock); std::lock_guard<std::mutex> lock2(D.m_mutex, std::adopt_lock); //數據交換的操做 ..... }
std::adopt_lock告訴lock_guard在構造函數中不用在調用std::mutex::lock了,在前面已經調用過了。
3、hierarchical_mutex實如今運行時檢查死鎖的出現。
hierarchical_mutex規則思想是:將mutex分層,規定加鎖順序是由高層到底層才能進行,底層到高層報出運行時錯誤,這樣就能夠利用編程的方法檢測死鎖。書中實現了hierarchical_mutex類做爲可分層的mutex,先列出使用方法以下:
hierarchical_mutex high_level_mutex(10000); hierarchical_mutex low_level_mutex(500); void ThreadA(){ std::lock_guard<hierarchical_mutex> lock1(high_level_mutex); ... //作一些使用high_level_mutex就能夠乾的事 std::lock_guard<hierarchical_mutex> lock1(low_level_mutex); ... //須要兩個mutex同時加鎖才能夠乾的事 } void ThreadB(){ std::lock_guard<hierarchical_mutex> lock1(low_level_mutex); ... //作一些使用low_level_mutex就能夠乾的事 //對高低層mutex加鎖的狀況下,對高層mutex加鎖,不符合規定的順序,拋出異常! std::lock_guard<hierarchical_mutex> lock1(high_level_mutex); }
ThreadA符合hierarchical_mutex使用規定不會出現死鎖是多線程安全的,ThreadB沒按hierarchical_mutex 規定,有出現死鎖的危險,在運行時就拋出異常。
hierarchical_mutex實現方法,要能使用lock_guard對hierarchical_mutex進行管理必須實現lock/unlock/try_lock方法;爲了實現層次間的比較進而決定能不能加鎖,須要記錄準備加鎖mutex的層號、該線程中當前已經加鎖的mutex的層號、在解鎖時恢復原先現場就要記錄先前mutex的層號,綜上hierarchical_mutex定義以下:
class hierarchical_mutex{ std::mutex internal_mutex; //準備加鎖mutex的層號 const unsigned hierarchical_value; //在解鎖時恢復原先現場就要記錄先前mutex的層號 unsigned previous_hierarchical_value; //該線程中當前已經加鎖的mutex的層號 static thread_local unsigned this_thread_hierarchical_value; //檢查是否知足hierarchical_mutex規則 void check_for_hierarchical_violation(){ if (this_thread_hierarchical_value <= hierarchical_value){ throw std::logic_error(「mutex hierarchical violated!」); } } void update_hierarchical_value(){ previous_hierarchical_value = this_thread_hierarchical_value; this_thread_hierarchical_value = hierarchical_value; } public: explicit hierarchical_mutex(unsigned value) :hierarchical_value(value), previous_hierarchical_value(0){} //知足hierarchical_mutex規則時加鎖並更新內部記錄變量 void lock(){ check_for_hierarchical_violation(); internal_mutex.lock(); update_hierarchical_value; } //恢復到調用lock以前的現場,解鎖 void unlock(){ this_thread_hierarchical_value = previous_hierarchical_value; internal_mutex.unlock(); } bool try_lock(){ check_for_hierarchical_violation(); if (!internal_mutex.try_lock()) return false; update_hierarchical_value(); return true; } }; //使用UNSIGND_MAX初始化代表剛開始任何層的mutex均可以加鎖成功 thread_local unsigned hierarchical_mutex::this_thread_hierarchical_value(UNSIGND_MAX);
標註:mutex::try_lock()檢查mutex是否能夠成功lock,若是能夠就lock,若是不行馬上返回false.
第三部分:擴展
1、unique_lock:
一、相對lock_guard加鎖、解鎖更靈活的控制。
內部保存的是相關聯mutex的狀態,使用std::lock調用或std::unique_lock::lock()或使用std::unique_lock(mutex)構造時打開狀態,使用unique_lock::unlock時關閉狀態,在析構時候檢查狀態以決定是否調用std::mutex::unlock(),靈活就在隨時調用lock、unlock。
std::unique_lock lock(m_mutex,std::defer_lock); //std::defer_lock說明定義時對m_mutex不加鎖 .... std::lock(lock); //須要加鎖時再加鎖 .... lock.unlock(); //使用結束手動解鎖
二、對mutex的控制權轉移:
//*1做爲參數 void f2(std::unique_lock&& p){ ..此範圍內p關聯的m_mutex已經加鎖.. } void f1(){ std::unique_lock lock(m_mutex); //對m_mutex加鎖 f2(lock); //m_mutex管理權限轉移到f2中 } //*2做爲返回值 unique_lock f2(){ std::unique_lock lock(m_mutex); prepareData(); return lock; } void f1(){ std::unique_lock lock(f2()); processData(); }
unique_lock內部保存的是關聯mutex的狀態標記,能夠利用unique_lock::owe_lock()檢查關聯的mutex是否已經加鎖。管理權轉移爲何不用lock_guard呢?從源代碼分析可知lock_guard沒有拷貝構造、賦值、移動構造、移動賦值,意味着不能做爲參數、返回值傳遞,可是unique_lock跟unique_ptr同樣雖然沒有拷貝構造、賦值,但有移動構造、移動賦值,「天生」是用作權限轉移的。
2、Lazy-initialization:
單線程中:
std::shared_ptr<some_resource> resource_ptr; void foo(){ if (!resource_ptr){ resource_ptr.reset(new some_resource); } resource_ptr->do_something(); }
多線程中:
std::shared_ptr<some_resource> resource_ptr; std::mutex resource_mutex; void foo(){ std::unique_lock<std::mutex> lock(resource_mutex); if (!resource_ptr){ resource_ptr.reset(new some_resource); } lock.unlock(); resource_ptr->do_something(); }
咱們要的是隻在resource_ptr還沒初始化時才加鎖、解鎖,這個版本任什麼時候候進入foo都加鎖、解鎖,明顯效率降低。而後就有了「infamous」雙重鎖:
std::shared_ptr<some_resource> resource_ptr; std::mutex resource_mutex; void foo(){ if (!resource_ptr){//① std::lock_guard<std::mutex> lock(resource_mutex); if (!resource_ptr){ resource_ptr.reset(new some_resource);//② } } resource_ptr->do_something();//③ }
看似很合理,但這卻變成了一個讓不少學者頭疼的問題。問題出在②處對resource_ptr指針的賦值和some_resource構造函數的調用誰先誰後不肯定(編譯器要重排代碼順序,究竟怎麼重排沒規定),若是首先對resource_ptr指針賦值可是some_resource構造函數尚未調用,另外一個線程在①處檢查到resource_ptr有值了(true),馬上執行③,實際上是錯誤的。
標準庫提供了一種解決方案,具體怎麼解決的很複雜參考http://blog.jobbole.com/52164/。這裏只談使用方法,你只需知道這是多線程安全的而且比在正確狀況下的雙重檢查鎖效率還高,代碼示例以下:
std::shared_ptr<some_resource> resource_ptr; std::once_flag resource_flag; void init_resource(){ resource_ptr.reset(new some_resource); } void foo(){ std::call_once(resource_flag, init_resource); resource_ptr->do_something(); }
你能夠將此方法很容易擴展到多線程安全的單例模式,實現真正的多線程安全。
3、保護不多更新,可是常常是隻讀的數據:
能夠想到使用讀者優先,你能夠本身利用mutex去設計本身的讀者優先的函數,標準庫並無提供任何讀寫鎖。可是boost庫提供了boost::shared_mutex和boost::shared_lock函數能夠很簡單的解決這個問題:當使用lock_guard或unique_lock對boost::shared_mutex加鎖後,對該boost::shared_mutex的獲取方式(lock_guard/unique_lock/shared_lock)都會阻塞;當使用boost::shared_lock對該boost::shared_mutex加鎖後,使用lock_guard和unique_lock對該boost::shared_lock的獲取就會阻塞,可是使用shared_lock對該boost::shared_mutex的獲取就不會阻塞。使用實例以下:
#include <map> #include <string> #include <mutex> #include <boost/thread/shared_mutex.hpp> class dns_entry; class dns_cache { std::map<std::string, dns_entry> entries; mutable boost::shared_mutex entry_mutex; public: //讀者 dns_entry find_entry(std::string const& domain) const { boost::shared_lock<boost::shared_mutex> lk(entry_mutex); //阻塞寫者但不阻塞讀者 std::map<std::string, dns_entry>::const_iterator const it = entries.find(domain); return (it == entries.end()) ? dns_entry() : it->second; } //寫者 void update_or_add_entry(std::string const& domain, dns_entry const& dns_details) { std::lock_guard<boost::shared_mutex> lk(entry_mutex); //將其餘的寫者和全部的讀者都阻塞 entries[domain] = dns_details; } };
對recursive_mutex的使用,做者不推薦「Most of the time, if you think you want a recursive mutex, you probably need to change your design instead」,這裏就再也不詳述了。
本帖全本身在閱讀《c++ concurrency in action》中的總結,若是對你有幫助,請點個贊^-^