muduo之TimerQueue

         muduo的TimerQueue是基於timerfd_create實現,這樣超時很容易和epoll結合起來。等待超時事件保存在set集合中,注意set集合的有序性,從小到大排列,整個對TimerQueue的處理也就是對set集合的操做。實現TimerQueue用了3個set,分別是等待超時事件set,活躍事件set,被撤銷定時set。主要是STL的一些操做。算法

TimerQueue.h數組

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is an internal header file, you should not include this.

#ifndef MUDUO_NET_TIMERQUEUE_H
#define MUDUO_NET_TIMERQUEUE_H

#include <set>
#include <vector>

#include "muduo/base/Mutex.h"
#include "muduo/base/Timestamp.h"
#include "muduo/net/Callbacks.h"
#include "muduo/net/Channel.h"

namespace muduo
{
namespace net
{

class EventLoop;
class Timer;
class TimerId;

///
/// A best efforts timer queue.
/// No guarantee that the callback will be on time.
///
class TimerQueue : noncopyable
{
 public:
  explicit TimerQueue(EventLoop* loop);
  ~TimerQueue();
  ///
  /// Schedules the callback to be run at given time,
  /// repeats if @c interval > 0.0.
  ///
  /// Must be thread safe. Usually be called from other threads.

  /* 
   * 用於註冊定時任務
   * @param cb, 超時調用的回調函數
   * @param when,超時時間(絕對時間)
   * @interval,是不是週期性超時任務
   */
  TimerId addTimer(TimerCallback cb,
                   Timestamp when,
                   double interval);
  
  /* 取消定時任務,每一個定時任務都有對應的TimerId,這是addTimer返回給調用者的 */
  void cancel(TimerId timerId);

 private:

  // FIXME: use unique_ptr<Timer> instead of raw pointers.
  // This requires heterogeneous comparison lookup (N3465) from C++14
  // so that we can find an T* in a set<unique_ptr<T>>.
  /* 
   * 主要用於刪除操做,經過TimerId找到Timer*,再經過Timer*找到在timers_中的位置,將期刪除
   * 以爲能夠省略
   */
  typedef std::pair<Timestamp, Timer*> Entry;
  typedef std::set<Entry> TimerList;
  typedef std::pair<Timer*, int64_t> ActiveTimer;
  typedef std::set<ActiveTimer> ActiveTimerSet;

  void addTimerInLoop(Timer* timer);
  void cancelInLoop(TimerId timerId);
  // called when timerfd alarms
   /* 當timerfd被激活時調用的回調函數,表示超時 */
  void handleRead();
  // move out all expired timers
  /* 從timers_中拿出全部超時的Timer* */
  std::vector<Entry> getExpired(Timestamp now);
  /* 將超時任務中週期性的任務從新添加到timers_中 */
  void reset(const std::vector<Entry>& expired, Timestamp now);

  /* 插入到timers_中 */
  bool insert(Timer* timer);

  EventLoop* loop_;				/* 所屬的事件驅動循環 */
  const int timerfd_;			/* 由timerfd_create建立的文件描述符 */
  Channel timerfdChannel_;		/* 用於監聽timerfd的Channel */
  // Timer list sorted by expiration
  TimerList timers_;			/* 保存全部的定時任務 */

  // for cancel()
  ActiveTimerSet activeTimers_;
  bool callingExpiredTimers_; /* atomic */
  ActiveTimerSet cancelingTimers_;	//保存被取消的定時器
};

}  // namespace net
}  // namespace muduo
#endif  // MUDUO_NET_TIMERQUEUE_H

TimerQueue.ccide

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)

#ifndef __STDC_LIMIT_MACROS
#define __STDC_LIMIT_MACROS
#endif

#include "muduo/net/TimerQueue.h"

#include "muduo/base/Logging.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/Timer.h"
#include "muduo/net/TimerId.h"

#include <sys/timerfd.h>
#include <unistd.h>

