muduo的事件處理(Reactor模型關鍵結構)

muduoReactor模式主要有3個類實現-Channel、Poller、EventLoopweb

可是我本身認爲核心還應該包括定時器,只有理解了定時器與其餘三個類是如何搭配使用的才能算是真正理解了其核心架構吧.(也許是我在這裏比較迷的緣故,哈哈哈)

首先,咱們從簡單的作起,一步一步構建思惟腦圖,理解Muduo核心結構編程

1. 事件分發類 Channel (最終幹活的)

Channelselectable IO channel自始至終只負責一個 fd 的(註冊與響應) IO 事件,可是不擁有該 fd ,因此也就在析構的時候不關閉它.數組

來來來,先喊三遍口號:安全

自始至終都屬於一個EventLoop(一個EventLoop對應多個Channel,處理多個IO),自始至終只負責一個文件描述符的IO事件
自始至終都屬於一個EventLoop(一個EventLoop對應多個Channel,處理多個IO),自始至終只負責一個文件描述符的IO事件
自始至終都屬於一個EventLoop(一個EventLoop對應多個Channel,處理多個IO),自始至終只負責一個文件描述符的IO事件

如何工做:網絡

在Channel類中保存這IO事件的類型以及對應的回調函數,當IO事件發生時,最終會調用到Channel類中的回調函數多線程

具體流程以下:架構

首先給定Channel所屬的 loop,及其要處理的 fd;接着註冊 fd 上須要監聽的事件,若是是經常使用的讀寫事件的話,能夠直接調用接口函數enableReadingenableWriting來註冊對應fd上的事件,disable*是銷燬指定的事件;而後經過 setCallback 來設置事件發生時的回調便可app

註冊事件時函數調用關係,以下:Channel::update()->EventLoop::updateChannel(Channel*)->Poller::updateChannel(Channel*),最終向 poll 系統調用的監聽事件表註冊或修改事件。框架

在這裏插入圖片描述

class EventLoop;

///
/// A selectable I/O channel.
///
/// This class doesn't own the file descriptor.
/// The file descriptor could be a socket,
/// an eventfd, a timerfd, or a signalfd
class Channel
{
public:
  typedef std::function<void()> EventCallback;

  Channel(EventLoop *loop, int fd) 
  	: loop_(loop),
      fd_(fdArg),
      events_(0),
      revents_(0),
      index_(-1)
{
}
//處理事件,通常由Poller經過EventLoop來調用
//eventloop中有一個vector<Channel *>的 activeChannels_ 活動數組,天然能夠調用它
//根據revents_ 的事件類型,執行相應的回調
  void handleEvent()
  {
	 if (revents_ & POLLNVAL)
	  {
	    LOG_WARN << "Channel::handle_event() POLLNVAL";
	  }
	  if (revents_ & (POLLERR | POLLNVAL))
	  {
	    if (errorCallback_)
	      errorCallback_();
	  }
	  if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
	  {
	    if (readCallback_)
	      readCallback_();
	  }
	  if (revents_ & POLLOUT)
	  {
	    if (writeCallback_)
	      writeCallback_();
	  }
  }

  void setReadCallback(const EventCallback &cb)
  {
    readCallback_ = cb;
  }
  void setWriteCallback(const EventCallback &cb)
  {
    writeCallback_ = cb;
  }
  void setErrorCallback(const EventCallback &cb)
  {
    errorCallback_ = cb;
  }

  int fd() const { return fd_; }
    /* 返回 fd 註冊的事件 */
  int events() const { return events_; }
  void set_revents(int revt) { revents_ = revt; }
  bool isNoneEvent() const { return events_ == kNoneEvent; }

  void enableReading()
  {
    events_ |= kReadEvent;
    update();
  }
  // void enableWriting() { events_ |= kWriteEvent; update(); }
  // void disableWriting() { events_ &= ~kWriteEvent; update(); }
  // void disableAll() { events_ = kNoneEvent; update(); }

  // for Epoller
  int index() { return index_; }
  void set_index(int idx) { index_ = idx; }

  EventLoop *ownerLoop() { return loop_; }

private:
/* 經過調用loop_->updateChannel()來註冊或改變本fd在epoll中監聽的事件 */
  void update() 
 {
  	loop_->updateChannel(this);
 }
  static const int kNoneEvent=0;
  static const int kReadEvent=POLLIN | POLLPRI;
  static const int kWriteEvent =POLLOUT;

