基於鎖的併發數據結構

1. 使用細粒度鎖和條件變量的線程安全隊列

可使用細粒度的鎖來減少隊列的臨界區,這裏使用了一個dummy node用來進一步減少鎖的臨界區。若要判斷隊列是否爲空,只須要執行下述判斷:node

head.get() == get_tail()

請注意,由於在進行push的時候須要修改tail,因此對tail的訪問和修改都須要進行加鎖。這裏使用get_tail來封裝這個操做,將鎖的粒度減少到最低。編程

// lock tail mutex and return tail node
node *get_tail()
{
std::lock_guard<std::mutex> tail_lock(tail_mutex);
return tail;
}

對push的操做只涉及到修改tail節點,因此只須要對tail節點進行加鎖。加鎖完成以後就能夠修改tail使其指向新的tail節點。安全

void push(T new_value)
{
    std::shared_ptr<T> new_data(std::make_shared<T>(std::move(new_value)));
    std::unique_ptr<node> p(new node);
    {
      std::lock_guard<std::mutex> tail_lock(tail_mutex);
      tail->data = new_data;
      node *const new_tail = p.get();
      tail->next = std::move(p);
      tail = new_tail;
    }
    data_cond.notify_one();
}

至於try_pop_head()爲了應對這一種需求,若是隊列爲空直接返回,不等待。其操做以下所示:併發

std::unique_ptr<node> try_pop_head()
{
    std::lock_guard<std::mutex> head_lock(head_mutex);
    if (head.get() == get_tail())
    {
      return std::unique_ptr<node>();
    }
    return pop_head();
}

至於wait_and_pop()須要一直等待,直到彈出隊列中的一個元素。這裏使用了條件變量,避免線程循環進行空等待。固然,在push()的時候,須要配合條件變量通知等待的線程。app

std::shared_ptr<T> wait_and_pop()
{
    std::unique_ptr<node> const old_head = wait_pop_head();
    return old_head->data;
}

std::unique_ptr<node> wait_pop_head()
{
    std::unique_lock<std::mutex> head_lock(wait_for_data());
    return pop_head();
}

  // wait for data, return std::unique_lock<std::mutex> head_lock
std::unique_lock<std::mutex> wait_for_data()
{
    std::unique_lock<std::mutex> head_lock(head_mutex);
    // wait until not empty
    data_cond.wait(head_lock, [&] { return head.get() != get_tail(); });
    return std::move(head_lock);
}

完整的代碼以下所示:函數

#pragma once
#include <memory>
#include <mutex>

template<typename T>
class threadsafe_queue
{
 public:
  threadsafe_queue() :
      head(new node), tail(head.get()) {}

  std::shared_ptr<T> try_pop()
  {
    std::unique_ptr<node> old_head = try_pop_head();
    return old_head ? old_head->data : std::shared_ptr<T>();
  }

  bool try_pop(T &value)
  {
    std::unique_ptr<node> const old_head = try_pop_head(value);
    return old_head.get();
  }

  std::shared_ptr<T> wait_and_pop()
  {
    std::unique_ptr<node> const old_head = wait_pop_head();
    return old_head->data;
  }

  void wait_and_pop(T &value)
  {
    std::unique_ptr<node> const old_head = wait_pop_head(value);
  }

  void push(T new_value)
  {
    std::shared_ptr<T> new_data(std::make_shared<T>(std::move(new_value)));
    std::unique_ptr<node> p(new node);
    {
      std::lock_guard<std::mutex> tail_lock(tail_mutex);
      tail->data = new_data;
      node *const new_tail = p.get();
      tail->next = std::move(p);
      tail = new_tail;
    }
    data_cond.notify_one();
  }

  bool empty()
  {
    std::lock_guard<std::mutex> head_lock(head_mutex);
    return (head.get() == get_tail());
  }

  threadsafe_queue(const threadsafe_queue &) = delete;
  threadsafe_queue &operator=(const threadsafe_queue &) = delete;

 private:
  struct node
  {
    std::shared_ptr<T> data;
    std::unique_ptr<node> next;
  };

  // lock tail mutex and return tail node
  node *get_tail()
  {
    std::lock_guard<std::mutex> tail_lock(tail_mutex);
    return tail;
  }

  // pop head node from queue, return old head node
  std::unique_ptr<node> pop_head()
  {
    std::unique_ptr<node> old_head = std::move(head);
    head = std::move(old_head->next);
    return old_head;
  }

  // wait for data, return std::unique_lock<std::mutex> head_lock
  std::unique_lock<std::mutex> wait_for_data()
  {
    std::unique_lock<std::mutex> head_lock(head_mutex);
    // wait until not empty
    data_cond.wait(head_lock, [&] { return head.get() != get_tail(); });
    return std::move(head_lock);
  }

