Linux C++線程池框架

       本文給出了一個通用的線程池框架,該框架將與線程執行相關的任務進行了高層次的抽象,使之與具體的執行任務無關。另外該線程池具備動態伸縮性,它能根據執行任務的輕重自動調整線程池中線程的數量。文章的最後,咱們給出一個簡單示例程序,經過該示例程序,咱們會發現,經過該線程池框架執行多線程任務是多麼的簡單。


1. 爲何須要線程池數據庫

  目前的大多數網絡服務器,包括Web服務器、Email服務器以及數據庫服務器等都具備一個共同點,就是單位時間內必須處理數目巨大的鏈接請求,但處理時間卻相對較短。
  傳統多線程方案中咱們採用的服務器模型則是一旦接受到請求以後,即建立一個新的線程,由該線程執行任務。任務執行完畢後,線程退出,這就是是「即時建立,即時銷燬」的策略。儘管與建立進程相比,建立線程的時間已經大大的縮短,可是若是提交給線程的任務是執行時間較短,並且執行次數極其頻繁,那麼服務器將處於不停的建立線程,銷燬線程的狀態。
  咱們將傳統方案中的線程執行過程分爲三個過程:T一、T二、T3。
  T1:線程建立時間
  T2:線程執行時間,包括線程的同步等時間
  T3:線程銷燬時間
  那麼咱們能夠看出,線程自己的開銷所佔的比例爲(T1+T3) / (T1+T2+T3)。若是線程執行的時間很短的話,這比開銷可能佔到20%-50%左右。若是任務執行時間很頻繁的話,這筆開銷將是不可忽略的。
  除此以外,線程池可以減小建立的線程個數。一般線程池所容許的併發線程是有上界的,若是同時須要併發的線程數超過上界,那麼一部分線程將會等待。而傳統方案中,若是同時請求數目爲2000,那麼最壞狀況下,系統可能須要產生2000個線程。儘管這不是一個很大的數目,可是也有部分機器可能達不到這種要求。
  所以線程池的出現正是着眼於減小線程池自己帶來的開銷。線程池採用預建立的技術,在應用程序啓動以後,將當即建立必定數量的線程(N1),放入空閒隊列中。這些線程都是處於阻塞(Suspended)狀態,不消耗CPU,但佔用較小的內存空間。當任務到來後,緩衝池選擇一個空閒線程,把任務傳入此線程中運行。當N1個線程都在處理任務後,緩衝池自動建立必定數量的新線程,用於處理更多的任務。在任務執行完畢後線程也不退出,而是繼續保持在池中等待下一次的任務。當系統比較空閒時,大部分線程都一直處於暫停狀態,線程池自動銷燬一部分線程,回收系統資源。
  基於這種預建立技術,線程池將線程建立和銷燬自己所帶來的開銷分攤到了各個具體的任務上,執行次數越多,每一個任務所分擔到的線程自己開銷則越小,不過咱們另外可能須要考慮進去線程之間同步所帶來的開銷。

2. 構建線程池框架服務器

  通常線程池都必須具有下面幾個組成部分:
  線程池管理器:用於建立並管理線程池
  工做線程:線程池中實際執行的線程
  任務接口:儘管線程池大多數狀況下是用來支持網絡服務器,可是咱們將線程執行的任務抽象出來,造成任務接口,從而是的線程池與具體的任務無關。
  任務隊列:線程池的概念具體到實現則多是隊列,鏈表之類的數據結構,其中保存執行線程。
  咱們實現的通用線程池框架由五個重要部分組成CThreadManage,CThreadPool,CThread,CJob,CWorkerThread,除此以外框架中還包括線程同步使用的類CThreadMutex和CCondition。
  CJob是全部的任務的基類,其提供一個接口Run,全部的任務類都必須從該類繼承,同時實現Run方法。該方法中實現具體的任務邏輯。
  CThread是Linux中線程的包裝,其封裝了Linux線程最常用的屬性和方法,它也是一個抽象類,是全部線程類的基類,具備一個接口Run。
  CWorkerThread是實際被調度和執行的線程類,其從CThread繼承而來,實現了CThread中的Run方法。
  CThreadPool是線程池類,其負責保存線程,釋放線程以及調度線程。
  CThreadManage是線程池與用戶的直接接口,其屏蔽了內部的具體實現。
  CThreadMutex用於線程之間的互斥。
  CCondition則是條件變量的封裝,用於線程之間的同步。
  它們的類的繼承關係以下圖所示: (TO ADD) 
  線程池的時序很簡單,以下圖所示:(TO ADD)。
  CThreadManage直接跟客戶端打交道,其接受須要建立的線程初始個數,並接受客戶端提交的任務。這兒的任務是具體的非抽象的任務。CThreadManage的內部實際上調用的都是CThreadPool的相關操做。CThreadPool建立具體的線程,並把客戶端提交的任務分發給CWorkerThread,CWorkerThread實際執行具體的任務。

