從零開始山寨Caffe·陸:IO系統(一)

你說你學過操做系統這門課?寫個無Bug的生產者和消費者模型試試!git

                              ——你真的學好了操做系統這門課嘛?程序員

 

在第壹章,展現過這樣圖:github

其中,左半部分構成了新版Caffe最惱人、最龐大的IO系統。編程

也是從來最不重視的一部分。數組

第伍章又對左半部分的獨立性進行了分析,我是這麼描述到:網絡

Datum和Blob(Batch)不是上下文相關的。數據結構

Blob包含着正向傳播的shape信息,這些信息只有初始化網絡在初始化時才能肯定。多線程

而Datum則只是與輸入樣本有關。機器學習

因此,Datum的讀取工做能夠在網絡未初始化以前就開始,這就是DataReader採用線程設計的內涵。異步

因此,左半部分又能夠分爲左左半部分,和左右半部分。

阻塞隊列

生產者與消費者

第伍章講到,在一個機器學習系統中,生產者和消費者的執行週期是不同的。

爲了平衡在週期上的差別,節約計算資源,咱們顯然須要對生產者作必定限制。

存儲生產資源,能夠用數組,也能夠用STL容器。

再考慮生產者和消費者的行爲:

①不存在隨機訪問:

顯然,消費者是按照固定順序訪問緩衝區的。

咱們沒有必要考慮隨機訪問的狀況。

②不存在隨機寫入:

顯然,生產者每次只須要將資源放置於緩衝區兩端。

咱們沒有必要考慮在線性表中間位置寫入的狀況。

因爲vector底層由順序表實現,其訪問速度隨着元素數量的遞增而遞減,

而queue底層由鏈式表實現,其訪問速度不隨元素數量的遞增而遞減,且沒有隨機寫入/訪問的狀況。

因此,選擇queue做爲緩衝區是比較優異的。

爲了限制生產者的行爲,咱們須要在STL提供的queue基礎上,改進出一種新的數據結構——Blocking Queue。

互斥鎖

第肆章簡單提到了mutex問題,這是阻塞隊列除了Blocking以外,須要考慮的第二大問題。

而且已經證實了:生產者和消費者之間必然是異步的。

咱們以隊列的push和pop操做爲例,分析一下,爲何在多線程狀況下,須要加mutex。

假設線程A預備執行push操做,因此它是一個生產者;

假設線程B預備執行pop操做,因此它是一個消費者;

設有臨界緩衝區隊列Q,在某時刻T,線程A發出push操做,在T+1時候,線程B發出pop操做,

且push須要10個單位時間,pop只須要一個單位時間,問T+2時刻,pop出去的資源你敢用嘛?

顯然,沒人敢用這個執行push的半成品。

發生上述問題的癥結在於,兩個異步線程對於同一個資源,產生了爭奪行爲。

解決方案就是:在push時,鎖住資源,禁止pop;在pop時,鎖住資源,禁止push。

廣義上,咱們能夠認爲,須要將push和pop函數變成原子函數,即:執行期間不可中斷的函數。

———————————————————————————————————————————————————————————

另外,須要注意的是,mutex與blocking是兩個概念。

在廣義上,mutex會將多個線程對同一個資源的異步並行操做,拉成一個串行執行隊列,串行等待執行。

而blocking則是將線程休眠,CPU會暫時放棄對其控制。

在程序員界,雖然有時候會把mutex和blocking都稱爲阻塞,但其原理和內涵是徹底不一樣的。

———————————————————————————————————————————————————————————

boost提供不俗的mutex功能,使用前須要 #include "boost/thread/mutex.hpp"

你能夠將一個boost::mutex對象嵌入到一個類當中,這樣,容許每個類對象擁有一把鎖。

因爲對一個queue對象,主要是鎖住來自該對象的push和pop操做,

因此,mutex理應當是以類對象爲一個單位的,參考代碼以下:

template <typename T>
class BlockingQueue{
public:
    void push(const T& t){
        boost::mutex::scoped_lock lock(mutex);
        Q.push(t);
    }
    T pop(){
        boost::mutex::scoped_lock lock(mutex);
        T t = Q.front();
       Q.pop();
    return t;
    }
private:
    boost::mutex mutex;
    queue<T> Q;
};    

boost::mutex::scoped_lock lock提供局部鎖定功能。

它與boost::scoped_ptr有相似的效果,scoped_ptr在做用域結束後,就當即釋放對象。

scoped_lock在做用域結束後,會當即解鎖,若是不用scoped_lock,咱們能夠這麼寫:

