最近一段時間閱讀了muduo源碼,讀完的感覺有一個感覺就是有點亂。固然不是說代碼亂,是我可能尚未徹底消化和理解。爲了更好的學習這個庫,仍是要來寫一些東西促進一下。linux
我一邊讀一邊嘗試在一些地方改用c++11的新特性,這個工做持續在進行中。爲啥這麼幹?沒什麼理由,純粹是爲了學習。c++
注:本文的大部分代碼和圖文都來自《Linux多線程服務端編程》,可直接參考muduo的源碼,或者參考我這裏抄着玩兒的版本。git
什麼是Reactor? 換個名詞「non-blocking IO + IO multiplexing」,意思就顯而易見了。Reactor模式用非阻塞IO+poll(epoll)函數來處理併發,程序的基本結構是一個事件循環,以事件驅動和事件回調的方式實現業務邏輯。github
while(!done) { int retval = poll(fds,nfds,timeout) if(retval < 0) 處理錯誤,回調用戶的error handler else{ 處理到期的timers,回調用戶的timer handler if(retval > 0){ 處理IO事件,毀掉用戶的IO event handler } } }
這段代碼形式上很是簡單,跟我上一篇文章epoll的例子十分類似,除了沒有處理超時timer部分。在muduo的實現中,定時器使用了linux平臺的timerfd_*系列函數, timers和其它IO統一了起來。編程
muduo的Reactor核心主要由Channel、EventLoop、Poller、TimerQueue這幾個類完成。乍一看還有一點繞,代碼裏面各類回掉函數看起來有點不直觀。另外,這幾個類的生命週期也值得注意,容易理不清楚。數組
Channel類比較簡單,負責IO事件分發,每個Channel對象都對應了一個fd,它的核心成員以下:網絡
EventLoop* loop_; const int fd_; int events_; int revents_; int index_; ReadEventCallback readCallback_; EventCallback writeCallback_; EventCallback errorCallback_; EventCallback closeCallback_;
幾個callback函數都是c++新標準裏面的function對象(muduo裏面是Boost::function),它們會在handleEvent這個成員函數中根據不一樣的事件被調用。index_是poller類中pollfds_數組的下標。events_和revents_明顯對應了struct pollfd結構中的成員。須要注意的是,Channel並不擁有該fd,它不會在析構函數中去關閉這個fd(fd是由Socket類的析構函數中關閉,即RAII的方法),Channel的生命週期由其owner負責。數據結構
Poller類在這裏是poll函數的封裝(在muduo源碼裏面是抽象基類,支持poll和epoll),它有兩個核心的數據成員:多線程
typedef std::vector<struct pollfd> PollFdList; typedef std::map<int, Channel*> ChannelMap; // fd to Channel PollFdList pollfds_; ChannelMap channels_;
ChannelMap是fd到Channel類的映射,PollFdList保存了每個fd所關心的事件,用做參數傳遞到poll函數中,Channel類裏面的index_便是這裏的下標。Poller類有下面四個函數併發
Timestamp poll(int timeoutMs, ChannelList* activeChannels); void updateChannel(Channel* channel); void removeChannel(Channel* channel); private: void fillActiveChannels(int numEvents, ChannelList* activeChannels) const;
updateChannel和removeChannel都是對上面兩個數據結構的操做,poll函數是對::poll的封裝。私有的fillActiveChannels函數負責把返回的活動時間添加到activeChannels(vector<Channel*>)這個結構中,返回給用戶。Poller的職責也很簡單,負責IO multiplexing,一個EventLoop有一個Poller,Poller的生命週期和EventLoop同樣長。
EventLoop類是核心,大多數類都會包含一個EventLoop*的成員,由於全部的事件都會在EventLoop::loop()中經過Channel分發。先來看一下這個loop循環:
while (!quit_) { activeChannels_.clear(); pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); for (ChannelList::iterator it = activeChannels_.begin(); it != activeChannels_.end(); ++it) { (*it)->handleEvent(pollReturnTime_); } doPendingFunctors(); }
handleEvent是Channel類的成員函數,它會根據事件的類型去調用不一樣的Callback。循環末尾還有一個doPendingFunctors(),這個函數的做用在後面多線程的部分去說明。
由上面三個類已經能夠構成Reactor的核心,整個流程以下:
用戶經過Channel向Poller類註冊fd和關心的事件
EventLoop從poll中返回活躍的fd和對應的Channel
經過Channel去回掉相應的時間。
muduo的書裏面有一個時序圖(8-1),很清楚的說明了整個流程。
muduo的定時器直接使用了標準的容器庫set來管理。先看一下TimerQueue:
typedef std::shared_ptr<Timer> TimerPtr; typedef std::pair<Timestamp, TimerPtr> Entry; typedef std::set<Entry> TimerList; Channel timerfdChannel_; const int timerfd_; TimerList timers_;
採用std::pair<Timestamp, TimerPtr> 加上set的 的形式是爲了處理兩個Timer同事到期的狀況,即便到期時間相同,它們的地址也不一樣。timerfdChannel_是用來管理timerfd_create函數建立的fd。Timer類裏面包含了一個回調函數和一個到期時間。expiration_就是上面Entry中的Timestamp。
const TimerCallback callback_; Timestamp expiration_;
這樣整個思路就很清晰了:
用一個set來保存全部的事件和時間
根據set集合裏面最先的時間來更新timerfd_的到期時間(用timerfd_settime函數)
時間到期後,EventLoop的poll函數會返回,並調用timerfdChannel_裏面的handleEvent回調函數。
經過handleEvent這個回調函數,再去處理到期的全部事件。
timerfdChannel_.setReadCallback( std::bind(&TimerQueue::handleRead,this)); timerfdChannel_.enableReading();
timerfdChannel_的callback函數註冊了TimerQueue的handleRead函數。在handleRead中應該幹什麼就很明顯了,天然是撈出全部到期的timer,一次去執行對應的事件:
void TimerQueue::handleRead() { loop_->assertInLoopThread(); Timestamp now(Timestamp::now()); readTimerfd(timerfd_, now); std::vector<Entry> expired = getExpired(now); // safe to callback outside critical section for (std::vector<Entry>::iterator it = expired.begin(); it != expired.end(); ++it) { it->second->run(); } reset(expired, now); }
至此爲止,單線程的Reator就已經完成了。總感受muduo這種事件回調的代碼風格,讀起來比較繞,不夠直觀。不知道其餘的Reactor模式的網絡程序會不會也是這種感受。
多線程本質上是困難的,由於它強迫你的大腦去思考兩件事情同時發生會出現的各類狀況。我目前感受除了看別人的經驗和總結,沒有什麼技巧或者方法論來解決多線程的問題。
一個線程一個EventLoop,每一個線程都有本身管理的各類ChannelList和TimerQueue。有時候,咱們總有一些需求,要在各個線程之間調配任務。好比添加一個定時時間到IO線程中,這樣TimerQueue就有兩個線程同時訪問。我認爲muduo在處理上鎖的問題上,很值得學習。
先來看幾個EventLoop裏面重要的函數和成員:
std::vector<Functor> pendingFunctors_; // @GuardedBy mutex_ void EventLoop::runInLoop(Functor&& cb) { if (isInLoopThread()) { cb(); } else { queueInLoop(std::move(cb)); } } void EventLoop::queueInLoop(Functor&& cb) { { MutexLockGuard lock(mutex_); pendingFunctors_.push_back(cb); } if (!isInLoopThread() || callingPendingFunctors_) { wakeup(); } }
注意這裏的函數參數,我用到了C++11的右值引用。
在前面的EventLoop::loop裏面,咱們已經看到了doPendingFunctors()這個函數,EventLoop還有一個重要的成員pendingFunctors_,該成員是暴露給其餘線程的。這樣,其餘線程向IO線程添加定時時間的流程就是:
其餘線程調用runInLoop(),
若是不是當前IO線程,再調用queueInLoop()
在queueLoop中,將時間push到pendingFunctors_中,並喚醒當前IO線程
注意這裏的喚醒條件:不是當前IO線程確定要喚醒;此外,若是正在調用Pending functor,也要喚醒;(爲何?,由於若是正在執行PendingFunctor裏面,若是也執行了queueLoop,若是不喚醒的話,新加的cb就不會當即執行了。)
如今來看一下doPendingFunctors()這個函數:
void EventLoop::doPendingFunctors() { std::vector<Functor> functors; callingPendingFunctors_ = true; { // reduce the lenth of the critical section // avoid the dead lock cause the functor can call queueInloop(;) MutexLockGuard lock(mutex_); functors.swap(pendingFunctors_); } for (size_t i = 0; i < functors.size(); ++i) { functors[i](); } callingPendingFunctors_ = false; }
doPendingFunctors並無直接在臨界區去執行functors,而是利用了一個棧對象,把事件swap到棧對象中,再去執行。這樣作有兩個好處:
減小了臨界區的長度,其它線程調用queueInLoop對pendingFunctors加鎖時,就不會被阻塞
避免了死鎖,可能在functors裏面也會調用queueInLoop(),從而形成死鎖。
回過頭來看,muduo在處理多線程加鎖訪問共享數據的策略上,有一個很重要的原則:拼命減小臨界區的長度
試想一下,若是沒有pendingFunctors_這個數據成員,咱們要想往TimerQueue中添加timer,確定要對TimerQueue裏面的insert函數加鎖,形成鎖的爭用,而pendingFunctors_這個成員將鎖的範圍減小到了一個vector的push_back操做上。此外,在doPendingFunctors中,利用一個棧對象減小臨界區,也是很巧妙的一個重要技巧。
前面說到喚醒IO線程,EventLoop阻塞在poll函數上,怎麼去喚醒及時它?之前的作法是利用pipe,向pipe中寫一個字節,監視在這個pipe的讀事件的poll函數就會馬上返回。在muduo中,採用了linux中eventfd調用
static int createEventfd() { int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); if (evtfd < 0) { LOG_SYSERR << "Failed in eventfd"; abort(); } return evtfd; } void EventLoop::wakeup() { uint64_t one = 1; ssize_t n = ::write(wakeupFd_, &one, sizeof one); if (n != sizeof one) { LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; } }
把eventfd獲得的fd和前面同樣,經過Channel註冊到poll裏面,喚醒的時候,只須要向wakeupFd中寫入一個字節,就能達到喚醒的目的。eventfd、timerfd都體現了linux的設計哲學,Everyting is a fd。
關於muduod的Reactor模式,如今我終於有一些理解了。關於TCPServer部分,下一篇再寫。下一步,我會繼續研究muduo的HTTP示例,並嘗試擴展它。