  EventLoop *loop_;
  const int fd_;
  int events_; //關注的事件
  int revents_; //poll/epoll中返回的事件
  int index_; // used by epoller. 表示在poll的事件數組中的序號

  EventCallback readCallback_;
  EventCallback writeCallback_;
  EventCallback errorCallback_;
};

2. IO multiplexing 類 Poller

// 在咱們這裏將其直接寫爲一個具體類socket

Poller是個基類,具體能夠是EPollPoller(默認) 或者PollPoller,對應 poll 和 epoll.須要去實現(惟一使用面向對象的一個類)

這裏咱們再來喊三遍口號:

是eventloop的成員,它的職責僅僅是IO複用,事件分發交給 Channel 完成,生命期和 EventLoop 同樣長。
是eventloop的成員,它的職責僅僅是IO複用,事件分發交給 Channel 完成,生命期和 EventLoop 同樣長。
是eventloop的成員,它的職責僅僅是IO複用,事件分發交給 Channel 完成,生命期和 EventLoop 同樣長。

具體處理流程就是:

poll函數調用 epoll_wait/poll 來監聽註冊了的文件描述符,而後經過fillActiveChannels函數將返回的就緒事件裝入 activeChannels 數組

在這裏插入圖片描述
Poller,h

namespace muduo
{

class Channel;

///
/// IO Multiplexing with poll(2).
///
/// This class doesn't own the Channel objects.
class Poller : boost::noncopyable
{
 public:
  typedef std::vector<Channel*> ChannelList;

  Poller(EventLoop* loop); //須要傳入EventLoop Objetct 
  ~Poller();

  /// Polls the I/O events.
  /// Must be called in the loop thread.
  // 核心功能,調用 epoll_wait/poll 來監聽註冊了的文件描述符 
    /* Channel::update()->EventLoop::updateChannel(Channel* channel)->Poller::updateChannel(Channel* channel)*/

  Timestamp poll(int timeoutMs, ChannelList* activeChannels); 

  /// Changes the interested I/O events.
  /// Must be called in the loop thread.
  //負責維護和更新 pollfds_ 數組 
  void updateChannel(Channel* channel);
	 /* 斷言 確保沒有跨線程 */
  void assertInLoopThread() { ownerLoop_->assertInLoopThread(); }

 private:
 //真正填充 activeChannels 的函數
  void fillActiveChannels(int numEvents,
                          ChannelList* activeChannels) const;

  typedef std::vector<struct pollfd> PollFdList;
  typedef std::map<int, Channel*> ChannelMap;//key是文件描述符,value是Channel*

  EventLoop* ownerLoop_;
  PollFdList pollfds_;
  ChannelMap channels_;
};

}

Poller,cc

Poller::Poller(EventLoop* loop)
  : ownerLoop_(loop)
{
}

Poller::~Poller()
{
}

Timestamp Poller::poll(int timeoutMs, ChannelList* activeChannels)
{
  // XXX pollfds_ shouldn't change
  int numEvents = ::poll(&*pollfds_.begin(), pollfds_.size(), timeoutMs);
  Timestamp now(Timestamp::now());
  if (numEvents > 0) {
    LOG_TRACE << numEvents << " events happended";
    fillActiveChannels(numEvents, activeChannels);
  } else if (numEvents == 0) {
    LOG_TRACE << " nothing happended";
  } else {
    LOG_SYSERR << "Poller::poll()";
  }
  return now;
}

void Poller::fillActiveChannels(int numEvents,
                                ChannelList* activeChannels) const
{
  for (PollFdList::const_iterator pfd = pollfds_.begin();
      pfd != pollfds_.end() && numEvents > 0; ++pfd)
  {
    if (pfd->revents > 0)
    {
      --numEvents;
      ChannelMap::const_iterator ch = channels_.find(pfd->fd);
      assert(ch != channels_.end());
      Channel* channel = ch->second;
      assert(channel->fd() == pfd->fd);
      channel->set_revents(pfd->revents);
      // pfd->revents = 0;
      activeChannels->push_back(channel);
    }
  }
}