void push(const T& t){
        mutex.lock();
        Q.push(t);
        mutex.unlock();
}

條件阻塞與激活

前面幾章說了那麼久的阻塞,其中大部分指的應該是blocking。

mutex大部分狀況下,都只是在鎖一個局部函數,阻塞週期很是短。

惟一的例外是Layer的正向傳播函數forward,mutex鎖住的週期很是長。

blocking和mutex的惟一不一樣在於:

blocking以後,操做系統會挑撥CPU放棄對線程的處理。

這是很是危險的一個行爲,由於該線程被家長趕去睡覺了,並且不能反抗家長的命令。

除非家長通知它:噢,你能夠活動了。在此以前,該線程將永遠處於無效狀態。

上面的例子有兩個重點:

①CPU放棄線程

②不可主動激活

既然如此,爲了激活這個線程,模型就必須設計成「對偶模型」,而生產者和消費者,偏偏正是對偶的。

———————————————————————————————————————————————————————————

boost::condition_variable提供了簡單的blocking功能,爲了統一控制,能夠將其與mutex捆在一塊兒:

template <typename T>
class BlockingQueue
{
public:
    class Sync{
    public:
        boost::mutex mutex;
        boost::condition_variable condition;
    };
private:
    queue<T> Q;
    boost::shared_ptr<Sync> sync;
};

如今考慮一下,什麼時候須要註銷、阻塞一個線程,大體有兩種狀況:

①緩衝區空,此時消費者不能消費,拒絕pop操做以後,能夠交出CPU控制權。

②緩衝區滿,此時生產者不能生產,拒絕push操做以後,能夠交出CPU控制權。

爲了激活彼此,就須要模型是對偶的:

①經歷緩衝區空以後,忽然push了一個元素,此時應當由生產者激活消費者線程。

②經歷緩衝區滿以後,忽然pop了一個元素,此時應當由消費者激活生產者線程。

看起來,咱們能夠將代碼寫成這樣:

void BlockingQueue<T>::push(const T& t){
    boost::mutex::scoped_lock lock(sync->mutex);
    while (Q.full()){
        sync->condition.wait(lock); //suspend, spare CPU clock
    }
    Q.push(t);
    sync->condition.notify_one();
}

template<typename T>
T BlockingQueue<T>::pop(const string& log_waiting_msg){
    boost::mutex::scoped_lock lock(sync->mutex);
    while (Q.empty()){
        sync->condition.wait(lock); //suspend, spare CPU clock
    }
    T t = Q.front();
    Q.pop();
    sync->condition.notify_one();
    return t;
}

其中,sync->condition.wait(lock)表示使用當前mutex爲標記,交出CPU控制權。

sync->condition.notify_one()則表示激活一個線程的CPU控制權。

能夠看到,blocking和activating的代碼是徹底對偶的,blocking本身,activating對方。

雙阻塞隊列

上節代碼是不可能實現的,由於沒有Q.full()這個函數。

在傳統生產者、消費者程序中,一般會使用單緩衝隊列。

使用單緩衝隊列是沒有問題的,由於在這種簡單的代碼結構中,咱們很容易知道緩衝隊列的上界。

好比,設定緩衝隊列大小爲20,在編程中,能夠經過檢測 if(count==20)來達到。

當代碼結構複雜時,好比,緩衝隊列大小變量一般在很是上層上層上層的位置,而處於底層的緩衝隊列,

是沒法探知何謂「緩衝隊列滿」的含義的,這就爲編程帶來很大的難題。

———————————————————————————————————————————————————————————

解決方案是,使用雙緩衝隊列組方案,咱們設定兩個阻塞隊列,一個叫free,一個叫full。

二者組成一個QueuePair:

class QueuePair{
public:
    QueuePair(const int size);
    ~QueuePair();
    BlockingQueue<Datum*> free; // as producter queue
    BlockingQueue<Datum*> full; // as consumer queue
};

爲了不檢測緩衝隊列的上界,咱們能夠先放置與上界數量等量的空元素指針到free隊列。

每次生產者生產時,從free隊列中pop一個空Datum元素,填充,再扔進full隊列。

這樣,BlockingQueue的push操做就不須要檢測上界了。

原理很簡單,生產者想要push,以前必須pop,pop能夠經過檢測緩衝隊列空來實現。

這樣,就用檢測一個緩衝隊列的空,模擬且替代了檢測另外一個緩衝隊列的滿。

