咱們知道Java語言對於多線程的支持十分豐富,JDK自己提供了不少性能優良的庫,包括ThreadPoolExecutor和ScheduleThreadPoolExecutor等。C++11中的STL也提供了std:thread(然而我尚未看,這裏先佔個坑)還有不少第三方庫的實現。這裏我重複「造輪子」的目的仍是爲了深刻理解C++和Linux線程基礎概念,主要以學習的目的。c++
首先,爲何要使用線程池。由於線程的建立、和清理都是須要耗費系統資源的。咱們知道Linux中線程其實是由輕量級進程實現的,相對於純理論上的線程這個開銷仍是有的。假設某個線程的建立、運行和銷燬的時間分別爲T一、T二、T3,當T1+T3的時間相對於T2不可忽略時,線程池的就有必要引入了,尤爲是處理數百萬級的高併發處理時。線程池提高了多線程程序的性能,由於線程池裏面的線程都是現成的並且可以重複使用,咱們不須要臨時建立大量線程,而後在任務結束時又銷燬大量線程。一個理想的線程池可以合理地動態調節池內線程數量,既不會由於線程過少而致使大量任務堆積,也不會由於線程過多了而增長額外的系統開銷。多線程
其實線程池的原理很是簡單,它就是一個很是典型的生產者消費者同步問題。根據剛纔描述的線程池的功能,能夠看出線程池至少有兩個主要動做,一個是主程序不定時地向線程池添加任務,另外一個是線程池裏的線程領取任務去執行。且不論任務和執行任務是個什麼概念,可是一個任務確定只能分配給一個線程執行。這樣就能夠簡單猜測線程池的一種可能的架構了:主程序執行入隊操做,把任務添加到一個隊列裏面;池子裏的多個工做線程共同對這個隊列試圖執行出隊操做,這裏要保證同一時刻只有一個線程出隊成功,搶奪到這個任務,其餘線程繼續共同試圖出隊搶奪下一個任務。因此在實現線程池以前,咱們須要一個隊列。這裏的生產者就是主程序,生產任務(增長任務),消費者就是工做線程,消費任務(執行、減小任務)。由於這裏涉及到多個線程同時訪問一個隊列的問題,因此咱們須要互斥鎖來保護隊列,同時還須要條件變量來處理主線程通知任務到達、工做線程搶奪任務的問題。架構
通常來講實現一個線程池主要包括如下4個組成部分:併發
流程圖以下:函數
ool.h高併發
#ifndef __THREAD_POOL_H #define __THREAD_POOL_H #include <vector> #include <string> #include <pthread.h> using namespace std; /*執行任務的類:設置任務數據並執行*/ class CTask { protected: string m_strTaskName; //任務的名稱 void* m_ptrData; //要執行的任務的具體數據 public: CTask() = default; CTask(string &taskName): m_strTaskName(taskName), m_ptrData(NULL) {} virtual int Run() = 0; void setData(void* data); //設置任務數據 virtual ~CTask() {} }; /*線程池管理類*/ class CThreadPool { private: static vector<CTask*> m_vecTaskList; //任務列表 static bool shutdown; //線程退出標誌 int m_iThreadNum; //線程池中啓動的線程數 pthread_t *pthread_id; static pthread_mutex_t m_pthreadMutex; //線程同步鎖 static pthread_cond_t m_pthreadCond; //線程同步條件變量 protected: static void* ThreadFunc(void *threadData); //新線程的線程回調函數 static int MoveToIdle(pthread_t tid); //線程執行結束後,把本身放入空閒線程中 static int MoveToBusy(pthread_t tid); //移入到忙碌線程中去 int Create(); //建立線程池中的線程 public: CThreadPool(int threadNum); int AddTask(CTask *task); //把任務添加到任務隊列中 int StopAll(); //使線程池中的全部線程退出 int getTaskSize(); //獲取當前任務隊列中的任務數 }; #endif
2 thread_pool.cpp性能
#include "thread_pool.h" #include <cstdio> void CTask::setData(void* data) { m_ptrData = data; } //靜態成員初始化 vector<CTask*> CThreadPool::m_vecTaskList; bool CThreadPool::shutdown = false; pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER; //線程管理類構造函數 CThreadPool::CThreadPool(int threadNum) { this->m_iThreadNum = threadNum; printf("I will create %d threads.\n", threadNum); Create(); } //線程回調函數 void* CThreadPool::ThreadFunc(void* threadData) { pthread_t tid = pthread_self(); while(1) { pthread_mutex_lock(&m_pthreadMutex); //若是隊列爲空,等待新任務進入任務隊列 while(m_vecTaskList.size() == 0 && !shutdown) pthread_cond_wait(&m_pthreadCond, &m_pthreadMutex); //關閉線程 if(shutdown) { pthread_mutex_unlock(&m_pthreadMutex); printf("[tid: %lu]\texit\n", pthread_self()); pthread_exit(NULL); } printf("[tid: %lu]\trun: ", tid); vector<CTask*>::iterator iter = m_vecTaskList.begin(); //取出一個任務並處理之 CTask* task = *iter; if(iter != m_vecTaskList.end()) { task = *iter; m_vecTaskList.erase(iter); } pthread_mutex_unlock(&m_pthreadMutex); task->Run(); //執行任務 printf("[tid: %lu]\tidle\n", tid); } return (void*)0; } //往任務隊列裏添加任務併發出線程同步信號 int CThreadPool::AddTask(CTask *task) { pthread_mutex_lock(&m_pthreadMutex); m_vecTaskList.push_back(task); pthread_mutex_unlock(&m_pthreadMutex); pthread_cond_signal(&m_pthreadCond); return 0; } //建立線程 int CThreadPool::Create() { pthread_id = new pthread_t[m_iThreadNum]; for(int i = 0; i < m_iThreadNum; i++) pthread_create(&pthread_id[i], NULL, ThreadFunc, NULL); return 0; } //中止全部線程 int CThreadPool::StopAll() { //避免重複調用 if(shutdown) return -1; printf("Now I will end all threads!\n\n"); //喚醒全部等待進程,線程池也要銷燬了 shutdown = true; pthread_cond_broadcast(&m_pthreadCond); //清楚殭屍 for(int i = 0; i < m_iThreadNum; i++) pthread_join(pthread_id[i], NULL); delete[] pthread_id; pthread_id = NULL; //銷燬互斥量和條件變量 pthread_mutex_destroy(&m_pthreadMutex); pthread_cond_destroy(&m_pthreadCond); return 0; } //獲取當前隊列中的任務數 int CThreadPool::getTaskSize() { return m_vecTaskList.size(); }
3 main.cpp學習
#include "thread_pool.h" #include <cstdio> #include <stdlib.h> #include <unistd.h> class CMyTask: public CTask { public: CMyTask() = default; int Run() { printf("%s\n", (char*)m_ptrData); int x = rand()%4 + 1; sleep(x); return 0; } ~CMyTask() {} }; int main() { CMyTask taskObj; char szTmp[] = "hello!"; taskObj.setData((void*)szTmp); CThreadPool threadpool(5); //線程池大小爲5 for(int i = 0; i < 10; i++) threadpool.AddTask(&taskObj); while(1) { printf("There are still %d tasks need to handle\n", threadpool.getTaskSize()); //任務隊列已沒有任務了 if(threadpool.getTaskSize()==0) { //清除線程池 if(threadpool.StopAll() == -1) { printf("Thread pool clear, exit.\n"); exit(0); } } sleep(2); printf("2 seconds later...\n"); } return 0; }
4 Makefilethis
CC:= g++ TARGET:= threadpool INCLUDE:= -I./ LIBS:= -lpthread # C++語言編譯參數 CXXFLAGS:= -std=c++11 -g -Wall -D_REENTRANT # C預處理參數 # CPPFLAGS:= OBJECTS :=thread_pool.o main.o $(TARGET): $(OBJECTS) $(CC) -o $(TARGET) $(OBJECTS) $(LIBS) # $@表示全部目標集 %.o:%.cpp $(CC) -c $(CXXFLAGS) $(INCLUDE) $< -o $@ .PHONY : clean clean: -rm -f $(OBJECTS) $(TARGET)
5 輸出結果spa
I will create 5 threads. There are still 10 tasks need to handle [tid: 140056759576320] run: hello! [tid: 140056751183616] run: hello! [tid: 140056742790912] run: hello! [tid: 140056734398208] run: hello! [tid: 140056767969024] run: hello! 2 seconds later... There are still 5 tasks need to handle [tid: 140056742790912] idle [tid: 140056742790912] run: hello! [tid: 140056767969024] idle [tid: 140056767969024] run: hello! [tid: 140056751183616] idle [tid: 140056751183616] run: hello! [tid: 140056759576320] idle [tid: 140056759576320] run: hello! [tid: 140056751183616] idle [tid: 140056751183616] run: hello! [tid: 140056734398208] idle 2 seconds later... There are still 0 tasks need to handle Now I will end all threads! 2 seconds later... [tid: 140056734398208] exit [tid: 140056767969024] idle [tid: 140056767969024] exit [tid: 140056759576320] idle [tid: 140056759576320] exit [tid: 140056751183616] idle [tid: 140056751183616] exit [tid: 140056742790912] idle [tid: 140056742790912] exit 2 seconds later... There are still 0 tasks need to handle Thread pool clear, exit.
擴展資料: