在第捌章,IO系統(二)中,生產者DataReader提供了外部消費接口:html
class DataReader { public: ......... BlockingQueue<Datum*>& free() const { return ptr_pair->free; } BlockingQueue<Datum*>& full() const { return ptr_pair->full; } ......... };
生產者DataReader自己繼承了線程DragonThread,在其異步的線程工做函數中interfaceKernel()中,數據庫
不斷地從pair的Free阻塞隊列取出空Datum,在read_one()用KV數據庫內容填充,再塞到Full隊列中,如圖:數組
做爲消費者(DataLayer),在從Datum得到數據後,當即作一份Copy,再把Datum塞回到Free隊列中,繼續生產。網絡
整個過程就好像是一個工廠生產的循環鏈,Datum就比如一個包裝盒。異步
生產者將產品放置其中,傳遞包裝盒給消費者。消費者從中取出產品,讓生產者回收包裝盒。async
在第拾章,IO系統(三)中,變形者DataTransformer提供了數據變形的基接口:函數
void DataTransformer<Dtype>::transform(const Datum& datum, Dtype* shadow_data)
仔細觀察一下transform的兩個參數,你會發現整個transform過程,就是將Datum數據Copy到shadow_data的數組裏。fetch
這就是上節提到的「Copy」過程——從包裝盒中取出產品,再變形加工。this
加工放置的數組,之因此叫shadow_data,是由於它映射的是一個Blob的局部內存。spa
回憶一下Blob的shape,[batch_size,channels,height,width],即可知,一個Datum僅僅是一個Blob的1/batch_size。
讓Transformer對映射的內存處理,避免了直接對Datum變形的不便。映射的內存空間,就是最終成品的實際空間,如圖:
在上圖中,Transformer提供了一個由Datum堆砌成Blob的途徑。
咱們只須要給Transformer提供Datum元素,以及一段內存空間(數組首指針)便可。
爲了保證內存空間提供的正確性,有兩點須要保障:
①每一個Datum在Blob的偏移位置必須計算出來,第玖章BlobFlow給了一點偏移的思路,
只要偏移offset=Blob.offset(i)便可,i 爲一個Batch內的樣本數據下標。
②內存空間,也就是Blob具體的shape必須提早計算出來,並且必須啓動SyncedMemory自動機,分配實際內存。
考慮一個Blob的shape,[batch_size,channels,height,width],後三個shape均可以由Datum推斷出來。
至於batch_size,是一個由使用者提供的超參,能夠根據網絡定義直接獲取。
由Datum推理channe/height/width,由DataTransformer的inferBlobShape完成,在第拾章IO系統(三)已經給出。
第捌章IO系統(二)介紹了LayerParameter中的prefetch概念。
在構造一個DataReader時,指定了默認Pair的緩衝區大小:
DataReader::DataReader(const LayerParameter& param){ ptr_pair.reset(new QueuePair( param.data_param().prefech()*param.data_param().batch_size())); ........ }
total_size=prefetch*batch_size
這個大小代表了DataReader須要預緩衝prefetch個Batch,每一個Batch有batch_size個Datum單元。
在 單生產者單緩衝區 一節的最後,討論了多GPU下,如何使用單Pair的補救措施:
這多是Caffe源碼的本意。在這種方案中,DataReader和DataLayer是無須改動代碼的。
只要咱們加大DataParameter裏的prefech數值,讓CPU多緩衝幾個Batch,爲多個GPU準備就行了。
prefetch的數量由用戶指定,並且也是一個上界,並且顯然不能所有將整個KV數據庫prefetch完。
因而,以Batch爲單位的二級封裝,須要一個二級生產者和消費者,並且一樣是異步的,如IO系統(二)的圖:
二級生產者,在Caffe裏就是DataLayer衍生的線程。二級消費者,偏偏就是DataLayer自己。
DataLayer集二級生產者與消費者於一體,這歸功於面向對象技術的多重繼承。
最終使用的是DataLayer,被拆解成3個類BaseDataLayer、BasePrefetchingDataLayer、DataLayer。
三個類負責不一樣的任務,你也能夠整合在一塊兒寫。
二級C++最喜歡考繼承類的執行順序,固然,這裏搞清楚這點相當重要。
除了基本的類構造函數外,咱們還須要考慮Layer類setup的具體函數layerSetup。
幾個DataLayer的layerSetup至關混亂,幾乎每一個都各司其職,①②③④順序不能顛倒。
完成所有setup以後,才能讓二級生產者工做。
生產單位以一個Batch爲單元,每一個Batch包含DataBlob和LabelBlob(可選)。
二級緩衝區構建於BasePrefetchingDataLayer類中。
template<typename Dtype> class BasePrefetchingDataLayer :public BaseDataLayer<Dtype>,public DragonThread { public: ....... const int PREFETCH_COUNT; protected: ....... Batch<Dtype>* prefetch; BlockingQueue<Batch<Dtype>*> free; BlockingQueue<Batch<Dtype>*> full; };
產能上界由常數PREFETCH_COUNT指定,來源於proto參數DataParamter裏prefetch大小。
在BasePrefetchingDataLayer構造函數中,用new申請等量的堆內存prefetch。
注意這裏不要使用shared_ptr,比較麻煩,並且Batch有可能會被智能指針提早釋放,應當手動析構。
能夠看到,默認提供了和DataReader相似的消費者接口free/full,不過這消費的是Batch,而不是Datum。
沒有用函數封裝,是由於DataLayer本身生產,本身消費。
同DataReader的一級生產相似,二級生產須要從free隊列pop,填充,再塞入full。
生產過程於BasePrefetchingDataLayer的interfaceKernel函數中。
因爲多重繼承的關係,interfaceKernel函數來自父類DragonThread
template<typename Dtype> void BasePrefetchingDataLayer<Dtype>::interfaceKernel(){ try{ while (!must_stop()){ Batch<Dtype> *batch = free.pop(); //batch has already reshape in dataLayerSetup loadBatch(batch); // pure abstract function full.push(batch); //product } } catch (boost::thread_interrupted&) {} }
loadBatch函數與DataReader的read_one函數效果相似,負責填充batch。
第貳章主存模型末尾介紹了SyncedMemory異步提交顯存Memcpy的方法。
第玖章BlobFlow中,且已知SyncedMemory隸屬於Blob的成員變量。
當數據緩衝至Blob級別時,就須要考慮提早向顯存複製數據了。
第陸章IO系統(一)開頭給了這張圖:
能夠看到,DataLayer處於CPU與GPU的分界點,DataLayer源輸入由CPU主控,存於內存。
而DataLayer的下一層是計算層,源輸入必須存於顯存。
因而,儘管DataLayer的前向傳播函數forward(bottom,top)只是複製數據,可是更重要的是轉換數據。
在上一節的CPU異步線程工做函數interfaceKernel中,咱們能夠看到,Batch(Blob)級別已經構成,
而此時整個神經網絡Net可能正在初始化,距離Net正式啓動前向傳播函數Net.forward(),須要顯存數據,還有一段時間。
利用這段時間,能夠利用CUDA的異步流預先由內存向顯存轉換數據,據此,完善interfaceKernel函數:
template<typename Dtype> void BasePrefetchingDataLayer<Dtype>::interfaceKernel(){ // create GPU async stream // speed up memcpy between CPU and GPU // because cudaMemcpy will be called frequently // rather than malloc gpu memory firstly(just call cudaMemcpy) #ifndef CPU_ONLY cudaStream_t stream; if (Dragon::get_mode() == Dragon::GPU) CUDA_CHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking)); #endif try{ while (!must_stop()){ Batch<Dtype> *batch = free.pop(); //batch has already reshape in dataLayerSetup loadBatch(batch); // pure abstract function #ifndef CPU_ONLY if (Dragon::get_mode() == Dragon::GPU){ batch->data.data()->async_gpu_data(stream); // blocking this thread until host->device memcpy finished CUDA_CHECK(cudaStreamSynchronize(stream)); } #endif full.push(batch); //product } } catch (boost::thread_interrupted&) {} // destroy async stream #ifndef CPU_ONLY if (Dragon::get_mode() == Dragon::GPU) CUDA_CHECK(cudaStreamDestroy(stream)); #endif }
使用異步流,須要用cudaStreamCreateWithFlags申請Flag爲cudaStreamNonBlocking的流。
cudaStreamNonBlocking的值爲0x1,表明此流非默認Memcpy流(默認流)
與之相對的是Flag爲cudaStreamDefault的流,值爲0x0,這是主複製流,cudaMemcpy的任務所有提交於此。
使用Blob內提交異步流的函數async_gpu_data(stream)[稍後給出]後,須要當即阻塞(同步)該CPU線程。
使用cudaStreamSynchronize(stream),直到GPU返回複製完畢信號以前,CPU一直同步在本行代碼。
最後,須要釋放異步流。
即DataLayer的forward函數。
因爲大量工做已經在父類中作完,DataLayer的消費函數相對簡單。
template <typename Dtype> void DataLayer<Dtype>::forward_cpu(const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top){ // consume Batch<Dtype> *batch = full.pop("DataLayer prefectching queue is now empty"); dragon_copy<Dtype>(batch->data.count(), top[0]->mutable_cpu_data(), batch->data.cpu_data()); if (has_labels) dragon_copy(batch->label.count(), top[1]->mutable_cpu_data(), batch->label.cpu_data()); free.push(batch); }
直接訪問full隊列獲取一個可用的Batch,完成消費。
將batch數據(data/label)分別複製到top裏,完成Blob的Flow,提供給下一層計算。
template <typename Dtype> void DataLayer<Dtype>::forward_gpu(const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top){ Batch<Dtype> *batch = full.pop("DataLayer prefectching queue is now empty"); dragon_gpu_copy(batch->data.count(), top[0]->mutable_gpu_data(), batch->data.gpu_data()); if (has_labels) dragon_gpu_copy(batch->label.count(), top[1]->mutable_gpu_data(), batch->label.gpu_data()); free.push(batch); }
GPU版本,直接替換copy函數爲GPU版本便可。
(注:Caffe在forward_gpu()最後,對默認流的強制同步是沒有必要的。
Memcpy也自己不是異步執行,不須要額外同步。對默認流同步,也不會影響異步流)