從零開始山寨Caffe·拾貳:IO系統(四)

消費者

回憶:生產者提供產品的接口

在第捌章,IO系統(二)中,生產者DataReader提供了外部消費接口:html

class DataReader
{
public:
    .........
    BlockingQueue<Datum*>& free() const  { return ptr_pair->free; }
    BlockingQueue<Datum*>& full() const  { return ptr_pair->full; }
    .........
};

生產者DataReader自己繼承了線程DragonThread,在其異步的線程工做函數中interfaceKernel()中,數據庫

不斷地從pair的Free阻塞隊列取出空Datum,在read_one()用KV數據庫內容填充,再塞到Full隊列中,如圖:數組

做爲消費者(DataLayer),在從Datum得到數據後,當即作一份Copy,再把Datum塞回到Free隊列中,繼續生產。網絡

整個過程就好像是一個工廠生產的循環鏈,Datum就比如一個包裝盒。異步

生產者將產品放置其中,傳遞包裝盒給消費者。消費者從中取出產品,讓生產者回收包裝盒。async

回憶: 變形者加工產品接口

在第拾章,IO系統(三)中,變形者DataTransformer提供了數據變形的基接口:函數

void DataTransformer<Dtype>::transform(const Datum& datum, Dtype* shadow_data)

仔細觀察一下transform的兩個參數,你會發現整個transform過程,就是將Datum數據Copy到shadow_data的數組裏。fetch

這就是上節提到的「Copy」過程——從包裝盒中取出產品,再變形加工。this

加工放置的數組,之因此叫shadow_data,是由於它映射的是一個Blob的局部內存。spa

回憶一下Blob的shape,[batch_size,channels,height,width],即可知,一個Datum僅僅是一個Blob的1/batch_size。

讓Transformer對映射的內存處理,避免了直接對Datum變形的不便。映射的內存空間,就是最終成品的實際空間,如圖:

二級封裝:從Datum到Blob

在上圖中,Transformer提供了一個由Datum堆砌成Blob的途徑。

咱們只須要給Transformer提供Datum元素,以及一段內存空間(數組首指針)便可。

爲了保證內存空間提供的正確性,有兩點須要保障:

①每一個Datum在Blob的偏移位置必須計算出來,第玖章BlobFlow給了一點偏移的思路,

只要偏移offset=Blob.offset(i)便可,i 爲一個Batch內的樣本數據下標。

②內存空間,也就是Blob具體的shape必須提早計算出來,並且必須啓動SyncedMemory自動機,分配實際內存。

 

考慮一個Blob的shape,[batch_size,channels,height,width],後三個shape均可以由Datum推斷出來。

至於batch_size,是一個由使用者提供的超參,能夠根據網絡定義直接獲取。

由Datum推理channe/height/width,由DataTransformer的inferBlobShape完成,在第拾章IO系統(三)已經給出。

二級生產者

第捌章IO系統(二)介紹了LayerParameter中的prefetch概念。

在構造一個DataReader時,指定了默認Pair的緩衝區大小:

DataReader::DataReader(const LayerParameter& param){
    ptr_pair.reset(new QueuePair(
        param.data_param().prefech()*param.data_param().batch_size()));
    ........
}

total_size=prefetch*batch_size

這個大小代表了DataReader須要預緩衝prefetch個Batch,每一個Batch有batch_size個Datum單元。

在 單生產者單緩衝區 一節的最後,討論了多GPU下,如何使用單Pair的補救措施:

這多是Caffe源碼的本意。在這種方案中,DataReader和DataLayer是無須改動代碼的。

只要咱們加大DataParameter裏的prefech數值,讓CPU多緩衝幾個Batch,爲多個GPU準備就行了。

prefetch的數量由用戶指定,並且也是一個上界,並且顯然不能所有將整個KV數據庫prefetch完。

因而,以Batch爲單位的二級封裝,須要一個二級生產者和消費者,並且一樣是異步的,如IO系統(二)的圖:

二級生產者,在Caffe裏就是DataLayer衍生的線程。二級消費者,偏偏就是DataLayer自己。

DataLayer集二級生產者與消費者於一體,這歸功於面向對象技術的多重繼承。

類繼承體系

最終使用的是DataLayer,被拆解成3個類BaseDataLayer、BasePrefetchingDataLayer、DataLayer。

三個類負責不一樣的任務,你也能夠整合在一塊兒寫。

構造函數執行順序與二級生產者預緩衝流程

二級C++最喜歡考繼承類的執行順序,固然,這裏搞清楚這點相當重要。

除了基本的類構造函數外,咱們還須要考慮Layer類setup的具體函數layerSetup。

幾個DataLayer的layerSetup至關混亂,幾乎每一個都各司其職,①②③④順序不能顛倒。

完成所有setup以後,才能讓二級生產者工做。

生產單位以一個Batch爲單元,每一個Batch包含DataBlob和LabelBlob(可選)。

二級生產緩衝區

二級緩衝區構建於BasePrefetchingDataLayer類中。

template<typename Dtype>
class BasePrefetchingDataLayer :public BaseDataLayer<Dtype>,public DragonThread {
public:
    .......
    const int PREFETCH_COUNT;
protected:
    .......
    Batch<Dtype>* prefetch;
    BlockingQueue<Batch<Dtype>*> free;
    BlockingQueue<Batch<Dtype>*> full;
};

產能上界由常數PREFETCH_COUNT指定,來源於proto參數DataParamter裏prefetch大小。

在BasePrefetchingDataLayer構造函數中,用new申請等量的堆內存prefetch。

