[zz]muduo源碼閱讀之Thread和ThreadPool

[source_address] https://blog.dujiong.net/2016/07/17/muduo-6/數據結構

muduo源碼閱讀之Thread和ThreadPool多線程

在muduo的one loop per thread + thread pool模型中,線程和線程池應該是其中最基礎也是最重要的兩個組件了。因此,本文深刻代碼,學習Thread和ThreadPool兩個類的結構和實現。函數

Thread類

__thread關鍵字

學習Thread class以前,先了解一個關鍵字的用法:__thread。
__thread是GCC內置的線程局部存儲設施。它的實現很是高效,比Pthread庫中的pthread_key_t(muduo中ThreadLocal)快不少。__thread變量是表示每一個線程有一份獨立實體,各個線程的變量值互不干擾。__thread只能修飾POD類型,不能修飾class類型,由於沒法自動調用構造函數和析構函數。
Thread類的封裝用到了命名空間CurrentThread,這個空間中定義了和線程相關的一些獨立屬性。oop

namespace CurrentThread
{
    __thread int t_cachedTid = 0;
    __thread char t_tidString[32];
    __thread int t_tidStringLength = 6;
    __thread const char* t_threadName = "unknown";
}

其中,t_cachedTid表示線程的真實id,Pthread庫中提供了pthread_self()獲取當前線程的標識,類型爲pthread_t。可是,pthread_t不必定是數值類型,也多是一個結構體,這帶來了一些問題。
(1)沒法打印輸出pthread_t,由於不知道其確切類型。
(2)沒法比較pthread_t大小或計算hash值。
(3)pthread_t值只在進程內有意義,與操做系統的任務調度之間沒法創建有效關係,Pthread庫只能保證在同一進程以內,同一時刻的各個線程的id不一樣。
因此,muduo採用gettid()系統調用的返回值做爲線程id,muduo中將操做封裝爲gettid()函數。可是,咱們知道,調用系統調用開銷比較大,因此,muduo中採用__thread變量t_cachedTid來存儲,在線程第一次使用tid時經過系統調用得到,存儲在t_cachedTid中,之後使用時再也不須要系統調用了。
t_tidString[32]:用string類型表示tid,以便輸出日誌。
t_tidStringLength:string類型tid的長度。
t_threadName:線程的名字。post

相關的數據結構

在Thread的實現中,還用到了兩個數據結構,一個位ThreadData,用來輔助調用線程執行的函數。另外一個爲ThreadNameInitializer,爲線程的建立作環境準備。其中用到了pthread_atfork(NULL,NULL,&afterfork)。該函數的原型爲
int pthread_atfork(void (*prepare)(void), void (*parent)(void), void (*child)(void));
這是一個跟進程建立有關的函數,爲fork的調用作準備和調用後子進程父進程的初始化。prepare函數在調用fork前執行,parent在調用fork後的父進程中執行,child在調用fork後的子進程中執行。
固然,在實際應用中,多線程不要調用fork(),不然會出現一些問題。由於fork智能克隆當前線程的thread of control,卻不克隆其餘線程。fork()以後,除了當前線程以外,其餘線程都消失了。這樣,會出現不少問題。好比,若是複製了一個lock的mutex,卻沒有複製unlock的線程,那麼在給mutex加鎖時就會出現死鎖。學習

Thread類分析

首先來看Thread類的數據成員和構造函數。this

class Thread : boost::noncopyable
{
    typedef boost::function<void ()> ThreadFunc;
    ...
    private:
        bool started_;
        bool joined_;
        pthread_t pthreadId_;
        boost::shared_ptr<pid_t> tid_;
        ThreadFunc func_;
        string name_;

        static AtomicInt32 numCreated_;
};
Thread::Thread(ThreadFunc&& func, const string& n)
    : started_(false), 
      joined_(false),
      pthreadId_(0),
      tid_(new pid_t(0)),
      func_(func),
      name_(n)
{
      setDefaultName();
}

其中有兩處須要說明一下,首先是shared_ptr<pid> tid_,可能有人會有疑問:爲何這裏要用shared_ptr包裝pid?
緣由是tid_所屬的對象Thread在主線程(A)中建立,而tid_須要在新建立的線程B中進行賦值操做,若是tid使用裸指針的方式傳遞給線程(B),那麼線程A中Thread對象析構(下文)銷燬後,線程B持有的就是一個野指針,因此,在Thread對象中將以shared_ptr包裝。
而後是numCreated_,是一個靜態變量,類型爲AtomicInt32,原子類型,用來表示第幾回建立線程實例,在記錄日誌時可用記錄爲:「線程名+numCreated_」。spa

接下來看Thread的一些接口函數。操作系統

void Thread::start()
{
    started = true;
    detail::ThreadData* data = new detail::ThreadData(func_, name_, tid_);
    if(pthread_create(&pthreadId_, NULL, &detail::startThread, data));
    {
        started_ = false;
        delete data;
        LOG_SYSFATAL << "Failed in pthread_create";
    }
}

Thread::start()將調用pthread_create()建立新線程,detail::startThread()是新線程的入口函數,data是新線程執行的輔助結構體。detail::startThread()調用data->runInThread()執行線程邏輯(func_)。.net

int Thread::join()
{
    assert(started_);
    assert(!joined_);
    joined_ = true;
    return pthread_join(pthreadId_, NULL);
}

Thread::~Thread()
{
    if(started_ && !joined_)
    {
        pthread_detach(pthreadId_);
    }
}

Thread析構的時候沒有銷燬持有的Pthreads句柄(pthread_t),也就是說Thread的析構不會等待線程結束。若是Thread對象的生命期長於線程,而後經過Thread::join()來等待線程結束並釋放線程資源。若是Thread對象的生命期短於線程,那麼析構時會自動detach線程,避免了資源泄露。

ThreadPool類

ThreadPool(線程池)本質上是一個生產者-消費者的模型,在實際中主要完成計算任務。在muduo線程池中有一個存放工做線程的容器ptr_vector,至關於消費者;有一個存聽任務的隊列deque。
任務隊列是有界的,相似於BoundedBlockingQueue,實現時須要兩個條件變量。
如下是ThreadPool的數據成員:

class ThreadPool : boost::noncopyable
{
    typedef boost::function<void ()> Task;
    private:
        MutexLock mutex_;
        Condition notEmpty_;
        Condition notFull_;
        string name_;
        Task threadInitCallback_;
        boost::ptr_vector<muduo::Thread> threads_;
        std::deque<Task> queue_;
        size_t maxQueueSize_;
        bool running_;
}

其中threadInitCallback_可由setThreadInitCallback(const Task& cb)設置,設置回調函數,每次在執行任務前先調用。在線程池開始運行以前,須要先設置任務隊列的大小(調用setMaxQueueSize()),由於運行線程池時,線程會從任務隊列取任務。
接下來是ThreadPool的一些接口函數。

void ThreadPool::start(int numThreads)
{
    assert(threads_.empty());
    running_ = true;
    threads_.reserve(numThreads);
    for (int i = 0; i < numThreads; ++i)
      {
        char id[32];
        snprintf(id, sizeof id, "%d", i+1);
        threads_.push_back(new muduo::Thread(
              boost::bind(&ThreadPool::runInThread, this), name_+id));
        threads_[i].start();
      }
    if(numThreads == 0 && threadInitCallback_)
    {
        threadInitCallback_();
    }
}

void ThreadPool::start(int numThreads)開啓線程池,按照線程數量numThreads_建立工做線程,線程函數爲ThreadPool::runInThread()。

void ThreadPool::runInThread()
{
    try
    {
        if(threadInitCallback_)
        {
            threadInitCallback_();
        }
        while(running_)
        {
            Task task(take());
            if(task)
            {
                task();
            }
        }
    }
    catch(const Exception& ex)
    {
        ...
    }

}

若是設置了threadInitCallback_,則進行執行任務前的一些初始化操做。而後從任務隊列中取任務執行,有可能阻塞,當任務隊列爲空時。

ThreadPool::Task ThreadPool::take()
{
    MutexLockGuard lock(mutex_);
    while(queue_.empty() && running_)
    {
        notEmpty_.wait();
    }
    Task task;
    if(!queue_.empty())
    {
        task = queue_.front();
        queue_.pop_front();
        if(maxQueueSize_ > 0)
        {
            notFull_.notify();
        }
    }
    return task;
}

多線程從消息隊列中取任務的時候,須要加鎖保護。等到隊列非空信號,就取任務。取出以後,便告知任務隊列已經非滿,能夠繼續添加任務。

void ThreadPool::run(const Task& task)
{
    if(threads_.empty())
    {
        task();
    }
    else
    {
        MutexLockGuard lock(mutex_);
        while(isFull())
        {
            notFull_.wait();
        }
        assert(!isFull());
        queue_.push_back(task);
        notEmpty_.notfy();
    }
}

若是ThreadPool沒有子線程(set和start操做在run以前),就在主線程中執行該task,不然,將任務加入到隊列,並通知線程從中取task,若是隊列已滿,便等待。

ThreadPool::~ThreadPool()
{
    if(running_)
    {
        stop();
    }
}
void ThreadPool::stop()
{
    {
        MutexLockGuard lock(mutex_);
        running_ = false;
        notEmpty_.notifyAll();
    }
    for_each(threads_.begin(),threads_.end(),boost::bind(&muduo::Thread::join, _1));
}

最後是ThreadPool的析構函數,在其中調用stop(),喚醒全部等待的線程,而後對線程池中的每個線程執行join()。

總結

以上就是muduo中Thread和ThreadPool類的學習,有不少源碼,有點囉嗦。可是,在muduo的one loop per thread + thread pool模型中,Thread和ThreadPool是很重要的組件,因此須要深刻地掌握。

相關文章
相關標籤/搜索