Wangle源碼分析:Pipeline、Handler、Context

基本概念

      Wangle中的Pipeline和Netty中的Pipeline是很類似的,既能夠將它看爲一種職責鏈模式的實現也能夠看做是Handler的容器。Pipeline中的handler都是串行化執行的,前一個handler完成本身的工做以後把事件傳遞給下一個handler,理論上Pipeline中的全部handler都是在同一個IO線程中執行的,可是爲了防止某些handler(好比序列化、編解碼handler等)耗時過長,Netty中容許爲某些handler指定其它線程(eventloop)異步執行,相似的功能在Wangle中也有體現,只是在實現方式上有些區別。和Netty中一個較大的區別是,Wangle中並無專門的Channel定義,Wangle中的Pipeline兼有了Channel的角色和功能。下面分別就Pipeline、Handler和Context的順序進行源碼分析。java

Pipeline

     PipelineBase做爲Pipeline的基類,提供了一些最爲通用、核心的api實現,好比對handler的操做:addBack及其變體、addFront及其變體、remove及其變體等,下面看一下addBack的一個實現版本:api

template <class H>
PipelineBase& PipelineBase::addBack(std::shared_ptr<H> handler) {
  typedef typename ContextType<H>::type Context;// 聲明Conetxt類型,ContextImpl<Handler>、InboundContextImpl<Handler>、OutboundContextImpl<Handler>其中之一
  // 使用Context包裝Handler後,將其添加到pipeline中,Context中還持有pipeline的引用
  return addHelper(
           std::make_shared<Context>(shared_from_this(), std::move(handler)),
           false);// false標識添加到尾部
}

      首先,會根據要添加的handler類型定義一個Context(Context能夠當作是Handler的外套,後面還會單獨介紹)類型,而後根據這個Context類型建立一個Context:參數爲Pipeline指針和handler,最終addHelper會將Context添加到容器管理起來:app

template <class Context>
PipelineBase& PipelineBase::addHelper(std::shared_ptr<Context>&& ctx,bool front) {
  // 先加入總的Context (std::vector<std::shared_ptr<PipelineContext>>)
  // 該vector種使用的是智能指針,能夠保持對Context的引用
  ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx);
  // 而後根據方向(BOTH、IN、OUT分別加入相應的vector中)
  // std::vector<PipelineContext*> 這裏放的是Context的指針,由於引用在上面的容器中已經保持
  if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) {
    inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get());
  }
  if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) {
    outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get());
  }
  return *this;
}

      Context內部包含了Pipeline、Handler,和Handler同樣,Context也有方向:BOTH、IN、OUT,首先,不管Context 什麼方向,都會在ctxs_容器上添加這個Context,而後會根據Context方向的不一樣,分別在inCtxs_和outCtxs_上添加該Context。接下來看一下這三個容器的定義:異步

std::vector<std::shared_ptr<PipelineContext>> ctxs_;  // 全部的PipelineContext
  std::vector<PipelineContext*> inCtxs_;  // inbound 類型的PipelineContext
  std::vector<PipelineContext*> outCtxs_; // outbound 類型的PipelineContext

     因爲handler的其餘操做(addFront、remove等)都是對這三個容器的增刪操做,原理同樣,此處再也不贅述。ide

     PipelineBase中還提供了設置PipelineManager的接口,從字面理解,PipelineManager就是管理Pipeline的接口,其定義以下:函數

class PipelineManager {
 public:
  virtual ~PipelineManager() = default;
  virtual void deletePipeline(PipelineBase* pipeline) = 0;
  virtual void refreshTimeout() {};
};

      其中,deletePipeline會在顯示調用一個pipeline的close方法時被調用,通常用來完成該Pipeline相關的資源釋放,而refreshTimeout主要在Pipeline發生讀寫事件時被回調,主要用來刷新Pipeline的空閒時間。所以,若是你須要監聽Pipeline的delete和refresh事件,那麼能夠本身實現一個PipelineManager並設置到Pipeline上。oop

      在Wangle中沒有定義專門的Channel結構,其實Wangle中的Pipeline兼有Channel的功能,好比要判斷一個Channel是否還處於鏈接狀態,在Netty中代碼以下:源碼分析

channel.isConnected();

     那麼Wangle中的Pipeline並無此類方法可供使用,怎麼辦呢?其實,Wangle的Pipeline提供了一個更強大的方法:getTransport,該方法能夠得到一個底層的AsyncTransport,而該AsyncTransport擁有全部的底層鏈接信息,好比(僅列出主要接口):ui

class AsyncTransport : public DelayedDestruction, public AsyncSocketBase {
public:
  typedef std::unique_ptr<AsyncTransport, Destructor> UniquePtr;
  virtual void close() = 0;
  virtual void closeNow() = 0;
  virtual void closeWithReset() {
    closeNow();
  }

