這是一個簡單的C++11實現的線程池,代碼很簡單。
原理就是管理一個任務隊列和一個工做線程隊列。
工做線程不斷的從任務隊列取任務,而後執行。若是沒有任務就等待新任務的到來。添加新任務的時候先添加到任務隊列,而後通知任意(條件變量notify_one)一個線程有新的任務來了。git
源代碼來自https://github.com/progschj/ThreadPoolgithub
#ifndef THREAD_POOL_H #define THREAD_POOL_H #include <vector> #include <queue> #include <memory> #include <thread> #include <mutex> #include <condition_variable> #include <future> #include <functional> #include <stdexcept> // 線程池類 class ThreadPool { public: // 構造函數,傳入線程數 ThreadPool(size_t threads); // 入隊任務(傳入函數和函數的參數) template<class F, class... Args> auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>; // 一個最簡單的函數包裝模板能夠這樣寫(C++11)適用於任何函數(變參、成員均可以) // template<class F, class... Args> // auto enqueue(F&& f, Args&&... args) -> decltype(declval<F>()(declval<Args>()...)) // { return f(args...); } // C++14更簡單 // template<class F, class... Args> // auto enqueue(F&& f, Args&&... args) // { return f(args...); } // 析構 ~ThreadPool(); private: // need to keep track of threads so we can join them // 工做線程組 std::vector< std::thread > workers; // 任務隊列 std::queue< std::function<void()> > tasks; // synchronization 異步 std::mutex queue_mutex; // 隊列互斥鎖 std::condition_variable condition; // 條件變量 bool stop; // 中止標誌 }; // the constructor just launches some amount of workers // 構造函數僅啓動一些工做線程 inline ThreadPool::ThreadPool(size_t threads) : stop(false) { for(size_t i = 0;i<threads;++i) // 添加線程到工做線程組 workers.emplace_back( // 與push_back類型,但性能更好(與此相似的還有emplace/emlace_front) [this] { // 線程內不斷的從任務隊列取任務執行 for(;;) { std::function<void()> task; { // 拿鎖(獨佔全部權式) std::unique_lock<std::mutex> lock(this->queue_mutex); // 等待條件成立 this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); }); // 執行條件變量等待的時候,已經拿到了鎖(即lock已經拿到鎖,沒有阻塞) // 這裏將會unlock釋放鎖,其餘線程能夠繼續拿鎖,但此處任然阻塞,等待條件成立 // 一旦收到其餘線程notify_*喚醒,則再次lock,而後進行條件判斷 // 當[return this->stop || !this->tasks.empty()]的結果爲false將阻塞 // 條件爲true時候解除阻塞。此時lock依然爲鎖住狀態 // 若是線程池中止或者任務隊列爲空,結束返回 if(this->stop && this->tasks.empty()){ return; } // 取得任務隊首任務(注意此處的std::move) task = std::move(this->tasks.front()); // 從隊列移除 this->tasks.pop(); } // 執行任務 task(); } } ); } // add new work item to the pool // 添加一個新的工做任務到線程池 template<class F, class... Args> auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> { using return_type = typename std::result_of<F(Args...)>::type; // 將任務函數和其參數綁定,構建一個packaged_task auto task = std::make_shared< std::packaged_task<return_type()> >( std::bind(std::forward<F>(f), std::forward<Args>(args)...) ); // 獲取任務的future std::future<return_type> res = task->get_future(); { // 獨佔拿鎖 std::unique_lock<std::mutex> lock(queue_mutex); // don't allow enqueueing after stopping the pool // 不容許入隊到已經中止的線程池 if(stop){ throw std::runtime_error("enqueue on stopped ThreadPool"); } // 將任務添加到任務隊列 tasks.emplace([task](){ (*task)(); }); } // 發送通知,喚醒某一個工做線程取執行任務 condition.notify_one(); return res; } // the destructor joins all threads inline ThreadPool::~ThreadPool() { { // 拿鎖 std::unique_lock<std::mutex> lock(queue_mutex); // 中止標誌置true stop = true; } // 通知全部工做線程,喚醒後由於stop爲true了,因此都會結束 condition.notify_all(); // 等待全部工做線程結束 for(std::thread &worker: workers){ worker.join(); } } #endif