最近的任務是寫一個多線程的東西,就得接觸多線程隊列了,我反正是沒學過度布式的,代碼全憑感受寫出來的,不過運氣好,代碼可以work= =多線程
話很少說,直接給代碼吧,一個多消費者,多生產者的模式。假設個人任務是求隊列的中位數是啥,每消費10000次的時候,我要知道中位數是什麼。分佈式
至於加不加鎖,這個看你了,我反正是加了,代碼裏面沒寫……我反正是把寫的代碼單獨封裝了一個函數,而後加了個鎖函數
歡迎交流,這個代碼已經在實際任務上面上線了,但願不會有bug。this
用的是boost::lockfree::queue,官方文檔:http://www.boost.org/doc/libs/1_55_0/boost/lockfree/queue.hppspa
/* 關於鎖的代碼: 偉大的Boost庫給咱們提供了 shared_mutex 類,結合 unique_lock 與 shared_lock 的使用,能夠實現讀寫鎖。 一般讀寫鎖須要完成如下功能: 1.當 data 被線程A讀取時,其餘線程仍能夠進行讀取卻不能寫入 2.當 data 被線程A寫入時,其餘線程既不能讀取也不能寫入 對應於功能1,2咱們能夠這樣來描述: 1.當線程A得到共享鎖時,其餘線程仍能夠得到共享鎖但不能得到獨佔鎖 2.當線程A得到獨佔鎖時,其餘線程既不能得到共享鎖也不能得到獨佔鎖 typedef boost::shared_lock<boost::shared_mutex> read_lock; typedef boost::unique_lock<boost::shared_mutex> write_lock; boost::shared_mutex read_write_mutex; int32_t data = 1; //線程A,讀data { read_lock rlock(read_write_mutex); std::cout << data << std:; endl; } //線程B,讀data { read_lock rlock(read_write_mutex); std::cout << data << std:; endl; } //線程C,寫data { write_lock rlock(read_write_mutex); data = 2; } */ #ifndef DYNAMIC_QUEUE_H_ #define DYNAMIC_QUEUE_H_ #include "boost/lockfree/queue.hpp" #include "boost/thread/thread.hpp" #include "boost/thread/mutex.hpp" #include "abtest_parameters.h" namespace un { class DynamicController { public: boost::lockfree::queue<size_t,boost::lockfree::capacity<40000> > lockfree_queue; // boost::lockfree::queue boost裏面的無鎖隊列,惟一比較蛋疼的就是空間最大65536以及無法輸出size,其餘的就將就用吧。 // 隊列長度能夠自定義,也能夠不定義,會自增加的。 size_t num = 0; void StartDaemonUpdater(){ boost::function0<void> f = boost::bind(&DynamicController::UpdaterWorker, this); boost::thread thrd(f); thrd.detach(); } // 啓動消費者隊列 void Producer(size_t number){ bool succ = lockfree_queue.bounded_push(number); // 若是用push的話,沒空間的話,會等待消費完。 // bounded_push的話,若是每空間會返回false,而後棄掉這個數。成功返回true } // 生產者 size_t GetNumber( return num; } // get代碼 void UpdaterWorker(void){ std::vector<size_t> V; while(1){//穩妥起見,這個while裏面能夠寫個sleep以致於不須要一直在消費。 size_t tmp_value; while(lockfree_queue.pop(tmp_value)){ V.push_back(tmp_value); // 更新條件,10000個數 // 用p99更新 if(V.size()>10000){ std::sort(V.begin(),V.end()); num = V[size_t(V.size()*0.5)]; V.clear(); } } } } // 消費者 }; } #endif