【劉文彬】 Debug EOS:nodeos + mongo_db_plugin

原文連接:醒者呆的博客園,www.cnblogs.com/Evsward/p/s…html

上文書說到區塊鏈的存儲方式,並結合了EOSIO進行分析,其中也提到了使用CLion調試EOS的方法。本文將繼續深刻細緻地展開對加載了mongo_db_plugin的nodeos的調試過程以及心得。node

關鍵字:源碼分析,Debug EOS,nodeos,mongo_db_plugin,CLion,C++,boost::asio::signal_set,queuelinux

本文涉及的環境:clang-6.0, clang++-6.0, GDB Debugger, make 4.1, mongodb-linux-x86_64-3.6.3, boost 1.67.0sql

調試EOS: nodeos

關於EOS的調試環境的搭建這裏再也不贅述了,下文開始針對nodeos程序進行調試。mongodb

(一)CMakeList.txt

nodeos開始運行前,要先使用項目的總CmakeList.txt配置,這裏我配置了boost庫的位置,若是你配置了boost的環境變量能夠跳過這裏。數據庫

set( BOOST_ROOT "/home/evsward/opt/boost")
複製代碼
  • 這個文件中有不少的set語句,這些語句都是開關,或者路徑,或者全局變量,是配置各個子CMakeList.txt而用的。
  • include 語句是爲runtime引入相關依賴庫。
  • add_subdirectory語句設置了子目錄程序。
  • install語句是將相關命令安裝到指定位置以供runtime後續使用。

總的CMakeList文件介紹完了,下面會執行到nodeos目錄下的CMakeList.txt文件:json

  • add_executable( nodeos main.cpp )語句設定了nodeos程序執行入口。
  • set, configure_file, include, install 等都是爲runtime準備的環境相關的。
  • 重點語句target_link_libraries,這裏定義了鏈runtime環境須要啓動的plugin。(注意記住這個順序)

(二)static register plugin

咱們打開每個plugin的cpp文件,會發現有一個static的register方法的調用。這裏會首先執行按上面plugin定義的順序中第一個login_plugin,它的static語句以下:數組

static appbase::abstract_plugin& _login_plugin = app().register_plugin<login_plugin>();
複製代碼

執行此語句時,會先執行app(),這是application單例方法。bash

(三)application

  • application是nodeos的執行者,上面調用的app函數:
application& application::instance() {
   static application _app;
   return _app;
}
application& app() { return application::instance(); }
複製代碼
application與plugin擁有相同的實現函數,而因爲它做爲執行者、統籌者的存在,它會安排調用全部plugin,例如set_program_options。
複製代碼
  • 執行app()之後獲取到了application的實例,而後調用了register_plugin函數,經過模板類(泛型類)攜帶了login_plugin的類型。register_plugin函數是模板函數,定義在application.hpp文件中。數據結構

  • application.hpp 中定義了私有的內存變量

    map<string, std::unique_ptr<abstract_plugin>> plugins;
    複製代碼
  • abstract_plugin是全部plugin的基類,它定義了虛函數,須要繼承它的子類去實現。他們與application的關係是:

    abstract_plugin=>plugin(對基類的虛函數進一步使用,由application定義管理)=>各個plugin
    複製代碼
template<typename Plugin>
auto& register_plugin() {
    auto existing = find_plugin<Plugin>(); // 從plugins尋找該plugin是否已註冊。
    if(existing)
       return *existing; // 若是已註冊了直接返回plugin的實例

    auto plug = new Plugin(); // 建立該未註冊plugin的實例
    plugins[plug->name()].reset(plug); // 先插入到上面定義的內存變量plugins
    plug->register_dependencies();// 註冊該plugin的依賴plugins,每一個plugin內部都會調用APPBASE_PLUGIN_REQUIRES((chain_plugin))來註冊本身依賴的別的plugin。
    return *plug; // 返回plugin的實例
 }
複製代碼

(四)main.cpp->main()

在編譯runtime環境結束之後,進入入口函數main(),

int main(int argc, char** argv)
複製代碼

main函數的參數就是調用命令nodeos的經過--加入的參數,咱們能夠經過nodeos的Edit Configuration來調整。其中argc是個數,argv是參數的值,是一個數組類型。以下圖:

咱們接着來看main函數,它的函數體是經過app()對application單例進行的設置,包括版本號、data路徑、config路徑,而後是對於application實例內部方法的調用:

  • initialize<chain_plugin, http_plugin, net_plugin, producer_plugin>
  • startup()
  • exec()

main函數執行了內部函數initialize_logging()還經過ilog打印了日誌,輸出了版本號以及eosio root路徑地址。

因爲main函數是入口函數,上面也介紹了它主要是對application實例的使用,以及一些異常處理等,接下來會逐一進行debug跟蹤分析。

(五)initialize plugin

這個初始化函數是一個模板函數,模板類參數是plugin基類,在main函數調用該函數時傳入了基本的插件依賴(這些是不須要咱們在config中配置的,是鏈啓動的基礎插件):chain_plugin, http_plugin, net_plugin, producer_plugin。下面來看initialize函數在application頭文件中的聲明:

/**
  * @brief 查看 --plugin(存在於命令行或者config配置文件中)調用這些plugin的 initialize方法。
  * 
  * @tparam Plugin plugin的列表用來初始化,即便在config中沒有配置的但被其餘plugin所依賴的plugin,均可以統一使用該模板類沒有影響。
  * @return true:plugin初始化完成,false:出現異常。
*/
template<typename... Plugin>
bool initialize(int argc, char** argv) {
    return initialize_impl(argc, argv, {find_plugin<Plugin>()...}); // ...是可變參數的語法,上面經過main函數的調用,咱們傳入了多個plugin。
}
複製代碼

