muduo學習筆記(二)Reactor關鍵結構

muduo學習筆記(二)Reactor關鍵結構


Reactor簡述

什麼是Reactor

Reactor是一種基於事件驅動的設計模式,即經過回調機制,咱們將事件的接口註冊到Reactor上,當事件發生以後,就會回調註冊的接口。
Reactor必要的幾個組件
Event Multiplexer事件分發器:即一些I/O複用機制select、poll、epoll等.程序將事件源註冊到分發器上,等待事件的觸發,作相應處理.
Handle事件源:用於標識一個事件,Linux上是文件描述符.
Reactor反應器:用於管理事件的調度及註冊刪除.當有激活的事件時,則調用回調函數處理,沒有則繼續事件循環.
event handler事件處理器:管理已註冊事件和的調度,分紅不一樣類型的事件(讀/寫,定時)當事件發生,調用對應的回調函數處理.編程

Reactor模型的優缺點

優勢
1)響應快,沒必要爲單個同步時間所阻塞,雖然Reactor自己依然是同步的;
2)編程相對簡單,能夠最大程度的避免複雜的多線程及同步問題,而且避免了多線程/進程的切換開銷;
3)可擴展性,能夠方便的經過增長Reactor實例個數來充分利用CPU資源;
4)可複用性,reactor框架自己與具體事件處理邏輯無關,具備很高的複用性;設計模式

缺點
Reactor模式在IO讀寫數據時仍是在同一個線程中實現的,即便使用多個Reactor機制的狀況下,那些共享一個Reactor的Channel若是出現一個長時間的數據讀寫,會影響這個Reactor中其餘Channel的相應時間,好比在大文件傳輸時,IO操做就會影響其餘Client的相應時間,於是對這種操做,使用傳統的Thread-Per-Connection或許是一個更好的選擇,或則此時使用Proactor模式。多線程

poll簡述

poll的使用方法與select類似,輪詢多個文件描述符,有讀寫時設置相應的狀態位,poll相比select優在沒有最大文件描述符數量的限制.
# include <poll.h>
int poll ( struct pollfd * fds, unsigned int nfds, int timeout);

struct pollfd {
int fd;         /* 文件描述符 */
short events;         /* 等待的事件 */
short revents;       /* 實際發生了的事件 */
} ;

  每個pollfd結構體指定了一個被監視的文件描述符,能夠傳遞多個結構體,指示poll()監視多個文件描述符。每一個結構體的events域是監視該文件描述符的事件掩碼,由用戶來設置這個域。revents域是文件描述符的操做結果事件掩碼,內核在調用返回時設置這個域。events域中請求的任何事件均可能在revents域中返回。合法的事件以下:app

  POLLIN         有數據可讀。
  POLLRDNORM      有普通數據可讀。
  POLLRDBAND      有優先數據可讀。
  POLLPRI         有緊迫數據可讀。
  POLLOUT       寫數據不會致使阻塞。
  POLLWRNORM      寫普通數據不會致使阻塞。
  POLLWRBAND      寫優先數據不會致使阻塞。
  POLLMSGSIGPOLL     消息可用。框架

poll使用樣例

#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <errno.h>
#include <poll.h>

#define MAX_BUFFER_SIZE 1024
#define IN_FILES 1
#define MAX(a,b) ((a>b)?(a):(b))

int main(int argc ,char **argv)
{
  struct pollfd fds[3];
  char buf[1024];
  int i,res,real_read, maxfd;

  if((fds[0].fd=open("/dev/stdin",O_RDONLY|O_NONBLOCK)) < 0)
  {
    fprintf(stderr,"open data1 error:%s",strerror(errno));
    return 1;
  }

  for (i = 0; i < IN_FILES; i++)
  {
    fds[i].events = POLLIN | POLLPRI;
  }

  while(1) //|| fds[1].events || fds[2].events)
  {
    int ret = poll(fds, 1, 1000);
    if (ret < 0)
    {
      printf("Poll error : %s\n",strerror(errno));
      return 1;
    }

    if(ret == 0){
      printf("Poll timeout\n");
      continue;
    }

    for (i = 0; i< 1; i++)
    {
      if (fds[i].revents)
      {
        memset(buf, 0, MAX_BUFFER_SIZE);
        real_read = read(fds[i].fd, buf, MAX_BUFFER_SIZE);
        if (real_read < 0)
        {
          if (errno != EAGAIN)
          {
            printf("read eror : %s\n",strerror(errno));
            continue;
          }
        }
        else if (!real_read)
        {
          close(fds[i].fd);
          fds[i].events = 0;
        }
        else
        {
          if (i == 0)
          {
            buf[real_read] = '\0';
            printf("%s", buf);
            if ((buf[0] == 'q') || (buf[0] == 'Q'))
            {
              printf("quit\n");
              return 1;
            }
          }
          else
          {
            buf[real_read] = '\0';
            printf("%s", buf);
          }
        }
      }
    }
  }

  exit(0);
}

