Wangle中的Pipeline和Netty中的Pipeline是很類似的,既能夠將它看爲一種職責鏈模式的實現也能夠看做是Handler的容器。Pipeline中的handler都是串行化執行的,前一個handler完成本身的工做以後把事件傳遞給下一個handler,理論上Pipeline中的全部handler都是在同一個IO線程中執行的,可是爲了防止某些handler(好比序列化、編解碼handler等)耗時過長,Netty中容許爲某些handler指定其它線程(eventloop)異步執行,相似的功能在Wangle中也有體現,只是在實現方式上有些區別。和Netty中一個較大的區別是,Wangle中並無專門的Channel定義,Wangle中的Pipeline兼有了Channel的角色和功能。下面分別就Pipeline、Handler和Context的順序進行源碼分析。java
PipelineBase做爲Pipeline的基類,提供了一些最爲通用、核心的api實現,好比對handler的操做:addBack及其變體、addFront及其變體、remove及其變體等,下面看一下addBack的一個實現版本:api
template <class H> PipelineBase& PipelineBase::addBack(std::shared_ptr<H> handler) { typedef typename ContextType<H>::type Context;// 聲明Conetxt類型,ContextImpl<Handler>、InboundContextImpl<Handler>、OutboundContextImpl<Handler>其中之一 // 使用Context包裝Handler後,將其添加到pipeline中,Context中還持有pipeline的引用 return addHelper( std::make_shared<Context>(shared_from_this(), std::move(handler)), false);// false標識添加到尾部 }
首先,會根據要添加的handler類型定義一個Context(Context能夠當作是Handler的外套,後面還會單獨介紹)類型,而後根據這個Context類型建立一個Context:參數爲Pipeline指針和handler,最終addHelper會將Context添加到容器管理起來:app
template <class Context> PipelineBase& PipelineBase::addHelper(std::shared_ptr<Context>&& ctx,bool front) { // 先加入總的Context (std::vector<std::shared_ptr<PipelineContext>>) // 該vector種使用的是智能指針,能夠保持對Context的引用 ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx); // 而後根據方向(BOTH、IN、OUT分別加入相應的vector中) // std::vector<PipelineContext*> 這裏放的是Context的指針,由於引用在上面的容器中已經保持 if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) { inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get()); } if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) { outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get()); } return *this; }
Context內部包含了Pipeline、Handler,和Handler同樣,Context也有方向:BOTH、IN、OUT,首先,不管Context 什麼方向,都會在ctxs_容器上添加這個Context,而後會根據Context方向的不一樣,分別在inCtxs_和outCtxs_上添加該Context。接下來看一下這三個容器的定義:異步
std::vector<std::shared_ptr<PipelineContext>> ctxs_; // 全部的PipelineContext std::vector<PipelineContext*> inCtxs_; // inbound 類型的PipelineContext std::vector<PipelineContext*> outCtxs_; // outbound 類型的PipelineContext
因爲handler的其餘操做(addFront、remove等)都是對這三個容器的增刪操做,原理同樣,此處再也不贅述。ide
PipelineBase中還提供了設置PipelineManager的接口,從字面理解,PipelineManager就是管理Pipeline的接口,其定義以下:函數
class PipelineManager { public: virtual ~PipelineManager() = default; virtual void deletePipeline(PipelineBase* pipeline) = 0; virtual void refreshTimeout() {}; };
其中,deletePipeline會在顯示調用一個pipeline的close方法時被調用,通常用來完成該Pipeline相關的資源釋放,而refreshTimeout主要在Pipeline發生讀寫事件時被回調,主要用來刷新Pipeline的空閒時間。所以,若是你須要監聽Pipeline的delete和refresh事件,那麼能夠本身實現一個PipelineManager並設置到Pipeline上。oop
在Wangle中沒有定義專門的Channel結構,其實Wangle中的Pipeline兼有Channel的功能,好比要判斷一個Channel是否還處於鏈接狀態,在Netty中代碼以下:源碼分析
channel.isConnected();
那麼Wangle中的Pipeline並無此類方法可供使用,怎麼辦呢?其實,Wangle的Pipeline提供了一個更強大的方法:getTransport,該方法能夠得到一個底層的AsyncTransport,而該AsyncTransport擁有全部的底層鏈接信息,好比(僅列出主要接口):ui
class AsyncTransport : public DelayedDestruction, public AsyncSocketBase { public: typedef std::unique_ptr<AsyncTransport, Destructor> UniquePtr; virtual void close() = 0; virtual void closeNow() = 0; virtual void closeWithReset() { closeNow(); } virtual void shutdownWrite() = 0; virtual void shutdownWriteNow() = 0; virtual bool good() const = 0; virtual bool readable() const = 0; virtual bool isPending() const { return readable(); } virtual bool connecting() const = 0; virtual bool error() const = 0; virtual void attachEventBase(EventBase* eventBase) = 0; virtual void detachEventBase() = 0; virtual bool isDetachable() const = 0; virtual void setSendTimeout(uint32_t milliseconds) = 0; virtual uint32_t getSendTimeout() const = 0; virtual void getLocalAddress(SocketAddress* address) const = 0; virtual void getAddress(SocketAddress* address) const { getLocalAddress(address); } virtual void getPeerAddress(SocketAddress* address) const = 0; virtual ssl::X509UniquePtr getPeerCert() const { return nullptr; } };
至此,PipelineBase中的主要功能分析完畢。this
Pipeline是PipelineBase的子類,其具體定義以下:
template <class R, class W = folly::Unit> class Pipeline : public PipelineBase { public: using Ptr = std::shared_ptr<Pipeline>; static Ptr create() { return std::shared_ptr<Pipeline>(new Pipeline()); } ~Pipeline(); // 模板方法 template <class T = R> typename std::enable_if<!std::is_same<T, folly::Unit>::value>::type read(R msg);//front_->read(std::forward<R>(msg)); --> this->handler_->read(this, std::forward<Rin>(msg)); template <class T = R> typename std::enable_if<!std::is_same<T, folly::Unit>::value>::type readEOF();//front_->readEOF(); template <class T = R> typename std::enable_if<!std::is_same<T, folly::Unit>::value>::type readException(folly::exception_wrapper e);//front_->readException(std::move(e)); template <class T = R> typename std::enable_if<!std::is_same<T, folly::Unit>::value>::type transportActive();// front_->transportActive(); template <class T = R> typename std::enable_if<!std::is_same<T, folly::Unit>::value>::type transportInactive();//front_->transportInactive(); template <class T = W> typename std::enable_if<!std::is_same<T, folly::Unit>::value, folly::Future<folly::Unit>>::type write(W msg);//back_->write(std::forward<W>(msg)); template <class T = W> typename std::enable_if<!std::is_same<T, folly::Unit>::value, folly::Future<folly::Unit>>::type writeException(folly::exception_wrapper e);//back_->writeException(std::move(e)); template <class T = W> typename std::enable_if<!std::is_same<T, folly::Unit>::value, folly::Future<folly::Unit>>::type close();//back_->close() void finalize() override; protected: Pipeline(); explicit Pipeline(bool isStatic); private: bool isStatic_{false}; InboundLink<R>* front_{nullptr};// inbound類型Context(read) OutboundLink<W>* back_{nullptr};// outbound類型Context (write) };
能夠看到,Pipeline主要定義和實現了一些和Handler對應的經常使用方法:read、readEOF、readException、transportActive、transportInactive、write、writeException、close。同時,Pipeline還定義了兩個私有成員:front_和back_,從類型能夠看出這是兩個不一樣的方向,首先看一下InboundLink定義:
template <class In> class InboundLink { public: virtual ~InboundLink() = default; virtual void read(In msg) = 0; virtual void readEOF() = 0; virtual void readException(folly::exception_wrapper e) = 0; virtual void transportActive() = 0; virtual void transportInactive() = 0; };
能夠看出,InboundLink只是把Pipeline主要方法中的IN方向單獨抽象出來,都是一個IN事件(輸入事件),那麼可想而知OutboundLink的定義:
template <class Out> class OutboundLink { public: virtual ~OutboundLink() = default; virtual folly::Future<folly::Unit> write(Out msg) = 0; virtual folly::Future<folly::Unit> writeException( folly::exception_wrapper e) = 0; virtual folly::Future<folly::Unit> close() = 0; };
的確,OutboundLink定義的都是OUT事件類型的操做。
前文在講PipelineBase時,addBack之類的操做都是隻針對那三個容器進行的,沒有地方對front_鏈表和back_鏈表進行操做啊?其實,front_鏈表和back_鏈表的設置是在Pipeline的finalize中完成的:
template <class R, class W> void Pipeline<R, W>::finalize() { front_ = nullptr; if (!inCtxs_.empty()) { front_ = dynamic_cast<InboundLink<R>*>(inCtxs_.front()); for (size_t i = 0; i < inCtxs_.size() - 1; i++) { inCtxs_[i]->setNextIn(inCtxs_[i + 1]); } inCtxs_.back()->setNextIn(nullptr); } back_ = nullptr; if (!outCtxs_.empty()) { back_ = dynamic_cast<OutboundLink<W>*>(outCtxs_.back()); for (size_t i = outCtxs_.size() - 1; i > 0; i--) { outCtxs_[i]->setNextOut(outCtxs_[i - 1]); } outCtxs_.front()->setNextOut(nullptr); } if (!front_) { detail::logWarningIfNotUnit<R>( "No inbound handler in Pipeline, inbound operations will throw " "std::invalid_argument"); } if (!back_) { detail::logWarningIfNotUnit<W>( "No outbound handler in Pipeline, outbound operations will throw " "std::invalid_argument"); } for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) { (*it)->attachPipeline(); } }
代碼很簡單,以IN方向爲例,遍歷inCtxs_容器,對容器中的每個Context調用其setNextIn方法將Context組成一個單向鏈表front_。同理,outCtxs_最終會變爲back_單向鏈表。最後,還會遍歷Context的總容器ctxs_,爲每個Context調用attachPipeline方法,該方法主要工做就是把Context綁定到對應的Handler上(最終是Context和Handler都互相持有對方的引用),還會回調Handler的attachPipeline方法。
此處還有一個細節,Pipeline是一個模板類,具備兩個模板參數template <class R, class W = folly::Unit>,分別表明Pipeline的 read(IN事件)的數據類型和write(out事件)數據類型,這些類型的設置要和Pipeline中的handler類型向匹配(後文還會詳細講解)。
下面就以Pipeline中的write方法來看一下事件的流動過程:
template <class R, class W> template <class T> typename std::enable_if < !std::is_same<T, folly::Unit>::value, folly::Future<folly::Unit >>::type Pipeline<R, W>::write(W msg) { if (!back_) { throw std::invalid_argument("write(): no outbound handler in Pipeline"); } return back_->write(std::forward<W>(msg)); }
Pipeline的write方法只是簡單的調用back_的wirte方法,也就是OUT類型的事件會從Pipeline的最後一個Context依次向前傳遞(只傳遞給OUT類型的handler)。
Handler在繼承層次上相似於Pipeline,首先有一個基類HandlerBase,其定義以下:
template <class Context> class HandlerBase { public: virtual ~HandlerBase() = default; virtual void attachPipeline(Context* /*ctx*/) {} virtual void detachPipeline(Context* /*ctx*/) {} // 獲取綁定的Context Context* getContext() { if (attachCount_ != 1) { return nullptr; } CHECK(ctx_); return ctx_; } private: friend PipelineContext; // 設置PipelineContext爲友元類,便於PipelineContext操做本身 uint64_t attachCount_{0}; // 綁定計數,同一個handler能夠被同時綁定到不一樣的pipeline中 Context* ctx_{nullptr}; // 該Handler綁定的Context };
HandlerBase內部組合了一個綁定的Context指針,並提供了getContext接口用於獲取這個Handler綁定的Context。
Handler做爲HandlerBase的子類,它具備四個模板參數: Rin、Rout、Win、Wout,其中Rin做爲Handler和Context中read方法中消息的數據類型,Rout是做爲Context中fireRead方法的參數類型。同理,Win是做爲Handler和Context中wirte方法的消息參數類型,而Wout是做爲Context中fireWrite的消息參數類型。能夠這麼理解:Xout是做爲以fire開頭的事件方法的參數類型。
template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin> class Handler : public HandlerBase<HandlerContext<Rout, Wout>> { public: static const HandlerDir dir = HandlerDir::BOTH; // 方向爲雙向 typedef Rin rin; typedef Rout rout; typedef Win win; typedef Wout wout; typedef HandlerContext<Rout, Wout> Context; // 聲明該HandlerContext類型 virtual ~Handler() = default; // inbound類型事件 virtual void read(Context* ctx, Rin msg) = 0; virtual void readEOF(Context* ctx) { ctx->fireReadEOF(); } virtual void readException(Context* ctx, folly::exception_wrapper e) { ctx->fireReadException(std::move(e)); } virtual void transportActive(Context* ctx) { ctx->fireTransportActive(); } virtual void transportInactive(Context* ctx) { ctx->fireTransportInactive(); } // outbound類型事件 virtual folly::Future<folly::Unit> write(Context* ctx, Win msg) = 0; virtual folly::Future<folly::Unit> writeException(Context* ctx, folly::exception_wrapper e) { return ctx->fireWriteException(std::move(e)); } virtual folly::Future<folly::Unit> close(Context* ctx) { return ctx->fireClose(); } };
相似於Pipeline,Handler也相應的定義了inbound類型和outbound類型事件,分別對應方法:read、readEOF、readException、transportActive、transportInactive、write、writeException、close(這些方法和Pipeline中一一對應)。其中,除了read和write兩個方法是純虛接口以外,其餘的方法都提供了默認實現:就是將事件進行透傳(調用Context裏fireXxx方法)。
同理,根據事件類型的不一樣,還能夠進一步細分Handler類型,好比InboundHandler類型爲:
// inbound類型的Handler (默認狀況下讀入和讀出的類型是一致) template <class Rin, class Rout = Rin> class InboundHandler : public HandlerBase<InboundHandlerContext<Rout>> { public: static const HandlerDir dir = HandlerDir::IN; // 方向爲輸入 typedef Rin rin; typedef Rout rout; typedef folly::Unit win; typedef folly::Unit wout; typedef InboundHandlerContext<Rout> Context; // 聲明inbound類型的InboundHandlerContext virtual ~InboundHandler() = default; // 純虛函數。由子類實現 virtual void read(Context* ctx, Rin msg) = 0; // 下面的默認實現都是事件的透傳 virtual void readEOF(Context* ctx) { ctx->fireReadEOF(); } virtual void readException(Context* ctx, folly::exception_wrapper e) { ctx->fireReadException(std::move(e));// std::move } virtual void transportActive(Context* ctx) { ctx->fireTransportActive(); } virtual void transportInactive(Context* ctx) { ctx->fireTransportInactive(); } };
相應的,OutboundHandler類型定義爲:
// outbound類型的Handler (默認寫入類型和寫出類型一致,若是不一致就會產生不少的轉換) template <class Win, class Wout = Win> class OutboundHandler : public HandlerBase<OutboundHandlerContext<Wout>> { public: static const HandlerDir dir = HandlerDir::OUT; // 方向爲輸出 typedef folly::Unit rin; typedef folly::Unit rout; typedef Win win; typedef Wout wout; typedef OutboundHandlerContext<Wout> Context; virtual ~OutboundHandler() = default; // 純虛函數。由子類實現 virtual folly::Future<folly::Unit> write(Context* ctx, Win msg) = 0; // 下面的默認實現都是事件的透傳 virtual folly::Future<folly::Unit> writeException( Context* ctx, folly::exception_wrapper e) { return ctx->fireWriteException(std::move(e)); } virtual folly::Future<folly::Unit> close(Context* ctx) { return ctx->fireClose(); } };
前文所說,Handler全部的事件方法中只有read和write是純虛接口,這樣用戶每次實現本身的Handler時都須要override這兩個方法(即便只是完成簡單的事件透傳),所以,爲了方便用戶編寫本身的Handler,Wangle提供了HandlerAdapter,HandlerAdapter其實很簡單,就是以事件透傳的方式重寫(override)了read個write兩個方法。代碼以下:
// Handler適配器 template <class R, class W = R> class HandlerAdapter : public Handler<R, R, W, W> { public: typedef typename Handler<R, R, W, W>::Context Context; // 將read事件直接進行透傳 void read(Context* ctx, R msg) override { ctx->fireRead(std::forward<R>(msg)); } // 將write事件直接進行透傳 folly::Future<folly::Unit> write(Context* ctx, W msg) override { return ctx->fireWrite(std::forward<W>(msg)); } };
如前文所述,Pipeline中直接管理的並非Handler,而是Context,爲了便於理解,此處再把Pipeline中的addBack源碼列出來:
template <class H> PipelineBase& PipelineBase::addBack(std::shared_ptr<H> handler) { typedef typename ContextType<H>::type Context;// 聲明Conetxt類型,ContextImpl<Handler>、InboundContextImpl<Handler>、OutboundContextImpl<Handler>其中之一 // 使用Context包裝Handler後,將其添加到pipeline中,Context中還持有pipeline的引用 return addHelper( std::make_shared<Context>(shared_from_this(), std::move(handler)), false);// false標識添加到尾部 }
其中,ContextType的定義以下,它會根據Handler的類型(具體來講是方向)決定Context的類型,若是Handler是雙向的,那麼Context類型爲ContextImpl<Handler>,若是Handler的方向爲IN,那麼Context類型爲InboundContextImpl<Handler>,若是Handler的方向爲OUT,那麼Context類型爲OutboundContextImpl<Handler>。
template <class Handler> struct ContextType { // template< bool B, class T, class F > // type T if B == true, F if B == false typedef typename std::conditional < Handler::dir == HandlerDir::BOTH, //若是是雙向 ContextImpl<Handler>, //類型就是ContextImpl<Handler> typename std::conditional< //若是不是雙向,那麼還須要細分 Handler::dir == HandlerDir::IN, //若是是IN類型 InboundContextImpl<Handler>, //那麼類型就是InboundContextImpl<Handler> OutboundContextImpl<Handler> //不然就是OutboundContextImpl<Handler> >::type >::type type; // Context類型 };
其實,InboundContextImpl和OutboundContextImpl都是ContextImpl的子類,ContextImpl的繼承關係爲:
template <class H> class ContextImpl : public HandlerContext<typename H::rout, typename H::wout>, public InboundLink<typename H::rin>, public OutboundLink<typename H::win>, public ContextImplBase<H, HandlerContext<typename H::rout, typename H::wout>>
能夠看到,ContextImpl一個繼承自四個父類:HandlerContext、InboundLink、OutboundLink和ContextImplBase,其中HandlerContext中主要定義了以fire開頭的事件傳遞方法;InboundLink和OutboundLink分別定義了Handler中Inbound和Outbound類型的方法接口,還記得Pipeline中用於管理IN方向和OUT方向的兩個鏈表:front_和back_,它們就分別是InboundLink和OutboundLink類型;ContextImplBase主要提供了Pipeline中Context在組裝鏈表時的接口,好比:setNextIn、setNextOut,以及用於將Context綁定到handler上的attachPipeline方法。
首先來看HandlerContext基類:
// HandlerContext定義(集inbound和outbound類型於一身) // 以fire開始的方法都是Context中的事件方法 template <class In, class Out> class HandlerContext { public: virtual ~HandlerContext() = default; // inbound類型事件接口 virtual void fireRead(In msg) = 0; virtual void fireReadEOF() = 0; virtual void fireReadException(folly::exception_wrapper e) = 0; virtual void fireTransportActive() = 0; virtual void fireTransportInactive() = 0; // outbound類型事件接口 virtual folly::Future<folly::Unit> fireWrite(Out msg) = 0; virtual folly::Future<folly::Unit> fireWriteException( folly::exception_wrapper e) = 0; virtual folly::Future<folly::Unit> fireClose() = 0; virtual PipelineBase* getPipeline() = 0; virtual std::shared_ptr<PipelineBase> getPipelineShared() = 0; std::shared_ptr<folly::AsyncTransport> getTransport() { return getPipeline()->getTransport(); } virtual void setWriteFlags(folly::WriteFlags flags) = 0; virtual folly::WriteFlags getWriteFlags() = 0; virtual void setReadBufferSettings( uint64_t minAvailable, uint64_t allocationSize) = 0; virtual std::pair<uint64_t, uint64_t> getReadBufferSettings() = 0; };
HandlerContext主要定義了以fire開頭的事件傳播方法:fireRead、fireReadEOF、fireReadException、fireTransportActive、fireTransportInactive、fireWrite、fireWriteException、fireClose,以及getPipeline用於獲取Context綁定的Pipeline、getPipelineShared以智能指針的形式獲取Pipeline、getTransport用於獲取Pipeline對應的Transport。
根據事件流向的不一樣,Context也能夠細分定義,InboundHandlerContext定義爲:
// inbound 類型的InboundHandlerContext template <class In> class InboundHandlerContext { public: virtual ~InboundHandlerContext() = default; virtual void fireRead(In msg) = 0; virtual void fireReadEOF() = 0; virtual void fireReadException(folly::exception_wrapper e) = 0; virtual void fireTransportActive() = 0; virtual void fireTransportInactive() = 0; virtual PipelineBase* getPipeline() = 0; virtual std::shared_ptr<PipelineBase> getPipelineShared() = 0; std::shared_ptr<folly::AsyncTransport> getTransport() { return getPipeline()->getTransport(); } };
同理,OutboundHandlerContext定義爲:
// outbound 類型的OutboundHandlerContext template <class Out> class OutboundHandlerContext { public: virtual ~OutboundHandlerContext() = default; virtual folly::Future<folly::Unit> fireWrite(Out msg) = 0; virtual folly::Future<folly::Unit> fireWriteException( folly::exception_wrapper e) = 0; virtual folly::Future<folly::Unit> fireClose() = 0; virtual PipelineBase* getPipeline() = 0; virtual std::shared_ptr<PipelineBase> getPipelineShared() = 0; std::shared_ptr<folly::AsyncTransport> getTransport() { return getPipeline()->getTransport(); } };
如前文所述,PipelineContext主要定義瞭如何在Pipeline中組織Context鏈表的操做接口,好比setNextIn用於設置下一個IN類型的Context,setNextOut用來設置下一個OUT類型Context,具體定義以下:
class PipelineContext { public: virtual ~PipelineContext() = default; // 依附到一個pipeline中 virtual void attachPipeline() = 0; // 從pipeline中分離 virtual void detachPipeline() = 0; // 將一個HandlerContext綁定到handler上 template <class H, class HandlerContext> void attachContext(H* handler, HandlerContext* ctx) { // 只有第一次綁定的時候纔會設置 if (++handler->attachCount_ == 1) { handler->ctx_ = ctx; } else { // 爲什麼在此設置的時候就爲nullptr handler->ctx_ = nullptr; } } // 設置下一個inbound類型的Context virtual void setNextIn(PipelineContext* ctx) = 0; // 設置下一個outbound類型的Context virtual void setNextOut(PipelineContext* ctx) = 0; // 獲取方向(Context方向依賴於Handler方向) virtual HandlerDir getDirection() = 0; };
ContextImplBase主要實現了PipelineContext接口方法,同時它的兩個成員:nextIn_和nextOut_就是鏈表的指針,用來串聯起整個Context。
template <class H, class Context> class ContextImplBase : public PipelineContext { public: ~ContextImplBase() = default; // 獲取Context綁定的Handler H* getHandler() { return handler_.get(); } // Context初始化,參數爲Context所屬的Pipeline weak_ptr,Context要綁定的Handler shared_ptr void initialize(std::weak_ptr<PipelineBase> pipeline,std::shared_ptr<H> handler) { pipelineWeak_ = pipeline; pipelineRaw_ = pipeline.lock().get();//裸指針 handler_ = std::move(handler); } // PipelineContext overrides void attachPipeline() override { // 若是該Context尚未被綁定 if (!attached_) { this->attachContext(handler_.get(), impl_);// 將該Context綁定到handler上 handler_->attachPipeline(impl_); // 調用Handler的attachPipeline,有具體的Handler實現 attached_ = true;//標記Context已經attached到一個pipeline中 } } // 從pipeline中分離 void detachPipeline() override { handler_->detachPipeline(impl_);// 調用Handler的detachPipeline,有具體的Handler實現 // 依附標誌位爲false attached_ = false; } void setNextIn(PipelineContext* ctx) override { if (!ctx) { nextIn_ = nullptr; return; } // 轉成InboundLink,由於Context是InboundLink子類 auto nextIn = dynamic_cast<InboundLink<typename H::rout>*>(ctx); if (nextIn) { nextIn_ = nextIn; } else { throw std::invalid_argument(folly::sformat( "inbound type mismatch after {}", folly::demangle(typeid(H)))); } } void setNextOut(PipelineContext* ctx) override { if (!ctx) { nextOut_ = nullptr; return; } auto nextOut = dynamic_cast<OutboundLink<typename H::wout>*>(ctx); if (nextOut) { nextOut_ = nextOut; } else { throw std::invalid_argument(folly::sformat( "outbound type mismatch after {}", folly::demangle(typeid(H)))); } } // 獲取Context的方向 HandlerDir getDirection() override { return H::dir; } protected: Context* impl_; // 具體的Context實現 std::weak_ptr<PipelineBase> pipelineWeak_; // PipelineBase* pipelineRaw_; // 該Context綁定的pipeline std::shared_ptr<H> handler_; // 該Context包含的Handler InboundLink<typename H::rout>* nextIn_{nullptr}; // 下一個inbound類型的Context地址 OutboundLink<typename H::wout>* nextOut_{nullptr}; // 下一個outbound類型的Context地址 private: bool attached_{false}; // 這個Context是否已經被綁定 };
ContextImpl就是最終的Context實現,也就是要被添加到Pipeline中(好比使用addBack)的容器(ctxs_,inCtxs_,outCtxs_)的最終Context,在最後的finalize方法中還會進一步將容器中的Context組裝成front_和back_單向鏈表。
ContextImpl的主要功能就是實現了各類事件傳遞方法(以fire開頭的方法),以fireRead爲例,這是一個IN類型的事件,因爲Context中持有的Pipeline是一個weak類型的指針,所以先嚐試lock,保證在事件傳播階段這個Pipeline不會銷燬,而後會去調用下一個IN類型的Context的read方法。read方法是InboundLink中定義的接口(注意這裏的read不是Handler中的也不是Pipeline中的),ContextImpl的也實現了這個read方法,它的功能很簡單,首先仍是先lock住這個Pipeline,而後直接調用Context內部包含的Handler的read方法。
template <class H> class ContextImpl : public HandlerContext<typename H::rout, typename H::wout>, public InboundLink<typename H::rin>, public OutboundLink<typename H::win>, public ContextImplBase<H, HandlerContext<typename H::rout, typename H::wout>> { public: typedef typename H::rin Rin; typedef typename H::rout Rout; typedef typename H::win Win; typedef typename H::wout Wout; static const HandlerDir dir = HandlerDir::BOTH; explicit ContextImpl(std::weak_ptr<PipelineBase> pipeline,std::shared_ptr<H> handler) { this->impl_ = this;//實現就是本身 this->initialize(pipeline, std::move(handler));//初始化 } // For StaticPipeline ContextImpl() { this->impl_ = this; } ~ContextImpl() = default; // HandlerContext overrides // Inbound類型的事件:read事件 void fireRead(Rout msg) override { auto guard = this->pipelineWeak_.lock();// 鎖住,確保一旦鎖住成功,在操做期間,pipeline不會被銷燬 // 若是尚未到最後 if (this->nextIn_) { // 將事件繼續向下傳播(傳給下一個Inbound類型的Context) // 注意:這裏調用的是下一個Contex的read而不是fireRead // 即調用下一個Context裏面的Handler方法 this->nextIn_->read(std::forward<Rout>(msg)); } else { LOG(WARNING) << "read reached end of pipeline"; } } void fireReadEOF() override { auto guard = this->pipelineWeak_.lock(); if (this->nextIn_) { this->nextIn_->readEOF(); } else { LOG(WARNING) << "readEOF reached end of pipeline"; } } void fireReadException(folly::exception_wrapper e) override { auto guard = this->pipelineWeak_.lock(); if (this->nextIn_) { this->nextIn_->readException(std::move(e)); } else { LOG(WARNING) << "readException reached end of pipeline"; } } void fireTransportActive() override { auto guard = this->pipelineWeak_.lock(); if (this->nextIn_) { this->nextIn_->transportActive(); } } void fireTransportInactive() override { auto guard = this->pipelineWeak_.lock(); if (this->nextIn_) { this->nextIn_->transportInactive(); } } //Outbound類型的事件傳播 folly::Future<folly::Unit> fireWrite(Wout msg) override { auto guard = this->pipelineWeak_.lock(); if (this->nextOut_) { return this->nextOut_->write(std::forward<Wout>(msg)); } else { LOG(WARNING) << "write reached end of pipeline"; // 若是到了最後,返回一個future return folly::makeFuture(); } } folly::Future<folly::Unit> fireWriteException( folly::exception_wrapper e) override { auto guard = this->pipelineWeak_.lock(); if (this->nextOut_) { return this->nextOut_->writeException(std::move(e)); } else { LOG(WARNING) << "close reached end of pipeline"; return folly::makeFuture(); } } folly::Future<folly::Unit> fireClose() override { auto guard = this->pipelineWeak_.lock(); if (this->nextOut_) { return this->nextOut_->close(); } else { LOG(WARNING) << "close reached end of pipeline"; return folly::makeFuture(); } } // 獲取Context綁定的pipeline指針 PipelineBase* getPipeline() override { return this->pipelineRaw_; } // 獲取Context綁定的pipeline引用 std::shared_ptr<PipelineBase> getPipelineShared() override { return this->pipelineWeak_.lock(); } // 設置和獲取wirte標誌位 void setWriteFlags(folly::WriteFlags flags) override { this->pipelineRaw_->setWriteFlags(flags); } folly::WriteFlags getWriteFlags() override { return this->pipelineRaw_->getWriteFlags(); } // 設置read緩衝區參數 minAvailable、allocationSize void setReadBufferSettings( uint64_t minAvailable, uint64_t allocationSize) override { this->pipelineRaw_->setReadBufferSettings(minAvailable, allocationSize); } std::pair<uint64_t, uint64_t> getReadBufferSettings() override { return this->pipelineRaw_->getReadBufferSettings(); } // InboundLink overrides void read(Rin msg) override { // 保證pipeline不會被刪除 auto guard = this->pipelineWeak_.lock(); // 調用該Context綁定的Handler的read方法,至於事件是都須要繼續傳播,徹底受read中的實現 this->handler_->read(this, std::forward<Rin>(msg)); } void readEOF() override { auto guard = this->pipelineWeak_.lock(); this->handler_->readEOF(this); } void readException(folly::exception_wrapper e) override { auto guard = this->pipelineWeak_.lock(); this->handler_->readException(this, std::move(e)); } void transportActive() override { auto guard = this->pipelineWeak_.lock(); this->handler_->transportActive(this); } void transportInactive() override { auto guard = this->pipelineWeak_.lock(); this->handler_->transportInactive(this); } // OutboundLink overrides folly::Future<folly::Unit> write(Win msg) override { auto guard = this->pipelineWeak_.lock(); return this->handler_->write(this, std::forward<Win>(msg)); } folly::Future<folly::Unit> writeException( folly::exception_wrapper e) override { auto guard = this->pipelineWeak_.lock(); return this->handler_->writeException(this, std::move(e)); } folly::Future<folly::Unit> close() override { auto guard = this->pipelineWeak_.lock(); return this->handler_->close(this); } };
一樣,Context也能夠根據傳輸方向進行細分,首先是InboundContextImpl:
template <class H> class InboundContextImpl : public InboundHandlerContext<typename H::rout>, public InboundLink<typename H::rin>, public ContextImplBase<H, InboundHandlerContext<typename H::rout>> { public: typedef typename H::rin Rin; typedef typename H::rout Rout; typedef typename H::win Win; typedef typename H::wout Wout; static const HandlerDir dir = HandlerDir::IN; explicit InboundContextImpl( std::weak_ptr<PipelineBase> pipeline, std::shared_ptr<H> handler) { this->impl_ = this; this->initialize(pipeline, std::move(handler)); } // For StaticPipeline InboundContextImpl() { this->impl_ = this; } ~InboundContextImpl() = default; // InboundHandlerContext overrides void fireRead(Rout msg) override { auto guard = this->pipelineWeak_.lock(); if (this->nextIn_) { this->nextIn_->read(std::forward<Rout>(msg)); } else { LOG(WARNING) << "read reached end of pipeline"; } } void fireReadEOF() override { auto guard = this->pipelineWeak_.lock(); if (this->nextIn_) { this->nextIn_->readEOF(); } else { LOG(WARNING) << "readEOF reached end of pipeline"; } } void fireReadException(folly::exception_wrapper e) override { auto guard = this->pipelineWeak_.lock(); if (this->nextIn_) { this->nextIn_->readException(std::move(e)); } else { LOG(WARNING) << "readException reached end of pipeline"; } } void fireTransportActive() override { auto guard = this->pipelineWeak_.lock(); if (this->nextIn_) { this->nextIn_->transportActive(); } } void fireTransportInactive() override { auto guard = this->pipelineWeak_.lock(); if (this->nextIn_) { this->nextIn_->transportInactive(); } } PipelineBase* getPipeline() override { return this->pipelineRaw_; } std::shared_ptr<PipelineBase> getPipelineShared() override { return this->pipelineWeak_.lock(); } // InboundLink overrides void read(Rin msg) override { auto guard = this->pipelineWeak_.lock(); this->handler_->read(this, std::forward<Rin>(msg)); } void readEOF() override { auto guard = this->pipelineWeak_.lock(); this->handler_->readEOF(this); } void readException(folly::exception_wrapper e) override { auto guard = this->pipelineWeak_.lock(); this->handler_->readException(this, std::move(e)); } void transportActive() override { auto guard = this->pipelineWeak_.lock(); this->handler_->transportActive(this); } void transportInactive() override { auto guard = this->pipelineWeak_.lock(); this->handler_->transportInactive(this); } };
其次是OutboundContextImpl:
template <class H> class OutboundContextImpl : public OutboundHandlerContext<typename H::wout>, public OutboundLink<typename H::win>, public ContextImplBase<H, OutboundHandlerContext<typename H::wout>> { public: typedef typename H::rin Rin; typedef typename H::rout Rout; typedef typename H::win Win; typedef typename H::wout Wout; static const HandlerDir dir = HandlerDir::OUT; explicit OutboundContextImpl( std::weak_ptr<PipelineBase> pipeline, std::shared_ptr<H> handler) { this->impl_ = this; this->initialize(pipeline, std::move(handler)); } // For StaticPipeline OutboundContextImpl() { this->impl_ = this; } ~OutboundContextImpl() = default; // OutboundHandlerContext overrides folly::Future<folly::Unit> fireWrite(Wout msg) override { auto guard = this->pipelineWeak_.lock(); if (this->nextOut_) { return this->nextOut_->write(std::forward<Wout>(msg)); } else { LOG(WARNING) << "write reached end of pipeline"; return folly::makeFuture(); } } folly::Future<folly::Unit> fireWriteException( folly::exception_wrapper e) override { auto guard = this->pipelineWeak_.lock(); if (this->nextOut_) { return this->nextOut_->writeException(std::move(e)); } else { LOG(WARNING) << "close reached end of pipeline"; return folly::makeFuture(); } } folly::Future<folly::Unit> fireClose() override { auto guard = this->pipelineWeak_.lock(); if (this->nextOut_) { return this->nextOut_->close(); } else { LOG(WARNING) << "close reached end of pipeline"; return folly::makeFuture(); } } PipelineBase* getPipeline() override { return this->pipelineRaw_; } std::shared_ptr<PipelineBase> getPipelineShared() override { return this->pipelineWeak_.lock(); } // OutboundLink overrides folly::Future<folly::Unit> write(Win msg) override { auto guard = this->pipelineWeak_.lock(); return this->handler_->write(this, std::forward<Win>(msg)); } folly::Future<folly::Unit> writeException( folly::exception_wrapper e) override { auto guard = this->pipelineWeak_.lock(); return this->handler_->writeException(this, std::move(e)); } folly::Future<folly::Unit> close() override { auto guard = this->pipelineWeak_.lock(); return this->handler_->close(this); } };
按照慣例,仍是來一張圖總結一下吧:
Wangle源碼分析:EventBaseHandler、AsyncSocketHandler