實現類initialize_impl的內容較多,不粘貼源碼,直接總結一下:

(1)set_program_options()函數

application.cpp文件中的set_program_options()函數是用來生成初始化的config.ini文件內容以及nodeos命令行--help的輸出內容。
複製代碼

該函數首先遍歷插件列表,調用每一個插件都會實現的plugin基類的虛函數set_program_options(options_description& cli, options_description& cfg),例以下面就是mongo_db_plugin的實現:

void mongo_db_plugin::set_program_options(options_description& cli, options_description& cfg)
{
   cfg.add_options()
         ("mongodb-queue-size,q", bpo::value<uint32_t>()->default_value(256),
         "The target queue size between nodeos and MongoDB plugin thread.")
         ("mongodb-wipe", bpo::bool_switch()->default_value(false),
         "Required with --replay-blockchain, --hard-replay-blockchain, or --delete-all-blocks to wipe mongo db."
         "This option required to prevent accidental wipe of mongo db.")
         ("mongodb-block-start", bpo::value<uint32_t>()->default_value(0),
         "If specified then only abi data pushed to mongodb until specified block is reached.")
         ("mongodb-uri,m", bpo::value<std::string>(),
         "MongoDB URI connection string, see: https://docs.mongodb.com/master/reference/connection-string/."
               " If not specified then plugin is disabled. Default database 'EOS' is used if not specified in URI."
               " Example: mongodb://127.0.0.1:27017/EOS")
         ;
}
複製代碼

經過調用mongo_db_plugin的這個方法,就能夠拼湊到config.ini文件中關於mongo_db_plugin的部分,由於這個插件只有對於config.ini配置文件的配置,沒有對於命令行的內容,咱們能夠去查看chain_plugin的實現,它會同時有配置文件和命令行兩個方面的內容設置,源碼較長請自行查看。

配置的對象options_description_easy_init是一個靈活的結構。能夠表示:一個配置項,一個配置的值;一個配置項,一個配置的值,一個註釋或者描述;一個配置項,一個註釋或者描述。這些多種組合,咱們也能夠直接去查看本身的config.ini的每個配置項去對應。

那麼是如何拼湊全部的插件配置內容呢?
複製代碼

application.cpp文件中的set_program_options()函數的函數體中使用了application的兩個類變量來存儲這些參數:

  • _app_options:用於接收來自於命令行和config.ini兩個參數來源的參數。
  • _cfg_options:僅存儲來自於config.ini配置文件的參數。

插件遍歷結束後,咱們已經有了全部插件的config.ini配置內容以及命令行提示配置內容,下面要從宏觀角度去配置一些屬於application的配置項,config.ini中加入了plugins的配置,經過這個配置咱們能夠手動指定要啓動的插件,例如mongo_db_plugin就是須要手動配置的。接着,要配置命令行的參數內容,包括help, version, print-default-config, data-dir, config-dir, config, logconf。將他們追加存儲到上面那兩個類變量中。

到這裏,application.cpp文件中的set_program_options()函數的工做就完成了。

上面提到的_app_options和_cfg_options仍就是傻傻分不清楚,他們的用意究竟是什麼呢?
複製代碼

簡單來理解就是,命令行可以作全部配置文件config.ini中的配置的工做,同時命令行還有專屬於本身的help, version, print-default-config, data-dir, config-dir, config, logconf配置。這樣就好理解了,config.ini是命令行配置的子集,命令行配置是全集。

(2)app全局參數的檢測與合併

咱們回到initialize_impl,目前咱們已經擁有了兩套默認配置參數,這裏直接使用全集_app_options配置,咱們先接收來自於命令行的參數,將以它爲優先級高的方式與_app_options配置合併:

bpo::variables_map options;
bpo::store(bpo::parse_command_line(argc, argv, my->_app_options), options);
複製代碼

(3)app全局參數配置項生效與響應

拿到合併後的配置對象options,依次針對配置項的內容進行響應:

  • help:直接輸出_app_options配置項的所有內容。
  • version:輸出application實例的類成員_version的值。
  • print-default-config:與_app_options無關,從新去每一個plugin找配置,而後會基於_cfg_options生成一份默認的config配置打印到終端界面。
  • data-dir:是設置data目錄的命令保存至application的類成員_data_dir,沒有響應的輸出。
  • config-dir:設置config路徑,保存在類成員_config_dir中。config和data的路徑結構以下:
evsward@evsward-TM1701:~/.local/share/eosio/nodeos$ tree
.
├── config
│   └── config.ini
└── data
    ├── blocks
    │   ├── blocks.index
    │   ├── blocks.log
    │   └── reversible
    │       ├── shared_memory.bin
    │       └── shared_memory.meta
    └── state
        ├── forkdb.dat
        ├── shared_memory.bin
        └── shared_memory.meta

5 directories, 8 files
複製代碼
  • logconf:默認是logging.json,放置在config目錄下面,可自定義設置,保存在類成員_logging_conf中。
  • config:指定配置文件的名字,默認是config.ini。若是發如今config目錄下找到config.ini文件,則按照該文件的配置載入。
bpo::store(bpo::parse_config_file<char>(config_file_name.make_preferred().string().c_str(),
                                           my->_cfg_options, true), options);
複製代碼

獲得整合好本地config.ini文本配置的options對象。而後對其參數配置項進行設置。

  • plugin:讀取配置文件中的plugin配置(多條),對於每個plugin,要從新調用各自的initialize方法去按照新的配置初始化。
  • autostart_plugins:設置前面的初始化插件chain_plugin, http_plugin, net_plugin, producer_plugin,一樣分別調用他們的初始化函數設置新的配置。

