thrift之TTransport層的緩存傳輸類TBufferedTransport和緩衝基類TBufferBase

本節主要介紹緩衝相關的傳輸類,緩存的做用就是爲了提升讀寫的效率。Thrift在實現緩存傳輸的時候首先創建一個緩存的基類,而後須要實現緩存功能的類均可以直接從這個基類繼承。下面就詳細分析這個基類以及一個具體的實現類。
  緩存基類TBufferBase
  緩存基類就是讓傳輸類全部的讀寫函數都提供緩存來提升性能。它在一般狀況下采用memcpy來設計和實現快路徑的讀寫訪問操做,這些操做函數通常都是小、非虛擬和內聯函數。TBufferBase是一個抽象的基類,子類必須實現慢路徑的讀寫函數等操做,慢路徑的讀寫等操做主要是爲了在緩存已經滿或空的狀況下執行。首先看看緩存基類的定義,代碼以下:apache

  class TBufferBase : public TVirtualTransport<TBufferBase> {
   public:
    uint32_t read(uint8_t* buf, uint32_t len) {//讀函數
      uint8_t* new_rBase = rBase_ + len;//獲得須要讀到的緩存邊界
      if (TDB_LIKELY(new_rBase <= rBound_)) {//判斷緩存是否有足夠的數據可讀,採用了分支預測技術
        std::memcpy(buf, rBase_, len);//直接內存拷貝
        rBase_ = new_rBase;//更新新的緩存讀基地址
        return len;//返回讀取的長度
      }
      return readSlow(buf, len);//若是緩存已經不可以知足讀取長度須要就執行慢讀
    }
    uint32_t readAll(uint8_t* buf, uint32_t len) {
      uint8_t* new_rBase = rBase_ + len;//同read函數
      if (TDB_LIKELY(new_rBase <= rBound_)) {
        std::memcpy(buf, rBase_, len);
        rBase_ = new_rBase;
        return len;
      }
      return apache::thrift::transport::readAll(*this, buf, len);//調用父類的
    }
    void write(const uint8_t* buf, uint32_t len) {//快速寫函數
      uint8_t* new_wBase = wBase_ + len;//寫入後的新緩存基地址
      if (TDB_LIKELY(new_wBase <= wBound_)) {//判斷緩存是否有足夠的空間能夠寫入
        std::memcpy(wBase_, buf, len);//內存拷貝
        wBase_ = new_wBase;//更新基地址
        return;
      }
      writeSlow(buf, len);//緩存空間不足就調用慢寫函數
    }
    const uint8_t* borrow(uint8_t* buf, uint32_t* len) {//快速路徑借
      if (TDB_LIKELY(static_cast<ptrdiff_t>(*len) <= rBound_ - rBase_)) {//判斷是否足夠借的長度
        *len = static_cast<uint32_t>(rBound_ - rBase_);
        return rBase_;//返回借的基地址
      }
      return borrowSlow(buf, len);//不足就採用慢路徑借
    }
    void consume(uint32_t len) {//消費函數
      if (TDB_LIKELY(static_cast<ptrdiff_t>(len) <= rBound_ - rBase_)) {//判斷緩存是否夠消費
        rBase_ += len;//更新已經消耗的長度
      } else {
        throw TTransportException(TTransportException::BAD_ARGS,
                                  "consume did not follow a borrow.");//不足拋異常
      }
    }
   protected:
    virtual uint32_t readSlow(uint8_t* buf, uint32_t len) = 0;//慢函數
    virtual void writeSlow(const uint8_t* buf, uint32_t len) = 0;
    virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) = 0;
    TBufferBase()
      : rBase_(NULL)
      , rBound_(NULL)
      , wBase_(NULL)
      , wBound_(NULL)
    {}//構造函數,把全部的緩存空間設置爲NULL
    void setReadBuffer(uint8_t* buf, uint32_t len) {//設置讀緩存空間地址
      rBase_ = buf;//讀緩存開始地址
      rBound_ = buf+len;//讀緩存地址界限
    }
    void setWriteBuffer(uint8_t* buf, uint32_t len) {//設置寫緩存地址空間
      wBase_ = buf;//
      wBound_ = buf+len;//邊界
    }
    virtual ~TBufferBase() {}
    uint8_t* rBase_;//讀從這兒開始
    uint8_t* rBound_;//讀界限
    uint8_t* wBase_;//寫開始地址
    uint8_t* wBound_;//寫界限
  };

  從TBufferBase定義能夠看出,它也是從虛擬類繼承,主要採用了memcpy函數來實現緩存的快速讀取,在判斷是否有足夠的緩存空間能夠操做時採用了分支預測技術來提供代碼的執行效率,且全部快路徑函數都是非虛擬的、內聯的小代碼量函數。下面繼續看看一個具體實現緩存基類的一個子類的狀況!
  TBufferedTransport
  緩存傳輸類是從緩存基類繼承而來,它對於讀:實際讀數據的大小比實際請求的大不少,多餘的數據將爲未來超過本地緩存的數據服務;對於寫:數據在它被髮送出去之前將被先寫入內存緩存。
  緩存的大小默認是512字節(代碼:static const int DEFAULT_BUFFER_SIZE = 512;),提供多個構造函數,能夠只指定一個傳輸類(另外一層次的)、也能夠指定讀寫緩存公用的大小或者分別指定。由於它是一個能夠實際使用的緩存類,因此須要實現慢讀和慢寫功能的函數。它還實現了打開函數open、關閉函數close、刷新函數flush等,判斷是否有數據處於未決狀態函數peek定義和實現以下:緩存

    bool peek() {
      if (rBase_ == rBound_) {//判斷讀的基地址與讀邊界是否重合了,也就是已經讀取完畢
        setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_));//是:從新讀取底層來的數據
      }
      return (rBound_ > rBase_);//邊界大於基地址就是有未決狀態數據
    }
  下面繼續看看慢讀函數和慢寫函數的實現細節(快讀和快寫繼承基類的:也就是默認的讀寫都是直接從緩存中讀取,所謂的快讀和快寫)。慢讀函數實現以下(詳細註釋):
  uint32_t TBufferedTransport::readSlow(uint8_t* buf, uint32_t len) {
    uint32_t have = rBound_ - rBase_;//計算還有多少數據在緩存中
  
    // 若是讀取緩存中已經存在的數據不能知足咱們,
    // 咱們(也僅僅在這種狀況下)應該才從慢路徑讀數據。
    assert(have < len);
  
    // 若是咱們有一些數據在緩存,拷貝出來並返回它
    // 咱們不得不返回它而去嘗試讀更多的數據,由於咱們不能保證
    // 下層傳輸實際有更多的數據, 所以嘗試阻塞式讀取它。
    if (have > 0) {
      memcpy(buf, rBase_, have);//拷貝數據
      setReadBuffer(rBuf_.get(), 0);//設置讀緩存,基類實現該函數
      return have;//返回緩存中已經存在的不完整數據
    }
  
    // 在咱們的緩存中沒有更多的數據可用。從下層傳輸獲得更多以達到buffer的大小。
    // 注意若是len小於rBufSize_可能會產生多種場景不然幾乎是沒有意義的。
    setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_));//讀取數據並設置讀緩存
  
    // 處理咱們已有的數據
    uint32_t give = std::min(len, static_cast<uint32_t>(rBound_ - rBase_));
    memcpy(buf, rBase_, give);
    rBase_ += give;
  
    return give;
  }

  慢讀函數主要考慮的問題就是緩存中還有一部分數據,可是不夠咱們須要讀取的長度;還有比較麻煩的狀況是雖然如今緩存中沒有數據,可是咱們從下層傳輸去讀,讀取的長度可能大於、小於或等於咱們須要讀取的長度,因此須要考慮各類狀況。下面繼續分析慢寫函數實現細節:函數

  void TBufferedTransport::writeSlow(const uint8_t* buf, uint32_t len) {
    uint32_t have_bytes = wBase_ - wBuf_.get();//計算寫緩存區中已有的字節數
    uint32_t space = wBound_ - wBase_;//計算剩餘寫緩存空間
    // 若是在緩存區的空閒空間不能容納咱們的數據,咱們採用慢路徑寫(僅僅)
    assert(wBound_ - wBase_ < static_cast<ptrdiff_t>(len));
  
       //已有數據加上須要寫入的數據是否大於2倍寫緩存區或者緩存區爲空
    if ((have_bytes + len >= 2*wBufSize_) || (have_bytes == 0)) {
      if (have_bytes > 0) {//緩存大於0且加上須要再寫入數據的長度大於2倍緩存區
        transport_->write(wBuf_.get(), have_bytes);//先將已有數據寫入下層傳輸
      }
      transport_->write(buf, len);//寫入此次的len長度的數據
      wBase_ = wBuf_.get();//從新獲得寫緩存的基地址
      return;
    }
  
    memcpy(wBase_, buf, space);//填充咱們的內部緩存區爲了寫
    buf += space;
    len -= space;
    transport_->write(wBuf_.get(), wBufSize_);//寫入下層傳輸
  
    assert(len < wBufSize_);
    memcpy(wBuf_.get(), buf, len);//拷貝剩餘的數據到咱們的緩存
    wBase_ = wBuf_.get() + len;//從新獲得寫緩存基地址
    return;
  }

  慢寫函數也有棘手的問題,就是咱們應該拷貝咱們的數據到咱們的內部緩存而且從那兒發送出去,或者咱們應該僅僅用一次系統調用把當前內部寫緩存區的內容寫出去,而後再用一次系統調用把咱們當前須要寫入長度爲len的數據再次寫入出去。若是當前緩存區的數據加上咱們此次須要寫入數據的長度至少是咱們緩存區長度的兩倍,咱們將不得不至少調用兩次系統調用(緩存區爲空時有可能例外),那麼咱們就不拷貝了。不然咱們就是按順序遞加的。具體實現分狀況處理,最後咱們在看看慢借函數的實現,借相關函數主要是爲了實現可變長度編碼。慢借函數實現細節以下:性能

  const uint8_t* TBufferedTransport::borrowSlow(uint8_t* buf, uint32_t* len) {
    (void) buf;
    (void) len;
    return NULL;//默認返回空
  }

  在這個類咱們能夠看出,它什麼也沒有作,只是簡單的返回NULL,因此須要阻塞去借。按照官方的說法,下面兩種行爲應該當前的版本中實現,在未來的版本可能會發生改變:
  若是須要借的長度最多爲緩存區的長度,那麼永遠不會返回NULL。依賴底層傳輸,它應該拋出一個異常或者永遠不會掛掉;
  一些借用請求可能內部字節拷貝,若是借用的長度最可能是緩存區的一半,那麼不去內部拷貝。爲了優化性能保存這個限制。優化

相關文章
相關標籤/搜索