3. 理解系統組件網絡

  下面咱們分開來了解系統中的各個組件。
  CThreadManage
  CThreadManage的功能很是簡單,其提供最簡單的方法,其類定義以下:數據結構

 
class CThreadManage {
private:
    CThreadPool* m_Pool;
    int m_NumOfThread;
public:
    CThreadManage();
    CThreadManage(int num);
    virtual ~CThreadManage();

    void SetParallelNum(int num); 
    void Run(CJob* job,void* jobdata);
    void TerminateAll(void);
};
 

  其中m_Pool指向實際的線程池;m_NumOfThread是初始建立時候容許建立的併發的線程個數。另外Run和TerminateAll方法也很是簡單,只是簡單的調用CThreadPool的一些相關方法而已。其具體的實現以下:多線程

複製代碼
CThreadManage::CThreadManage() {
    m_NumOfThread = 10;
    m_Pool = new CThreadPool(m_NumOfThread);
}

CThreadManage::CThreadManage(int num) {
    m_NumOfThread = num;
    m_Pool = new CThreadPool(m_NumOfThread);
}

CThreadManage::~CThreadManage() {
    if(NULL != m_Pool)
        delete m_Pool;
}

void CThreadManage::SetParallelNum(int num) {
    m_NumOfThread = num;
}

void CThreadManage::Run(CJob* job,void* jobdata) {
    m_Pool->Run(job,jobdata);
}

void CThreadManage::TerminateAll(void) {
    m_Pool->TerminateAll();
}
 

  CThread併發

  CThread 類實現了對Linux中線程操做的封裝,它是全部線程的基類,也是一個抽象類,提供了一個抽象接口Run,全部的CThread都必須實現該Run方法。CThread的定義以下所示:框架

 
class CThread {
private:
    int m_ErrCode;
    Semaphore m_ThreadSemaphore; //the inner semaphore, which is used to realize
    unsigned long m_ThreadID; 
    bool m_Detach; //The thread is detached
    bool m_CreateSuspended; //if suspend after creating
    char* m_ThreadName;
    ThreadState m_ThreadState; //the state of the thread
protected:
    void SetErrcode(int errcode){m_ErrCode = errcode;}
    static void* ThreadFunction(void*);
public:
    CThread();
    CThread(bool createsuspended,bool detach);
    virtual ~CThread();

    virtual void Run(void) = 0;
    void SetThreadState(ThreadState state){m_ThreadState = state;}
    bool Terminate(void); //Terminate the threa
    bool Start(void); //Start to execute the thread
    void Exit(void);
    bool Wakeup(void);
    ThreadState GetThreadState(void){return m_ThreadState;}
    int GetLastError(void){return m_ErrCode;}
    void SetThreadName(char* thrname){strcpy(m_ThreadName,thrname);}
    char* GetThreadName(void){return m_ThreadName;}
    int GetThreadID(void){return m_ThreadID;}
    bool SetPriority(int priority);
    int GetPriority(void);
    int GetConcurrency(void);
    void SetConcurrency(int num);
    bool Detach(void);
    bool Join(void);
    bool Yield(void);
    int Self(void);
};
 

  線程的狀態能夠分爲四種,空閒、忙碌、掛起、終止(包括正常退出和非正常退出)。因爲目前Linux線程庫不支持掛起操做,所以,咱們的此處的掛起操做相似於暫停。若是線程建立後不想當即執行任務,那麼咱們能夠將其「暫停」,若是須要運行,則喚醒。有一點必須注意的是,一旦線程開始執行任務,將不能被掛起,其將一直執行任務至完畢。
  線程類的相關操做均十分簡單。線程的執行入口是從Start()函數開始,其將調用函數ThreadFunction,ThreadFunction再調用實際的Run函數,執行實際的任務。函數

  CThreadPoolthis

  CThreadPool是線程的承載容器,通常能夠將其實現爲堆棧、單向隊列或者雙向隊列。在咱們的系統中咱們使用STL Vector對線程進行保存。CThreadPool的實現代碼以下:spa

 
