從零開始山寨Caffe·捌:IO系統(二)

生產者

雙緩衝組與信號量機制

在第陸章中提到了,如何模擬,以及取代根本不存的Q.full()函數。git

其本質是:除了爲生產者提供一個成品緩衝隊列,還提供一個零件緩衝隊列。github

當咱們從外部給定了固定容量的零件以後,生產者的產能就受到了限制。數據庫

由兩個阻塞隊列組成的QueuePair,並非Caffe的首創,它其實是生產者與消費者的編程方式之一。
編程

在大部分操做系統教材中,雙緩衝區free、full一般由兩個信號量empty、full實現。數組

 

信號量(Semaphore)由操做系統底層實現,而且幾乎沒有人會直接使用信號量去編程。數據結構

由於在邏輯上,能夠由信號量可由mutex+計數器模擬獲得。負載均衡

信號量的名字頗有趣,它實際上由兩部分組成,信號(激活信號)、量(計數器)。機器學習

漢語的博大精深恰當地詮釋的信號量的語義精神,而從Semaphore中,你讀不出任何精華。函數

激活信號掩蓋了mutex的功與名,信號量的第一大功能,就是mutex鎖。學習

量,顯然代表信號量能夠計數,實際上,信號量常常會被拿來爲臨界資源計數。

下面的僞代碼摘自個人操做系統課本,《計算機操做系統 <第四版> 湯小丹等 著》:

int in=0,out=0;
item buffer[n];
semaphore mutex=1,empty=n,full=0;
void wait(S){
    while(S<=0);
    S--;
}
void signal(S) {S++;}
void producer{
    while(1){
        produce an item in nexp;
        ...
        wait(empty);
        wait(mutex);
        buffer[in]=nexp;
        in=(in+1)%n;
        signal(mutex);
        signal(full);
    }
}

能夠看到,除了mutex履行其互斥鎖的職責以外,empty和full用來計數。

做爲生產者,每次生產時,都要讓empty減1,讓full加1。

當empty小於等於零時,造成第二把鎖,固然,這把鎖不是爲了互斥,只是爲了阻塞。

爲了增長效率,這第二把鎖能夠修改爲條件阻塞,讓生產者交出CPU控制權,固然這須要操做系統的支持。

 

信號量在現代編程中是多餘的,事實上,也沒有哪一個線程庫會提供。

當"量"爲1時,信號量一般是去實現互斥鎖功能。

當"量"爲臨界資源數量時,信號量一般是去實現資源計數、而且條件阻塞的功能。

這兩部分的精神內涵都在Blocking Queue中實現了,So,忘記信號量吧。

多生產者單緩衝區

做爲通常的機器學習玩家,你是用不着考慮多生產者的。

若是你比較有錢,常常喜歡擺弄4-way泰坦交火,那麼就須要考慮一下多生產者的模型了。

在第肆章中,介紹了多GPU的基本運行原理,給出了以下這張圖:

對於每一個GPU而言,它至少須要一個對它負責的DataReader,每一個DataRedaer應當有不一樣的數據來源。

Caffe中,將控制一個數據來源的類對象稱爲Body,默認有一個類靜態成員的Body關聯容器:

class DataReader
{
public:
    .....
private:
    static map<string, boost::weak_ptr<Body> > global_bodies;
};

值得注意的是,此處應該使用weak_ptr,而不是shared_ptr,由於Body自己將由一個shared_ptr控制。

將Body的shared_ptr存入map容器,將會致使指針計數器永遠爲1。

這樣,當咱們準備將Body從map容器中清除時,沒法獲知它是否已經被釋放。

而weak_ptr指向shared_ptr時,不會增長指針計數器計數,當計數爲0時,便可將其從map裏清除。

每個DataReader只能擁有一個Body,而每一個Body能夠有多個成品存儲緩衝區(非用於零件緩衝,下節講)。

每一個Body控制一個數據來源,不一樣的數據來源能夠用關鍵字來hash,默認Caffe提供的關鍵字是:

static string source_key(const LayerParameter& param){
    return param.name() + ":" + param.data_param().source();
}

即Layer名,加上數據庫路徑。

多生產者主要用於多數據庫同時並行訓練,這是一種很是經典的模型。

一部分代碼涉及到上層的DataLayer,將後續詳解。

另一種模型是單生產者,以單數據庫,不一樣數據區域同時並行訓練,該方法也能夠採用。(下節講)

Caffe的默認源碼中,既沒有完整實現多生產者並行模型,也沒有完整實現單生產者並行模型,這點使人遺憾。

不過,從源碼中仍然能夠看出一點端倪,本教程只介紹大致思路,一樣並不提供具體代碼。

單生產者多緩衝區

