幀傳輸類就是按照一幀的固定大小來傳輸數據,全部的寫操做首先都是在內存中完成的直到調用了flush操做,而後傳輸節點在flush操做以後將全部數據根據數據的有效載荷寫入數據的長度的二進制塊發送出去,容許在接收的另外一端按照固定的長度來讀取。
幀傳輸類一樣仍是從緩存基類TBufferBase繼承而來,實現的接口固然也基本相同,只是實現的方式不一樣而已,下面就來看看具體的實現過程和原理。
這個類所採用的默認緩存長度是512(static const int DEFAULT_BUFFER_SIZE = 512;),兩個基本構造函數一個採用默認的緩存長度,另外一個能夠指定一個須要的緩存長度。下面仍是重點分析慢讀、讀幀等操做的實現過程:
(1)慢讀實現以下:緩存
uint32_t TFramedTransport::readSlow(uint8_t* buf, uint32_t len) { uint32_t want = len;//想要讀取的長度 uint32_t have = rBound_ - rBase_;//內存緩存中已經有的數據的長度 assert(have < want);//若是之後數據長度知足須要讀的長度就不須要採用慢讀 // 若是咱們有一些數據在緩存,拷貝出來而且返回它。 // 咱們沒有試圖讀取更多的數據而是不得不返回它,由於咱們不能保證在下面的 // 傳輸層實際上有更多的數據,所以應該嘗試阻塞式讀它。 if (have > 0) { memcpy(buf, rBase_, have);//拷貝出緩存中已有的數據 setReadBuffer(rBuf_.get(), 0);//從新設置緩存基地址 return have;//返回 } // 讀取另外一幀。 if (!readFrame()) { // EOF. No frame available. return 0; } // 處理咱們已有的數據 uint32_t give = std::min(want, static_cast<uint32_t>(rBound_ - rBase_));//已有數據想要讀取長度取短的 memcpy(buf, rBase_, give);//拷貝 rBase_ += give;//調整緩存基地址 want -= give;//計算還有多少想要的數據沒有獲得 return (len - want);//返回實際讀取長度 }
緩存中沒有數據的時候就會調用讀取幀的函數readFrame,這個函數實現以下:安全
bool TFramedTransport::readFrame() { //首先讀下一幀數據的長度 int32_t sz;//存放長度的變量 uint32_t size_bytes_read = 0;//讀取長度數據的字節數 while (size_bytes_read < sizeof(sz)) {//表示長度的數據小於存放長度數據的字節數 uint8_t* szp = reinterpret_cast<uint8_t*>(&sz) + size_bytes_read;//長度變量轉換爲指針 uint32_t bytes_read = transport_->read(szp, sizeof(sz) - size_bytes_read);//讀取 if (bytes_read == 0) {//若是返回爲0表示沒有數據了 if (size_bytes_read == 0) {//沒有任何數據讀到,返回false return false; } else { // 部分的幀頭部,拋出異常。 throw TTransportException(TTransportException::END_OF_FILE, "No more data to read after " "partial frame header."); } } size_bytes_read += bytes_read;//以讀取的長度 } sz = ntohl(sz);//長整數的網絡字節序轉換爲主機字節序 if (sz < 0) {//幀的長度不能是負數澀,拋出異常 throw TTransportException("Frame size has negative value"); } // 讀取有效數據負載,從新設置緩存標記。 if (sz > static_cast<int32_t>(rBufSize_)) { rBuf_.reset(new uint8_t[sz]);//接收基地址 rBufSize_ = sz;//緩存大小 } transport_->readAll(rBuf_.get(), sz);//調用readAll讀取sz長度的數據 setReadBuffer(rBuf_.get(), sz);//設置讀緩存基地址 return true; }
從上面實現代碼看出,在按幀讀取的過程當中,首先須要讀取這一幀的頭部信息,而這個頭部信息就是這一幀的長度,後面就根據頭部信息中給定的長度來讀取數據部分,讀出來的數據放入緩存中。讀取頭部信息時注意處理異常的狀況,還有就是讀出來的數據須要通過網絡字節序到主機字節序的轉換。下面繼續看慢寫函數和flush刷新函數的實現過程,慢寫函數實現以下(快讀和快寫基類TBufferBase的實現已經知足要求了,因此不須要再去單獨實現了):網絡
void TFramedTransport::writeSlow(const uint8_t* buf, uint32_t len) { // 直到有足夠的雙緩衝大小 uint32_t have = wBase_ - wBuf_.get();//緩存空間已經有多少數據 uint32_t new_size = wBufSize_; if (len + have < have /* overflow */ || len + have > 0x7fffffff) {//若是長度溢出或大於2GB了 throw TTransportException(TTransportException::BAD_ARGS, "Attempted to write over 2 GB to TFramedTransport.");//拋出異常 } while (new_size < len + have) {//緩存空間的長度小於已有數據的長度和須要寫入數據長度的和 new_size = new_size > 0 ? new_size * 2 : 1;若是緩存空間長度是大於0的話就擴容一倍的空間 } uint8_t* new_buf = new uint8_t[new_size];// 分配新空間 memcpy(new_buf, wBuf_.get(), have);// 拷貝已有的數據到新空間. wBuf_.reset(new_buf);// 緩存地址從新設置 wBufSize_ = new_size;// 緩存新長度 wBase_ = wBuf_.get() + have;//新的開始寫入地址 wBound_ = wBuf_.get() + wBufSize_;//寫入界限 memcpy(wBase_, buf, len);//拷貝數據到新緩存地址 wBase_ += len;//更新緩存基地址 }
上面代碼就是實現把從上層傳輸的數據寫入緩存中以供下層發送使用,這段代碼須要注意的是while循環,這個while循環保證有足夠的緩存來存放寫入的數據到緩存中,每次增加的長度是上次的一倍;還須要注意的是,分配了新的空間須要把原來尚未真正寫入的數據拷貝到新緩存中來,否則就會形成內容丟失;最後就是更新緩存的基地址和長度等描述緩存的信息。繼續看flush函數的實現代碼:函數
void TFramedTransport::flush() { int32_t sz_hbo, sz_nbo; assert(wBufSize_ > sizeof(sz_nbo));//斷言緩存長度應該大於個字節sizeof(int32_t) sz_hbo = wBase_ - (wBuf_.get() + sizeof(sz_nbo));// 滑動到第一幀數據的開始位置。 sz_nbo = (int32_t)htonl((uint32_t)(sz_hbo));//主機字節序轉換爲網絡字節序 memcpy(wBuf_.get(), (uint8_t*)&sz_nbo, sizeof(sz_nbo));//頭部長度拷貝寫緩存 if (sz_hbo > 0) {//保證緩存有須要寫入的數據 //若是底層傳輸寫拋出了異常注意確保咱們處於安全的狀態 //(例如內部緩衝區清理),重置咱們寫入前的狀態(由於底層沒有傳輸成功) wBase_ = wBuf_.get() + sizeof(sz_nbo);//獲得 // 寫入長度和幀 transport_->write(wBuf_.get(), sizeof(sz_nbo)+sz_hbo); } // 刷新底層傳輸. transport_->flush(); }
刷新函數就是把緩存中的數據真正的發送出去,可是在寫入到底層時,底層傳輸可能不會真正成功(如網絡忽然斷了),這個時候底層會拋出異常,那麼咱們須要捕獲異常,以便從新處理這些數據,只有數據真正寫入成功的時候咱們才計算咱們寫如數據的長度。因此還有寫結束和讀結束函數writeEnd、readEnd,它們都只有簡單的一句代碼就是計算真正完成讀寫數據的長度。
整個按幀傳輸的類的功能介紹完畢了,主要須要注意的就是緩存的操做,保證數據不丟失。未來實現考慮分配內存使用c語言的malloc類函數,而不是使用new操做,這樣也能提升很多的效率。ui