Implementing a Thread-Safe Queue with the STL queue Container

Introduction

A friend recently asked about thread-safe queues. This article implements a thread-safe queue on top of the STL queue container, supporting multiple producer threads and multiple consumer threads. Compared with a ring buffer built on boost's circular_buffer, its performance is slightly better (in my tests the advantage is small, under 5%). The source code is straightforward, uses STL and boost, and also drops the oldest messages once the queue exceeds its maximum length.
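
Before the full listing, here is a minimal usage sketch. It assumes the CSimpleThreadQueue class from the simplethreadqueue.h shown later in this article; the message count and the queue limit are arbitrary values chosen only for illustration.

#include "simplethreadqueue.h"
#include <boost/thread/thread.hpp>

long g_nMaxCount = 1000;                 // queue limit; older messages are dropped beyond this
CSimpleThreadQueue<std::string> g_queue; // the thread-safe queue from simplethreadqueue.h

void Produce()
{
    for (int i = 0; i < 100; ++i)        // push a fixed number of messages
        g_queue.EnQueue("hello");
}

void Consume()
{
    std::string msg;
    for (int i = 0; i < 100; ++i)        // Dequeue blocks until a message is available
        g_queue.Dequeue(msg);
}

int main()
{
    boost::thread producer(Produce);
    boost::thread consumer(Consume);
    producer.join();
    consumer.join();
    return 0;
}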

Test Environment

CPU: Intel, 2.3 GHz, 4 cores
Memory: 12 GB
Operating system: Windows 7

Results

A thread count of 0 means the producer and the consumer are the same thread: all messages are produced first, and timing starts when the first message is consumed.
The other cases use one producer thread and N (N >= 1) consumer threads.
[Line chart comparing the three implementations]
As the line chart shows, the queue-based implementation performs best, circular_buffer comes next, and list is slightly worse.
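
The std::list variant used in the comparison is not included in the listing below. Assuming it only swaps the container while keeping the same locking scheme, the container calls would change roughly as follows (a sketch, not code from the original measurement):

#include <list>
#include <string>

int main()
{
    // Same locking as CSimpleThreadQueue; only the container operations differ.
    std::list<std::string> container;

    container.push_back("Test message.");   // EnQueue: push_back instead of queue::push
    std::string item = container.front();   // Dequeue: front() is unchanged
    container.pop_front();                   // ...followed by pop_front instead of queue::pop
    return 0;
}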

Source Code

The commented-out lines in simplethreadqueue.h belong to the boost circular_buffer implementation; a short sketch of that variant follows the header listing below.
simplethreadqueue.h

#ifndef CSIMPLETHREADQUEUE_H
#define CSIMPLETHREADQUEUE_H
#include <string>
#include <queue>
#include <iostream>
#include "boost/timer.hpp"
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread/thread.hpp>
#include <boost/chrono.hpp>   // boost::chrono::microseconds, used by Dequeue's wait_for
#include <boost/circular_buffer.hpp>

using namespace std;
using namespace boost;
extern long g_nMaxCount;

template <class T>
class CSimpleThreadQueue
{
public:
    CSimpleThreadQueue():m_nSize(0),m_nConsumed(0){/*m_container.resize(g_nMaxCount);*/}
    size_t size(){return m_nSize;}
    long GetConsumed(){return m_nConsumed;}
    // Producer side: append an item; if the queue is already at the configured
    // maximum length, drop the oldest messages to make room.
    void EnQueue(T item)
    {
        m_mutex.lock();
        while(m_nSize >= g_nMaxCount)
        {
            m_container.pop();           // discard the oldest message
            --m_nSize;
        }
        m_container.push(item);
        //m_container.push_back(item);   // circular_buffer variant
        ++m_nSize;
        m_cond.notify_one();             // wake one waiting consumer
        m_mutex.unlock();
    }
    // Consumer side: block until an item is available, then remove it.
    void Dequeue(T& item)
    {
        while (true)
        {
            m_mutex.lock();
            if ( m_nSize > 0)
                break;                   // leave the loop with the mutex still held
            // wait_for releases the mutex while waiting and reacquires it before returning
            m_cond.wait_for(m_mutex, boost::chrono::microseconds(1));
            m_mutex.unlock();
        }

        item = m_container.front();
        m_container.pop();
        //m_container.pop_front();       // circular_buffer variant
        -- m_nSize;
        ++m_nConsumed;
        m_mutex.unlock();
    }
    
private:
    std::queue<T> m_container;
    //circular_buffer<T> m_container;   // circular_buffer variant
    size_t m_nSize;                     // current number of queued items
    boost::mutex m_mutex;
    condition_variable_any m_cond;
    long m_nConsumed;                   // total number of items consumed so far
};
#endif
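
For reference, the commented-out lines above are the circular_buffer variant. The snippet below sketches how the container calls differ; it uses set_capacity to fix the buffer size, which is my assumption about the intended setup (the original commented constructor used resize), so the details may differ from the measured version.

#include <string>
#include <boost/circular_buffer.hpp>

int main()
{
    // Fixed-capacity ring buffer; once full, push_back overwrites the oldest element.
    boost::circular_buffer<std::string> buffer;
    buffer.set_capacity(4);

    buffer.push_back("Test message.");   // EnQueue would call push_back instead of queue::push
    std::string item = buffer.front();   // Dequeue reads front() as before
    buffer.pop_front();                  // ...and calls pop_front instead of queue::pop
    return 0;
}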

main.cpp

#include "simplethreadqueue.h"
#include <boost/date_time/posix_time/posix_time.hpp>

long g_nMaxCount = 500000; // queue length limit and number of messages produced (500,000)
bool g_bRunning = true;
CSimpleThreadQueue<string> g_queue;
boost::mutex g_mutex;

// Simulates per-message processing work; the message content itself is not used.
void CallbackMethod(string& strMessage)
{
    int sum = 0;
    for(int i = 0; i < 1000; ++ i)
         sum += i;
    //cout<<strMessage<<endl;
}

// Fill the queue with g_nMaxCount messages before any consumer starts.
void ProduceMessageInit()
{
    for(long long i = 0; i < g_nMaxCount; ++ i)
        g_queue.EnQueue("Test message."/*std::to_string(i)*/);
}

// Keep producing messages until the consumers signal completion via g_bRunning.
void ProduceMessage()
{
    //static long long i = 0;
    while(g_bRunning)
    {
        g_queue.EnQueue("Test message."/*std::to_string(++i)*/);
    }
}

// Consume one message. Timing starts when this function is first called; once
// g_nMaxCount messages have been consumed, print the count and elapsed time and
// signal all threads to stop. Note that size() is read without locking, so the
// termination check is approximate.
void ConsumeMessage()
{
    string strMessage;
    //static timer t;
    static boost::posix_time::ptime t1 = boost::posix_time::microsec_clock::universal_time();
    //static long nCount = 0;
    if(g_queue.size() > 0 && g_queue.GetConsumed() < g_nMaxCount)
    {
        g_queue.Dequeue(strMessage);
        //++ nCount;
    }
    else
    {
        g_mutex.lock();
        if(g_bRunning)
        {
            g_bRunning = false;
            boost::posix_time::ptime t2 = boost::posix_time::microsec_clock::universal_time();
            cout<<g_queue.GetConsumed()<<" consumed!"<<endl;
            cout<<t2 - t1<<endl;   // elapsed time, printed as HH:MM:SS.ffffff
        }
        g_mutex.unlock();
    }
    CallbackMethod(strMessage);
}

// Each consumer thread loops until the total consumed count reaches g_nMaxCount.
void ConsumeAllMessage()
{
    while(g_bRunning)
    {
        ConsumeMessage();
    }
}

int main(int argc, char* argv[])
{
    if(argc <= 1) // single-threaded model: produce all messages first, then consume them
    {
        ProduceMessageInit();
        ConsumeAllMessage();
        return 0;
    }
    // one producer thread, multiple consumer threads
    ProduceMessageInit();
    
    thread_group tg;
    tg.create_thread(boost::bind(ProduceMessage));
    int nThreadCount = atoi(argv[1]);
    if(nThreadCount <= 0)
        return -1;
    for(int i = 0; i < nThreadCount ; ++i)
        tg.create_thread(boost::bind(ConsumeAllMessage));
    tg.join_all();
    return 0;
}
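
To reproduce the measurements: run the program with no arguments for the single-threaded produce-then-consume case, or pass the number of consumer threads as the first argument (for example 4 for one producer thread and four consumers); a non-positive value makes the program exit immediately with -1.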