C++ 多線程編程總結

C++ 多線程編程總結

         在開發C++程序時,通常在吞吐量、併發、實時性上有較高的要求。設計C++程序時,總結起來能夠從以下幾點提升效率:html

  • l  併發
  • l  異步
  • l  緩存

下面將我日常工做中遇到一些問題例舉一二,其設計思想無非以上三點。c++

1任務隊列

1.1    以生產者-消費者模型設計任務隊列

  生產者-消費者模型是人們很是熟悉的模型,好比在某個服務器程序中,當User數據被邏輯模塊修改後,就產生一個更新數據庫的任務(produce),投遞給IO模塊任務隊列,IO模塊從任務隊列中取出任務執行sql操做(consume)。sql

  設計通用的任務隊列,示例代碼以下:數據庫

  詳細實現可參見:編程

  http://ffown.googlecode.com/svn/trunk/fflib/include/detail/task_queue_impl.hcentos

void  task_queue_t::produce( const  task_t& task_) {      
         lock_guard_t lock(m_mutex);
         if  (m_tasklist->empty()){ //! 條件知足喚醒等待線程
             m_cond. signal ();
         }
         m_tasklist->push_back(task_);
     }
int    task_queue_t::comsume(task_t& task_){
         lock_guard_t lock(m_mutex);
         while  (m_tasklist->empty()) //! 當沒有做業時,就等待直到條件知足被喚醒{
             if  ( false  == m_flag){
                 return  -1;
             }
             m_cond.wait();
         }
         task_ = m_tasklist->front();
         m_tasklist->pop_front();
         return  0;
}

1.2    任務隊列使用技巧

1.2.1 IO 與 邏輯分離

  好比網絡遊戲服務器程序中,網絡模塊收到消息包,投遞給邏輯層後當即返回,繼續接受下一個消息包。邏輯線程在一個沒有io操做的環境下運行,以保障實時性。示例:緩存

void  handle_xx_msg( long  uid, const  xx_msg_t& msg){
     logic_task_queue->post(boost::bind(&servie_t::proces, uid, msg));
}

  注意,此模式下爲單任務隊列,每一個任務隊列單線程。安全

1.2.2  並行流水線

         上面的只是完成了io 和 cpu運算的並行,而cpu中邏輯操做是串行的。在某些場合,cpu邏輯運算部分也可實現並行,如遊戲中用戶A種菜和B種菜兩種操做是徹底能夠並行的,由於兩個操做沒有共享數據。最簡單的方式是A、B相關的操做被分配到不一樣的任務隊列中。示例以下:    服務器

void  handle_xx_msg( long  uid, const  xx_msg_t& msg) {
  logic_task_queue_array[uid % sizeof (logic_task_queue_array)]->post(
    boost::bind(&servie_t::proces, uid, msg));
}

注意,此模式下爲多任務隊列,每一個任務隊列單線程。網絡

1.2.3 鏈接池與異步回調

  好比邏輯Service模塊須要數據庫模塊異步載入用戶數據,並作後續處理計算。而數據庫模塊擁有一個固定鏈接數的鏈接池,當執行SQL的任務到來時,選擇一個空閒的鏈接,執行SQL,並把SQL 經過回調函數傳遞給邏輯層。其步驟以下:

  • n  預先分配好線程池,每一個線程建立一個鏈接到數據庫的鏈接
  • n  爲數據庫模塊建立一個任務隊列,全部線程都是這個任務隊列的消費者
  • n  邏輯層想數據庫模塊投遞sql執行任務,同時傳遞一個回調函數來接受sql執行結果

  示例以下:

void  db_t:load( long  uid_, boost::function< void  (user_data_t&) func_){
     //! sql execute, construct user_data_t user
     func_(user)
}
void  process_user_data_loaded(user_data_t&){
     //! todo something
}
db_task_queue->post(boost::bind(&db_t:load, uid, func));

         注意,此模式下爲單任務隊列,每一個任務隊列多線程。

2. 日誌

         本文主要講C++多線程編程,日誌系統不是爲了提升程序效率,可是在程序調試、運行期排錯上,日誌是無可替代的工具,相信開發後臺程序的朋友都會使用日誌。常見的日誌使用方式有以下幾種:

  • n  流式,如logstream << "start servie time[%d]" << time(0) << " app name[%s]" << app_string.c_str() << endl;
  • n  Printf 格式如:logtrace(LOG_MODULE, "start servie time[%d] app name[%s]", time(0), app_string.c_str());

  兩者各有優缺點,流式是線程安全的,printf格式格式化字符串會更直接,但缺點是線程不安全,若是把app_string.c_str() 換成app_string (std::string),編譯被經過,可是運行期會crash(若是運氣好每次都crash,運氣很差偶爾會crash)。我我的鐘愛printf風格,能夠作以下改進:

  • l  增長線程安全,利用C++模板的traits機制,能夠實現線程安全。示例:
template < typename  ARG1>
void  logtrace( const  char * module, const  char * fmt, ARG1 arg1){
     boost::format s(fmt);
     f % arg1;
}

  這樣,除了標準類型+std::string 傳入其餘類型將編譯不能經過。這裏只列舉了一個參數的例子,能夠重載該版本支持更多參數,若是你願意,能夠支持9個參數或更多。

  • l  爲日誌增長顏色,在printf中加入控制字符,能夠再屏幕終端上顯示顏色,Linux下示例:printf("\033[32;49;1m [DONE] \033[39;49;0m") 

  更多顏色方案參見:

     http://hi.baidu.com/jiemnij/blog/item/d95df8c28ac2815cb219a80e.html

  • l  每一個線程啓動時,都應該用日誌打印該線程負責什麼功能。這樣,程序跑起來的時候經過top –H – p pid 能夠得知那個功能使用cpu的多少。實際上,個人每行日誌都會打印線程id,此線程id非pthread_id,而實際上是線程對應的系統分配的進程id號。

3. 性能監控

         儘管已經有不少工具能夠分析c++程序運行性能,可是其大部分仍是運行在程序debug階段。咱們須要一種手段在debug和release階段都能監控程序,一方面得知程序瓶頸之所在,一方面儘早發現哪些組件在運行期出現了異常。

         一般都是使用gettimeofday 來計算某個函數開銷,能夠精確到微妙。能夠利用C++的肯定性析構,很是方便的實現獲取函數開銷的小工具,示例以下

複製代碼
struct profiler{
    profiler(const char* func_name){
        gettimeofday(&tv, NULL);
        m_func_name=func_name;
    }
    ~profiler(){
        struct timeval tv2;
        gettimeofday(&tv2, NULL);
        long cost = (tv.tv_sec - tv.tv_sec) * 1000000 + (tv.tv_usec - tv.tv_usec);
        //! post to some manager
    }
    struct timeval tv;
    const char * m_func_name;
};
#define PROFILER() profiler ____profiler_instance##__LINE__(__FUNCTION__)
複製代碼

 

         Cost 應該被投遞到性能統計管理器中,該管理器定時講性能統計數據輸出到文件中。

4 Lambda 編程

使用foreach 代替迭代器

         不少編程語言已經內建了foreach,可是c++尚未。因此建議本身在須要遍歷容器的地方編寫foreach函數。習慣函數式編程的人應該會很是鍾情使用foreach,使用foreach的好處多多少少有些,如:

         http://www.cnblogs.com/chsword/archive/2007/09/28/910011.html
         但主要是編程哲學上層面的。

示例:

void  user_mgr_t::foreach(boost::function< void  (user_t&)> func_){
     for  (iterator it = m_users.begin(); it != m_users.end() ++it){
         func_(it->second);
     }
}

  好比要實現dump 接口,不須要重寫關於迭代器的代碼

void  user_mgr_t:dump(){
     struct  lambda {
         static  void  print(user_t& user){
             //! print(tostring(user);
         }
     };
     this ->foreach(lambda::print);
}

  實際上,上面的代碼變通的生成了匿名函數,若是是c++ 11 標準的編譯器,本能夠寫的更簡潔一些:

  this->foreach([](user_t& user) {} );

  可是我大部分時間編寫的程序都要運行在centos 上,你知道嗎它的gcc版本是gcc 4.1.2, 因此大部分時間我都是用變通的方式使用lambda函數。

Lambda 函數結合任務隊列實現異步

  常見的使用任務隊列實現異步的代碼以下:

void  service_t:async_update_user( long  uid){
     task_queue->post(boost::bind(&service_t:sync_update_user_impl, this , uid));
}
void  service_t:sync_update_user_impl( long  uid){
     user_t& user = get_user(uid);
     user.update()
}

  這樣作的缺點是,一個接口要響應的寫兩遍函數,若是一個函數的參數變了,那麼另外一個參數也要跟着改動。而且代碼也不是很美觀。使用lambda可讓異步看起來更直觀,彷彿就是在接口函數中馬上完成同樣。示例代碼:

void  service_t:async_update_user( long  uid){
     struct  lambda {
         static  void  update_user_impl(service_t* servie, long  uid){
             user_t& user = servie->get_user(uid);
             user.update();
         }
     };
     task_queue->post(boost::bind(&lambda:update_user_impl, this , uid));
}

  這樣當要改動該接口時,直接在該接口內修改代碼,很是直觀。

5. 奇技淫巧

利用shared_ptr 實現map/reduce

         Map/reduce的語義是先將任務劃分爲多個任務,投遞到多個worker中併發執行,其產生的結果經reduce彙總後生成最終的結果。Shared_ptr的語義是什麼呢?當最後一個shared_ptr析構時,將會調用託管對象的析構函數。語義和map/reduce過程很是相近。咱們只需本身實現講請求劃分多個任務便可。示例過程以下:

  • l  定義請求託管對象,加入咱們須要在10個文件中搜索「oh nice」字符串出現的次數,定義託管結構體以下:
struct  reducer{
     void  set_result( int  index, long  result) {
         m_result[index] = result;
     }
     ~reducer(){
         long  total = 0;
         for  ( int  i = 0; i < sizeof (m_result); ++i){
             total += m_result[i];
         }
         //! post total to somewhere
     }
     long  m_result[10];
};
  • l  定義執行任務的 worker
void  worker_t:exe( int  index_, shared_ptr<reducer> ret) {
  ret->set_result(index, 100);
}
  • l  將任務分割後,投遞給不一樣的worker
shared_ptr<reducer> ret( new  reducer());
for  ( int  i = 0; i < 10; ++i)
{
     task_queue[i]->post(boost::bind(&worker_t:exe, i, ret));
}
相關文章
相關標籤/搜索