大牛的Linux編程-線程池的設計與實現(詳細完整版)

前言:

假設服務器的硬件資源「充裕」,那麼提升服務器性能的一個很直接的方法就是空間換時間,即「浪費」服務器的硬件資源,以換取其運行效率。提高服務器性能的一個重要方法就是採用「池」的思路,即對一組資源在服務器啓動之初就被徹底建立好並初始化,這稱爲靜態資源分配。當服務器進入正式運行階段,即開始處理客戶端請求時,若是它須要相關資源就能夠直接從池中獲取,無需動態分配。很顯然,直接從池中取得所須要資源比動態分配資源的速度快得多,由於分配系統資源的系統調用都是很耗時的。當服務器處理完一個客戶端鏈接後,能夠把相關資源放回池中,無須執行系統調用釋放資源。從最終效果來看,資源分配和回收的系統調用只發生在服務器的啓動和結束,這種「池」的方式避免了中間的任務處理過程對內核的頻繁訪問,提升了服務器的性能。咱們經常使用的線程池和內存池都是基於以上「池」的優點所設計出來的提高服務器性能的方法,今天打算以C++98設計一個基於Linux系統的簡單線程池。golang

1、提出疑問:爲何要採用線程池?

首先想想,咱們通常的服務器都是動態建立子線程來實現併發服務器的,好比每當有一個客戶端請求創建鏈接時咱們就動態調用pthread_create去建立線程去處理該鏈接請求。這種模式有什麼缺點呢?編程

動態建立線程是比較費時的,這將會致使較慢的客戶響應。
動態建立的子線程一般只用來爲一個客戶服務,這將致使系統上產生大量的細微線程,線程切換也會耗費CPU時間。
因此咱們爲了進一步提高服務器性能,能夠採起「池」的思路,把線程的建立放在程序的初始化階段一次完成,這就避免了動態建立線程致使服務器響應請求的性能降低。安全

2、解決疑問:線程池的設計思路

一、以單例模式設計線程池,保證線程池全劇惟一;
二、在獲取線程池實例進行線程池初始化:線程預先建立+任務隊列建立;
三、建立一個任務類,咱們真實的任務會繼承該類,完成任務執行。服務器

大牛的Linux編程-線程池的設計與實現(詳細完整版)

線程池模式併發

根據以上思路咱們能夠給出這麼一個線程池類的框架:框架

class ThreadPool
{
private:
    std::queue<Task*> taskQueue;   //任務隊列
    bool isRunning;           //線程池運行標誌
    pthread_t* pThreadSet;  //指向線程id集合的指針
    int threadsNum;         //線程數目
    pthread_mutex_t mutex;    //互斥鎖
    pthread_cond_t condition;   //條件變量
    //單例模式,保證全局線程池只有一個
    ThreadPool(int num=10);
    void createThreads();  //建立內存池
    void clearThreads();  //回收線程
    void clearQueue();  //清空任務隊列
    static void* threadFunc(void* arg);
    Task* takeTask();  //工做線程獲取任務
public:
    void addTask(Task* pTask);   //任務入隊
    static ThreadPool* createThreadPool(int num=10); //靜態方法,用於建立線程池實例
    ~ThreadPool();
    int getQueueSize(); //獲取任務隊列中的任務數目
    int getThreadlNum();  //獲取線程池中線程總數目
 
};

(1)單例模式下的線程池的初始化函數

首先咱們以餓漢單例模式來設計這個線程池,以保證該線程池全局惟一:性能

一、構造函數私有化
二、提供一個靜態函數來獲取線程池對象學習

//餓漢模式,線程安全
ThreadPool* ThreadPool::createThreadPool(int num)
{
    static ThreadPool* pThreadPoolInstance = new ThreadPool(num);
    return pThreadPoolInstance;
}
123456

ThreadPool* pMyPool = ThreadPool::createThreadPool(5);
線程池對象初始化時咱們須要作三件事:測試