  virtual void shutdownWrite() = 0;
  virtual void shutdownWriteNow() = 0;
  virtual bool good() const = 0;
  virtual bool readable() const = 0;
  virtual bool isPending() const {
    return readable();
  }
  virtual bool connecting() const = 0;
  virtual bool error() const = 0;
  virtual void attachEventBase(EventBase* eventBase) = 0;
  virtual void detachEventBase() = 0;
  virtual bool isDetachable() const = 0;
  virtual void setSendTimeout(uint32_t milliseconds) = 0;
  virtual uint32_t getSendTimeout() const = 0;
  virtual void getLocalAddress(SocketAddress* address) const = 0;
  virtual void getAddress(SocketAddress* address) const {
    getLocalAddress(address);
  }
  virtual void getPeerAddress(SocketAddress* address) const = 0;
  virtual ssl::X509UniquePtr getPeerCert() const { return nullptr; }
};

       至此,PipelineBase中的主要功能分析完畢。this

       Pipeline是PipelineBase的子類,其具體定義以下:

template <class R, class W = folly::Unit>
class Pipeline : public PipelineBase {
 public:
  using Ptr = std::shared_ptr<Pipeline>;

  static Ptr create() {
    return std::shared_ptr<Pipeline>(new Pipeline());
  }

  ~Pipeline();

  // 模板方法
  template <class T = R>
  typename std::enable_if<!std::is_same<T, folly::Unit>::value>::type
  read(R msg);//front_->read(std::forward<R>(msg)); --> this->handler_->read(this, std::forward<Rin>(msg));

  template <class T = R>
  typename std::enable_if<!std::is_same<T, folly::Unit>::value>::type
  readEOF();//front_->readEOF();

  template <class T = R>
  typename std::enable_if<!std::is_same<T, folly::Unit>::value>::type
  readException(folly::exception_wrapper e);//front_->readException(std::move(e));

  template <class T = R>
  typename std::enable_if<!std::is_same<T, folly::Unit>::value>::type
  transportActive();// front_->transportActive();

  template <class T = R>
  typename std::enable_if<!std::is_same<T, folly::Unit>::value>::type
  transportInactive();//front_->transportInactive();

  template <class T = W>
  typename std::enable_if<!std::is_same<T, folly::Unit>::value,
                          folly::Future<folly::Unit>>::type
  write(W msg);//back_->write(std::forward<W>(msg));

  template <class T = W>
  typename std::enable_if<!std::is_same<T, folly::Unit>::value,
                          folly::Future<folly::Unit>>::type
  writeException(folly::exception_wrapper e);//back_->writeException(std::move(e));

  template <class T = W>
  typename std::enable_if<!std::is_same<T, folly::Unit>::value,
                          folly::Future<folly::Unit>>::type
  close();//back_->close()

  void finalize() override;

 protected:
  Pipeline();
  explicit Pipeline(bool isStatic);

 private:
  bool isStatic_{false};

  InboundLink<R>* front_{nullptr};// inbound類型Context(read)
  OutboundLink<W>* back_{nullptr};// outbound類型Context (write)
};

      能夠看到,Pipeline主要定義和實現了一些和Handler對應的經常使用方法:read、readEOF、readException、transportActive、transportInactive、write、writeException、close。同時,Pipeline還定義了兩個私有成員:front_和back_,從類型能夠看出這是兩個不一樣的方向,首先看一下InboundLink定義:

template <class In>
class InboundLink {
public:
  virtual ~InboundLink() = default;
  virtual void read(In msg) = 0;
  virtual void readEOF() = 0;
  virtual void readException(folly::exception_wrapper e) = 0;
  virtual void transportActive() = 0;
  virtual void transportInactive() = 0;
};

       能夠看出,InboundLink只是把Pipeline主要方法中的IN方向單獨抽象出來,都是一個IN事件(輸入事件),那麼可想而知OutboundLink的定義:

template <class Out>
class OutboundLink {
public:
  virtual ~OutboundLink() = default;
  virtual folly::Future<folly::Unit> write(Out msg) = 0;
  virtual folly::Future<folly::Unit> writeException(
    folly::exception_wrapper e) = 0;
  virtual folly::Future<folly::Unit> close() = 0;
};

      的確,OutboundLink定義的都是OUT事件類型的操做。

      前文在講PipelineBase時,addBack之類的操做都是隻針對那三個容器進行的,沒有地方對front_鏈表和back_鏈表進行操做啊?其實,front_鏈表和back_鏈表的設置是在Pipeline的finalize中完成的:

