ThreadPool.hhtml
#ifndef __THREADPOOL_H #define __THREADPOOL_H #define HAVE_STRUCT_TIMESPEC //#include "servant/Application.h" #include <vector> #include <string> #include <pthread.h> using namespace std; /** * 執行任務的類,設置任務數據、定義執行方法(純虛函數) */ class CTask { protected: string m_strTaskName; //任務的名稱 void* m_ptrData; //具體數據 public: CTask() {} CTask(string taskName) { m_strTaskName = taskName; m_ptrData = NULL; } virtual int Run() = 0; //任務執行方法 void SetData(void* data); //設置任務數據 public: virtual ~CTask() {} }; /** * 線程結構體 */ struct CThread { pthread_t pthread_id; //線程id int iStat; //線程狀態 CThread() : iStat(0) { } bool operator == (const CThread &obj) const { return (long)&pthread_id == (long)&obj.pthread_id; } }; /** * 線程池管理類的實現 */ class CThreadPool { public: CThreadPool(int threadNum = 10); int AddTask(CTask *task); //把任務添加到任務隊列中 int getTaskSize(); //獲取當前任務隊列中的任務數 int StopAll(); //使線程池中的線程退出 protected: int Create(); //建立線程池中的線程 static void* ThreadFunc(void * threadData); //新線程的線程回調函數 static int MoveToIdle(CThread *pThread); //線程執行結束後,狀態置爲空閒0 static int MoveToBusy(CThread *pThread); //線程開始執行,狀態置爲運行1 private: static vector<CTask*> m_vecTaskList; //任務列表 static bool shutdown; //線程退出標誌 int m_iThreadNum; //線程池中啓動的線程數 static vector<CThread> m_vecThread; //線程列表 static pthread_mutex_t m_pthreadMutex; //線程同步鎖 static pthread_cond_t m_pthreadCond; //線程同步的條件變量 }; #endif
ThreadPool.cppios
#include "ThreadPool.h" #include <iostream> #include <algorithm> using namespace std; void CTask::SetData(void * data) { m_ptrData = data; } vector<CTask*> CThreadPool::m_vecTaskList; //任務列表 bool CThreadPool::shutdown = false; vector<CThread> CThreadPool::m_vecThread; //線程列表 pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER; /** * 線程池管理類構造函數 */ CThreadPool::CThreadPool(int threadNum) { this->m_iThreadNum = threadNum; cout << "threadNum:" << threadNum << " threads will be created." << endl; Create(); //建立線程 } /** * 建立線程 */ int CThreadPool::Create() { m_vecThread.resize(m_iThreadNum); for (size_t i = 0; i < m_vecThread.size(); i++) { pthread_create(&m_vecThread[i].pthread_id, NULL, ThreadFunc, &m_vecThread[i]); } return 0; } /** * 線程回調函數 */ void* CThreadPool::ThreadFunc(void* threadData) { CThread *pThread = (CThread*)threadData; while (1) { pthread_mutex_lock(&m_pthreadMutex); //lock while (m_vecTaskList.size() == 0 && !shutdown) { pthread_cond_wait(&m_pthreadCond, &m_pthreadMutex); /* pthread_cond_wait前要先加鎖 pthread_cond_wait把線程放進阻塞隊列後,內部會解鎖,而後等待條件變量被其它線程喚醒 pthread_cond_wait被喚醒後會再自動加鎖 */ } if (shutdown) { pthread_mutex_unlock(&m_pthreadMutex); cout << "thread:" << (long)&pThread->pthread_id << " will exit." << endl; pthread_exit(NULL); } //線程狀態置1 MoveToBusy(pThread); //取出一個任務 CTask* task = NULL; vector<CTask*>::iterator iter = m_vecTaskList.begin(); if (iter != m_vecTaskList.end()) { task = *iter; m_vecTaskList.erase(iter); } pthread_mutex_unlock(&m_pthreadMutex); //unlock //執行任務 if (task) { task->Run(); } //線程狀態置0 MoveToIdle(pThread); } return (void*)0; } int CThreadPool::MoveToIdle(CThread *pThread) { vector<CThread>::iterator iter_thread = std::find(m_vecThread.begin(), m_vecThread.end(), *pThread); if (iter_thread != m_vecThread.end()) { iter_thread->iStat = 0; cout << "tid:" << (long)&pThread->pthread_id << " idle." << endl; } return 0; } int CThreadPool::MoveToBusy(CThread *pThread) { vector<CThread>::iterator iter_thread = std::find(m_vecThread.begin(), m_vecThread.end(), *pThread); if (iter_thread != m_vecThread.end()) { iter_thread->iStat = 1; cout << "tid:" << (long)&pThread->pthread_id << " run." << endl; } return 0; } /** * 往任務隊列裏邊添加任務併發出線程同步信號 */ int CThreadPool::AddTask(CTask *task) { pthread_mutex_lock(&m_pthreadMutex); this->m_vecTaskList.push_back(task); pthread_cond_signal(&m_pthreadCond); pthread_mutex_unlock(&m_pthreadMutex); return 0; } /** * 獲取當前隊列中任務數 */ int CThreadPool::getTaskSize() { return m_vecTaskList.size(); } /** * 中止全部線程 */ int CThreadPool::StopAll() { /** 避免重複調用 */ if (shutdown) { return -1; } cout << "All threads will be stoped." << endl; /** 喚醒全部等待線程,線程池要銷燬了 */ shutdown = true; pthread_cond_broadcast(&m_pthreadCond); /** 阻塞等待線程退出,不然就成殭屍了 */ for (size_t i = 0; i < m_vecThread.size(); i++) { pthread_join(m_vecThread[i].pthread_id, NULL); } m_vecThread.clear(); /** 銷燬條件變量和互斥體 */ pthread_mutex_destroy(&m_pthreadMutex); pthread_cond_destroy(&m_pthreadCond); return 0; }
main.cppshell
#include <iostream> #include "ThreadPool.h" using namespace std; class CMyTask : public CTask { public: CMyTask() {} inline int Run() { cout << (char*)this->m_ptrData << endl; return 0; } }; int main() { CThreadPool threadPool(10); CMyTask taskObj; char szTmp[] = "this is the first thread running"; taskObj.SetData((void*)szTmp); for (int i = 0; i < 10; i++) { threadPool.AddTask(&taskObj); } while (1) { cout << "there are still " << threadPool.getTaskSize() << " tasks need to handle" << endl; if (threadPool.getTaskSize() == 0) { if (threadPool.StopAll() == -1) { cout << "Now I will exit from main" << endl; return 0; } } } return 0; }
makefliewindows
TARGET:=threadpool INC:= -I./ LIB_PATH:= LIB:= -lpthread CFLAGS:=-Wall -g -O0 -D_REENTRANT -Wl,-rpath=./ $(INC) $(LIB_PATH) CPPFLAGS:=$(CFLAGS) SRC:=$(shell echo *.cpp) OBJ:=$(patsubst %.cpp,%.o,$(SRC)) all: $(TARGET) $(TARGET): $(OBJ) $(CXX) $^ $(CFLAGS) $(LIB) -o $@ clean: rm -f $(OBJ) rm -f $(TARGET)
windows下配置 pthread 參見:https://blog.csdn.net/qianchenglenger/article/details/16907821併發
線程同步、條件變量說明參見:http://www.javashuo.com/article/p-tiflogox-be.html函數