在這種模型下,將只有一個DataReader,一個Body,可是有多個Pair,如圖:

有趣的是,Body結構體中,提供了QueuePair數組容器:

class Body :public DragonThread{
public:
        .......
    BlockingQueue<boost::shared_ptr<QueuePair> > new_pairs;
};

可是,Caffe源碼中的DataReader,默認只會使用該容器數組的第一個QueuePair,並無完整實現多緩衝區:

class DataReader
{
public:
    DataReader(const LayerParameter& param){
           ........
         ptr_body->new_pairs.push(ptr_pair);
    }
    BlockingQueue<Datum*>& free() const  { return ptr_pair->free; }
    BlockingQueue<Datum*>& full() const  { return ptr_pair->full; }
private:
    boost::shared_ptr<QueuePair> ptr_pair;
    boost::shared_ptr<Body> ptr_body;
};

能夠看到,儘管咱們設置了Body,存儲多個QueuePair,可是提供的外部訪問接口,竟然直接使用了ptr_pair。

固然,若是你要編程使用多緩衝區,必定要修改DataReader的訪問接口。

對於單個數據庫的順序數據讀取,如何將順序資源,平攤到多個緩衝區?

Caffe使用了循環讀取法:

void Body::interfaceKernel(){
    boost::shared_ptr<DB> db(GetDB(param.data_param().backend()));
    db->Open(param.data_param().source(), DB::READ);
    boost::shared_ptr<Cursor> cursor(db->NewCursor());
    vector<boost::shared_ptr<QueuePair> >  container;
    try{
         ...............
         while (!must_stop()){
            for (int i = 0; i < solver_count; i++) 
                read_one(cursor.get(), container[i].get());
        }
    } catch (boost::thread_interrupted&) {}
}

能夠看到,在Body的線程函數中,利用全局管理器提供的solver_count,循環均攤數據到多個QueuePair中。

當你將solver_count設置成大於1時,將可使用Body中的多個緩衝區QueuePair,這點須要注意。

單生產者單緩衝區(默認代碼)

仔細思考一下,就會發現,單生產者多緩衝區方案是毫無心義的,看起來咱們彷佛模擬了多緩衝區。

可是實質只是一個線程,把資源分了一下組,多個組在DataLayer進行消費的時候,又會被合併成一個Batch:

如圖,由於一個DataLayer只能有一個Prefetching Thread,因此必然是每次從各個Pair裏取一次。

若是咱們先把Pair0取完,再取Pair1,再取Pair2,這樣也是能夠的,是一種不錯的shuffle,可是須要追加代碼。

從計算角度分析,多緩衝區不會加速,反而會減速,若是是爲了作上述的shuffle,是情有可原的。

若是不是,只是單純地爲了負載均衡,輪流從各個Pair裏取,那麼本質上,就會退化成單生產者單緩衝區。

————————————————————————————————————————————————————

這多是Caffe源碼的本意。在這種方案中,DataReader和DataLayer是無須改動代碼的。

只要咱們加大DataParameter裏的prefech數值,讓CPU多緩衝幾個Batch,爲多個GPU準備就行了。

三種速度方案排名:

多生產者單緩衝區>單生產者單緩衝區>單生產者多緩衝區

線程嵌套線程與Socket

Caffe的源碼真的頗有啓發性,在DataReader的構造和析構函數中,能夠發現貢獻者悄悄加了mutex:

DataReader::DataReader(const LayerParameter& param){
    ......
    boost::mutex::scoped_lock lock(bodies_mutex);
        ......
}

DataReader::~DataReader(){
        ......
    boost::mutex::scoped_lock lock(bodies_mutex);
        ......
}

熟悉C++的人應該知道,在常規狀況下,構造和析構函數是不會並行執行的,也就是不會被線程執行。

線程並行的僅僅是工做函數,工做以前主進程構造,工做以後,主進程析構。

若是偏要認爲構造和析構可能並行的話,那麼將出現一種好玩的狀況:

因爲DataReader自己是線程,線程並行線程,將致使線程嵌套線程。

在個人操做系統課上,個人老師這麼說:

線程僅僅擁有進程的少部分資源,權限很小。

那麼線程可以嵌套線程麼?通過百度以後,我發現真還能夠。

當今的操做系統,不管是Linux,仍是Windows,線程的資源權限都是很是大的。

————————————————————————————————————————————————————

線程嵌套線程,會不會和多GPU有關?我認爲無關。

每一個GPU的監督線程,這裏咱們假設使用DragonThread,在須要工做時,

只須要傳入:Solver::solve函數就能夠了,Solver、Net、Layer的構造和析構,顯然是在主進程裏執行的。

那麼,線程嵌套線程,有什麼意義,有什麼狀況是必須在線程裏觸發構造函數?

