基於C++11的線程池

本篇系C++ socket網絡爬蟲(1)的姊妹篇,寫網絡爬蟲怎麼能少得了線程呢html

 

源代碼地址:http://files.cnblogs.com/magicsoar/ThreadPoolProject.rar網絡

*須要C++11的支持,在vs2013下編譯經過socket

運行效果函數

image

 

背景this

在傳統的收到任務即建立線程的狀況下,咱們每收到一個任務,就建立一個線程,執行任務,銷燬線程,spa

咱們把這三個過程所用的時間分別記作T1,T2,T3操作系統

任務自己所用的時間僅佔T2/(T1+T2+T3),這在任務自己所用時間很短的狀況下, 效率是很低的線程

此外,一般操做系統所能建立的線程數量都是有限的,並不能無限制的建立線程。3d

 

而在線程池中,咱們一般會預先建立m個線程,放到空閒容器中,當有任務來臨時,線程池會從空閒的線程中挑選一個線程來執行該任務,code

在執行完畢後再將其放回空閒容器中

 

C++11

在C++11中,C++對線程提供了一個很高的抽象,並無很好的提供優先級控制等功能,須要調用std::thread::native_handle(),獲取原生線程對象

運行平臺特定的操做,但這就喪失了std::thread在不一樣平臺上代碼層面的一致性。

因此在項目中實現了對std::thread二次封裝,並提供了基本的優先級控制

 

項目概述

項目中有一個主線程,即運行程序時建立的線程能夠從用戶那裏獲取任務,還有一個管理線程,用於進行線程池中線程的調度,還有初始化線程池時建立的若干空閒線程,用於執行任務

 

項目中主要有如下幾個類:

Task:任務類,內有任務的優先級,和一個純虛Run方法,咱們須要派生Task,將要完成的任務寫到Run方法中

MyThread:線程類,封裝了C++11的thread,每個線程能夠關聯一個Task對象,執行其Run方法

BusyThreadContainer:工做容器類,採用std::list<MyThread*>實現,儲存工做狀態的線程

IdleThreadContainer:空閒容器類,採用std::vector<MyThread*>實現,儲存處於空閒狀態的線程

TaskContainer:任務容器類,採用priority_queue<Task*>實現,儲存全部用戶添加未執行的任務

MyThreadPool:線程池類,用於從用戶獲取任務,管理任務,實現對線程池中線程的調度

 

類圖以下

MainDig

*UserTask爲用戶本身編寫的從Task派生的任務類

 

Task類

namespace
{
    enum  PRIORITY
    {

        MIN = 1, NORMAL = 25, MAX = 50
    };
}

class Task
{
    
public:
    Task()
    {

    }
    void SetPriority(int priority)
    {
        if (priority>(PRIORITY::MAX))
        {
            priority = (PRIORITY::MAX);
        }
        else if (priority>(PRIORITY::MAX))
        {
            priority = (PRIORITY::MIN);
        }
    }    
    virtual void Run() = 0;
protected:
    int priority_;
};
 

void SetPriority(int priority) :設置線程的優先級,數值在1-50之間,值越大,優先級越高

virtual void run() = 0:線程執行的方法,用戶須要重寫爲本身的方法

 

MyThread類

class MyThread
{
    friend bool operator==(MyThread my1, MyThread my2);
    friend bool operator!=(MyThread my1, MyThread my2);
public:
    MyThread(MyThreadPool *pool);
    void Assign(Task *Task);
    void Run();
    void StartThread();
    int getthreadid();
    void setisdetach(bool isdetach);    
private:
    MyThreadPool *mythreadpool_;
    static int  s_threadnumber;
    bool isdetach_;
    Task *task_;
    int threadid_;
    std::thread thread_;
};

方法:

MyThread(MyThreadPool *pool):構造一個MyThread對象,將本身與指定的線程池相關聯起來

void Assign(Task *Task):將一個任務與該線程相關聯起來

void Run():調用了Task的Run方法,同時在Task的Run方法結束後將本身從工做容器移回空閒容器

void StartThread():執行線程的Run方法,即執行了Task的Run方法

int getthreadid():獲取線程的id號

void setisdetach(bool isdetach):設置線程在運行的時候是join仍是detach的

 

BusyThreadContainer類

