線程池介紹
服務器完成一項任務的時間可分爲:T1:建立線程或進程時間;T2:執行任務時間;T3:銷燬進程或線程時間。一般T1+T3的時間大於T2,線程池正是關注如何縮短T1和T3的時間。
線程池經過在系統中預先建立必定數量的線程,當任務請求到來時從線程池中分一個預先建立的線程去處理任務,線程在處理完任務後還能夠重用,不會銷燬,繼續等待下次任務的到開。這樣能避免大量的線程建立和銷燬操做,從而節省系統資源;同時有不少任務時,也會減小建立線程的數量。
用C++11的線程相關特性,好比線程、條件變量、互斥量,讓咱們編寫併發程序更簡單。
linux
半同步半異步線程池實現的關鍵技術分析
線程池又三層組成:
1. 同步服務層:不斷的將新任務添加到同步隊列中,能夠用多路複用或者多線程來完成。一開始沒看懂任務是什麼,其實一個函數就是一個任務,C++11經過std::function將函數封裝爲類模板對象,能夠將這些任務(函數)放到容器中保存起來,以進行添加讀取任務操做。
2. 排隊層:就是一個同步隊列,處於核心地位。全部待處理的任務都存在這裏,要保證隊列中共享數據線程安全(加鎖),還要控制任務的數量,上層服務層往隊列添加任務,下層從這裏取任務去執行。
3. 異步服務層: 預先建立好線程,來並行處理隊列中的任務。
ios
本身畫了一張圖:安全
代碼實現與分析
用到了不少C++11的特性,裏面寫了不少註釋,是根據本身理解分析的。服務器
同步隊列:多線程
#include <iostream> #include <string> #include <stack> #include <vector> #include <algorithm> #include <cstdio> #include <list> #include <thread> #include <mutex> #include <condition_variable> using namespace std; template<typename T> class Syncqueue { public: //初始化,隊列的最大元素個數,開始不終止 Syncqueue( int maxsize ) : m_maxsize(maxsize),m_needStop(false){} //往隊列中添加任務,重載兩個版本,左值和右值引用 void Put( const T& x ) { Add(x); } void Put(T&& x) { Add(forward<T>(x)); } //Take和Add相似 void Take(list<T>& list) { unique_lock<mutex> locker(m_mtx); m_notEmpty.wait(locker, [this] { return m_needStop|| NotEmpty(); });//中止或者不空就繼續執行,不用wait if(m_needStop) return ; //一次加鎖,一下取出隊列中的全部數據 list = move(m_queue); //經過移動,將 m_queue 轉移到 list,而不是拷貝 m_notFull.notify_one(); //喚醒線程去添加任務 } //每次獲取一個數據,效率較低 void Take(T& t) { unique_lock<mutex> locker(m_mtx); m_notFull.wait(locker, [this] { return m_needStop || NotEmpty(); }); if(m_needStop) return ; t = m_queue.front(); //取出一個 m_queue.pop_front(); m_notFull.notify_one(); } //方便讓用戶能終止任務 void Stop() { { lock_guard<mutex> locker(m_mtx); m_needStop = true; //將須要中止標誌 置爲 true //執行到這,lock_guard釋放鎖 } //喚醒全部等待的線程,到if(m_needStop)時爲真,而後相繼退出 m_notEmpty.notify_all(); //被喚醒的線程直接獲取鎖 m_notFull.notify_all(); } //判斷隊列是否爲空 bool Empty() { lock_guard<mutex> locker(m_mtx); return m_queue.empty(); } //判斷隊列滿了 bool Full() { lock_guard<mutex> locker(m_mtx); return m_queue.size() == m_maxsize; } //隊列大小 size_t Size() { lock_guard<mutex> locker(m_mtx); return m_queue.size(); } private: //隊列未滿 bool NotFull() const { bool full = m_queue.size() >= m_maxsize; if(full) cout << "緩衝區滿了,須要等待…… " << endl; return !full; } //隊列不空 bool NotEmpty() const { bool empty = m_queue.empty(); if(empty) cout << "緩衝區空了,須要等待…… 異步層的線程id: " << this_thread::get_id() << endl; return !empty; } //範型事件函數 template<typename F> void Add(F&& x) { unique_lock<mutex> locker(m_mtx); m_notFull.wait(locker,[this]{ return m_needStop|| NotFull(); }); //須要中止 或者 不滿則繼續往下執行,不然wait if(m_needStop) return; //若是須要終止就 return m_queue.push_back(forward<F>(x)); //不終止,把任務添加到同步隊列 m_notEmpty.notify_one(); //提醒線程隊列不爲空,喚醒線程去取數據 } private: list<T> m_queue; //緩衝區 用鏈表實現 mutex m_mtx; //互斥量 condition_variable m_notEmpty; //不爲空的條件變量 condition_variable m_notFull; //沒有滿的條件變量 int m_maxsize; //同步隊列最大的size bool m_needStop; //中止的標誌,開始是false };
線程池:併發
/************************************************************************* > File Name: ThreadPool.h > Author: Tanswer > Mail: duxm@xiyoulinux.org > Created Time: 2017年08月10日 星期四 14時46分51秒 ************************************************************************/ #include <iostream> #include <string> #include <stack> #include <vector> #include <list> #include <algorithm> #include <cstdio> #include <thread> #include <mutex> #include <atomic> #include <condition_variable> #include <functional> #include "Syncqueue.h" using namespace std; const int MaxTaskCount = 100; //最大任務數量 class ThreadPool { public: using Task = function<void()>; //任務類型,這裏是無參數無返回值,能夠修改成任何類型的範型函數模板 //hardware_concurrency CPU核數 當默認線程數 ThreadPool(int numThreads = thread::hardware_concurrency()) : m_queue(MaxTaskCount) { Start(numThreads); //啓動 } ~ThreadPool() { Stop(); //若是沒有中止時,則主動終止線程池 } //終止線程池,銷燬池中全部線程 void Stop() { //保證多線程狀況下只調用一次StopThreadGroup call_once(m_flag, [this] { StopThreadGroup(); }); } //同步服務層:往同步隊列中添加任務,兩個版本 void AddTask(Task&& task) { m_queue.Put(forward<Task>(task)); } void AddTask(const Task& task) { m_queue.Put(task); } private: void Start( int numThreads ) //線程池開始,預先建立包含numThreads 個線程的線程組 { m_running = true; //建立線程組 for(int i=0; i<numThreads; i++) { //智能指針管理,給出線程函數&ThreadPool::RunInThread 和對應參數this m_threadgroup.push_back( make_shared<thread>(&ThreadPool::RunInThread, this) ); } } void RunInThread() { while(m_running) { //一次取出隊列中全部任務 list<Task> list; m_queue.Take(list); for(auto& task : list) { if(!m_running) //若是中止 return ; task(); //執行任務 } } } //終止線程池,銷燬池中全部線程 void StopThreadGroup() { m_queue.Stop(); //讓同步隊列中的線程中止 m_running = false; //讓內部線程跳出循環並退出 for(auto thread : m_threadgroup) { if(thread) thread -> join(); } m_threadgroup.clear(); } private: list<shared_ptr<thread>> m_threadgroup; //處理任務的線程組,用list保存 Syncqueue<Task> m_queue; //同步隊列 atomic_bool m_running; //是否中止的標誌 once_flag m_flag; //call_once的參數 };
測試例子:異步
/************************************************************************* > File Name: TestThreadPool.cpp > Author: Tanswer > Mail: duxm@xiyoulinux.org > Created Time: 2017年08月10日 星期四 16時49分35秒 ************************************************************************/ #include <iostream> #include <string> #include <stack> #include <vector> #include <algorithm> #include <cstdio> #include <thread> #include "ThreadPool.h" using namespace std; void TestThreadPool() { ThreadPool pool(2); //線程池建立兩個線程,異步層此時無任務須要先等待 //pool.Start(2); //建立兩個同步層的線程不斷往線程池中添加任務 //在這任務很簡單,打印同步層線程ID,用lambda表達式表示,每一個線程處理10個任務 thread thd1( [&pool]{ for(int i=0; i<10; i++ ) { auto thdId = this_thread::get_id(); pool.AddTask( [thdId]{ cout << "同步層線程1的線程ID: " << thdId << endl; } ); } } ); thread thd2( [&pool]{ for( int i=0; i<10; i++ ) { auto thdId = this_thread::get_id(); pool.AddTask( [thdId]{ cout << "同步層線程2的線程ID: " << thdId << endl; } ); } } ); this_thread::sleep_for(chrono::seconds(2)); getchar(); //中止線程池 pool.Stop(); //等待同步層的兩個線程執行完 thd1.join(); thd2.join(); } int main() { TestThreadPool(); exit(EXIT_SUCCESS); }
測試結果:函數
線程池預先建立了兩個線程,線程ID分別爲: 140141544822528 和 140141536429824,開始時同步隊列是空的,尚未任務,因此兩個線程都等待。而後建立了兩個同步層線程1和2,線程ID分別爲:140141528037120 和 140141519644416,這兩個線程開始不斷往線程池同步隊列中添加任務。有了任務後,線程池異步層中的兩個線程開始處理任務,任務很簡單,就是打印同步層線程ID,異步層線程交替處理上層的任務。測試