在我接觸多線程編程以來,都是把「多線程」等同於「異步」,使用多線程基本上也都是爲了避免阻塞主線程(如界面),才單獨開一個線程「後臺」運行。最近遇到的狀況是數據分析程序的處理速度跟不上數據採集程序,所以考慮使用多個worker線程並行的處理採集到的數據。嘗試使用OpenMP
,在程序中使用相似於這種代碼git
#pragma omp parallel for for (int i=0;i<6;++i){run();}
可是性能仍是達不到指望,電腦有10多核,開6個線程,卻只能把運行速度提升2~3倍,而且再增長線程也沒有提升速度。github
You’re doing it wrong…編程
決定好好琢磨一下多線程。我這裏主程序是用Qt作的,但願儘可能使用Qt自帶的庫,因此多線程也是使用QThread
,仔細考慮過以後獲得的思路是維護一個線程安全的隊列,而後數據採集線程和多個數據處理線程分別向這個隊列傳入和取出數據,實現併發,也就是所謂的「單生產者、多消費者模式」。安全
線程安全的隊列是這樣的多線程
//dataqueue.h #include <QQueue> #include <QMutexLocker> //thread-safe queue class DataQueue { QMutex m_mutex;QQueue<void*> m_datas; public: DataQueue(); void enqueue(void*);void* dequeue(); }; //dataqueue.cpp #include "dataqueue.h" void DataQueue::enqueue(void *data){ QMutexLocker locker(&m_mutex); m_datas.enqueue(data); } void* DataQueue::dequeue(){ QMutexLocker locker(&m_mutex); return m_datas.empty()?NULL:m_datas.dequeue(); }
基本上就是封裝了一下QQueue
,數據用空指針void*
表示。併發
數據採集線程就直接實例化一個DataQueue
而後調用enqueue
方法便可,數據分析線程部分須要仔細考慮,主要的問題就是當有新數據到來時(加入DataQueue
以後),分析線程是競爭性地去獲取數據仍是被動地被添加數據到線程單獨的數據隊列中,以及當沒有數據時線程(如下所說的「線程」都是數據分析線程)是wait
直到被喚醒仍是sleep
一段時間再次嘗試從DataQueue
取數據。負載均衡
這裏只考慮下面兩種狀況:異步
這種方法須要維護一個「線程池」,做爲數據採集線程的成員變量,而且每一個數據分析線程都要有一個單獨的DataQueue
數據隊列做爲成員變量。性能
當有新數據到來時,要將數據加入某個線程的數據隊列,這個線程的選取有多種方法,能夠遍歷全部的線程,找到數據隊列中數據最少的一個線程,還能夠隨機地抽出一個線程。總之是要讓各個線程的負載比較均勻。線程
當線程的數據隊列爲空時,使用QWaitCondition
使線程wait
,而後當有新數據加入進來時,再wakeOne
。
這種方法的優勢是不會出現太多的鎖競爭,由於不會出現多個線程競爭獲取數據的狀況,但問題是須要主動維護各個線程的負載均衡,比較麻煩。
這種方法比較簡單粗暴,也是我最終選擇的方法。全局地維護一個DataQueue
數據隊列,全部線程(數據採集和分析)都是獨立運行的,採集線程往隊列里加數據,分析線程從隊列裏取數據,當隊列裏沒有數據時,分析線程會sleep
一段時間,而後再從隊列裏取數據。數據分析線程的實現大體是這樣的
void DataProcessor::run(){ setRunningFlag(true); void *buffer=NULL; while(getRunningFlag()){ buffer=g_dataQueue.dequeue(); if(NULL==buffer){msleep(5);continue;} processData(buffer); free(buffer); } } bool DataProcessor::getRunningFlag(){ QMutexLocker locker(&m_mutex); return m_bRunning; } void DataProcessor::setRunningFlag(bool bRun){ QMutexLocker locker(&m_mutex); m_bRunning=bRun; } DataProcessor::DataProcessor(){start();} DataProcessor::~DataProcessor(){ setRunningFlag(false);m_timer.stop();wait(); }
g_dataQueue
爲全部線程(數據採集和分析)均可訪問的一個全局變量,也就是惟一的一個DataQueue
數據隊列。
最後值得一提的是,我沒有在程序裏面使用「內存池」,而是在編譯時使用TCMALLOC,發現性能很是好。