比特幣源碼分析:多線程檢查腳本

多線程腳本檢查啓動

多線程腳本檢查啓動代碼:c++

bool AppInitMain(Config &config, boost::thread_group &threadGroup, CScheduler &scheduler) {
    ...
    if (nScriptCheckThreads) {
        for (int i = 0; i < nScriptCheckThreads - 1; i++) {
            threadGroup.create_thread(&ThreadScriptCheck);
        }
    }
    ...
}
static CCheckQueue<CScriptCheck> scriptcheckqueue(128);     

void ThreadScriptCheck() {
    RenameThread("bitcoin-scriptch");
    scriptcheckqueue.Thread();  
}
複製代碼

AppInitMain 中根據選項,建立多個線程。 此處使用了boost的線程庫,在綁定的線程函數ThreadScriptCheck中,調用一個全局狀態的任務隊列scriptcheckqueue。每一個線程都去該隊列中去任務,當隊列中無任務可執行時,線程被條件變量阻塞。安全

任務隊列

任務隊列代碼:多線程

template <typename T> class CCheckQueue {
private:
    boost::mutex mutex;
    boost::condition_variable condWorker;
    boost::condition_variable condMaster;
    std::vector<T> queue;
    int nIdle;
    int nTotal;
    bool fAllOk;
    unsigned int nTodo;
    bool fQuit;
    unsigned int nBatchSize;
    bool Loop(bool fMaster = false);
public:
    //! Create a new check queue
    CCheckQueue(unsigned int nBatchSizeIn)
            : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), fQuit(false),
              nBatchSize(nBatchSizeIn) {}

    void Thread() { Loop(); }
    
    bool Wait() { return Loop(true); }
    
    void Add(std::vector<T> &vChecks) {
        boost::unique_lock<boost::mutex> lock(mutex);
    
        for (T &check : vChecks) {
            queue.push_back(std::move(check));
        }
    
        nTodo += vChecks.size();
        if (vChecks.size() == 1) {
            condWorker.notify_one();
        } else if (vChecks.size() > 1) {
            condWorker.notify_all();
        }
    }
    bool IsIdle() {
        boost::unique_lock<boost::mutex> lock(mutex);
        return (nTotal == nIdle && nTodo == 0 && fAllOk == true);
    }
    ~CCheckQueue() {}
}

bool CCheckQueue::Loop(bool fMaster = false){
    boost::condition_variable &cond = fMaster ? condMaster : condWorker;

    std::vector<T> vChecks; 
    vChecks.reserve(nBatchSize);
    unsigned int nNow = 0;      
    bool fOk = true;
    do {
        {
            boost::unique_lock<boost::mutex> lock(mutex);       
            // first do the clean-up of the previous loop run (allowing us
            // to do it in the same critsect) 
            if (nNow) {
                fAllOk &= fOk;
                nTodo -= nNow;
                if (nTodo == 0 && !fMaster)
                    // We processed the last element; inform the master it
                    // can exit and return the result 
                    condMaster.notify_one();
            } else {
                nTotal++;
            }
           
            while (queue.empty()) {
                if ((fMaster || fQuit) && nTodo == 0) {
                    nTotal--;
                    bool fRet = fAllOk;     
                    // reset the status for new work later
                    if (fMaster) fAllOk = true;
                    return fRet;
                }
                nIdle++;
                cond.wait(lock); 
                nIdle--;
            }
            nNow = std::max(
                1U, std::min(nBatchSize, (unsigned int)queue.size() /
                                             (nTotal + nIdle + 1)));
            vChecks.resize(nNow);
            for (unsigned int i = 0; i < nNow; i++) {
                vChecks[i].swap(queue.back());
                queue.pop_back();       //將放到局部隊列中的任務清除
            }
            fOk = fAllOk;
        }
        // execute work; 執行本線程剛分到的工做。
        for (T &check : vChecks) {
            if (fOk) fOk = check();
        }
        vChecks.clear();
    } while (true);
    
}
複製代碼

