Fine-grained locking can be used to shrink the queue's critical sections; here a dummy node is introduced to shrink them further. To test whether the queue is empty, it suffices to evaluate:
```cpp
head.get() == get_tail()
```
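To see why this comparison is enough, consider the node layout used throughout this queue (restated here for orientation; the full listing appears below): with one dummy node, both ends of an empty queue point at the same node.

```cpp
// Node layout used by the queue (restated from the full listing below).
struct node {
    std::shared_ptr<T> data;    // the dummy node holds no data
    std::unique_ptr<node> next;
};
// The constructor allocates one dummy node and points both ends at it:
//     threadsafe_queue() : head(new node), tail(head.get()) {}
// so the queue is empty exactly when head.get() == tail, and head-side and
// tail-side operations never touch the same real node concurrently.
```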
Note that since push() modifies tail, every access to or modification of tail must be done under the lock. The get_tail() helper encapsulates this, keeping the lock's scope as small as possible:
```cpp
// Lock tail_mutex and return the tail node.
node *get_tail() {
    std::lock_guard<std::mutex> tail_lock(tail_mutex);
    return tail;
}
```
push() only touches the tail node, so it only needs to lock tail_mutex. With the lock held, it fills the old dummy node with the data, appends a fresh dummy, and updates tail to point at it:
```cpp
void push(T new_value) {
    std::shared_ptr<T> new_data(std::make_shared<T>(std::move(new_value)));
    std::unique_ptr<node> p(new node);  // the new dummy node
    {
        std::lock_guard<std::mutex> tail_lock(tail_mutex);
        tail->data = new_data;          // old dummy now carries the value
        node *const new_tail = p.get();
        tail->next = std::move(p);
        tail = new_tail;
    }
    data_cond.notify_one();             // notify outside the lock
}
```
try_pop_head() serves callers that must not block: if the queue is empty it returns immediately instead of waiting. Its implementation is as follows:
```cpp
std::unique_ptr<node> try_pop_head() {
    std::lock_guard<std::mutex> head_lock(head_mutex);
    if (head.get() == get_tail()) {
        return std::unique_ptr<node>();  // empty: return a null pointer
    }
    return pop_head();
}
```
wait_and_pop(), by contrast, blocks until it can pop an element from the queue. A condition variable is used so that waiting threads do not spin in an empty loop; correspondingly, push() must notify waiting threads through that condition variable. Note that wait_for_data() returns the still-held head_lock, so pop_head() runs under the same lock with no window in between:
```cpp
std::shared_ptr<T> wait_and_pop() {
    std::unique_ptr<node> const old_head = wait_pop_head();
    return old_head->data;
}

std::unique_ptr<node> wait_pop_head() {
    std::unique_lock<std::mutex> head_lock(wait_for_data());
    return pop_head();
}

// Wait for data; return the still-held head_lock.
std::unique_lock<std::mutex> wait_for_data() {
    std::unique_lock<std::mutex> head_lock(head_mutex);
    // Wait until the queue is not empty.
    data_cond.wait(head_lock, [&] { return head.get() != get_tail(); });
    return std::move(head_lock);
}
```
The complete code is shown below:
```cpp
#pragma once

#include <condition_variable>
#include <memory>
#include <mutex>

template<typename T>
class threadsafe_queue {
public:
    threadsafe_queue() : head(new node), tail(head.get()) {}

    threadsafe_queue(const threadsafe_queue &) = delete;
    threadsafe_queue &operator=(const threadsafe_queue &) = delete;

    std::shared_ptr<T> try_pop() {
        std::unique_ptr<node> old_head = try_pop_head();
        return old_head ? old_head->data : std::shared_ptr<T>();
    }

    bool try_pop(T &value) {
        std::unique_ptr<node> const old_head = try_pop_head(value);
        return old_head.get();
    }

    std::shared_ptr<T> wait_and_pop() {
        std::unique_ptr<node> const old_head = wait_pop_head();
        return old_head->data;
    }

    void wait_and_pop(T &value) {
        std::unique_ptr<node> const old_head = wait_pop_head(value);
    }

    void push(T new_value) {
        std::shared_ptr<T> new_data(std::make_shared<T>(std::move(new_value)));
        std::unique_ptr<node> p(new node);
        {
            std::lock_guard<std::mutex> tail_lock(tail_mutex);
            tail->data = new_data;
            node *const new_tail = p.get();
            tail->next = std::move(p);
            tail = new_tail;
        }
        data_cond.notify_one();
    }

    bool empty() {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        return (head.get() == get_tail());
    }

private:
    struct node {
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;
    };

    // lock tail mutex and return tail node
    node *get_tail() {
        std::lock_guard<std::mutex> tail_lock(tail_mutex);
        return tail;
    }

    // pop head node from queue, return old head node
    std::unique_ptr<node> pop_head() {
        std::unique_ptr<node> old_head = std::move(head);
        head = std::move(old_head->next);
        return old_head;
    }

    // wait for data, return the still-held head_lock
    std::unique_lock<std::mutex> wait_for_data() {
        std::unique_lock<std::mutex> head_lock(head_mutex);
        // wait until not empty
        data_cond.wait(head_lock, [&] { return head.get() != get_tail(); });
        return std::move(head_lock);
    }

    std::unique_ptr<node> wait_pop_head() {
        std::unique_lock<std::mutex> head_lock(wait_for_data());
        return pop_head();
    }

    std::unique_ptr<node> wait_pop_head(T &value) {
        std::unique_lock<std::mutex> head_lock(wait_for_data());
        value = std::move(*head->data);
        return pop_head();
    }

    std::unique_ptr<node> try_pop_head() {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        if (head.get() == get_tail()) {
            return std::unique_ptr<node>();
        }
        return pop_head();
    }

    std::unique_ptr<node> try_pop_head(T &value) {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        if (head.get() == get_tail()) {
            return std::unique_ptr<node>();
        }
        value = std::move(*head->data);
        return pop_head();
    }

    std::mutex head_mutex;              // protects head
    std::unique_ptr<node> head;         // head (dummy) node
    std::mutex tail_mutex;              // protects tail
    node *tail;                         // tail node
    std::condition_variable data_cond;  // signaled on push
};
```
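A minimal usage sketch (an assumed driver, not part of the original listing; the header name is hypothetical): one producer pushes values while a consumer blocks in wait_and_pop() until each value arrives.

```cpp
#include <iostream>
#include <thread>
#include "threadsafe_queue.h"  // assumed header name for the listing above

int main() {
    threadsafe_queue<int> q;
    std::thread producer([&] {
        for (int i = 0; i < 5; ++i) {
            q.push(i);
        }
    });
    std::thread consumer([&] {
        for (int i = 0; i < 5; ++i) {
            std::shared_ptr<int> value = q.wait_and_pop();  // blocks if empty
            std::cout << *value << '\n';
        }
    });
    producer.join();
    consumer.join();
    return 0;
}
```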
A thread-safe hash table is another good example of fine-grained lock synchronization. The implementation here is a bucket-based, separate-chaining hash table. Each bucket's list is protected by a single lock of its own: modifications to the list take an exclusive write lock, while lookups take a shared read lock. Exploiting this read/write distinction keeps the lock granularity as low as possible and reduces contention.
The following code shows bucket_type:
```cpp
class bucket_type {
public:
    Value value_for(Key const &key, Value const &default_value) const {
        // Reads take a shared (read) lock.
        boost::shared_lock<boost::shared_mutex> lock(mutex);
        const_bucket_iterator found_entry = find_entry_for(key);
        return (found_entry == data.end()) ? default_value : found_entry->second;
    }

    void add_or_update_mapping(Key const &key, Value const &value) {
        // Writes take an exclusive (write) lock.
        std::unique_lock<boost::shared_mutex> lock(mutex);
        bucket_iterator found_entry = find_entry_for(key);
        if (found_entry == data.end()) {
            data.push_back(bucket_value(key, value));
        } else {
            found_entry->second = value;
        }
    }

    void remove_mapping(Key const &key) {
        // Removal also takes an exclusive (write) lock.
        std::unique_lock<boost::shared_mutex> lock(mutex);
        const_bucket_iterator found_entry = find_entry_for(key);
        if (found_entry != data.end()) {
            data.erase(found_entry);
        }
    }

private:
    typedef std::pair<Key, Value> bucket_value;
    typedef std::list<bucket_value> bucket_data;
    typedef typename bucket_data::const_iterator const_bucket_iterator;
    typedef typename bucket_data::iterator bucket_iterator;

    bucket_data data;
    mutable boost::shared_mutex mutex;

    const_bucket_iterator find_entry_for(Key const &key) const {
        return std::find_if(data.begin(), data.end(),
            [&](bucket_value const &item) { return item.first == key; });
    }

    bucket_iterator find_entry_for(Key const &key) {
        return std::find_if(data.begin(), data.end(),
            [&](bucket_value const &item) { return item.first == key; });
    }
};
```
The code above embodies the read/write distinction: a write lock is taken only when the list is modified, guaranteeing consistency, while lookups take a read lock, which excludes writers but allows readers to proceed concurrently.
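As a standalone illustration of that distinction (a minimal sketch, not part of the original code): with boost::shared_mutex, any number of readers may hold the shared lock at once, while a writer must wait for exclusive ownership.

```cpp
#include <boost/thread/shared_mutex.hpp>
#include <mutex>

boost::shared_mutex m;
int shared_value = 0;

int reader() {
    // Shared lock: many readers may hold this concurrently.
    boost::shared_lock<boost::shared_mutex> lock(m);
    return shared_value;
}

void writer(int v) {
    // Exclusive lock: blocks until no reader or writer holds the mutex.
    std::unique_lock<boost::shared_mutex> lock(m);
    shared_value = v;
}
```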
Multiple buckets combine into the hash table: hashing the key selects the bucket, and the read and write operations then work on that bucket's list.
```cpp
std::vector<std::unique_ptr<bucket_type>> buckets;
```
```cpp
// Get the bucket corresponding to a key.
bucket_type &get_bucket(Key const &key) const {
    // No lock is needed here: the number of buckets never changes
    // after construction.
    std::size_t const bucket_index = hasher(key) % buckets.size();
    return *buckets[bucket_index];
}
```
The remaining hash table operations simply forward to the bucket's member functions; each bucket has its own read-write lock for access control.
```cpp
Value value_for(Key const &key, Value const &default_value = Value()) const {
    return get_bucket(key).value_for(key, default_value);
}

void add_or_update_mapping(Key const &key, Value const &value) {
    get_bucket(key).add_or_update_mapping(key, value);
}

void remove_mapping(Key const &key) {
    get_bucket(key).remove_mapping(key);
}
```
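A structural sketch of how these fragments fit together in the enclosing class, modeled on the corresponding listing in the book; the class name and the default bucket count of 19 are assumptions here, not taken from the text above:

```cpp
template<typename Key, typename Value, typename Hash = std::hash<Key>>
class threadsafe_lookup_table {  // assumed name, following the book
public:
    // The bucket count is fixed for the lifetime of the table, which is
    // why get_bucket can index the vector without taking any lock.
    threadsafe_lookup_table(unsigned num_buckets = 19,
                            Hash const &hasher_ = Hash())
        : buckets(num_buckets), hasher(hasher_) {
        for (unsigned i = 0; i < num_buckets; ++i) {
            buckets[i].reset(new bucket_type);
        }
    }

    threadsafe_lookup_table(threadsafe_lookup_table const &) = delete;
    threadsafe_lookup_table &operator=(threadsafe_lookup_table const &) = delete;

    // value_for / add_or_update_mapping / remove_mapping forward to the
    // bucket, exactly as shown above.

private:
    class bucket_type { /* as shown earlier */ };

    std::vector<std::unique_ptr<bucket_type>> buckets;
    Hash hasher;

    bucket_type &get_bucket(Key const &key) const;  // as shown earlier
};
```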
For the thread-safe linked list, a dummy node is likewise used to mark the head of the list. Note that while traversing, a thread must hold the lock of the node it is currently operating on, handing locks over node by node, like this:
```cpp
template<typename Function>
void for_each(Function f) {
    node *current = &head;
    std::unique_lock<std::mutex> lk(head.m);
    node *next;
    while ((next = current->next.get()) != nullptr) {
        std::unique_lock<std::mutex> next_lk(next->m);
        // Release the current node's lock once the next node is locked.
        lk.unlock();
        f(*next->data);
        current = next;
        lk = std::move(next_lk);
    }
}
```
```cpp
template<typename Predicate>
std::shared_ptr<T> find_first_if(Predicate p) {
    node *current = &head;
    std::unique_lock<std::mutex> lk(head.m);
    while (node *const next = current->next.get()) {
        std::unique_lock<std::mutex> next_lk(next->m);
        lk.unlock();
        if (p(*next->data)) {
            return next->data;  // found: return while still holding next_lk
        }
        current = next;
        lk = std::move(next_lk);
    }
    return std::shared_ptr<T>();
}
```
Note that the remove operation must hold the locks of both the previous node and the node being removed at the same time; only then is relinking safe, since neither node can be modified by another thread while the links are reset.
```cpp
template<typename Predicate>
void remove_if(Predicate p) {
    node *current = &head;
    std::unique_lock<std::mutex> lk(head.m);
    while (node *const next = current->next.get()) {
        std::unique_lock<std::mutex> next_lk(next->m);
        if (p(*next->data)) {
            // Keep the removed node alive in old_next so that its mutex
            // is unlocked before the node is destroyed.
            std::unique_ptr<node> old_next = std::move(current->next);
            current->next = std::move(next->next);
            next_lk.unlock();
        } else {
            lk.unlock();
            current = next;
            lk = std::move(next_lk);
        }
    }
}
```
Destroying all of the list's nodes is also done through remove_if:
```cpp
~threadsafe_list() {
    // Remove every node from the list.
    remove_if([](T const &) { return true; });
}
```
The complete list implementation is shown below:
```cpp
#include <memory>
#include <mutex>

template<typename T>
class threadsafe_list {
public:
    threadsafe_list() {}

    ~threadsafe_list() {
        // Remove every node from the list.
        remove_if([](T const &) { return true; });
    }

    // no copying
    threadsafe_list(threadsafe_list &) = delete;
    threadsafe_list &operator=(threadsafe_list &) = delete;

    // push a node onto the front of the list
    void push_front(T const &value) {
        std::unique_ptr<node> new_node(new node(value));
        std::lock_guard<std::mutex> lk(head.m);
        new_node->next = std::move(head.next);
        head.next = std::move(new_node);
    }

    template<typename Function>
    void for_each(Function f) {
        node *current = &head;
        std::unique_lock<std::mutex> lk(head.m);
        node *next;
        while ((next = current->next.get()) != nullptr) {
            std::unique_lock<std::mutex> next_lk(next->m);
            // Release the current node's lock once the next node is locked.
            lk.unlock();
            f(*next->data);
            current = next;
            lk = std::move(next_lk);
        }
    }

    template<typename Predicate>
    std::shared_ptr<T> find_first_if(Predicate p) {
        node *current = &head;
        std::unique_lock<std::mutex> lk(head.m);
        while (node *const next = current->next.get()) {
            std::unique_lock<std::mutex> next_lk(next->m);
            lk.unlock();
            if (p(*next->data)) {
                return next->data;
            }
            current = next;
            lk = std::move(next_lk);
        }
        return std::shared_ptr<T>();
    }

    template<typename Predicate>
    void remove_if(Predicate p) {
        node *current = &head;
        std::unique_lock<std::mutex> lk(head.m);
        while (node *const next = current->next.get()) {
            std::unique_lock<std::mutex> next_lk(next->m);
            if (p(*next->data)) {
                // Keep the removed node alive in old_next so that its mutex
                // is unlocked before the node is destroyed.
                std::unique_ptr<node> old_next = std::move(current->next);
                current->next = std::move(next->next);
                next_lk.unlock();
            } else {
                lk.unlock();
                current = next;
                lk = std::move(next_lk);
            }
        }
    }

private:
    struct node {
        std::mutex m;
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;

        node() : m(), data(), next() {}
        node(T const &value) : m(), data(std::make_shared<T>(value)), next() {}
    };

    // dummy node marking the head of the list
    node head;
};
```
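A minimal usage sketch (an assumed driver, not from the original text) exercising push_front, remove_if, and for_each:

```cpp
#include <iostream>

int main() {
    threadsafe_list<int> list;
    for (int i = 0; i < 5; ++i) {
        list.push_front(i);  // list is now 4, 3, 2, 1, 0
    }
    list.remove_if([](int const &v) { return v % 2 == 0; });   // drop evens
    list.for_each([](int const &v) { std::cout << v << '\n'; });  // prints 3, 1
    return 0;
}
```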
Reference: *C++ Concurrency in Action*