目錄linux
TimerQueue是EventLoop的組件之一,能夠提供定時任務,和週期任務。git
本章首先會簡述關於timerfd系統定時函數的基本使用,和TimerQueue類的封裝結構,最後給出TimerQueue::addTimer()接口的時序圖與使用例子.github
·time(2) / time_t(秒)
·ftime(3) / struct timeb(毫秒)
·gettimeofday(2) / struct timeval(微秒)
·clock_gettime(2) / struct timespec(納秒)
還有gmtime / localtime / timegm / mktime / strftime / struct tm等與當
前時間無關的時間格式轉換函數。
定時函數, 用於讓程序等待一段時間或安排計劃任務:·sleep(3)
·alarm(2)
·usleep(3)
·nanosleep(2)
·clock_nanosleep(2)
·getitimer(2) / setitimer(2)
·timer_create(2) / timer_settime(2) / timer_gettime(2) / timer_delete(2)
·timerfd_create(2) / timerfd_gettime(2) / timerfd_settime(2)編程
muduo中作的取捨以下
·(計時) 只使用gettimeofday(2)來獲取當前時間。
·(定時) 只使用timerfd_*系列函數來處理定時任務。安全
gettimeofday(2)入選緣由(這也是muduo::Timestamp class的主要設計考慮) :
1. time(2)的精度過低, ftime(3)已被廢棄; clock_gettime(2)精度最高, 可是其系統調用的開銷比gettimeofday(2)大。
2. 在x86-64平臺上, gettimeofday(2)不是系統調用, 而是在用戶態實現的, 沒有上下文切換和陷入內核的開銷32。
3. gettimeofday(2)的分辨率(resolution) 是1微秒, 如今的實現確實能達到這個計時精度, 足以知足平常計時的須要。 muduo::Timestamp用一個int64_t來表示從Unix Epoch到如今的微秒數, 其範圍可達上下30萬年。網絡
timerfd_*入選的緣由:
1. sleep(3) / alarm(2) / usleep(3)在實現時有可能用了SIGALRM信號, 在多線程程序中處理信號是個至關麻煩的事情, 應當儘可能避免, 再說, 若是主程序和程序庫都使用SIGALRM, 就糟糕了。
2. nanosleep(2)和clock_nanosleep(2)是線程安全的, 可是在非阻塞網絡編程中, 絕對不能用讓線程掛起的方式來等待一段時間, 這樣一來程序會失去響應。 正確的作法是註冊一個時間回調函數。
3. getitimer(2)和timer_create(2)也是用信號來deliver超時,在多線程程序中也會有麻煩。timer_create(2)能夠指定信號的接收方是進程仍是線程, 算是一個進步, 不過信號處理函數(signal handler) 能作的事情實在很受限。
4.timerfd_create(2)
把時間變成了一個文件描述符, 該「文件」在定時器超時的那一刻變得可讀, 這樣就能很方便地融入select(2)/poll(2)框架中, 用統一的方式來處理IO事件和超時事件, 這也正是Reactor模式的長處。
5. 傳統的Reactor利用select(2)/poll(2)/epoll(4)的timeout來實現定時功能, 但poll(2)和epoll_wait(2)的定時精度只有毫秒,遠低於timerfd_settime(2)的定時精度。多線程
本章使用到的兩個系統函數:app
#include <sys/timerfd.h> int timerfd_create(int clockid, int flags); int timerfd_settime(int fd, int flags, const struct itimerspec *new_value,struct itimerspec *old_value);
一、timerfd_create函數生成一個定時器,返回與之關聯的文件描述,其中的clockid能夠設成CLOCK_REALTIME和CLOCK_MONOTONIC
CLOCK_REALTIME
:系統實時時間,隨系統實時時間改變而改變,即從UTC1970-1-1 0:0:0開始計時,中間時刻若是系統時間被用戶改爲其餘,則對應的時間相應改變
CLOCK_MONOTONIC
:從系統啓動這一刻起開始計時,不受系統時間被用戶改變的影響框架
二、timerfd_settime用於啓停定時器,new_value爲超時時間,old_value爲週期性定時時間,爲0表示不進行週期性定時.函數
struct timespec { time_t tv_sec; /* Seconds */ long tv_nsec; /* Nanoseconds */ }; struct itimerspec { struct timespec it_interval; /* Interval for periodic timer */ struct timespec it_value; /* Initial expiration */ };
經過select 監聽timerfd,可讀時代表到達定時時間.
#include <sys/timerfd.h> #include <sys/select.h> /* According to earlier standards */ #include <sys/time.h> #include <sys/types.h> #include <unistd.h> int main() { int timerfd = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK |TFD_CLOEXEC); struct itimerspec howlong; bzero(&howlong, sizeof howlong); howlong.it_value.tv_sec = 3; timerfd_settime(timerfd, 0, &howlong, NULL); fd_set rdset; FD_ZERO(&rdset); FD_SET(timerfd, &rdset); struct timeval timeout; timeout.tv_sec = 1; timeout.tv_usec = 0; while(1) { if(select(timerfd + 1, &rdset, NULL, NULL, &timeout) == 0) { std::cout << "timeout\n"; timeout.tv_sec = 1; timeout.tv_usec = 0; FD_SET(timerfd, &rdset); continue; } std::cout << " timer happend\n"; break; } close(timerfd); return 0; } /* print timeout timeout timer happend */
int createTimerfd() { int timerfd = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); if (timerfd < 0) { std::cout << "Failed in timerfd_create" << std::endl; abort(); } return timerfd; } struct timespec howMuchTimeFromNow(TimeStamp when) { int64_t microseconds = when.microSecondsSinceEpoch() - TimeStamp::now().microSecondsSinceEpoch(); if (microseconds < 100) { microseconds = 100; } struct timespec ts; ts.tv_sec = static_cast<time_t>( microseconds / TimeStamp::kMicroSecondsPerSecond); ts.tv_nsec = static_cast<long>( (microseconds % TimeStamp::kMicroSecondsPerSecond) * 1000); return ts; } void readTimerfd(int timerfd, TimeStamp now) { uint64_t howmany; ssize_t n = ::read(timerfd, &howmany, sizeof howmany); std::cout << "TimerQueue::handleRead() " << howmany << " at " << now.toString() << std::endl; if (n != sizeof howmany) { std::cout << "TimerQueue::handleRead() reads " << n << " bytes instead of 8" << std::endl; } } void resetTimerfd(int timerfd, TimeStamp expiration) { // wake up loop by timerfd_settime() struct itimerspec newValue; struct itimerspec oldValue; bzero(&newValue, sizeof newValue); bzero(&oldValue, sizeof oldValue); newValue.it_value = howMuchTimeFromNow(expiration); int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue); if (ret) { std::cout << "timerfd_settime()" << std::endl; } }
TimerQueue只提供了兩個對外的接口
addTimer()
添加一個定時器超時執行回調函數cb. interval是週期性定時時間,本章不講,置零不打開.
cancel() 取消一個定時器,本章也不細述,本章只介紹addTimer()接口.後面若是須要會補充本章.
TimerQueue定義以下
class TimerQueue { public: TimerQueue(EventLoop* loop); ~TimerQueue(); // Schedules the callback to be run at given time, TimerId addTimer(const NetCallBacks::TimerCallBack& cb, TimeStamp when, double interval = 0.0); void cancel(TimerId timerId); private: /* ..... */ EventLoop* p_loop; const int m_timerfd; Channel m_timerfdChannel; //Timer List sorted by expiration TimerList m_timers; ActiveTimerSet m_activeTimers; };
雖然只提供了兩個對外接口,可是私有成比較複雜,首先簡介成員Timer.
Timer類做爲TimerQueue的內部成員使用,封裝一個定時器,addTimer()接口調用後生成一個Timer(),外頭的回調函數和定時反應時間會封裝再此Timer中.
Tiemr主要接口都是獲取這些構造時配置的成員值
run() 調用回調函數.
expiration 定時器過時時間.
interval 週期性定時時間.
repeat 是不是週期性定時.
sequence 一個靜態成員,用於記錄Timer建立的個數. 爲了保證它的線程安全性,使用AtomicInt64封裝了一層原子操做.
TimerId 用於Cancel Timer的標誌Id, 這兩個類都不復雜,可自行參看muduo源碼,也可翻我github上的SimpleMuduo單獨類文件夾下的測試代碼.
Timer(const NetCallBacks::TimerCallBack& cb, TimeStamp when, double interval) :m_callBack(cb), m_expiration(when), m_interval(interval), m_repeat(interval > 0.0), m_sequence(s_numCreated.incrementAndGet())
TimerId TimerQueue::addTimer(const NetCallBacks::TimerCallBack& cb, TimeStamp when, double interval) { Timer* timer = new Timer(cb, when, interval); //p_loop->runInLoop(std::bind(&TimerQueue::addTimerInLoop, this, timer)); return TimerId(timer, timer->sequence()); }
typedef std::pair<TimeStamp, Timer*> Entry; typedef std::set<Entry> TimerList; typedef std::pair<Timer*, int64_t> ActiveTimer; typedef std::set<ActiveTimer> ActiveTimerSet;
爲了解決沒法處理兩個Timer到期時間相同的狀況。使用了pair將時間戳和Timer的地址組成了一對.而後使用Set存儲.
ActiveTimer 將Timer和sequence組成一對主要做用來索引迭代的.
void addTimerInLoop(Timer* timer); void cancelInLoop(TimerId timerId); //called when timerfd alarms void handleRead(); //move out all expired timers and return they. std::vector<Entry> getExpired(TimeStamp now); bool insert(Timer* timer); void reset(const std::vector<Entry>& expired, TimeStamp now);
bool insert(Timer* timer); //插入一個定時器.
EventLoop添加了一個runInLoop接口: 在它的IO線程內執行某個用戶
任務回調, 即EventLoop::runInLoop(const Functor& cb), 其中Functor是
std::function<void()>。 若是用戶在當前IO線程調用這個函數, 回調會
同步進行; 若是用戶在其餘線程調用runInLoop(), cb會被加入隊列, IO
線程會被喚醒來調用這個Functor。
addTimer()使用了EventLoop的runInLoop接口(這個接口主要來保證線程安全性的,暫不講),來執行addTimerInLoop().addTimerInLoop()調用insert()插入定時器.
若是此定時器是最先觸發的那一個則會調用resetTimerfd()->timerfd_settime()啓動定時器.
addTimer()->addTimerInLoop()->insert()->resetTimerfd()->timerfd_settime()
添加邏輯就在上面了,下面給出處理邏輯.
三個接口.
void handleRead(); //定時器觸發回調.獲取已過時的事件並處理隨後更新列表.
std::vector
getExpired
(TimeStamp now);//獲取一組已過時的定時器,並從TimerList中刪除.
void reset(const std::vector
handleRead()->getExpired()->Timer->run(cb)->reset()->resetTimerfd()->timerfd_settime()
#ifndef _NET_TIMERQUEUE_HH #define _NET_TIMERQUEUE_HH #include "TimerId.hh" #include "CallBacks.hh" #include "TimeStamp.hh" #include "Channel.hh" #include <set> #include <vector> class EventLoop; class TimerQueue { public: TimerQueue(EventLoop* loop); ~TimerQueue(); // Schedules the callback to be run at given time, TimerId addTimer(const NetCallBacks::TimerCallBack& cb, TimeStamp when, double interval = 0.0); void cancel(TimerId timerId); private: typedef std::pair<TimeStamp, Timer*> Entry; typedef std::set<Entry> TimerList; typedef std::pair<Timer*, int64_t> ActiveTimer; typedef std::set<ActiveTimer> ActiveTimerSet; void addTimerInLoop(Timer* timer); void cancelInLoop(TimerId timerId); //called when timerfd alarms void handleRead(); //move out all expired timers and return they. std::vector<Entry> getExpired(TimeStamp now); bool insert(Timer* timer); void reset(const std::vector<Entry>& expired, TimeStamp now); EventLoop* p_loop; const int m_timerfd; Channel m_timerfdChannel; //Timer List sorted by expiration TimerList m_timers; ActiveTimerSet m_activeTimers; bool m_callingExpiredTimers; /*atomic*/ ActiveTimerSet m_cancelingTimers; }; #endif //TimerQueeu.cpp #include <stdint.h> #include <assert.h> #include <sys/timerfd.h> #include <unistd.h> #include "Logger.hh" #include "EventLoop.hh" #include "Timer.hh" #include "TimerQueue.hh" namespace TimerFd { int createTimerfd() { int timerfd = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); if (timerfd < 0) { LOG_SYSFATAL << "Failed in timerfd_create"; } return timerfd; } struct timespec howMuchTimeFromNow(TimeStamp when) { int64_t microseconds = when.microSecondsSinceEpoch() - TimeStamp::now().microSecondsSinceEpoch(); if (microseconds < 100) { microseconds = 100; } struct timespec ts; ts.tv_sec = static_cast<time_t>( microseconds / TimeStamp::kMicroSecondsPerSecond); ts.tv_nsec = static_cast<long>( (microseconds % TimeStamp::kMicroSecondsPerSecond) * 1000); return ts; } void readTimerfd(int timerfd, TimeStamp now) { uint64_t howmany; ssize_t n = ::read(timerfd, &howmany, sizeof howmany); LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString(); if (n != sizeof howmany) { LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8"; } } void resetTimerfd(int timerfd, TimeStamp expiration) { // wake up loop by timerfd_settime() LOG_TRACE << "resetTimerfd()"; struct itimerspec newValue; struct itimerspec oldValue; bzero(&newValue, sizeof newValue); bzero(&oldValue, sizeof oldValue); newValue.it_value = howMuchTimeFromNow(expiration); int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue); if (ret) { LOG_SYSERR << "timerfd_settime()"; } } }; using namespace TimerFd; TimerQueue::TimerQueue(EventLoop* loop) :p_loop(loop), m_timerfd(createTimerfd()), m_timerfdChannel(p_loop, m_timerfd), m_timers(), m_callingExpiredTimers(false) { m_timerfdChannel.setReadCallBack(std::bind(&TimerQueue::handleRead, this)); m_timerfdChannel.enableReading(); } TimerQueue::~TimerQueue() { m_timerfdChannel.disableAll(); m_timerfdChannel.remove(); ::close(m_timerfd); for (TimerList::iterator it = m_timers.begin(); it != m_timers.end(); ++it) { delete it->second; } } std::vector<TimerQueue::Entry> TimerQueue::getExpired(TimeStamp now) { std::vector<Entry> expired; Entry sentry = std::make_pair(now, reinterpret_cast<Timer*>UINTPTR_MAX); TimerList::iterator it = m_timers.lower_bound(sentry); assert(it == m_timers.end() || now < it->first); std::copy(m_timers.begin(), it, back_inserter(expired)); m_timers.erase(m_timers.begin(), it); for(std::vector<Entry>::iterator it = expired.begin(); it != expired.end(); ++it) { ActiveTimer timer(it->second, it->second->sequence()); size_t n = m_activeTimers.erase(timer); assert(n == 1); (void)n; } assert(m_timers.size() == m_activeTimers.size()); return expired; } TimerId TimerQueue::addTimer(const NetCallBacks::TimerCallBack& cb, TimeStamp when, double interval) { Timer* timer = new Timer(cb, when, interval); p_loop->runInLoop(std::bind(&TimerQueue::addTimerInLoop, this, timer)); return TimerId(timer, timer->sequence()); } void TimerQueue::addTimerInLoop(Timer* timer) { p_loop->assertInLoopThread(); bool earliestChanged = insert(timer); if (earliestChanged) { resetTimerfd(m_timerfd, timer->expiration()); } } void TimerQueue::cancel(TimerId timerId) { p_loop->runInLoop(std::bind(&TimerQueue::cancelInLoop, this, timerId)); } void TimerQueue::cancelInLoop(TimerId timerId) { p_loop->assertInLoopThread(); assert(m_timers.size() == m_activeTimers.size()); ActiveTimer timer(timerId.m_timer, timerId.m_sequence); ActiveTimerSet::iterator it = m_activeTimers.find(timer); if(it != m_activeTimers.end()) { size_t n = m_timers.erase(Entry(it->first->expiration(), it->first)); assert(n == 1); delete it->first; } else if (m_callingExpiredTimers) { m_cancelingTimers.insert(timer); } assert(m_timers.size() == m_activeTimers.size()); } bool TimerQueue::insert(Timer* timer) { p_loop->assertInLoopThread(); assert(m_timers.size() == m_activeTimers.size()); bool earliestChanged = false; TimeStamp when = timer->expiration(); TimerList::iterator it = m_timers.begin(); if (it == m_timers.end() || when < it->first) { earliestChanged = true; } { std::pair<TimerList::iterator, bool> result = m_timers.insert(Entry(when, timer)); assert(result.second); (void)result; } { std::pair<ActiveTimerSet::iterator, bool> result = m_activeTimers.insert(ActiveTimer(timer, timer->sequence())); assert(result.second); (void)result; } LOG_TRACE << "TimerQueue::insert() " << "m_timers.size() : " << m_timers.size() << " m_activeTimers.size() : " << m_activeTimers.size(); assert(m_timers.size() == m_activeTimers.size()); return earliestChanged; } void TimerQueue::handleRead() { p_loop->assertInLoopThread(); TimeStamp now(TimeStamp::now()); readTimerfd(m_timerfd, now); std::vector<Entry> expired = getExpired(now); LOG_TRACE << "Expired Timer size " << expired.size() << " "; m_callingExpiredTimers = true; m_cancelingTimers.clear(); for(std::vector<Entry>::iterator it = expired.begin(); it != expired.end(); ++it ) { it->second->run(); } m_callingExpiredTimers = false; reset(expired, now); } void TimerQueue::reset(const std::vector<Entry>& expired, TimeStamp now) { TimeStamp nextExpire; for(std::vector<Entry>::const_iterator it = expired.begin(); it != expired.end(); ++it) { ActiveTimer timer(it->second, it->second->sequence()); if(it->second->repeat() && m_cancelingTimers.find(timer) == m_cancelingTimers.end()) {//若是是週期定時器則從新設定時間插入. 不然delete. it->second->restart(now); insert(it->second); } else {// FIXME move to a free list no delete please delete it->second; } } if (!m_timers.empty()) { nextExpire = m_timers.begin()->second->expiration(); } if (nextExpire.valid()) { resetTimerfd(m_timerfd, nextExpire); } }
測試TimerQueue的addTimer接口.
#include <errno.h> #include <thread> #include <strings.h> #include <poll.h> #include <functional> #include "EventLoop.hh" #include "Channel.hh" #include "Poller.hh" #include "Logger.hh" #include "Timer.hh" #include "TimeStamp.hh" #include "TimerQueue.hh" EventLoop* g_loop; void print() { LOG_DEBUG << "test print()"; } void test() { LOG_DEBUG << "[test] : test timerQue"; } int main() { EventLoop loop; g_loop = &loop; TimerQueue timerQue(&loop); timerQue.addTimer(test, times::addTime(TimeStamp::now(), 3.0)); timerQue.addTimer(test, times::addTime(TimeStamp::now(), 3.0)); timerQue.addTimer(test, times::addTime(TimeStamp::now(), 5.0)); loop.loop(); return 0; }
./test.out 2018-11-11 15:49:22.493990 [TRACE] [Poller.cpp:64] [updateChannel] fd= 3 events3 2018-11-11 15:49:22.494042 [TRACE] [EventLoop.cpp:34] [EventLoop] EventLoop Create 0x7FFFBD3C39B0 in thread 3262 2018-11-11 15:49:22.494047 [TRACE] [Poller.cpp:64] [updateChannel] fd= 5 events3 2018-11-11 15:49:22.494055 [TRACE] [TimerQueue.cpp:172] [insert] TimerQueue::insert() m_timers.size() : 1 m_activeTimers.size() : 1 2018-11-11 15:49:22.494058 [TRACE] [TimerQueue.cpp:55] [resetTimerfd] resetTimerfd() 2018-11-11 15:49:22.494066 [TRACE] [TimerQueue.cpp:172] [insert] TimerQueue::insert() m_timers.size() : 2 m_activeTimers.size() : 2 2018-11-11 15:49:22.494070 [TRACE] [TimerQueue.cpp:172] [insert] TimerQueue::insert() m_timers.size() : 3 m_activeTimers.size() : 3 2018-11-11 15:49:22.494073 [TRACE] [EventLoop.cpp:59] [loop] EventLoop 0x7FFFBD3C39B0 start loopig 2018-11-11 15:49:22.494075 [TRACE] [Poller.cpp:20] [poll] Poller::poll() 2018-11-11 15:49:23.495462 [TRACE] [Poller.cpp:28] [poll] nothing happended 2018-11-11 15:49:23.495488 [TRACE] [Poller.cpp:20] [poll] Poller::poll() 2018-11-11 15:49:24.496618 [TRACE] [Poller.cpp:28] [poll] nothing happended 2018-11-11 15:49:24.496640 [TRACE] [Poller.cpp:20] [poll] Poller::poll() 2018-11-11 15:49:25.494222 [TRACE] [Poller.cpp:24] [poll] 1 events happended 2018-11-11 15:49:25.494253 [TRACE] [TimerQueue.cpp:45] [readTimerfd] TimerQueue::handleRead() 1 at 1541922565.494251 2018-11-11 15:49:25.494316 [TRACE] [TimerQueue.cpp:188] [handleRead] Expired Timer size 2 2018-11-11 15:49:25.494320 [DEBUG] [main.cpp:50] [test] [test] : test timerQue 2018-11-11 15:49:25.494322 [DEBUG] [main.cpp:50] [test] [test] : test timerQue 2018-11-11 15:49:25.494325 [TRACE] [TimerQueue.cpp:55] [resetTimerfd] resetTimerfd() 2018-11-11 15:49:25.494332 [TRACE] [Poller.cpp:20] [poll] Poller::poll() 2018-11-11 15:49:26.496293 [TRACE] [Poller.cpp:28] [poll] nothing happended 2018-11-11 15:49:26.496318 [TRACE] [Poller.cpp:20] [poll] Poller::poll() 2018-11-11 15:49:27.494291 [TRACE] [Poller.cpp:24] [poll] 1 events happended 2018-11-11 15:49:27.494319 [TRACE] [TimerQueue.cpp:45] [readTimerfd] TimerQueue::handleRead() 1 at 1541922567.494317 2018-11-11 15:49:27.494328 [TRACE] [TimerQueue.cpp:188] [handleRead] Expired Timer size 1 2018-11-11 15:49:27.494331 [DEBUG] [main.cpp:50] [test] [test] : test timerQue 2018-11-11 15:49:27.494334 [TRACE] [Poller.cpp:20] [poll] Poller::poll() 2018-11-11 15:49:28.495665 [TRACE] [Poller.cpp:28] [poll] nothing happended