boost的asio接收單路大數據量udp包的方法

開發windows客戶端接收RTP視頻流,當h264視頻達到1080P 60fps的時候,按包來調用recvfrom的函數壓力比較大,存在丟包的問題,windows的完成端口的性能效果固然能夠解決這個問題,而boost的asio在windows上是基於完成端口來開發的,因此採用boost的asio和環形緩衝區的方法,能夠解決接收單路大數據量udp包中丟包的問題。ios

    須要引入的頭文件爲:windows

 

[cpp]  view plain  copy
 
  1. #include "CircledBuffer.h"  
  2. #include <iostream>  
  3. #include <boost/asio.hpp>  
  4. #include <boost/bind.hpp>  

其中CircledBuffer.h是自定義的緩衝區的類,以後會有介紹,boost的兩個文件是asio必需的兩個文件。多線程

 

    須要定義的全局變量爲:異步

 

[cpp]  view plain  copy
 
  1. using boost::asio::ip::udp;  
  2. boost::asio::io_service service;  
  3. boost::asio::ip::udp::socket sock(service);  
  4. boost::asio::ip::udp::endpoint sender_ep;  
  5. CircledBuffer readBuffer;  
  6. PacketBuffer* packet;  

其中io_service是用來標示啓動的,後面會調用run。sock和endpoint相似於描述符和sockaddr_in的關係。CircledBuffer和PacketBuffer*,是自定義緩衝區。socket

 

 

主函數爲:async

 

[cpp]  view plain  copy
 
  1. int main(int argc, char* argv[]) {  
  2.     boost::asio::ip::udp::endpoint ep( boost::asio::ip::address::from_string("192.168.1.206"),  
  3.         9002);  
  4.     sock.open(ep.protocol());  
  5.     sock.set_option(boost::asio::ip::udp::socket::reuse_address(true));  
  6.     boost::asio::socket_base::receive_buffer_size recv_option(8*65534);  
  7.     sock.set_option(recv_option);  
  8.     sock.bind(ep);  
  9.     packet = readBuffer.GetLast();  
  10.     sock.async_receive_from(boost::asio::buffer(packet->data, packet->bufferSize), sender_ep, &on_read);  
  11.     service.run();  
  12. }  


初始化ep和sock,其中udp接收的數量比較大的話,須要設定receive_buffer_size,而後bind,設置接受buffer爲packet。函數

 

介紹一下async_receive_from函數,它有三個參數,分別爲接收的buffer,遠端的ep,注意與本端的ep不一樣,遠端的ep不用初始化設置,再就是buffer收滿後的回調函數。性能

 

回調函數的內容是:大數據

 

[cpp]  view plain  copy
 
  1. void on_read(const boost::system::error_code & err, std::size_t  
  2.              read_bytes) {  
  3.         std::cout << "read: " << read_bytes << std::endl;  
  4.         readBuffer.MoveNext();  
  5.         packet= readBuffer.GetLast();  
  6.         sock.async_receive_from(boost::asio::buffer(packet->data, packet->bufferSize), sender_ep, &on_read);  
  7. }  

與main函數的接收部分一致,這裏用了不斷的自身回調,來實現while recvfrom的功能。this

 

補充說一句,用申請好的CircledBuffer,便於後期的多線程或者異步strand的處理,而不阻塞接收。

緩衝區類的代碼:

頭文件:

 

[cpp]  view plain  copy
 
  1. #ifndef CIRCLED_BUFFER_H  
  2. #define CIRCLED_BUFFER_H  
  3.   
  4. #include <memory.h>  
  5. #include <boost/atomic.hpp>  
  6. #define CIRCLED_BUFFER_SIZE 300  
  7. #define BUFFER_SIZE 2000  
  8.   
  9. struct PacketBuffer  
  10. {  
  11.     PacketBuffer(){bufferSize=BUFFER_SIZE;dataSize=0;}   
  12.     unsigned int bufferSize;  
  13.     unsigned int dataSize;  
  14.     char data[BUFFER_SIZE];  
  15.   
  16.     PacketBuffer& operator=(PacketBuffer& other)  
  17.     {  
  18.         memcpy(data,other.data,other.dataSize);  
  19.         dataSize = other.dataSize;  
  20.         bufferSize = other.bufferSize;  
  21.         return *this;  
  22.     }  
  23. };  
  24.   
  25. class CircledBuffer  
  26. {  
  27. public:  
  28.     CircledBuffer(unsigned int bufSize=CIRCLED_BUFFER_SIZE);  
  29. public:  
  30.     ~CircledBuffer(void);  
  31.     PacketBuffer* GetAt(unsigned int idx){return &packets[idx];}  
  32.     PacketBuffer* GetLast()  
  33.     {         
  34.         return GetAt(writeIndex.load(boost::memory_order_consume));  
  35.     };  
  36.     void MoveNext()  
  37.     {  
  38.         unsigned int idx = writeIndex.load(boost::memory_order_relaxed);  
  39.         writeIndex.store((idx+1)%bufferSize,boost::memory_order_release);  
  40.     };  
  41.     unsigned int GetLastIndex(){return writeIndex.load(boost::memory_order_consume);};  
  42.     unsigned int GetSize(){return bufferSize;};  
  43. protected:  
  44.     boost::atomic<unsigned int> writeIndex;  
  45.     unsigned int bufferSize;  
  46.     PacketBuffer* packets;  
  47. };  
  48. #endif  


緩衝區類的構造函數與析構函數

 

 

[cpp]  view plain  copy
 
  1. #include "CircledBuffer.h"  
  2.   
  3. CircledBuffer::CircledBuffer(unsigned int bufSize)  
  4. :bufferSize(bufSize),  
  5. writeIndex(0)  
  6. {  
  7.     packets = new PacketBuffer[bufSize];  
  8. }  
  9.   
  10. CircledBuffer::~CircledBuffer(void)  
  11. {  
  12.     delete []packets;  
  13. }  


源代碼下載連接

相關文章
相關標籤/搜索