  std::unique_ptr<node> wait_pop_head()
  {
    std::unique_lock<std::mutex> head_lock(wait_for_data());
    return pop_head();
  }

  std::unique_ptr<node> wait_pop_head(T& value)
  {
    std::unique_lock<std::mutex> head_lock(wait_for_data());
    value = std::move(*head->data);
    return pop_head();
  }

  std::unique_ptr<node> try_pop_head()
  {
    std::lock_guard<std::mutex> head_lock(head_mutex);
    if (head.get() == get_tail())
    {
      return std::unique_ptr<node>();
    }
    return pop_head();
  }

  std::unique_ptr<node> try_pop_head(T &value)
  {
    std::lock_guard<std::mutex> head_lock(head_mutex);
    if (head.get() == get_tail())
    {
      return std::unique_ptr<node>();
    }
    value = std::move(*head->data);
    return pop_head();
  }

  std::mutex head_mutex; // head mutex
  std::unique_ptr<node> head; // head node
  std::mutex tail_mutex; // tail mutex
  node *tail; // tail node
  std::condition_variable data_cond; // condition variable
};

2. 線程安全hash表

線程安全的hash表是另外一個用於展現細粒度鎖同步的很好的例子。在hash實現之中,使用了基於桶的開鏈hash實現。每一個桶對應的鏈表能夠統一使用同一個鎖進行訪問控制。對鏈表的修改須要使用寫鎖進行排他的訪問控制,對鏈表的訪問則使用讀鎖進行保護,這樣就充分利用了讀鎖和寫鎖的區別,將鎖的粒度降到最低,減小可能的數據競爭。線程

下面的代碼展現了bucket_type的用法:code

class bucket_type
{
 public:
  Value value_for(Key const& key, Value const& default_value) const
  {
    // read 須要加讀鎖
    boost::shared_lock<boost::shared_mutex> lock(mutex);
    const_bucket_iterator found_entry = find_entry_for(key);
    return (found_entry == data.end()) ? default_value:found_entry->second;
  }

  void add_or_update_mapping(Key const& key, Value const& value)
  {
    // 須要加寫鎖
    std::unique_lock<boost::shared_mutex> lock(mutex);
    bucket_iterator found_entry = find_entry_for(key);
    if(found_entry == data.end())
    {
      data.push_back(bucket_value(key, value));
    }
    else
    {
      found_entry->second = value;
    }
  }

  void remove_mapping(Key const& key)
  {
    // 須要加寫鎖
    std::unique_lock<boost::shared_mutex> lock(mutex);
    const_bucket_iterator found_entry = find_entry_for(key);
    if(found_entry != data.end())
    {
      data.erase(found_entry);
    }
  }

 private:
  typedef std::pair<Key, Value> bucket_value;
  typedef std::list<bucket_value> bucket_data;
  typedef typename bucket_data::const_iterator const_bucket_iterator;
  typedef typename bucket_data::iterator bucket_iterator;
  bucket_data data;
  mutable boost::shared_mutex mutex;

  const_bucket_iterator find_entry_for(Key const& key) const
  {
    return std::find_if(data.begin(),data.end(),
                        [&](bucket_value const& item)
                        {return item.first==key;});
  }

  bucket_iterator find_entry_for(Key const& key)
  {
    return std::find_if(data.begin(), data.end(), [&](bucket_value const& item) { return item.first == key; });
  }
};

上述代碼體現了讀鎖和寫鎖的區別,只有在修改鏈表的時候才使用寫鎖保證一致性,在訪問鏈表的時候使用讀鎖來屏蔽寫鎖,容許同時訪問。隊列

多個hash桶就組合成了一個hash table。根據hash規則拿到對應的hash桶,再對桶內的鏈表進行讀寫操做。rem

std::vector<std::unique_ptr<bucket_type>> buckets;
//獲取對應的hash桶
bucket_type& get_bucket(Key const& key) const
{
    // 獲取對應桶的操做不用進行加鎖
    std::size_t const bucket_index = hasher(key) % buckets.size();
    return *buckets[bucket_index];
}

hash表剩餘的操做就是對bucket內置函數的轉調用。每一個bucket有本身的讀寫鎖進行訪問控制。

Value value_for(Key const& key, Value const& default_value=Value()) const
{
    return get_bucket(key).value_for(key, default_value);
}

void add_or_update_mapping(Key const& key, Value const& value)
{
    get_bucket(key).add_or_update_mapping(key, value);
}

void remove_mapping(Key const& key)
{
    get_bucket(key).remove_mapping(key);
}

