多線程腳本檢查啓動代碼:c++
bool AppInitMain(Config &config, boost::thread_group &threadGroup, CScheduler &scheduler) {
...
if (nScriptCheckThreads) {
for (int i = 0; i < nScriptCheckThreads - 1; i++) {
threadGroup.create_thread(&ThreadScriptCheck);
}
}
...
}
static CCheckQueue<CScriptCheck> scriptcheckqueue(128);
void ThreadScriptCheck() {
RenameThread("bitcoin-scriptch");
scriptcheckqueue.Thread();
}
複製代碼
在 AppInitMain
中根據選項,建立多個線程。 此處使用了boost的線程庫,在綁定的線程函數ThreadScriptCheck
中,調用一個全局狀態的任務隊列scriptcheckqueue
。每一個線程都去該隊列中去任務,當隊列中無任務可執行時,線程被條件變量阻塞。安全
任務隊列代碼:多線程
template <typename T> class CCheckQueue {
private:
boost::mutex mutex;
boost::condition_variable condWorker;
boost::condition_variable condMaster;
std::vector<T> queue;
int nIdle;
int nTotal;
bool fAllOk;
unsigned int nTodo;
bool fQuit;
unsigned int nBatchSize;
bool Loop(bool fMaster = false);
public:
//! Create a new check queue
CCheckQueue(unsigned int nBatchSizeIn)
: nIdle(0), nTotal(0), fAllOk(true), nTodo(0), fQuit(false),
nBatchSize(nBatchSizeIn) {}
void Thread() { Loop(); }
bool Wait() { return Loop(true); }
void Add(std::vector<T> &vChecks) {
boost::unique_lock<boost::mutex> lock(mutex);
for (T &check : vChecks) {
queue.push_back(std::move(check));
}
nTodo += vChecks.size();
if (vChecks.size() == 1) {
condWorker.notify_one();
} else if (vChecks.size() > 1) {
condWorker.notify_all();
}
}
bool IsIdle() {
boost::unique_lock<boost::mutex> lock(mutex);
return (nTotal == nIdle && nTodo == 0 && fAllOk == true);
}
~CCheckQueue() {}
}
bool CCheckQueue::Loop(bool fMaster = false){
boost::condition_variable &cond = fMaster ? condMaster : condWorker;
std::vector<T> vChecks;
vChecks.reserve(nBatchSize);
unsigned int nNow = 0;
bool fOk = true;
do {
{
boost::unique_lock<boost::mutex> lock(mutex);
// first do the clean-up of the previous loop run (allowing us
// to do it in the same critsect)
if (nNow) {
fAllOk &= fOk;
nTodo -= nNow;
if (nTodo == 0 && !fMaster)
// We processed the last element; inform the master it
// can exit and return the result
condMaster.notify_one();
} else {
nTotal++;
}
while (queue.empty()) {
if ((fMaster || fQuit) && nTodo == 0) {
nTotal--;
bool fRet = fAllOk;
// reset the status for new work later
if (fMaster) fAllOk = true;
return fRet;
}
nIdle++;
cond.wait(lock);
nIdle--;
}
nNow = std::max(
1U, std::min(nBatchSize, (unsigned int)queue.size() /
(nTotal + nIdle + 1)));
vChecks.resize(nNow);
for (unsigned int i = 0; i < nNow; i++) {
vChecks[i].swap(queue.back());
queue.pop_back(); //將放到局部隊列中的任務清除
}
fOk = fAllOk;
}
// execute work; 執行本線程剛分到的工做。
for (T &check : vChecks) {
if (fOk) fOk = check();
}
vChecks.clear();
} while (true);
}
複製代碼
使用解讀:函數
boost::mutex mutex;
: 互斥鎖保護內部的狀態boost::condition_variable condWorker;
: 在沒有工做時,工做線程阻塞條件變量boost::condition_variable condMaster;
: 在沒有工做時,master線程阻塞條件變量std::vector<T> queue;
: 要處理元素的隊列int nIdle;
: 空閒的工做線程數量(包含主線程)int nTotal;
: 總的工做線程的數量,包含主線程bool fAllOk;
: 臨時評估結果unsigned int nTodo;
: 還有多少驗證任務沒有完成,包括不在排隊的,但仍在工做線程本身的批次中的任務數量bool fQuit;
: 是否須要退出unsigned int nBatchSize;
: 每一個批次最大的元素處理數量隊列中使用了模板類,執行的驗證任務由T標識,T都必須提供一個重載的operator()方法,而且反回一個bool。 默認爲主線程push 批量任務到隊列中,其餘的工做線程去處理這些任務,當主線程push完任務後,也去處理這些任務,直到任務隊列所有處理完畢。 上述是隊列的實現:主要的任務處理是在Loop()
函數中; 該隊列會進行兩種調用,來處理隊列中的任務:oop
void Add(std::vector<T> &vChecks)
bool Wait()
,也去處理隊列中的任務,隊列中的所有任務處理完後,主線程退出。 void Add()
:給類的內部隊列批量添加任務,本次操做受鎖保護,並更新全部的狀態。若是剛添加的任務數量爲1,只喚醒一個工做線程去處理;不然,喚醒所有工做線程。區塊鏈
RAII機制(Resource Acquisition Is Initialization)是Bjarne Stroustrup首先提出的。要解決的是這樣一個問題:ui
在C++中,若是在這個程序段結束時須要完成一些資源釋放工做,那麼正常狀況下天然是沒有什麼問題,可是當一個異常拋出時,釋放資源的語句就不會被執行。 因而 [Bjarne Stroustrup] 就想到確保能運行資源釋放代碼的地方就是在這個程序段(棧幀)中放置的對象的析構函數了,由於 stack winding 會保證它們的析構函數都會被執行。spa
將初始化和資源釋放都移動到一個包裝類中的好處:線程
template <typename T> class CCheckQueueControl {
private:
CCheckQueue<T> *pqueue;
bool fDone;
public:
CCheckQueueControl(CCheckQueue<T> *pqueueIn)
: pqueue(pqueueIn), fDone(false) {
if (pqueue != nullptr) {
bool isIdle = pqueue->IsIdle();
assert(isIdle);
}
}
bool Wait() {
if (pqueue == nullptr) return true;
bool fRet = pqueue->Wait();
fDone = true;
return fRet;
}
void Add(std::vector<T> &vChecks) {
if (pqueue != nullptr) pqueue->Add(vChecks);
}
~CCheckQueueControl() {
if (!fDone) Wait();
}
};
複製代碼
該類主要是用來管理 CCheckQueue
對象;採用RAII機制,保證每次析構該類的對象時,CCheckQueue
中的任務隊列被所有處理。 用來構建該對象的任務隊列只能是nil, 或者隊列中無任務。 由於建立的該對象在析構時會調用任務隊列的wait()方法去處理完隊列中全部的任務,而後退出。 方法解釋:code
bool Wait()
處理完隊列中的全部任務後,該方法退出,並返回這些任務的處理結果void Add()
向 CCheckQueue 中添加任務,喚醒子線程去處理~CCheckQueueControl()
對象析構時,調用wait()方法保證了該隊列中的全部任務都被處理在塊來的時候激活主鏈使用使用了檢查隊列:
static bool ConnectBlock(const Config &config, const CBlock &block, CValidationState &state, CBlockIndex *pindex, CCoinsViewCache &view, const CChainParams &chainparams, bool fJustCheck = false) {
...
CCheckQueueControl<CScriptCheck> control(fScriptChecks ? &scriptcheckqueue : nullptr);
...
for (size_t i = 0; i < block.vtx.size(); i++) {
...
if (!tx.IsCoinBase()) {
Amount fee = view.GetValueIn(tx) - tx.GetValueOut();
nFees += fee.GetSatoshis();
// Don't cache results if we're actually connecting blocks (still
// consult the cache, though).
bool fCacheResults = fJustCheck;
std::vector<CScriptCheck> vChecks;
if (!CheckInputs(tx, state, view, fScriptChecks, flags,
fCacheResults, fCacheResults,
PrecomputedTransactionData(tx), &vChecks)) {
return error("ConnectBlock(): CheckInputs on %s failed with %s",
tx.GetId().ToString(), FormatStateMessage(state));
}
control.Add(vChecks);
}
...
}
...
}
複製代碼
ConnectBlock
將該區塊連接到當前激活鏈上,並更新UTXO集合。 在該方法中:使用了全局對象scriptcheckqueue
去構造了一個臨時的管理對象,並經過該管理對象來操做全局任務隊列,用來添加任務,以及執行任務。當該臨時的管理對象析構時,會調用wait()方法,加入任務處理,處理完全部任務後,該對象析構完成。
CScriptCheck源代碼:
class CScriptCheck {
private:
CScript scriptPubKey;
Amount amount;
const CTransaction *ptxTo;
unsigned int nIn;
uint32_t nFlags;
bool cacheStore;
ScriptError error;
PrecomputedTransactionData txdata;
public:
CScriptCheck()
: amount(0), ptxTo(0), nIn(0), nFlags(0), cacheStore(false),
error(SCRIPT_ERR_UNKNOWN_ERROR), txdata() {}
CScriptCheck(const CScript &scriptPubKeyIn, const Amount amountIn,
const CTransaction &txToIn, unsigned int nInIn,
uint32_t nFlagsIn, bool cacheIn,
const PrecomputedTransactionData &txdataIn)
: scriptPubKey(scriptPubKeyIn), amount(amountIn), ptxTo(&txToIn),
nIn(nInIn), nFlags(nFlagsIn), cacheStore(cacheIn),
error(SCRIPT_ERR_UNKNOWN_ERROR), txdata(txdataIn) {}
bool operator()();
void swap(CScriptCheck &check) {
scriptPubKey.swap(check.scriptPubKey);
std::swap(ptxTo, check.ptxTo);
std::swap(amount, check.amount);
std::swap(nIn, check.nIn);
std::swap(nFlags, check.nFlags);
std::swap(cacheStore, check.cacheStore);
std::swap(error, check.error);
std::swap(txdata, check.txdata);
}
ScriptError GetScriptError() const { return error; }
};
複製代碼
代碼解釋:
CScript scriptPubKey;
鎖定腳本(即該驗證交易的某個引用輸出對應的鎖定腳本)Amount amount;
上述鎖定腳本對應的金額(即花費的UTXO的金額)const CTransaction *ptxTo;
正在花費的交易,即要檢查的交易unsigned int nIn;
要檢查該交易的第幾個輸入;uint32_t nFlags;
檢查標識ScriptError error;
驗證出錯的緣由bool operator()();
此處重載了()運算符,執行腳本檢查操做;詳情見下篇文章:《腳本驗證》
本文由 Copernicus團隊 姚永芯
寫做,轉載無需受權。