使用解讀:函數

  • boost::mutex mutex;: 互斥鎖保護內部的狀態
  • boost::condition_variable condWorker;: 在沒有工做時,工做線程阻塞條件變量
  • boost::condition_variable condMaster;: 在沒有工做時,master線程阻塞條件變量
  • std::vector<T> queue;: 要處理元素的隊列
  • int nIdle;: 空閒的工做線程數量(包含主線程)
  • int nTotal;: 總的工做線程的數量,包含主線程
  • bool fAllOk;: 臨時評估結果
  • unsigned int nTodo;: 還有多少驗證任務沒有完成,包括不在排隊的,但仍在工做線程本身的批次中的任務數量
  • bool fQuit;: 是否須要退出
  • unsigned int nBatchSize;: 每一個批次最大的元素處理數量

隊列中使用了模板類,執行的驗證任務由T標識,T都必須提供一個重載的operator()方法,而且反回一個bool。 默認爲主線程push 批量任務到隊列中,其餘的工做線程去處理這些任務,當主線程push完任務後,也去處理這些任務,直到任務隊列所有處理完畢。 上述是隊列的實現:主要的任務處理是在Loop()函數中; 該隊列會進行兩種調用,來處理隊列中的任務:oop

  1. 添加任務後:自動喚醒阻塞的工做線程去處理添加的任務;細節請看:void Add(std::vector<T> &vChecks)
  2. 主線程添加完任務後,調用bool Wait(),也去處理隊列中的任務,隊列中的所有任務處理完後,主線程退出。 void Add():給類的內部隊列批量添加任務,本次操做受鎖保護,並更新全部的狀態。

若是剛添加的任務數量爲1,只喚醒一個工做線程去處理;不然,喚醒所有工做線程。區塊鏈

採用RAII機制去操做任務隊列

RAII機制(Resource Acquisition Is Initialization)是Bjarne Stroustrup首先提出的。要解決的是這樣一個問題:ui

在C++中,若是在這個程序段結束時須要完成一些資源釋放工做,那麼正常狀況下天然是沒有什麼問題,可是當一個異常拋出時,釋放資源的語句就不會被執行。 因而 [Bjarne Stroustrup] 就想到確保能運行資源釋放代碼的地方就是在這個程序段(棧幀)中放置的對象的析構函數了,由於 stack winding 會保證它們的析構函數都會被執行。spa

將初始化和資源釋放都移動到一個包裝類中的好處:線程

  • 保證了資源的正常釋放
  • 省去了在異常處理中冗長而重複甚至有些還不必定執行到的清理邏輯,進而確保了代碼的異常安全。
  • 簡化代碼體積。
template <typename T> class CCheckQueueControl {
private:
    CCheckQueue<T> *pqueue;
    bool fDone;

public:
    CCheckQueueControl(CCheckQueue<T> *pqueueIn)
        : pqueue(pqueueIn), fDone(false) {
        if (pqueue != nullptr) {
            bool isIdle = pqueue->IsIdle();    
            assert(isIdle);
        }
    }
    
    bool Wait() {
        if (pqueue == nullptr) return true;
        bool fRet = pqueue->Wait();    
        fDone = true;
        return fRet;
    }

    void Add(std::vector<T> &vChecks) {
        if (pqueue != nullptr) pqueue->Add(vChecks);
    }
    
    ~CCheckQueueControl() {
        if (!fDone) Wait();
    }
};
複製代碼

該類主要是用來管理 CCheckQueue對象;採用RAII機制,保證每次析構該類的對象時,CCheckQueue中的任務隊列被所有處理。 用來構建該對象的任務隊列只能是nil, 或者隊列中無任務。 由於建立的該對象在析構時會調用任務隊列的wait()方法去處理完隊列中全部的任務,而後退出。 方法解釋:code

  • bool Wait()處理完隊列中的全部任務後,該方法退出,並返回這些任務的處理結果
  • void Add()向 CCheckQueue 中添加任務,喚醒子線程去處理
  • ~CCheckQueueControl()對象析構時,調用wait()方法保證了該隊列中的全部任務都被處理

CCheckQueue的使用

