Reactor模式解析——muduo網絡庫

最近一段時間閱讀了muduo源碼,讀完的感覺有一個感覺就是有點亂。固然不是說代碼亂,是我可能尚未徹底消化和理解。爲了更好的學習這個庫,仍是要來寫一些東西促進一下。linux

我一邊讀一邊嘗試在一些地方改用c++11的新特性,這個工做持續在進行中。爲啥這麼幹?沒什麼理由,純粹是爲了學習。c++

注:本文的大部分代碼和圖文都來自《Linux多線程服務端編程》,可直接參考muduo的源碼,或者參考我這裏抄着玩兒的版本。git

Reactor介紹

什麼是Reactor? 換個名詞「non-blocking IO + IO multiplexing」,意思就顯而易見了。Reactor模式用非阻塞IO+poll(epoll)函數來處理併發,程序的基本結構是一個事件循環,以事件驅動和事件回調的方式實現業務邏輯。github

while(!done)
{
    int retval  = poll(fds,nfds,timeout)
    if(retval < 0)
        處理錯誤,回調用戶的error handler
    else{
        處理到期的timers,回調用戶的timer handler
        if(retval > 0){
            處理IO事件,毀掉用戶的IO event handler
        }
    }
}

這段代碼形式上很是簡單,跟我上一篇文章epoll的例子十分類似,除了沒有處理超時timer部分。在muduo的實現中,定時器使用了linux平臺的timerfd_*系列函數, timers和其它IO統一了起來。編程

單線程Reactor實現

muduo的Reactor核心主要由Channel、EventLoop、Poller、TimerQueue這幾個類完成。乍一看還有一點繞,代碼裏面各類回掉函數看起來有點不直觀。另外,這幾個類的生命週期也值得注意,容易理不清楚。數組

1. Channel

Channel類比較簡單,負責IO事件分發,每個Channel對象都對應了一個fd,它的核心成員以下:網絡

EventLoop* loop_;
  const int fd_;
  int events_;
  int revents_;
  int index_;

  ReadEventCallback readCallback_;
  EventCallback writeCallback_;
  EventCallback errorCallback_;
  EventCallback closeCallback_;

幾個callback函數都是c++新標準裏面的function對象(muduo裏面是Boost::function),它們會在handleEvent這個成員函數中根據不一樣的事件被調用。index_是poller類中pollfds_數組的下標。events_和revents_明顯對應了struct pollfd結構中的成員。須要注意的是,Channel並不擁有該fd,它不會在析構函數中去關閉這個fd(fd是由Socket類的析構函數中關閉,即RAII的方法),Channel的生命週期由其owner負責。數據結構

2. Poller

Poller類在這裏是poll函數的封裝(在muduo源碼裏面是抽象基類,支持poll和epoll),它有兩個核心的數據成員:多線程

typedef std::vector<struct pollfd> PollFdList;
  typedef std::map<int, Channel*> ChannelMap;  // fd to Channel
  PollFdList pollfds_;
  ChannelMap channels_;

ChannelMap是fd到Channel類的映射,PollFdList保存了每個fd所關心的事件,用做參數傳遞到poll函數中,Channel類裏面的index_便是這裏的下標。Poller類有下面四個函數併發

Timestamp poll(int timeoutMs, ChannelList* activeChannels);
void updateChannel(Channel* channel);
void removeChannel(Channel* channel);
private:
void fillActiveChannels(int numEvents, ChannelList* activeChannels) const;

updateChannel和removeChannel都是對上面兩個數據結構的操做,poll函數是對::poll的封裝。私有的fillActiveChannels函數負責把返回的活動時間添加到activeChannels(vector<Channel*>)這個結構中,返回給用戶。Poller的職責也很簡單,負責IO multiplexing,一個EventLoop有一個Poller,Poller的生命週期和EventLoop同樣長。

3. EventLoop

EventLoop類是核心,大多數類都會包含一個EventLoop*的成員,由於全部的事件都會在EventLoop::loop()中經過Channel分發。先來看一下這個loop循環:

while (!quit_)
{
    activeChannels_.clear();
    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
    for (ChannelList::iterator it = activeChannels_.begin();
        it != activeChannels_.end(); ++it)
    {
      (*it)->handleEvent(pollReturnTime_);
    }
    doPendingFunctors();
}

handleEvent是Channel類的成員函數,它會根據事件的類型去調用不一樣的Callback。循環末尾還有一個doPendingFunctors(),這個函數的做用在後面多線程的部分去說明。
由上面三個類已經能夠構成Reactor的核心,整個流程以下:

  • 用戶經過Channel向Poller類註冊fd和關心的事件

  • EventLoop從poll中返回活躍的fd和對應的Channel

  • 經過Channel去回掉相應的時間。

muduo的書裏面有一個時序圖(8-1),很清楚的說明了整個流程。

4. TimerQueue

muduo的定時器直接使用了標準的容器庫set來管理。先看一下TimerQueue:

typedef std::shared_ptr<Timer> TimerPtr;
typedef std::pair<Timestamp, TimerPtr> Entry;
typedef std::set<Entry> TimerList;
Channel timerfdChannel_;
const int timerfd_;
TimerList timers_;

採用std::pair<Timestamp, TimerPtr> 加上set的 的形式是爲了處理兩個Timer同事到期的狀況,即便到期時間相同,它們的地址也不一樣。timerfdChannel_是用來管理timerfd_create函數建立的fd。Timer類裏面包含了一個回調函數和一個到期時間。expiration_就是上面Entry中的Timestamp。

const TimerCallback callback_;
Timestamp expiration_;

