線程池實現
Table of Contents
1 基本設計思路
咱們首先設計TPThread類,用於管理單個線程的屬性和方法;有了TPThread表示的線程以後,咱們定義ThreadPool類,用以管理一組TPThread對象,此處所說的管理包括:針對全部TPThread線程的建立、銷燬以及調度。算法
咱們怎麼將須要在線程中調用的業務邏輯代碼接入線程池呢?選擇之一是利用TPThread提供多態函數,將業務邏輯代碼嵌入TPThread的子對象。可是這樣作,在每次將併發任務代理給線程池以後,TPThread將會綁定到固定的業務邏輯上。更靈活的設計是分離出一個專門用於代理併發任務的TPTask類。緩存
那麼,如今的線程池結構是:一個全局的ThreadPool對象,在它初始化的時候會建立一組初始數量的TPThread對象,用戶的併發業務邏輯在TPTask的子對象中實現,這樣的子對象一樣也都交給全局的ThreadPool對象管理,ThreadPool中當前空閒的TPThread線程將被分配去處理TPTask任務。服務器
2 使用線程池的優點
設計線程池的初衷是規避頻繁建立和銷燬線程的開銷。多線程
但從上面的設計來看,除了性能方面的優點以外,利用線程池,用戶一般只需一個相似 addTask 的方法便可快捷地添加併發任務;併發線程與主線程的交互也將更加簡單,這將在接下來的源碼中體現出來。併發
綜上所述,線程池對比操做系統提供的原始線程控制原語,它不只下降了線程頻繁建立、銷燬的性能開銷,也爲用戶提供了更簡易明瞭的操做接口。框架
3 TPTask
咱們將爲TPTask類提供兩個函數:一個純虛函數,用於提供給繼承該對象的用戶子對象重寫該方法以嵌入併發的業務邏輯;一個有默認實現的在主線程中被調用的虛函數。less
class TPTask { public: enum TPTaskState { TPTask_Completed = 0, // 一個任務已經完成 TPTask_ContinueMainThread = 1, // 繼續在主線程執行 TPTask_ContinueChildThread = 2, // 繼續在子線程執行 }; virtual bool process() = 0; virtual TPTask::TPTaskState presentMainThread() { return TPTask::TPTask_Completed; } };
presentMainThread 函數是併發線程與主線程交互的接口,它將在 TPThread 對象的 process 函數執行以後的某一時刻在主線程中被調用。函數
4 TPThread
class ThreadPool; class TPThread { public: friend class ThreadPool; enum EThreadState { ThreadState_Stop = -1, ThreadState_Sleep = 0, ThreadState_Busy = 1, ThreadState_End = 2 }; TPThread(ThreadPool* threadPool, int threadWaitSecond = 0) :mThreadWaitSecond(threadWaitSecond) ,mpCurrTask(NULL) ,mpThreadPool(threadPool) { mState = ThreadState_Sleep; initCond(); initMutex(); } virtual ~TPThread() { deleteCond(); deleteMutex(); } THREAD_ID createThread(void); bool join(void); void onTaskCompleted(void); // 線程通知 等待條件信號 bool onWaitCondSignal(void); virtual TPTask* tryGetTask(void); #if PLATFORM == PLATFORM_WIN32 static unsigned __stdcall threadFunc(void *arg); #else static void* threadFunc(void* arg); #endif int sendCondSignal(void) { return THREAD_SINGNAL_SET(mCond); } virtual void onStart() {} virtual void onEnd() {} virtual void onProcessTaskStart(TPTask* pTask) {} virtual void processTask(TPTask* pTask) { pTask->process(); } virtual void onProcessTaskEnd(TPTask* pTask) {} THREAD_ID id(void) const { return mTid; } void id(THREAD_ID tidp) { mTid = tidp; } virtual void initCond(void) { THREAD_SINGNAL_INIT(mCond); } virtual void initMutex(void) { THREAD_MUTEX_INIT(mMutex); } virtual void deleteCond(void) { THREAD_SINGNAL_DELETE(mCond); } virtual void deleteMutex(void) { THREAD_MUTEX_DELETE(mMutex); } virtual void lock(void) { THREAD_MUTEX_LOCK(mMutex); } virtual void unlock(void) { THREAD_MUTEX_UNLOCK(mMutex); } TPTask* task(void) const { return mpCurrTask; } void task(TPTask* tpt) { mpCurrTask = tpt; } int state(void) const { return mState; } ThreadPool* threadPool() { return mpThreadPool; } virtual std::string printWorkState() { char buf[128]; lock(); sprintf(buf, "%p,%u", mpCurrTask, mDoneTasks); unlock(); return buf; } void resetDoneTasks() { mDoneTasks = 0; } void incDoneTasks() { ++mDoneTasks; } protected: THREAD_ID mTid; // 本線程的ID THREAD_SINGNAL mCond; THREAD_MUTEX mMutex; int mThreadWaitSecond; // 線程空閒狀態超過這個秒數則線程退出, 小於0爲永久線程(秒單位) TPTask *mpCurrTask; // 該線程的當前執行的任務 ThreadPool *mpThreadPool; // 線程池指針 EThreadState mState; // 線程狀態 uint32 mDoneTasks; // 線程啓動一次在未改變到閒置狀態下連續執行的任務計數 };
該對象封裝了Win32和Unix平臺下的線程實現。mpCurrTask指向該線程當前的任務,這不須要用戶操心,它實際是由ThreadPool管理的。post
5 ThreadPool
class ThreadPool { public: ThreadPool(); virtual ~ThreadPool(); void finalise(); void destroy(); /** 建立線程池 @param inewThreadCount: 當系統繁忙時線程池會新增長這麼多線程(臨時) @param inormalMaxThreadCount: 線程池會一直保持這麼多個數的線程 @param imaxThreadCount: 線程池最多隻能有這麼多個線程 */ bool createThreadPool(uint32 inewThreadCount, uint32 inormalMaxThreadCount, uint32 imaxThreadCount); virtual TPThread* createThread(int threadWaitSecond = ThreadPool::timeout); void bufferTask(TPTask* tptask); TPTask* popbufferTask(void); bool addFreeThread(TPThread* tptd); bool addBusyThread(TPThread* tptd); void addFiniTask(TPTask* tptask); bool removeHangThread(TPThread* tptd); virtual void onMainThreadTick(); bool hasThread(TPThread* pTPThread); std::string printThreadWorks(); bool addTask(TPTask* tptask); bool addBackgroundTask(TPTask* tptask) { return addTask(tptask); } bool pushTask(TPTask* tptask) { return addTask(tptask); } uint32 currentThreadCount(void) const { return mCurrentThreadCount; } uint32 currentFreeThreadCount(void) const { return mCurrentFreeThreadCount; } bool isThreadCountMax(void) const { return mCurrentThreadCount >= mMaxThreadCount; } bool isBusy(void) const { return mBufferedTaskList.size() > THREAD_BUSY_SIZE; } bool isInitialize(void) const { return mIsInitialize; } bool isDestroyed() const { return mIsDestroyed; } uint32 bufferTaskSize() const { return mBufferedTaskList.size(); } std::queue<TPTask*>& bufferedTaskList() { return mBufferedTaskList; } void lockBufferedTaskList() { THREAD_MUTEX_LOCK(mBufferedTaskListMutex); } void unlockBufferedTaskList() { THREAD_MUTEX_UNLOCK(mBufferedTaskListMutex); } uint32 finiTaskSize() const { return mFiniTaskListCount; } virtual std::string name() const{ return "ThreadPool"; } public: static int timeout; protected: bool mIsInitialize; bool mIsDestroyed; std::queue<TPTask *> mBufferedTaskList; // 系統處於繁忙時還未處理的任務列表 std::list<TPTask *> mFinishedTaskList; // 已經完成的任務列表 size_t mFiniTaskListCount; THREAD_MUTEX mBufferedTaskListMutex; // 處理mBufferedTaskList互斥鎖 THREAD_MUTEX mThreadStateListMutex; // 處理mBufferedTaskList and mFreeThreadList互斥鎖 THREAD_MUTEX mFinishedTaskListMutex; // 處理mFinishedTaskList互斥鎖 std::list<TPThread *> mBusyThreadList; // 繁忙的線程列表 std::list<TPThread *> mFreeThreadList; // 閒置的線程列表 std::list<TPThread *> mAllThreadList; // 全部的線程列表 uint32 mMaxThreadCount; // 最大線程總數 uint32 mExtraNewAddThreadCount; // 若是mNormalThreadCount不足夠使用則會新建立這麼多線程 uint32 mCurrentThreadCount; // 當前線程數 uint32 mCurrentFreeThreadCount; // 當前閒置的線程數 uint32 mNormalThreadCount; // 標準狀態下的線程總數 即:默認狀況下一啓動服務器就開啓這麼多線程,若是線程不足夠,則會新建立一些線程, 最大可以到mMaxThreadCount };
5.1 線程管理
從聲明中能夠看到有三個線程對象列表性能
- mAllThreadList 該容器是 mFreeThreadList 和 mBusyThreadList 容器的並集。
- mFreeThreadList 該容器記錄了當前閒置的線程對象,當用戶調用 addTask 方法添加併發任務對象時,線程池將嘗試從該容器中取出線程對象來執行併發任務。
- mBusyThreadList 當前正在執行併發任務的線程對象將會保存到此容器。
在調用 createThreadPool 初始化線程池時,全部的線程對象都會被添加到 mFreeThreadList 容器, mBusyThreadList 初始化爲空。
在調用線程池的 addTask 方法添加併發任務時,若 mFreeThreadList 非空則會從 mFreeThreadList 列表中取出一個線程對象來執行併發任務,該對象將被轉移到 mBusyThreadList,表示其正在執行任務。
5.2 併發任務管理
線程池成員中跟任務相關的容器有:
- mBufferedTaskList 若是當前沒有閒置的線程,那麼,用戶新增的任務將緩存至此容器。
- mFinishedTaskList 在併發任務對象的 process 方法執行完以後,它將被添加到該容器。 mFinishedTaskList 容器中的併發任務對象將在主線程中被進一步處理,其 presentMainThread 函數將被調用,而後根據返回值來決定相應任務對象的去向。
注意到, mBufferedTaskList 和 mFinishedTaskList 容器保存的都是當前未在執行的併發任務。實際上,正在執行的併發任務對象是直接代理給 TPThread 線程對象的,並未用任何形式的容器去緩存。
6 實現細節
6.1 線程回調函數
- 註冊線程回調函數
THREAD_ID TPThread::createThread(void) { #if PLATFORM == PLATFORM_WIN32 mTid = (THREAD_ID)_beginthreadex(NULL, 0, &TPThread::threadFunc, (void*)this, NULL, 0); #else pthread_create(&mTid, NULL, TPThread::threadFunc, (void*)this); #endif return mTid; }
- 線程回調函數的實現
#if PLATFORM == PLATFORM_WIN32 unsigned __stdcall TPThread::threadFunc(void *arg) #else void* TPThread::threadFunc(void* arg) #endif { TPThread *tptd = static_cast<TPThread *>(arg); ThreadPool *pThreadPool = tptd->threadPool(); bool isRun = true; tptd->resetDoneTasks(); #if PLATFORM == PLATFORM_WIN32 #else pthread_detach(pthread_self()); #endif tptd->onStart(); while(isRun) { if(tptd->task() != NULL) { isRun = true; } else { tptd->resetDoneTasks(); isRun = tptd->onWaitCondSignal(); } if(!isRun || pThreadPool->isDestroyed()) { if(!pThreadPool->hasThread(tptd)) tptd = NULL; goto __THREAD_END__; } TPTask * task = tptd->task(); if(task == NULL) continue; tptd->mState = ThreadState_Busy; while(task && !tptd->threadPool()->isDestroyed()) { tptd->incDoneTasks(); tptd->onProcessTaskStart(task); tptd->processTask(task); // 處理該任務 tptd->onProcessTaskEnd(task); TPTask * task1 = tptd->tryGetTask(); // 嘗試繼續從任務隊列裏取出一個繁忙的未處理的任務 if(!task1) { tptd->onTaskCompleted(); break; } else { pThreadPool->addFiniTask(task); task = task1; tptd->task(task1); } } } __THREAD_END__: if(tptd) { TPTask * task = tptd->task(); if(task) { delete task; } tptd->onEnd(); tptd->mState = ThreadState_End; tptd->resetDoneTasks(); } #if PLATFORM == PLATFORM_WIN32 return 0; #else pthread_exit(NULL); return NULL; #endif }
咱們能夠看到,該函數的核心便是:取出併發任務並執行它。
在剛進入循環時,會執行以下代碼:
if(tptd->task() != NULL) { isRun = true; } else { tptd->resetDoneTasks(); isRun = tptd->onWaitCondSignal(); }
onWaitCondSignal 的實現以下:
bool TPThread::onWaitCondSignal(void) { #if PLATFORM == PLATFORM_WIN32 if(mThreadWaitSecond <= 0) { mState = ThreadState_Sleep; WaitForSingleObject(mCond, INFINITE); ResetEvent(mCond); } else { mState = ThreadState_Sleep; DWORD ret = WaitForSingleObject(mCond, mThreadWaitSecond * 1000); ResetEvent(mCond); // 若是是由於超時了, 說明這個線程好久沒有被用到, 咱們應該註銷這個線程。 // 通知ThreadPool註銷本身 if (ret == WAIT_TIMEOUT) { mpThreadPool->removeHangThread(this); return false; } else if(ret != WAIT_OBJECT_0) { } } #else if(mThreadWaitSecond <= 0) { lock(); mState = ThreadState_Sleep; pthread_cond_wait(&mCond, &mMutex); unlock(); } else { struct timeval now; struct timespec timeout; gettimeofday(&now, NULL); timeout.tv_sec = now.tv_sec + mThreadWaitSecond; timeout.tv_nsec = now.tv_usec * 1000; lock(); mState = ThreadState_Sleep; int ret = pthread_cond_timedwait(&mCond, &mMutex, &timeout); unlock(); // 若是是由於超時了, 說明這個線程好久沒有被用到, 咱們應該註銷這個線程。 if (ret == ETIMEDOUT) { // 通知ThreadPool註銷本身 mpThreadPool->removeHangThread(this); return false; } else if(ret != 0) { } } #endif return true; }
能夠看到,在 tptd->task() 爲空的時候,線程將休眠。若是該線程是在線程池初始化時所建立,那麼,將進入永久休眠,直至被喚醒;若是該線程是在用戶添加任務過程當中因線程池中暫無閒置線程而臨時建立的,那麼,將進入超時休眠。臨時線程在超時喚醒後將被回收,初始線程則會直到線程池銷燬後才被回收。這體如今以下的代碼中:
if(!isRun || pThreadPool->isDestroyed()) { if(!pThreadPool->hasThread(tptd)) tptd = NULL; goto __THREAD_END__; }
在上述預處理以後,線程回調函數將正式進入任務處理流程:
TPTask * task = tptd->task(); if(task == NULL) continue; tptd->mState = ThreadState_Busy; while(task && !tptd->threadPool()->isDestroyed()) { tptd->incDoneTasks(); tptd->onProcessTaskStart(task); tptd->processTask(task); // 處理該任務 tptd->onProcessTaskEnd(task); TPTask * task1 = tptd->tryGetTask(); // 嘗試繼續從任務隊列裏取出一個繁忙的未處理的任務 if(!task1) { tptd->onTaskCompleted(); break; } else { pThreadPool->addFiniTask(task); task = task1; tptd->task(task1); } }
上述循環的退出時機是:當前任務已被執行,而且線程池的 mBufferedTaskList 容器爲空,這表示該線程暫時完成了本身的使命,能夠先休息了。
6.2 線程池管理
6.2.1 線程池初始化
bool ThreadPool::createThreadPool(uint32 inewThreadCount, uint32 inormalMaxThreadCount, uint32 imaxThreadCount) { assert(!mIsInitialize); mExtraNewAddThreadCount = inewThreadCount; mNormalThreadCount = inormalMaxThreadCount; mMaxThreadCount = imaxThreadCount; for(uint32 i = 0; i < mNormalThreadCount; ++i) { TPThread* tptd = createThread(0); if(!tptd) { // ERROR_MSG("ThreadPool::createThreadPool: create is error! \n"); return false; } mCurrentFreeThreadCount++; mCurrentThreadCount++; mFreeThreadList.push_back(tptd); mAllThreadList.push_back(tptd); } mIsInitialize = true; sleepms(100); return true; } TPThread* ThreadPool::createThread(int threadWaitSecond) { TPThread* tptd = new TPThread(this, threadWaitSecond); tptd->createThread(); return tptd; }
線程池的初始化便是建立一批初始數量的線程。在線程初始化過程當中建立的線程將一直延續到線程池銷燬爲止。
6.2.2 併發任務添加
bool ThreadPool::addTask(TPTask* tptask) { THREAD_MUTEX_LOCK(mThreadStateListMutex); if(mCurrentFreeThreadCount > 0) { std::list<TPThread *>::iterator itr = mFreeThreadList.begin(); TPThread* tptd = (TPThread *)(*itr); mFreeThreadList.erase(itr); mBusyThreadList.push_back(tptd); --mCurrentFreeThreadCount; tptd->task(tptask); // 給線程設置新任務 #if PLATFORM == PLATFORM_WIN32 if(tptd->sendCondSignal()== 0){ #else if(tptd->sendCondSignal()!= 0){ #endif THREAD_MUTEX_UNLOCK(mThreadStateListMutex); return false; } THREAD_MUTEX_UNLOCK(mThreadStateListMutex); return true; } bufferTask(tptask); if(isThreadCountMax()) { THREAD_MUTEX_UNLOCK(mThreadStateListMutex); return false; } for(uint32 i = 0; i < mExtraNewAddThreadCount; ++i) { TPThread* tptd = createThread(300); // 設定5分鐘未使用則退出的線程 mAllThreadList.push_back(tptd); // 全部的線程列表 mFreeThreadList.push_back(tptd); // 閒置的線程列表 ++mCurrentThreadCount; ++mCurrentFreeThreadCount; } THREAD_MUTEX_UNLOCK(mThreadStateListMutex); return true; }
若是當前有閒置線程,則直接將併發任務代理給它;若是當前沒有閒置線程,那就先將併發任務緩存起來,未來的空閒線程將經過 tryGetTask() 取出緩存的併發任務。
咱們注意到,在緩存任務以後,會嘗試建立一批新的線程對象,這些線程對象是臨時的,當線程池「悠閒」下來的時候,它們將被回收。這個額外的優化若是利用得好,將會提升線程池的併發度。
6.2.3 與主線程的交互
這是一種常有的需求,併發任務產生的輸出一般須要提交給主線程。咱們提供 onMainThreadTick 方法來達成此目的。
void ThreadPool::onMainThreadTick() { std::vector<TPTask *> finitasks; THREAD_MUTEX_LOCK(mFinishedTaskListMutex); if(mFinishedTaskList.size() == 0) { THREAD_MUTEX_UNLOCK(mFinishedTaskListMutex); return; } std::copy(mFinishedTaskList.begin(), mFinishedTaskList.end(), std::back_inserter(finitasks)); mFinishedTaskList.clear(); THREAD_MUTEX_UNLOCK(mFinishedTaskListMutex); std::vector<TPTask *>::iterator finiiter = finitasks.begin(); for(; finiiter != finitasks.end(); ) { TPTask::TPTaskState state = (*finiiter)->presentMainThread(); switch(state) { case TPTask::TPTask_Completed: delete (*finiiter); finiiter = finitasks.erase(finiiter); --mFiniTaskListCount; break; case TPTask::TPTask_ContinueChildThread: this->addTask((*finiiter)); finiiter = finitasks.erase(finiiter); --mFiniTaskListCount; break; case TPTask::TPTask_ContinueMainThread: THREAD_MUTEX_LOCK(mFinishedTaskListMutex); mFinishedTaskList.push_back((*finiiter)); THREAD_MUTEX_UNLOCK(mFinishedTaskListMutex); ++finiiter; break; default: Assert(false); break; }; } }
注意到,此函數只針對了 mFinishedTaskList 容器中的任務進行了處理,這表示,一個併發任務得在執行了至少一次 process 以後才能與主線程交互。
6.2.4 線程池銷燬
void ThreadPool::destroy() { mIsDestroyed = true; int itry = 0; while(true) { sleepms(300); itry++; std::string taskaddrs = ""; THREAD_MUTEX_LOCK(mThreadStateListMutex); int count = mAllThreadList.size(); std::list<TPThread *>::iterator itr = mAllThreadList.begin(); for(; itr != mAllThreadList.end(); ++itr) { if((*itr)) { if((*itr)->state() != TPThread::ThreadState_End) { (*itr)->sendCondSignal(); } else { count--; } } } THREAD_MUTEX_UNLOCK(mThreadStateListMutex); if(count <= 0) { break; } } THREAD_MUTEX_LOCK(mThreadStateListMutex); sleepms(100); std::list<TPThread*>::iterator itr = mAllThreadList.begin(); for(; itr != mAllThreadList.end(); ++itr) { if((*itr)) { delete (*itr); (*itr) = NULL; } } mAllThreadList.clear(); THREAD_MUTEX_UNLOCK(mThreadStateListMutex); THREAD_MUTEX_LOCK(mFinishedTaskListMutex); if(mFinishedTaskList.size() > 0) { std::list<TPTask*>::iterator finiiter = mFinishedTaskList.begin(); for(; finiiter != mFinishedTaskList.end(); ++finiiter) { delete (*finiiter); } mFinishedTaskList.clear(); mFiniTaskListCount = 0; } THREAD_MUTEX_UNLOCK(mFinishedTaskListMutex); THREAD_MUTEX_LOCK(mBufferedTaskListMutex); if(mBufferedTaskList.size() > 0) { while(mBufferedTaskList.size() > 0) { TPTask* tptask = mBufferedTaskList.front(); mBufferedTaskList.pop(); delete tptask; } } THREAD_MUTEX_UNLOCK(mBufferedTaskListMutex); THREAD_MUTEX_DELETE(mThreadStateListMutex); THREAD_MUTEX_DELETE(mBufferedTaskListMutex); THREAD_MUTEX_DELETE(mFinishedTaskListMutex); }
線程池銷燬時將會等待全部線程正常退出,因此用戶重寫的 process 方法須要在線程池銷燬時所有退出。
7 總結
實現高效且友好的線程池確實須要注意不少細節,但總的來講,線程池技術並不包含多麼高深的算法。任何線程池框架通常都只有以下的幾個簡單執行步驟:
- 預建立一批線程
- 添加併發任務
- 銷燬全部線程