對於上層代碼而言,咱們僅僅須要預先填充N個元素至free隊列中便可,很是方便。

這部分是DataReader的設計核心。

代碼實戰

★數據結構

———————————————————————————————————————————————————————————

創建blocking_queue.hpp。

template <typename T>
class BlockingQueue
{
public:
    BlockingQueue();
    void push(const T& t); 
    T pop(const string& log_waiting_msg="");
    T peek();
    size_t size();
    // try_func return false when need blocking
    // try_func for destructor
    bool try_pop(T* t);
    bool try_peek(T* t);
    class Sync{
    public:
        boost::mutex mutex;
        boost::condition_variable condition;
    };
private:
    queue<T> Q;
    boost::shared_ptr<Sync> sync;
};
★class BlockingQueue

除了push和pop以外,追加隊列第三個經常使用操做——peek。

peek目的是取出隊首元素,可是不從隊列裏pop掉。

peek用於實驗性讀取Datum,爲DataTransfomer初始化所用。

除了經過返回值以外獲取以外,咱們還要準備try系列函數。

try除了獲取元素外,同時返回一個bool值,代表成功或者失敗。

主要用於對Datum的析構,這也是全部代碼裏,惟一一處對protobuff數值的析構。

★實現

———————————————————————————————————————————————————————————

創建blocking_queue.cpp。

總體代碼沒有什麼好說的,細節以及在上文講解了。

template<typename T>
BlockingQueue<T>::BlockingQueue() :sync(new Sync()) {}

template<typename T>
void BlockingQueue<T>::push(const T& t){

    //    function_local mutex and unlock automaticly
    //    cause another thread could call pop externally
    //    when this thread is calling push pop&peer at the same time

    boost::mutex::scoped_lock lock(sync->mutex);
    Q.push(t);

    //    must wake one opposite operation avoid deadlock
    //  formula: wait_kind_num = notify_kind_num
    //  referring Producter-Consumer Model and it's semaphore setup method
    sync->condition.notify_one();
}

template<typename T>
T BlockingQueue<T>::pop(const string& log_waiting_msg){
    boost::mutex::scoped_lock lock(sync->mutex);
    while (Q.empty()){
        if (!log_waiting_msg.empty()){ LOG_EVERY_N(INFO, 1000) << log_waiting_msg; }
        sync->condition.wait(lock); //suspend, spare CPU clock
    }
    T t = Q.front();
    Q.pop();
    return t;
}

template<typename T>
T BlockingQueue<T>::peek(){
    boost::mutex::scoped_lock lock(sync->mutex);
    while (Q.empty())
        sync->condition.wait(lock);
    T t = Q.front();
    return t;
}

template<typename T>
bool BlockingQueue<T>::try_pop(T* t){
    boost::mutex::scoped_lock lock(sync->mutex);
    if (Q.empty()) return false;
    *t = Q.front();
    Q.pop();
    return true;
}

template<typename T>
bool BlockingQueue<T>::try_peek(T* t){
    boost::mutex::scoped_lock lock(sync->mutex);
    if (Q.empty()) return false;
    *t = Q.front();
    return true;
}

template<typename T>
size_t BlockingQueue<T>::size(){
    boost::mutex::scoped_lock lock(sync->mutex);
    return Q.size();
}
實現

模板實例化

在第壹章,咱們提到了INSTANTIATE_CLASS(classname)宏的做用。

本段將重點解釋,出如今blocking_queue.cpp最後的實例化代碼。

模板機制與編譯空間

template<typename T>能夠說是整個Caffe裏出現頻率最高的代碼了。

C++編譯器有個好玩的特性,就是對於在cpp文件裏出現的模板定義代碼,

只檢查最基本的語法錯誤,好比標點符號之類的。甚至你把變量名拼錯了,編譯仍然能經過。

因此,我在最初山寨Caffe的時候,寫了一堆錯誤的代碼,編譯器都沒告訴我。

後來在醫院體檢時,偶然轉了幾圈,大概猜到了編譯器應該是爲模板代碼開了獨立的編譯檢查空間。

爲了便於理解,參考圖以下:

因爲C/C++是強類型檢查語言,類型檢查處於編譯先鋒位置。

而未肯定類型的模板定義代碼,將不會進行大部分詞法分析、語法分析、語義分析。

頭文件與源文件

奇怪的是,若是你將模板定義代碼寫在頭文件裏,那麼它就會被上升到普通編譯空間。

原理大體以下:

編譯器不會對未include的頭文件進行最終編譯。

這意味着,若是你要使用一個模板類型,好比A<int> a;

