開發windows客戶端接收RTP視頻流,當h264視頻達到1080P 60fps的時候,按包來調用recvfrom的函數壓力比較大,存在丟包的問題,windows的完成端口的性能效果固然能夠解決這個問題,而boost的asio在windows上是基於完成端口來開發的,因此採用boost的asio和環形緩衝區的方法,能夠解決接收單路大數據量udp包中丟包的問題。ios
須要引入的頭文件爲:windows
- #include "CircledBuffer.h"
- #include <iostream>
- #include <boost/asio.hpp>
- #include <boost/bind.hpp>
其中CircledBuffer.h是自定義的緩衝區的類,以後會有介紹,boost的兩個文件是asio必需的兩個文件。多線程
須要定義的全局變量爲:異步
- using boost::asio::ip::udp;
- boost::asio::io_service service;
- boost::asio::ip::udp::socket sock(service);
- boost::asio::ip::udp::endpoint sender_ep;
- CircledBuffer readBuffer;
- PacketBuffer* packet;
其中io_service是用來標示啓動的,後面會調用run。sock和endpoint相似於描述符和sockaddr_in的關係。CircledBuffer和PacketBuffer*,是自定義緩衝區。socket
主函數爲:async
- int main(int argc, char* argv[]) {
- boost::asio::ip::udp::endpoint ep( boost::asio::ip::address::from_string("192.168.1.206"),
- 9002);
- sock.open(ep.protocol());
- sock.set_option(boost::asio::ip::udp::socket::reuse_address(true));
- boost::asio::socket_base::receive_buffer_size recv_option(8*65534);
- sock.set_option(recv_option);
- sock.bind(ep);
- packet = readBuffer.GetLast();
- sock.async_receive_from(boost::asio::buffer(packet->data, packet->bufferSize), sender_ep, &on_read);
- service.run();
- }
初始化ep和sock,其中udp接收的數量比較大的話,須要設定receive_buffer_size,而後bind,設置接受buffer爲packet。函數
介紹一下async_receive_from函數,它有三個參數,分別爲接收的buffer,遠端的ep,注意與本端的ep不一樣,遠端的ep不用初始化設置,再就是buffer收滿後的回調函數。性能
回調函數的內容是:大數據
- void on_read(const boost::system::error_code & err, std::size_t
- read_bytes) {
- std::cout << "read: " << read_bytes << std::endl;
- readBuffer.MoveNext();
- packet= readBuffer.GetLast();
- sock.async_receive_from(boost::asio::buffer(packet->data, packet->bufferSize), sender_ep, &on_read);
- }
與main函數的接收部分一致,這裏用了不斷的自身回調,來實現while recvfrom的功能。this
補充說一句,用申請好的CircledBuffer,便於後期的多線程或者異步strand的處理,而不阻塞接收。
緩衝區類的代碼:
頭文件:
- #ifndef CIRCLED_BUFFER_H
- #define CIRCLED_BUFFER_H
- #include <memory.h>
- #include <boost/atomic.hpp>
- #define CIRCLED_BUFFER_SIZE 300
- #define BUFFER_SIZE 2000
- struct PacketBuffer
- {
- PacketBuffer(){bufferSize=BUFFER_SIZE;dataSize=0;}
- unsigned int bufferSize;
- unsigned int dataSize;
- char data[BUFFER_SIZE];
- PacketBuffer& operator=(PacketBuffer& other)
- {
- memcpy(data,other.data,other.dataSize);
- dataSize = other.dataSize;
- bufferSize = other.bufferSize;
- return *this;
- }
- };
- class CircledBuffer
- {
- public:
- CircledBuffer(unsigned int bufSize=CIRCLED_BUFFER_SIZE);
- public:
- ~CircledBuffer(void);
- PacketBuffer* GetAt(unsigned int idx){return &packets[idx];}
- PacketBuffer* GetLast()
- {
- return GetAt(writeIndex.load(boost::memory_order_consume));
- };
- void MoveNext()
- {
- unsigned int idx = writeIndex.load(boost::memory_order_relaxed);
- writeIndex.store((idx+1)%bufferSize,boost::memory_order_release);
- };
- unsigned int GetLastIndex(){return writeIndex.load(boost::memory_order_consume);};
- unsigned int GetSize(){return bufferSize;};
- protected:
- boost::atomic<unsigned int> writeIndex;
- unsigned int bufferSize;
- PacketBuffer* packets;
- };
- #endif
緩衝區類的構造函數與析構函數
- #include "CircledBuffer.h"
- CircledBuffer::CircledBuffer(unsigned int bufSize)
- :bufferSize(bufSize),
- writeIndex(0)
- {
- packets = new PacketBuffer[bufSize];
- }
- CircledBuffer::~CircledBuffer(void)
- {
- delete []packets;
- }