基於Linux/C++簡單線程池的實現 咱們知道Java語言對於多線程的支持十分豐富

咱們知道Java語言對於多線程的支持十分豐富,JDK自己提供了不少性能優良的庫,包括ThreadPoolExecutor和ScheduleThreadPoolExecutor等。C++11中的STL也提供了std:thread(然而我尚未看,這裏先佔個坑)還有不少第三方庫的實現。這裏我重複「造輪子」的目的仍是爲了深刻理解C++和Linux線程基礎概念,主要以學習的目的。c++

首先,爲何要使用線程池。由於線程的建立、和清理都是須要耗費系統資源的。咱們知道Linux中線程其實是由輕量級進程實現的,相對於純理論上的線程這個開銷仍是有的。假設某個線程的建立、運行和銷燬的時間分別爲T一、T二、T3,當T1+T3的時間相對於T2不可忽略時,線程池的就有必要引入了,尤爲是處理數百萬級的高併發處理時。線程池提高了多線程程序的性能,由於線程池裏面的線程都是現成的並且可以重複使用,咱們不須要臨時建立大量線程,而後在任務結束時又銷燬大量線程。一個理想的線程池可以合理地動態調節池內線程數量,既不會由於線程過少而致使大量任務堆積,也不會由於線程過多了而增長額外的系統開銷。多線程

其實線程池的原理很是簡單,它就是一個很是典型的生產者消費者同步問題。根據剛纔描述的線程池的功能,能夠看出線程池至少有兩個主要動做,一個是主程序不定時地向線程池添加任務,另外一個是線程池裏的線程領取任務去執行。且不論任務和執行任務是個什麼概念,可是一個任務確定只能分配給一個線程執行。這樣就能夠簡單猜測線程池的一種可能的架構了:主程序執行入隊操做,把任務添加到一個隊列裏面;池子裏的多個工做線程共同對這個隊列試圖執行出隊操做,這裏要保證同一時刻只有一個線程出隊成功,搶奪到這個任務,其餘線程繼續共同試圖出隊搶奪下一個任務。因此在實現線程池以前,咱們須要一個隊列。這裏的生產者就是主程序,生產任務(增長任務),消費者就是工做線程,消費任務(執行、減小任務)。由於這裏涉及到多個線程同時訪問一個隊列的問題,因此咱們須要互斥鎖來保護隊列,同時還須要條件變量來處理主線程通知任務到達、工做線程搶奪任務的問題。架構

通常來講實現一個線程池主要包括如下4個組成部分:併發

  1. 線程管理器:用於建立並管理線程池。
  2. 工做線程:線程池中實際執行任務的線程。在初始化線程時會預先建立好固定數目的線程在池中,這些初始化的線程通常處於空閒狀態。
  3. 任務接口:每一個任務必須實現的接口。當線程池的任務隊列中有可執行任務時,被空間的工做線程調去執行(線程的閒與忙的狀態是經過互斥量實現的),把任務抽象出來造成一個接口,能夠作到線程池與具體的任務無關。
  4. 任務隊列:用來存放沒有處理的任務。提供一種緩衝機制。實現這種結構有不少方法,經常使用的有隊列和鏈表結構。

流程圖以下:函數

ool.h高併發

#ifndef __THREAD_POOL_H
#define __THREAD_POOL_H

#include <vector>
#include <string>
#include <pthread.h>

using namespace std;

/*執行任務的類:設置任務數據並執行*/
class CTask {
protected:
    string m_strTaskName;   //任務的名稱
    void* m_ptrData;    //要執行的任務的具體數據

public:
    CTask() = default;
    CTask(string &taskName): m_strTaskName(taskName), m_ptrData(NULL) {}
    virtual int Run() = 0;
    void setData(void* data);   //設置任務數據
  
    virtual ~CTask() {}
    
};

/*線程池管理類*/
class CThreadPool {
private:
    static vector<CTask*> m_vecTaskList;    //任務列表
    static bool shutdown;   //線程退出標誌
    int m_iThreadNum;   //線程池中啓動的線程數
    pthread_t *pthread_id;
  
    static pthread_mutex_t m_pthreadMutex;  //線程同步鎖
    static pthread_cond_t m_pthreadCond;    //線程同步條件變量
  
protected:
    static void* ThreadFunc(void *threadData);  //新線程的線程回調函數
    static int MoveToIdle(pthread_t tid);   //線程執行結束後,把本身放入空閒線程中
    static int MoveToBusy(pthread_t tid);   //移入到忙碌線程中去
    int Create();   //建立線程池中的線程
  
public:
    CThreadPool(int threadNum);
    int AddTask(CTask *task);   //把任務添加到任務隊列中
    int StopAll();  //使線程池中的全部線程退出
    int getTaskSize();  //獲取當前任務隊列中的任務數
};

#endif

2 thread_pool.cpp性能

#include "thread_pool.h"
#include <cstdio>

void CTask::setData(void* data) {
    m_ptrData = data;
}

//靜態成員初始化
vector<CTask*> CThreadPool::m_vecTaskList;
bool CThreadPool::shutdown = false;
pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;

//線程管理類構造函數
CThreadPool::CThreadPool(int threadNum) {
    this->m_iThreadNum = threadNum;
    printf("I will create %d threads.\n", threadNum);
    Create();
}

