在Envoy的代碼中Dispatcher
是隨處可見的,能夠說在Envoy中有着舉足輕重的地位,一個Dispatcher
就是一個EventLoop,其承擔了任務隊列、網絡事件處理、定時器、信號處理等核心功能。在Envoy threading model這篇文章所提到的EventLoop
(Each worker thread runs a 「non-blocking」 event loop
)指的就是這個Dispatcher
對象。這個部分的代碼相對較獨立,和其餘模塊耦合也比較少,但重要性卻不言而喻。下面是與Dispatcher
相關的類圖,在接下來會對其中的關鍵概念進行介紹。數據庫
Dispatcher
本質上就是一個EventLoop,Envoy並無從新實現,而是複用了Libevent中的event_base
,在Libevent的基礎上進行了二次封裝並抽象出一些事件類,好比FileEvent
、SignalEvent
、Timer
等。Libevent是一個C庫,而Envoy是C++,爲了不手動管理這些C結構的內存,Envoy經過繼承unique_ptr
的方式從新封裝了這些libevent暴露出來的C結構。安全
template <class T, void (*deleter)(T*)> class CSmartPtr : public std::unique_ptr<T, void (*)(T*)> { public: CSmartPtr() : std::unique_ptr<T, void (*)(T*)>(nullptr, deleter) {} CSmartPtr(T* object) : std::unique_ptr<T, void (*)(T*)>(object, deleter) {} };
經過CSmartPtr
就能夠將Libevent中的一些C數據結構的內存經過RAII機制自動管理起來,使用方式以下:服務器
extern "C" { void event_base_free(event_base*); } struct evbuffer; extern "C" { void evbuffer_free(evbuffer*); } ..... typedef CSmartPtr<event_base, event_base_free> BasePtr; typedef CSmartPtr<evbuffer, evbuffer_free> BufferPtr; typedef CSmartPtr<bufferevent, bufferevent_free> BufferEventPtr; typedef CSmartPtr<evconnlistener, evconnlistener_free> ListenerPtr;
在Libevent中不管是定時器到期、收到信號、仍是文件可讀寫等都是事件,統一使用event
類型來表示,Envoy中則將event
做爲ImplBase
的成員,而後讓全部的事件類型的對象都繼承ImplBase
,從而實現了事件的抽象。網絡
class ImplBase { protected: ~ImplBase(); event raw_event_; };
SignalEvent的實現很簡單,經過evsignal_assign
來初始化事件,而後經過evsignal_add
添加事件使事件成爲未決狀態(關於Libevent事件狀態見附錄)。數據結構
class SignalEventImpl : public SignalEvent, ImplBase { public: // signal_num: 要設置的信號值 // cb: 信號事件的處理函數 SignalEventImpl(DispatcherImpl& dispatcher, int signal_num, SignalCb cb); private: SignalCb cb_; }; SignalEventImpl::SignalEventImpl(DispatcherImpl& dispatcher, int signal_num, SignalCb cb) : cb_(cb) { evsignal_assign( &raw_event_, &dispatcher.base(), signal_num, [](evutil_socket_t, short, void* arg) -> void { static_cast<SignalEventImpl*>(arg)->cb_(); }, this); evsignal_add(&raw_event_, nullptr); }
Timer事件暴露了兩個接口一個用於關閉Timer,另一個則用於啓動Timer,須要傳遞一個時間來設置Timer的到期時間間隔。多線程
class Timer { public: virtual ~Timer() {} virtual void disableTimer() PURE; virtual void enableTimer(const std::chrono::milliseconds& d) PURE; };
建立Timer的時候會經過evtimer_assgin
對event進行初始化,這個時候事件還處於未決狀態而不會觸發,須要經過event_add
添加到Dispatcher
中才能被觸發。socket
class TimerImpl : public Timer, ImplBase { public: TimerImpl(Libevent::BasePtr& libevent, TimerCb cb); // Timer void disableTimer() override; void enableTimer(const std::chrono::milliseconds& d) override; private: TimerCb cb_; }; TimerImpl::TimerImpl(DispatcherImpl& dispatcher, TimerCb cb) : cb_(cb) { ASSERT(cb_); evtimer_assign( &raw_event_, &dispatcher.base(), [](evutil_socket_t, short, void* arg) -> void { static_cast<TimerImpl*>(arg)->cb_(); }, this); }
disableTimer
被調用時其內部會調用event_del
來刪除事件,使事件成爲非未決狀態,enableTimer
被調用時則間接調用event_add
使事件成爲未決狀態,這樣一旦超時時間到了就會觸發超時事件。ide
void TimerImpl::disableTimer() { event_del(&raw_event_); } void TimerImpl::enableTimer(const std::chrono::milliseconds& d) { if (d.count() == 0) { event_active(&raw_event_, EV_TIMEOUT, 0); } else { std::chrono::microseconds us = std::chrono::duration_cast<std::chrono::microseconds>(d); timeval tv; tv.tv_sec = us.count() / 1000000; tv.tv_usec = us.count() % 1000000; event_add(&raw_event_, &tv); } }
上面的代碼在計算
timer
時間timeval
的時候實現的並不優雅,應該避免使用像1000000
這樣的不具有可讀性的數字常量,社區中有人建議能夠改爲以下的形式。函數
auto secs = std::chrono::duration_cast<std::chrono::seconds>(d); auto usecs = std::chrono::duration_cast<std::chrono::microseconds>(d - secs); tv.tv_secs = secs.count(); tv.tv_usecs = usecs.count();
socket
套接字相關的事件被封裝爲FileEvent
,其上暴露了二個接口:activate
用於主動觸發事件,典型的使用場景好比: 喚醒EventLoop、Write Buffer有數據,能夠主動觸發下可寫事件(Envoy中的典型使用場景)等;setEnabled
用於設置事件類型,將事件添加到EventLoop
中使其成爲未決狀態。oop
void FileEventImpl::activate(uint32_t events) { int libevent_events = 0; if (events & FileReadyType::Read) { libevent_events |= EV_READ; } if (events & FileReadyType::Write) { libevent_events |= EV_WRITE; } if (events & FileReadyType::Closed) { libevent_events |= EV_CLOSED; } ASSERT(libevent_events); event_active(&raw_event_, libevent_events, 0); } void FileEventImpl::setEnabled(uint32_t events) { event_del(&raw_event_); assignEvents(events); event_add(&raw_event_, nullptr); }
Dispatcher
的內部有一個任務隊列,也會建立一個線程專們處理任務隊列中的任務。經過Dispatcher
的post
方法能夠將任務投遞到任務隊列中,交給Dispatcher
內的線程去處理。
void DispatcherImpl::post(std::function<void()> callback) { bool do_post; { Thread::LockGuard lock(post_lock_); do_post = post_callbacks_.empty(); post_callbacks_.push_back(callback); } if (do_post) { post_timer_->enableTimer(std::chrono::milliseconds(0)); } }
post
方法將傳遞進來的callback
所表明的任務,添加到post_callbacks_
所表明的類型爲vector<callback>
的成員表變量中。若是post_callbacks_
爲空的話,說明背後的處理線程是處於非活動狀態,這時經過post_timer_
設置一個超時時間時間爲0的方式來喚醒它。post_timer_
在構造的時候就已經設置好對應的callback
爲runPostCallbacks
,對應代碼以下:
DispatcherImpl::DispatcherImpl(TimeSystem& time_system, Buffer::WatermarkFactoryPtr&& factory) : ...... post_timer_(createTimer([this]() -> void { runPostCallbacks(); })), current_to_delete_(&to_delete_1_) { RELEASE_ASSERT(Libevent::Global::initialized(), ""); }
runPostCallbacks
是一個while循環,每次都從post_callbacks_
中取出一個callback
所表明的任務去運行,直到post_callbacks_
爲空。每次運行runPostCallbacks
都會確保全部的任務都執行完。顯然,在runPostCallbacks
被線程執行的期間若是post
進來了新的任務,那麼新任務直接追加到post_callbacks_
尾部便可,而無需作喚醒線程這一動做。
void DispatcherImpl::runPostCallbacks() { while (true) { std::function<void()> callback; { Thread::LockGuard lock(post_lock_); if (post_callbacks_.empty()) { return; } callback = post_callbacks_.front(); post_callbacks_.pop_front(); } callback(); } }
最後講一下Dispatcher
中比較難理解也很重要的DeferredDeletable
,它是一個空接口,全部要進行延遲析構的對象都要繼承自這個空接口。在Envoy的代碼中像下面這樣繼承自DeferredDeletable
的類隨處可見。
class DeferredDeletable { public: virtual ~DeferredDeletable() {} };
那何爲延遲析構呢?用在哪一個場景呢?延遲析構指的是將析構的動做交由Dispatcher
來完成,因此DeferredDeletable
和Dispatcher
密切相關。Dispatcher
對象有一個vector
保存了全部要延遲析構的對象。
class DispatcherImpl : public Dispatcher { ...... private: ........ std::vector<DeferredDeletablePtr> to_delete_1_; std::vector<DeferredDeletablePtr> to_delete_2_; std::vector<DeferredDeletablePtr>* current_to_delete_; }
to_delete_1_
和to_delete_2_
就是用來存放全部的要延遲析構的對象,這裏使用兩個vector
存放,爲何要這樣作呢?。current_to_delete_
始終指向當前正要析構的對象列表,每次執行完析構後就交替指向另一個對象列表,來回交替。
void DispatcherImpl::clearDeferredDeleteList() { ASSERT(isThreadSafe()); std::vector<DeferredDeletablePtr>* to_delete = current_to_delete_; size_t num_to_delete = to_delete->size(); if (deferred_deleting_ || !num_to_delete) { return; } ENVOY_LOG(trace, "clearing deferred deletion list (size={})", num_to_delete); if (current_to_delete_ == &to_delete_1_) { current_to_delete_ = &to_delete_2_; } else { current_to_delete_ = &to_delete_1_; } deferred_deleting_ = true; for (size_t i = 0; i < num_to_delete; i++) { (*to_delete)[i].reset(); } to_delete->clear(); deferred_deleting_ = false; }
上面的代碼在執行對象析構的時候先使用to_delete
來指向當前正要析構的對象列表,而後將current_to_delete_
指向另一個列表,這樣在添加延遲刪除的對象時,就能夠作到安全的把對象添加到列表中了。由於deferredDelete
和clearDeferredDeleteList
都是在同一個線程中運行,因此current_to_delete_
是一個普通的指針,能夠安全的更改指針指向另一個,而不用擔憂有線程安全問題。
void DispatcherImpl::deferredDelete(DeferredDeletablePtr&& to_delete) { ASSERT(isThreadSafe()); current_to_delete_->emplace_back(std::move(to_delete)); ENVOY_LOG(trace, "item added to deferred deletion list (size={})", current_to_delete_->size()); if (1 == current_to_delete_->size()) { deferred_delete_timer_->enableTimer(std::chrono::milliseconds(0)); } }
當有要進行延遲析構的對象時,調用deferredDelete
便可,這個函數內部會經過current_to_delete_
把對象放到要延遲析構的列表中,最後判斷下當前要延遲析構的列表大小是不是1,若是是1代表這是第一次添加延遲析構的對象,那麼就須要經過deferred_delete_timer_
把背後的線程喚醒執行clearDeferredDeleteList
函數。這樣作的緣由是避免屢次喚醒,由於有一種狀況是線程已經喚醒了正在執行clearDeferredDeleteList
,在這個過程當中又有其餘的對象須要析構而加入到vector
中。
到此爲止deferredDelete
的實現原理就基本分析完了,能夠看出它的實現和任務隊列的實現很相似,只不過一個是循環執行callback
所表明的任務,另外一個是對對象進行析構。最後咱們來看一下deferredDelete
的應用場景,卻「爲什麼要進行延遲析構?」在Envoy的源代碼中常常會看到像下面這樣的代碼片斷。
ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket, TransportSocketPtr&& transport_socket, bool connected) { ...... } // 傳遞裸指針到回調中 file_event_ = dispatcher_.createFileEvent( fd(), [this](uint32_t events) -> void { onFileEvent(events); }, Event::FileTriggerType::Edge, Event::FileReadyType::Read | Event::FileReadyType::Write); ...... }
傳遞給Dispatcher
的callback
都是經過裸指針的方式進行回調,若是進行回調的時候對象已經析構了,就會出現野指針的問題,我相信C++水平還能夠的同窗都會看出這個問題,除非能在邏輯上保證Dispatcher
的生命週期比全部對象都短,這樣就能保證在回調的時候對象確定不會析構,可是這不可能成立的,由於Dispatcher
是EventLoop
的核心。
一個線程運行一個EventLoop
直到線程結束,Dispatcher
對象纔會析構,這意味着Dispatcher
對象的生命週期是最長的。因此從邏輯上沒辦法保證進行回調的時候對象沒有析構。可能有人會有疑問,對象在析構的時候把註冊的事件取消不就能夠避免野指針的問題嗎? 那若是事件已經觸發了,callback
正在等待運行呢? 又或者callback
運行了一半呢?前者libevent是能夠保證的,在調用event_del
的時候能夠把處於等待運行的事件取消掉,可是後者就無能爲力了,這個時候若是對象析構了,那行爲就是未定義了。沿着這個思路想想,是否是隻要保證對象析構的時候沒有callback
正在運行就能夠解決問題了呢?是的,只要保證全部在執行中的callback
執行完了,再作對象析構就能夠了。能夠利用Dispatcher
是順序執行全部callback
的特色,向Dispatcher
中插入一個任務就是用來對象析構的,那麼當這個任務執行的時候是能夠保證沒有其餘任何callback
在運行。經過這個方法就完美解決了這裏遇到的野指針問題了。
或許有人又會想,這裏是否是能夠用shared_ptr和shared_from_this來解這個呢? 是的,這是解決多線程環境下對象析構的祕密武器,經過延長對象的生命週期,把對象的生命週期延長到和callback
同樣,等callback
執行完再進行析構,一樣能夠達到效果,可是這帶來了兩個問題,第一就是對象生命週期被無限拉長,雖然延遲析構也拉長了生命週期,可是時間是可預期的,一旦EventLoop
執行了clearDeferredDeleteList
任務就會馬上被回收,而經過shared_ptr
的方式其生命週期取決於callback
什麼時候運行,而callback
什麼時候運行這個是沒辦法保證的,好比一個等待socket
的可讀事件進行回調,若是對端一直不發送數據,那麼callback
就一直不會被運行,對象就一直沒法被析構,長時間累積會致使內存使用率上漲。第二就是在使用方式上侵入性較強,須要強制使用shared_ptr
的方式建立對象。
Dispatcher
總的來講其實現仍是比較簡單明瞭的,比較容易驗證其正確性,一樣功能也相對較弱,和chromium的MessageLoop
、boost的asio
都是類似的用途,可是功能上差得比較多。好在這是專門給Envoy設計的,並且Envoy的場景也比較單一,沒必要作成那麼通用的。另一個我以爲比較奇怪的是,爲何在DeferredDeletable
的實現中要用to_delete_1_
和to_delete_2_
兩個隊列交替來存放,其實按照個人理解一個隊列便可,由於clearDeferredDeleteList
和deferredDelete
是保證在同一個線程中執行的,就和Dispatcher
的任務隊列同樣,用一個隊列保存全部要執行的任務,循環的執行便可。可是Envoy中沒有這樣作,我理解這樣設計的緣由多是由於相比於任務隊列來講延遲析構的重要性更低一些,大量對象的析構若是保存在一個隊列中循環的進行析構勢必會影響其餘關鍵任務的執行,因此這裏拆分紅兩個隊列,多個任務交替的執行,就比如把一個大任務拆分紅了好幾個小任務順序來執行。
雙十一廣告:阿里雲雙十一1折拼團活動:已滿6人,都是最低折扣了
【滿6人】1核2G雲服務器99.5元一年298.5元三年 2核4G雲服務器545元一年 1227元三年
【滿6人】1核1G MySQL數據庫 119.5元一年
【滿6人】3000條國內短信包 60元每6月
參團地址:http://click.aliyun.com/m/1000020293/