最近須要對採集的數據作實時處理,而且不止作一種處理,而數據採集是由一個單獨的線程負責的,所以能夠稱做單生產者多消費者。要求是速度要儘可能的快,不能阻塞數據採集線程,而數據處理線程並不要求處理全部的數據,能夠以比較低的速率或者採樣率來處理實時的數據。數組
所以設計了一種數據結構,總體上是對一個定長的數組進行封裝,數組中的元素是所謂的「內存單元」數據結構
struct MemoryMutexUnit{ QReadWriteLock mutex; void *buffer; };
這裏是用Qt的數據結構實現,這個內存單元由一個讀寫鎖和一個指針組成,爲每一個指針加個單獨的鎖的緣由是爲了讀寫分離,在大多數狀況下,數據採集線程和數據讀取線程使用的並非同一個內存單元,所以能夠避免出現有線程等待鎖釋放的狀況,至少在大多數的狀況下是如此的。函數
接下來就是對內存單元的數組進行封裝,並提供接口給外部使用。oop
static const int BUF_LEN=2048*2048; class MemoryLoopMutex { const int m_capacity; QAtomicInt m_index; MemoryMutexUnit *m_units; public: MemoryLoopMutex(); bool addBuffer(void*); bool copyBuffer(void*); void clearBuffer(); };
稱之爲"MemoryLoop"是由於咱們會環狀的遍歷內存單元數組m_units
,索引m_index
指向的是最新可讀的內存單元,數據採集線程會從索引m_index
開始順序的查找下一個可寫的內存單元,若是到了數組最後一個元素就再從第一個元素開始,所以是環狀的遍歷。一個內存單元可寫是指其中的讀寫鎖能夠LockForWrite
,若是有線程正在讀這個內存單元,那麼這個單元就是不可寫的。ui
具體實現以下線程
MemoryLoopMutex::MemoryLoopMutex(): m_capacity(100),m_index(-1) { m_units=new MemoryMutexUnit[m_capacity]; for(int i=0;i<m_capacity;i++){ MemoryMutexUnit *pUnit=m_units+i; pUnit->buffer=malloc(BUF_LEN); } }
構造函數做用就是初始化每一個內存單元。設計
bool MemoryLoopMutex::addBuffer(void *buffer){ int index=m_index.loadAcquire(); if(index<0){index=0;} int tryIndex=(index+1)%m_capacity; while(tryIndex!=index){ MemoryMutexUnit *pUnit=m_units+tryIndex; if(pUnit->mutex.tryLockForWrite(1)){ memcpy(pUnit->buffer,buffer,BUF_LEN); pUnit->mutex.unlock(); m_index.storeRelease(tryIndex); return true; } tryIndex=(tryIndex+1)%m_capacity; qDebug()<<"another try"<<tryIndex<<m_capacity; } return false; }
將數據採集線程獲得的緩衝區內存指針memcpy
到某個內存單元中,while
部分就是環狀的遍歷全部的內存單元,直到找到一個可寫的單元。指針
bool MemoryLoopMutex::copyBuffer(void *buffer){ int index=m_index.loadAcquire(); if(index<0||index>=m_capacity){return false;} MemoryMutexUnit *pUnit=m_units+index; if(!pUnit->mutex.tryLockForRead(1)){return false;} memcpy(buffer,pUnit->buffer,BUF_LEN); pUnit->mutex.unlock(); return true; }
上面的這段代碼負責的是內存讀取功能,就是將最新可讀的內存單元中的內容memcpy
出來。code
最後就是「內存清空」功能了,咱們這裏沒有進行內存釋放,所謂的內存清空只是再也不容許數據被讀取。經過設置索引爲負值來實現。索引
void MemoryLoopMutex::clearBuffer(){ m_index.storeRelease(-1); }