template <class R, class W>
void Pipeline<R, W>::finalize() {
  front_ = nullptr;
  if (!inCtxs_.empty()) {
    front_ = dynamic_cast<InboundLink<R>*>(inCtxs_.front());
    for (size_t i = 0; i < inCtxs_.size() - 1; i++) {
      inCtxs_[i]->setNextIn(inCtxs_[i + 1]);
    }
    inCtxs_.back()->setNextIn(nullptr);
  }

  back_ = nullptr;
  if (!outCtxs_.empty()) {
    back_ = dynamic_cast<OutboundLink<W>*>(outCtxs_.back());
    for (size_t i = outCtxs_.size() - 1; i > 0; i--) {
      outCtxs_[i]->setNextOut(outCtxs_[i - 1]);
    }
    outCtxs_.front()->setNextOut(nullptr);
  }

  if (!front_) {
    detail::logWarningIfNotUnit<R>(
      "No inbound handler in Pipeline, inbound operations will throw "
      "std::invalid_argument");
  }
  if (!back_) {
    detail::logWarningIfNotUnit<W>(
      "No outbound handler in Pipeline, outbound operations will throw "
      "std::invalid_argument");
  }

  for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {
    (*it)->attachPipeline();
  }
}

     代碼很簡單,以IN方向爲例,遍歷inCtxs_容器,對容器中的每個Context調用其setNextIn方法將Context組成一個單向鏈表front_。同理,outCtxs_最終會變爲back_單向鏈表。最後,還會遍歷Context的總容器ctxs_,爲每個Context調用attachPipeline方法,該方法主要工做就是把Context綁定到對應的Handler上(最終是Context和Handler都互相持有對方的引用),還會回調Handler的attachPipeline方法。

    此處還有一個細節,Pipeline是一個模板類,具備兩個模板參數template <class R, class W = folly::Unit>,分別表明Pipeline的 read(IN事件)的數據類型和write(out事件)數據類型,這些類型的設置要和Pipeline中的handler類型向匹配(後文還會詳細講解)。

     下面就以Pipeline中的write方法來看一下事件的流動過程:

template <class R, class W>
template <class T>
typename std::enable_if < !std::is_same<T, folly::Unit>::value,
         folly::Future<folly::Unit >>::type
Pipeline<R, W>::write(W msg) {
  if (!back_) {
    throw std::invalid_argument("write(): no outbound handler in Pipeline");
  }
  return back_->write(std::forward<W>(msg));
}

      Pipeline的write方法只是簡單的調用back_的wirte方法,也就是OUT類型的事件會從Pipeline的最後一個Context依次向前傳遞(只傳遞給OUT類型的handler)。

Handler

      Handler在繼承層次上相似於Pipeline,首先有一個基類HandlerBase,其定義以下:

template <class Context>
class HandlerBase {
public:
  virtual ~HandlerBase() = default;

  virtual void attachPipeline(Context* /*ctx*/) {}
  virtual void detachPipeline(Context* /*ctx*/) {}

  // 獲取綁定的Context
  Context* getContext() {
    if (attachCount_ != 1) {
      return nullptr;
    }
    CHECK(ctx_);
    return ctx_;
  }

private:
  friend PipelineContext;    // 設置PipelineContext爲友元類,便於PipelineContext操做本身
  uint64_t attachCount_{0};  // 綁定計數,同一個handler能夠被同時綁定到不一樣的pipeline中
  Context* ctx_{nullptr};    // 該Handler綁定的Context
};

      HandlerBase內部組合了一個綁定的Context指針,並提供了getContext接口用於獲取這個Handler綁定的Context。

      Handler做爲HandlerBase的子類,它具備四個模板參數: Rin、Rout、Win、Wout,其中Rin做爲Handler和Context中read方法中消息的數據類型,Rout是做爲Context中fireRead方法的參數類型。同理,Win是做爲Handler和Context中wirte方法的消息參數類型,而Wout是做爲Context中fireWrite的消息參數類型。能夠這麼理解:Xout是做爲以fire開頭的事件方法的參數類型。

template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin>
class Handler : public HandlerBase<HandlerContext<Rout, Wout>> {
public:
  static const HandlerDir dir = HandlerDir::BOTH; // 方向爲雙向

  typedef Rin rin;
  typedef Rout rout;
  typedef Win win;
  typedef Wout wout;
  typedef HandlerContext<Rout, Wout> Context;  // 聲明該HandlerContext類型
  virtual ~Handler() = default;

  // inbound類型事件
  virtual void read(Context* ctx, Rin msg) = 0;
  virtual void readEOF(Context* ctx) {
    ctx->fireReadEOF();
  }
  virtual void readException(Context* ctx, folly::exception_wrapper e) {
    ctx->fireReadException(std::move(e));
  }
  virtual void transportActive(Context* ctx) {
    ctx->fireTransportActive();
  }
  virtual void transportInactive(Context* ctx) {
    ctx->fireTransportInactive();
  }

  // outbound類型事件
  virtual folly::Future<folly::Unit> write(Context* ctx, Win msg) = 0;
  virtual folly::Future<folly::Unit> writeException(Context* ctx,
      folly::exception_wrapper e) {
    return ctx->fireWriteException(std::move(e));
  }
  virtual folly::Future<folly::Unit> close(Context* ctx) {
    return ctx->fireClose();
  }
};

      相似於Pipeline,Handler也相應的定義了inbound類型和outbound類型事件,分別對應方法:read、readEOF、readException、transportActive、transportInactive、write、writeException、close(這些方法和Pipeline中一一對應)。其中,除了read和write兩個方法是純虛接口以外,其餘的方法都提供了默認實現:就是將事件進行透傳(調用Context裏fireXxx方法)。

       同理,根據事件類型的不一樣,還能夠進一步細分Handler類型,好比InboundHandler類型爲:

// inbound類型的Handler (默認狀況下讀入和讀出的類型是一致)
template <class Rin, class Rout = Rin>
class InboundHandler : public HandlerBase<InboundHandlerContext<Rout>> {
public:
  static const HandlerDir dir = HandlerDir::IN;  // 方向爲輸入