namespace muduo
{
namespace net
{
namespace detail
{

int createTimerfd()
{
  int timerfd = ::timerfd_create(CLOCK_MONOTONIC,
                                 TFD_NONBLOCK | TFD_CLOEXEC);
  if (timerfd < 0)
  {
    LOG_SYSFATAL << "Failed in timerfd_create";
  }
  return timerfd;
}

struct timespec howMuchTimeFromNow(Timestamp when)//如今距離超時時間when還有多久
{
  int64_t microseconds = when.microSecondsSinceEpoch()
                         - Timestamp::now().microSecondsSinceEpoch();
  if (microseconds < 100)
  {
    microseconds = 100;
  }
  struct timespec ts;
  ts.tv_sec = static_cast<time_t>(
      microseconds / Timestamp::kMicroSecondsPerSecond);
  ts.tv_nsec = static_cast<long>(
      (microseconds % Timestamp::kMicroSecondsPerSecond) * 1000);
  return ts;
}

void readTimerfd(int timerfd, Timestamp now)//處理超時事件。超時後,timerfd變爲可讀
{
  uint64_t howmany;
  ssize_t n = ::read(timerfd, &howmany, sizeof howmany);
  LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString();
  if (n != sizeof howmany)
  {
    LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8";
  }
}

//timerfd是時間對於的文件描述符
void resetTimerfd(int timerfd, Timestamp expiration)//從新設置定時器
{
  // wake up loop by timerfd_settime()
  struct itimerspec newValue;
  struct itimerspec oldValue;
  memZero(&newValue, sizeof newValue);
  memZero(&oldValue, sizeof oldValue);
  newValue.it_value = howMuchTimeFromNow(expiration);
  int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue);
  if (ret)
  {
    LOG_SYSERR << "timerfd_settime()";
  }
}

}  // namespace detail
}  // namespace net
}  // namespace muduo




using namespace muduo;
using namespace muduo::net;
using namespace muduo::net::detail;

TimerQueue::TimerQueue(EventLoop* loop)
  : loop_(loop),
    timerfd_(createTimerfd()),
    timerfdChannel_(loop, timerfd_),
    timers_(),
    callingExpiredTimers_(false)
{
  timerfdChannel_.setReadCallback(
      std::bind(&TimerQueue::handleRead, this));
  // we are always reading the timerfd, we disarm it with timerfd_settime.
  timerfdChannel_.enableReading();
}

TimerQueue::~TimerQueue()
{
  timerfdChannel_.disableAll();
  timerfdChannel_.remove();
  ::close(timerfd_);
  // do not remove channel, since we're in EventLoop::dtor();
  for (const Entry& timer : timers_)
  {
    delete timer.second;
  }
}

TimerId TimerQueue::addTimer(TimerCallback cb,
                             Timestamp when,
                             double interval)
{
  Timer* timer = new Timer(std::move(cb), when, interval);
  /* 
   * 在本身所屬線程調用addTimerInLoop函數 
   */
  loop_->runInLoop(
      std::bind(&TimerQueue::addTimerInLoop, this, timer));
  return TimerId(timer, timer->sequence());
}

void TimerQueue::cancel(TimerId timerId)
{
  loop_->runInLoop(
      std::bind(&TimerQueue::cancelInLoop, this, timerId));
}

/* 向計時器隊列中添加超時事件 */
void TimerQueue::addTimerInLoop(Timer* timer)
{
  loop_->assertInLoopThread();
  bool earliestChanged = insert(timer);//返回true,說明timer被添加到set的頂部,做爲新的根節點,須要更新timerfd的激活時間

  // 只有在計時器爲空的時候或者新加入的計時器的最先觸發時間小於當前計時器的堆頂的最小值
  // 才須要用最近時間去更新
  if (earliestChanged)
  {
    resetTimerfd(timerfd_, timer->expiration());
  }
}

void TimerQueue::cancelInLoop(TimerId timerId)
{
  loop_->assertInLoopThread();
  assert(timers_.size() == activeTimers_.size());
  ActiveTimer timer(timerId.timer_, timerId.sequence_);
  ActiveTimerSet::iterator it = activeTimers_.find(timer);
  if (it != activeTimers_.end())//要取消的在當前激活的Timer集合中
  {
    size_t n = timers_.erase(Entry(it->first->expiration(), it->first));//在timers_中取消
    assert(n == 1); (void)n;
    delete it->first; // FIXME: no delete please
    activeTimers_.erase(it);//在activeTimers_中取消
  }
  else if (callingExpiredTimers_)//若是正在執行超時定時器的回調函數,則加入到cancelingTimers集合中
  {
    cancelingTimers_.insert(timer);
  }
  assert(timers_.size() == activeTimers_.size());
}

