Envoy 源碼分析--event

Envoy 源碼分析--event

申明:本文的 Envoy 源碼分析基於 Envoy1.10.0。api

Envoy 的事件是複用了 libevent 的 event_base 。其在代碼中的表現就是類 Dispatcher,一個 Dispatcher 其實就是一個 event_loop,主要的核心功能有:網絡事件處理,定時器,信號處理,任務隊列,代碼對象的析構等。下面是相關的類圖。網絡

envoy_event

ImplBase 包含了 libevent 的事件類型,對象在析構時會自動調用 event_delImplBase 派生出 FileEventImplSignalEventImplTimerImpl 三種類型的事件。 RealTimeSystem 在建立調度後,會建立一個線程局部存儲(TLS)的時間隊列。DispatchedThreadImpl 包含了 DispatcherImpl 在啓動時會建立一條線程,而後啓動一個 event_loop,同時在 event_loop 外層包了個 guard_dog 防止死鎖。socket

libevent

Envoy 是 C++ 的,而 libevent 是個 C 庫,這就須要自動管理 C 結構的內存。 Envoy 經過繼承智能指針 unique_ptr 來從新封裝了 libevent 的結構體。ide

template <class T, void (*deleter)(T*)> class CSmartPtr : public std::unique_ptr<T, void (*)(T*)> {
public:
  CSmartPtr() : std::unique_ptr<T, void (*)(T*)>(nullptr, deleter) {}
  CSmartPtr(T* object) : std::unique_ptr<T, void (*)(T*)>(object, deleter) {}
};

而後使用 CSmartPtr 就能夠自動管理 libevent 的結構體。使用方式以下:函數

struct event_base;
extern "C" {
void event_base_free(event_base*);
}

struct evbuffer;
extern "C" {
void evbuffer_free(evbuffer*);
}

struct bufferevent;
extern "C" {
void bufferevent_free(bufferevent*);
}

struct evconnlistener;
extern "C" {
void evconnlistener_free(evconnlistener*);
}

typedef CSmartPtr<event_base, event_base_free> BasePtr;
typedef CSmartPtr<evbuffer, evbuffer_free> BufferPtr;
typedef CSmartPtr<bufferevent, bufferevent_free> BufferEventPtr;
typedef CSmartPtr<evconnlistener, evconnlistener_free> ListenerPtr;

這樣 libevent 的結構體就變成了 C++ 的智能指針。oop

Envoy 有三種事件都是 event 類型,咱們須要對事件類型進行抽象,自動管理事件的釋放。Envoy 將 event 做爲 ImplBase的成員,在類析構進自動釋放,全部事件只要繼承 ImplBase 就完成了事件的自動管理。源碼分析

class ImplBase {
protected:
  ~ImplBase();

  event raw_event_;
};

ImplBase::~ImplBase() {
  event_del(&raw_event_);
}

Timer

Timer 只有兩接口一個用於啓動,另外一個用於關閉。post

class Timer {
public:
  virtual ~Timer() {}
  virtual void disableTimer() PURE;
  virtual void enableTimer(const std::chrono::milliseconds& d) PURE;
};

建立 Timer 時,會在構造函數內進行初始化。enableTimer 時調用 event_add 加入事件。 disableTimer 時調用 event_del 刪除事件。ui

TimerImpl::TimerImpl(Libevent::BasePtr& libevent, TimerCb cb) : cb_(cb) {
  ASSERT(cb_);
  evtimer_assign(
      &raw_event_, libevent.get(),
      [](evutil_socket_t, short, void* arg) -> void { static_cast<TimerImpl*>(arg)->cb_(); }, this);
}

void TimerImpl::disableTimer() { event_del(&raw_event_); }

void TimerImpl::enableTimer(const std::chrono::milliseconds& d) {
  if (d.count() == 0) {
    event_active(&raw_event_, EV_TIMEOUT, 0);
  } else {
    // TODO(#4332): use duration_cast more nicely to clean up this code.
    std::chrono::microseconds us = std::chrono::duration_cast<std::chrono::microseconds>(d);
    timeval tv;
    tv.tv_sec = us.count() / 1000000;
    tv.tv_usec = us.count() % 1000000;
    event_add(&raw_event_, &tv);
  }
}