class CThreadPool {
    friend class CWorkerThread;
private:
    unsigned int m_MaxNum; //the max thread num that can create at the same time
    unsigned int m_AvailLow; //The min num of idle thread that shoule kept
    unsigned int m_AvailHigh; //The max num of idle thread that kept at the same time
    unsigned int m_AvailNum; //the normal thread num of idle num;
    unsigned int m_InitNum; //Normal thread num;
protected:
    CWorkerThread* GetIdleThread(void); 
    void AppendToIdleList(CWorkerThread* jobthread);
    void MoveToBusyList(CWorkerThread* idlethread);
    void MoveToIdleList(CWorkerThread* busythread);
    void DeleteIdleThread(int num);
    void CreateIdleThread(int num);
public:
    CThreadMutex m_BusyMutex; //when visit busy list,use m_BusyMutex to lock and unlock
    CThreadMutex m_IdleMutex; //when visit idle list,use m_IdleMutex to lock and unlock
    CThreadMutex m_JobMutex; //when visit job list,use m_JobMutex to lock and unlock
    CThreadMutex m_VarMutex;
    CCondition m_BusyCond; //m_BusyCond is used to sync busy thread list
    CCondition m_IdleCond; //m_IdleCond is used to sync idle thread list
    CCondition m_IdleJobCond; //m_JobCond is used to sync job list
    CCondition m_MaxNumCond;
    vector<CWorkerThread*> m_ThreadList;
    vector<CWorkerThread*> m_BusyList; //Thread List
    vector<CWorkerThread*> m_IdleList; //Idle List
    CThreadPool();
    CThreadPool(int initnum);
    virtual ~CThreadPool(); 
    void SetMaxNum(int maxnum){m_MaxNum = maxnum;}
    int GetMaxNum(void){return m_MaxNum;}
    void SetAvailLowNum(int minnum){m_AvailLow = minnum;}
    int GetAvailLowNum(void){return m_AvailLow;}
    void SetAvailHighNum(int highnum){m_AvailHigh = highnum;}
    int GetAvailHighNum(void){return m_AvailHigh;}
    int GetActualAvailNum(void){return m_AvailNum;}
    int GetAllNum(void){return m_ThreadList.size();}
    int GetBusyNum(void){return m_BusyList.size();}
    void SetInitNum(int initnum){m_InitNum = initnum;}
    int GetInitNum(void){return m_InitNum;}
    void TerminateAll(void);
    void Run(CJob* job,void* jobdata);
};

CThreadPool::CThreadPool() {
    m_MaxNum = 50;
    m_AvailLow = 5;
    m_InitNum=m_AvailNum = 10 ; 
    m_AvailHigh = 20;
    m_BusyList.clear();
    m_IdleList.clear();
    for(int i=0;i<m_InitNum;i++) {
        CWorkerThread* thr = new CWorkerThread();
        thr->SetThreadPool(this);
        AppendToIdleList(thr);
        thr->Start();
    }
}

CThreadPool::CThreadPool(int initnum) {
    assert(initnum>0 && initnum<=30);
    m_MaxNum = 30;
    m_AvailLow = initnum-10>0?initnum-10:3;
    m_InitNum=m_AvailNum = initnum ; 
    m_AvailHigh = initnum+10;
    m_BusyList.clear();
    m_IdleList.clear();
    for(int i=0;i<m_InitNum;i++) {
        CWorkerThread* thr = new CWorkerThread();
        AppendToIdleList(thr);
        thr->SetThreadPool(this);
        thr->Start(); //begin the thread,the thread wait for job
    }
}

