目錄react
Reactor
是一種基於事件驅動的設計模式,即經過回調機制,咱們將事件的接口註冊到Reactor上,當事件發生以後,就會回調註冊的接口。
Reactor必要的幾個組件
:
Event Multiplexer事件分發器
:即一些I/O複用機制select、poll、epoll等.程序將事件源註冊到分發器上,等待事件的觸發,作相應處理.
Handle事件源
:用於標識一個事件,Linux上是文件描述符.
Reactor反應器
:用於管理事件的調度及註冊刪除.當有激活的事件時,則調用回調函數處理,沒有則繼續事件循環.
event handler事件處理器
:管理已註冊事件和的調度,分紅不一樣類型的事件(讀/寫,定時)當事件發生,調用對應的回調函數處理.編程
優勢
1)響應快,沒必要爲單個同步時間所阻塞,雖然Reactor自己依然是同步的;
2)編程相對簡單,能夠最大程度的避免複雜的多線程及同步問題,而且避免了多線程/進程的切換開銷;
3)可擴展性,能夠方便的經過增長Reactor實例個數來充分利用CPU資源;
4)可複用性,reactor框架自己與具體事件處理邏輯無關,具備很高的複用性;設計模式
缺點
Reactor模式在IO讀寫數據時仍是在同一個線程中實現的,即便使用多個Reactor機制的狀況下,那些共享一個Reactor的Channel若是出現一個長時間的數據讀寫,會影響這個Reactor中其餘Channel的相應時間,好比在大文件傳輸時,IO操做就會影響其餘Client的相應時間,於是對這種操做,使用傳統的Thread-Per-Connection或許是一個更好的選擇,或則此時使用Proactor模式。多線程
poll的使用方法與select類似,輪詢多個文件描述符,有讀寫時設置相應的狀態位,poll相比select優在沒有最大文件描述符數量的限制.
# include <poll.h> int poll ( struct pollfd * fds, unsigned int nfds, int timeout); struct pollfd { int fd; /* 文件描述符 */ short events; /* 等待的事件 */ short revents; /* 實際發生了的事件 */ } ;
每個pollfd結構體指定了一個被監視的文件描述符,能夠傳遞多個結構體,指示poll()監視多個文件描述符。每一個結構體的events域是監視該文件描述符的事件掩碼,由用戶來設置這個域。revents域是文件描述符的操做結果事件掩碼,內核在調用返回時設置這個域。events域中請求的任何事件均可能在revents域中返回。合法的事件以下:app
POLLIN 有數據可讀。
POLLRDNORM 有普通數據可讀。
POLLRDBAND 有優先數據可讀。
POLLPRI 有緊迫數據可讀。
POLLOUT 寫數據不會致使阻塞。
POLLWRNORM 寫普通數據不會致使阻塞。
POLLWRBAND 寫優先數據不會致使阻塞。
POLLMSGSIGPOLL 消息可用。框架
#include <fcntl.h> #include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <string.h> #include <time.h> #include <errno.h> #include <poll.h> #define MAX_BUFFER_SIZE 1024 #define IN_FILES 1 #define MAX(a,b) ((a>b)?(a):(b)) int main(int argc ,char **argv) { struct pollfd fds[3]; char buf[1024]; int i,res,real_read, maxfd; if((fds[0].fd=open("/dev/stdin",O_RDONLY|O_NONBLOCK)) < 0) { fprintf(stderr,"open data1 error:%s",strerror(errno)); return 1; } for (i = 0; i < IN_FILES; i++) { fds[i].events = POLLIN | POLLPRI; } while(1) //|| fds[1].events || fds[2].events) { int ret = poll(fds, 1, 1000); if (ret < 0) { printf("Poll error : %s\n",strerror(errno)); return 1; } if(ret == 0){ printf("Poll timeout\n"); continue; } for (i = 0; i< 1; i++) { if (fds[i].revents) { memset(buf, 0, MAX_BUFFER_SIZE); real_read = read(fds[i].fd, buf, MAX_BUFFER_SIZE); if (real_read < 0) { if (errno != EAGAIN) { printf("read eror : %s\n",strerror(errno)); continue; } } else if (!real_read) { close(fds[i].fd); fds[i].events = 0; } else { if (i == 0) { buf[real_read] = '\0'; printf("%s", buf); if ((buf[0] == 'q') || (buf[0] == 'Q')) { printf("quit\n"); return 1; } } else { buf[real_read] = '\0'; printf("%s", buf); } } } } } exit(0); }
muduo Reactor最核心的事件分發機制, 即將IO multiplexing拿到的IO事件分發給各個文件描述符(fd)的事件處理函數。
Chanel目前我對它的理解是,它負責管理一個文件描述符(file descript)IO事件.
Channel會封裝C的poll事件,把不一樣的IO事件分發到不一樣的回調:ReadCallBack、WriteCallBack等
每一個Channel對象自始至終只屬於一個EventLoop,所以每一個Channel對象都只屬於某一個IO線程。 每一個Channel對象自始至終只負責一個文件描述符(fd) 的IO事件分發函數
#ifndef NET_CHANNEL_H #define NET_CHANNEL_H #include <functional> #include "EventLoop.hh" class Channel { public: typedef std::function<void()> EventCallBack; Channel(EventLoop* loop, int fd); ~Channel(); void handleEvent(); void setReadCallBack(const EventCallBack& cb) { m_readCallBack = cb; } void setWriteCallBack(const EventCallBack& cb) { m_writeCallBack = cb; } void setErrorCallBack(const EventCallBack& cb) { m_errorCallBack = cb; } int fd() const { return m_fd; } int events() const { return m_events; } void set_revents(int revt) { m_revents = revt; } bool isNoneEvent() const { return m_events == kNoneEvent; } void eableReading() { m_events |= kReadEvent; update(); } int index() { return m_index; } void set_index(int idx) { m_index =idx; } EventLoop* ownerLoop() { return m_pLoop; } private: Channel& operator=(const Channel&); Channel(const Channel&); void update(); static const int kNoneEvent; static const int kReadEvent; static const int kWriteEvent; EventLoop* m_pLoop; const int m_fd; int m_events; // 等待的事件 int m_revents; // 實際發生了的事件 int m_index; EventCallBack m_readCallBack; EventCallBack m_writeCallBack; EventCallBack m_errorCallBack; }; #endif //Channel.cpp #include <poll.h> #include "Channel.hh" #include "Logger.hh" const int Channel::kNoneEvent = 0; const int Channel::kReadEvent = POLLIN | POLLPRI; const int Channel::kWriteEvent = POLLOUT; Channel::Channel(EventLoop* loop, int fd) : m_pLoop(loop), m_fd(fd), m_events(0), m_revents(0), m_index(-1) { } Channel::~Channel() { } void Channel::update() { m_pLoop->updateChannel(this); } void Channel::handleEvent() { if(m_revents & POLLNVAL) { LOG_WARN << "Channel::handleEvent() POLLNVAL"; } if(m_revents & (POLLERR | POLLNVAL)){ if(m_errorCallBack) m_errorCallBack(); } if(m_revents & (POLLIN | POLLPRI | POLLRDHUP)){ if(m_readCallBack) m_readCallBack(); } if(m_revents & POLLOUT){ if(m_writeCallBack) m_writeCallBack(); } }
值得一提的就是 Channel::update()它會調用EventLoop::updateChannel(), 後者會轉而調
用Poller::updateChannel()。Poller對象下面會講,經過Poller::updateChannel()註冊IO事件(即file descript).oop
Channel::handleEvent()是Channel的核心, 它由EventLoop::loop()調
用, 它的功能是根據revents發生事件的的值分別調用不一樣的用戶回調。 這個函數之後還會擴充。學習
Poller class是IO multiplexing的封裝。 它如今是個具體類,而在muduo中是個抽象基類,由於muduo同時支持poll(2)和epoll(4)兩種IOmultiplexing機制。
Poller是EventLoop的間接成員,只供其本身在EventLoop的IO線程中調用,所以無須加鎖。其生命期與EventLoop相等。
Poller並不擁有管理文件描述符事件的Channel, Channel在析構以前必須本身
unregister(EventLoop::removeChannel()) , 避免空懸指針測試
#ifndef _NET_POLLER_HH #define _NET_POLLER_HH #include <vector> #include <map> #include "TimeStamp.hh" #include "EventLoop.hh" #include "Channel.hh" struct pollfd; class Poller{ public: typedef std::vector<Channel*> ChannelList; Poller(EventLoop* loop); ~Poller(); TimeStamp poll(int timeoutMs, ChannelList* activeChannels); void updateChannel(Channel* channel); void assertInLoopThread() { m_pOwerLoop->assertInLoopThread(); } private: Poller& operator=(const Poller&); Poller(const Poller&); void fillActiveChannels(int numEvents, ChannelList* activeChannels) const; typedef std::vector<struct pollfd> PollFdList; typedef std::map<int, Channel*> ChannelMap; EventLoop* m_pOwerLoop; PollFdList m_pollfds; ChannelMap m_channels; }; #endif //Poller.cpp #include "Poller.hh" #include "Logger.hh" #include <assert.h> #include <poll.h> #include <signal.h> Poller::Poller(EventLoop* loop) : m_pOwerLoop(loop) { } Poller::~Poller() { } TimeStamp Poller::poll(int timeoutMs, ChannelList* activeChannels) { LOG_TRACE << "Poller::poll()"; int numEvents = ::poll(/*&*m_pollfds.begin()*/m_pollfds.data(), m_pollfds.size(), timeoutMs); TimeStamp now(TimeStamp::now()); if(numEvents > 0){ LOG_TRACE << numEvents << " events happended"; fillActiveChannels(numEvents, activeChannels); } else if(numEvents == 0){ LOG_TRACE << " nothing happended"; } else{ LOG_SYSERR << "Poller::poll()"; } return now; } /* *fillActiveChannels()遍歷m_pollfds, 找出有活動事件的fd, 把它對應 *的Channel填入activeChannels。 */ void Poller::fillActiveChannels(int numEvents, ChannelList* activeChannels) const { for(PollFdList::const_iterator pfd = m_pollfds.begin(); pfd != m_pollfds.end() && numEvents > 0; ++pfd) { if(pfd->revents > 0) { --numEvents; ChannelMap::const_iterator ch = m_channels.find(pfd->fd); assert(ch != m_channels.end()); Channel* channel = ch->second; assert(channel->fd() == pfd->fd); channel->set_revents(pfd->revents); activeChannels->push_back(channel); } } } void Poller::updateChannel(Channel* channel) { assertInLoopThread(); LOG_TRACE << "fd= " << channel->fd() << " events" << channel->events(); if(channel->index() < 0){ //a new one , add to pollfds assert(m_channels.find(channel->fd()) == m_channels.end()); struct pollfd pfd; pfd.fd = channel->fd(); pfd.events = static_cast<short>(channel->events()); pfd.revents = 0; m_pollfds.push_back(pfd); int idx = static_cast<int>(m_pollfds.size()) - 1; channel->set_index(idx); m_channels[pfd.fd] = channel; } else{ //update existing one assert(m_channels.find(channel->fd()) != m_channels.end()); assert(m_channels[channel->fd()] == channel); int idx = channel->index(); assert(0 <= idx && idx < static_cast<int>(m_pollfds.size())); struct pollfd& pfd = m_pollfds[idx]; assert(pfd.fd == channel->fd() || pfd.fd == -1); pfd.events = static_cast<short>(channel->events()); pfd.revents = 0; if(channel->isNoneEvent()){ //ignore this pollfd pfd.fd = -1; } } }
EventLopp在上一篇文章寫過,這裏給出改動.
EventLoop 新增了quit()成員函數, 還加了幾個數據成員,並在構造函數裏初始化它們。注意EventLoop經過智能指針來間接持有poller.
+class Poller; +class Channel; class EventLoop ------------ bool isInloopThread() const {return m_threadId == CurrentThread::tid(); } +void quit(); +void updateChannel(Channel* channel); static EventLoop* getEventLoopOfCurrentThread(); private: EventLoop& operator=(const EventLoop&); EventLoop(const EventLoop&); void abortNotInLoopThread(); +typedef std::vector<Channel*> ChannelList; bool m_looping; +bool m_quit; const pid_t m_threadId; +std::unique_ptr<Poller> m_poller; +ChannelList m_activeChannels; }; //EventLoop.cpp m_threadId(CurrentThread::tid()), + m_poller(new Poller(this)) { ------ +void EventLoop::quit() +{ + m_quit = true; + //wakeup(); +} + +void EventLoop::updateChannel(Channel* channel) +{ + assert(channel->ownerLoop() == this); + assertInLoopThread(); + m_poller->updateChannel(channel); +}
上一篇文章的EventLoop->loop()什麼也沒作,如今它有了實實在在的使命,它調用Poller::poll()得到當前活動事件的Chanel列表, 而後依次調用每一個Channel的handleEvent()函數
void EventLoop::loop() { assert(!m_looping); assertInLoopThread(); m_looping = true; m_quit = false; LOG_TRACE << "EventLoop " << this << " start loopig"; while(!m_quit) { m_activeChannels.clear(); m_poller->poll(1000, &m_activeChannels); for(ChannelList::iterator it = m_activeChannels.begin(); it != m_activeChannels.end(); ++it) { (*it)->handleEvent(); } } LOG_TRACE << "EventLoop " << this << " stop loopig"; m_looping = false; }
程序利用timerfd_create 把時間變成了一個文件描述符,該「文件」在定時器超時的那一刻變得可讀,這樣就能很方便地融入到 select/poll 框架中,用統一的方式來處理 IO 事件和超時事件,這也正是 Reactor 模式的長處。
#include <errno.h> #include <thread> #include <strings.h> #include "EventLoop.hh" #include "Channel.hh" #include "Poller.hh" //Reactor Test //單次觸發定時器 #include <sys/timerfd.h> EventLoop* g_loop; void timeout() { printf("timeout!\n"); g_loop->quit(); } int main() { EventLoop loop; g_loop = &loop; int timerfd = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK |TFD_CLOEXEC); Channel channel(&loop, timerfd); channel.setReadCallBack(timeout); channel.eableReading(); struct itimerspec howlong; bzero(&howlong, sizeof howlong); howlong.it_value.tv_sec = 3; timerfd_settime(timerfd, 0, &howlong, NULL); loop.loop(); close(timerfd); }
./test.out 2018-10-31 22:25:54.532487 [TRACE] [EventLoop.cpp:16] [EventLoop] EventLoop Create 0x7FFEB9567CC0 in thread 3075 2018-10-31 22:25:54.533563 [TRACE] [Poller.cpp:64] [updateChannel] fd= 3 events3 2018-10-31 22:25:54.534000 [TRACE] [EventLoop.cpp:41] [loop] EventLoop 0x7FFEB9567CC0 start loopig 2018-10-31 22:25:54.534334 [TRACE] [Poller.cpp:20] [poll] Poller::poll() 2018-10-31 22:25:55.535827 [TRACE] [Poller.cpp:28] [poll] nothing happended 2018-10-31 22:25:55.536287 [TRACE] [Poller.cpp:20] [poll] Poller::poll() 2018-10-31 22:25:56.538334 [TRACE] [Poller.cpp:28] [poll] nothing happended 2018-10-31 22:25:56.538802 [TRACE] [Poller.cpp:20] [poll] Poller::poll() 2018-10-31 22:25:57.534175 [TRACE] [Poller.cpp:24] [poll] 1 events happended timeout! 2018-10-31 22:25:57.534766 [TRACE] [EventLoop.cpp:55] [loop] EventLoop 0x7FFEB9567CC0 stop loopig