3. 線程安全鏈表

對於線程安全的鏈表,也是用dummy node來標誌鏈表的開頭位置。注意對於遍歷鏈表的操做,在對對應的鏈表節點進行操做的時候,必定要持有對應鏈表節點的鎖,就像這樣:

template<typename Function>
void for_each(Function f)
{
    node* current = &head;
    std::unique_lock<std::mutex> lk(head.m);
    node* next;
    while((next = current->next.get()) != NULL)
    {
      std::unique_lock<std::mutex> next_lk(next->m);
      // unlock node
      lk.unlock();
      f(*next->data);
      current=next;
      lk = std::move(next_lk);
    }
}
template<typename Predicate>
std::shared_ptr<T> find_first_if(Predicate p)
{
    node* current = &head;
    std::unique_lock<std::mutex> lk(head.m);
    while(node* const next = current->next.get())
    {
      std::unique_lock<std::mutex> next_lk(next->m);
      lk.unlock();
      if(p(*next->data))
      {
        return next->data;
      }
      current = next;
      lk = std::move(next_lk);
    }
    return std::shared_ptr<T>();
}

要注意的是,remove操做須要同時持有先後兩個節點的鎖,這樣才能保證從新設置先後節點的時候對應節點不被修改。

template<typename Predicate>
void remove_if(Predicate p)
{
    node* current = &head;
    std::unique_lock<std::mutex> lk(head.m);
    while(node* const next = current->next.get())
    {
      std::unique_lock<std::mutex> next_lk(next->m);
      if(p(*next->data))
      {
        // store old_next node
        // 保證old_next在析構以前其持有的鎖已經被解鎖
        std::unique_ptr<node> old_next = std::move(current->next);
        current->next = std::move(next->next);
        next_lk.unlock();
      }
      else
      {
        lk.unlock();
        current = next;
        lk = std::move(next_lk);
      }
    }
}

對於整個鏈表的節點的析構也是藉助remove_if完成的。

~threadsafe_list()
{
    // remove all node from list
    remove_if([](node const &){ return true; });
}

完整的鏈表實現代碼以下所示:

#include <mutex>

template<typename T>
class threadsafe_list
{
 public:
  threadsafe_list()
  { }

  ~threadsafe_list()
  {
    // remove all node from list
    remove_if([](node const &){ return true; });
  }

  // no copying
  threadsafe_list(threadsafe_list&) = delete;
  threadsafe_list& operator=(threadsafe_list&) = delete;

  // push node in front of the list
  void push_front(T const& value)
  {
    std::unique_ptr<node> new_node(new node(value));
    std::lock_guard<std::mutex> lk(head.m);
    new_node->next = std::move(head.next);
    head.next = std::move(new_node);
  }

  template<typename Function>
  void for_each(Function f)
  {
    node* current = &head;
    std::unique_lock<std::mutex> lk(head.m);
    node* next;
    while((next = current->next.get()) != NULL)
    {
      std::unique_lock<std::mutex> next_lk(next->m);
      // unlock node
      lk.unlock();
      f(*next->data);
      current=next;
      lk = std::move(next_lk);
    }
  }

  template<typename Predicate>
  std::shared_ptr<T> find_first_if(Predicate p)
  {
    node* current = &head;
    std::unique_lock<std::mutex> lk(head.m);
    while(node* const next = current->next.get())
    {
      std::unique_lock<std::mutex> next_lk(next->m);
      lk.unlock();
      if(p(*next->data))
      {
        return next->data;
      }
      current = next;
      lk = std::move(next_lk);
    }
    return std::shared_ptr<T>();
  }

  template<typename Predicate>
  void remove_if(Predicate p)
  {
    node* current = &head;
    std::unique_lock<std::mutex> lk(head.m);
    while(node* const next = current->next.get())
    {
      std::unique_lock<std::mutex> next_lk(next->m);
      if(p(*next->data))
      {
        // store old_next node
        // 保證old_next在析構以前其持有的鎖已經被解鎖
        std::unique_ptr<node> old_next = std::move(current->next);
        current->next = std::move(next->next);
        next_lk.unlock();
      }
      else
      {
        lk.unlock();
        current = next;
        lk = std::move(next_lk);
      }
    }
  }

 private:
  struct node
  {
    std::mutex m;
    std::shared_ptr<T> data;
    std::unique_ptr<node> next;
    node():
        m(),
        data(),
        next()
    { }

    node(T const& value):
        m(),
        data(std::make_shared<T>(value)),
        next()
    { }
  };

  // dummy node, store node data
  node head;
};

《C++ 併發編程實戰》

相關文章
相關標籤/搜索