CThreadPool::~CThreadPool() {
    TerminateAll();
}

void CThreadPool::TerminateAll() {
    for(int i=0;i < m_ThreadList.size();i++) {
        CWorkerThread* thr = m_ThreadList[i];
        thr->Join();
    }
    return;
}

CWorkerThread* CThreadPool::GetIdleThread(void) {
    while(m_IdleList.size() ==0 )
    m_IdleCond.Wait();
    m_IdleMutex.Lock();
    if(m_IdleList.size() > 0) {
        CWorkerThread* thr = (CWorkerThread*)m_IdleList.front();
        printf("Get Idle thread %d/n",thr->GetThreadID());
        m_IdleMutex.Unlock();
        return thr;
    }
    m_IdleMutex.Unlock();
    return NULL;
}

//add an idle thread to idle list
void CThreadPool::AppendToIdleList(CWorkerThread* jobthread) {
    m_IdleMutex.Lock();
    m_IdleList.push_back(jobthread);
    m_ThreadList.push_back(jobthread);
    m_IdleMutex.Unlock();
}

//move and idle thread to busy thread
void CThreadPool::MoveToBusyList(CWorkerThread* idlethread) {
    m_BusyMutex.Lock();
    m_BusyList.push_back(idlethread);
    m_AvailNum--;
    m_BusyMutex.Unlock();
    
    m_IdleMutex.Lock();
    vector<CWorkerThread*>::iterator pos;
    pos = find(m_IdleList.begin(),m_IdleList.end(),idlethread);
    if(pos !=m_IdleList.end())
        m_IdleList.erase(pos);
    m_IdleMutex.Unlock();
}

void CThreadPool::MoveToIdleList(CWorkerThread* busythread) {
    m_IdleMutex.Lock();
    m_IdleList.push_back(busythread);
    m_AvailNum++;
    m_IdleMutex.Unlock();
    m_BusyMutex.Lock();
    vector<CWorkerThread*>::iterator pos;
    pos = find(m_BusyList.begin(),m_BusyList.end(),busythread);
    if(pos!=m_BusyList.end())
        m_BusyList.erase(pos);
    m_BusyMutex.Unlock();
    m_IdleCond.Signal();
    m_MaxNumCond.Signal();
}

//create num idle thread and put them to idlelist
void CThreadPool::CreateIdleThread(int num) {
    for(int i=0;i<num;i++) {
        CWorkerThread* thr = new CWorkerThread();
        thr->SetThreadPool(this);
        AppendToIdleList(thr);
        m_VarMutex.Lock();
        m_AvailNum++;
        m_VarMutex.Unlock();
        thr->Start(); //begin the thread,the thread wait for job
    }
}

void CThreadPool::DeleteIdleThread(int num)
{
    printf("Enter into CThreadPool::DeleteIdleThread/n");
    m_IdleMutex.Lock();
    printf("Delete Num is %d/n",num);
    for(int i=0;i<num;i++){
        CWorkerThread* thr;
        if(m_IdleList.size() > 0 ){
            thr = (CWorkerThread*)m_IdleList.front();
            printf("Get Idle thread %d/n",thr->GetThreadID());
        }

        vector<CWorkerThread*>::iterator pos;
        pos = find(m_IdleList.begin(),m_IdleList.end(),thr);
        if(pos!=m_IdleList.end())
        m_IdleList.erase(pos);
        m_AvailNum--;
        printf("The idle thread available num:%d /n",m_AvailNum);
        printf("The idlelist num:%d /n",m_IdleList.size());
    }
    m_IdleMutex.Unlock();
}