  typedef Rin rin;
  typedef Rout rout;
  typedef folly::Unit win;
  typedef folly::Unit wout;
  typedef InboundHandlerContext<Rout> Context; // 聲明inbound類型的InboundHandlerContext
  virtual ~InboundHandler() = default;

  // 純虛函數。由子類實現
  virtual void read(Context* ctx, Rin msg) = 0;
  // 下面的默認實現都是事件的透傳
  virtual void readEOF(Context* ctx) {
    ctx->fireReadEOF();
  }
  virtual void readException(Context* ctx, folly::exception_wrapper e) {
    ctx->fireReadException(std::move(e));// std::move
  }
  virtual void transportActive(Context* ctx) {
    ctx->fireTransportActive();
  }
  virtual void transportInactive(Context* ctx) {
    ctx->fireTransportInactive();
  }
};

       相應的,OutboundHandler類型定義爲:

// outbound類型的Handler (默認寫入類型和寫出類型一致,若是不一致就會產生不少的轉換)
template <class Win, class Wout = Win>
class OutboundHandler : public HandlerBase<OutboundHandlerContext<Wout>> {
public:
  static const HandlerDir dir = HandlerDir::OUT; // 方向爲輸出

  typedef folly::Unit rin;
  typedef folly::Unit rout;
  typedef Win win;
  typedef Wout wout;
  typedef OutboundHandlerContext<Wout> Context;
  virtual ~OutboundHandler() = default;

  // 純虛函數。由子類實現
  virtual folly::Future<folly::Unit> write(Context* ctx, Win msg) = 0;
  // 下面的默認實現都是事件的透傳
  virtual folly::Future<folly::Unit> writeException(
    Context* ctx, folly::exception_wrapper e) {
    return ctx->fireWriteException(std::move(e));
  }
  virtual folly::Future<folly::Unit> close(Context* ctx) {
    return ctx->fireClose();
  }
};

      前文所說,Handler全部的事件方法中只有read和write是純虛接口,這樣用戶每次實現本身的Handler時都須要override這兩個方法(即便只是完成簡單的事件透傳),所以,爲了方便用戶編寫本身的Handler,Wangle提供了HandlerAdapter,HandlerAdapter其實很簡單,就是以事件透傳的方式重寫(override)了read個write兩個方法。代碼以下:

// Handler適配器
template <class R, class W = R>
class HandlerAdapter : public Handler<R, R, W, W> {
public:
  typedef typename Handler<R, R, W, W>::Context Context;

  // 將read事件直接進行透傳
  void read(Context* ctx, R msg) override {
    ctx->fireRead(std::forward<R>(msg));
  }

  // 將write事件直接進行透傳
  folly::Future<folly::Unit> write(Context* ctx, W msg) override {
    return ctx->fireWrite(std::forward<W>(msg));
  }
};

Context

      如前文所述,Pipeline中直接管理的並非Handler,而是Context,爲了便於理解,此處再把Pipeline中的addBack源碼列出來:

template <class H>
PipelineBase& PipelineBase::addBack(std::shared_ptr<H> handler) {
  typedef typename ContextType<H>::type Context;// 聲明Conetxt類型,ContextImpl<Handler>、InboundContextImpl<Handler>、OutboundContextImpl<Handler>其中之一
  // 使用Context包裝Handler後,將其添加到pipeline中,Context中還持有pipeline的引用
  return addHelper(
           std::make_shared<Context>(shared_from_this(), std::move(handler)),
           false);// false標識添加到尾部
}

      其中,ContextType的定義以下,它會根據Handler的類型(具體來講是方向)決定Context的類型,若是Handler是雙向的,那麼Context類型爲ContextImpl<Handler>,若是Handler的方向爲IN,那麼Context類型爲InboundContextImpl<Handler>,若是Handler的方向爲OUT,那麼Context類型爲OutboundContextImpl<Handler>。

template <class Handler>
struct ContextType {
  // template< bool B, class T, class F >
  // type T if B == true, F if B == false
  typedef typename std::conditional <
  Handler::dir == HandlerDir::BOTH,        //若是是雙向
          ContextImpl<Handler>,            //類型就是ContextImpl<Handler>
          typename std::conditional<       //若是不是雙向,那麼還須要細分
          Handler::dir == HandlerDir::IN,  //若是是IN類型
          InboundContextImpl<Handler>,     //那麼類型就是InboundContextImpl<Handler>
          OutboundContextImpl<Handler>     //不然就是OutboundContextImpl<Handler>
          >::type >::type
          type;                            // Context類型
};

     其實,InboundContextImpl和OutboundContextImpl都是ContextImpl的子類,ContextImpl的繼承關係爲:

template <class H>
class ContextImpl
  : public HandlerContext<typename H::rout, typename H::wout>,
    public InboundLink<typename H::rin>,
    public OutboundLink<typename H::win>,
    public ContextImplBase<H, HandlerContext<typename H::rout, typename H::wout>>

      能夠看到,ContextImpl一個繼承自四個父類:HandlerContext、InboundLink、OutboundLink和ContextImplBase,其中HandlerContext中主要定義了以fire開頭的事件傳遞方法;InboundLink和OutboundLink分別定義了Handler中Inbound和Outbound類型的方法接口,還記得Pipeline中用於管理IN方向和OUT方向的兩個鏈表:front_和back_,它們就分別是InboundLink和OutboundLink類型;ContextImplBase主要提供了Pipeline中Context在組裝鏈表時的接口,好比:setNextIn、setNextOut,以及用於將Context綁定到handler上的attachPipeline方法。

      首先來看HandlerContext基類:

// HandlerContext定義(集inbound和outbound類型於一身)
// 以fire開始的方法都是Context中的事件方法
template <class In, class Out>
class HandlerContext {
public:
  virtual ~HandlerContext() = default;

  // inbound類型事件接口
  virtual void fireRead(In msg) = 0;
  virtual void fireReadEOF() = 0;
  virtual void fireReadException(folly::exception_wrapper e) = 0;
  virtual void fireTransportActive() = 0;
  virtual void fireTransportInactive() = 0;

  // outbound類型事件接口
  virtual folly::Future<folly::Unit> fireWrite(Out msg) = 0;
  virtual folly::Future<folly::Unit> fireWriteException(
    folly::exception_wrapper e) = 0;
  virtual folly::Future<folly::Unit> fireClose() = 0;


  virtual PipelineBase* getPipeline() = 0;
  virtual std::shared_ptr<PipelineBase> getPipelineShared() = 0;
  std::shared_ptr<folly::AsyncTransport> getTransport() {
    return getPipeline()->getTransport();
  }

  virtual void setWriteFlags(folly::WriteFlags flags) = 0;
  virtual folly::WriteFlags getWriteFlags() = 0;

  virtual void setReadBufferSettings(
    uint64_t minAvailable,
    uint64_t allocationSize) = 0;
  virtual std::pair<uint64_t, uint64_t> getReadBufferSettings() = 0;
};

    HandlerContext主要定義了以fire開頭的事件傳播方法:fireRead、fireReadEOF、fireReadException、fireTransportActive、fireTransportInactive、fireWrite、fireWriteException、fireClose,以及getPipeline用於獲取Context綁定的Pipeline、getPipelineShared以智能指針的形式獲取Pipeline、getTransport用於獲取Pipeline對應的Transport。

     根據事件流向的不一樣,Context也能夠細分定義,InboundHandlerContext定義爲:

// inbound 類型的InboundHandlerContext
template <class In>
class InboundHandlerContext {
public:
  virtual ~InboundHandlerContext() = default;

  virtual void fireRead(In msg) = 0;
  virtual void fireReadEOF() = 0;
  virtual void fireReadException(folly::exception_wrapper e) = 0;
  virtual void fireTransportActive() = 0;
  virtual void fireTransportInactive() = 0;

  virtual PipelineBase* getPipeline() = 0;
  virtual std::shared_ptr<PipelineBase> getPipelineShared() = 0;
  std::shared_ptr<folly::AsyncTransport> getTransport() {
    return getPipeline()->getTransport();
  }
};

     同理,OutboundHandlerContext定義爲:

// outbound 類型的OutboundHandlerContext
template <class Out>
class OutboundHandlerContext {
public:
  virtual ~OutboundHandlerContext() = default;

  virtual folly::Future<folly::Unit> fireWrite(Out msg) = 0;
  virtual folly::Future<folly::Unit> fireWriteException(
    folly::exception_wrapper e) = 0;
  virtual folly::Future<folly::Unit> fireClose() = 0;

  virtual PipelineBase* getPipeline() = 0;
  virtual std::shared_ptr<PipelineBase> getPipelineShared() = 0;
  std::shared_ptr<folly::AsyncTransport> getTransport() {
    return getPipeline()->getTransport();
  }
};

      如前文所述,PipelineContext主要定義瞭如何在Pipeline中組織Context鏈表的操做接口,好比setNextIn用於設置下一個IN類型的Context,setNextOut用來設置下一個OUT類型Context,具體定義以下:

class PipelineContext {
public:
  virtual ~PipelineContext() = default;

  // 依附到一個pipeline中
  virtual void attachPipeline() = 0;
  // 從pipeline中分離
  virtual void detachPipeline() = 0;

  // 將一個HandlerContext綁定到handler上
  template <class H, class HandlerContext>
  void attachContext(H* handler, HandlerContext* ctx) {
    // 只有第一次綁定的時候纔會設置
    if (++handler->attachCount_ == 1) {
      handler->ctx_ = ctx;
    } else {
      // 爲什麼在此設置的時候就爲nullptr
      handler->ctx_ = nullptr;
    }
  }

  // 設置下一個inbound類型的Context
  virtual void setNextIn(PipelineContext* ctx) = 0;
  // 設置下一個outbound類型的Context
  virtual void setNextOut(PipelineContext* ctx) = 0;

  // 獲取方向(Context方向依賴於Handler方向)
  virtual HandlerDir getDirection() = 0;
};

      ContextImplBase主要實現了PipelineContext接口方法,同時它的兩個成員:nextIn_和nextOut_就是鏈表的指針,用來串聯起整個Context。

template <class H, class Context>
class ContextImplBase : public PipelineContext {
public:
  ~ContextImplBase() = default;