SignalEvent

SignalEvent 比較簡單在構造函數時,直接初始化並加入事件。

SignalEventImpl::SignalEventImpl(DispatcherImpl& dispatcher, int signal_num, SignalCb cb)
    : cb_(cb) {
  evsignal_assign(
      &raw_event_, &dispatcher.base(), signal_num,
      [](evutil_socket_t, short, void* arg) -> void { static_cast<SignalEventImpl*>(arg)->cb_(); },
      this);
  evsignal_add(&raw_event_, nullptr);
}

FileEvent

文件相關的事件封裝爲 FileEvent。咱們知道 linux 中 socket 也是一個文件,所以 socket 套接字相關的事件也屬於 FileEvent。FileEvent 使用持久性事件假定用戶一直讀或寫,直到收到 EAGAIN。

FileEvent 提供兩個接口。activate 不管事件是否準備就緒,此方法都會主動觸發事件,典型場景:socket 讀寫事件, EventLoop 喚醒等。setEnabled 用於設置事件。

class FileEvent {
public:
  virtual ~FileEvent() {}
  virtual void activate(uint32_t events) PURE;
  virtual void setEnabled(uint32_t events) PURE;
};

RealTimeSystem

RealTimeSystem 暴露三個接口。

class RealTimeSystem : public TimeSystem {
public:
  SchedulerPtr createScheduler(Libevent::BasePtr&) override;
  SystemTime systemTime() override { return time_source_.systemTime(); }
  MonotonicTime monotonicTime() override { return time_source_.monotonicTime(); }

private:
  RealTimeSource time_source_;
}
  • systemTime 返回系統時間。調用的是 std::chrono 的 system_clock。
  • monotonicTime 返回的是系統的啓動時間。即 linux 命令 uptime 上的啓動時間。用於時間間隔,不會受系統修改時間的影響。調用的是 std::chrono 的 steady_clock。
  • createScheduler 建立一個計時器工廠(factory模式)。間接啓用線程本地計時器隊列管理,所以每一個線程具備單獨的計時器。 RealScheduler 類放在源文件中,外部沒法調用。
//建立計時器工廠
SchedulerPtr RealTimeSystem::createScheduler(Libevent::BasePtr& libevent) {
  return std::make_unique<RealScheduler>(libevent);
}

class RealScheduler : public Scheduler {
public:
  RealScheduler(Libevent::BasePtr& libevent) : libevent_(libevent) {}
  //建立一個本地計時器
  TimerPtr createTimer(const TimerCb& cb) override {
    return std::make_unique<TimerImpl>(libevent_, cb);
  };

private:
  Libevent::BasePtr& libevent_;
};

任務隊列

Dispatcher 內部建立了一個任務隊列,將全部的 callback 加入隊列。同時建立一個 Timer 調用一個函數,函數內循環處理。

post 方法將傳進來的 callback 加入到任務任務。若是加入前的隊列爲空就須要觸發定時器。post_timer_ 在構造函數內已設置好其對應的函數,調用 runPostCallbacks

void DispatcherImpl::post(std::function<void()> callback) {
  bool do_post;
  {
    Thread::LockGuard lock(post_lock_);
    do_post = post_callbacks_.empty();
    post_callbacks_.push_back(callback);
  }

  if (do_post) {
    post_timer_->enableTimer(std::chrono::milliseconds(0));
  }
}

DispatcherImpl::DispatcherImpl(TimeSystem& time_system, Buffer::WatermarkFactoryPtr&& factory,
                               Api::Api& api)
    : ...
      post_timer_(createTimer([this]() -> void { runPostCallbacks(); })),
      current_to_delete_(&to_delete_1_) {
  RELEASE_ASSERT(Libevent::Global::initialized(), "");
}

runPostCallbacks 是一個死循環,每次取一個 callback 進行處理。直到隊列爲空跳出循環。從這能夠看出 post 進來的任務,若是在加入前隊列爲空的話,runPostCallbacks 已退出,所以須要從新觸發 post_timer_

