是selectable IO channel
,自始至終只負責一個 fd 的(註冊與響應) IO 事件,可是不擁有該 fd ,因此也就在析構的時候不關閉它.數組
所屬的 loop
,及其要處理的 fd;接着註冊 fd 上須要監聽的事件,若是是經常使用的讀寫事件的話,能夠直接調用接口函數enableReading
來註冊對應fd上的事件,disable*是銷燬指定的事件;而後經過 setCallback
,最終向 poll 系統調用的監聽事件表註冊或修改事件。框架
class EventLoop; /// /// A selectable I/O channel. /// /// This class doesn't own the file descriptor. /// The file descriptor could be a socket, /// an eventfd, a timerfd, or a signalfd class Channel { public: typedef std::function<void()> EventCallback; Channel(EventLoop *loop, int fd) : loop_(loop), fd_(fdArg), events_(0), revents_(0), index_(-1) { } //處理事件,通常由Poller經過EventLoop來調用 //eventloop中有一個vector<Channel *>的 activeChannels_ 活動數組,天然能夠調用它 //根據revents_ 的事件類型,執行相應的回調 void handleEvent() { if (revents_ & POLLNVAL) { LOG_WARN << "Channel::handle_event() POLLNVAL"; } if (revents_ & (POLLERR | POLLNVAL)) { if (errorCallback_) errorCallback_(); } if (revents_ & (POLLIN | POLLPRI | POLLRDHUP)) { if (readCallback_) readCallback_(); } if (revents_ & POLLOUT) { if (writeCallback_) writeCallback_(); } } void setReadCallback(const EventCallback &cb) { readCallback_ = cb; } void setWriteCallback(const EventCallback &cb) { writeCallback_ = cb; } void setErrorCallback(const EventCallback &cb) { errorCallback_ = cb; } int fd() const { return fd_; } /* 返回 fd 註冊的事件 */ int events() const { return events_; } void set_revents(int revt) { revents_ = revt; } bool isNoneEvent() const { return events_ == kNoneEvent; } void enableReading() { events_ |= kReadEvent; update(); } // void enableWriting() { events_ |= kWriteEvent; update(); } // void disableWriting() { events_ &= ~kWriteEvent; update(); } // void disableAll() { events_ = kNoneEvent; update(); } // for Epoller int index() { return index_; } void set_index(int idx) { index_ = idx; } EventLoop *ownerLoop() { return loop_; } private: /* 經過調用loop_->updateChannel()來註冊或改變本fd在epoll中監聽的事件 */ void update() { loop_->updateChannel(this); } static const int kNoneEvent=0; static const int kReadEvent=POLLIN | POLLPRI; static const int kWriteEvent =POLLOUT; EventLoop *loop_; const int fd_; int events_; //關注的事件 int revents_; //poll/epoll中返回的事件 int index_; // used by epoller. 表示在poll的事件數組中的序號 EventCallback readCallback_; EventCallback writeCallback_; EventCallback errorCallback_; };
// 在咱們這裏將其直接寫爲一個具體類socket
Poller是個基類,具體能夠是EPollPoller(默認) 或者PollPoller,對應 poll 和 epoll.須要去實現(惟一使用面向對象的一個類)
函數調用 epoll_wait/poll
函數將返回的就緒事件裝入 activeChannels
namespace muduo { class Channel; /// /// IO Multiplexing with poll(2). /// /// This class doesn't own the Channel objects. class Poller : boost::noncopyable { public: typedef std::vector<Channel*> ChannelList; Poller(EventLoop* loop); //須要傳入EventLoop Objetct ~Poller(); /// Polls the I/O events. /// Must be called in the loop thread. // 核心功能,調用 epoll_wait/poll 來監聽註冊了的文件描述符 /* Channel::update()->EventLoop::updateChannel(Channel* channel)->Poller::updateChannel(Channel* channel)*/ Timestamp poll(int timeoutMs, ChannelList* activeChannels); /// Changes the interested I/O events. /// Must be called in the loop thread. //負責維護和更新 pollfds_ 數組 void updateChannel(Channel* channel); /* 斷言 確保沒有跨線程 */ void assertInLoopThread() { ownerLoop_->assertInLoopThread(); } private: //真正填充 activeChannels 的函數 void fillActiveChannels(int numEvents, ChannelList* activeChannels) const; typedef std::vector<struct pollfd> PollFdList; typedef std::map<int, Channel*> ChannelMap;//key是文件描述符,value是Channel* EventLoop* ownerLoop_; PollFdList pollfds_; ChannelMap channels_; }; }
Poller::Poller(EventLoop* loop) : ownerLoop_(loop) { } Poller::~Poller() { } Timestamp Poller::poll(int timeoutMs, ChannelList* activeChannels) { // XXX pollfds_ shouldn't change int numEvents = ::poll(&*pollfds_.begin(), pollfds_.size(), timeoutMs); Timestamp now(Timestamp::now()); if (numEvents > 0) { LOG_TRACE << numEvents << " events happended"; fillActiveChannels(numEvents, activeChannels); } else if (numEvents == 0) { LOG_TRACE << " nothing happended"; } else { LOG_SYSERR << "Poller::poll()"; } return now; } void Poller::fillActiveChannels(int numEvents, ChannelList* activeChannels) const { for (PollFdList::const_iterator pfd = pollfds_.begin(); pfd != pollfds_.end() && numEvents > 0; ++pfd) { if (pfd->revents > 0) { --numEvents; ChannelMap::const_iterator ch = channels_.find(pfd->fd); assert(ch != channels_.end()); Channel* channel = ch->second; assert(channel->fd() == pfd->fd); channel->set_revents(pfd->revents); // pfd->revents = 0; activeChannels->push_back(channel); } } } void Poller::updateChannel(Channel* channel) { assertInLoopThread(); LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events(); if (channel->index() < 0) { // muduo 使用了 index 去讓channel 記住他在pollfds_的下標 // index < 0說明是一個新的通道 // a new one, add to pollfds_ assert(channels_.find(channel->fd()) == channels_.end()); struct pollfd pfd; pfd.fd = channel->fd(); = static_cast<short>(channel->events()); pfd.revents = 0; pollfds_.push_back(pfd); int idx = static_cast<int>(pollfds_.size())-1; channel->set_index(idx); channels_[pfd.fd] = channel; } else { // update existing one assert(channels_.find(channel->fd()) != channels_.end()); assert(channels_[channel->fd()] == channel); int idx = channel->index(); assert(0 <= idx && idx < static_cast<int>(pollfds_.size())); struct pollfd& pfd = pollfds_[idx]; assert(pfd.fd == channel->fd() || pfd.fd == -1); = static_cast<short>(channel->events()); pfd.revents = 0; // 將一個通道暫時更改成不關注事件,但不從Poller中移除該通道 if (channel->isNoneEvent()) { // ignore this pollfd pfd.fd = -1; } } }
模式的核心,一個線程一個事件循環,即one loop per thread
對象的生命週期一般與其所屬的線程同樣長。EventLoop對象構造的時候,會檢查當前線程是否已經建立了其餘EventLoop對象,若是已建立,終止程序(LOG_FATAL),EventLoop類的構造函數會記錄本對象所屬線程(threadld_),建立了EventLoop對象的線程稱爲IO線程.其主要功能是運行事件循環,等待事件發生,而後調用回調處理髮生的事件。EventLoop::loop() -> Poller::poll()
填充就緒事件集合 activeChannels
,而後遍歷該容器,執行每一個 channel
的 Channel::handleEvent()
class Channel; class Poller; class EventLoop : boost::noncopyable { public: EventLoop(); // force out-line dtor, for scoped_ptr members. ~EventLoop(); /// /// Loops forever. /// 核心 /// Must be called in the same thread as creation of the object. ///**`EventLoop::loop() -> Poller::poll() `填充就緒事件集合 `activeChannels`, //而後遍歷該容器,執行每一個 `channel `的 `Channel::handleEvent()` // 完成對應就緒事件回調。** void loop(); void quit(); // internal use only void updateChannel(Channel* channel); // void removeChannel(Channel* channel); void assertInLoopThread() { if (!isInLoopThread()) { abortNotInLoopThread(); } } bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); } private: void abortNotInLoopThread(); typedef std::vector<Channel*> ChannelList; bool looping_; /* atomic */ bool quit_; /* atomic */ const pid_t threadId_; //線程ID boost::scoped_ptr<Poller> poller_; ChannelList activeChannels_; }; }
using namespace muduo; __thread EventLoop* t_loopInThisThread = 0; const int kPollTimeMs = 10000; EventLoop::EventLoop() : looping_(false), quit_(false), threadId_(CurrentThread::tid()), poller_(new Poller(this)) { LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; if (t_loopInThisThread) { LOG_FATAL << "Another EventLoop " << t_loopInThisThread << " exists in this thread " << threadId_; } else { t_loopInThisThread = this; } } EventLoop::~EventLoop() { assert(!looping_); t_loopInThisThread = NULL; } void EventLoop::loop() { assert(!looping_); assertInLoopThread(); looping_ = true; quit_ = false; while (!quit_) { activeChannels_.clear(); poller_->poll(kPollTimeMs, &activeChannels_); for (ChannelList::iterator it = activeChannels_.begin(); it != activeChannels_.end(); ++it) { (*it)->handleEvent(); } } LOG_TRACE << "EventLoop " << this << " stop looping"; looping_ = false; } void EventLoop::quit() { quit_ = true; // wakeup(); } void EventLoop::updateChannel(Channel* channel) { assert(channel->ownerLoop() == this); assertInLoopThread(); poller_->updateChannel(channel); } void EventLoop::abortNotInLoopThread() { LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this << " was created in threadId_ = " << threadId_ << ", current thread id = " << CurrentThread::tid(); }
#include "Channel.h" #include "EventLoop.h" #include <stdio.h> #include <sys/timerfd.h> muduo::EventLoop *g_loop; void timeout() { printf("Timeout!\n"); g_loop->quit(); } int main() { muduo::EventLoop loop; g_loop = &loop; int timerfd = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);//建立一個新的定時器對象 muduo::Channel channel(&loop, timerfd); channel.setReadCallback(timeout); channel.enableReading(); struct itimerspec howlong; bzero(&howlong, sizeof howlong); howlong.it_value.tv_sec = 5; ::timerfd_settime(timerfd, 0, &howlong, NULL); loop.loop(); ::close(timerfd); }
Linux 的計時函數,用於得到當前時間:
定時函數,用於讓程序等待一段時間或安排計劃任務: sleep alarm usleep nanosleep clock_nanosleep getitimer / setitimer timer_create / timer_settime / timer_gettime / timer_delete timerfd_create / timerfd_gettime / timerfd_settime
是 int32_t
timerfd_* 入選的緣由:
sleep / alarm / usleep 在實現時有可能用了信號 SIGALRM,在多線程程序中處理信號是個至關麻煩的事情,應當儘可能避免。(近期我會寫一篇博客仔細講講「多線程、RAII、fork() 與信號」)
nanosleep 和 clock_nanosleep 是線程安全的,可是在非阻塞網絡編程中,絕對不能用讓線程掛起的方式來等待一段時間,程序會失去響應。正確的作法是註冊一個時間回調函數。
getitimer 和 timer_create 也是用信號來 deliver 超時,在多線程程序中也會有麻煩。timer_create 能夠指定信號的接收方是進程仍是線程,算是一個進步,不過在信號處理函數(signal handler)能作的事情實在很受限。
把時間變成了一個文件描述符,該「文件」在定時器超時的那一刻變得可讀,這樣就能很方便地融入到 select/poll
框架中,用統一的方式來處理 IO 事件和超時事件,這也正是 Reactor
傳統的 Reactor 利用 select/poll/epoll 的 timeout 來實現定時功能,但 poll 和 epoll 的定時精度只有毫秒,遠低於 timerfd_settime 的定時精度。
惟一標識一個 Timer 定時器。TimerId Class 同時保存Timer* 和 sequence_,這個 sequence_ 是每一個 Timer 對象有一個全局遞增的序列號 int64_t sequence_,用原子計數器(AtomicInt64)生成
它主要用於註銷定時器,int64_t sequence_ 能夠區分地址相同的前後兩個 Timer 對象。下面代碼先忽略int64_t sequence_,也就是先不實現cancel 的接口,咱們在這裏只理解其工做方式便可
TimreId 類
namespace muduo { class Timer; /// /// An opaque identifier, for canceling Timer. /// class TimerId : public muduo::copyable { public: explicit TimerId(Timer *timer) : value_(timer) { } // default copy-ctor, dtor and assignment are okay private: Timer *value_; };
封裝了定時器的一些參數,包括超時時間(expiration_)、超時回調函數(callback_)、時間間隔(interval_)、是否重複定時(repeat_)、定時器的序列號等成員變量,成員函數大都是返回這些變量的值,run() 用來調用回調函數,restart() 用來重啓定時器。
namespace muduo { /// /// Internal class for timer event. /// class Timer : boost::noncopyable { public: Timer(const TimerCallback& cb, Timestamp when, double interval) : callback_(cb), expiration_(when), interval_(interval), repeat_(interval > 0.0) { } void run() const { callback_();//執行定時器回調函數 } Timestamp expiration() const { return expiration_; } /* 是否週期性定時 */ bool repeat() const { return repeat_; } /* 重啓定時器 */ void restart(Timestamp now) { if (repeat_) { //若是須要重複,那就將時間設爲下次超時的時間 expiration_ = addTime(now, interval_); } else { //若是不須要重複,那就將超時時間設爲一個不可用的 value expiration_ = Timestamp::invalid(); } } private: const TimerCallback callback_; //回調函數 Timestamp expiration_; //時間戳 const double interval_;//時間間隔,若是是一次性定時器,該值爲0 const bool repeat_;//是否重複執行 }; }
typedef std::pair<Timestamp, Timer*> Entry; typedef std::set<Entry> TimerList;
TimerQueue 定時器容器類中須要處理兩個 Timer 的超時時間相同的問題,因此能夠用multimap
經過給 timerfd
一個超時時間實現超時計時,它內部有 Channel
,經過 Channel
管理 timerfd
註冊 timerfd
的可讀事件,當 timerfd
調用可讀事件回調 handleRead()
,經過 getExpired()
找出全部的超時事件,而後執行相應的超時回調函數 Timer::run()
timerfd 如何實現多個定時器超時計時的呢?就是在插入的時候與set 元素比較,而後更新timerfd,從而保證 timerfd 始終是 set 中最近的一個超時時間.當 timerfd 可讀時,仍是須要遍歷容器,由於有可能此時有多個 Timer 超時了(儘管 tiemrfd 是當前最小的超時時間).唉,何須這麼麻煩吶,直接用時間堆管理很差嗎?timerfd == 堆頂,不過,我學到的是仍是須要遍歷容器(堆)的
namespace muduo { class EventLoop; class Timer; class TimerId; /// /// A best efforts timer queue. /// No guarantee that the callback will be on time. /// class TimerQueue : boost::noncopyable { public: TimerQueue(EventLoop* loop); ~TimerQueue(); /// /// Schedules the callback to be run at given time, /// repeats if @c interval > 0.0. /// /// Must be thread safe. Usually be called from other threads. TimerId addTimer(const TimerCallback& cb, Timestamp when, double interval); // void cancel(TimerId timerId); private: // FIXME: use unique_ptr<Timer> instead of raw pointers. typedef std::pair<Timestamp, Timer*> Entry; typedef std::set<Entry> TimerList; void addTimerInLoop(Timer* timer); // called when timerfd alarms void handleRead(); // move out all expired timers std::vector<Entry> getExpired(Timestamp now); void reset(const std::vector<Entry>& expired, Timestamp now); bool insert(Timer* timer); EventLoop* loop_; const int timerfd_; Channel timerfdChannel_; // Timer list sorted by expiration TimerList timers_; }; } #endif // MUDUO_NET_TIMERQUEUE_H
#include "datetime/Timestamp.h" #include "thread/Thread.h" #include "Callbacks.h" #include "TimerId.h" #include <boost/scoped_ptr.hpp> #include <vector> namespace muduo { class Channel; class Poller; class TimerQueue; class EventLoop : boost::noncopyable { public: EventLoop(); // force out-line dtor, for scoped_ptr members. ~EventLoop(); /// /// Loops forever. /// /// Must be called in the same thread as creation of the object. /// void loop(); void quit(); /// /// Time when poll returns, usually means data arrivial. /// Timestamp pollReturnTime() const { return pollReturnTime_; } // timers /// /// Runs callback at 'time'. /// TimerId runAt(const Timestamp& time, const TimerCallback& cb); /// /// Runs callback after @c delay seconds. /// TimerId runAfter(double delay, const TimerCallback& cb); /// /// Runs callback every @c interval seconds. /// TimerId runEvery(double interval, const TimerCallback& cb); // void cancel(TimerId timerId); // internal use only void updateChannel(Channel* channel); // void removeChannel(Channel* channel); void assertInLoopThread() { if (!isInLoopThread()) { abortNotInLoopThread(); } } bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); } private: void abortNotInLoopThread(); typedef std::vector<Channel*> ChannelList; bool looping_; /* atomic */ bool quit_; /* atomic */ const pid_t threadId_; Timestamp pollReturnTime_; boost::scoped_ptr<Poller> poller_; boost::scoped_ptr<TimerQueue> timerQueue_; ChannelList activeChannels_; }; } #endif // MUDUO_NET_EVENTLOOP_H
using namespace muduo; __thread EventLoop* t_loopInThisThread = 0; const int kPollTimeMs = 10000; EventLoop::EventLoop() : looping_(false), quit_(false), threadId_(CurrentThread::tid()), poller_(new Poller(this)), timerQueue_(new TimerQueue(this)) { LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; if (t_loopInThisThread) { LOG_FATAL << "Another EventLoop " << t_loopInThisThread << " exists in this thread " << threadId_; } else { t_loopInThisThread = this; } } EventLoop::~EventLoop() { assert(!looping_); t_loopInThisThread = NULL; } void EventLoop::loop() { assert(!looping_); assertInLoopThread(); looping_ = true; quit_ = false; while (!quit_) { activeChannels_.clear(); pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); for (ChannelList::iterator it = activeChannels_.begin(); it != activeChannels_.end(); ++it) { (*it)->handleEvent(); } } LOG_TRACE << "EventLoop " << this << " stop looping"; looping_ = false; } void EventLoop::quit() { quit_ = true; // wakeup(); } TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb) { return timerQueue_->addTimer(cb, time, 0.0); } TimerId EventLoop::runAfter(double delay, const TimerCallback& cb) { Timestamp time(addTime(Timestamp::now(), delay)); return runAt(time, cb); } TimerId EventLoop::runEvery(double interval, const TimerCallback& cb) { Timestamp time(addTime(Timestamp::now(), interval)); return timerQueue_->addTimer(cb, time, interval); } void EventLoop::updateChannel(Channel* channel) { assert(channel->ownerLoop() == this); assertInLoopThread(); poller_->updateChannel(channel); } void EventLoop::abortNotInLoopThread() { LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this << " was created in threadId_ = " << threadId_ << ", current thread id = " << CurrentThread::tid(); }