void Poller::updateChannel(Channel* channel)
{
  assertInLoopThread();
  LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();
  if (channel->index() < 0) {
   // muduo 使用了 index 去讓channel 記住他在pollfds_的下標
   // index < 0說明是一個新的通道
    // a new one, add to pollfds_
    assert(channels_.find(channel->fd()) == channels_.end());
    struct pollfd pfd;
    pfd.fd = channel->fd();
    pfd.events = static_cast<short>(channel->events());
    pfd.revents = 0;
    pollfds_.push_back(pfd);
    int idx = static_cast<int>(pollfds_.size())-1;
    channel->set_index(idx);
    channels_[pfd.fd] = channel;
  } else {
    // update existing one
    assert(channels_.find(channel->fd()) != channels_.end());
    assert(channels_[channel->fd()] == channel);
    int idx = channel->index();
    assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
    struct pollfd& pfd = pollfds_[idx];
    assert(pfd.fd == channel->fd() || pfd.fd == -1);
    pfd.events = static_cast<short>(channel->events());
    pfd.revents = 0;
  	// 將一個通道暫時更改成不關注事件,但不從Poller中移除該通道
    if (channel->isNoneEvent()) {
      // ignore this pollfd
      pfd.fd = -1;
    }
  }
}

3. EventLoop 類

EventLoop類是Reactor模式的核心,一個線程一個事件循環,即one loop per threadEventLoop 對象的生命週期一般與其所屬的線程同樣長。EventLoop對象構造的時候,會檢查當前線程是否已經建立了其餘EventLoop對象,若是已建立,終止程序(LOG_FATAL),EventLoop類的構造函數會記錄本對象所屬線程(threadld_),建立了EventLoop對象的線程稱爲IO線程.其主要功能是運行事件循環,等待事件發生,而後調用回調處理髮生的事件。EventLoop::loop() -> Poller::poll()填充就緒事件集合 activeChannels,而後遍歷該容器,執行每一個 channelChannel::handleEvent() 完成對應就緒事件回調。
EventLoop.h

class Channel;
class Poller;

class EventLoop : boost::noncopyable
{
 public:

  EventLoop();

  // force out-line dtor, for scoped_ptr members.
  ~EventLoop();

  ///
  /// Loops forever.
  /// 核心
  /// Must be called in the same thread as creation of the object.
  ///**`EventLoop::loop() -> Poller::poll() `填充就緒事件集合 `activeChannels`,
  //而後遍歷該容器,執行每一個 `channel `的 `Channel::handleEvent()`
  // 完成對應就緒事件回調。**
  void loop();

  void quit();

  // internal use only
  void updateChannel(Channel* channel);
  // void removeChannel(Channel* channel);

  void assertInLoopThread()
  {
    if (!isInLoopThread())
    {
      abortNotInLoopThread();
    }
  }

  bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }

 private:

  void abortNotInLoopThread();

  typedef std::vector<Channel*> ChannelList;

  bool looping_; /* atomic */
  bool quit_; /* atomic */
  const pid_t threadId_; //線程ID
  boost::scoped_ptr<Poller> poller_; 
  ChannelList activeChannels_;
};

}

EventLoop.cc

using namespace muduo;

__thread EventLoop* t_loopInThisThread = 0;
const int kPollTimeMs = 10000;

EventLoop::EventLoop()
  : looping_(false),
    quit_(false),
    threadId_(CurrentThread::tid()),
    poller_(new Poller(this))
{
  LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
  if (t_loopInThisThread)
  {
    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
              << " exists in this thread " << threadId_;
  }
  else
  {
    t_loopInThisThread = this;
  }
}

EventLoop::~EventLoop()
{
  assert(!looping_);
  t_loopInThisThread = NULL;
}

void EventLoop::loop()
{
  assert(!looping_);
  assertInLoopThread();
  looping_ = true;
  quit_ = false;

  while (!quit_)
  {
    activeChannels_.clear();
    poller_->poll(kPollTimeMs, &activeChannels_);
    for (ChannelList::iterator it = activeChannels_.begin();
        it != activeChannels_.end(); ++it)
    {
      (*it)->handleEvent();
    }
  }

  LOG_TRACE << "EventLoop " << this << " stop looping";
  looping_ = false;
}

void EventLoop::quit()
{
  quit_ = true;
  // wakeup();
}

void EventLoop::updateChannel(Channel* channel)
{
  assert(channel->ownerLoop() == this);
  assertInLoopThread();
  poller_->updateChannel(channel);
}

void EventLoop::abortNotInLoopThread()
{
  LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
            << " was created in threadId_ = " << threadId_
            << ", current thread id = " <<  CurrentThread::tid();
}

使用以上三個類.來看一個簡單的例子:

#include "Channel.h"
#include "EventLoop.h"

