我設計這個線程池的初衷是爲了與socket對接的。線程池的實現變幻無窮,我得這個並不必定是最好的,但倒是否和我心目中需求模型的。現把部分設計思路和代碼貼出,以期拋磚引玉。我的比較喜歡搞開源,因此你們若是以爲有什麼須要改善的地方,歡迎給予評論。思前想後,也沒啥設計圖能表達出設計思想,就把類圖貼出來吧。 c++
類圖設計以下: eclipse
Command類是咱們的業務類。這個類裏只能存放簡單的內置類型,這樣方便與socket的直接傳輸。我定義了一個cmd_成員用於存放命令字,arg_用於存放業務的參數。這個參數可使用分隔符來分隔各個參數。我設計的只是簡單實現,若是有序列化操做了,徹底不須要使用我這種方法啦。 socket
ThreadProcess就是業務處理類,這裏邊定義了各個方法用於進行業務處理,它將在ThreadPool中的Process函數中調用。 函數
ThreadPool就是咱們的線程池類。其中的成員變量都是靜態變量,Process就是線程處理函數。 測試
#define MAX_THREAD_NUM 50 // 該值目前須要設定爲初始線程數的整數倍
#define ADD_FACTOR 40 // 該值表示一個線程能夠處理的最大任務數
#define THREAD_NUM 10 // 初始線程數 google
bshutdown_:用於線程退出。 編碼
command_:用於存聽任務隊列 spa
command_cond_:條件變量 線程
command_mutex_:互斥鎖 設計
icurr_thread_num_:當前線程池中的線程數
thread_id_map_:這個map用於存放線程對應的其它信息,我只存放了線程的狀態,0爲正常,1爲退出。還能夠定義其它的結構來存放更多的信息,例如存放套接字。
InitializeThreads:用於初始化線程池,先建立THREAD_NUM個線程。後期擴容也須要這個函數。
Process:線程處理函數,這裏邊會調用AddThread和DeleteThread在進行線程池的伸縮。
AddWork:往隊列中添加一個任務。
ThreadDestroy:線程銷燬函數。
AddThread:擴容THREAD_NUM個線程
DeleteThread:若是任務隊列爲空,則將原來的線程池恢復到THREAD_NUM個。這裏能夠根據須要進行修改。
如下貼出代碼以供你們參考。
command.h
#ifndef COMMAND_H_ #define COMMAND_H_ class Command { public: int get_cmd(); char* get_arg(); void set_cmd(int cmd); void set_arg(char* arg); private: int cmd_; char arg_[65]; }; #endif /* COMMAND_H_ */
command.cpp
#include <string.h> #include "command.h" int Command::get_cmd() { return cmd_; } char* Command::get_arg() { return arg_; } void Command::set_cmd(int cmd) { cmd_ = cmd; } void Command::set_arg(char* arg) { if(NULL == arg) { return; } strncpy(arg_,arg,64); arg_[64] = '\0'; }
thread_process.h
#ifndef THREAD_PROCESS_H_ #define THREAD_PROCESS_H_ class ThreadProcess { public: void Process0(void* arg); void Process1(void* arg); void Process2(void* arg); }; #endif /* THREAD_PROCESS_H_ */
thread_process.cpp
#include <pthread.h> #include <stdio.h> #include <unistd.h> #include "thread_process.h" void ThreadProcess::Process0(void* arg) { printf("thread %u is starting process %s\n",pthread_self(),arg); usleep(100*1000); } void ThreadProcess::Process1(void* arg) { printf("thread %u is starting process %s\n",pthread_self(),arg); usleep(100*1000); } void ThreadProcess::Process2(void* arg) { printf("thread %u is starting process %s\n",pthread_self(),arg); usleep(100*1000); }
thread_pool.h
#ifndef THREAD_POOL_H_ #define THREAD_POOL_H_ #include#include#include "command.h" #define MAX_THREAD_NUM 50 // 該值目前須要設定爲初始線程數的整數倍 #define ADD_FACTOR 40 // 該值表示一個線程能夠處理的最大任務數 #define THREAD_NUM 10 // 初始線程數 class ThreadPool { public: ThreadPool() {}; static void InitializeThreads(); void AddWork(Command command); void ThreadDestroy(int iwait = 2); private: static void* Process(void* arg); static void AddThread(); static void DeleteThread(); static bool bshutdown_; static int icurr_thread_num_; static std::map thread_id_map_; static std::vectorcommand_; static pthread_mutex_t command_mutex_; static pthread_cond_t command_cond_; }; #endif /* THREAD_POOL_H_ */
thread_pool.cpp
#include <pthread.h> #include <stdlib.h> #include "thread_pool.h" #include "thread_process.h" #include "command.h" bool ThreadPool::bshutdown_ = false; int ThreadPool::icurr_thread_num_ = THREAD_NUM; std::vectorThreadPool::command_; std::map ThreadPool::thread_id_map_; pthread_mutex_t ThreadPool::command_mutex_ = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t ThreadPool::command_cond_ = PTHREAD_COND_INITIALIZER; void ThreadPool::InitializeThreads() { for (int i = 0; i < THREAD_NUM ; ++i) { pthread_t tempThread; pthread_create(&tempThread, NULL, ThreadPool::Process, NULL); thread_id_map_[tempThread] = 0; } } void* ThreadPool::Process(void* arg) { ThreadProcess threadprocess; Command command; while (true) { pthread_mutex_lock(&command_mutex_); // 若是線程須要退出,則此時退出 if (1 == thread_id_map_[pthread_self()]) { pthread_mutex_unlock(&command_mutex_); printf("thread %u will exit\n", pthread_self()); pthread_exit(NULL); } // 當線程不須要退出且沒有須要處理的任務時,須要縮容的則縮容,不須要的則等待信號 if (0 == command_.size() && !bshutdown_) { if(icurr_thread_num_ > THREAD_NUM) { DeleteThread(); if (1 == thread_id_map_[pthread_self()]) { pthread_mutex_unlock(&command_mutex_); printf("thread %u will exit\n", pthread_self()); pthread_exit(NULL); } } pthread_cond_wait(&command_cond_,&command_mutex_); } // 線程池須要關閉,關閉已有的鎖,線程退出 if(bshutdown_) { pthread_mutex_unlock (&command_mutex_); printf ("thread %u will exit\n", pthread_self ()); pthread_exit (NULL); } // 若是線程池的最大線程數不等於初始線程數,則代表須要擴容 if(icurr_thread_num_ < command_.size())) { AddThread(); } // 從容器中取出待辦任務 std::vector::iterator iter = command_.begin(); command.set_arg(iter->get_arg()); command.set_cmd(iter->get_cmd()); command_.erase(iter); pthread_mutex_unlock(&command_mutex_); // 開始業務處理 switch(command.get_cmd()) { case 0: threadprocess.Process0(command.get_arg()); break; case 1: threadprocess.Process1(command.get_arg()); break; case 2: threadprocess.Process2(command.get_arg()); break; default: break; } } return NULL; // 徹底爲了消除警告(eclipse編寫的代碼,警告很煩人) } void ThreadPool::AddWork(Command command) { bool bsignal = false; pthread_mutex_lock(&command_mutex_); if (0 == command_.size()) { bsignal = true; } command_.push_back(command); pthread_mutex_unlock(&command_mutex_); if (bsignal) { pthread_cond_signal(&command_cond_); } } void ThreadPool::ThreadDestroy(int iwait) { while(0 != command_.size()) { sleep(abs(iwait)); } bshutdown_ = true; pthread_cond_broadcast(&command_cond_); std::map::iterator iter = thread_id_map_.begin(); for (; iter!=thread_id_map_.end(); ++iter) { pthread_join(iter->first,NULL); } pthread_mutex_destroy(&command_mutex_); pthread_cond_destroy(&command_cond_); } void ThreadPool::AddThread() { if(((icurr_thread_num_*ADD_FACTOR) < command_.size()) && (MAX_THREAD_NUM != icurr_thread_num_)) { InitializeThreads(); icurr_thread_num_ += THREAD_NUM; } } void ThreadPool::DeleteThread() { int size = icurr_thread_num_ - THREAD_NUM; std::map::iterator iter = thread_id_map_.begin(); for(int i=0; isecond = 1; } }
main.cpp
#include "thread_pool.h" #include "command.h" int main() { ThreadPool thread_pool; thread_pool.InitializeThreads(); Command command; char arg[8] = {0}; for(int i=1; i<=1000; ++i) { command.set_cmd(i%3); sprintf(arg,"%d",i); command.set_arg(arg); thread_pool.AddWork(command); } sleep(10); // 用於測試線程池縮容 thread_pool.ThreadDestroy(); return 0; }
代碼是按照google的開源c++編碼規範編寫。你們能夠經過改變那幾個宏的值來調整線程池。有問題你們一塊兒討論。