這樣整個思路就很清晰了:

  • 用一個set來保存全部的事件和時間

  • 根據set集合裏面最先的時間來更新timerfd_的到期時間(用timerfd_settime函數)

  • 時間到期後,EventLoop的poll函數會返回,並調用timerfdChannel_裏面的handleEvent回調函數。

  • 經過handleEvent這個回調函數,再去處理到期的全部事件。

timerfdChannel_.setReadCallback(
    std::bind(&TimerQueue::handleRead,this));
timerfdChannel_.enableReading();

timerfdChannel_的callback函數註冊了TimerQueue的handleRead函數。在handleRead中應該幹什麼就很明顯了,天然是撈出全部到期的timer,一次去執行對應的事件:

void TimerQueue::handleRead()
{
  loop_->assertInLoopThread();
  Timestamp now(Timestamp::now());
  readTimerfd(timerfd_, now);
  std::vector<Entry> expired = getExpired(now);
  // safe to callback outside critical section
  for (std::vector<Entry>::iterator it = expired.begin();
      it != expired.end(); ++it)
  {
    it->second->run();
  }
  reset(expired, now);
}

至此爲止,單線程的Reator就已經完成了。總感受muduo這種事件回調的代碼風格,讀起來比較繞,不夠直觀。不知道其餘的Reactor模式的網絡程序會不會也是這種感受。

多線程的技巧

多線程本質上是困難的,由於它強迫你的大腦去思考兩件事情同時發生會出現的各類狀況。我目前感受除了看別人的經驗和總結,沒有什麼技巧或者方法論來解決多線程的問題。

一個線程一個EventLoop,每一個線程都有本身管理的各類ChannelList和TimerQueue。有時候,咱們總有一些需求,要在各個線程之間調配任務。好比添加一個定時時間到IO線程中,這樣TimerQueue就有兩個線程同時訪問。我認爲muduo在處理上鎖的問題上,很值得學習。

1. runInLoop()和queueInLoop()

先來看幾個EventLoop裏面重要的函數和成員:

std::vector<Functor> pendingFunctors_; // @GuardedBy mutex_
void EventLoop::runInLoop(Functor&& cb) {
  if (isInLoopThread()) {
    cb();
  } else {
    queueInLoop(std::move(cb));
  }
}
void EventLoop::queueInLoop(Functor&& cb) {
  {
    MutexLockGuard lock(mutex_);
    pendingFunctors_.push_back(cb);
  }
  if (!isInLoopThread() || callingPendingFunctors_) {
    wakeup();
  }
}

注意這裏的函數參數,我用到了C++11的右值引用。
在前面的EventLoop::loop裏面,咱們已經看到了doPendingFunctors()這個函數,EventLoop還有一個重要的成員pendingFunctors_,該成員是暴露給其餘線程的。這樣,其餘線程向IO線程添加定時時間的流程就是:

  • 其餘線程調用runInLoop(),

  • 若是不是當前IO線程,再調用queueInLoop()

  • 在queueLoop中,將時間push到pendingFunctors_中,並喚醒當前IO線程
    注意這裏的喚醒條件:不是當前IO線程確定要喚醒;此外,若是正在調用Pending functor,也要喚醒;(爲何?,由於若是正在執行PendingFunctor裏面,若是也執行了queueLoop,若是不喚醒的話,新加的cb就不會當即執行了。)

2.doPendingFunctors()

如今來看一下doPendingFunctors()這個函數:

void EventLoop::doPendingFunctors() {
  std::vector<Functor> functors;
  callingPendingFunctors_ = true;
  {
    // reduce the lenth of the critical section
    // avoid the dead lock cause the functor can call queueInloop(;)
    MutexLockGuard lock(mutex_);
    functors.swap(pendingFunctors_);
  }
  for (size_t i = 0; i < functors.size(); ++i) {
    functors[i]();
  }
  callingPendingFunctors_ = false;
}

doPendingFunctors並無直接在臨界區去執行functors,而是利用了一個棧對象,把事件swap到棧對象中,再去執行。這樣作有兩個好處:

  1. 減小了臨界區的長度,其它線程調用queueInLoop對pendingFunctors加鎖時,就不會被阻塞

  2. 避免了死鎖,可能在functors裏面也會調用queueInLoop(),從而形成死鎖。

回過頭來看,muduo在處理多線程加鎖訪問共享數據的策略上,有一個很重要的原則:拼命減小臨界區的長度
試想一下,若是沒有pendingFunctors_這個數據成員,咱們要想往TimerQueue中添加timer,確定要對TimerQueue裏面的insert函數加鎖,形成鎖的爭用,而pendingFunctors_這個成員將鎖的範圍減小到了一個vector的push_back操做上。此外,在doPendingFunctors中,利用一個棧對象減小臨界區,也是很巧妙的一個重要技巧。

3.wake()

前面說到喚醒IO線程,EventLoop阻塞在poll函數上,怎麼去喚醒及時它?之前的作法是利用pipe,向pipe中寫一個字節,監視在這個pipe的讀事件的poll函數就會馬上返回。在muduo中,採用了linux中eventfd調用

static int createEventfd() {
  int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
  if (evtfd < 0) {
    LOG_SYSERR << "Failed in eventfd";
    abort();
  }
  return evtfd;
}
void EventLoop::wakeup() {
  uint64_t one = 1;
  ssize_t n = ::write(wakeupFd_, &one, sizeof one);
  if (n != sizeof one) {
    LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
  }
}

把eventfd獲得的fd和前面同樣,經過Channel註冊到poll裏面,喚醒的時候,只須要向wakeupFd中寫入一個字節,就能達到喚醒的目的。eventfd、timerfd都體現了linux的設計哲學,Everyting is a fd。

關於muduod的Reactor模式,如今我終於有一些理解了。關於TCPServer部分,下一篇再寫。下一步,我會繼續研究muduo的HTTP示例,並嘗試擴展它。

相關文章
相關標籤/搜索