相關變量的初始化(線程池狀態、互斥鎖、條件變量等)+任務隊列的建立+線程預先建立

ThreadPool::ThreadPool(int num):threadsNum(num)
{
    printf("creating threads pool...n");
    isRunning = true;
    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init(&condition, NULL);
    createThreads();
    printf("created threads pool successfully!n");
}
123456789

線程池的數目根據對象建立時輸入的數目來建立,若是不指定數目,咱們就是使用默認數目10個。

void ThreadPool::createThreads()
{
    pThreadSet = (pthread_t*)malloc(sizeof(pthread_t) * threadsNum);
    for(int i=0;i<threadsNum;i++)
    {
        pthread_create(&pThreadSet[i], NULL, threadFunc, this);
    }
}
12345678

(2)任務添加和線程調度

對於每個服務請求咱們均可以看做是一個任務,一個任務來了咱們就將它送進線程池中的任務隊列中,並經過條件變量的方式通知線程池中的空閒線程去拿任務去完成。那問題來了,這裏的任務在編程的層面上看究竟是什麼?咱們能夠將任務當作是一個回調函數,將要執行的函數指針往任務隊列裏面送就能夠了,咱們線程拿到這個指針後運行該函數就等於完成服務請求。基於以上的考慮,咱們設計了一個單獨的抽象任務類,讓子類繼承。類裏面有個純虛函數run(),用於執行相應操做。

考慮到回調函數須要傳參數進來,因此特地設置了個指針arg來存儲參數地址,到時候咱們就能夠根據該指針解析出傳入的函數實參是什麼了。

任務基類

class Task
{
public:
    Task(void* a = NULL): arg(a)
    {

    }

    void SetArg(void* a) {
        arg = a;
    }

    virtual int run()=0;

protected:
    void* arg;

};
typedef struct
{
    int task_id;
    std::string task_name;
}msg_t;

class MyTask: public Task
{
public:
    int run() {
        msg_t* msg = (msg_t*)arg;
        printf("working thread[%lu] : task_id:%d  task_name:%sn", pthread_self(),
               msg->task_id, msg->task_name.c_str());
        sleep(10);
        return 0;
    }
};
123456789101112131415161718192021222324252627282930313233343536373839

真正使用該類時就本身定義一個子類繼承Task類,並實現run()函數,並經過SetArg()方法去設置傳入的參數。好比能夠這麼用:

msg_t msg[10];
MyTask task_A[10];

//模擬生產者生產任務
for(int i=0;i<10;i++)
{
    msg[i].task_id = i;
    sprintf(buf,"qq_task_%d",i);
    msg[i].task_name = buf;
    task_A[i].SetArg(&msg[i]);
    pMyPool->addTask(&task_A[i]);
    sleep(1);
}
12345678910111213

如今來到線程池設計中最難搞的地方:線程調度。一個任務來了,究竟怎麼讓空閒線程去拿任務去作呢?咱們又如何保證空閒的線程不斷地去拿任務呢?

抽象而言,這是一個生產者消費者的模型,系統不斷往任務隊列裏送任務,咱們經過互斥鎖和條件變量來控制任務的加入和獲取,線程每當空閒時就會去調用takeTask()去拿任務。若是隊列沒任務那麼一些沒得到互斥鎖的線程就會擁塞等待(由於沒鎖),得到互斥鎖的那個線程會由於沒任務而擁塞等待。一旦有任務就會喚醒這個帶鎖線程拿走任務釋放互斥鎖。看看代碼層面是如何操做的:

加入一個任務

void ThreadPool::addTask(Task* pTask)
{
    pthread_mutex_lock(&mutex);
    taskQueue.push(pTask);
    printf("one task is put into queue! Current queue size is %lun",taskQueue.size());
    pthread_mutex_unlock(&mutex);
    pthread_cond_signal(&condition);
}
12345678

取走一個任務

