一個項目,要接收 UDP 數據包,解析並獲取其中的數據,主要根據解析出來的行號和序號將數據拼接起來,而後將拼接起來的數據(最重要的數據是 R、G、B 三個通道的像素值)顯示在窗口中。考慮到每秒鐘要接收的數據包的數量較大,Python 的處理速度可能沒有那麼快,並且以前對 Qt 也比較熟悉了,因此用Qt 做爲客戶端接收處理數據包,用近期學習的 Python 模擬發送數據包。python
在 TCP/IP 協議中,UDP 數據包的大小是由限制的,所以用 UDP 傳輸數據時,還要在 UDP 層上再封裝一層自定義的協議。這個自定義的協議比較簡單,每一個 UDP 包的大小爲 1432 個字節,分爲幾個部分:c++
部分 | 起始字節 | 字節長度 | 說明 |
---|---|---|---|
Start | 0 | 4 | 包頭部的 Magic Number,設爲 0x53746172 |
PartialCnt | 4 | 1 | 分包總數,一個字節(0-255)之內 |
PartialIdx | 5 | 1 | 分包序號 |
SampleLine | 6 | 1 | 採樣率 |
RGB | 7 | 1 | rgb 通道標識符 |
LineIdx | 8 | 4 | 行號,每一行能夠包含 RGB 三個通道的數據,每一個通道由多個分包組成 |
ValidDataLen | 12 | 4 | 數據部分有效字節數 |
LineBytes | 16 | 4 | 每行數據包含的字節總數 |
Reserve | 20 | 128 | 保留部分 |
Data | 148 | 1280 | 數據部分 |
end | 1428 | 4 | 包尾部的 Magic Number,設爲 0x54456e64 |
上述表格描述的就是一個完整的 UDP 包。這裏的一個 UDP 數據包包含的是 RGB 某個通道的某一部分的數據。換種說法:git
因此要生成/解析 UDP 包,最重要的是 PartialCnt、PartialIdx、RGB、LineIdx、Data 這幾個部分。清楚了自定義協議就能夠開始編寫模擬包的生成和相應的接收邏輯了。github
因爲本地開發的時候缺乏必要的硬件環境,爲了方便開發,用 Python 編寫一個簡單的 UDPServer,發送模擬生成的數據包。根據上述協議,能夠寫出以下的 CameraData 類來表示 UDP 數據包:ubuntu
# -*- coding: utf-8 -*- DATA_START_MAGIC = bytearray(4) DATA_START_MAGIC[0] = 0x53 # S DATA_START_MAGIC[1] = 0x74 # t DATA_START_MAGIC[2] = 0x61 # a DATA_START_MAGIC[3] = 0x72 # r DATA_END_MAGIC = bytearray(4) DATA_END_MAGIC[0] = 0x54 # T DATA_END_MAGIC[1] = 0x45 # E DATA_END_MAGIC[2] = 0x6e # n DATA_END_MAGIC[3] = 0x64 # d slice_start_magic = slice(0, 4) slice_partial_cnt = 4 slice_partial_idx = 5 slice_sample_line = 6 slice_rgb_extern = 7 slice_line_idx = slice(8, 12) slice_valid_data_len = slice(12, 16) slice_line_bytes = slice(16, 20) slice_resv = slice(20, 148) slice_data = slice(148, 1428) slice_end_magic = slice(1428, 1432) import numpy as np class CameraData(object): def __init__(self): # self.new() # self.rawdata = rawdata self.dataLow = 10 self.dataHigh = 20 self.new() def genRandomByte(self, by=4): r = bytearray(by) for i in range(by): r[i] = np.random.randint(0, 255) def setPackageIdx(self, i = 0): self.rawdata[slice_partial_idx] = i def setRGB(self, c = 1): self.rawdata[slice_rgb_extern] = c def setLineIdx(self, line): start = slice_line_idx.start self.rawdata[start+3] = 0x000000ff & line self.rawdata[start+2] = (0x0000ff00 & line) >> 8 self.rawdata[start+1] = (0x00ff0000 & line) >> 16 self.rawdata[start+0] = (0xff000000 & line) >> 24 def setValidDataLen(self, len): start = slice_valid_data_len.start self.rawdata[start+3] = 0x000000ff & len self.rawdata[start+2] = (0x0000ff00 & len) >> 8 self.rawdata[start+1] = (0x00ff0000 & len) >> 16 self.rawdata[start+0] = (0xff000000 & len) >> 24 def setLineBytes(self, len): start = slice_line_bytes.start self.rawdata[start+3] = 0x000000ff & len self.rawdata[start+2] = (0x0000ff00 & len) >> 8 self.rawdata[start+1] = (0x00ff0000 & len) >> 16 self.rawdata[start+0] = (0xff000000 & len) >> 24 def randomData(self): size = slice_data.stop - slice_data.start arr = np.random.randint(self.dataLow, self.dataHigh, size, dtype=np.uint8) self.rawdata[slice_data] = bytearray(arr) def new(self): """構造新的數據對象 """ self.rawdata = bytearray(1432) self.rawdata[slice_start_magic] = DATA_START_MAGIC self.rawdata[slice_partial_cnt] = 0x02 self.rawdata[slice_partial_idx] = 0x00 self.rawdata[slice_sample_line] = 0x03 self.rawdata[slice_rgb_extern] = 0x01 self.setLineIdx(0x00) self.setValidDataLen(1280) self.setLineBytes(1432) self.randomData() self.rawdata[slice_end_magic] = DATA_END_MAGIC def hex(self): return self.rawdata.hex() def __repr__(self): return '<CameraData@{} hex len: {}>'.format(hex(id(self)), len(self.rawdata))
CameraData 中的 rawdata 是一個 bytearray 對象,它將會被 UdpServer 經過網絡接口發送出去。設置 4 個字節大小的整數時(如寫 LineIdx 行號),不能直接將數值賦到 rawdata 中,要將其中的 4 個字節分別賦值到對應的地址上才行。windows
CameraData 中的 randomData 方法是模擬隨機數據,更好的作法不是徹底隨機給每一個像素點賦值,而是有規律的變化,這樣在接收數據出現問題、分析問題的時候能夠直觀地看到哪裏有問題。數組
而後咱們須要定義一個 UdpServer,用它來將數據對象中包含的信息發送出去。緩存
import socket class UdpServer( object ): """該類功能是處理底層的 UDP 數據包發送和接收,利用隊列緩存全部數據 """ def __init__(self, *args, **kwargs): self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) self._sock.bind( ('', DATA_PORT+11 ) ) self._sock.settimeout( None ) # never timeout # self._sock.setblocking( 0 ) # none block def send_msg( self, msg ): """發送消息, @param msg 字典對象,發送 msg 的 rawdata 字段 """ self._sock.sendto( msg.rawdata, ('192.168.8.1', DATA_PORT))
這個 UdpServer 很是簡單,由於後續會經過這個 UdpServer 不停的發包,可是每次發包必須等待發送端成功將 UDP 包發送出去,這裏不要將 socket 對象設置成非阻塞的,不然程序運行時會出現錯誤提示(儘管能夠忽略掉這個錯誤提示,可是不必設置成非阻塞的,阻塞模式徹底足夠了)。網絡
在 github 中能夠找到完整的 Python 文件,裏面定義了其餘類,如 DataSender
、RGBSender
。DataSender
是在一個線程裏面發送 RGB 三個通道的值,RGBSender
的一個對象只會發送 RGB 三個通道中的某一個的值。多線程
在本地測試的時候,爲了方便在任務管理器中看到網絡佔用率,最初是在 VirtualBox 的 ubuntu 虛擬機上運行這個 Python 程序的,可是受到虛擬機的資源分配和電腦性能影響,調用 singleMain
函數時每秒鐘最多隻能產生 50MB 的數據量。可是在本地非虛擬機環境運行的時候最多能夠達到 80MB 的數據量。因此儘量地使用本地環境運行該 Python 程序能夠最大限度的生成數據包。
若是讓 RGB 三個通道分別在三個不一樣的進程中執行發送過程(註釋掉 singleMain
的調用,換用 multiSend
方法),那麼每秒鐘的數據量可到 200MB,不過 80MB 的數據量已經足夠多了(接近千兆網卡的上限了,網絡利用率太高的話經過網線傳輸時會出現嚴重丟包的狀況),不須要使用 multiSend
方法加大數據量。
在 singleMain 方法中,不直接執行 dataSender.serve()
,而是在新進程中執行,能夠更好的利用多核優點,發送數據更快:
# singleMain() dataSender = DataSender() # dataSender.serve() p = Process(target=dataSender.serve) p.start()
實際開發過程並非這麼順利,由於一開始並不知道在大量數據發送的時候,發送端可否有效地將數據發送出去,其實是邊編寫 Python 的模擬發送數據程序,邊編寫 Qt 獲取數據的程序,根據出現的問題逐步解決發送端和接收端的問題的。
Qt 這邊做爲客戶端,只須要將接收到的數據包保存下來,獲取其中的有效數據,再將 RGB 數據賦到 QImage 對應的像素上顯示出來便可。GUI 部分比較簡單,使用 QWidget 中的 label 控件,將 QImage 轉換成 QPixmap,顯示到 label 上就行了。初始化後的窗口如圖:
比較麻煩的是接收數據和拼接。一樣地,爲了方便表示和解析每一個 UDP 包,咱們構造一些類來存儲這些信息(如今想一想彷佛直接用結構體表示會更簡單)。
咱們在 Qt 中定義 CameraData
類來表示數據包實體:
/** * @brief The CameraData class * 對應從下位機接收到的字節數組的類,原始數據包,須要通過處理後變成一行數據 */ class CameraData : public DataObj { Q_OBJECT public: enum RGBType { R = 1, G = 2, B = 3, UNKOWN = 0 }; static const QByteArray DATA_START_MAGIC; static const QByteArray DATA_END_MAGIC; static const int PacketSize; explicit CameraData(QObject *parent = 0); ~CameraData(); bool isPackageValid(); // 獲取保留區域的數據 QByteArray getReserved(); // 設置原始數據 void setRawData(const QByteArray &value); void setRawData(const char *data); // 獲取數據區域內的全部數據,默認獲取有效數據 QByteArray getData(bool valid = true); int getPackageCntInLine(); int getPackageIdxInLine(); int getSampleDiffLine(); int getRGBExtern(); RGBType getRGBType(); int getLineIdx(); int getValidDataLen(); int getLineBytes(); int sliceToInt(int start, int len = 4); // DataObj interface void reset(); signals: public slots: private: inline QByteArray slice(int start, int len = -1); inline QByteArray getStartMagic(); inline QByteArray getEndMagic(); QByteArray data; int packageCntInLine = -1; int packegeIdxInLine = -1; int lineIdx = -1; int lineBytes = -1; int rgbType = -1; };
CameraData
類繼承自 DataObj
類,而 DataObj
類又繼承自 QObject
,這樣方便進行內存管理和對象上的操做。DataObj
是爲了方便複用對象而定義的基類,詳細代碼可參考 github 上的完整代碼。
C++ 部分的 CameraData
類與 Python 中定義的 CameraData
類是對應的,不過 C++ 部分的 CameraData
類只須要調用 CameraData::setRawData
傳入一個 QByteArray 對象後就能夠自動將其中包含的數據解析出來,而且它只提供獲取數據的接口而不提供修改數據的接口。
另外咱們還須要定義一個類 PreProcessData,來表示一行數據:
/** * @brief The PreProcessData class * 預處理數據 */ class PreProcessData: public DataObj { Q_OBJECT public: static const int PacketSize; static const int PacketPerLine; explicit PreProcessData(QObject *parent = 0, int line = -1); void put(CameraData *cd); bool isReady(); void reset(); int line() const; void setLine(int line); const QByteArrayList &getDataList() const; QByteArray repr(); private: /** * @brief cameraData * 每 2 個 CameraData 構成一行的單通道數據,有序存放 RGB 通道數據 * 0-1 存放 R,2-3 存放 G, 4-5 存放 B */ QByteArrayList dataList; int m_line; int m_readyCount = 0; int m_duplicateCount = 0; bool *dataPlaced = 0; };
目前的協議中,每 2 個數據包(對應 2 個 CameraData
對象)構成某一行的單通道數據,因此 PreProcessData
中至少會包含 6 個 CameraData
對象,處理完 CameraData
對象後,只須要存儲 Data 部分便可,因此這裏沒有用 QList
QByteArrayList
來存儲數據。當三個通道的數據都準備好後,
PreProcessData::isReady
就會返回 true,表示該行數據已經準備好,能夠顯示在窗口中。
咱們定義一個 Controller
類用來操做數據接收對象和子線程。用 Qt 的事件槽機制和 QObject::moveToThread
實現多線程很是方便,不重寫 QThread 的 run 方法就可讓對象的方法在子線程中執行。
class Controller : public QObject { Q_OBJECT public: explicit Controller(QObject *parent = 0); ~Controller(); static const int DataPort; static const int CONTROL_PORT; static const QStringList BOARD_IP; void start(); void stop(); DataProcessor *getDataProcessor() const; signals: public slots: private: CameraDataReceiver *cdr; QThread recvThread; QThread recvProcessThread; QByteArrayList rawdataList; DataProcessor *dp = 0; QTimer *statsTimer; int statsInterval; };
其中 CameraDataReceiver
對象會被實例化,在子線程中接收 UDP 數據包(由於發送和接收數據的端口是不一樣的,操做和數據是分離的)。這裏將 DataProcessor 經過 getDataProcessor
暴露給上層應用,以便上層應用鏈接信號槽接收圖像。僅到接收數據,就用到了三個線程:分別是 GUI 線程,用於接收 UDP 包的 recvThread 線程和處理 UDP 的 recvProcessThread。
爲何接收 UDP 包和處理 UDP 包不是放在一個線程中執行呢?由於這裏的數據量實在太多,最開始實現的時候這兩個邏輯代碼確實是在同一個線程中執行,然而因爲處理數據的代碼執行起來也要消耗時間,將會致使沒法接收其餘的 UDP 包,這樣的話就會致使比較嚴重的丟包。爲了保證接收端不會丟包,只好將處理邏輯放在其餘的線程中執行。
將接收數據和處理數據放在不一樣的線程中執行,確實能夠解決丟包問題了,可是會出現新的問題:接收到的包若是不可以及時處理完,而且釋放掉相應的資源,那麼可能會出現程序將數據緩存下來但沒法處理,程序佔用的內存愈來愈大,致使程序運行起來愈來愈慢。
在編寫程序時誤覺得是 Qt 的事件循環機制過慢致使程序處理不了那麼多數據(實際上它的速度足夠處理這些數據),所以將程序中使用的 QUdpSocket 對象換成了 [Windows 平臺的 Socket 通訊代碼][winsock demo],並將其改寫成類方便調用。其實是在 QThread 子線程中無限循環地運行 recvfrom(clientSocket, recvedData.data(), recvbuflen, 0, &fromaddr, &addrLen);
這樣的接收數據包函數,跳過了 Qt 事件循環機制,而後當接收到包以後再經過回調函數通知數據處理線程進行處理。
但當我寫這篇博客,從新用正常的代碼進行測試時,發現即使使用 QUdpSocket::readyRead
信號來接收 UDP 數據,只要數據處理進程不堆積數據,就不會出現佔用內存愈來愈多的狀況。換句話說,不是 Qt 沒法處理實時性的數據,而是本身編寫的代碼裏面有問題。
回想最開始寫的程序,在處理 QByteArray 表示的原始數據時,會爲每個接收到的數據包分配地址,並且分配的地址位於堆中。而實際上在堆 heap 中分配回收內存地址相較於在棧 stack 中是慢得多的。爲每一個到來的數據用 new 構造一個新的 CameraData 對象,而後在處理完後將這個 CameraData delete 掉實際上是很慢的,若是你這樣作了,而且你在 CameraData 的析構函數中加上 qDebug 語句打印 "CameraData is deleting...",你會發現,當發送方(咱們的 Python 模擬發送程序)中止發送數據包後很長一段時間內,Qt 程序在一直打印着 "CameraData is deleting"。
而我最開始就是這麼作的,因此發生了 Qt 程序隨着數據接收的變多,佔用的內存愈來愈大的狀況。固然,這不排除 qDebug 語句輸出到控制檯上也會佔用不少時間。若是每秒鐘要調用上萬次 qDebug() << "CameraData is deleting"
,那麼建議你使用一個計數變量控制 qDebug 的調用次數,由於這條語句的調用也會讓數據處理變得緩慢。
爲了讓接收端不丟包,須要快速的處理接收到的 UDP 包,而且在處理的代碼中不要調用耗時的函數或者 new 操做。爲了不重複調用 new 和 delete 操做符,咱們須要構建一個對象池,以便複用池中的對象,減小 new 操做。池的定義比較簡單,封裝一個 QList
容器類就行了,爲了簡化和複用池的代碼,我用到了 c++ 的 template 特性,可是這個 DataObjPool
中的容器只能是 DataObj 的子類:
template<class T> class DataObjPool { public: virtual ~DataObjPool() { qDeleteAll(pool); numAvailable = 0; } T *getAvailable() { if( numAvailable == 0 ) { return 0; } for(int i = 0; i < pool.size(); i++) { T *item = pool[i]; if(item->isValid()) { item->setValid(false); numAvailable -= 1; return item; } } return 0; } T *get(int id) { return pool[id]; } inline bool release(T *dobj) { dobj->reset(); numAvailable += 1; return true; } int releaseTimeout(int now, int timeout = 100) { int releaseCount = 0; for(int i = 0; i < pool.size(); i++) { T *item = pool[i]; if(now > item->getGenerateMs() + timeout) { item->reset(); numAvailable += 1; releaseCount += 1; } } return releaseCount; } void releaseAll() { for(int i = 0; i < pool.size(); i++) { T *item = pool[i]; if(item->isValid()) { continue; } item->reset(); numAvailable += 1; } } int getNumAvailable() const { return numAvailable; } template<class T2> operator DataObjPool<T2>(); protected: DataObjPool(int size = 100); private: QList<T *> pool; int numAvailable = 0; }; class RawDataObjPool: public DataObjPool<CameraData> { public: RawDataObjPool(int size = 100); }; class LineDataPool : public DataObjPool<PreProcessData> { public: LineDataPool(int size = 100); };
固然你也能夠直接編寫兩個類 RawDataObjPool
和 LineDataPool
,把池的操做分別複製到兩個類中,使用模板特化的好處是改動的時候不須要改動兩個類了。前面說過,DataObj
類繼承自 QObject
,就是爲了簡化在對象池中進行的操做。DataObjPool
會在構造時在內存中預分配必定數量的對象,以 RawDataObjPool
爲例,構造時傳入 size 參數,便會預先在內存中建立 size 個 CameraData,在程序運行過程當中,這些對象都會被咱們這個 Qt 程序循環利用,直到關閉程序纔會釋放掉這些 CameraData(若是操做系統的內存不足,過多的對象佔用的內存仍是會被釋放)。
對象池的主要接口有兩個:getAvailable
和 release
分別用於獲取可用的對象或釋放掉池中的對象,注意這裏的釋放是讓對象池對該對象進行標記,以便重複使用,而不是釋放掉該對象佔用的內存空間或 delete 掉。當對象池中無可用對象時,能夠根據須要釋放掉超時的對象或者釋放掉所有對象。
使用對象池減小 new 操做符的使用後,處理數據的子線程的速度明顯加快。正常狀況下就能夠看到以下的圖片:
這裏數據顯示的部分還有待完善,由於發送端的發送數據大小不夠湊成一行,因此圖片的右側部分是空白的。
這裏說一下數據的複製,從 Socket 接口中傳上來的數據,咱們用 QByteArray
對象保存了底層的數據,即使在 UDP 數據包中含有不少個 \x00
這樣的數據,QByteArray 也會正確識別出字符串的結束位置。
在設置 CameraData::setRawData(const QByteArray &value)
函數中,儘可能避免手動調用 memcpy(data.data(), value, value.size());
這個底層 API,由於你不知道它會將 QByteArray 對象 CameraData.data
中的 char * data()
指針指向哪一個位置。
我在 CameraData.cpp
文件中將它註釋掉了,由於在程序運行和調試時它給我帶來了巨大的困惑:常常出現 invalid address specified to rtlvalidateheap
這種類型的錯誤。通過很長時間的排查後發現註釋掉這行代碼,程序就能一直穩定運行。
完整的項目代碼能夠在 github 中找到。