boost::lockfree::queue多線程讀寫實例

最近的任務是寫一個多線程的東西,就得接觸多線程隊列了,我反正是沒學過度布式的,代碼全憑感受寫出來的,不過運氣好,代碼可以work= =多線程

話很少說,直接給代碼吧,一個多消費者,多生產者的模式。假設個人任務是求隊列的中位數是啥,每消費10000次的時候,我要知道中位數是什麼。分佈式

至於加不加鎖,這個看你了,我反正是加了,代碼裏面沒寫……我反正是把寫的代碼單獨封裝了一個函數,而後加了個鎖函數

歡迎交流,這個代碼已經在實際任務上面上線了,但願不會有bug。this

用的是boost::lockfree::queue,官方文檔:http://www.boost.org/doc/libs/1_55_0/boost/lockfree/queue.hppspa

/*
關於鎖的代碼:

偉大的Boost庫給咱們提供了 shared_mutex  類,結合 unique_lock 與 shared_lock 的使用,能夠實現讀寫鎖。



一般讀寫鎖須要完成如下功能:

1.當 data 被線程A讀取時,其餘線程仍能夠進行讀取卻不能寫入

2.當 data 被線程A寫入時,其餘線程既不能讀取也不能寫入



對應於功能1,2咱們能夠這樣來描述:

1.當線程A得到共享鎖時,其餘線程仍能夠得到共享鎖但不能得到獨佔鎖

2.當線程A得到獨佔鎖時,其餘線程既不能得到共享鎖也不能得到獨佔鎖

typedef boost::shared_lock<boost::shared_mutex> read_lock;  
typedef boost::unique_lock<boost::shared_mutex> write_lock;  
  
boost::shared_mutex read_write_mutex;  
int32_t data = 1;  
  
//線程A,讀data  
{  
    read_lock rlock(read_write_mutex);  
    std::cout << data << std:; endl;  
}  
  
//線程B,讀data  
{  
    read_lock rlock(read_write_mutex);  
    std::cout << data << std:; endl;  
}  
  
//線程C,寫data  
{  
    write_lock rlock(read_write_mutex);  
    data = 2;  
}  
*/



#ifndef DYNAMIC_QUEUE_H_
#define DYNAMIC_QUEUE_H_

#include "boost/lockfree/queue.hpp"
#include "boost/thread/thread.hpp"
#include "boost/thread/mutex.hpp"
#include "abtest_parameters.h"

namespace un {
class DynamicController {

public:
boost::lockfree::queue<size_t,boost::lockfree::capacity<40000> > lockfree_queue;
// boost::lockfree::queue  boost裏面的無鎖隊列,惟一比較蛋疼的就是空間最大65536以及無法輸出size,其餘的就將就用吧。
// 隊列長度能夠自定義,也能夠不定義,會自增加的。

size_t num = 0;

void StartDaemonUpdater(){
  boost::function0<void> f = boost::bind(&DynamicController::UpdaterWorker, this);
  boost::thread thrd(f);
  thrd.detach();
}
// 啓動消費者隊列

void Producer(size_t number){
  bool succ = lockfree_queue.bounded_push(number);
  // 若是用push的話,沒空間的話,會等待消費完。
  // bounded_push的話,若是每空間會返回false,而後棄掉這個數。成功返回true
}
// 生產者

size_t GetNumber(
  return num;
}
// get代碼

void UpdaterWorker(void){
  std::vector<size_t> V;
  while(1){//穩妥起見,這個while裏面能夠寫個sleep以致於不須要一直在消費。
    size_t tmp_value;
    while(lockfree_queue.pop(tmp_value)){
      V.push_back(tmp_value);
      // 更新條件,10000個數
      // 用p99更新
      if(V.size()>10000){
        std::sort(V.begin(),V.end());
        num = V[size_t(V.size()*0.5)];
        V.clear();
      }
    }
  }
}

// 消費者

};
}
#endif
相關文章
相關標籤/搜索