(4)plugin initialize

承接上文,全部相關的plugin的執行各自的initialize。這個initialize函數是abstract_plugin的虛函數,而該虛函數被plugin類所複寫:

virtual void initialize(const variables_map& options) override {
    if(_state == registered) {//若是註冊過
       _state = initialized;
       static_cast<Impl*>(this)->plugin_requires([&](auto& plug){ plug.initialize(options); });// 先執行依賴plugin的initialize方法。
       static_cast<Impl*>(this)->plugin_initialize(options);// 調用自身的plugin_initialize方法實現。
       //ilog( "initializing plugin ${name}", ("name",name()) );
       app().plugin_initialized(*this);// 保存到一個initialized_plugins類成員變量中,用來按順序記錄已經開始啓動運行的plugin。
    }
    assert(_state == initialized); /// 若是插件未註冊,則不能執行initialize方法。
}
複製代碼

因此在plugin調用initialize函數的時候,是先執行的以上覆寫的plugin的虛函數。咱們這裏先設定幾個要跟蹤的plugin爲目標吧,不然plugin太多,望山止步。

目標:主要研究mongo_db_plugin,以及基礎plugin(chain_plugin, http_plugin, net_plugin, producer_plugin),路線是研究主plugin,如有額外的依賴plugin,看狀況控制研究的深淺程度。
複製代碼

(5)eosio::mongo_db_plugin::plugin_initialize

前面寫set_program_options()提到了mongo_db_plugin,這裏研究它的plugin_initialize方法。傳入的參數是結合了命令行以及本地config文件的合併配置項,按照此配置環境。

void mongo_db_plugin::plugin_initialize(const variables_map& options)
{
   try {
      if( options.count( "mongodb-uri" )) {//查mongodb-uri的配置,config.ini中有對應的。
         ilog( "initializing mongo_db_plugin" );
         my->configured = true;//設置標誌位:已配置

         if( options.at( "replay-blockchain" ).as<bool>() || options.at( "hard-replay-blockchain" ).as<bool>() || options.at( "delete-all-blocks" ).as<bool>() ) {//捕捉是否有replay-blockchain、hard-replay-blockchain、delete-all-blocks三個動做,有的話要判斷是否要擦出mongo歷史數據。
            if( options.at( "mongodb-wipe" ).as<bool>()) {//檢查擦除項mongodb-wipe的配置
               ilog( "Wiping mongo database on startup" );
               my->wipe_database_on_startup = true;//若是設置擦除,這裏設置本地標誌位wipe_database_on_startup
            } else if( options.count( "mongodb-block-start" ) == 0 ) {//若是設置是從0開始,檢查是否要所有擦除歷史數據。
               EOS_ASSERT( false, chain::plugin_config_exception, "--mongodb-wipe required with --replay-blockchain, --hard-replay-blockchain, or --delete-all-blocks"
                                 " --mongodb-wipe will remove all EOS collections from mongodb." );
            }
         }

         if( options.count( "abi-serializer-max-time-ms") == 0 ) {//eosio::chain_plugin的配置
            EOS_ASSERT(false, chain::plugin_config_exception, "--abi-serializer-max-time-ms required as default value not appropriate for parsing full blocks");
         }
         my->abi_serializer_max_time = app().get_plugin<chain_plugin>().get_abi_serializer_max_time();

         if( options.count( "mongodb-queue-size" )) {// queue大小
            my->queue_size = options.at( "mongodb-queue-size" ).as<uint32_t>();
         }
         if( options.count( "mongodb-block-start" )) {// mongo對應的開始區塊號
            my->start_block_num = options.at( "mongodb-block-start" ).as<uint32_t>();
         }
         if( my->start_block_num == 0 ) {
            my->start_block_reached = true;
         }

         std::string uri_str = options.at( "mongodb-uri" ).as<std::string>();
         ilog( "connecting to ${u}", ("u", uri_str));
         mongocxx::uri uri = mongocxx::uri{uri_str};
         my->db_name = uri.database();
         if( my->db_name.empty())
            my->db_name = "EOS";// 默認起的庫名字爲EOS,若是在mongodb-uri有配置的話就使用配置的名字。
         my->mongo_conn = mongocxx::client{uri};// 客戶端鏈接到mongod

         // controller中拉取得信號,在init函數中註冊信號機制,始終監聽鏈上信號,做出反應。
         chain_plugin* chain_plug = app().find_plugin<chain_plugin>();//檢查chain_plugin是否加載,chain_plugin是必要依賴,下面咱們要使用chain的數據。
         EOS_ASSERT( chain_plug, chain::missing_chain_plugin_exception, ""  );
         auto& chain = chain_plug->chain();// 得到chain實例
         my->chain_id.emplace( chain.get_chain_id());

         // accepted_block_connection對應了chain的signal,是boost提供的一種信號槽機制,這種connection對象有四個,見當前源碼的下方展現。
         my->accepted_block_connection.emplace( chain.accepted_block.connect( [&]( const chain::block_state_ptr& bs ) {// 創建connect,每當chain有accepted_block信號(這些信號是定義在controller.hpp中,稍後會介紹),即調用下面的函數。
            my->accepted_block( bs );// accepted_block認同block信息
         } ));
         my->irreversible_block_connection.emplace(//含義同上
               chain.irreversible_block.connect( [&]( const chain::block_state_ptr& bs ) {
                  my->applied_irreversible_block( bs );// applied_irreversible_block,應用不可逆區塊
               } ));
         my->accepted_transaction_connection.emplace(//含義同上
               chain.accepted_transaction.connect( [&]( const chain::transaction_metadata_ptr& t ) {
                  my->accepted_transaction( t );// accepted_transaction認同交易
               } ));
         my->applied_transaction_connection.emplace(//含義同上
               chain.applied_transaction.connect( [&]( const chain::transaction_trace_ptr& t ) {
                  my->applied_transaction( t );// applied_transaction,應用交易
               } ));

         if( my->wipe_database_on_startup ) {
            my->wipe_database();// 擦除mongo歷史數據
         }
         my->init();//初始化函數
      } else {
         wlog( "eosio::mongo_db_plugin configured, but no --mongodb-uri specified." );
         wlog( "mongo_db_plugin disabled." );
      }
   } FC_LOG_AND_RETHROW()
}
複製代碼