muduo Reactor關鍵結構

muduo Reactor最核心的事件分發機制, 即將IO multiplexing拿到的IO事件分發給各個文件描述符(fd)的事件處理函數。

Channel

Chanel目前我對它的理解是,它負責管理一個文件描述符(file descript)IO事件.
Channel會封裝C的poll事件,把不一樣的IO事件分發到不一樣的回調:ReadCallBack、WriteCallBack等
每一個Channel對象自始至終只屬於一個EventLoop,所以每一個Channel對象都只屬於某一個IO線程。 每一個Channel對象自始至終只負責一個文件描述符(fd) 的IO事件分發函數

#ifndef NET_CHANNEL_H
#define NET_CHANNEL_H

#include <functional>

#include "EventLoop.hh"

class Channel {
public:
  typedef std::function<void()> EventCallBack;
  Channel(EventLoop* loop, int fd);
  ~Channel();

  void handleEvent();
  void setReadCallBack(const EventCallBack& cb) { m_readCallBack = cb; }
  void setWriteCallBack(const EventCallBack& cb) { m_writeCallBack = cb; }
  void setErrorCallBack(const EventCallBack& cb) { m_errorCallBack = cb; }

  int fd() const { return m_fd; }
  int events() const { return m_events; }
  void set_revents(int revt) { m_revents = revt; }
  bool isNoneEvent() const { return m_events == kNoneEvent; }

  void eableReading() { m_events |=  kReadEvent; update(); }

  int index() { return m_index; }
  void set_index(int idx) { m_index =idx; }

  EventLoop* ownerLoop() { return m_pLoop; }

private:
  Channel& operator=(const Channel&);
  Channel(const Channel&);

  void update();

  static const int kNoneEvent;
  static const int kReadEvent;
  static const int kWriteEvent;

  EventLoop* m_pLoop;
  const int m_fd;
  int m_events;    // 等待的事件
  int m_revents;   // 實際發生了的事件
  int m_index;

  EventCallBack m_readCallBack;
  EventCallBack m_writeCallBack;
  EventCallBack m_errorCallBack;
};

#endif

//Channel.cpp

#include <poll.h>
#include "Channel.hh"
#include "Logger.hh"

const int Channel::kNoneEvent = 0;
const int Channel::kReadEvent = POLLIN | POLLPRI;
const int Channel::kWriteEvent = POLLOUT;

Channel::Channel(EventLoop* loop, int fd)
  : m_pLoop(loop),
    m_fd(fd),
    m_events(0),
    m_revents(0),
    m_index(-1)
{

}

Channel::~Channel()
{

}

void Channel::update()
{
  m_pLoop->updateChannel(this);
}


void Channel::handleEvent()
{
  if(m_revents & POLLNVAL)
  {
    LOG_WARN << "Channel::handleEvent() POLLNVAL";
  }

  if(m_revents & (POLLERR | POLLNVAL)){
    if(m_errorCallBack) m_errorCallBack();
  }

  if(m_revents & (POLLIN | POLLPRI | POLLRDHUP)){
    if(m_readCallBack) m_readCallBack();
  }

  if(m_revents & POLLOUT){
    if(m_writeCallBack) m_writeCallBack();
  }

}

值得一提的就是 Channel::update()它會調用EventLoop::updateChannel(), 後者會轉而調
用Poller::updateChannel()。Poller對象下面會講,經過Poller::updateChannel()註冊IO事件(即file descript).oop

Channel::handleEvent()是Channel的核心, 它由EventLoop::loop()調
用, 它的功能是根據revents發生事件的的值分別調用不一樣的用戶回調。 這個函數之後還會擴充。學習

Poller