  // 獲取Context綁定的Handler
  H* getHandler() {
    return handler_.get();
  }

  // Context初始化,參數爲Context所屬的Pipeline weak_ptr,Context要綁定的Handler  shared_ptr
  void initialize(std::weak_ptr<PipelineBase> pipeline,std::shared_ptr<H> handler) {
    pipelineWeak_ = pipeline;
    pipelineRaw_ = pipeline.lock().get();//裸指針
    handler_ = std::move(handler);
  }

  // PipelineContext overrides
  void attachPipeline() override {
    // 若是該Context尚未被綁定
    if (!attached_) {
      this->attachContext(handler_.get(), impl_);// 將該Context綁定到handler上
      handler_->attachPipeline(impl_); // 調用Handler的attachPipeline,有具體的Handler實現
      attached_ = true;//標記Context已經attached到一個pipeline中
    }
  }

  // 從pipeline中分離
  void detachPipeline() override {
    handler_->detachPipeline(impl_);// 調用Handler的detachPipeline,有具體的Handler實現
    // 依附標誌位爲false
    attached_ = false;
  }

  void setNextIn(PipelineContext* ctx) override {
    if (!ctx) {
      nextIn_ = nullptr;
      return;
    }

    // 轉成InboundLink,由於Context是InboundLink子類
    auto nextIn = dynamic_cast<InboundLink<typename H::rout>*>(ctx);
    if (nextIn) {
      nextIn_ = nextIn;
    } else {
      throw std::invalid_argument(folly::sformat(
                                    "inbound type mismatch after {}", folly::demangle(typeid(H))));
    }
  }

  void setNextOut(PipelineContext* ctx) override {
    if (!ctx) {
      nextOut_ = nullptr;
      return;
    }
    auto nextOut = dynamic_cast<OutboundLink<typename H::wout>*>(ctx);
    if (nextOut) {
      nextOut_ = nextOut;
    } else {
      throw std::invalid_argument(folly::sformat(
                                    "outbound type mismatch after {}", folly::demangle(typeid(H))));
    }
  }

  // 獲取Context的方向
  HandlerDir getDirection() override {
    return H::dir;
  }

protected:
  Context* impl_;                                    // 具體的Context實現
  std::weak_ptr<PipelineBase> pipelineWeak_;         //
  PipelineBase* pipelineRaw_;                        // 該Context綁定的pipeline
  std::shared_ptr<H> handler_;                       // 該Context包含的Handler
  InboundLink<typename H::rout>* nextIn_{nullptr};   // 下一個inbound類型的Context地址
  OutboundLink<typename H::wout>* nextOut_{nullptr}; // 下一個outbound類型的Context地址

private:
  bool attached_{false}; // 這個Context是否已經被綁定
};

      ContextImpl就是最終的Context實現,也就是要被添加到Pipeline中(好比使用addBack)的容器(ctxs_,inCtxs_,outCtxs_)的最終Context,在最後的finalize方法中還會進一步將容器中的Context組裝成front_和back_單向鏈表。

      ContextImpl的主要功能就是實現了各類事件傳遞方法(以fire開頭的方法),以fireRead爲例,這是一個IN類型的事件,因爲Context中持有的Pipeline是一個weak類型的指針,所以先嚐試lock,保證在事件傳播階段這個Pipeline不會銷燬,而後會去調用下一個IN類型的Context的read方法。read方法是InboundLink中定義的接口(注意這裏的read不是Handler中的也不是Pipeline中的),ContextImpl的也實現了這個read方法,它的功能很簡單,首先仍是先lock住這個Pipeline,而後直接調用Context內部包含的Handler的read方法。