頗有趣,通常來說,只有Socket線程是這樣的。

Socket線程無須使用DragonThread,實際上,Boost的Socket也是由boost::asio而不是boost::thread實現的。

不像多GPU,咱們沒法預估,在某一時刻,實際有多少個Socket在執行,有多少個用戶發出了訪問請求。

所以,不能直接把Solver、Net、Layer的構造,放在主進程當中。否則你知道你要構造多少份嘛?顯然你不知道。

因此,從直覺上,將這些的構造,放在每個啓動的Socket線程裏,用多少,構造多少,看起來不錯,如圖:

 

這樣,假如這幾個Solver使用了不一樣數據來源,那麼global_bodies就有被幾個Solver同時修改的可能。

這是構造和析構函數裏,須要加mutex的直接緣由。

————————————————————————————————————————————————————

Socket的意義何在?

①從訓練角度,多個用戶能夠遠程操控一臺主機,訓練不一樣的Net。

這點與多GPU訓練一個模型是不同的。通常而言,咱們不會認爲,多個用戶經過Socket,竟然想要訓練同一個模型。

固然,這也是能夠的。

②從測試角度,多個用戶,能夠利用同一個Net的參數,並行獲得本身提供的數據的測試結果。

注意,這樣就不要share整個Net,每一個用戶的solver使用獨立的Net,獨立讀取訓練好的參數。

不然,多個用戶會在一個Net上卡半天。

代碼實戰

創建data_reader.hpp、data_reader.cpp。

QueuePair

class QueuePair{
public:
    QueuePair(const int size);
    ~QueuePair();
    BlockingQueue<Datum*> free; // as producter queue
    BlockingQueue<Datum*> full; // as consumer queue
};

QueuePair的結構在上一章已經介紹過,每個QueuePair將做爲一個緩衝區。

QueuePair只須要實現構造函數和析構函數:

QueuePair::QueuePair(const int size){
    // set the upbound for a producter
    for (int i = 0; i < size; i++) free.push(new Datum());
}

QueuePair::~QueuePair(){
    // release and clear
    Datum *datum;
    while (free.try_pop(&datum)) delete datum;
    while (full.try_pop(&datum)) delete datum;
}

在構造函數中,咱們進行"零件"的填充,注意裏面的Datum全是空元素,且存入隊列的應該是指針。

切記勿存入實體對象Datum,這在應用程序開發中是大忌,由於C++並不是Python,默認執行的深拷貝。

深拷貝大內存數據結構體,會嚴重拖慢程序執行,並且仍是沒有意義的,傳遞指針更恰當。

在析構函數中,實際上這是惟一一處對Protocol Buffer對象的主動析構,由於Datum沒有用shared_ptr。

主動析構主要利用Blocking Queue提供的try,來控制循環進度。

此處切記不要把pop寫成peek,不然會形成對空指針的delete,致使程序崩潰。

LayerParameter

DataReader的上層是DataLayer,它是DataLayer的成員變量之一,須要DataLayer提供proto參數。

在你的proto腳本中,追加以下項:

message DataParameter{
    enum DB{
        LEVELDB=0;
        LMDB=1;
    }
    optional string source=1;
    optional uint32 batch_size=2;
    optional DB backend=3 [default=LMDB];
    //4-way pre-buffering is enough for normal machines
    optional uint32 prefech=4 [default=4];
}

message LayerParameter{
    optional string name=1;
    optional string type=2;
    optional DataParameter data_param=8;
}

從新編譯後,覆蓋你的舊頭文件和源文件。

DataParameter中,包含:數據庫源路徑、batch大小、數據庫類型,以及預緩衝區大小。

比較特別的是預緩衝大小,默認是開4個Batch的預緩衝。若是你的GPU計算速度過快,明顯大於

CPU供給數據的速度,消費者(DataLayer)常常提示缺數據,你得考慮加大預緩衝區數量。

將DataParameter嵌入到LayerParameter中去。

LayerParameter是一個巨型的數據結構,將包含全部類型Layer的超參數,你能夠將其視爲基類。

Body

class Body :public DragonThread{
public:
    Body(const LayerParameter& param);
    virtual ~Body();
    vector<boost::shared_ptr<QueuePair>> new_pairs;
protected:
    void interfaceKernel(); 
    void read_one(Cursor *cursor, QueuePair *pair);
    LayerParameter param;
};

Body其實是一個線程,而DataReader卻不是,儘管Body是DataReader成員變量。

Body的構造函數和析構函數就是啓動線程和中止線程:

Body::Body(const LayerParameter& param) :param(param) { startThread();}
Body::~Body() { stopThread();}

線程工做函數比較複雜:

void Body::interfaceKernel(){
    boost::shared_ptr<DB> db(GetDB(param.data_param().backend()));
    db->Open(param.data_param().source(), DB::READ);
    boost::shared_ptr<Cursor> cursor(db->NewCursor());
    try{
        //    default solver_count=1
        int solver_count = param.phase() == TRAIN ? Dragon::get_solver_count() : 1;
        //    working period
        while (!must_stop()){
            for (int i = 0; i < solver_count; i++) 
                read_one(cursor.get(), new_pairs[i].get());
        }
        //  complex condition
    } catch (boost::thread_interrupted&) {}
}

該函數將會一直卡在循環裏,直到訓練結束,Body執行析構函數,將線程執行中止。

Body-DataReader構成了Caffe數據緩衝的第一級別:數據庫->Datum

在DataLayer中,還會進行第二級別的緩衝:Datum->Blob->Batch,將在後續分析。

最後,還剩下一個read_one函數:

void Body::read_one(Cursor *cursor, QueuePair *pair){
    Datum *datum = pair->free.pop();
    datum->ParseFromString(cursor->value());
    pair->full.push(datum);
    cursor->Next();
    if (!cursor->valid()){
        DLOG(INFO) << "Restarting data prefeching from start.\n";
        cursor->SeekToFirst();
    }
}

read_one每次從一個雙緩衝組的free隊列中取出空Datum指針。

利用Protocol Buffer的反序列化函數ParseFromString,從數據庫中還原Datum,再扔到full隊列裏。

感謝Protocol Buffer,不然這部分的代碼估計不下200行。

當數據庫跑完以後,須要回到開頭,再次重讀,爲迭代過程反覆提供數據。

這一步只適合訓練過程,若是你要一次測試本身的數據,請忘記這個函數,重寫一個不要反覆讀的版本。

DataReader

class DataReader
{
public:
    DataReader(const LayerParameter& param);
    BlockingQueue<Datum*>& free() const  { return ptr_pair->free; }
    BlockingQueue<Datum*>& full() const  { return ptr_pair->full; }
    ~DataReader();
    static string source_key(const LayerParameter& param){
        return param.name() + ":" + param.data_param().source();
    }
private:
    LayerParameter param;
    boost::shared_ptr<QueuePair> ptr_pair;
    boost::shared_ptr<Body> ptr_body;
    static map<string, boost::weak_ptr<Body> > global_bodies;
};

該結構上文已經全面解析過。

在cpp的實現中,首先完成類靜態成員變量的外部初始化。

map<string, boost::weak_ptr<Body> > DataReader::global_bodies;

以及一個靜態mutex的定義:

static boost::mutex bodies_mutex;

該mutex是Caffe挖的坑之一,雖然默認不會生效,卻是給出了不錯的指導。

當構建多生產者單緩衝區時,咱們將會有多個Body,即多個DataReader,即多個DragonThread。

這意味着,Body的Hash容器將成爲一個互斥資源。

該Hash容器的存在不是沒有必要的,因爲:

每一個數據來源只能用一次,爲了不重複路徑,顯然須要Hash。

DataReader::DataReader(const LayerParameter& param){
    ptr_pair.reset(new QueuePair(
        param.data_param().prefech()*param.data_param().batch_size()));
    boost::mutex::scoped_lock lock(bodies_mutex);
    string hash_key = source_key(param);
    boost::weak_ptr<Body> weak = global_bodies[hash_key];
    ptr_body = weak.lock();
    if (!ptr_body){
        ptr_body.reset(new Body(param));
        global_bodies[hash_key] = boost::weak_ptr<Body>(ptr_body);
    }
    ptr_body->new_pairs.push(ptr_pair);
}

DataReader的構造函數首先根據用戶指定的預緩衝區大小,初始化默認的雙緩衝隊列組。

接下來,要在Body的Hash容器中登記,mutex鎖住,修改以後解鎖。

登記所使用的是weak_ptr,weak_ptr可看做shared_ptr的助手,一般視爲觀察者(Viewer)。

不可以使用->,只能調用lock函數得到shared_ptr。

DataReader的析構,主要任務是析構Body,以及從Hash容器中反登記。

DataReader::~DataReader(){
    string hash_key = source_key(param);
    ptr_body.reset();
    boost::mutex::scoped_lock lock(bodies_mutex);
    if (global_bodies[hash_key].expired()) global_bodies.erase(hash_key);
}

析構體系

DataReader中涉及幾個比較重要的析構,這裏以圖描述下:

完整代碼

data_reader.hpp

https://github.com/neopenx/Dragon/blob/master/Dragon/data_include/data_reader.hpp

data_reader.cpp

https://github.com/neopenx/Dragon/blob/master/Dragon/data_src/data_reader.cpp

相關文章
相關標籤/搜索