Bitcoin Source Code Analysis: Using boost::signals2

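The bitcoin code makes heavy use of boost::signals2::signal, which implements a signal/slot event notification mechanism, in other words a publish/subscribe mechanism for messages. A signal type is a callable type, and a slot is a callback object, i.e. a subscriber to the event. A signal instance is a callable object: calling it publishes the corresponding event. The signal's connect and disconnect methods subscribe to and unsubscribe from the event, respectively.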

#include <boost/signals2.hpp>
#include <iostream>

void print_args(float x, float y) {
	  std::cout << "The arguments are " << x << " and " << y << std::endl;
}

void print_sum(float x, float y) {
	  std::cout << "The sum is " << x + y << std::endl;
}

void print_product(float x, float y) {
	  std::cout << "The product is " << x * y << std::endl;
}

void print_difference(float x, float y) {
	  std::cout << "The difference is " << x - y << std::endl;
}

void print_quotient(float x, float y) {
	  std::cout << "The quotient is " << x / y << std::endl;
}

int main() {
	boost::signals2::signal<void(float, float)> sig;

	sig.connect(print_args);
	sig.connect(print_sum);
	sig.connect(print_product);
	sig.connect(print_difference);
	sig.connect(print_quotient);

	sig(5., 3.);
	return 0;
}

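In the example above, five functions subscribe to the sig event. The call sig(5., 3.) fires the event, and the arguments 5 and 3, which act as the event's message payload, are passed to all five subscribers.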

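bitcoin defines the type CMainSignals to manage event notification across its functional modules in one place. CMainSignals is a resource-managing type: it delegates most of its work to the member m_internals, whose memory is managed by a unique_ptr. That member has type MainSignalsInstance, which defines ten boost signal variables, one for each of the ten events to be notified.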

class CMainSignals {
private:
    std::unique_ptr<MainSignalsInstance> m_internals;

    friend void ::RegisterValidationInterface(CValidationInterface*);
    friend void ::UnregisterValidationInterface(CValidationInterface*);
    friend void ::UnregisterAllValidationInterfaces();
    friend void ::CallFunctionInValidationInterfaceQueue(std::function<void ()> func);

    void MempoolEntryRemoved(CTransactionRef tx, MemPoolRemovalReason reason);

public:
    /** Register a CScheduler to give callbacks which should run in the background (may only be called once) */
    void RegisterBackgroundSignalScheduler(CScheduler& scheduler);
    /** Unregister a CScheduler to give callbacks which should run in the background - these callbacks will now be dropped! */
    void UnregisterBackgroundSignalScheduler();
    /** Call any remaining callbacks on the calling thread */
    void FlushBackgroundCallbacks();

    size_t CallbacksPending();

    /** Register with mempool to call TransactionRemovedFromMempool callbacks */
    void RegisterWithMempoolSignals(CTxMemPool& pool);
    /** Unregister with mempool */
    void UnregisterWithMempoolSignals(CTxMemPool& pool);

    void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload);
    void TransactionAddedToMempool(const CTransactionRef &);
    void BlockConnected(const std::shared_ptr<const CBlock> &, const CBlockIndex *pindex, const std::shared_ptr<const std::vector<CTransactionRef>> &);
    void BlockDisconnected(const std::shared_ptr<const CBlock> &);
    void SetBestChain(const CBlockLocator &);
    void Inventory(const uint256 &);
    void Broadcast(int64_t nBestBlockTime, CConnman* connman);
    void BlockChecked(const CBlock&, const CValidationState&);
    void NewPoWValidBlock(const CBlockIndex *, const std::shared_ptr<const CBlock>&);
};

struct MainSignalsInstance {
    boost::signals2::signal<void (const CBlockIndex *, const CBlockIndex *, bool fInitialDownload)> UpdatedBlockTip;
    boost::signals2::signal<void (const CTransactionRef &)> TransactionAddedToMempool;
    boost::signals2::signal<void (const std::shared_ptr<const CBlock> &, const CBlockIndex *pindex, const std::vector<CTransactionRef>&)> BlockConnected;
    boost::signals2::signal<void (const std::shared_ptr<const CBlock> &)> BlockDisconnected;
    boost::signals2::signal<void (const CTransactionRef &)> TransactionRemovedFromMempool;
    boost::signals2::signal<void (const CBlockLocator &)> SetBestChain;
    boost::signals2::signal<void (const uint256 &)> Inventory;
    boost::signals2::signal<void (int64_t nBestBlockTime, CConnman* connman)> Broadcast;
    boost::signals2::signal<void (const CBlock&, const CValidationState&)> BlockChecked;
    boost::signals2::signal<void (const CBlockIndex *, const std::shared_ptr<const CBlock>&)> NewPoWValidBlock;

    // We are not allowed to assume the scheduler only runs in one thread,
    // but must ensure all callbacks happen in-order, so we end up creating
    // our own queue here :(
    SingleThreadedSchedulerClient m_schedulerClient;

    explicit MainSignalsInstance(CScheduler *pscheduler) : m_schedulerClient(pscheduler) {}
};

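The type CValidationInterface gives a uniform representation to the objects interested in the ten events defined in MainSignalsInstance, i.e. the subscribers of those events. Any code interested in these events inherits from CValidationInterface and provides its own implementation of the relevant virtual member functions, overriding the corresponding empty methods in the base class CValidationInterface to express interest in those events. For events it is not interested in, the callbacks remain the empty methods inherited from the base class.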

class CValidationInterface {
protected:
    
    virtual void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {}
    
    virtual void TransactionAddedToMempool(const CTransactionRef &ptxn) {}
   
    virtual void TransactionRemovedFromMempool(const CTransactionRef &ptx) {}
    
    virtual void BlockConnected(const std::shared_ptr<const CBlock> &block, const CBlockIndex *pindex, const std::vector<CTransactionRef> &txnConflicted) {}
    
    virtual void BlockDisconnected(const std::shared_ptr<const CBlock> &block) {}
    
    virtual void SetBestChain(const CBlockLocator &locator) {}
   
    virtual void Inventory(const uint256 &hash) {}

    virtual void ResendWalletTransactions(int64_t nBestBlockTime, CConnman* connman) {}
    
    virtual void BlockChecked(const CBlock&, const CValidationState&) {}
    
    virtual void NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock>& block) {};
    friend void ::RegisterValidationInterface(CValidationInterface*);
    friend void ::UnregisterValidationInterface(CValidationInterface*);
    friend void ::UnregisterAllValidationInterfaces();
};

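The "empty default, override what you need" idea can be shown with a much smaller interface. This is a simplified sketch using only the standard library: EventInterface, HeightLogger and deliver_events are hypothetical names, and the int height parameter stands in for the real block arguments.

```cpp
#include <string>
#include <vector>

// Simplified stand-in for CValidationInterface: every callback defaults to a no-op.
class EventInterface {
public:
    virtual ~EventInterface() = default;
    virtual void BlockConnected(int /*height*/) {}
    virtual void BlockDisconnected(int /*height*/) {}
};

// Hypothetical subscriber that only cares about connected blocks.
class HeightLogger : public EventInterface {
public:
    std::vector<std::string> log;

    void BlockConnected(int height) override {
        log.push_back("connected " + std::to_string(height));
    }
    // BlockDisconnected is not overridden, so the inherited empty body runs.
};

// Delivers both events through the base-class interface and returns what was logged.
std::vector<std::string> deliver_events() {
    HeightLogger logger;
    EventInterface& subscriber = logger;
    subscriber.BlockConnected(100);
    subscriber.BlockDisconnected(100);  // silently ignored by HeightLogger
    return logger.log;
}
```

Only the overridden callback leaves a trace; the event the subscriber did not override falls through to the empty base method.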

CValidationInterface has four subclasses, CWallet, CZMQNotificationInterface, submitblock_StateCatcher and PeerLogicValidation, corresponding to the four subscribers interested in the events in MainSignalsInstance.

class CWallet final : public CCryptoKeyStore, public CValidationInterface
{
       ...............
           void TransactionAddedToMempool(const CTransactionRef& tx) override;
           void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex *pindex, const std::vector<CTransactionRef>& vtxConflicted) override;
           void BlockDisconnected(const std::shared_ptr<const CBlock>& pblock) override;
           void TransactionRemovedFromMempool(const CTransactionRef &ptx) override;
           void ResendWalletTransactions(int64_t nBestBlockTime, CConnman* connman) override;
           void SetBestChain(const CBlockLocator& loc) override;
           void Inventory(const uint256 &hash) override
          {
              {
                  LOCK(cs_wallet);
                 std::map<uint256, int>::iterator mi = mapRequestCount.find(hash);
                 if (mi != mapRequestCount.end())
                             (*mi).second++;
              }
          }
       ...............
};

class CZMQNotificationInterface final : public CValidationInterface
{
     ................
    // CValidationInterface
    void TransactionAddedToMempool(const CTransactionRef& tx) override;
    void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected, const std::vector<CTransactionRef>& vtxConflicted) override;
    void BlockDisconnected(const std::shared_ptr<const CBlock>& pblock) override;
    void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;
    .................
};
class submitblock_StateCatcher : public CValidationInterface
{
public:
    uint256 hash;
    bool found;
    CValidationState state;

    explicit submitblock_StateCatcher(const uint256 &hashIn) : hash(hashIn), found(false), state() {}

protected:
    void BlockChecked(const CBlock& block, const CValidationState& stateIn) override {
        if (block.GetHash() != hash)
            return;
        found = true;
        state = stateIn;
    }
};

class PeerLogicValidation : public CValidationInterface, public NetEventsInterface 
{
private:
    CConnman* const connman;

public:
    explicit PeerLogicValidation(CConnman* connman, CScheduler &scheduler);

    void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected, const std::vector<CTransactionRef>& vtxConflicted) override;
    void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;
    void BlockChecked(const CBlock& block, const CValidationState& state) override;
    void NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock>& pblock) override;
    ........
};

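These four subscribers subscribe to and unsubscribe from event notifications by calling RegisterValidationInterface and UnregisterValidationInterface. Both functions take a pointer to the subscriber as their argument.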

void RegisterValidationInterface(CValidationInterface* pwalletIn) {
    g_signals.m_internals->UpdatedBlockTip.connect(boost::bind(&CValidationInterface::UpdatedBlockTip, pwalletIn, _1, _2, _3));
    g_signals.m_internals->TransactionAddedToMempool.connect(boost::bind(&CValidationInterface::TransactionAddedToMempool, pwalletIn, _1));
    g_signals.m_internals->BlockConnected.connect(boost::bind(&CValidationInterface::BlockConnected, pwalletIn, _1, _2, _3));
    g_signals.m_internals->BlockDisconnected.connect(boost::bind(&CValidationInterface::BlockDisconnected, pwalletIn, _1));
    g_signals.m_internals->TransactionRemovedFromMempool.connect(boost::bind(&CValidationInterface::TransactionRemovedFromMempool, pwalletIn, _1));
    g_signals.m_internals->SetBestChain.connect(boost::bind(&CValidationInterface::SetBestChain, pwalletIn, _1));
    g_signals.m_internals->Inventory.connect(boost::bind(&CValidationInterface::Inventory, pwalletIn, _1));
    g_signals.m_internals->Broadcast.connect(boost::bind(&CValidationInterface::ResendWalletTransactions, pwalletIn, _1, _2));
    g_signals.m_internals->BlockChecked.connect(boost::bind(&CValidationInterface::BlockChecked, pwalletIn, _1, _2));
    g_signals.m_internals->NewPoWValidBlock.connect(boost::bind(&CValidationInterface::NewPoWValidBlock, pwalletIn, _1, _2));
}

void UnregisterValidationInterface(CValidationInterface* pwalletIn) {
    g_signals.m_internals->BlockChecked.disconnect(boost::bind(&CValidationInterface::BlockChecked, pwalletIn, _1, _2));
    g_signals.m_internals->Broadcast.disconnect(boost::bind(&CValidationInterface::ResendWalletTransactions, pwalletIn, _1, _2));
    g_signals.m_internals->Inventory.disconnect(boost::bind(&CValidationInterface::Inventory, pwalletIn, _1));
    g_signals.m_internals->SetBestChain.disconnect(boost::bind(&CValidationInterface::SetBestChain, pwalletIn, _1));
    g_signals.m_internals->TransactionAddedToMempool.disconnect(boost::bind(&CValidationInterface::TransactionAddedToMempool, pwalletIn, _1));
    g_signals.m_internals->BlockConnected.disconnect(boost::bind(&CValidationInterface::BlockConnected, pwalletIn, _1, _2, _3));
    g_signals.m_internals->BlockDisconnected.disconnect(boost::bind(&CValidationInterface::BlockDisconnected, pwalletIn, _1));
    g_signals.m_internals->TransactionRemovedFromMempool.disconnect(boost::bind(&CValidationInterface::TransactionRemovedFromMempool, pwalletIn, _1));
    g_signals.m_internals->UpdatedBlockTip.disconnect(boost::bind(&CValidationInterface::UpdatedBlockTip, pwalletIn, _1, _2, _3));
    g_signals.m_internals->NewPoWValidBlock.disconnect(boost::bind(&CValidationInterface::NewPoWValidBlock, pwalletIn, _1, _2));
}

During startup, in the initialization phase, the scheduler thread is started:

bool AppInitMain() {
      .................
     CScheduler::Function serviceLoop = boost::bind(&CScheduler::serviceQueue, &scheduler);
     threadGroup.create_thread(boost::bind(&TraceThread<CScheduler::Function>, "scheduler", serviceLoop));
     .................
}

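RegisterBackgroundSignalScheduler() is called to initialize the global MainSignalsInstance instance. RegisterWithMempoolSignals() is called to subscribe to the NotifyEntryRemoved event of the global mempool object. MainSignalsInstance is only interested in entries leaving the mempool because of expiry, the size limit, a blockchain reorganization, or replacement. On receiving such an event, MainSignalsInstance in turn acts as a publisher and forwards it to the other subscribers, such as CWallet.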

bool AppInitMain() {
     ......
    GetMainSignals().RegisterBackgroundSignalScheduler(scheduler);
    GetMainSignals().RegisterWithMempoolSignals(mempool);
    ........
 }
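
The "receive, filter, then republish" role described for mempool removals can be sketched without Boost. Everything here is illustrative: RemovalReason, RemovalRelay and relay_demo are hypothetical names, the enum only lists the four reasons named in the text, and Other stands for any reason the text does not list.

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Simplified removal reasons; Other covers every reason not named in the text.
enum class RemovalReason { Expiry, SizeLimit, Reorg, Replaced, Other };

// Hypothetical relay: subscribes upstream, forwards only selected removals downstream.
class RemovalRelay {
public:
    using Callback = std::function<void(const std::string&)>;

    void Subscribe(Callback cb) { subscribers_.push_back(std::move(cb)); }

    // Called for every removal; acts as a publisher only for the reasons of interest.
    void OnEntryRemoved(const std::string& txid, RemovalReason reason) {
        if (reason == RemovalReason::Other) return;
        for (const Callback& cb : subscribers_) cb(txid);
    }

private:
    std::vector<Callback> subscribers_;
};

// Returns the transaction ids that reached the downstream (wallet-like) subscriber.
std::vector<std::string> relay_demo() {
    std::vector<std::string> wallet_saw;
    RemovalRelay relay;
    relay.Subscribe([&wallet_saw](const std::string& txid) { wallet_saw.push_back(txid); });

    relay.OnEntryRemoved("tx-expired", RemovalReason::Expiry);
    relay.OnEntryRemoved("tx-other", RemovalReason::Other);     // filtered out
    relay.OnEntryRemoved("tx-replaced", RemovalReason::Replaced);
    return wallet_saw;
}
```

The downstream subscriber never sees the filtered removal, which mirrors how the relay is a subscriber on one side and a publisher on the other.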

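After the global connection manager object connman is initialized, the global peerLogic object is initialized, and RegisterValidationInterface() is then called so that peerLogic becomes a subscriber of the MainSignalsInstance object. If the user compiled in the ZeroMQ support module, RegisterValidationInterface() is called again so that pzmqNotificationInterface subscribes to MainSignalsInstance.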

bool AppInitMain() {
    .............
    assert(!g_connman);
    g_connman = std::unique_ptr<CConnman>(new CConnman(GetRand(std::numeric_limits<uint64_t>::max()), GetRand(std::numeric_limits<uint64_t>::max())));
    CConnman& connman = *g_connman;

    peerLogic.reset(new PeerLogicValidation(&connman, scheduler));
    RegisterValidationInterface(peerLogic.get());
    .............
    
#if ENABLE_ZMQ
    pzmqNotificationInterface = CZMQNotificationInterface::Create();

    if (pzmqNotificationInterface) {
        RegisterValidationInterface(pzmqNotificationInterface);
    }
#endif
    ...............
}

If the wallet feature is enabled, the wallet-opening step that runs after startup registers the wallet as a subscriber of MainSignalsInstance.

bool AppInitMain() {
      ........
#ifdef ENABLE_WALLET
    if (!OpenWallets())
        return false;
#else
    LogPrintf("No wallet support compiled in!\n");
#endif

      ........
}

bool OpenWallets() {
    if (gArgs.GetBoolArg("-disablewallet", DEFAULT_DISABLE_WALLET)) {
        LogPrintf("Wallet disabled!\n");
        return true;
    }

    for (const std::string& walletFile : gArgs.GetArgs("-wallet")) {
        CWallet * const pwallet = CWallet::CreateWalletFromFile(walletFile);
        if (!pwallet) {
            return false;
        }
        vpwallets.push_back(pwallet);
    }

    return true;
}

CWallet* CWallet::CreateWalletFromFile(const std::string walletFile)
{
        ......
            CWallet *walletInstance = new CWallet(std::move(dbw));
            RegisterValidationInterface(walletInstance);
        ......
}    

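In the submitblock RPC call, the user submits a hex-encoded raw block. After parsing it, the code calls ProcessNewBlock() to check and process it, using an object sc of type submitblock_StateCatcher as a subscriber of MainSignalsInstance, so that the validation result for the submitted block comes back to the RPC call as an event notification.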

UniValue submitblock(const JSONRPCRequest& request)
{
     ...........
     
    submitblock_StateCatcher sc(block.GetHash());
    RegisterValidationInterface(&sc);
    bool fAccepted = ProcessNewBlock(Params(), blockptr, true, nullptr);
    UnregisterValidationInterface(&sc);

     ...........
}

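The CMainSignals type defines a set of methods that fire events. Other code modules call these methods to fire the corresponding events and deliver the notifications to the relevant subscribers. In the code below, most of these methods queue the notification on m_schedulerClient, while Broadcast, BlockChecked and NewPoWValidBlock invoke the signal directly.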

void CMainSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {
    m_internals->m_schedulerClient.AddToProcessQueue([pindexNew, pindexFork, fInitialDownload, this] {
        m_internals->UpdatedBlockTip(pindexNew, pindexFork, fInitialDownload);
    });
}

void CMainSignals::TransactionAddedToMempool(const CTransactionRef &ptx) {
    m_internals->m_schedulerClient.AddToProcessQueue([ptx, this] {
        m_internals->TransactionAddedToMempool(ptx);
    });
}

void CMainSignals::BlockConnected(const std::shared_ptr<const CBlock> &pblock, const CBlockIndex *pindex, const std::shared_ptr<const std::vector<CTransactionRef>>& pvtxConflicted) {
    m_internals->m_schedulerClient.AddToProcessQueue([pblock, pindex, pvtxConflicted, this] {
        m_internals->BlockConnected(pblock, pindex, *pvtxConflicted);
    });
}

void CMainSignals::BlockDisconnected(const std::shared_ptr<const CBlock> &pblock) {
    m_internals->m_schedulerClient.AddToProcessQueue([pblock, this] {
        m_internals->BlockDisconnected(pblock);
    });
}

void CMainSignals::SetBestChain(const CBlockLocator &locator) {
    m_internals->m_schedulerClient.AddToProcessQueue([locator, this] {
        m_internals->SetBestChain(locator);
    });
}

void CMainSignals::Inventory(const uint256 &hash) {
    m_internals->m_schedulerClient.AddToProcessQueue([hash, this] {
        m_internals->Inventory(hash);
    });
}

void CMainSignals::Broadcast(int64_t nBestBlockTime, CConnman* connman) {
    m_internals->Broadcast(nBestBlockTime, connman);
}

void CMainSignals::BlockChecked(const CBlock& block, const CValidationState& state) {
    m_internals->BlockChecked(block, state);
}

void CMainSignals::NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock> &block) {
    m_internals->NewPoWValidBlock(pindex, block);
}
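
The difference between the queued methods and the direct ones can be illustrated with a tiny in-order callback queue. This is a single-threaded sketch using only the standard library, not the real SingleThreadedSchedulerClient: CallbackQueue, ProcessQueue and queued_vs_direct are stand-in names, and only the method name AddToProcessQueue is borrowed from the excerpt above.

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Stand-in for an in-order callback queue; no scheduler thread is involved here.
class CallbackQueue {
public:
    void AddToProcessQueue(std::function<void()> fn) { pending_.push_back(std::move(fn)); }

    // Runs every pending callback in FIFO order.
    void ProcessQueue() {
        while (!pending_.empty()) {
            std::function<void()> fn = std::move(pending_.front());
            pending_.pop_front();
            fn();
        }
    }

    std::size_t CallbacksPending() const { return pending_.size(); }

private:
    std::deque<std::function<void()>> pending_;
};

// Returns the order in which the two notifications were actually delivered.
std::vector<std::string> queued_vs_direct() {
    std::vector<std::string> delivered;
    CallbackQueue queue;

    // Queued style: the notification is deferred until the queue is processed.
    queue.AddToProcessQueue([&delivered] { delivered.push_back("UpdatedBlockTip"); });

    // Direct style: the notification is delivered before the call returns.
    delivered.push_back("BlockChecked");

    queue.ProcessQueue();
    return delivered;
}
```

Although the queued notification was issued first, it is delivered after the direct one, which is why callers of the queued events cannot assume that subscribers have already run when the firing method returns.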

In CChainState's ActivateBestChain method, the BlockConnected and UpdatedBlockTip events are published:

bool CChainState::ActivateBestChain(CValidationState &state, const CChainParams& chainparams, std::shared_ptr<const CBlock> pblock) {
           ..............................
           for (const PerBlockConnectTrace& trace : connectTrace.GetBlocksConnected()) {
                assert(trace.pblock && trace.pindex);
                GetMainSignals().BlockConnected(trace.pblock, trace.pindex, trace.conflictedTxs);
            }

           ..............................
           GetMainSignals().UpdatedBlockTip(pindexNewTip, pindexFork, fInitialDownload);
           ...............................
}

In CChainState's AcceptBlock method, the NewPoWValidBlock event is published:

bool CChainState::AcceptBlock(const std::shared_ptr<const CBlock>& pblock, CValidationState& state, const CChainParams& chainparams, CBlockIndex** ppindex, bool fRequested, const CDiskBlockPos* dbp, bool* fNewBlock)
{
        .......................
        
        if (!IsInitialBlockDownload() && chainActive.Tip() == pindex->pprev)
               GetMainSignals().NewPoWValidBlock(pindex, pblock);

        ......................
}

In CChainState's DisconnectTip method, the BlockDisconnected event is published:

bool CChainState::DisconnectTip(CValidationState& state, const CChainParams& chainparams, DisconnectedBlockTransactions *disconnectpool)
{
   ...................
     GetMainSignals().BlockDisconnected(pblock);
     ..............
}

In CChainState's ConnectTip method, the BlockChecked event is published:

bool CChainState::ConnectTip(CValidationState& state, const CChainParams& chainparams, CBlockIndex* pindexNew, const std::shared_ptr<const CBlock>& pblock, ConnectTrace& connectTrace, DisconnectedBlockTransactions &disconnectpool)
{
     ................
     
        CCoinsViewCache view(pcoinsTip.get());
        bool rv = ConnectBlock(blockConnecting, state, pindexNew, view, chainparams);
        GetMainSignals().BlockChecked(blockConnecting, state);

     ................
}

In PeerLogicValidation's SendMessages method, the Broadcast event is published, notifying the wallet to resend unconfirmed transactions:

bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic<bool>& interruptMsgProc)
{
    .............
    if (!fReindex && !fImporting && !IsInitialBlockDownload())
    {
            GetMainSignals().Broadcast(nTimeBestReceived, connman);
    }
    .............
}

After an INV message is received from the network, the wallet is notified so that it can update its internal state:

bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman* connman, const std::atomic<bool>& interruptMsgProc) {
            .................
            for (CInv &inv : vInv)
           {
               ............
               GetMainSignals().Inventory(inv.hash);

           }
            ................
}

ProcessNewBlock in validation.cpp calls CheckBlock internally. If the check fails, it publishes the BlockChecked event to notify the relevant subscribers:

bool ProcessNewBlock(const CChainParams& chainparams, const std::shared_ptr<const CBlock> pblock, bool fForceProcessing, bool *fNewBlock) {
      ............
      bool ret = CheckBlock(*pblock, state, chainparams.GetConsensus());
      if (!ret) {
            GetMainSignals().BlockChecked(*pblock, state);
            return error("%s: AcceptBlock FAILED (%s)", __func__, state.GetDebugMessage());
     }
      ...........

}

FlushStateToDisk publishes the SetBestChain event to notify the wallet:

bool static FlushStateToDisk(const CChainParams& chainparams, CValidationState &state, FlushStateMode mode, int nManualPruneHeight) {
     ...............
     if (fDoFullFlush || ((mode == FLUSH_STATE_ALWAYS || mode == FLUSH_STATE_PERIODIC) && nNow > nLastSetChain + (int64_t)DATABASE_WRITE_INTERVAL * 1000000)) {
        // Update best block in wallet (so we can detect restored wallets).
        GetMainSignals().SetBestChain(chainActive.GetLocator());
        nLastSetChain = nNow;
    }
     ...............
}

AcceptToMemoryPoolWorker in validation.cpp publishes the TransactionAddedToMempool event just before it returns:

static bool AcceptToMemoryPoolWorker(const CChainParams& chainparams, CTxMemPool& pool, CValidationState& state, const CTransactionRef& ptx, bool* pfMissingInputs, int64_t nAcceptTime, std::list<CTransactionRef>* plTxnReplaced, bool bypass_limits, const CAmount& nAbsurdFee, std::vector<COutPoint>& coins_to_uncache) {
         ................
         GetMainSignals().TransactionAddedToMempool(ptx);

}

This article was written by 喻建 of the Copernicus team. It may be reproduced without permission.

Original article: mp.weixin.qq.com/s/Gru2eMWLn…