四個connection對象的聲明以下:

fc::optional<boost::signals2::scoped_connection> accepted_block_connection;
fc::optional<boost::signals2::scoped_connection> irreversible_block_connection;
fc::optional<boost::signals2::scoped_connection> accepted_transaction_connection;
fc::optional<boost::signals2::scoped_connection> applied_transaction_connection;
複製代碼

queue

這段代碼中涉及到四個函數分別是accepted_block,applied_irreversible_block,accepted_transaction,applied_transaction,他們都對應着對queue的操做,mongo_db_plugin_impl類成員定義了一下幾種queue:

std::deque<chain::transaction_metadata_ptr> transaction_metadata_queue;
std::deque<chain::transaction_metadata_ptr> transaction_metadata_process_queue;
std::deque<chain::transaction_trace_ptr> transaction_trace_queue;
std::deque<chain::transaction_trace_ptr> transaction_trace_process_queue;
std::deque<chain::block_state_ptr> block_state_queue;
std::deque<chain::block_state_ptr> block_state_process_queue;
std::deque<chain::block_state_ptr> irreversible_block_state_queue;
std::deque<chain::block_state_ptr> irreversible_block_state_process_queue;
複製代碼

queue是mongo_db_plugin本身定義的:

/**
 * 模板類Queue,能夠匹配以上咱們定義的多個queue類型。
 * 模板類Entry,能夠匹配block_state_ptr以及transaction_trace_ptr做爲被存儲實體類型。
 */
template<typename Queue, typename Entry>
void queue(boost::mutex& mtx, boost::condition_variable& condition, Queue& queue, const Entry& e, size_t queue_size) {
   int sleep_time = 100;//默認線程睡眠時間
   size_t last_queue_size = 0;
   boost::mutex::scoped_lock lock(mtx);//mutex鎖機制
   if (queue.size() > queue_size) {//若是超過了咱們設定的queue大小,則採起以下措施。
      lock.unlock();//先解鎖
      condition.notify_one();// 見下文對condition的介紹
      if (last_queue_size < queue.size()) {//說明queue的增長速度大於咱們程序消費處理的速度
         sleep_time += 100;//增長睡眠時間
      } else {
         sleep_time -= 100;//說明queue的增長速度小於咱們消費的速度,就要減小睡眠時間,儘快更新last_queue_size的值。
         if (sleep_time < 0) sleep_time = 100;
      }
      last_queue_size = queue.size();
      boost::this_thread::sleep_for(boost::chrono::milliseconds(sleep_time));//線程睡眠,睡眠的時間按照上面的機制定奪。
      lock.lock();//上鎖
   }
   queue.emplace_back(e);//生效部分:插入到隊列中去。
   lock.unlock();//解鎖
   condition.notify_one();
}
複製代碼

mongo_db_plugin_impl::wipe_database()

真正執行擦除mongo歷史數據的函數,這個動做是由咱們配置mongodb-wipe參數來指定。擦除的函數體以下:

void mongo_db_plugin_impl::wipe_database() {
   ilog("mongo db wipe_database");

   // 定義的六張mongo的表類型,經過客戶端鏈接獲取到六張表的權限。
   auto block_states = mongo_conn[db_name][block_states_col];
   auto blocks = mongo_conn[db_name][blocks_col];
   auto trans = mongo_conn[db_name][trans_col];
   auto trans_traces = mongo_conn[db_name][trans_traces_col];
   auto actions = mongo_conn[db_name][actions_col];
   accounts = mongo_conn[db_name][accounts_col];

   // 分別刪除,執行drop動做。
   block_states.drop();
   blocks.drop();
   trans.drop();
   trans_traces.drop();
   actions.drop();
   accounts.drop();
}
複製代碼

mongo_db_plugin_impl::init()

源碼較多不粘貼,上面wipe_database函數,咱們刪除了六張表,在init函數中,咱們要對應的創建這六張表。表名初始化:

const std::string mongo_db_plugin_impl::block_states_col = "block_states";
const std::string mongo_db_plugin_impl::blocks_col = "blocks";
const std::string mongo_db_plugin_impl::trans_col = "transactions";
const std::string mongo_db_plugin_impl::trans_traces_col = "transaction_traces";
const std::string mongo_db_plugin_impl::actions_col = "actions";
const std::string mongo_db_plugin_impl::accounts_col = "accounts";
複製代碼

這就是劉張表對應的名字。這六張表在初始化創建時是一個總體操做,也就是說互爲依賴關係,accounts表先建立,經過

accounts = mongo_conn[db_name][accounts_col];
複製代碼

便可建立成功accounts表,其餘表亦然,後面不贅述。表數據是由make_document進行組裝的。首先咱們向accounts表插入一條數據,結構是name爲eosio,createAt是當前時間。

  • chain::config::system_account_name ).to_string()
  • std::chrono::duration_caststd::chrono::milliseconds(std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()});

經過insert_one方法將該條數據插入accounts表中。

接下來經過create_index方法對五張表創建索引,注意transaction_traces是沒有索引的,init操做時不涉及transaction_traces表。