在塊來的時候激活主鏈使用使用了檢查隊列:

static bool ConnectBlock(const Config &config, const CBlock &block, CValidationState &state, CBlockIndex *pindex, CCoinsViewCache &view, const CChainParams &chainparams, bool fJustCheck = false) {
    ...
    
    CCheckQueueControl<CScriptCheck> control(fScriptChecks ? &scriptcheckqueue : nullptr);
        ...
    for (size_t i = 0; i < block.vtx.size(); i++) {
        ...
        if (!tx.IsCoinBase()) {
            Amount fee = view.GetValueIn(tx) - tx.GetValueOut();
            nFees += fee.GetSatoshis();

            // Don't cache results if we're actually connecting blocks (still
            // consult the cache, though).
            bool fCacheResults = fJustCheck;

            std::vector<CScriptCheck> vChecks;
          
            if (!CheckInputs(tx, state, view, fScriptChecks, flags,
                             fCacheResults, fCacheResults,
                             PrecomputedTransactionData(tx), &vChecks)) {
                return error("ConnectBlock(): CheckInputs on %s failed with %s",
                             tx.GetId().ToString(), FormatStateMessage(state));
            }

            control.Add(vChecks);   
        }
        ...
    }
    
    ...
}
複製代碼

ConnectBlock將該區塊連接到當前激活鏈上,並更新UTXO集合。 在該方法中:使用了全局對象scriptcheckqueue去構造了一個臨時的管理對象,並經過該管理對象來操做全局任務隊列,用來添加任務,以及執行任務。當該臨時的管理對象析構時,會調用wait()方法,加入任務處理,處理完全部任務後,該對象析構完成。

CScriptCheck(根據每一個交易輸入構造的檢查任務)

CScriptCheck源代碼:

class CScriptCheck {
private:
    CScript scriptPubKey;   
    Amount amount;      
    const CTransaction *ptxTo;
    unsigned int nIn;         
    uint32_t nFlags;          
    bool cacheStore;
    ScriptError error;       
    PrecomputedTransactionData txdata;  
public:
    CScriptCheck()
        : amount(0), ptxTo(0), nIn(0), nFlags(0), cacheStore(false),
          error(SCRIPT_ERR_UNKNOWN_ERROR), txdata() {}

    CScriptCheck(const CScript &scriptPubKeyIn, const Amount amountIn,
                 const CTransaction &txToIn, unsigned int nInIn,
                 uint32_t nFlagsIn, bool cacheIn,
                 const PrecomputedTransactionData &txdataIn)
        : scriptPubKey(scriptPubKeyIn), amount(amountIn), ptxTo(&txToIn),
          nIn(nInIn), nFlags(nFlagsIn), cacheStore(cacheIn),
          error(SCRIPT_ERR_UNKNOWN_ERROR), txdata(txdataIn) {}

    bool operator()();   
    
    void swap(CScriptCheck &check) {
        scriptPubKey.swap(check.scriptPubKey);
        std::swap(ptxTo, check.ptxTo);
        std::swap(amount, check.amount);
        std::swap(nIn, check.nIn);
        std::swap(nFlags, check.nFlags);
        std::swap(cacheStore, check.cacheStore);
        std::swap(error, check.error);
        std::swap(txdata, check.txdata);
    }

    ScriptError GetScriptError() const { return error; }
};
複製代碼

代碼解釋:

  • CScript scriptPubKey; 鎖定腳本(即該驗證交易的某個引用輸出對應的鎖定腳本)
  • Amount amount; 上述鎖定腳本對應的金額(即花費的UTXO的金額)
  • const CTransaction *ptxTo; 正在花費的交易,即要檢查的交易
  • unsigned int nIn; 要檢查該交易的第幾個輸入;
  • uint32_t nFlags; 檢查標識
  • ScriptError error; 驗證出錯的緣由
  • bool operator()(); 此處重載了()運算符,執行腳本檢查操做;

詳情見下篇文章:《腳本驗證》


本文由 Copernicus團隊 姚永芯寫做,轉載無需受權。

相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息