必然處於include下,此時必然是指定類型的,編譯器就沒必要將代碼push到模板空間。

或者,存在一種轉移,編譯器將定義代碼由模板空間轉到普通空間,進行下一步分析。

 

然而,若是咱們將模板定義代碼寫在源文件A.cpp裏,而後在B.cpp裏,使用A<int> a,

此時編譯器應該去哪裏找模板類A的定義代碼?按照編譯鏈追溯,應該是到A.hpp裏,

再由A.hpp,找到A.cpp。

這種思路在模板定義於A.cpp是不可能實現的,如圖所示:

這是兩種空間本質區別,因爲模板空間的分析沒有結束,C++不會讓你由hpp找到cpp中的定義代碼的。

實例化

爲了能讓編譯A.cpp時,從模板空間遷移到普通空間,咱們必須爲其提供明確的類型。

好比在blocking_queue.cpp的結尾,你應該添加如下代碼:

template class BlockingQueue<Batch<float>*>;
template class BlockingQueue<Batch<double>*>;
template class BlockingQueue < Datum* > ;
template class BlockingQueue < boost::shared_ptr<QueuePair> > ;

這四行代碼枚舉了BlockingQueue中可能出現的全部具體類型,此時編譯器纔會對A.cpp進行完整的編譯。

在common.hpp中的實例化宏則要簡單的多,

#define INSTANTIATE_CLASS(classname) \
  template class classname<float>; \
  template class classname<double>

該宏用於Blob、Layer、Net和Solver四大數據結構,由於它們的類型,除了float,就是double。

特殊化

模板機制中存在模板特殊化的概念,它在功能上等效於實例化。

模板特殊化在math_functions.cpp中將會大量存在。

好比此函數:

template<>
void dragon_cpu_gemm<double>(const CBLAS_TRANSPOSE transA, const CBLAS_TRANSPOSE transB,
    const int M, const int N, const int K, const double alpha, const double* A, const double* B,
    const double beta, double *C){
    int lda = (transA == CblasNoTrans) ? K : M;
    int ldb = (transB == CblasNoTrans) ? N : K;
    cblas_dgemm(CblasRowMajor, transA, transB, M, N, K, alpha, A, lda, B, ldb, beta, C, N);
}

注意實例化與特殊化template附近的區別,特殊化須要添加<>。

模板特殊化必需要明確給出指定類型的代碼,而實例化則沒必要給出。

模板實例化本質是模板特殊化的特例,條件是:全部類型,執行相同的代碼。

而這份相同的代碼,如下述形式給出:

template<typename T>
XXX<T>::Y(){
    ......
    ......
}

你能夠將實例化視爲聲明,特殊化視爲定義。

二者給出其一,就能讓編譯器完整編譯分離的模板定義代碼,前提是,必須寫在cpp文件中。

CUDA與NVCC編譯器

NVCC編譯cu文件時,會無視A.cpp裏的任何實例化、特殊化代碼。

Caffe中給出的解決方案是,追加對cu文件中函數的特別實例化。

由如下幾個宏實現:

#define INSTANTIATE_LAYER_GPU_FORWARD(classname) \
  template void classname<float>::forward_gpu( \
      const vector<Blob<float>*>& bottom, \
      const vector<Blob<float>*>& top); \
  template void classname<double>::forward_gpu( \
      const vector<Blob<double>*>& bottom, \
      const vector<Blob<double>*>& top);

#define INSTANTIATE_LAYER_GPU_BACKWARD(classname) \
  template void classname<float>::backward_gpu( \
      const vector<Blob<float>*>& top, \
      const vector<bool> &data_need_bp, \
      const vector<Blob<float>*>& bottom); \
  template void classname<double>::backward_gpu( \
      const vector<Blob<double>*>& top, \
      const vector<bool> &data_need_bp, \
      const vector<Blob<double>*>& bottom)

#define INSTANTIATE_LAYER_GPU_FUNCS(classname) \
  INSTANTIATE_LAYER_GPU_FORWARD(classname); \
  INSTANTIATE_LAYER_GPU_BACKWARD(classname)

更多參考

見CSDN板塊的討論:http://bbs.csdn.net/topics/380250382

完整代碼

blocking_queue.hpp

https://github.com/neopenx/Dragon/blob/master/Dragon/include/blocking_queue.hpp

blocking_queue.cpp

https://github.com/neopenx/Dragon/blob/master/Dragon/src/blocking_queue.cpp

相關文章
相關標籤/搜索