Poller class是IO multiplexing的封裝。 它如今是個具體類,而在muduo中是個抽象基類,由於muduo同時支持poll(2)和epoll(4)兩種IOmultiplexing機制。
Poller是EventLoop的間接成員,只供其本身在EventLoop的IO線程中調用,所以無須加鎖。其生命期與EventLoop相等。
Poller並不擁有管理文件描述符事件的Channel, Channel在析構以前必須本身
unregister(EventLoop::removeChannel()) , 避免空懸指針測試

#ifndef _NET_POLLER_HH
#define _NET_POLLER_HH

#include <vector>
#include <map>

#include "TimeStamp.hh"
#include "EventLoop.hh"
#include "Channel.hh"

struct pollfd;

class Poller{
public:
  typedef std::vector<Channel*> ChannelList;

  Poller(EventLoop* loop);
  ~Poller();

  TimeStamp poll(int timeoutMs, ChannelList* activeChannels);

  void updateChannel(Channel* channel);

  void assertInLoopThread() { m_pOwerLoop->assertInLoopThread(); }

private:
  Poller& operator=(const Poller&);
  Poller(const Poller&);

  void fillActiveChannels(int numEvents, ChannelList* activeChannels) const;

  typedef std::vector<struct pollfd> PollFdList;
  typedef std::map<int, Channel*> ChannelMap;

  EventLoop* m_pOwerLoop;
  PollFdList m_pollfds;
  ChannelMap m_channels;

};

#endif

//Poller.cpp
#include "Poller.hh"
#include "Logger.hh"
#include <assert.h>
#include <poll.h>
#include <signal.h>

Poller::Poller(EventLoop* loop)
  : m_pOwerLoop(loop)
{

}

Poller::~Poller()
{

}

TimeStamp Poller::poll(int timeoutMs, ChannelList* activeChannels)
{
  LOG_TRACE << "Poller::poll()";
  int numEvents = ::poll(/*&*m_pollfds.begin()*/m_pollfds.data(), m_pollfds.size(), timeoutMs);
  TimeStamp now(TimeStamp::now());
  if(numEvents > 0){
    LOG_TRACE << numEvents << " events happended";
    fillActiveChannels(numEvents, activeChannels);
  }
  else if(numEvents == 0){
    LOG_TRACE << " nothing happended";
  }
  else{
    LOG_SYSERR << "Poller::poll()";
  }

  return now;
}

/*
 *fillActiveChannels()遍歷m_pollfds, 找出有活動事件的fd, 把它對應
 *的Channel填入activeChannels。
 */

void Poller::fillActiveChannels(int numEvents, ChannelList* activeChannels) const
{
  for(PollFdList::const_iterator pfd = m_pollfds.begin();
      pfd != m_pollfds.end() && numEvents > 0; ++pfd)
  {
    if(pfd->revents > 0)
    {
      --numEvents;
      ChannelMap::const_iterator ch = m_channels.find(pfd->fd);
      assert(ch != m_channels.end());
      Channel* channel = ch->second;
      assert(channel->fd() == pfd->fd);
      channel->set_revents(pfd->revents);
      activeChannels->push_back(channel);
    }
  }
}

void Poller::updateChannel(Channel* channel)
{
  assertInLoopThread();
  LOG_TRACE << "fd= " << channel->fd() << " events" << channel->events();
  if(channel->index() < 0){
    //a new one , add to pollfds
    assert(m_channels.find(channel->fd()) == m_channels.end());
    struct pollfd pfd;
    pfd.fd = channel->fd();
    pfd.events = static_cast<short>(channel->events());
    pfd.revents = 0;
    m_pollfds.push_back(pfd);
    int idx = static_cast<int>(m_pollfds.size()) - 1;
    channel->set_index(idx);
    m_channels[pfd.fd] = channel;

  }
  else{
    //update existing one
    assert(m_channels.find(channel->fd()) != m_channels.end());
    assert(m_channels[channel->fd()] == channel);
    int idx = channel->index();
    assert(0 <= idx && idx < static_cast<int>(m_pollfds.size()));
    struct pollfd& pfd = m_pollfds[idx];
    assert(pfd.fd == channel->fd() || pfd.fd == -1);
    pfd.events = static_cast<short>(channel->events());
    pfd.revents = 0;
    if(channel->isNoneEvent()){
      //ignore this pollfd
      pfd.fd = -1;
    }
  }

}

EventLoop

EventLopp在上一篇文章寫過,這裏給出改動.

EventLoop 新增了quit()成員函數, 還加了幾個數據成員,並在構造函數裏初始化它們。注意EventLoop經過智能指針來間接持有poller.

