基於ACE_Message_Queue的生產者消費者模式

1.生產者儘量生產; this

2.消費者按照自身需求執行消費行爲; spa

code: 線程


class CMediaSendBlock : public ACE_Message_Block
{
public:
 CMediaSendBlock( const char* pData, const unsigned long ulDataLen)
  :    ACE_Message_Block((size_t)ulDataLen, MB_DATA, 0, 0)
 {
  if (NULL != pData)
  {
   (void)this->copy(pData, (size_t) ulDataLen);
  }
 } code

 virtual ~CMediaSendBlock() {}
}; ci

class CMediaSendQueue : public ACE_Message_Queue<ACE_MT_SYNCH>
{
public:
 CMediaSendQueue() {}
 virtual ~CMediaSendQueue() {} get

 int enqueue_tail_ex(CMediaSendBlock* pBlock)
 {
  ACE_Time_Value timeValue = ACE_OS::gettimeofday() + ACE_Time_Value(0, 10);
  int nRetVal = ACE_Message_Queue<ACE_MT_SYNCH>::enqueue_tail(pBlock, &timeValue); it

  if (0 >= nRetVal)
  {
   return -1;
  } ast

  return nRetVal;
 }
}; class

CMediaSendQueue        m_sendQueue;
bool                   m_bRunning = true; thread

//生產者
//生產者儘量快的生產
void* produce(void *arg)
{
 static int iThreadIndex = -1;
 ++iThreadIndex;
 std::cout << "this is produce thread num " << iThreadIndex << std::endl;

 int iSize = 1024 * 1024;
 char* pBuff = new char[iSize];
 memset(pBuff, 0x0, iSize);

 while(m_bRunning)
 {
  CMediaSendBlock* pBlock = new CMediaSendBlock( pBuff, iSize);
  int iRet = m_sendQueue.enqueue_tail(pBlock, NULL); //等待到達低水位

  if (0 >= iRet)
  {
   delete pBlock;
   pBlock = NULL;

   std::cout << "center is full...." << std::endl;
  }
  else
  {
   std::cout << "push success...." << std::endl;
  }
 }

 std::cout << "produce task finished...." << std::endl;
 return NULL;
}

//消費者
//消費者按照自身須要的速度進行消費
void* consume(void *arg)
{
 static int iThreadIndex = -1;
 ++iThreadIndex;
 std::cout << "this is consume thread num " << iThreadIndex << std::endl;

 while(m_bRunning)
 {
  ACE_Message_Block* pBlock = NULL;
  if (-1 == m_sendQueue.dequeue_head(pBlock))
   continue;

  //發送
  CMediaSendBlock* pSendBlock = dynamic_cast<CMediaSendBlock*>(pBlock);
  if(NULL == pSendBlock)
   continue;

  std::cout << "I am consuming.... " << std::endl;

  //釋放block
  delete pSendBlock;
  pSendBlock = NULL;

  std::cout << "message_bytes" << std::dec << m_sendQueue.message_bytes() << std::endl;
  std::cout << "message_length" << std::dec << m_sendQueue.message_length() << std::endl;

  ACE_OS::sleep(2);
 }

 std::cout << "consume over......" << std::endl;
 return NULL;
}

int main(int argc, char* argv[])
{
 ACE::init();

 m_sendQueue.high_water_mark(10 * 1024 * 1024);
 m_sendQueue.low_water_mark(2 * 1024 * 1024);
 m_sendQueue.activate();

 //m個生產者,n個消費者
 //產生生產者線程
 ACE_Thread_Manager::instance()->spawn_n
  (
  5,
  (ACE_THR_FUNC) produce
  );

 ACE_OS::sleep(2); //讓生產者填滿倉庫

 ////產生消費者線程
 ACE_Thread_Manager::instance()->spawn_n
  (
  2,
  (ACE_THR_FUNC) consume
  );

 int iData;
 std::cin >> iData;

 //close
 m_bRunning = false;
 m_sendQueue.deactivate();

 //wait
 ACE_OS::sleep(2);

 ACE::fini();  return 0; }

相關文章
相關標籤/搜索