版權聲明:轉載著名出處 https://blog.csdn.net/gcola007/article/details/78750220html
背景
剛粗略看完一遍c++ primer第五版,一直在找一些c++小項目練手,實驗樓裏面有不少項目,可是會員太貴了,學生黨就只能google+github自行搜索完成項目了。注:本文純提供本身的理解,代碼徹底照抄,有想法的歡迎評論留言一塊兒討論。ios
本文參考:
- c++11線程池實現
- A simple C++11 Thread Pool implementation
- 詳解:https://blog.csdn.net/GavinGreenson/article/details/72770818
涉及到的c++11的特性:
- std::vector
- std::thread
- std::mutex
- std::future
- std::condition_variable
線程池原理介紹
線程池是一種多線程處理形式,處理過程當中將任務添加到隊列,而後在建立線程後自動啓動這些任務。線程池線程都是後臺線程。每一個線程都使用默認的堆棧大小,以默認的優先級運行,並處於多線程單元中。c++
線程池的組成部分:
- 線程池管理器(ThreadPoolManager):用於建立並管理線程池
- 工做線程(WorkThread): 線程池中線程
- 任務接口(Task):每一個任務必須實現的接口,以供工做線程調度任務的執行。
- 任務隊列:用於存放沒有處理的任務。提供一種緩衝機制。
代碼
#ifndef ThreadPool_h #define ThreadPool_h #include <vector> #include <queue> #include <thread> #include <mutex> #include <condition_variable> #include <future> #include <functional> class ThreadPool { public: ThreadPool(size_t); //構造函數,size_t n 表示鏈接數 template<class F, class... Args> auto enqueue(F&& f, Args&&... args) //任務管道函數 -> std::future<typename std::result_of<F(Args...)>::type>; //利用尾置限定符 std future用來獲取異步任務的結果 ~ThreadPool(); private: // need to keep track of threads so we can join them std::vector< std::thread > workers; //追蹤線程 // the task queue std::queue< std::function<void()> > tasks; //任務隊列,用於存放沒有處理的任務。提供緩衝機制 // synchronization 同步? std::mutex queue_mutex; //互斥鎖 std::condition_variable condition; //條件變量? bool stop; }; // the constructor just launches some amount of workers inline ThreadPool::ThreadPool(size_t threads): stop(false) { for(size_t i = 0;i<threads;++i) workers.emplace_back( //如下爲構造一個任務,即構造一個線程 [this] { for(;;) { std::function<void()> task; //線程中的函數對象 {//大括號做用:臨時變量的生存期,即控制lock的時間 std::unique_lock<std::mutex> lock(this->queue_mutex); this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); }); //當stop==false&&tasks.empty(),該線程被阻塞 !this->stop&&this->tasks.empty() if(this->stop && this->tasks.empty()) return; task = std::move(this->tasks.front()); this->tasks.pop(); } task(); //調用函數,運行函數 } } ); } // add new work item to the pool template<class F, class... Args> auto ThreadPool::enqueue(F&& f, Args&&... args) //&& 引用限定符,參數的右值引用, 此處表示參數傳入一個函數 -> std::future<typename std::result_of<F(Args...)>::type> { using return_type = typename std::result_of<F(Args...)>::type; //packaged_task是對任務的一個抽象,咱們能夠給其傳遞一個函數來完成其構造。以後將任務投遞給任何線程去完成,經過 //packaged_task.get_future()方法獲取的future來獲取任務完成後的產出值 auto task = std::make_shared<std::packaged_task<return_type()> >( //指向F函數的智能指針 std::bind(std::forward<F>(f), std::forward<Args>(args)...) //傳遞函數進行構造 ); //future爲指望,get_future獲取任務完成後的產出值 std::future<return_type> res = task->get_future(); //獲取future對象,若是task的狀態不爲ready,會阻塞當前調用者 { std::unique_lock<std::mutex> lock(queue_mutex); //保持互斥性,避免多個線程同時運行一個任務 // don't allow enqueueing after stopping the pool if(stop) throw std::runtime_error("enqueue on stopped ThreadPool"); tasks.emplace([task](){ (*task)(); });
//將task投遞給線程去完成,vector尾部壓入,std::packaged_task 重載了 operator(),重載後的operator()執行function。所以能夠(*task)()能夠壓入vector<function<void()>> } condition.notify_one(); //選擇一個wait狀態的線程進行喚醒,並使他得到對象上的鎖來完成任務(即其餘線程沒法訪問對象) return res; }//notify_one不能保證得到鎖的線程真正須要鎖,而且所以可能產生死鎖 // the destructor joins all threads inline ThreadPool::~ThreadPool() { { std::unique_lock<std::mutex> lock(queue_mutex); stop = true; } condition.notify_all(); //通知全部wait狀態的線程競爭對象的控制權,喚醒全部線程執行 for(std::thread &worker: workers) worker.join(); //由於線程都開始競爭了,因此必定會執行完,join可等待線程執行完 } #endif /* ThreadPool_h */
線程池大約100行,下面是運行代碼git
#include <iostream> #include <vector> #include <chrono> #include "ThreadPool.h" int main() { ThreadPool pool(4); std::vector< std::future<int> > results; for(int i = 0; i < 8; ++i) { results.emplace_back( pool.enqueue([i] { std::cout << "hello " << i << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "world " << i << std::endl; return i*i; }) ); } for(auto && result: results) //經過future.get()獲取返回值 std::cout << result.get() << ' '; std::cout << std::endl; return 0; }
代碼剖析
經過新建一個線程池類,以類來管理資源(《c++ effective》資源管理一章有提到)。該類包含3個公有成員函數與5個私有成員:構造函數與析構函數即知足(RAII:Resource Acquisition Is Initialization)。github
- 構造函數接受一個size_t類型的數,表示鏈接數
- enqueue表示線程池部分中的任務管道,是一個模板函數
- workers是一個成員爲thread的vector,用來監視線程狀態
- tasks表示線程池部分中的任務隊列,提供緩衝機制
- queue_mutex表示互斥鎖
- condition表示條件變量(互斥鎖,條件變量以及stop將在後面經過例子說明)
queue_mutex、condition與stop這三個成員讓初次接觸多線程的我很是的迷惑,互斥究竟是什麼意思?爲何須要一個bool量來控制?條件變量condition又是什麼?
不懂的能夠搜索:多線程的生產者與消費者模型
同時附上condition_variable詳解markdown
構造函數ThreadPOOL(size_t):
- 省略了參數
- emplace_back至關於push_back但比push_back更爲高效
- wokers壓入了一個lambda表達式(即一個匿名函數),表示一個任務(線程),使用for的無限循環,task表示函數對象,線程池中的函數接口在enqueue傳入的參數之中,condition.wait(lock,bool),當bool爲false的時候,線程將會被堵塞掛起,被堵塞時須要notify_one來喚醒線程才能繼續執行
任務隊列函數enqueue(F&& f, Args&&… args)
- 這類多參數模板的格式就是如此
- -> 尾置限定符,語法就是如此,用來推斷auto類型
- typename與class的區別
- result_of用來獲得返回類型的對象,它有一個成員::type
析構函數~ThreadPool()
- 經過notify_all能夠喚醒線程競爭任務的執行,從而使全部任務不被遺漏