本篇系C++ socket網絡爬蟲(1)的姊妹篇,寫網絡爬蟲怎麼能少得了線程呢html
源代碼地址:http://files.cnblogs.com/magicsoar/ThreadPoolProject.rar網絡
*須要C++11的支持,在vs2013下編譯經過socket
運行效果函數
背景this
在傳統的收到任務即建立線程的狀況下,咱們每收到一個任務,就建立一個線程,執行任務,銷燬線程,spa
咱們把這三個過程所用的時間分別記作T1,T2,T3操作系統
任務自己所用的時間僅佔T2/(T1+T2+T3),這在任務自己所用時間很短的狀況下, 效率是很低的線程
此外,一般操做系統所能建立的線程數量都是有限的,並不能無限制的建立線程。3d
而在線程池中,咱們一般會預先建立m個線程,放到空閒容器中,當有任務來臨時,線程池會從空閒的線程中挑選一個線程來執行該任務,code
在執行完畢後再將其放回空閒容器中
C++11
在C++11中,C++對線程提供了一個很高的抽象,並無很好的提供優先級控制等功能,須要調用std::thread::native_handle(),獲取原生線程對象
運行平臺特定的操做,但這就喪失了std::thread在不一樣平臺上代碼層面的一致性。
因此在項目中實現了對std::thread二次封裝,並提供了基本的優先級控制
項目概述
項目中有一個主線程,即運行程序時建立的線程能夠從用戶那裏獲取任務,還有一個管理線程,用於進行線程池中線程的調度,還有初始化線程池時建立的若干空閒線程,用於執行任務
項目中主要有如下幾個類:
Task:任務類,內有任務的優先級,和一個純虛Run方法,咱們須要派生Task,將要完成的任務寫到Run方法中
MyThread:線程類,封裝了C++11的thread,每個線程能夠關聯一個Task對象,執行其Run方法
BusyThreadContainer:工做容器類,採用std::list<MyThread*>實現,儲存工做狀態的線程
IdleThreadContainer:空閒容器類,採用std::vector<MyThread*>實現,儲存處於空閒狀態的線程
TaskContainer:任務容器類,採用priority_queue<Task*>實現,儲存全部用戶添加未執行的任務
MyThreadPool:線程池類,用於從用戶獲取任務,管理任務,實現對線程池中線程的調度
類圖以下
*UserTask爲用戶本身編寫的從Task派生的任務類
Task類
namespace { enum PRIORITY { MIN = 1, NORMAL = 25, MAX = 50 }; } class Task { public: Task() { } void SetPriority(int priority) { if (priority>(PRIORITY::MAX)) { priority = (PRIORITY::MAX); } else if (priority>(PRIORITY::MAX)) { priority = (PRIORITY::MIN); } } virtual void Run() = 0; protected: int priority_; };
void SetPriority(int priority) :設置線程的優先級,數值在1-50之間,值越大,優先級越高
virtual void run() = 0:線程執行的方法,用戶須要重寫爲本身的方法
MyThread類
class MyThread { friend bool operator==(MyThread my1, MyThread my2); friend bool operator!=(MyThread my1, MyThread my2); public: MyThread(MyThreadPool *pool); void Assign(Task *Task); void Run(); void StartThread(); int getthreadid(); void setisdetach(bool isdetach); private: MyThreadPool *mythreadpool_; static int s_threadnumber; bool isdetach_; Task *task_; int threadid_; std::thread thread_; };
方法:
MyThread(MyThreadPool *pool):構造一個MyThread對象,將本身與指定的線程池相關聯起來
void Assign(Task *Task):將一個任務與該線程相關聯起來
void Run():調用了Task的Run方法,同時在Task的Run方法結束後將本身從工做容器移回空閒容器
void StartThread():執行線程的Run方法,即執行了Task的Run方法
int getthreadid():獲取線程的id號
void setisdetach(bool isdetach):設置線程在運行的時候是join仍是detach的
BusyThreadContainer類
class BusyThreadContainer { public: BusyThreadContainer(); ~BusyThreadContainer(); void push(MyThread *m); std::list<MyThread*>::size_type size(); void erase(MyThread *m); private: std::list<MyThread*> busy_thread_container_; typedef std::list<MyThread*> Container; typedef Container::iterator Iterator; };
void push(MyThread *m):將一個線程放入工做容器中
void erase(MyThread *m):刪除一個指定的線程
std::list<MyThread*>::size_type size():返回工做容器的大小
IdleThreadContainer類
class IdleThreadContainer { public: IdleThreadContainer(); ~IdleThreadContainer(); std::vector<MyThread*>::size_type size(); void push(MyThread *m); void assign(int n,MyThreadPool* m); MyThread* top(); void pop(); void erase(MyThread *m); private: std::vector<MyThread*> idle_thread_container_; typedef std::vector<MyThread*> Container; typedef Container::iterator Iterator; };
~IdleThreadContainer(); :負責析構空閒容器中的線程
void push(MyThread *m):將一個線程放回空閒容器中
void assign(int n,MyThreadPool* m):建立n個線程與線程池m相關聯的線程放入空閒容器中
MyThread* top():返回位於空閒容器頂端的線程
void pop():彈出空閒容器頂端的線程
void erase(MyThread *m):刪除一個指定的線程
TaskContainer類
class TaskContainer { public: TaskContainer(); ~TaskContainer(); void push(Task *); Task* top(); void pop(); std::priority_queue<Task*>::size_type size(); private: std::priority_queue<Task*> task_container_; };
void push(Task *):將一個任務放入任務容器中
Task* top():返回任務容器頂端的任務
void pop():將任務容器頂端的線程彈出
std::priority_queue<Task*>::size_type size():返回任務容器的大小
MyThreadPool類
class MyThreadPool { public: MyThreadPool(){} MyThreadPool(int number); ~MyThreadPool(); void AddTask(Task *Task,int priority); void AddIdleThread(int n); void RemoveThreadFromBusy(MyThread *myThread); void Start(); void EndMyThreadPool();private: BusyThreadContainer busy_thread_container_; IdleThreadContainer idle_thread_container_; bool issurvive_; TaskContainer task_container_; std::thread thread_this_; std::mutex busy_mutex_; std::mutex idle_mutex_; std::mutex task_mutex_; int number_of_thread_;
};
MyThreadPool(int number):構造MyThreadPool,建立包含number個線程的空閒容器
void AddTask(Task *Task,int priority):添加一個優先級爲priority的任務到任務容器中
void AddIdleThread(int n):在建立n個空閒線程到空閒容器中
void RemoveThreadFromBusy(MyThread *myThread):將一個線程從工做容器中刪除,並移回空閒容器中
void Start():判斷是否有空閒線程,若有將任務從從任務容器中提出,放入空閒容器中,等待執行
void EndMyThreadPool():結束線程池的運行
派生自Task的MyTask類
class MyTask :public Task { friend bool operator<(MyTask &lv,MyTask &rv) { return lv.priority_ < rv.priority_; } public: MyTask(); ~MyTask(); virtual void Run(); void setdata(int d); private: int data_; };
MyTask::MyTask() { } MyTask::~MyTask() { } void MyTask::setdata(int d) { data_ = d; } void MyTask::Run() { std::cout << "Hello I am "<<data_ << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1)); }
friend bool operator<(MyTask &lv,MyTask &rv) :用於肯定任務在任務容器中的位置
Run:自定義的Run方法
void setdata(int d):設置數據
關鍵代碼分析:
void MyThread::Run()
void MyThread::Run() { cout <<"Thread:"<< threadid_ << " run "; task_->Run(); mythreadpool_->RemoveThreadFromBusy(this); }
調用了Task的Run方法,同時在Task的Run方法結束後,通知線程池將本身從工做容器中移回空閒容器
void MyThread::StartThread()
void MyThread::StartThread() { thread_ = thread(&MyThread::Run, this); if (isdetach_ == true) thread_.detach(); else thread_.join(); }
將MyThread的Run方法與thread_相綁定,this表示類的Run方法的第一個隱含的參數
而後根據isdetach的值,判斷是否detach() or join()
void MyThreadPool::RemoveThreadFromBusy(MyThread *myThread)
void MyThreadPool::RemoveThreadFromBusy(MyThread *myThread) { busy_mutex_.lock(); cout << "Thread:" << myThread->getthreadid()<< " remove from busylist" << endl; busy_thread_container_.erase(myThread); busy_mutex_.unlock(); idle_mutex_.lock(); idle_thread_container_.push(myThread); idle_mutex_.unlock(); }
將一個線程從任務容器中移除,並將其放回空閒容器中,
使用busy_mutex_和idle_mutex_進行加鎖和解鎖,確保數據的一致性
MyThreadPool::MyThreadPool(int number)
MyThreadPool::MyThreadPool(int number) { issurvive_ = true; number_of_thread_ = number; idle_thread_container_.assign(number, this); thread_this_ =thread(&MyThreadPool::Start, this); thread_this_.detach(); }
MyThreadPool的構造函數,建立number個空閒線程與空閒容器中,同時建立管理線程thread_this,用於進行線程池中線程的調度
void MyThreadPool::Start()
void MyThreadPool::Start() { while (true) { if (issurvive_==false) { busy_mutex_.lock(); if (busy_thread_container_.size()!=0) { busy_mutex_.unlock(); continue; } busy_mutex_.unlock(); break; } idle_mutex_.lock(); if (idle_thread_container_.size() == 0) { idle_mutex_.unlock(); continue; } idle_mutex_.unlock(); task_mutex_.lock(); if (task_container_.size() == 0) { task_mutex_.unlock(); continue; } Task *b = task_container_.top();; task_container_.pop(); task_mutex_.unlock(); idle_mutex_.lock(); MyThread *mythread = idle_thread_container_.top();; idle_thread_container_.pop(); mythread->Assign(b); idle_mutex_.unlock(); busy_mutex_.lock(); busy_thread_container_.push(mythread); busy_mutex_.unlock(); mythread->StartThread(); } }
管理線程對應的Start方法,內有一個死循環,不停的判斷任務容器中是否有任務,和是否有空閒線程來執行任務,如有,則將任務從
任務容器中提出,從空閒線程中提取出一個空閒線程與其綁定,執行該任務,同時將該線程從空閒容器移動到工做容器中。
當線程池想要結束運行時,即survive爲false時,首先要判斷工做容器是否爲空,若不爲空,則表明還有任務正在被線程執行,線程池不能結束運行
不然能夠結束線程池的運行,跳出死循環
int main()
int main() { MyThreadPool mythreadPool(10); MyTask j[50]; for (int i = 0; i < 50;i++) { j[i].setdata(i); } for (int i = 0; i < 50; i++) { mythreadPool.AddTask(&j[i],i); } int i; //按100添加一個任務 //按-1結束線程池 while (true) { cin >> i; if (i == 100) { MyTask j; j.setdata(i); mythreadPool.AddTask(&j, i); } if (i == -1) { mythreadPool.EndMyThreadPool(); break; } } system("pause"); }
建立了一個含有10個空閒線程的線程池,和50個MyTask任務,並將其放入線程池中等待運行
在循環中,用戶輸入100能夠再添加一個任務到線程池中等待運行,輸入-1結束線程池的運行。
運行結果以下
線程池使用後記
線程池並非萬能的,線程池減小了建立與銷燬線程自己對任務照成的影響,但若是任務自己的運行時間很長,那麼這些開銷至關於任務自己執行開銷而言是能夠忽略的。那麼咱們也能夠
選擇「即時建立,即時銷燬」的策略
線程池一般適合下面的幾個場合:
(1) 單位時間內處理的任務數較多,且每一個任務的執行時間較短
(2) 對實時性要求較高的任務,若是接受到任務後在建立線程,再執行任務,可能知足不了實時要求,所以必須採用線程池進行預建立。