void CThreadPool::Run(CJob* job,void* jobdata) {
    assert(job!=NULL);

    //if the busy thread num adds to m_MaxNum,so we should wait
    if(GetBusyNum() == m_MaxNum)
        m_MaxNumCond.Wait();

    if(m_IdleList.size()<m_AvailLow) {
        if(GetAllNum()+m_InitNum-m_IdleList.size() < m_MaxNum)
            CreateIdleThread(m_InitNum-m_IdleList.size());
        else
            CreateIdleThread(m_MaxNum-GetAllNum());
    }

    CWorkerThread* idlethr = GetIdleThread();
    if(idlethr !=NULL) {
        idlethr->m_WorkMutex.Lock();
        MoveToBusyList(idlethr);
        idlethr->SetThreadPool(this);
        job->SetWorkThread(idlethr);
        printf("Job is set to thread %d /n",idlethr->GetThreadID());
        idlethr->SetJob(job,jobdata);
    }
}
 

  在CThreadPool中存在兩個鏈表,一個是空閒鏈表,一個是忙碌鏈表。Idle鏈表中存放全部的空閒進程,當線程執行任務時候,其狀態變爲忙碌狀態,同時從空閒鏈表中刪除,並移至忙碌鏈表中。在CThreadPool的構造函數中,咱們將執行下面的代碼:

for(int i=0;i<m_InitNum;i++) {
    CWorkerThread* thr = new CWorkerThread();
    AppendToIdleList(thr);
    thr->SetThreadPool(this);
    thr->Start(); //begin the thread,the thread wait for job
}

  在該代碼中,咱們將建立m_InitNum個線程,建立以後即調用AppendToIdleList放入Idle鏈表中,因爲目前沒有任務分發給這些線程,所以線程執行Start後將本身掛起。
  事實上,線程池中容納的線程數目並非一成不變的,其會根據執行負載進行自動伸縮。爲此在CThreadPool中設定四個變量:
  m_InitNum:處世建立時線程池中的線程的個數。
  m_MaxNum:當前線程池中所容許併發存在的線程的最大數目。
  m_AvailLow:當前線程池中所容許存在的空閒線程的最小數目,若是空閒數目低於該值,代表負載可能太重,此時有必要增長空閒線程池的數目。實現中咱們老是將線程調整爲m_InitNum個。
  m_AvailHigh:當前線程池中所容許的空閒的線程的最大數目,若是空閒數目高於該值,代表當前負載可能較輕,此時將刪除多餘的空閒線程,刪除後調整數也爲m_InitNum個。
  m_AvailNum:目前線程池中實際存在的線程的個數,其值介於m_AvailHigh和m_AvailLow之間。若是線程的個數始終維持在m_AvailLow和m_AvailHigh之間,則線程既不須要建立,也不須要刪除,保持平衡狀態。所以如何設定m_AvailLow和m_AvailHigh的值,使得線程池最大可能的保持平衡態,是線程池設計必須考慮的問題。
  線程池在接受到新的任務以後,線程池首先要檢查是否有足夠的空閒池可用。檢查分爲三個步驟:
  (1)檢查當前處於忙碌狀態的線程是否達到了設定的最大值m_MaxNum,若是達到了,代表目前沒有空閒線程可用,並且也不能建立新的線程,所以必須等待直到有線程執行完畢返回到空閒隊列中。
  (2)若是當前的空閒線程數目小於咱們設定的最小的空閒數目m_AvailLow,則咱們必須建立新的線程,默認狀況下,建立後的線程數目應該爲m_InitNum,所以建立的線程數目應該爲( 當前空閒線程數與m_InitNum);可是有一種特殊狀況必須考慮,就是現有的線程總數加上建立後的線程數可能超過m_MaxNum,所以咱們必須對線程的建立區別對待。

if(GetAllNum()+m_InitNum-m_IdleList.size() < m_MaxNum)
    CreateIdleThread(m_InitNum-m_IdleList.size());
else
    CreateIdleThread(m_MaxNum-GetAllNum());

  若是建立後總數不超過m_MaxNum,則建立後的線程爲m_InitNum;若是超過了,則只建立( m_MaxNum-當前線程總數 )個。
  (3)調用GetIdleThread方法查找空閒線程。若是當前沒有空閒線程,則掛起;不然將任務指派給該線程,同時將其移入忙碌隊列。