class BusyThreadContainer
{
    
public:
    BusyThreadContainer();
    ~BusyThreadContainer();
    void push(MyThread *m);
    std::list<MyThread*>::size_type size();
    void erase(MyThread *m);

private:
    std::list<MyThread*> busy_thread_container_;
    typedef std::list<MyThread*> Container;
    typedef Container::iterator Iterator;
};
 

void push(MyThread *m):將一個線程放入工做容器中

void erase(MyThread *m):刪除一個指定的線程

std::list<MyThread*>::size_type size():返回工做容器的大小

 

 

IdleThreadContainer類

class IdleThreadContainer
{
    
public:
    IdleThreadContainer();
    ~IdleThreadContainer();
    std::vector<MyThread*>::size_type size();
    void push(MyThread *m);
    void assign(int n,MyThreadPool* m);    
    MyThread* top();
    void pop();
    void erase(MyThread *m);
private:
    std::vector<MyThread*> idle_thread_container_;
    typedef std::vector<MyThread*> Container;
    typedef Container::iterator Iterator;
};

~IdleThreadContainer(); :負責析構空閒容器中的線程

void push(MyThread *m):將一個線程放回空閒容器中

void assign(int n,MyThreadPool* m):建立n個線程與線程池m相關聯的線程放入空閒容器中

MyThread* top():返回位於空閒容器頂端的線程

void pop():彈出空閒容器頂端的線程

void erase(MyThread *m):刪除一個指定的線程

 

 

 

TaskContainer類

class TaskContainer
{
public:
    TaskContainer();
    ~TaskContainer();
    void push(Task *);
    Task* top();
    void pop();
    std::priority_queue<Task*>::size_type size();
private:
    std::priority_queue<Task*> task_container_;
};

void push(Task *):將一個任務放入任務容器中

Task* top():返回任務容器頂端的任務

void pop():將任務容器頂端的線程彈出

std::priority_queue<Task*>::size_type size():返回任務容器的大小

 

MyThreadPool類

class MyThreadPool
{
public:
    
    MyThreadPool(){}
    MyThreadPool(int number);
    ~MyThreadPool();
    void AddTask(Task *Task,int priority);
    void AddIdleThread(int n);
    void RemoveThreadFromBusy(MyThread *myThread);
    void Start();
    void EndMyThreadPool();private:
    BusyThreadContainer busy_thread_container_;
    IdleThreadContainer idle_thread_container_;
    bool issurvive_;
    TaskContainer task_container_;
    std::thread thread_this_;
    std::mutex busy_mutex_;
    std::mutex idle_mutex_;
    std::mutex task_mutex_;
    int number_of_thread_;
};

MyThreadPool(int number):構造MyThreadPool,建立包含number個線程的空閒容器

void AddTask(Task *Task,int priority):添加一個優先級爲priority的任務到任務容器中

void AddIdleThread(int n):在建立n個空閒線程到空閒容器中

void RemoveThreadFromBusy(MyThread *myThread):將一個線程從工做容器中刪除,並移回空閒容器中

void Start():判斷是否有空閒線程,若有將任務從從任務容器中提出,放入空閒容器中,等待執行

void EndMyThreadPool():結束線程池的運行

 

派生自Task的MyTask類

class MyTask :public Task
{
    friend bool operator<(MyTask  &lv,MyTask &rv)
    {
        return lv.priority_ < rv.priority_;
    }
public:
    MyTask();
    ~MyTask();
    virtual void Run();
    void setdata(int d);
private:
    int data_;
};
MyTask::MyTask()
{
}
MyTask::~MyTask()
{
}
void MyTask::setdata(int d)
{
    data_ = d;
}
void MyTask::Run()
{
    std::cout << "Hello I am "<<data_ << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(1));
}

friend bool operator<(MyTask &lv,MyTask &rv) :用於肯定任務在任務容器中的位置

Run:自定義的Run方法

void setdata(int d):設置數據

 

關鍵代碼分析:

void MyThread::Run()

void MyThread::Run()
{
    cout <<"Thread:"<< threadid_ << " run ";
    task_->Run();
    mythreadpool_->RemoveThreadFromBusy(this);
}

調用了Task的Run方法,同時在Task的Run方法結束後,通知線程池將本身從工做容器中移回空閒容器

 

void MyThread::StartThread()

void MyThread::StartThread()
{
    thread_ = thread(&MyThread::Run, this);
    if (isdetach_ == true)
        thread_.detach();
    else
        thread_.join();
}

將MyThread的Run方法與thread_相綁定,this表示類的Run方法的第一個隱含的參數

