詳解boost庫中的Message Queue .

Message Queue(後文簡寫成MQ或消息隊列)是boost庫中用來封裝進程間通訊的一種實現,同一臺機器上的進程或線程能夠經過消息隊列來進行通迅。消息隊列中的消息由優先級、消息長度、消息數據三部分組成。這裏須要注意的事,MQ只是簡單的將要發送的數據在內存中進行拷貝,因此咱們在發送複雜結構或對象時,咱們須要將其序列化後再發送,接收端接收時要反序列化,也就是說咱們要本身去 定義區分一條消息(就是自定義網絡通迅協議)。在MQ中,咱們能夠使用三模式去發送和接收消息:
  1. 阻塞:在發送消息時,若消息隊列滿了,那麼發送接口將會阻塞直到隊列沒有滿。在接收消息時,若隊列爲空,那麼接收接口也會阻塞直到隊列不空。
  2. 超時:用戶能夠自定義超時時間,在超時時間到了,那麼發送接口或接收接口都會返回,不管隊列滿或空
  3. Try:在隊列爲空或滿時,都能當即返回
MQ使用命名的共享內存來實現進程間通訊。共享內存換句話來講,就是用戶能夠指定一個名稱來建立一塊共享內存,而後像打一個文件同樣去打開這塊共享內存,一樣別的進程也能夠根據這個名稱來打開這塊共享內存,這樣一個進程向共享內存中寫,另外一個進程就能夠從共享內存中讀。這裏兩個進程的讀寫就涉及到同步問題。另外, 在建立一個MQ時,咱們須要指定MQ的最大消息數量以及消息的最大size。
 
  1. //Create a message_queue. If the queue   
  2. //exists throws an exception   
  3. message_queue mq  
  4.    (create_only         //only create   
  5.    ,"message_queue"     //name   
  6.    ,100                 //max message number   
  7.    ,100                 //max message size   
  8.    );  
  9. using boost::interprocess;  
  10. //Creates or opens a message_queue. If the queue   
  11. //does not exist creates it, otherwise opens it.   
  12. //Message number and size are ignored if the queue   
  13. //is opened   
  14. message_queue mq  
  15.    (open_or_create      //open or create   
  16.    ,"message_queue"     //name   
  17.    ,100                 //max message number   
  18.    ,100                 //max message size   
  19.    );  
  20. using boost::interprocess;  
  21. //Opens a message_queue. If the queue   
  22. //does not exist throws an exception.   
  23. message_queue mq  
  24.    (open_only           //only open   
  25.    ,"message_queue"     //name   
  26.    );  
使用message_queue::remove("message_queue");來移除一個指定的消息隊列。
接下來,咱們看一個使用消息隊列的生產者與消息者的例子。第一個進程作爲生產者,第二個進程作爲消費者。
生產者進程:
  1. #include <boost/interprocess/ipc/message_queue.hpp>   
  2. #include <iostream>   
  3. #include <vector>   
  4.   
  5. using namespace boost::interprocess;  
  6.   
  7. int main ()  
  8. {  
  9.    try{  
  10.       //Erase previous message queue   
  11.       message_queue::remove("message_queue");  
  12.   
  13.       //Create a message_queue.   
  14.       message_queue mq  
  15.          (create_only               //only create   
  16.          ,"message_queue"           //name   
  17.          ,100                       //max message number   
  18.          ,sizeof(int)               //max message size   
  19.          );  
  20.   
  21.       //Send 100 numbers   
  22.       for(int i = 0; i < 100; ++i){  
  23.          mq.send(&i, sizeof(i), 0);  
  24.       }  
  25.    }  
  26.    catch(interprocess_exception &ex){  
  27.       std::cout << ex.what() << std::endl;  
  28.       return 1;  
  29.    }  
  30.   
  31.    return 0;  
  32. }  
消費者進程:
  1. #include <boost/interprocess/ipc/message_queue.hpp>   
  2. #include <iostream>   
  3. #include <vector>   
  4.   
  5. using namespace boost::interprocess;  
  6.   
  7. int main ()  
  8. {  
  9.    try{  
  10.       //Open a message queue.   
  11.       message_queue mq  
  12.          (open_only        //only create   
  13.          ,"message_queue"  //name   
  14.          );  
  15.   
  16.       unsigned int priority;  
  17.       message_queue::size_type recvd_size;  
  18.   
  19.       //Receive 100 numbers   
  20.       for(int i = 0; i < 100; ++i){  
  21.          int number;  
  22.          mq.receive(&number, sizeof(number), recvd_size, priority);  
  23.          if(number != i || recvd_size != sizeof(number))  
  24.             return 1;  
  25.       }  
  26.    }  
  27.    catch(interprocess_exception &ex){  
  28.       message_queue::remove("message_queue");  
  29.       std::cout << ex.what() << std::endl;  
  30.       return 1;  
  31.    }  
  32.    message_queue::remove("message_queue");  
  33.    return 0;  
  34. }  
相關文章
相關標籤/搜索