當線程執行完畢後,其會調用MoveToIdleList方法移入空閒鏈表中,其中還調用m_IdleCond.Signal()方法,喚醒GetIdleThread()中可能阻塞的線程。
  CJob
  CJob類相對簡單,其封裝了任務的基本的屬性和方法,其中最重要的是Run方法,代碼以下:

 
class CJob {
private:
    int m_JobNo; //The num was assigned to the job
    char* m_JobName; //The job name
    CThread *m_pWorkThread; //The thread associated with the job
public:
    CJob( void );
    virtual ~CJob();

    int GetJobNo(void) const { return m_JobNo; }
    void SetJobNo(int jobno){ m_JobNo = jobno;}
    char* GetJobName(void) const { return m_JobName; }
    void SetJobName(char* jobname);
    CThread *GetWorkThread(void){ return m_pWorkThread; }
    void SetWorkThread ( CThread *pWorkThread ){
        m_pWorkThread = pWorkThread;
    }
    virtual void Run ( void *ptr ) = 0;
};

CJob::CJob(void):m_pWorkThread(NULL),m_JobNo(0),m_JobName(NULL){}

CJob::~CJob(){
    if(NULL != m_JobName)
        free(m_JobName);
}

void CJob::SetJobName(char* jobname) {
    if(NULL !=m_JobName) {
        free(m_JobName);
        m_JobName = NULL;
    }
    if(NULL !=jobname) {
        m_JobName = (char*)malloc(strlen(jobname)+1);
        strcpy(m_JobName,jobname);
    }
}
 

4. 線程池使用示例

  至此咱們給出了一個簡單的與具體任務無關的線程池框架。使用該框架很是的簡單,咱們所須要的作的就是派生CJob類,將須要完成的任務實如今Run方法中。而後將該Job交由CThreadManage去執行。下面咱們給出一個簡單的示例程序:

複製代碼
class CXJob: public CJob {
public:
    CXJob(){i=0;}
    ~CXJob(){}
    void Run(void* jobdata) {
        printf("The Job comes from CXJOB/n");
        sleep(2);
    }
};

class CYJob: public CJob {
public:
    CYJob(){i=0;}
    ~CYJob(){}
    void Run(void* jobdata) {
        printf("The Job comes from CYJob/n");
    }
};

void main() {
    CThreadManage* manage = new CThreadManage(10);
    for(int i=0;i<40;i++) {
        CXJob* job = new CXJob();
        manage->Run(job,NULL);
    }
    sleep(2);
    CYJob* job = new CYJob();
    manage->Run(job,NULL);
    manage->TerminateAll();
}

  CXJob和CYJob都是從Job類繼承而來,其都實現了Run接口。CXJob只是簡單的打印一句」The Job comes from CXJob」,CYJob也只打印」The Job comes from CYJob」,而後均休眠2秒鐘。在主程序中咱們初始建立10個工做線程。而後分別執行40次CXJob和一次CYJob。


5. 線程池使用後記

線程池適合場合  事實上,線程池並非萬能的。它有其特定的使用場合。線程池致力於減小線程自己的開銷對應用所產生的影響,這是有前提的,前提就是線程自己開銷與線程執行任務相比不可忽略。若是線程自己的開銷相對於線程任務執行開銷而言是能夠忽略不計的,那麼此時線程池所帶來的好處是不明顯的,好比對於FTP服務器以及Telnet服務器,一般傳送文件的時間較長,開銷較大,那麼此時,咱們採用線程池未必是理想的方法,咱們能夠選擇「即時建立,即時銷燬」的策略。  總之線程池一般適合下面的幾個場合:  (1) 單位時間內處理任務頻繁並且任務處理時間短  (2) 對實時性要求較高。若是接受到任務後在建立線程,可能知足不了實時要求,所以必須採用線程池進行預建立。  (3) 必須常常面對高突發性事件,好比Web服務器,若是有足球轉播,則服務器將產生巨大的衝擊。此時若是採起傳統方法,則必須不停的大量產生線程,銷燬線程。此時採用動態線程池能夠避免這種狀況的發生。

相關文章
相關標籤/搜索