而後根據isdetach的值,判斷是否detach() or join()

 

void MyThreadPool::RemoveThreadFromBusy(MyThread *myThread)

void MyThreadPool::RemoveThreadFromBusy(MyThread *myThread)
{

    busy_mutex_.lock();
    cout << "Thread:" << myThread->getthreadid()<< " remove from busylist" << endl;
    busy_thread_container_.erase(myThread);
    busy_mutex_.unlock();

    idle_mutex_.lock();
    idle_thread_container_.push(myThread);
    idle_mutex_.unlock();
}

將一個線程從任務容器中移除,並將其放回空閒容器中,

使用busy_mutex_和idle_mutex_進行加鎖和解鎖,確保數據的一致性

 

MyThreadPool::MyThreadPool(int number)

MyThreadPool::MyThreadPool(int number)
{
    issurvive_ = true;
    number_of_thread_ = number;
    idle_thread_container_.assign(number, this);
    thread_this_ =thread(&MyThreadPool::Start, this);
    thread_this_.detach();
}

MyThreadPool的構造函數,建立number個空閒線程與空閒容器中,同時建立管理線程thread_this,用於進行線程池中線程的調度

 

void MyThreadPool::Start()

void MyThreadPool::Start()
{
    
    while (true)
    {
        if (issurvive_==false)
        {
            busy_mutex_.lock();
            if (busy_thread_container_.size()!=0)
            {
                busy_mutex_.unlock();
                continue;
            }
            busy_mutex_.unlock();
            break;
        }
        idle_mutex_.lock();
        if (idle_thread_container_.size() == 0)
        {
            idle_mutex_.unlock();
            continue;
        }
        idle_mutex_.unlock();
        task_mutex_.lock();
        if (task_container_.size() == 0)
        {
            task_mutex_.unlock();
            continue;
        }
        Task *b = task_container_.top();;
        task_container_.pop();
        task_mutex_.unlock();
        
        idle_mutex_.lock();
        MyThread *mythread = idle_thread_container_.top();;
        idle_thread_container_.pop();
        mythread->Assign(b);
        idle_mutex_.unlock();

        busy_mutex_.lock();
        busy_thread_container_.push(mythread);
        busy_mutex_.unlock();
        mythread->StartThread();
    }
}

管理線程對應的Start方法,內有一個死循環,不停的判斷任務容器中是否有任務,和是否有空閒線程來執行任務,如有,則將任務從

任務容器中提出,從空閒線程中提取出一個空閒線程與其綁定,執行該任務,同時將該線程從空閒容器移動到工做容器中。

當線程池想要結束運行時,即survive爲false時,首先要判斷工做容器是否爲空,若不爲空,則表明還有任務正在被線程執行,線程池不能結束運行

不然能夠結束線程池的運行,跳出死循環

 

int main()

int main()
{
    MyThreadPool mythreadPool(10);

    MyTask j[50];
    for (int i = 0; i < 50;i++)
    {
        j[i].setdata(i);
    }
    for (int i = 0; i < 50; i++)
    {
        mythreadPool.AddTask(&j[i],i);
    }
    int i;
    //按100添加一個任務
    //按-1結束線程池
    while (true)
    {
        cin >> i;    
        if (i == 100)
        {
            MyTask j;
            j.setdata(i);
            mythreadPool.AddTask(&j, i);
        }
        if (i == -1)
        {        
            mythreadPool.EndMyThreadPool();
            break;
        }        
    }
    system("pause");
}

建立了一個含有10個空閒線程的線程池,和50個MyTask任務,並將其放入線程池中等待運行

在循環中,用戶輸入100能夠再添加一個任務到線程池中等待運行,輸入-1結束線程池的運行。

運行結果以下

image

 

 

線程池使用後記

線程池並非萬能的,線程池減小了建立與銷燬線程自己對任務照成的影響,但若是任務自己的運行時間很長,那麼這些開銷至關於任務自己執行開銷而言是能夠忽略的。那麼咱們也能夠

選擇「即時建立,即時銷燬」的策略

線程池一般適合下面的幾個場合:

(1)  單位時間內處理的任務數較多,且每一個任務的執行時間較短

(2)  對實時性要求較高的任務,若是接受到任務後在建立線程,再執行任務,可能知足不了實時要求,所以必須採用線程池進行預建立。

相關文章
相關標籤/搜索