#include <stdio.h>
#include <sys/timerfd.h>

muduo::EventLoop *g_loop;

void timeout()
{
  printf("Timeout!\n");
  g_loop->quit();
}

int main()
{
  muduo::EventLoop loop;
  g_loop = &loop;

  int timerfd = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);//建立一個新的定時器對象
  muduo::Channel channel(&loop, timerfd);
  channel.setReadCallback(timeout);
  channel.enableReading();

  struct itimerspec howlong;
  bzero(&howlong, sizeof howlong);
  howlong.it_value.tv_sec = 5;
  ::timerfd_settime(timerfd, 0, &howlong, NULL);

  loop.loop();

  ::close(timerfd);
}

程序利用timerfd實現一個單詞觸發的定時器.channel將該timerfd上的可讀事件轉發給了timerout函數

在這裏插入圖片描述

4. TimerQueue 定時器

如下摘自陳碩的博客:

Linux 時間函數

Linux 的計時函數,用於得到當前時間:

  • time(2) / time_t (秒)
  • ftime(3) / struct timeb (毫秒)
  • gettimeofday(2) / struct timeval (微秒)
  • clock_gettime(2) / struct timespec (納秒)
  • gmtime / localtime / timegm / mktime / strftime / struct tm (這些與當前時間無關)
定時函數,用於讓程序等待一段時間或安排計劃任務:
sleep
alarm
usleep
nanosleep
clock_nanosleep
getitimer / setitimer
timer_create / timer_settime / timer_gettime / timer_delete
timerfd_create / timerfd_gettime / timerfd_settime