template <class H>
class ContextImpl
  : public HandlerContext<typename H::rout, typename H::wout>,
    public InboundLink<typename H::rin>,
    public OutboundLink<typename H::win>,
    public ContextImplBase<H, HandlerContext<typename H::rout, typename H::wout>> {
public:
  typedef typename H::rin Rin;
  typedef typename H::rout Rout;
  typedef typename H::win Win;
  typedef typename H::wout Wout;
  static const HandlerDir dir = HandlerDir::BOTH;

  explicit ContextImpl(std::weak_ptr<PipelineBase> pipeline,std::shared_ptr<H> handler) {
    this->impl_ = this;//實現就是本身
    this->initialize(pipeline, std::move(handler));//初始化
  }

  // For StaticPipeline
  ContextImpl() {
    this->impl_ = this;
  }

  ~ContextImpl() = default;

  // HandlerContext overrides
  // Inbound類型的事件:read事件
  void fireRead(Rout msg) override {
    auto guard = this->pipelineWeak_.lock();// 鎖住,確保一旦鎖住成功,在操做期間,pipeline不會被銷燬
    // 若是尚未到最後
    if (this->nextIn_) {
      //  將事件繼續向下傳播(傳給下一個Inbound類型的Context)
      //  注意:這裏調用的是下一個Contex的read而不是fireRead
      //  即調用下一個Context裏面的Handler方法
      this->nextIn_->read(std::forward<Rout>(msg));
    } else {
      LOG(WARNING) << "read reached end of pipeline";
    }
  }

  void fireReadEOF() override {
    auto guard = this->pipelineWeak_.lock();
    if (this->nextIn_) {
      this->nextIn_->readEOF();
    } else {
      LOG(WARNING) << "readEOF reached end of pipeline";
    }
  }

  void fireReadException(folly::exception_wrapper e) override {
    auto guard = this->pipelineWeak_.lock();
    if (this->nextIn_) {
      this->nextIn_->readException(std::move(e));
    } else {
      LOG(WARNING) << "readException reached end of pipeline";
    }
  }

  void fireTransportActive() override {
    auto guard = this->pipelineWeak_.lock();
    if (this->nextIn_) {
      this->nextIn_->transportActive();
    }
  }

  void fireTransportInactive() override {
    auto guard = this->pipelineWeak_.lock();
    if (this->nextIn_) {
      this->nextIn_->transportInactive();
    }
  }

  //Outbound類型的事件傳播
  folly::Future<folly::Unit> fireWrite(Wout msg) override {
    auto guard = this->pipelineWeak_.lock();
    if (this->nextOut_) {
      return this->nextOut_->write(std::forward<Wout>(msg));
    } else {
      LOG(WARNING) << "write reached end of pipeline";
      // 若是到了最後,返回一個future
      return folly::makeFuture();
    }
  }

  folly::Future<folly::Unit> fireWriteException(
    folly::exception_wrapper e) override {
    auto guard = this->pipelineWeak_.lock();
    if (this->nextOut_) {
      return this->nextOut_->writeException(std::move(e));
    } else {
      LOG(WARNING) << "close reached end of pipeline";
      return folly::makeFuture();
    }
  }

  folly::Future<folly::Unit> fireClose() override {
    auto guard = this->pipelineWeak_.lock();
    if (this->nextOut_) {
      return this->nextOut_->close();
    } else {
      LOG(WARNING) << "close reached end of pipeline";
      return folly::makeFuture();
    }
  }
 
  // 獲取Context綁定的pipeline指針
  PipelineBase* getPipeline() override {
    return this->pipelineRaw_;
  }

  // 獲取Context綁定的pipeline引用
  std::shared_ptr<PipelineBase> getPipelineShared() override {
    return this->pipelineWeak_.lock();
  }

  // 設置和獲取wirte標誌位
  void setWriteFlags(folly::WriteFlags flags) override {
    this->pipelineRaw_->setWriteFlags(flags);
  }

  folly::WriteFlags getWriteFlags() override {
    return this->pipelineRaw_->getWriteFlags();
  }

  // 設置read緩衝區參數 minAvailable、allocationSize
  void setReadBufferSettings(
    uint64_t minAvailable,
    uint64_t allocationSize) override {
    this->pipelineRaw_->setReadBufferSettings(minAvailable, allocationSize);
  }

  std::pair<uint64_t, uint64_t> getReadBufferSettings() override {
    return this->pipelineRaw_->getReadBufferSettings();
  }

  // InboundLink overrides
  void read(Rin msg) override {
    // 保證pipeline不會被刪除
    auto guard = this->pipelineWeak_.lock();
    // 調用該Context綁定的Handler的read方法,至於事件是都須要繼續傳播,徹底受read中的實現
    this->handler_->read(this, std::forward<Rin>(msg));
  }

  void readEOF() override {
    auto guard = this->pipelineWeak_.lock();
    this->handler_->readEOF(this);
  }

  void readException(folly::exception_wrapper e) override {
    auto guard = this->pipelineWeak_.lock();
    this->handler_->readException(this, std::move(e));
  }

  void transportActive() override {
    auto guard = this->pipelineWeak_.lock();
    this->handler_->transportActive(this);
  }

  void transportInactive() override {
    auto guard = this->pipelineWeak_.lock();
    this->handler_->transportInactive(this);
  }

  // OutboundLink overrides
  folly::Future<folly::Unit> write(Win msg) override {
    auto guard = this->pipelineWeak_.lock();
    return this->handler_->write(this, std::forward<Win>(msg));
  }

  folly::Future<folly::Unit> writeException(
    folly::exception_wrapper e) override {
    auto guard = this->pipelineWeak_.lock();
    return this->handler_->writeException(this, std::move(e));
  }

  folly::Future<folly::Unit> close() override {
    auto guard = this->pipelineWeak_.lock();
    return this->handler_->close(this);
  }
};

      一樣,Context也能夠根據傳輸方向進行細分,首先是InboundContextImpl:

