消息隊列是線程間通訊比較經常使用得方式,經常使用於解決經典模型生產者——消費者模型線程間得通訊。異步
本文將結束基於C++標準庫實現得消息隊列,能夠支持任意參數類型,任務參數數量。測試
爲了方便後續線程池、異步隊列得實現,這裏提取了公共基類。this
class QueueObject : public noncopyable { public: QueueObject() :m_bStop(false), m_nCapacity(MAX_QUEUE_CAPACITY) { } virtual ~QueueObject() { } void Stop() { m_bStop.store(true); m_condPop.notify_all(); // 喚醒全部線程執行 } //設置最大容量 void SetMaxCapacity(int nMax) { m_nCapacity = nMax; } //獲取隊列任務數量 virtual size_t GetTaskNum() = 0; bool IsStop() { return m_bStop; } protected: int m_nCapacity = 0; //隊列最大容量 std::condition_variable_any m_condPush; //寫入條件量 std::condition_variable_any m_condPop; //讀取條件量 std::mutex m_mu; //互斥鎖 // 是否關閉提交 std::atomic<bool> m_bStop; };
消息隊列實現atom
template<typename T, typename... ARGS> class CMsgQueue : public QueueObject { public: using QueueObject::QueueObject; void Push(T val, const ARGS... args) { while (m_dataQueue.size() == m_nCapacity) //隊列已滿 { m_condPush.wait(m_mu); //等待,將暫時的解鎖 } m_dataQueue.emplace(std::make_tuple(val, args...)); m_condPop.notify_one(); // 喚醒一個線程執行 } //批量獲取參數值 bool Pop(std::tuple<T, ARGS...>& value, int waitTime = -1) { std::unique_lock<std::mutex> lock(m_mu); if (waitTime < 0) { this->m_condPop.wait(lock, [this] { return !this->m_dataQueue.empty(); }); // wait 直到有 task } else { auto status = m_condPop.wait_for(lock, std::chrono::seconds(waitTime), [this] { return !this->m_dataQueue.empty(); }); if (!status ) { return false; } } value = std::move(this->m_dataQueue.front()); // 取一個 task this->m_dataQueue.pop(); //通知寫線程 m_condPush.notify_one(); return true; } bool Pop( T& value, ARGS&... args, int waitTime = -1) { std::tuple<T,ARGS...> tupVal; if (Pop(tupVal, waitTime)) { FetchParam<0>(tupVal, value, args...); return true; } return false; } template<int NUM, typename P, typename...PARMS> void FetchParam(std::tuple<T,ARGS...>& tupVal, P& p, PARMS&... params) { p = std::get<NUM>(tupVal); FetchParam<NUM+1>(tupVal, params...); } template<int NUM, typename P> void FetchParam(std::tuple<T,ARGS...>& tupVal, P& p) { p = std::get<NUM>(tupVal); } //獲取隊列任務數量 virtual size_t GetTaskNum() { return m_dataQueue.size(); } private: std::queue<std::tuple<T, ARGS...>> m_dataQueue; };
測試:線程
int main() { CMsgQueue<std::string, int, int> mq; mq.Push("test", 10,20); mq.Push("test2", 100,200); std::string val; int num1, num2; mq.Pop(val, num1, num2); std::cout << val << " " << num1 << " " << num2 << std::endl; mq.Pop( val, num1, num2); std::cout << val << " " << num1 << " " << num2 << std::endl; return 0; }