auto blocks = mongo_conn[db_name][blocks_col]; // Blocks
blocks.create_index( bsoncxx::from_json( R"xxx({ "block_num" : 1 })xxx" ));
blocks.create_index( bsoncxx::from_json( R"xxx({ "block_id" : 1 })xxx" ));// block創建了兩個索引

auto block_stats = mongo_conn[db_name][block_states_col];
block_stats.create_index( bsoncxx::from_json( R"xxx({ "block_num" : 1 })xxx" ));
block_stats.create_index( bsoncxx::from_json( R"xxx({ "block_id" : 1 })xxx" ));// block_stats創建了兩個索引

// accounts indexes
accounts.create_index( bsoncxx::from_json( R"xxx({ "name" : 1 })xxx" ));

// transactions indexes
auto trans = mongo_conn[db_name][trans_col]; // Transactions
trans.create_index( bsoncxx::from_json( R"xxx({ "trx_id" : 1 })xxx" ));

auto actions = mongo_conn[db_name][actions_col];
actions.create_index( bsoncxx::from_json( R"xxx({ "trx_id" : 1 })xxx" ));
複製代碼

初始化準備就完成了,接下來要創建線程監聽出塊消息,同步到mongo數據庫中來。

ilog("starting db plugin thread");

consume_thread = boost::thread([this] { consume_blocks(); });

startup = false;// 結束,調用析構函數,關閉mongo_db_plugin:設定標誌位done = true;
複製代碼

(6)mongo_db_plugin_impl::consume_blocks()

上面init函數執行到最後時,開啓了一個線程,執行的是consume_blocks()函數,如字面含義這是消費區塊的函數。這個函數是一個無限循環,保持線程的存活,監聽queue的數據隨時消費同步到mongo數據庫中去,而queue的數據的是由上面plugin_initialize函數中的connect信號槽機制鏈接chain的出塊信號往queue裏面插入/同步鏈上數據。

condition

無線循環第一部分就是對condition.wait(lock)的操做,condition在上面queue的源碼中有一個notify_one()的調用,實際上就是與wait相互應的操做。

boost::mutex::scoped_lock lock(mtx);
while ( transaction_metadata_queue.empty() &&
         transaction_trace_queue.empty() &&
         block_state_queue.empty() &&
         irreversible_block_state_queue.empty() &&
         !done ) {
    condition.wait(lock);
}
複製代碼
消費區塊佔用了一個線程,這個線程在上面四個queue是空的時候而且done也沒有完成是flase的時候,該線程會經過condition來阻塞線程,參數是mutex的一個鎖。
複製代碼

condition.notify_one()會從新喚起這個阻塞的線程,而在mongo_db_plugin中,condition.notify_one()出現了3次:

  • queue模板類型,有了新的數據插入的時候。
  • 當queue模板類型的隊列超過設置值的時候,要主動喚起consume_block開啓消費線程加速消費(上面介紹queue的時候也談到了隊列大小超限時會增長queue插入的睡眠等待時間,這兩方面至關於針對中間隊列對兩邊進行開源節流,從而控制了隊列的大小)
  • ~mongo_db_plugin_impl()析構函數中
mongo_db_plugin_impl::~mongo_db_plugin_impl() {
   if (!startup) {//標誌位,上面init函數結尾有這個值的賦值。
      try {
         ilog( "mongo_db_plugin shutdown in process please be patient this can take a few minutes" );
         done = true;//設定標誌位done,consume_block()會使用到。
         condition.notify_one();// 喚醒consume_thread線程繼續消費掉queue中殘餘數據。

         consume_thread.join();// 掛起主線程,等待consume_thread線程執行完畢再喚起主線程。
      } catch( std::exception& e ) {
         elog( "Exception on mongo_db_plugin shutdown of consume thread: ${e}", ("e", e.what()));
      }
   }
}
複製代碼

process_queue準備

咱們要將鏈上的數據同步至mongo,是經過上面判斷是否爲空的那四個queue來作,爲了增長消費效率,進入consume_block函數之後,要先將數據move導入到一個process_queue中來慢慢處理,至關於一箇中轉站。

size_t transaction_metadata_size = transaction_metadata_queue.size();
if (transaction_metadata_size > 0) {
    transaction_metadata_process_queue = move(transaction_metadata_queue);
    transaction_metadata_queue.clear();
}
size_t transaction_trace_size = transaction_trace_queue.size();
if (transaction_trace_size > 0) {
    transaction_trace_process_queue = move(transaction_trace_queue);
    transaction_trace_queue.clear();
}
size_t block_state_size = block_state_queue.size();
if (block_state_size > 0) {
    block_state_process_queue = move(block_state_queue);
    block_state_queue.clear();
}
size_t irreversible_block_size = irreversible_block_state_queue.size();
if (irreversible_block_size > 0) {
    irreversible_block_state_process_queue = move(irreversible_block_state_queue);
    irreversible_block_state_queue.clear();
}
複製代碼

隊列大小報警器

接下來是一個針對四個源隊列的大小進行一個監控,當任意超過限額的75%時,會觸發報警,打印到控制檯。

分發到具體執行函數消費隊列

接下來,就是將上面的四個中轉的process_queue的數據分別分發到不一樣的函數(對應下面四個_process函數)中去消費處理。最後每一箇中轉隊列處理一條,就pop出去一條,都處理結束之後,會再次判斷四個源隊列的大小是否爲空,都消費完了,同時也得有析構函數的done標誌位爲true,纔會中斷consume_thread線程的consume_block()的無線循環。

