C++ 九陰真經之線程間通訊(消息隊列)

消息隊列是線程間通訊比較經常使用得方式,經常使用於解決經典模型生產者——消費者模型線程間得通訊。異步

本文將結束基於C++標準庫實現得消息隊列,能夠支持任意參數類型,任務參數數量。測試

爲了方便後續線程池、異步隊列得實現,這裏提取了公共基類。this

class QueueObject : public noncopyable
{
public:
	QueueObject() :m_bStop(false), m_nCapacity(MAX_QUEUE_CAPACITY)
	{
	}

	virtual ~QueueObject()
	{
	}

	void Stop()
	{
		m_bStop.store(true);
		m_condPop.notify_all(); // 喚醒全部線程執行
	}

	//設置最大容量
	void SetMaxCapacity(int nMax)
	{
		m_nCapacity = nMax;
	}
	//獲取隊列任務數量
	virtual size_t GetTaskNum() = 0;


	bool IsStop()
	{
		return m_bStop;
	}

protected:
	int m_nCapacity = 0;                     //隊列最大容量
	std::condition_variable_any m_condPush;  //寫入條件量
	std::condition_variable_any m_condPop;   //讀取條件量
	std::mutex m_mu;   //互斥鎖 

	// 是否關閉提交
	std::atomic<bool> m_bStop;

};

消息隊列實現atom

template<typename T, typename... ARGS>
class CMsgQueue : public QueueObject
{
public:
	using QueueObject::QueueObject;
	void Push(T val, const ARGS... args)
	{
		while (m_dataQueue.size() == m_nCapacity)         //隊列已滿
		{
			m_condPush.wait(m_mu);                         //等待,將暫時的解鎖
		}

		m_dataQueue.emplace(std::make_tuple(val, args...));

		m_condPop.notify_one(); // 喚醒一個線程執行
	}

	//批量獲取參數值
	bool Pop(std::tuple<T, ARGS...>& value, int waitTime = -1)
	{
		std::unique_lock<std::mutex> lock(m_mu);
		if (waitTime < 0)
		{
			this->m_condPop.wait(lock,
				[this] {
				return  !this->m_dataQueue.empty();
			}); // wait 直到有 task
		}
		else
		{
			auto status = m_condPop.wait_for(lock, std::chrono::seconds(waitTime), [this] {
				return  !this->m_dataQueue.empty();
			});
			if (!status )
			{
				return false;
			}
		}

		value = std::move(this->m_dataQueue.front()); // 取一個 task
		this->m_dataQueue.pop();

		//通知寫線程
		m_condPush.notify_one();

		return true;
	}

  	
	bool Pop( T& value, ARGS&... args, int waitTime = -1)
	{
		std::tuple<T,ARGS...> tupVal;
		if (Pop(tupVal, waitTime))
		{
			FetchParam<0>(tupVal, value, args...);
			return true;
		}
		return false;
	}


    template<int NUM, typename P, typename...PARMS>
    void FetchParam(std::tuple<T,ARGS...>& tupVal,  P& p, PARMS&... params)
    {
        p = std::get<NUM>(tupVal);
        FetchParam<NUM+1>(tupVal,  params...);
    }

    template<int NUM, typename P>
    void FetchParam(std::tuple<T,ARGS...>& tupVal, P& p)
    {
        p = std::get<NUM>(tupVal);
    } 

	//獲取隊列任務數量
	virtual size_t GetTaskNum()
	{
		return m_dataQueue.size();
	}

private:
	std::queue<std::tuple<T, ARGS...>> m_dataQueue;
};

測試:線程

int main()
{
    CMsgQueue<std::string, int, int> mq;

    mq.Push("test", 10,20);
    mq.Push("test2", 100,200);

    std::string val;
    int num1, num2;
    mq.Pop(val, num1, num2);
    std::cout << val << "   " << num1 << "  " << num2 << std::endl;
    mq.Pop( val, num1, num2);
    std::cout << val << "   " << num1 << "  " << num2 << std::endl;
    return 0;
}
相關文章
相關標籤/搜索