1.生產者儘量生產; this
2.消費者按照自身需求執行消費行爲; spa
code: 線程
class CMediaSendBlock : public ACE_Message_Block
{
public:
CMediaSendBlock( const char* pData, const unsigned long ulDataLen)
: ACE_Message_Block((size_t)ulDataLen, MB_DATA, 0, 0)
{
if (NULL != pData)
{
(void)this->copy(pData, (size_t) ulDataLen);
}
} code
virtual ~CMediaSendBlock() {}
}; ci
class CMediaSendQueue : public ACE_Message_Queue<ACE_MT_SYNCH>
{
public:
CMediaSendQueue() {}
virtual ~CMediaSendQueue() {} get
int enqueue_tail_ex(CMediaSendBlock* pBlock)
{
ACE_Time_Value timeValue = ACE_OS::gettimeofday() + ACE_Time_Value(0, 10);
int nRetVal = ACE_Message_Queue<ACE_MT_SYNCH>::enqueue_tail(pBlock, &timeValue); it
if (0 >= nRetVal)
{
return -1;
} ast
return nRetVal;
}
}; class
CMediaSendQueue m_sendQueue;
bool m_bRunning = true; thread
//生產者
//生產者儘量快的生產
void* produce(void *arg)
{
static int iThreadIndex = -1;
++iThreadIndex;
std::cout << "this is produce thread num " << iThreadIndex << std::endl;
int iSize = 1024 * 1024;
char* pBuff = new char[iSize];
memset(pBuff, 0x0, iSize);
while(m_bRunning)
{
CMediaSendBlock* pBlock = new CMediaSendBlock( pBuff, iSize);
int iRet = m_sendQueue.enqueue_tail(pBlock, NULL); //等待到達低水位
if (0 >= iRet)
{
delete pBlock;
pBlock = NULL;
std::cout << "center is full...." << std::endl;
}
else
{
std::cout << "push success...." << std::endl;
}
}
std::cout << "produce task finished...." << std::endl;
return NULL;
}
//消費者
//消費者按照自身須要的速度進行消費
void* consume(void *arg)
{
static int iThreadIndex = -1;
++iThreadIndex;
std::cout << "this is consume thread num " << iThreadIndex << std::endl;
while(m_bRunning)
{
ACE_Message_Block* pBlock = NULL;
if (-1 == m_sendQueue.dequeue_head(pBlock))
continue;
//發送
CMediaSendBlock* pSendBlock = dynamic_cast<CMediaSendBlock*>(pBlock);
if(NULL == pSendBlock)
continue;
std::cout << "I am consuming.... " << std::endl;
//釋放block
delete pSendBlock;
pSendBlock = NULL;
std::cout << "message_bytes" << std::dec << m_sendQueue.message_bytes() << std::endl;
std::cout << "message_length" << std::dec << m_sendQueue.message_length() << std::endl;
ACE_OS::sleep(2);
}
std::cout << "consume over......" << std::endl;
return NULL;
}
int main(int argc, char* argv[])
{
ACE::init();
m_sendQueue.high_water_mark(10 * 1024 * 1024);
m_sendQueue.low_water_mark(2 * 1024 * 1024);
m_sendQueue.activate();
//m個生產者,n個消費者
//產生生產者線程
ACE_Thread_Manager::instance()->spawn_n
(
5,
(ACE_THR_FUNC) produce
);
ACE_OS::sleep(2); //讓生產者填滿倉庫
////產生消費者線程
ACE_Thread_Manager::instance()->spawn_n
(
2,
(ACE_THR_FUNC) consume
);
int iData;
std::cin >> iData;
//close
m_bRunning = false;
m_sendQueue.deactivate();
//wait
ACE_OS::sleep(2);
ACE::fini(); return 0; }