/* 
 * 當定時器超時,保存timerfd的Channel激活,調用回調函數
 */
void TimerQueue::handleRead()
{
  loop_->assertInLoopThread();
  Timestamp now(Timestamp::now());
  readTimerfd(timerfd_, now);

  /* 從定時任務set中拿出全部超時任務 */
  std::vector<Entry> expired = getExpired(now);

  callingExpiredTimers_ = true;
  cancelingTimers_.clear();
  // safe to callback outside critical section
  /* 調用超時的事件回調函數 */
  for (const Entry& it : expired)
  {
    it.second->run();
  }
  callingExpiredTimers_ = false;

  reset(expired, now);
}

/*
 * 從新整理時間set中的任務,將全部超時的任務都拿出,而後調用其回調函數
 */
std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
{
  assert(timers_.size() == activeTimers_.size());
  std::vector<Entry> expired;
  Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
  
  //返回第一個大於等於now的迭代器,小於now的都已經超時
  //lower_bound( begin,end,num):從數組的begin位置到end-1位置二分查找第一個大於或等於num的數字,找到返回該數字的地址
  //lower_bound(val):返回容器中第一個值【大於或等於】val的元素的iterator位置
  TimerList::iterator end = timers_.lower_bound(sentry);
  assert(end == timers_.end() || now < end->first);
  
  /* back_inserter:容器適配器,將數據插入到參數的尾部 */
  //一個序列(sequence)拷貝到一個容器(container)中去,一般用std::copy算法
  std::copy(timers_.begin(), end, back_inserter(expired));
  timers_.erase(timers_.begin(), end);   //從timers_中移除

  for (const Entry& it : expired)
  {
    ActiveTimer timer(it.second, it.second->sequence());
    size_t n = activeTimers_.erase(timer);  //從activeTimers_中移除
    assert(n == 1); (void)n;
  }

  assert(timers_.size() == activeTimers_.size());
  return expired;
}

/* 
 * 調用完全部超時的回調函數後,須要對這些超時任務進行整理
 * 將週期性的定時任務從新添加到set中
 */
void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
{
  Timestamp nextExpire;

  for (const Entry& it : expired)
  {
    ActiveTimer timer(it.second, it.second->sequence());
    if (it.second->repeat()   		/* 是不是週期性的定時任務 */
        && cancelingTimers_.find(timer) == cancelingTimers_.end())
    {
      /* 從新計算超時時間 */
      it.second->restart(now);
	  /* 從新添加到set中 */
      insert(it.second);
    }
    else
    {
      // FIXME move to a free list
      delete it.second; // FIXME: no delete please
    }
  }

  /* 計算下次timerfd被激活的時間 */
  if (!timers_.empty())
  {
    nextExpire = timers_.begin()->second->expiration();//set從小到大排序
  }

  /* 設置 */
  if (nextExpire.valid())//時間是有效的
  {
    resetTimerfd(timerfd_, nextExpire);
  }
}

bool TimerQueue::insert(Timer* timer)
{
  loop_->assertInLoopThread();
  assert(timers_.size() == activeTimers_.size());
  bool earliestChanged = false;

  /* 獲取timer的UTC時間戳,和timer組成std::pair<Timestamp, Timer*> */
  Timestamp when = timer->expiration();
  
  /* timers_begin()是set頂層元素(紅黑樹根節點),是超時時間最近的Timer* */
  TimerList::iterator it = timers_.begin();
  
  /* 若是要添加的timer的超時時間比timers_中的超時時間近,更改新的超時時間 */
  if (it == timers_.end() || when < it->first)
  {
    earliestChanged = true;
  }
  {
    /* 添加到定時任務的set中 */
    std::pair<TimerList::iterator, bool> result = timers_.insert(Entry(when, timer));
    assert(result.second); (void)result;
  }
  {
    /* 同時也添加到activeTimers_中,用於刪除時查找操做 */
    std::pair<ActiveTimerSet::iterator, bool> result = activeTimers_.insert(ActiveTimer(timer, timer->sequence()));
    assert(result.second); (void)result;
  }

  assert(timers_.size() == activeTimers_.size());
  return earliestChanged;
}
相關文章
相關標籤/搜索