注意這裏不要使用shared_ptr,比較麻煩,並且Batch有可能會被智能指針提早釋放,應當手動析構。

能夠看到,默認提供了和DataReader相似的消費者接口free/full,不過這消費的是Batch,而不是Datum。

沒有用函數封裝,是由於DataLayer本身生產,本身消費。

二級生產

同DataReader的一級生產相似,二級生產須要從free隊列pop,填充,再塞入full。

生產過程於BasePrefetchingDataLayer的interfaceKernel函數中。

因爲多重繼承的關係,interfaceKernel函數來自父類DragonThread

template<typename Dtype>
void BasePrefetchingDataLayer<Dtype>::interfaceKernel(){
    try{
        while (!must_stop()){
            Batch<Dtype> *batch = free.pop(); //batch has already reshape in dataLayerSetup
            loadBatch(batch); // pure abstract function
            full.push(batch); //product
        }
    }
    catch (boost::thread_interrupted&) {}
}

loadBatch函數與DataReader的read_one函數效果相似,負責填充batch。

二級生產與異步流同步

第貳章主存模型末尾介紹了SyncedMemory異步提交顯存Memcpy的方法。

第玖章BlobFlow中,且已知SyncedMemory隸屬於Blob的成員變量。

當數據緩衝至Blob級別時,就須要考慮提早向顯存複製數據了。

第陸章IO系統(一)開頭給了這張圖:

能夠看到,DataLayer處於CPU與GPU的分界點,DataLayer源輸入由CPU主控,存於內存。

而DataLayer的下一層是計算層,源輸入必須存於顯存。

因而,儘管DataLayer的前向傳播函數forward(bottom,top)只是複製數據,可是更重要的是轉換數據。

在上一節的CPU異步線程工做函數interfaceKernel中,咱們能夠看到,Batch(Blob)級別已經構成,

而此時整個神經網絡Net可能正在初始化,距離Net正式啓動前向傳播函數Net.forward(),須要顯存數據,還有一段時間。

利用這段時間,能夠利用CUDA的異步流預先由內存向顯存轉換數據,據此,完善interfaceKernel函數:

template<typename Dtype>
void BasePrefetchingDataLayer<Dtype>::interfaceKernel(){
    //    create GPU async stream
    //    speed up memcpy between CPU and GPU
    //    because cudaMemcpy will be called frequently 
    //    rather than malloc gpu memory firstly(just call cudaMemcpy)
#ifndef CPU_ONLY
    cudaStream_t stream;
    if (Dragon::get_mode() == Dragon::GPU)
        CUDA_CHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
#endif
    try{
        while (!must_stop()){
            Batch<Dtype> *batch = free.pop(); //batch has already reshape in dataLayerSetup
            loadBatch(batch); // pure abstract function
#ifndef CPU_ONLY
            if (Dragon::get_mode() == Dragon::GPU){
                batch->data.data()->async_gpu_data(stream);
                // blocking this thread until host->device memcpy finished
                CUDA_CHECK(cudaStreamSynchronize(stream));
            }
#endif
            full.push(batch); //product
        }
    }
    catch (boost::thread_interrupted&) {}
    //    destroy async stream
#ifndef CPU_ONLY
    if (Dragon::get_mode() == Dragon::GPU) CUDA_CHECK(cudaStreamDestroy(stream));
#endif
}

使用異步流,須要用cudaStreamCreateWithFlags申請Flag爲cudaStreamNonBlocking的流。

cudaStreamNonBlocking的值爲0x1,表明此流非默認Memcpy流(默認流)

與之相對的是Flag爲cudaStreamDefault的流,值爲0x0,這是主複製流,cudaMemcpy的任務所有提交於此。

使用Blob內提交異步流的函數async_gpu_data(stream)[稍後給出]後,須要當即阻塞(同步)該CPU線程。

使用cudaStreamSynchronize(stream),直到GPU返回複製完畢信號以前,CPU一直同步在本行代碼。

最後,須要釋放異步流。

二級消費者

即DataLayer的forward函數。

因爲大量工做已經在父類中作完,DataLayer的消費函數相對簡單。

template <typename Dtype>
void DataLayer<Dtype>::forward_cpu(const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top){
    // consume
    Batch<Dtype> *batch = full.pop("DataLayer prefectching queue is now empty");
    dragon_copy<Dtype>(batch->data.count(), top[0]->mutable_cpu_data(), batch->data.cpu_data());
    if (has_labels)
        dragon_copy(batch->label.count(), top[1]->mutable_cpu_data(), batch->label.cpu_data());
    free.push(batch);
}

直接訪問full隊列獲取一個可用的Batch,完成消費。

將batch數據(data/label)分別複製到top裏,完成Blob的Flow,提供給下一層計算。

template <typename Dtype>
void DataLayer<Dtype>::forward_gpu(const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top){
    Batch<Dtype> *batch = full.pop("DataLayer prefectching queue is now empty");
    dragon_gpu_copy(batch->data.count(), top[0]->mutable_gpu_data(), batch->data.gpu_data());
    if (has_labels)
        dragon_gpu_copy(batch->label.count(), top[1]->mutable_gpu_data(), batch->label.gpu_data());
    free.push(batch);
}

 GPU版本,直接替換copy函數爲GPU版本便可。

(注:Caffe在forward_gpu()最後,對默認流的強制同步是沒有必要的。

     Memcpy也自己不是異步執行,不須要額外同步。對默認流同步,也不會影響異步流)

相關文章
相關標籤/搜索