近日有朋友問起線程安全隊列的問題。本文基於stl的queue容器實現了線程安全的隊列,可多線程生產,多線程消費。同時與基於boost的circular_buffer實現的環形緩衝區相比較,性能略優(實驗測試下來優點也不大,不到5%)。源碼比較簡單,使用stl和boost實現,而且實現了超過隊列最大長度丟棄消息的功能。ios
CPU主要參數:intel 2.3GHz,4核
內存:12G
操做系統:windows7windows
線程數爲0,表示生產線程和消費線程是同一線程,統一輩子產完後,從消費第一條開始的計算時間。
其餘的是在一個線程生產,N(N>=1)個線程消費的狀況。
折線圖顯示如圖所示,能夠看到使用queue實現的隊列性能最優,circular_buffer其次,list稍差。安全
在simplethreadqueue.h中註釋的部分是使用boost的circular_buffer實現的。
simplethreadqueue.h多線程
#ifndef CSIMPLETHREADQUEUE_H #define CSIMPLETHREADQUEUE_H #include <string> #include <queue> #include <iostream> #include "boost/timer.hpp" #include <boost/date_time/posix_time/posix_time.hpp> #include <boost/thread/thread.hpp> #include <boost/circular_buffer.hpp> using namespace std; using namespace boost; extern long g_nMaxCount; template <class T> class CSimpleThreadQueue { public: CSimpleThreadQueue():m_nSize(0),m_nConsumed(0){/*m_container.resize(g_nMaxCount);*/} size_t size(){return m_nSize;} long GetConsumed(){return m_nConsumed;} void EnQueue(T item) { m_mutex.lock(); while(m_nSize >= g_nMaxCount) { m_container.pop(); --m_nSize; } m_container.push(item); //m_container.push_back(item); ++m_nSize; m_cond.notify_one(); m_mutex.unlock(); } void Dequeue(T& item) { while (true) { m_mutex.lock(); if ( m_nSize > 0) break; m_cond.wait_for(m_mutex, boost::chrono::microseconds(1)); m_mutex.unlock(); } item = m_container.front(); m_container.pop(); //m_container.pop_front(); -- m_nSize; ++m_nConsumed; m_mutex.unlock(); } private: std::queue<T> m_container; //circular_buffer<T> m_container; size_t m_nSize; boost::mutex m_mutex; condition_variable_any m_cond; long m_nConsumed; }; #endif
main.cpp性能
#include "simplethreadqueue.h" #include <boost/date_time/posix_time/posix_time.hpp> long g_nMaxCount = 500000;//100w bool g_bRunning = true; CSimpleThreadQueue<string> g_queue; boost::mutex g_mutex; void CallbackMethod(string& strMessage) { int sum = 0; for(int i = 0; i < 1000; ++ i) sum += i; //cout<<strMessage<<endl; } void ProduceMessageInit() { for(long long i = 0; i < g_nMaxCount; ++ i) g_queue.EnQueue("Test message."/*std::to_string(i)*/); } void ProduceMessage() { //static long long i = 0; while(g_bRunning) { g_queue.EnQueue("Test message."/*std::to_string(++i)*/); } } void ConsumeMessage() { string strMessge; //static timer t; static boost::posix_time::ptime t1 = boost::posix_time::microsec_clock::universal_time(); //static long nCount = 0; if(g_queue.size() > 0 && g_queue.GetConsumed() < g_nMaxCount) { g_queue.Dequeue(strMessge); //++ nCount; } else { g_mutex.lock(); if(g_bRunning) { g_bRunning = false; boost::posix_time::ptime t2 = boost::posix_time::microsec_clock::universal_time(); cout<<g_queue.GetConsumed()<<" consumed!"<<endl; cout<<t2 - t1 <<"s"<<endl; } g_mutex.unlock(); } CallbackMethod(strMessge); } void ConsumeAllMessage() { while(g_bRunning) { ConsumeMessage(); } } int main(int argc, char* argv[]) { if(argc <= 1)//單線程先生產消息再消費模型 { ProduceMessageInit(); ConsumeAllMessage(); return 0; } //單線程生產多線程消費模型 ProduceMessageInit(); thread_group tg; tg.create_thread(boost::bind(ProduceMessage)); int nThreadCount = atoi(argv[1]); if(nThreadCount <= 0) return -1; for(int i = 0; i < nThreadCount ; ++i) tg.create_thread(boost::bind(ConsumeAllMessage)); tg.join_all(); return 0; }