//線程回調函數
void* CThreadPool::ThreadFunc(void* threadData) {
    pthread_t tid = pthread_self();
    while(1)
    {
        pthread_mutex_lock(&m_pthreadMutex);
        //若是隊列爲空,等待新任務進入任務隊列
        while(m_vecTaskList.size() == 0 && !shutdown)
            pthread_cond_wait(&m_pthreadCond, &m_pthreadMutex);
        
        //關閉線程
        if(shutdown)
        {
            pthread_mutex_unlock(&m_pthreadMutex);
            printf("[tid: %lu]\texit\n", pthread_self());
            pthread_exit(NULL);
        }
        
        printf("[tid: %lu]\trun: ", tid);
        vector<CTask*>::iterator iter = m_vecTaskList.begin();
        //取出一個任務並處理之
        CTask* task = *iter;
        if(iter != m_vecTaskList.end())
        {
            task = *iter;
            m_vecTaskList.erase(iter);
        }
        
        pthread_mutex_unlock(&m_pthreadMutex);
        
        task->Run();    //執行任務
        printf("[tid: %lu]\tidle\n", tid);
        
    }
    
    return (void*)0;
}

//往任務隊列裏添加任務併發出線程同步信號
int CThreadPool::AddTask(CTask *task) { 
    pthread_mutex_lock(&m_pthreadMutex);    
    m_vecTaskList.push_back(task);  
    pthread_mutex_unlock(&m_pthreadMutex);  
    pthread_cond_signal(&m_pthreadCond);    
    
    return 0;
}

//建立線程
int CThreadPool::Create() { 
    pthread_id = new pthread_t[m_iThreadNum];
        for(int i = 0; i < m_iThreadNum; i++)
            pthread_create(&pthread_id[i], NULL, ThreadFunc, NULL);
        
    return 0;
}

//中止全部線程
int CThreadPool::StopAll() {    
    //避免重複調用
    if(shutdown)
        return -1;
    printf("Now I will end all threads!\n\n");
    
    //喚醒全部等待進程,線程池也要銷燬了
    shutdown = true;
    pthread_cond_broadcast(&m_pthreadCond);
    
    //清楚殭屍
    for(int i = 0; i < m_iThreadNum; i++)
        pthread_join(pthread_id[i], NULL);
    
    delete[] pthread_id;
    pthread_id = NULL;
    
    //銷燬互斥量和條件變量
    pthread_mutex_destroy(&m_pthreadMutex);
    pthread_cond_destroy(&m_pthreadCond);
    
    return 0;
}

//獲取當前隊列中的任務數
int CThreadPool::getTaskSize() {    
    return m_vecTaskList.size();
}

3 main.cpp學習

#include "thread_pool.h"
#include <cstdio>
#include <stdlib.h>
#include <unistd.h>

class CMyTask: public CTask {   
public:
    CMyTask() = default;    
    int Run() {
        printf("%s\n", (char*)m_ptrData);
        int x = rand()%4 + 1;
        sleep(x);   
        return 0;
    }
    ~CMyTask() {}
};

int main() {
    CMyTask taskObj;
    char szTmp[] = "hello!";
    taskObj.setData((void*)szTmp);
    CThreadPool threadpool(5);  //線程池大小爲5
    
    for(int i = 0; i < 10; i++)
        threadpool.AddTask(&taskObj);
    
    while(1) {
        printf("There are still %d tasks need to handle\n", threadpool.getTaskSize());
        //任務隊列已沒有任務了
        if(threadpool.getTaskSize()==0) {
            //清除線程池
            if(threadpool.StopAll() == -1) {
                printf("Thread pool clear, exit.\n");
                exit(0);
            }
        }
        sleep(2);
        printf("2 seconds later...\n");
    }   
    return 0;
}

4 Makefilethis

CC:= g++
TARGET:= threadpool
INCLUDE:= -I./
LIBS:= -lpthread
# C++語言編譯參數  
CXXFLAGS:= -std=c++11 -g -Wall -D_REENTRANT
# C預處理參數
# CPPFLAGS:=
OBJECTS :=thread_pool.o main.o
  
$(TARGET): $(OBJECTS)
    $(CC) -o $(TARGET) $(OBJECTS) $(LIBS)
  
# $@表示全部目標集  
%.o:%.cpp   
    $(CC) -c $(CXXFLAGS) $(INCLUDE) $< -o $@
  
.PHONY : clean
clean:   
    -rm -f $(OBJECTS) $(TARGET)

5 輸出結果spa

I will create 5 threads.
There are still 10 tasks need to handle
[tid: 140056759576320]  run: hello!
[tid: 140056751183616]  run: hello!
[tid: 140056742790912]  run: hello!
[tid: 140056734398208]  run: hello!
[tid: 140056767969024]  run: hello!
2 seconds later...
There are still 5 tasks need to handle
[tid: 140056742790912]  idle
[tid: 140056742790912]  run: hello!
[tid: 140056767969024]  idle
[tid: 140056767969024]  run: hello!
[tid: 140056751183616]  idle
[tid: 140056751183616]  run: hello!
[tid: 140056759576320]  idle
[tid: 140056759576320]  run: hello!
[tid: 140056751183616]  idle
[tid: 140056751183616]  run: hello!
[tid: 140056734398208]  idle
2 seconds later...
There are still 0 tasks need to handle
Now I will end all threads!
2 seconds later...
[tid: 140056734398208]  exit
[tid: 140056767969024]  idle
[tid: 140056767969024]  exit
[tid: 140056759576320]  idle
[tid: 140056759576320]  exit
[tid: 140056751183616]  idle
[tid: 140056751183616]  exit
[tid: 140056742790912]  idle
[tid: 140056742790912]  exit
2 seconds later...
There are still 0 tasks need to handle
Thread pool clear, exit.

擴展資料:

相關文章
相關標籤/搜索