1. mongo_db_plugin_impl::_process_accepted_transaction() 執行接收交易, 須要start_block_reached標識位爲true。源碼較長不粘貼,語言介紹一下,該函數的主要工做是得到mongo的鏈接以及庫表對象,同時解析傳入的const chain::transaction_metadata_ptr& t 對象,該對象的路線是:

chain=>signal=>mongo_db_plugin connect signal=>queue=>process_queue=>遍歷出一條數據便是t
複製代碼

得到這個對象之後,也準備好了mongo數據庫的鏈接庫表對象,剩下的工做就是從t解析導入mongo庫表了。

mongo做爲列存儲的nosql文件數據庫,這裏只接收document類型
複製代碼

這裏建立了一個它的對象act_doc,解析過程:

  • 鏈數據對象的解析
const auto trx_id = t->id;
const auto trx_id_str = trx_id.str();
const auto& trx = t->trx;
const chain::transaction_header& trx_header = trx;
複製代碼
  • mongo數據庫存儲結構的定義,值數據的傳入,經過process_action函數進行處理,
act_doc.append( kvp( "action_num", b_int32{act_num} ), kvp( "trx_id", trx_id_str ));
act_doc.append( kvp( "cfa", b_bool{cfa} ));
act_doc.append( kvp( "account", act.account.to_string()));
act_doc.append( kvp( "name", act.name.to_string()));
act_doc.append( kvp( "authorization", [&act]( bsoncxx::builder::basic::sub_array subarr ) {
    for( const auto& auth : act.authorization ) {
        subarr.append( [&auth]( bsoncxx::builder::basic::sub_document subdoc ) {
          subdoc.append( kvp( "actor", auth.actor.to_string()),
                 kvp( "permission", auth.permission.to_string()));
        } );
    }
} ));
複製代碼

process_action函數處理的是action數據的匹配,而若是action涉及到新帳戶的建立,這部分要在process_action函數中繼續經過update_account函數進行處理。update_account函數只會過濾由system合約執行的newaccount動做,system合約默認是由chain::config::system_account_name(就是eosio)來建立的。因此過濾後的action的結構以下:

field value
account eosio
name newaccount

而後會同步在mongo的accounts表中添加一條記錄,要有當時的添加時間createdAt。添加以前,要根據這個用戶名去mongo中查找,經過函數find_account,若是查找到了則update,未查到就insert。

auto find_account(mongocxx::collection& accounts, const account_name& name) {
    using bsoncxx::builder::basic::make_document;
    using bsoncxx::builder::basic::kvp;
    return accounts.find_one( make_document( kvp( "name", name.to_string())));
}
複製代碼

接着,是transaction表的數據插入,這個工做是對trans_doc文本類型變量的設置:

  • trx_id設置
  • irreversible設置
  • transaction_header設置
  • signing_keys設置
  • actions設置:遍歷源trx的actions,每一項去調用上面定義的process_action函數執行action數據的處理髮到action_array變量中,賦給actions。
  • context_free_actions,與action的處理過程差很少。
  • transaction_extensions設置
  • signatures
  • context_free_data
  • createdAt

整合完畢,將trans_doc插入到transaction表中去。整個_process_accepted_transaction執行完畢,其中涉及到了transaction, action, accounts三張表的內容的增長與修改。

2. mongo_db_plugin_impl::_process_applied_transaction 執行應用交易,須要start_block_reached標識位爲true。這個函數是對mongo中transaction_traces表的操做。一樣的,是經過一個文本類型變量trans_traces_doc操做。這個函數的參數傳入是transaction_trace_ptr類型的對象t(對應的上面_process_accepted_transaction接收的是transaction_metadata_ptr類型的)

abi_serializer::to_variant, 轉化成abi格式的json數據。
abi_serializer::from_variant, 經過abi格式的json數據轉換出來對應的對象數據。
複製代碼

3. mongo_db_plugin_impl::_process_accepted_block

這裏先要從process_accepted_block函數進入,上面的下劃線_開頭的函數都是又沒有下劃線的相同名字的函數調用的,只是他們除了調用之外都是一些異常的處理,日誌的輸出工做。而process_accepted_block函數有了簡單的邏輯,就是根據標誌位start_block_reached做出了處理。前面咱們介紹plugin_initialize函數的時候,經過配置文件的配置項"mongodb-block-start",咱們設定了全局變量start_block_num做爲標誌位。這裏面就是對於這個參數值的一個判斷,若是達到了這個設定的起始區塊,則設定全局變量標誌位start_block_reached爲true,那麼就能夠進入到_process_accepted_block函數進行處理了。
複製代碼

這個函數是接收區塊處理。傳入的參數爲block_state_ptr類型的對象bs。它的路線與上面介紹過的其餘函數的參數t是相同的,只是類結構不一樣,存的數據不一樣。該函數涉及到mongo的兩張表,一個是block_states,另外一個是blocks。咱們分別來研究。

  • block_state_doc
