使用boost::asio實現簡易線程池

最近在看asio相關的東西,看到有人說:html

只要稍微瞭解 Asio 的人都知道,在一個 io_service 上開幾個線程後, 接下來就只要簡單的使用 io_service.post() 便可投遞一個閉包給線程去執行的. 這是一個自然的線程池.還能夠同 io 操做複用你的線程.中止發明垃圾的線程池實現吧.c++

以前也參照c++11的語法寫了一個簡單的線程池,爲了讓開啓的線程在沒有任務跑的時候睡眠,主要用到了同步原語中的互斥信號量和條件變量(任務來了發送條件變量用於喚醒線程),實現起來還挺麻煩.程序員

asio做爲異步輸入輸出庫,全部對象都提供了異步調用的函數,若是asio能夠實現,有一點能夠確定,線程池中的異步處理基本上不用程序員關注了, 到底有多簡單呢. 官方給出了基於boost::asio的實現,相應的線程/bind等組件我替換成了c++11(實際上隨着c++11/14大量引入boost的東西,最新的asio已經能夠不依賴boost的頭文件了,boostsystem等連接庫仍是須要的),我來學習記錄一下加深理解.閉包

asio實現思路

io_service的方法run()能夠阻塞的等待在io_service上的異步操做,異步操做所有執行完以後結束, 那麼使用io_service::work控制run()使其持續等待由應用投遞過來的任務,保證線程持續運行,同時開啓N個線程來幹這個活兒,也就是保證了線程資源只有一次申請和釋放. 用到的boost::asio類/操做以下:異步

  • io_service::work: 用於控制運行於io_service之上任務的起始和結束,聲明即開始,做用域結束後自動釋放,告訴io_service任務都結束了. 不過對於線程池的實現來講,只用了構造函數,告訴io_service要持續運行,爲了讓client能夠主動釋放,這裏沒有使用work的析構函數,而是調用了io_service提供的stop()方法,讓run()結束進而釋放線程資源.
  • io_service::post: 投遞線程待運行的任務.

簡易線程池實現

代碼以下:async

using namespace std;
 using namespace boost;
 typedef std::shared_ptr<std::thread> thread_ptr;
 typedef std::vector<thread_ptr> vecThread;
 
 class ThreadPool {
 public:
     ThreadPool(int num) : threadNum_(num), stopped_(false), work_(io_) {
         for(int i=i; i<threadNum_; ++i) {
             threads_.push_back(std::make_shared<std::thread>([&](){io_.run();}));
         }
     }   
     ~ThreadPool() {
         stop();  
     }   
     template<typename F, typename...Args>
     void post(F &&f, Args&&...args) {
         io_.post(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
     }   
     void stop() {
         if(!stopped_) {
             io_.stop();    
             for(auto t : threads_) t->join();
             stopped_ = true;
         }
     }   
 
 private:
     bool             stopped_;
     vecThread        threads_;
     int              threadNum_;
     asio::io_service io_;
     asio::io_service::work work_;
 };

首先,線程池的構造函數中先根據入參(線程池個數)啓動N個線程,每一個線程池調用io_service的run方法,在調用以前,初始化了asio::io_service::work,意味着全部線程run方法在work沒有析構或者io_service沒有主動結束的時候必定持續等待運行任務.函數

// 在io_service run以前,要用同一個io_service初始化work. 因爲std::bind和boost::bind實現方式不同(boost:bind支持了不少重載),使用std::bind會編譯報錯,所以用lambda代替
threads_.push_back(std::make_shared<std::thread>([&](){io_.run();}));

此時,線程池已經啓動,只須要封裝post調用,便可在應用側灌入任何類型的異步操做.post

template<typename F, typename...Args>
         void post(F &&f, Args&&...args) {
             io_.post(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
         }

最後提供客戶端中止線程池的接口,全部io_service將結束線程,線程池資源釋放:學習

void stop() {  
         if(!stopped_) {      
             io_.stop();         
             for(auto t : threads_) t->join(); 
             stopped_ = true;     
         }
     }

使用

void test1(int x) {std::cout<<"test 1:"<<x<<std::endl;}
 void test2(int y) {std::cout<<"test 2:"<<y<<std::endl;}
 
 int main()
 {
     ThreadPool threads(5);
     threads.post([](){std::cout<<"test 1"<<std::endl;});
     threads.post([](){std::cout<<"test 2"<<std::endl;});
     threads.post(test1, 3); 
     threads.post(test2, 5); 
 
     std::this_thread::sleep_for(2s);
     threads.stop();
 }

參考

A thread pool for executing arbitray tasks
https://stackoverflow.com/que...this

相關文章
相關標籤/搜索