個人取捨以下:

  • (計時)只使用 gettimeofday 來獲取當前時間。(這裏我在協程裏面我是使用clock_gettime(2)
  • (定時)只使用 timerfd_* 系列函數來處理定時。
    gettimeofday 入選緣由:(這也是 muduo::Timestamp class 的主要設計考慮)
  1. time 的精度過低,ftime 已被廢棄,clock_gettime 精度最高,可是它系統調用的開銷比 gettimeofday 大。(PS:這是陳碩2011年寫的文章,如今時間獲取函數在主流系統上基本都不是系統調用了)
  2. /在 x86-64 平臺上,gettimeofday 不是系統調用,而是在用戶態實現的(搜 vsyscall),沒有上下文切換和陷入內核的開銷。/
  3. gettimeofday 的分辨率 (resolution) 是 1 微秒,足以知足平常計時的須要。muduo::Timestamp 用一個 int64_t 來表示從 Epoch 到如今的微秒數,其範圍可達上下 30 萬年。

PS:C++的time_tint32_t 表示秒數,時間不許.

timerfd_* 入選的緣由:

  • sleep / alarm / usleep 在實現時有可能用了信號 SIGALRM,在多線程程序中處理信號是個至關麻煩的事情,應當儘可能避免。(近期我會寫一篇博客仔細講講「多線程、RAII、fork() 與信號」)

  • nanosleep 和 clock_nanosleep 是線程安全的,可是在非阻塞網絡編程中,絕對不能用讓線程掛起的方式來等待一段時間,程序會失去響應。正確的作法是註冊一個時間回調函數。

  • getitimer 和 timer_create 也是用信號來 deliver 超時,在多線程程序中也會有麻煩。timer_create 能夠指定信號的接收方是進程仍是線程,算是一個進步,不過在信號處理函數(signal handler)能作的事情實在很受限。

  • timerfd_create 把時間變成了一個文件描述符,該「文件」在定時器超時的那一刻變得可讀,這樣就能很方便地融入到 select/poll 框架中,用統一的方式來處理 IO 事件和超時事件,這也正是 Reactor 模式的長處。

傳統的 Reactor 利用 select/poll/epoll 的 timeout 來實現定時功能,但 poll 和 epoll 的定時精度只有毫秒,遠低於 timerfd_settime 的定時精度。

這裏有三個類,還比較麻煩,不過不急,咱們慢慢來看:
(1)TimerId 類

惟一標識一個 Timer 定時器。TimerId Class 同時保存Timer* 和 sequence_,這個 sequence_ 是每一個 Timer 對象有一個全局遞增的序列號 int64_t sequence_,用原子計數器(AtomicInt64)生成 
主要用於註銷定時器,int64_t sequence_ 能夠區分地址相同的前後兩個 Timer 對象。下面代碼先忽略int64_t sequence_,也就是先不實現cancel 的接口,咱們在這裏只理解其工做方式便可
TimreId 類

namespace muduo
{

class Timer;

///
/// An opaque identifier, for canceling Timer.
///
class TimerId : public muduo::copyable
{
public:
  explicit TimerId(Timer *timer)
      : value_(timer)
  {
  }

  // default copy-ctor, dtor and assignment are okay

private:
  Timer *value_;
};
Timer 定時器類(真正的超時事件,封裝了真正的超時回調)

封裝了定時器的一些參數,包括超時時間(expiration_)、超時回調函數(callback_)、時間間隔(interval_)、是否重複定時(repeat_)、定時器的序列號等成員變量,成員函數大都是返回這些變量的值,run() 用來調用回調函數,restart() 用來重啓定時器。

Timer.h

namespace muduo
{

///
/// Internal class for timer event.
///
class Timer : boost::noncopyable
{
 public:
  Timer(const TimerCallback& cb, Timestamp when, double interval)
    : callback_(cb),
      expiration_(when),
      interval_(interval),
      repeat_(interval > 0.0)
  { }

  void run() const
  {
    callback_();//執行定時器回調函數
  }

  Timestamp expiration() const  { return expiration_; }
   /* 是否週期性定時 */
  bool repeat() const { return repeat_; }
	 /* 重啓定時器 */
  void restart(Timestamp now)
  {
	  if (repeat_)
	  {
        //若是須要重複,那就將時間設爲下次超時的時間
	    expiration_ = addTime(now, interval_);
	  }
	  else
	  {
		//若是不須要重複,那就將超時時間設爲一個不可用的 value
	    expiration_ = Timestamp::invalid();
	  }
	}

 private:
  const TimerCallback callback_; //回調函數
  Timestamp expiration_; //時間戳 
  const double interval_;//時間間隔,若是是一次性定時器,該值爲0
  const bool repeat_;//是否重複執行
};

}
TimerQueue 定時器容器類

這裏muduo使用的是下面的結構去管理的定時器的

typedef std::pair<Timestamp, Timer*> Entry;
typedef std::set<Entry> TimerList;

他說是multimap應爲不經常使用被拋棄了,唉,對於這點,也是醉了

TimerQueue 定時器容器類中須要處理兩個 Timer 的超時時間相同的問題,因此能夠用multimap

經過給 timerfd 一個超時時間實現超時計時,它內部有 Channel,經過 Channel 管理 timerfd,而後向EventLoopPoller 註冊 timerfd 的可讀事件,當 timerfd 的可讀事件就緒時代表一個超時時間點到了,而後Channle對象timerfdChannel_調用可讀事件回調 handleRead(),經過 getExpired() 找出全部的超時事件,而後執行相應的超時回調函數 Timer::run()。爲了複用定時器,每次處理完以後,會檢查這些超時定時器是否須要重複定時,若是須要重複,就再次添加到定時器集合中。

timerfd 如何實現多個定時器超時計時的呢?就是在插入的時候與set 元素比較,而後更新timerfd,從而保證 timerfd 始終是 set 中最近的一個超時時間.當 timerfd 可讀時,仍是須要遍歷容器,由於有可能此時有多個 Timer 超時了(儘管 tiemrfd 是當前最小的超時時間).唉,何須這麼麻煩吶,直接用時間堆管理很差嗎?timerfd == 堆頂,不過,我學到的是仍是須要遍歷容器(堆)的

TimerQueue.h

namespace muduo
{

class EventLoop;
class Timer;
class TimerId;

///
/// A best efforts timer queue.
/// No guarantee that the callback will be on time.
///
class TimerQueue : boost::noncopyable
{
 public:
  TimerQueue(EventLoop* loop);
  ~TimerQueue();

  ///
  /// Schedules the callback to be run at given time,
  /// repeats if @c interval > 0.0.
  ///
  /// Must be thread safe. Usually be called from other threads.
  TimerId addTimer(const TimerCallback& cb,
                   Timestamp when,
                   double interval);

  // void cancel(TimerId timerId);

 private:

  // FIXME: use unique_ptr<Timer> instead of raw pointers.
  typedef std::pair<Timestamp, Timer*> Entry;
  typedef std::set<Entry> TimerList;

  void addTimerInLoop(Timer* timer);
  // called when timerfd alarms
  void handleRead();
  // move out all expired timers
  std::vector<Entry> getExpired(Timestamp now);
  void reset(const std::vector<Entry>& expired, Timestamp now);

  bool insert(Timer* timer);

  EventLoop* loop_;
  const int timerfd_;
  Channel timerfdChannel_;
  // Timer list sorted by expiration
  TimerList timers_;
};

}
#endif // MUDUO_NET_TIMERQUEUE_H

TimerQueue.cc

 
加入定時器以後的EventLoop類

EventLoop加入三個函數:runAt()、runAfter()、runEvery()。通通轉而調用TimerQueue::addTimer

muduo中有一個EventLoop::runInLoop函數,用來在其餘線程中喚醒IO線程(就是建立並運行了EventLoop的線程),可是在協程裏面應該是用不到,因此暫時不接觸這一點
EventLoop.h

#include "datetime/Timestamp.h"
#include "thread/Thread.h"
#include "Callbacks.h"
#include "TimerId.h"

#include <boost/scoped_ptr.hpp>
#include <vector>

namespace muduo
{

class Channel;
class Poller;
class TimerQueue;

class EventLoop : boost::noncopyable
{
 public:

  EventLoop();

  // force out-line dtor, for scoped_ptr members.
  ~EventLoop();

  ///
  /// Loops forever.
  ///
  /// Must be called in the same thread as creation of the object.
  ///
  void loop();

  void quit();

  ///
  /// Time when poll returns, usually means data arrivial.
  ///
  Timestamp pollReturnTime() const { return pollReturnTime_; }

  // timers

  ///
  /// Runs callback at 'time'.
  ///
  TimerId runAt(const Timestamp& time, const TimerCallback& cb);
  ///
  /// Runs callback after @c delay seconds.
  ///
  TimerId runAfter(double delay, const TimerCallback& cb);
  ///
  /// Runs callback every @c interval seconds.
  ///
  TimerId runEvery(double interval, const TimerCallback& cb);

  // void cancel(TimerId timerId);

  // internal use only
  void updateChannel(Channel* channel);
  // void removeChannel(Channel* channel);

  void assertInLoopThread()
  {
    if (!isInLoopThread())
    {
      abortNotInLoopThread();
    }
  }

  bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }

 private:

  void abortNotInLoopThread();

  typedef std::vector<Channel*> ChannelList;

  bool looping_; /* atomic */
  bool quit_; /* atomic */
  const pid_t threadId_;
  Timestamp pollReturnTime_;
  boost::scoped_ptr<Poller> poller_;
  boost::scoped_ptr<TimerQueue> timerQueue_;
  ChannelList activeChannels_;
};

}

#endif // MUDUO_NET_EVENTLOOP_H

EventLoop.cc

using namespace muduo;

__thread EventLoop* t_loopInThisThread = 0;
const int kPollTimeMs = 10000;

EventLoop::EventLoop()
  : looping_(false),
    quit_(false),
    threadId_(CurrentThread::tid()),
    poller_(new Poller(this)),
    timerQueue_(new TimerQueue(this))
{
  LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
  if (t_loopInThisThread)
  {
    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
              << " exists in this thread " << threadId_;
  }
  else
  {
    t_loopInThisThread = this;
  }
}

EventLoop::~EventLoop()
{
  assert(!looping_);
  t_loopInThisThread = NULL;
}

void EventLoop::loop()
{
  assert(!looping_);
  assertInLoopThread();
  looping_ = true;
  quit_ = false;

  while (!quit_)
  {
    activeChannels_.clear();
    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
    for (ChannelList::iterator it = activeChannels_.begin();
        it != activeChannels_.end(); ++it)
    {
      (*it)->handleEvent();
    }
  }

  LOG_TRACE << "EventLoop " << this << " stop looping";
  looping_ = false;
}

void EventLoop::quit()
{
  quit_ = true;
  // wakeup();
}

TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
{
  return timerQueue_->addTimer(cb, time, 0.0);
}

TimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
{
  Timestamp time(addTime(Timestamp::now(), delay));
  return runAt(time, cb);
}

TimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
{
  Timestamp time(addTime(Timestamp::now(), interval));
  return timerQueue_->addTimer(cb, time, interval);
}

void EventLoop::updateChannel(Channel* channel)
{
  assert(channel->ownerLoop() == this);
  assertInLoopThread();
  poller_->updateChannel(channel);
}

void EventLoop::abortNotInLoopThread()
{
  LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
            << " was created in threadId_ = " << threadId_
            << ", current thread id = " <<  CurrentThread::tid();
}
相關文章
相關標籤/搜索