template <class H>
class InboundContextImpl
  : public InboundHandlerContext<typename H::rout>,
    public InboundLink<typename H::rin>,
    public ContextImplBase<H, InboundHandlerContext<typename H::rout>> {
public:
  typedef typename H::rin Rin;
  typedef typename H::rout Rout;
  typedef typename H::win Win;
  typedef typename H::wout Wout;
  static const HandlerDir dir = HandlerDir::IN;

  explicit InboundContextImpl(
    std::weak_ptr<PipelineBase> pipeline,
    std::shared_ptr<H> handler) {
    this->impl_ = this;
    this->initialize(pipeline, std::move(handler));
  }

  // For StaticPipeline
  InboundContextImpl() {
    this->impl_ = this;
  }

  ~InboundContextImpl() = default;

  // InboundHandlerContext overrides
  void fireRead(Rout msg) override {
    auto guard = this->pipelineWeak_.lock();
    if (this->nextIn_) {
      this->nextIn_->read(std::forward<Rout>(msg));
    } else {
      LOG(WARNING) << "read reached end of pipeline";
    }
  }

  void fireReadEOF() override {
    auto guard = this->pipelineWeak_.lock();
    if (this->nextIn_) {
      this->nextIn_->readEOF();
    } else {
      LOG(WARNING) << "readEOF reached end of pipeline";
    }
  }

  void fireReadException(folly::exception_wrapper e) override {
    auto guard = this->pipelineWeak_.lock();
    if (this->nextIn_) {
      this->nextIn_->readException(std::move(e));
    } else {
      LOG(WARNING) << "readException reached end of pipeline";
    }
  }

  void fireTransportActive() override {
    auto guard = this->pipelineWeak_.lock();
    if (this->nextIn_) {
      this->nextIn_->transportActive();
    }
  }

  void fireTransportInactive() override {
    auto guard = this->pipelineWeak_.lock();
    if (this->nextIn_) {
      this->nextIn_->transportInactive();
    }
  }

  PipelineBase* getPipeline() override {
    return this->pipelineRaw_;
  }

  std::shared_ptr<PipelineBase> getPipelineShared() override {
    return this->pipelineWeak_.lock();
  }

  // InboundLink overrides
  void read(Rin msg) override {
    auto guard = this->pipelineWeak_.lock();
    this->handler_->read(this, std::forward<Rin>(msg));
  }

  void readEOF() override {
    auto guard = this->pipelineWeak_.lock();
    this->handler_->readEOF(this);
  }

  void readException(folly::exception_wrapper e) override {
    auto guard = this->pipelineWeak_.lock();
    this->handler_->readException(this, std::move(e));
  }

  void transportActive() override {
    auto guard = this->pipelineWeak_.lock();
    this->handler_->transportActive(this);
  }

  void transportInactive() override {
    auto guard = this->pipelineWeak_.lock();
    this->handler_->transportInactive(this);
  }
};

       其次是OutboundContextImpl:

template <class H>
class OutboundContextImpl
  : public OutboundHandlerContext<typename H::wout>,
    public OutboundLink<typename H::win>,
    public ContextImplBase<H, OutboundHandlerContext<typename H::wout>> {
public:
  typedef typename H::rin Rin;
  typedef typename H::rout Rout;
  typedef typename H::win Win;
  typedef typename H::wout Wout;
  static const HandlerDir dir = HandlerDir::OUT;

  explicit OutboundContextImpl(
    std::weak_ptr<PipelineBase> pipeline,
    std::shared_ptr<H> handler) {
    this->impl_ = this;
    this->initialize(pipeline, std::move(handler));
  }

  // For StaticPipeline
  OutboundContextImpl() {
    this->impl_ = this;
  }

  ~OutboundContextImpl() = default;

  // OutboundHandlerContext overrides
  folly::Future<folly::Unit> fireWrite(Wout msg) override {
    auto guard = this->pipelineWeak_.lock();
    if (this->nextOut_) {
      return this->nextOut_->write(std::forward<Wout>(msg));
    } else {
      LOG(WARNING) << "write reached end of pipeline";
      return folly::makeFuture();
    }
  }

  folly::Future<folly::Unit> fireWriteException(
    folly::exception_wrapper e) override {
    auto guard = this->pipelineWeak_.lock();
    if (this->nextOut_) {
      return this->nextOut_->writeException(std::move(e));
    } else {
      LOG(WARNING) << "close reached end of pipeline";
      return folly::makeFuture();
    }
  }

  folly::Future<folly::Unit> fireClose() override {
    auto guard = this->pipelineWeak_.lock();
    if (this->nextOut_) {
      return this->nextOut_->close();
    } else {
      LOG(WARNING) << "close reached end of pipeline";
      return folly::makeFuture();
    }
  }

  PipelineBase* getPipeline() override {
    return this->pipelineRaw_;
  }

  std::shared_ptr<PipelineBase> getPipelineShared() override {
    return this->pipelineWeak_.lock();
  }

  // OutboundLink overrides
  folly::Future<folly::Unit> write(Win msg) override {
    auto guard = this->pipelineWeak_.lock();
    return this->handler_->write(this, std::forward<Win>(msg));
  }

  folly::Future<folly::Unit> writeException(
    folly::exception_wrapper e) override {
    auto guard = this->pipelineWeak_.lock();
    return this->handler_->writeException(this, std::move(e));
  }

  folly::Future<folly::Unit> close() override {
    auto guard = this->pipelineWeak_.lock();
    return this->handler_->close(this);
  }
};

        按照慣例,仍是來一張圖總結一下吧:

              

本系列文章

Wangle源碼分析:Service

Wangle源碼分析:ServerBootstrap

Wangle源碼分析:編解碼Handler 

Wangle源碼分析:EventBaseHandler、AsyncSocketHandler 

Wangle源碼分析:Pipeline、Handler、Context

Wangle源碼分析:ClientBootstrap

相關文章
相關標籤/搜索