在muduo的one loop per thread + thread pool模型中,線程和線程池應該是其中最基礎也是最重要的兩個組件了。因此,本文深刻代碼,學習Thread和ThreadPool兩個類的結構和實現。函數
Thread類
__thread關鍵字
學習Thread class以前,先了解一個關鍵字的用法:__thread。
__thread是GCC內置的線程局部存儲設施。它的實現很是高效,比Pthread庫中的pthread_key_t(muduo中ThreadLocal)快不少。__thread變量是表示每一個線程有一份獨立實體,各個線程的變量值互不干擾。__thread只能修飾POD類型,不能修飾class類型,由於沒法自動調用構造函數和析構函數。
Thread類的封裝用到了命名空間CurrentThread,這個空間中定義了和線程相關的一些獨立屬性。oop
namespace CurrentThread { __thread int t_cachedTid = 0; __thread char t_tidString[32]; __thread int t_tidStringLength = 6; __thread const char* t_threadName = "unknown"; }
其中,t_cachedTid表示線程的真實id,Pthread庫中提供了pthread_self()獲取當前線程的標識,類型爲pthread_t。可是,pthread_t不必定是數值類型,也多是一個結構體,這帶來了一些問題。
(1)沒法打印輸出pthread_t,由於不知道其確切類型。
(2)沒法比較pthread_t大小或計算hash值。
(3)pthread_t值只在進程內有意義,與操做系統的任務調度之間沒法創建有效關係,Pthread庫只能保證在同一進程以內,同一時刻的各個線程的id不一樣。
因此,muduo採用gettid()系統調用的返回值做爲線程id,muduo中將操做封裝爲gettid()函數。可是,咱們知道,調用系統調用開銷比較大,因此,muduo中採用__thread變量t_cachedTid來存儲,在線程第一次使用tid時經過系統調用得到,存儲在t_cachedTid中,之後使用時再也不須要系統調用了。
t_tidString[32]:用string類型表示tid,以便輸出日誌。
t_tidStringLength:string類型tid的長度。
t_threadName:線程的名字。post
相關的數據結構
在Thread的實現中,還用到了兩個數據結構,一個位ThreadData,用來輔助調用線程執行的函數。另外一個爲ThreadNameInitializer,爲線程的建立作環境準備。其中用到了pthread_atfork(NULL,NULL,&afterfork)
。該函數的原型爲int pthread_atfork(void (*prepare)(void), void (*parent)(void), void (*child)(void));
這是一個跟進程建立有關的函數,爲fork的調用作準備和調用後子進程父進程的初始化。prepare函數在調用fork前執行,parent在調用fork後的父進程中執行,child在調用fork後的子進程中執行。
固然,在實際應用中,多線程不要調用fork(),不然會出現一些問題。由於fork智能克隆當前線程的thread of control,卻不克隆其餘線程。fork()以後,除了當前線程以外,其餘線程都消失了。這樣,會出現不少問題。好比,若是複製了一個lock的mutex,卻沒有複製unlock的線程,那麼在給mutex加鎖時就會出現死鎖。學習
Thread類分析
首先來看Thread類的數據成員和構造函數。this
class Thread : boost::noncopyable { typedef boost::function<void ()> ThreadFunc; ... private: bool started_; bool joined_; pthread_t pthreadId_; boost::shared_ptr<pid_t> tid_; ThreadFunc func_; string name_; static AtomicInt32 numCreated_; }; Thread::Thread(ThreadFunc&& func, const string& n) : started_(false), joined_(false), pthreadId_(0), tid_(new pid_t(0)), func_(func), name_(n) { setDefaultName(); }
其中有兩處須要說明一下,首先是shared_ptr<pid> tid_
,可能有人會有疑問:爲何這裏要用shared_ptr包裝pid?
緣由是tid_所屬的對象Thread在主線程(A)中建立,而tid_須要在新建立的線程B中進行賦值操做,若是tid使用裸指針的方式傳遞給線程(B),那麼線程A中Thread對象析構(下文)銷燬後,線程B持有的就是一個野指針,因此,在Thread對象中將以shared_ptr包裝。
而後是numCreated_,是一個靜態變量,類型爲AtomicInt32,原子類型,用來表示第幾回建立線程實例,在記錄日誌時可用記錄爲:「線程名+numCreated_」。spa
接下來看Thread的一些接口函數。操作系統
void Thread::start() { started = true; detail::ThreadData* data = new detail::ThreadData(func_, name_, tid_); if(pthread_create(&pthreadId_, NULL, &detail::startThread, data)); { started_ = false; delete data; LOG_SYSFATAL << "Failed in pthread_create"; } }
Thread::start()將調用pthread_create()建立新線程,detail::startThread()是新線程的入口函數,data是新線程執行的輔助結構體。detail::startThread()調用data->runInThread()執行線程邏輯(func_)。.net
int Thread::join() { assert(started_); assert(!joined_); joined_ = true; return pthread_join(pthreadId_, NULL); } Thread::~Thread() { if(started_ && !joined_) { pthread_detach(pthreadId_); } }
Thread析構的時候沒有銷燬持有的Pthreads句柄(pthread_t),也就是說Thread的析構不會等待線程結束。若是Thread對象的生命期長於線程,而後經過Thread::join()來等待線程結束並釋放線程資源。若是Thread對象的生命期短於線程,那麼析構時會自動detach線程,避免了資源泄露。
ThreadPool類
ThreadPool(線程池)本質上是一個生產者-消費者的模型,在實際中主要完成計算任務。在muduo線程池中有一個存放工做線程的容器ptr_vector,至關於消費者;有一個存聽任務的隊列deque。
任務隊列是有界的,相似於BoundedBlockingQueue,實現時須要兩個條件變量。
如下是ThreadPool的數據成員:
class ThreadPool : boost::noncopyable { typedef boost::function<void ()> Task; private: MutexLock mutex_; Condition notEmpty_; Condition notFull_; string name_; Task threadInitCallback_; boost::ptr_vector<muduo::Thread> threads_; std::deque<Task> queue_; size_t maxQueueSize_; bool running_; }
其中threadInitCallback_可由setThreadInitCallback(const Task& cb)設置,設置回調函數,每次在執行任務前先調用。在線程池開始運行以前,須要先設置任務隊列的大小(調用setMaxQueueSize()),由於運行線程池時,線程會從任務隊列取任務。
接下來是ThreadPool的一些接口函數。
void ThreadPool::start(int numThreads) { assert(threads_.empty()); running_ = true; threads_.reserve(numThreads); for (int i = 0; i < numThreads; ++i) { char id[32]; snprintf(id, sizeof id, "%d", i+1); threads_.push_back(new muduo::Thread( boost::bind(&ThreadPool::runInThread, this), name_+id)); threads_[i].start(); } if(numThreads == 0 && threadInitCallback_) { threadInitCallback_(); } }
void ThreadPool::start(int numThreads)
開啓線程池,按照線程數量numThreads_建立工做線程,線程函數爲ThreadPool::runInThread()。
void ThreadPool::runInThread() { try { if(threadInitCallback_) { threadInitCallback_(); } while(running_) { Task task(take()); if(task) { task(); } } } catch(const Exception& ex) { ... } }
若是設置了threadInitCallback_,則進行執行任務前的一些初始化操做。而後從任務隊列中取任務執行,有可能阻塞,當任務隊列爲空時。
ThreadPool::Task ThreadPool::take() { MutexLockGuard lock(mutex_); while(queue_.empty() && running_) { notEmpty_.wait(); } Task task; if(!queue_.empty()) { task = queue_.front(); queue_.pop_front(); if(maxQueueSize_ > 0) { notFull_.notify(); } } return task; }
多線程從消息隊列中取任務的時候,須要加鎖保護。等到隊列非空信號,就取任務。取出以後,便告知任務隊列已經非滿,能夠繼續添加任務。
void ThreadPool::run(const Task& task) { if(threads_.empty()) { task(); } else { MutexLockGuard lock(mutex_); while(isFull()) { notFull_.wait(); } assert(!isFull()); queue_.push_back(task); notEmpty_.notfy(); } }
若是ThreadPool沒有子線程(set和start操做在run以前),就在主線程中執行該task,不然,將任務加入到隊列,並通知線程從中取task,若是隊列已滿,便等待。
ThreadPool::~ThreadPool() { if(running_) { stop(); } } void ThreadPool::stop() { { MutexLockGuard lock(mutex_); running_ = false; notEmpty_.notifyAll(); } for_each(threads_.begin(),threads_.end(),boost::bind(&muduo::Thread::join, _1)); }
最後是ThreadPool的析構函數,在其中調用stop(),喚醒全部等待的線程,而後對線程池中的每個線程執行join()。
總結
以上就是muduo中Thread和ThreadPool類的學習,有不少源碼,有點囉嗦。可是,在muduo的one loop per thread + thread pool模型中,Thread和ThreadPool是很重要的組件,因此須要深刻地掌握。