最近在看asio
相關的東西,看到有人說:html
只要稍微瞭解 Asio 的人都知道,在一個 io_service 上開幾個線程後, 接下來就只要簡單的使用 io_service.post() 便可投遞一個閉包給線程去執行的. 這是一個自然的線程池.還能夠同 io 操做複用你的線程.中止發明垃圾的線程池實現吧.c++
以前也參照c++11的語法寫了一個簡單的線程池,爲了讓開啓的線程在沒有任務跑的時候睡眠,主要用到了同步原語
中的互斥信號量和條件變量(任務來了發送條件變量用於喚醒線程),實現起來還挺麻煩.程序員
asio做爲異步輸入輸出庫,全部對象都提供了異步調用的函數,若是asio
能夠實現,有一點能夠確定,線程池中的異步處理基本上不用程序員關注了, 到底有多簡單呢. 官方給出了基於boost::asio的實現,相應的線程/bind等組件我替換成了c++11(實際上隨着c++11/14大量引入boost的東西,最新的asio已經能夠不依賴boost的頭文件了,boostsystem等連接庫仍是須要的),我來學習記錄一下加深理解.閉包
io_service
的方法run()
能夠阻塞的等待在io_service上的異步操做,異步操做所有執行完以後結束, 那麼使用io_service::work
控制run()使其持續等待由應用投遞過來的任務,保證線程持續運行,同時開啓N個線程來幹這個活兒,也就是保證了線程資源只有一次申請和釋放. 用到的boost::asio
類/操做以下:異步
io_service
之上任務的起始和結束,聲明即開始,做用域結束後自動釋放,告訴io_service任務都結束了. 不過對於線程池的實現來講,只用了構造函數,告訴io_service要持續運行,爲了讓client能夠主動釋放,這裏沒有使用work的析構函數,而是調用了io_service提供的stop()方法,讓run()結束進而釋放線程資源.代碼以下:async
using namespace std; using namespace boost; typedef std::shared_ptr<std::thread> thread_ptr; typedef std::vector<thread_ptr> vecThread; class ThreadPool { public: ThreadPool(int num) : threadNum_(num), stopped_(false), work_(io_) { for(int i=i; i<threadNum_; ++i) { threads_.push_back(std::make_shared<std::thread>([&](){io_.run();})); } } ~ThreadPool() { stop(); } template<typename F, typename...Args> void post(F &&f, Args&&...args) { io_.post(std::bind(std::forward<F>(f), std::forward<Args>(args)...)); } void stop() { if(!stopped_) { io_.stop(); for(auto t : threads_) t->join(); stopped_ = true; } } private: bool stopped_; vecThread threads_; int threadNum_; asio::io_service io_; asio::io_service::work work_; };
首先,線程池的構造函數中先根據入參(線程池個數)啓動N個線程,每一個線程池調用io_service的run方法,在調用以前,初始化了asio::io_service::work
,意味着全部線程run方法在work沒有析構或者io_service沒有主動結束的時候必定持續等待運行任務.函數
// 在io_service run以前,要用同一個io_service初始化work. 因爲std::bind和boost::bind實現方式不同(boost:bind支持了不少重載),使用std::bind會編譯報錯,所以用lambda代替 threads_.push_back(std::make_shared<std::thread>([&](){io_.run();}));
此時,線程池已經啓動,只須要封裝post調用,便可在應用側灌入任何類型的異步操做.post
template<typename F, typename...Args> void post(F &&f, Args&&...args) { io_.post(std::bind(std::forward<F>(f), std::forward<Args>(args)...)); }
最後提供客戶端中止線程池的接口,全部io_service將結束線程,線程池資源釋放:學習
void stop() { if(!stopped_) { io_.stop(); for(auto t : threads_) t->join(); stopped_ = true; } }
void test1(int x) {std::cout<<"test 1:"<<x<<std::endl;} void test2(int y) {std::cout<<"test 2:"<<y<<std::endl;} int main() { ThreadPool threads(5); threads.post([](){std::cout<<"test 1"<<std::endl;}); threads.post([](){std::cout<<"test 2"<<std::endl;}); threads.post(test1, 3); threads.post(test2, 5); std::this_thread::sleep_for(2s); threads.stop(); }
A thread pool for executing arbitray tasks
https://stackoverflow.com/que...this