+class Poller;
+class Channel;

class EventLoop
------------
    bool isInloopThread() const {return m_threadId == CurrentThread::tid(); }

    +void quit();
    +void updateChannel(Channel* channel);

    static EventLoop* getEventLoopOfCurrentThread();

private:
    EventLoop& operator=(const EventLoop&);
    EventLoop(const EventLoop&);

    void abortNotInLoopThread();

    +typedef std::vector<Channel*> ChannelList;

    bool m_looping;
    +bool m_quit;
    const pid_t m_threadId;
    +std::unique_ptr<Poller> m_poller;
    +ChannelList m_activeChannels;
};

//EventLoop.cpp
  m_threadId(CurrentThread::tid()),
 + m_poller(new Poller(this))
{
------
+void EventLoop::quit()
+{
+  m_quit = true;
+  //wakeup();
+}
+
+void EventLoop::updateChannel(Channel* channel)
+{
+  assert(channel->ownerLoop() == this);
+  assertInLoopThread();
+  m_poller->updateChannel(channel);
+}

上一篇文章的EventLoop->loop()什麼也沒作,如今它有了實實在在的使命,它調用Poller::poll()得到當前活動事件的Chanel列表, 而後依次調用每一個Channel的handleEvent()函數

void EventLoop::loop()
{
  assert(!m_looping);
  assertInLoopThread();
  m_looping = true;
  m_quit = false;

  LOG_TRACE << "EventLoop " << this << " start loopig";

  while(!m_quit)
  {
    m_activeChannels.clear();
    m_poller->poll(1000, &m_activeChannels);
    for(ChannelList::iterator it = m_activeChannels.begin();
      it != m_activeChannels.end(); ++it)
    {
      (*it)->handleEvent();
    }

  }

  LOG_TRACE << "EventLoop " << this << " stop loopig";
  m_looping = false;

}

Reactor時序圖

測試程序-單次觸發的定時器

程序利用timerfd_create 把時間變成了一個文件描述符,該「文件」在定時器超時的那一刻變得可讀,這樣就能很方便地融入到 select/poll 框架中,用統一的方式來處理 IO 事件和超時事件,這也正是 Reactor 模式的長處。

#include <errno.h>
#include <thread>
#include <strings.h>

#include "EventLoop.hh"
#include "Channel.hh"
#include "Poller.hh"

//Reactor Test
//單次觸發定時器
#include <sys/timerfd.h>

EventLoop* g_loop;

void timeout()
{
  printf("timeout!\n");
  g_loop->quit();
}

int main()
{

  EventLoop loop;
  g_loop = &loop;

  int timerfd = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK |TFD_CLOEXEC);

  Channel channel(&loop, timerfd);
  channel.setReadCallBack(timeout);
  channel.eableReading();

  struct itimerspec howlong;
  bzero(&howlong, sizeof howlong);
  howlong.it_value.tv_sec = 3;
  timerfd_settime(timerfd, 0, &howlong, NULL);

  loop.loop();

  close(timerfd);

}
./test.out 
2018-10-31 22:25:54.532487 [TRACE] [EventLoop.cpp:16] [EventLoop] EventLoop Create 0x7FFEB9567CC0 in thread 3075
2018-10-31 22:25:54.533563 [TRACE] [Poller.cpp:64] [updateChannel] fd= 3 events3
2018-10-31 22:25:54.534000 [TRACE] [EventLoop.cpp:41] [loop] EventLoop 0x7FFEB9567CC0 start loopig
2018-10-31 22:25:54.534334 [TRACE] [Poller.cpp:20] [poll] Poller::poll()
2018-10-31 22:25:55.535827 [TRACE] [Poller.cpp:28] [poll]  nothing happended
2018-10-31 22:25:55.536287 [TRACE] [Poller.cpp:20] [poll] Poller::poll()
2018-10-31 22:25:56.538334 [TRACE] [Poller.cpp:28] [poll]  nothing happended
2018-10-31 22:25:56.538802 [TRACE] [Poller.cpp:20] [poll] Poller::poll()
2018-10-31 22:25:57.534175 [TRACE] [Poller.cpp:24] [poll] 1 events happended
timeout!
2018-10-31 22:25:57.534766 [TRACE] [EventLoop.cpp:55] [loop] EventLoop 0x7FFEB9567CC0 stop loopig
相關文章
相關標籤/搜索