mongocxx::options::update update_opts{};
   update_opts.upsert( true );// upsert模式爲true,表明update操做若是未找到對象則新增一條數據。
   
   auto block_states = mongo_conn[db_name][block_states_col];
   auto block_state_doc = bsoncxx::builder::basic::document{};
   // 數據結構映射
   block_state_doc.append(kvp( "block_num", b_int32{static_cast<int32_t>(block_num)} ),
                          kvp( "block_id", block_id_str ),
                          kvp( "validated", b_bool{bs->validated} ),
                          kvp( "in_current_chain", b_bool{bs->in_current_chain} ));

   auto json = fc::json::to_string( bhs );
   try {
      const auto& value = bsoncxx::from_json( json );
      block_state_doc.append( kvp( "block_header_state", value ));// 追加block_header_state的值
   } catch( bsoncxx::exception& ) {
      try {
         json = fc::prune_invalid_utf8(json);
         const auto& value = bsoncxx::from_json( json );
         block_state_doc.append( kvp( "block_header_state", value ));
         block_state_doc.append( kvp( "non-utf8-purged", b_bool{true}));
      } catch( bsoncxx::exception& e ) {
         elog( "Unable to convert block_header_state JSON to MongoDB JSON: ${e}", ("e", e.what()));
         elog( " JSON: ${j}", ("j", json));
      }
   }
   block_state_doc.append(kvp( "createdAt", b_date{now} ));// 追加createdAt的值

   try {
      // update_one, 沒有查詢到相關數據則直接新增一條
      if( !block_states.update_one( make_document( kvp( "block_id", block_id_str )),
                                    make_document( kvp( "$set", block_state_doc.view())), update_opts )) {
         EOS_ASSERT( false, chain::mongo_db_insert_fail, "Failed to insert block_state ${bid}", ("bid", block_id));
      }
   } catch(...) {
      handle_mongo_exception("block_states insert: " + json, __LINE__);
   }
複製代碼
  • block_doc
auto blocks = mongo_conn[db_name][blocks_col];
   auto block_doc = bsoncxx::builder::basic::document{};
   // 數據結構映射
   block_doc.append(kvp( "block_num", b_int32{static_cast<int32_t>(block_num)} ),
                    kvp( "block_id", block_id_str ),
                    kvp( "irreversible", b_bool{false} ));

   auto v = to_variant_with_abi( *bs->block, accounts, abi_serializer_max_time );// 轉化爲abi格式的數據存儲。
   json = fc::json::to_string( v );
   try {
      const auto& value = bsoncxx::from_json( json );
      block_doc.append( kvp( "block", value ));// 追加block的值,爲json
   } catch( bsoncxx::exception& ) {
      try {
         json = fc::prune_invalid_utf8(json);
         const auto& value = bsoncxx::from_json( json );
         block_doc.append( kvp( "block", value ));
         block_doc.append( kvp( "non-utf8-purged", b_bool{true}));
      } catch( bsoncxx::exception& e ) {
         elog( "Unable to convert block JSON to MongoDB JSON: ${e}", ("e", e.what()));
         elog( " JSON: ${j}", ("j", json));
      }
   }
   block_doc.append(kvp( "createdAt", b_date{now} ));// 追加createdAt的值

   try {
      // update_one, 沒有查詢到相關數據則直接新增一條
      if( !blocks.update_one( make_document( kvp( "block_id", block_id_str )),
                              make_document( kvp( "$set", block_doc.view())), update_opts )) {
         EOS_ASSERT( false, chain::mongo_db_insert_fail, "Failed to insert block ${bid}", ("bid", block_id));
      }
   } catch(...) {
      handle_mongo_exception("blocks insert: " + json, __LINE__);
   }
複製代碼

4. mongo_db_plugin_impl::_process_irreversible_block 執行不可逆區塊,,須要start_block_reached標識位爲true。涉及mongo的兩張表:blocks表和transactions表。

// 創世塊區塊號爲1,沒有信號到accepted_block處理。
if (block_num < 2) return;
複製代碼

傳入的參數,思想與上面的幾個函數設計是相同的,它的類型與上面的_process_accepted_block函數相同,是block_state_ptr類型的對象bs。從bs中獲取到區塊,首先會經過find_block去mongo中查詢,若是有的話就再也不處理。

  • blocks 數據映射更新插入。因爲它是在_process_accepted_block函數的後面執行,因此語句update_opts.upsert( true );在這裏的update_one也是有效的。

bulk: 是一系列操做的集合。

mongocxx::options::bulk_write bulk_opts;
bulk_opts.ordered(false);// false說明能夠並行,全部操做互不影響。若爲true,則順序執行,一旦遇錯直接停止,後面的操做不會被執行到。
auto bulk = trans.create_bulk_write(bulk_opts);//全部的操做針對的是trans對象,對應的mongo表爲transactions。
複製代碼
auto update_doc = make_document( kvp( "$set", make_document( kvp( "irreversible", b_bool{true} ),
                                                                kvp( "validated", b_bool{bs->validated} ),
                                                                kvp( "in_current_chain", b_bool{bs->in_current_chain} ),
                                                                kvp( "updatedAt", b_date{now}))));

blocks.update_one( make_document( kvp( "_id", ir_block->view()["_id"].get_oid())), update_doc.view());
複製代碼
  • transactions transactons是一個數組,一個block能夠包含不少條transaction,所以這裏要有個循環來處理。對於transaction在mongo中的存儲歷史,也有對應的find_transaction去mongo中查詢,若是有的話就再也不處理。
auto update_doc = make_document( kvp( "$set", make_document( kvp( "irreversible", b_bool{true} ),
                                                                      kvp( "block_id", block_id_str),
                                                                      kvp( "block_num", b_int32{static_cast<int32_t>(block_num)} ),
                                                                      kvp( "updatedAt", b_date{now}))));

 mongocxx::model::update_one update_op{ make_document(kvp("_id", ir_trans->view()["_id"].get_oid())), update_doc.view()};
複製代碼

最後經過在transaction循環中設定一個標誌位transactions_in_block來肯定transaction遍歷結束。

mongo_db_plugin總結

咱們是經過nodeos命令的initialize函數跟蹤到mongo_db_plugin的,關於mongo_db_plugin的一切,能夠總結爲順序:

1. set_program_option,設置配置參數
2. plugin_initialize,初始化使plugin配置參數生效,準備mongo鏈接,queue機制,信號槽監聽chain出塊action。
3. init,mongo庫表初始化,創建索引,定義了consume_thread線程用來消費queue區塊數據。initialize週期結束。
4. consume_block,線程觸發與等待策略,process_queue消費中轉策略,根據四種數據結構(即上文反覆提到的那四個結構)分發消費函數。
複製代碼
table function insert function update
transactions accepted_trx irreversible_block(bulk)
actions accepted_trx(bulk)
block_states accepted_block
blocks accepted_block irreversible_block
transaction_traces applied_trx
accounts accepted_trx
比較特殊的一個表是accounts,它能夠過濾actions內容,找到newaccount的action並保存帳戶到表裏。這給咱們以啓發,咱們能夠本身定義新的表來過濾本身須要的action,例如咱們本身寫的智能合約。
複製代碼

(六)initialize_logging()

日誌系統初始化。

void initialize_logging()
{
   auto config_path = app().get_logging_conf();
   if(fc::exists(config_path))
     fc::configure_logging(config_path); //故意不去捕捉異常
   for(auto iter : fc::get_appender_map())
     iter.second->initialize(app().get_io_service());
   // 重複以上代碼邏輯,利用boost::asio::signal\_set機制,async\_wait。
   logging_conf_loop();
}
複製代碼

(七)startup()

void application::startup() {
   try {
      for (auto plugin : initialized_plugins)//遍歷全部已初始化的插件,執行他們的startup函數。
         plugin->startup();
   } catch(...) {
      shutdown();//若有異常,則調用shutdown函數,清空容器,釋放資源。
      throw;
   }
}
複製代碼

這裏仍舊以mongo_db_plugin爲例,它的startup()是空。而對於其餘plugin而言,startup都有不少工做要作,例如producer_plugin和chain_plugin都很是重要,此外涉及到重要的控制器部分controller也須要仔細研究。因爲本文篇幅過長,咱們重點仍舊圍繞mongo_db_plugin來介紹整個nodeos啓動的生命週期。

(八)exec()

main入口函數執行到最後一個步驟:exec函數。

void application::exec() {
   std::shared_ptr<boost::asio::signal_set> sigint_set(new boost::asio::signal_set(*io_serv, SIGINT));
   sigint_set->async_wait([sigint_set,this](const boost::system::error_code& err, int num) {
     quit();
     sigint_set->cancel();
   });

   std::shared_ptr<boost::asio::signal_set> sigterm_set(new boost::asio::signal_set(*io_serv, SIGTERM));
   sigterm_set->async_wait([sigterm_set,this](const boost::system::error_code& err, int num) {
     quit();
     sigterm_set->cancel();
   });

   std::shared_ptr<boost::asio::signal_set> sigpipe_set(new boost::asio::signal_set(*io_serv, SIGPIPE));
   sigpipe_set->async_wait([sigpipe_set,this](const boost::system::error_code& err, int num) {
     quit();
     sigpipe_set->cancel();
   });

   io_serv->run();// 與上面initialize_logging的get_io_service()獲取到的io\_serv是同一個對象

   shutdown(); /// 同步推出
}
複製代碼

這個函數與initialize_logging的循環中涉及到相同的信號機制boost::asio::signal_set。

boost::asio::signal_set

boost庫的信號量技術。它要使用到boost::asio::io_service,這也是上面提到屢次的。信號量對象的初始化可參照前文一段代碼,以下:

std::shared_ptr<boost::asio::signal_set> sigint_set(new boost::asio::signal_set(*io_serv, SIGINT));
複製代碼

共享指針這裏不談了,感興趣的同窗請轉到這裏。它的構造函數是傳入了一個boost::asio::io_service以及一個信號number SIGINT。這個SIGINT的聲明爲:

#define SIGINT 2 /* Interrupt (ANSI). */
複製代碼

這個構造函數實現了向信號量集合中添加了一個信號2。

接着,我要經過async_wait來使用信號量。能夠貼上上面initialize_logging函數的logging_conf_loop函數。

void logging_conf_loop()
{
   std::shared_ptr<boost::asio::signal_set> sighup_set(new boost::asio::signal_set(app().get_io_service(), SIGHUP));
   sighup_set->async_wait([sighup_set](const boost::system::error_code& err, int /*num*/) {
      if(!err)
      {
         ilog("Received HUP. Reloading logging configuration.");
         auto config_path = app().get_logging_conf();
         if(fc::exists(config_path))
            ::detail::configure_logging(config_path);
         for(auto iter : fc::get_appender_map())
            iter.second->initialize(app().get_io_service());
         logging_conf_loop();
      }
   });
}
複製代碼

能夠直接經過sighup_set->async_wait的方式來使用。它的聲明定義是:

void (boost::system::error_code, int)) 
複製代碼

會在所監聽的信號觸發時調用函數體。當發生錯誤的時候,退出logging_conf_loop函數的遞歸調用。

總結

寫到這裏,咱們的nodeos的命令就啓動成功了,因爲篇幅限制,咱們沒有仔細去研究全部依賴的plugin,以及controller的邏輯。本文重點研究了mongo_db_plugin的源碼實現,經過該插件,咱們全面分析了nodeos命令啓動的全部流程。而對於mongo_db_plugin插件自己的學習,咱們也明白了鏈數據是如何同步到mongo裏面的。接下來,我會繼續深刻分析其餘相關插件的初始化流程以及啓動流程,還有controller的邏輯細節,以及出塊邏輯等等。

參考資料

EOSIO/eos


相關文章和視頻推薦

【許曉笛】EOS 數據庫與持久化 API —— 實戰

圓方圓學院聚集大批區塊鏈名師,打造精品的區塊鏈技術課程。 在各大平臺都長期有優質免費公開課,歡迎報名收看。

公開課地址:ke.qq.com/course/3451…

相關文章
相關標籤/搜索