Task* ThreadPool::takeTask()
{
    Task* pTask = NULL;
    while(!pTask)
    {
        pthread_mutex_lock(&mutex);
        //線程池運行正常但任務隊列爲空,那就等待任務的到來
        while(taskQueue.empty() && isRunning)
        {
            pthread_cond_wait(&condition, &mutex);
        }

        if(!isRunning)
        {
            pthread_mutex_unlock(&mutex);
            break;
        }
        else if(taskQueue.empty())
        {
            pthread_mutex_unlock(&mutex);
            continue;
        }

        pTask = taskQueue.front();
        taskQueue.pop();
        pthread_mutex_unlock(&mutex);

    }

    return pTask;
}
12345678910111213141516171819202122232425262728293031

線程中的回調函數。這裏注意的是,若是取到的任務爲空,咱們認爲是線程池關閉的信號(線程池銷燬時咱們會在析構函數中調用pthread_cond_broadcast(&condition)來通知線程來拿任務,拿到的固然是空指針),咱們退出該線程。

void* ThreadPool::threadFunc(void* arg)
{
    ThreadPool* p = (ThreadPool*)arg;
    while(p->isRunning)
    {
        Task* task = p->takeTask();
        //若是取到的任務爲空,那麼咱們結束這個線程
        if(!task)
        {
            //printf("%lu thread will shutdown!n", pthread_self());
            break;
        }

        printf("take one...n");

        task->run();
    }
}
123456789101112131415161718

(3)使用例子和測試

下面給出一個線程池的一個使用例子。能夠看出,我首先定義了msg_t的結構體,這是由於咱們的服務響應函數是帶參數的,因此咱們定義了這個結構體並把其地址做爲參數傳進線程池中去(經過SetArg方法)。而後咱們也定義了一個任務類MyTask繼承於Task,並重寫了run方法。咱們要執行的服務函數就能夠寫在run函數之中。當須要往任務隊列投聽任務時調用addTask()就能夠了,而後線程池會本身安排任務的分發,外界無須關心。因此一個線程池執行任務的過程能夠簡化爲:createThreadPool() -> SetArg() -> addTask -> while(1) -> delete pMyPool

#include <stdio.h>
#include "thread_pool.h"
#include <string>
#include <stdlib.h>

typedef struct
{
    int task_id;
    std::string task_name;
}msg_t;

class MyTask: public Task
{
public:
    int run() {
        msg_t* msg = (msg_t*)arg;
        printf("working thread[%lu] : task_id:%d  task_name:%sn", pthread_self(),
               msg->task_id, msg->task_name.c_str());
        sleep(10);
        return 0;
    }
};

int main() {
    ThreadPool* pMyPool = ThreadPool::createThreadPool(5);
    char buf[32] = {0};

    msg_t msg[10];
    MyTask task_A[10];

    //模擬生產者生產任務
    for(int i=0;i<10;i++)
    {
        msg[i].task_id = i;
        sprintf(buf,"qq_task_%d",i);
        msg[i].task_name = buf;
        task_A[i].SetArg(&msg[i]);
        pMyPool->addTask(&task_A[i]);
        sleep(1);
    }

    while(1)
    {
        //printf("there are still %d tasks need to processn", pMyPool->getQueueSize());
        if (pMyPool->getQueueSize() == 0)
        {
            printf("Now I will exit from mainn");
            break;
        }

        sleep(1);
    }

    delete pMyPool;
    return 0;
}
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758

程序具體運行的邏輯是,咱們創建了一個5個線程大小的線程池,而後咱們又生成了10個任務,往任務隊列裏放。因爲線程數小於任務數,因此當每一個線程都拿到本身的任務時,任務隊列中還有5個任務待處理,而後有些線程處理完本身的任務了,又去隊列裏取任務,直到全部任務被處理完了,循環結束,銷燬線程池,退出程序。
image
本羣免費分享學習資料(C/C++,Linux,golang,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,ffmpeg,TCP/IP,協程,DPDK,嵌入式)等,交流討論領取資料請加羣Q:1106675687。

相關文章
相關標籤/搜索