void DispatcherImpl::runPostCallbacks() {
  while (true) {
    std::function<void()> callback;
    {
      Thread::LockGuard lock(post_lock_);
      if (post_callbacks_.empty()) {
        return;
      }
      callback = post_callbacks_.front();
      post_callbacks_.pop_front();
    }
    callback();
  }
}

延遲析構

延遲析構指的是將 unique_ptr 的對象的析構的動做交由 Dispatcher 來完成。 DeferredDeletable 是個空接口,全部析構的對象都要繼承 DeferredDeletable

class DeferredDeletable {
public:
  virtual ~DeferredDeletable() {}
};

typedef std::unique_ptr<DeferredDeletable> DeferredDeletablePtr;

Dispatcher 對象保存了全部要延遲析構的對象

std::vector<DeferredDeletablePtr> to_delete_1_;
std::vector<DeferredDeletablePtr> to_delete_2_;
std::vector<DeferredDeletablePtr>* current_to_delete_;

to_delete_1_to_delete_2 保存了析構的對象,current_to_delete_ 指針當前要析構的對象。加入延遲析構對象時,若是當前的析構對象長度爲 1,deferred_delete_timer_ 就會被觸發。

void DispatcherImpl::deferredDelete(DeferredDeletablePtr&& to_delete) {
  ASSERT(isThreadSafe());
  current_to_delete_->emplace_back(std::move(to_delete));
  ENVOY_LOG(trace, "item added to deferred deletion list (size={})", current_to_delete_->size());
  if (1 == current_to_delete_->size()) {
    deferred_delete_timer_->enableTimer(std::chrono::milliseconds(0));
  }
}

deferred_delete_timer_ 是在構造函數內已構造好回調函數 clearDeferredDeleteListclearDeferredDeleteListcurrent_to_delete_始終指向當前正要析構的對象列表,每次執行完析構後就指向另一個對象列表,來回交替。

void DispatcherImpl::clearDeferredDeleteList() {
  ASSERT(isThreadSafe());
  std::vector<DeferredDeletablePtr>* to_delete = current_to_delete_;

  size_t num_to_delete = to_delete->size();
  if (deferred_deleting_ || !num_to_delete) {
    return;
  }

  ENVOY_LOG(trace, "clearing deferred deletion list (size={})", num_to_delete);

  if (current_to_delete_ == &to_delete_1_) {
    current_to_delete_ = &to_delete_2_;
  } else {
    current_to_delete_ = &to_delete_1_;
  }

  deferred_deleting_ = true;

  for (size_t i = 0; i < num_to_delete; i++) {
    (*to_delete)[i].reset();
  }

  to_delete->clear();
  deferred_deleting_ = false;
}

能夠看出延遲析構的原理和任務隊列原理差很少。

爲什麼要延遲析構以及析構時爲什麼須要兩個隊列,可參考:https://yq.aliyun.com/articles/659277

dispacth_thread

dispacth_thread 只是一個簡單的 event_loop 線程,不支持像接收新鏈接那樣的工做線程。 接口很簡單,在啓動時,啓動一個新線程,在新線程中調用 dispatch run 執行 event_loop。同時會新建一個 GuardDog 監控線程是否死鎖。

void DispatchedThreadImpl::start(Server::GuardDog& guard_dog) {
  thread_ =
      api_.threadFactory().createThread([this, &guard_dog]() -> void { threadRoutine(guard_dog); });
}

void DispatchedThreadImpl::threadRoutine(Server::GuardDog& guard_dog) {
  ENVOY_LOG(debug, "dispatched thread entering dispatch loop");
  auto watchdog = guard_dog.createWatchDog(api_.threadFactory().currentThreadId());
  watchdog->startWatchdog(*dispatcher_);
  dispatcher_->run(Dispatcher::RunType::Block);
  ENVOY_LOG(debug, "dispatched thread exited dispatch loop");
  guard_dog.stopWatching(watchdog);

  watchdog.reset();
  dispatcher_.reset();
}

參考文檔: https://yq.aliyun.com/articles/659277

相關文章
相關標籤/搜索