原文連接:醒者呆的博客園,www.cnblogs.com/Evsward/p/c…html
EOS提供了大量的rpc接口,其中功能性最強,使用最頻繁的一部分接口是EOS的行爲核心,由chain_api_plugin提供,具體實現是在chain_plugin。 關鍵字:EOS,區塊鏈,chain_plugin,chain_api_plugin,rpc,FC_REFLECT,反射,method模板,channel模板node
rpc調用邏輯,chainbase數據庫底層原理,nodeos啓動流程,plugin生命週期在前文都有介紹。本節直接研究chain_plugin的內容,研究入口會從chain_api_plugin中暴漏的rpc接口切入,這些接口是很是熟悉的,由於以前演練cleos相關命令時調用也是rpc。首先展現一下全部的接口內容:web
_http_plugin.add_api({
CHAIN_RO_CALL(get_info, 200l),
CHAIN_RO_CALL(get_block, 200),
CHAIN_RO_CALL(get_block_header_state, 200),
CHAIN_RO_CALL(get_account, 200),
CHAIN_RO_CALL(get_code, 200),
CHAIN_RO_CALL(get_code_hash, 200),
CHAIN_RO_CALL(get_abi, 200),
CHAIN_RO_CALL(get_raw_code_and_abi, 200),
CHAIN_RO_CALL(get_raw_abi, 200),
CHAIN_RO_CALL(get_table_rows, 200),
CHAIN_RO_CALL(get_table_by_scope, 200),
CHAIN_RO_CALL(get_currency_balance, 200),
CHAIN_RO_CALL(get_currency_stats, 200),
CHAIN_RO_CALL(get_producers, 200),
CHAIN_RO_CALL(get_producer_schedule, 200),
CHAIN_RO_CALL(get_scheduled_transactions, 200),
CHAIN_RO_CALL(abi_json_to_bin, 200),
CHAIN_RO_CALL(abi_bin_to_json, 200),
CHAIN_RO_CALL(get_required_keys, 200),
CHAIN_RO_CALL(get_transaction_id, 200),
CHAIN_RW_CALL_ASYNC(push_block, chain_apis::read_write::push_block_results, 202),
CHAIN_RW_CALL_ASYNC(push_transaction, chain_apis::read_write::push_transaction_results, 202),
CHAIN_RW_CALL_ASYNC(push_transactions, chain_apis::read_write::push_transactions_results, 202)
});
複製代碼
這些接口能夠分爲兩類,一類是經過宏CHAIN_RO_CALL調用的,另外一類是經過宏CHAIN_RW_CALL_ASYNC調用。算法
#define CHAIN_RO_CALL(call_name, http_response_code) CALL(chain, ro_api, chain_apis::read_only, call_name, http_response_code)
複製代碼
採用同步只讀的方式調用宏CALL。call_name是調用的函數名,http_response_code是響應碼。下面進入宏CALL。數據庫
/**
* @attention 目前調用CALL函數的只有read_only應用。
* @param api_name "chain' * @param api_handle app().get_plugin<chain_plugin>().get_read_only_api(); * @param api_namespace chain_apis::read_only * @param call_name -INHERIT * @param http_response_code -INHERIT */ #define CALL(api_name, api_handle, api_namespace, call_name, http_response_code) \ {std::string("/v1/" #api_name "/" #call_name), \ /*拼接接口url:http://ip:port/v1/chain/{call_name}*/ \ /* * @param body:http請求體 * @param cb:回調函數,用於返回處理結果 */ \ [api_handle](string, string body, url_response_callback cb) mutable { \ api_handle.validate(); \ try { \ if (body.empty()) body = "{}"; \ /* * api_handle爲chain_plugin中read_only類的實例 * call_name爲函數名,實現體找chain_plugin.cpp文件 * 函數參數1個:此處規定了一個命名規則,接口名加入後綴_param即爲請求參數結構 */ \ auto result = api_handle.call_name(fc::json::from_string(body).as<api_namespace::call_name ## _params>()); \ /*回調函數返回處理結果,此處也規定了一個命名規則,接口名加入後綴_result即爲返回結構,轉化爲json的格式返回。*/ \ cb(http_response_code, fc::json::to_string(result)); \ } catch (...) { \ /*捕捉到異常,調用http_plugin的異常處理函數handle_exception*/ \ http_plugin::handle_exception(#api_name, #call_name, body, cb); \ } \ } \ } 複製代碼
api_handle參數編程
同步只讀的請求傳入的api_handle參數值爲ro_api變量,該變量是在chain_api_plugin插件啓動chain_api_plugin::plugin_startup時(插件的生命週期前文已有介紹)初始化的,json
auto ro_api = app().get_plugin<chain_plugin>().get_read_only_api();
api
app()函數以及與application類相關的內容前文已經介紹過,經過get_plugin<chain_plugin>獲取chain_plugin的實例,而後調用其成員函數get_read_only_api(),數組
chain_apis::read_only get_read_only_api() const { return chain_apis::read_only(chain(), get_abi_serializer_max_time()); } //注意const修飾符,函數體內返回值是不可修改的。
緩存
返回的是chain_apis::read_only構造函數返回的read_only實例。類read_only中包含了全部基於只讀機制的接口實現,與上面接口列表中聲明的保持一致。
read_only(const controller& db, const fc::microseconds& abi_serializer_max_time) : db(db), abi_serializer_max_time(abi_serializer_max_time) {}
所以,最後傳入CALL宏的api_handle參數值實際就是這個類read_only的實例。以後使用該實例去調用call_name,就是簡單的實例調用自身成員函數(通常這個成員函數是聲明和實現都有的)的邏輯了。
(2) CHAIN_RW_CALL_ASYNC
#define CHAIN_RW_CALL_ASYNC(call_name, call_result, http_response_code) CALL_ASYNC(chain, rw_api, chain_apis::read_write, call_name, call_result, http_response_code)
複製代碼
採用異步讀寫的方式調用異步處理宏CALL_ASYNC。call_name是調用的函數名,call_result傳入聲明的結果接收體(例如chain_apis::read_write::push_transaction_results),http_response_code是響應碼。下面進入宏CALL_ASYNC。
/**
* @attention 目前調用CALL_ASYNC函數的只有read_write的應用。
* @param api_name "chain' * @param api_handle app().get_plugin<chain_plugin>().get_read_write_api(); * @param api_namespace chain_apis::read_write * @param call_name -INHERIT * @param call_result -INHERIT * @param http_response_code -INHERIT */ #define CALL_ASYNC(api_name, api_handle, api_namespace, call_name, call_result, http_response_code) \ {std::string("/v1/" #api_name "/" #call_name), \ /*同上,拼接接口url:http://ip:port/v1/chain/{call_name}*/ \ /* * http處理請求的函數結構不變,同上。 * @param body:http請求體 * @param cb:回調函數,用於返回處理結果 */ \ [api_handle](string, string body, url_response_callback cb) mutable { \ if (body.empty()) body = "{}"; \ api_handle.validate(); \ /* * api_handle爲chain_plugin中read_only類的實例 * call_name爲函數名,實現體找chain_plugin.cpp文件 * 函數參數2個: * @param 此處規定了一個命名規則,接口名加入後綴_param即爲請求參數結構 * @param lambda表達式,將cb和body按值傳遞進內部函數,該內部函數總體做爲異步操做的回調函數,注意與http的回調函數cb區分。 */ \ api_handle.call_name(fc::json::from_string(body).as<api_namespace::call_name ## _params>(),\ [cb, body](const fc::static_variant<fc::exception_ptr, call_result>& result){\ /*捕獲異常,分發異常處理*/ \ if (result.contains<fc::exception_ptr>()) {\ try {\ result.get<fc::exception_ptr>()->dynamic_rethrow_exception();\ } catch (...) {\ http_plugin::handle_exception(#api_name, #call_name, body, cb);\ }\ } else {\ /* * 異步處理成功,經過http的回調函數cb返回結果。 */ \ cb(http_response_code, result.visit(async_result_visitor()));\ }\ });\ }\ } 複製代碼
其中最後處理結果的語句比較使人好奇result.visit(async_result_visitor())
result的類型是:const fc::static_variant<fc::exception_ptr, call_result>&
async_result_visitor()函數:
struct async_result_visitor : public fc::visitor<std::string> {
template<typename T>
std::string operator()(const T& v) const {
return fc::json::to_string(v); //與CALL處理返回結果相同的是,此處作的也是轉換json的工做。
}
};
複製代碼
接着,進入fc庫的static_variant.hpp文件中尋找類static_variant,它包含一個模板函數visit:
template<typename visitor>
typename visitor::result_type visit(const visitor& v)const {
return impl::storage_ops<0, Types...>::apply(_tag, storage, v);
}
複製代碼
異步處理將處理結果轉型放置在結果容器中。
api_handle參數
異步讀寫的請求傳入的api_handle參數值爲rw_api變量,該變量是在chain_api_plugin插件啓動chain_api_plugin::plugin_startup時(插件的生命週期前文已有介紹)初始化的,
auto rw_api = app().get_plugin<chain_plugin>().get_read_write_api();
app()函數以及與application類相關的內容前文已經介紹過,經過get_plugin<chain_plugin>獲取chain_plugin的實例,而後調用其成員函數get_read_write_api(),
chain_apis::read_write get_read_write_api() { return chain_apis::read_write(chain(), get_abi_serializer_max_time()); }
返回的是chain_apis::read_write構造函數返回的read_write實例。類read_write中包含了全部基於讀寫機制的接口實現,與上面接口列表中聲明的保持一致。
read_write(controller& db, const fc::microseconds& abi_serializer_max_time): db(db), abi_serializer_max_time(abi_serializer_max_time) {}
所以,最後傳入CALL_ASYNC宏的api_handle參數值實際就是這個類read_write的實例。以後使用該實例去調用call_name,就是簡單的實例調用自身成員函數(通常這個成員函數是聲明和實現都有的)的邏輯了。
chain_api_plugin生命週期
FC_REFLECT爲結構體提供序列化成員的能力。
複製代碼
FC_REFLECT是FC庫中提供反射功能的宏。反射的意義在於瞭解一個未知的對象,反射是不限編程語言的,經過反射可以獲取到對象的成員結構。宏#define FC_REFLECT( TYPE, MEMBERS )
內部又調用了宏#define FC_REFLECT_DERIVED( TYPE, INHERITS, MEMBERS )
,反射功能的具體實現就不深刻探究了。下面來看其應用,舉個例子:
FC_REFLECT( eosio::chain_apis::read_only::get_required_keys_params, (transaction)(available_keys) )
FC_REFLECT( eosio::chain_apis::read_only::get_required_keys_result, (required_keys) )
複製代碼
兩行代碼分別包含了關於get_required_keys的兩個結構體,
struct get_required_keys_params {
fc::variant transaction;
flat_set<public_key_type> available_keys;
};
struct get_required_keys_result {
flat_set<public_key_type> required_keys;
};
複製代碼
get_required_keys是chain的RPC接口,結構體get_required_keys_params是該接口的請求參數的結構,而另外一個get_required_keys_result是接口處理後返回的結構。
回過頭繼續看FC_REFLECT的兩行代碼,第一個參數傳入的是結構體。第二個參數用圓括號包含,能夠有多個,內容與結構體的成員一致。
FC_REFLECT實際上實現了面向對象編程中類成員的getter/setter方法。
複製代碼
與基類定義的生命週期相同,也包含四個階段。
在nodeos程序調試部分有詳細介紹。主要是添加chain_plugin相關的配置參數,一組是命令行的,另外一組是來自配置文件的,其中命令行的配置項優先級更高。
這個函數也是從nodeos程序入口而來,會傳入配置項調用chain_plugin的初始化函數。初始胡函數獲取到來自命令行和配置文件的中和配置參數之後,結合創世塊配置,逐一處理相關參數邏輯。這些參數對應的處理邏輯以下表(對應controller的成員屬性介紹)所示:
param | explanation | detail |
---|---|---|
action-blacklist | 添加action黑名單 | 每一條數據是有帳戶和action名組成 |
key-blacklist | 公鑰黑名單 | 公鑰集合 |
blocks-dir | 設置數據目錄 | 最終會處理爲絕對路徑保存到內存 |
checkpoint | 檢查點 | 緩存區塊的檢查點,用於快速掃描 |
wasm-runtime | 虛擬機類型 | 能夠指定運行時webassembly虛擬機類型 |
abi-serializer-max-time-ms | abi序列化最大時間 | 要提升這個數值防止abi序列化失敗 |
chain-state-db-size-mb | 鏈狀態庫大小 | 基於chainbase的狀態主庫的大小 |
chain-state-db-guard-size-mb | 鏈狀態庫守衛大小 | 也是controller中提到的未包含在公開屬性中的 |
reversible-blocks-db-size-mb | 鏈可逆區塊庫大小 | 鏈可逆區塊庫也是基於chainbase的狀態數據庫 |
reversible-blocks-db-guard-size-mb | 鏈可逆區塊庫守衛大小 | 也是controller中提到的未包含在公開屬性中的 |
force-all-checks | 是否強制執行全部檢查 | 默認爲false |
disable-replay-opts | 是否禁止重播參數 | 默認爲false |
contracts-console | 是否容許合約輸出到控制檯 | 通常爲了調試合約使用,默認爲false |
disable-ram-billing-notify-checks | 是否容許內存帳單通知 | 默認爲false |
extract-genesis-json/print-genesis-json | 輸出創世塊配置 | 以json格式輸出 |
export-reversible-blocks | 導出可逆區塊到路徑 | 將可逆區塊目錄reversible中數據導入到指定路徑 |
delete-all-blocks | 刪除全部區塊數據 | 重置區塊鏈 |
truncate-at-blocks | 區塊截取點 | 全部生效的指令都要截止到本參數設置的區塊號 |
hard-replay-blockchain | 強制重播區塊鏈 | 清空狀態庫,經過repair_log得到backup,搭配fix-reversible-blocks從backup中恢復可逆區塊到區塊目錄。 |
replay-blockchain | 重播區塊鏈 | 清空狀態庫,搭配fix-reversible-blocks從原區塊目錄的可逆區塊目錄自我修復 |
fix-reversible-blocks | 修復可逆區塊 | 調用函數recover_reversible_blocks傳入源路徑和新路徑,可逆緩存大小,以及是否有截取點truncate-at-blocks |
import-reversible-blocks | 導入可逆區塊路徑(必須獨立使用,沒有其餘參數命令) | 清空可逆區塊目錄,調用import_reversible_blocks函數導入 |
snapshot | 指定導入的快照路徑 | 在controller的快照部分有詳述 |
genesis-json | 指定創世塊配置文件 | 從文件中導出創世塊的配置項到內存 |
genesis-timestamp | 指定創世塊的時間 | 一樣將該時間配置到內存中對應的變量 |
read-mode | 狀態主庫的讀取模式 | controller部分有詳述 |
validation-mode | 校驗模式 | controller部分有詳述 |
chain_plugin參數處理完畢後,設置方法提供者(並無找到該provider的應用)。接着轉播信號到頻道,爲chain_plugin_impl的惟一指針my的connection屬性賦值,建立信號槽。
chain_plugin的插件初始化工做完畢,主要是對chain_plugin的配置參數的處理,以及信號槽的實現。
chain_plugin插件的啓動,首先是快照的處理,這部分在快照的內容中有介紹,是根據nodeos過來的快照參數,判斷是否要加入快照參數調用controller的startup。這期間若有捕捉到異常,則執行controller的reset重置操做。而後根據controller的屬性輸出鏈日誌信息。
重置全部的信號槽,重置controller。
外部rpc調用經過chain_api_plugin插件包裹的接口服務,內部接口的實現是在chain_plugin中,對應關係是在chain_api_plugin的接口列表,經過函數名字匹配。
// 返回值爲read_only的實體成員get_info_results結構的實例。
read_only::get_info_results read_only::get_info(const read_only::get_info_params&) const {
const auto& rm = db.get_resource_limits_manager();
return {
// 如下字段都與get_info_results結構匹配,最終構造出get_info_results實例返回。
eosio::utilities::common::itoh(static_cast<uint32_t>(app().version())), // server_version
db.get_chain_id(), // chain_id
db.fork_db_head_block_num(), // head_block_num
db.last_irreversible_block_num(), // last_irreversible_block_num
db.last_irreversible_block_id(), // last_irreversible_block_id
db.fork_db_head_block_id(), // head_block_id
db.fork_db_head_block_time(), // head_block_time
db.fork_db_head_block_producer(), // head_block_producer
rm.get_virtual_block_cpu_limit(), // virtual_block_cpu_limit
rm.get_virtual_block_net_limit(), // virtual_block_net_limit
rm.get_block_cpu_limit(), // block_cpu_limit
rm.get_block_net_limit(), // block_net_limit
//std::bitset<64>(db.get_dynamic_global_properties().recent_slots_filled).to_string(), // recent_slots
//__builtin_popcountll(db.get_dynamic_global_properties().recent_slots_filled) / 64.0, // participation_rate
app().version_string(), // server_version_string
};
}
複製代碼
能夠看到get_info_results的部分字段是經過read_only::db對象獲取,還有一部分資源相關的內容是經過db的資源限制管理器得到,而關於版本方面的數據是從application實例得到。
// 特殊的是,此處並無建立一個get_block_result的結構體做爲返回值的容器,是利用了variant語法將signed_block_ptr轉換成可輸出的狀態。
fc::variant read_only::get_block(const read_only::get_block_params& params) const {
signed_block_ptr block;
// 若是參數block_num_or_id爲空或者block_num_or_id的長度大於64,屬於非法參數不處理,會報錯。
EOS_ASSERT(!params.block_num_or_id.empty() && params.block_num_or_id.size() <= 64, chain::block_id_type_exception, "Invalid Block number or ID, must be greater than 0 and less than 64 characters" );
try {
// 經過variant語法將參數block_num_or_id類型擦除而後經過as語法轉化爲block_id_type類型,
block = db.fetch_block_by_id(fc::variant(params.block_num_or_id).as<block_id_type>());
if (!block) {// 若是經過id的方法得到的block爲空,則嘗試使用區塊號的方式獲取。
block = db.fetch_block_by_number(fc::to_uint64(params.block_num_or_id));// 利用to_uint64將參數轉型。
}// 若是獲取失敗,拋出異常,無效的參數block_num_or_id
} EOS_RETHROW_EXCEPTIONS(chain::block_id_type_exception, "Invalid block ID: ${block_num_or_id}", ("block_num_or_id", params.block_num_or_id))
EOS_ASSERT( block, unknown_block_exception, "Could not find block: ${block}", ("block", params.block_num_or_id));
// 經過校驗,開始返回對象。
fc::variant pretty_output;
// i將結果block的數據經過resolver解析到pretty_output
abi_serializer::to_variant(*block, pretty_output, make_resolver(this, abi_serializer_max_time), abi_serializer_max_time);
// 引用區塊的前綴設置
uint32_t ref_block_prefix = block->id()._hash[1];
return fc::mutable_variant_object(pretty_output.get_object())
("id", block->id())
("block_num",block->block_num())
("ref_block_prefix", ref_block_prefix);
}
複製代碼
進一步研究區塊的id是如何生成的,以及如何經過id得到區塊號。是在block_heade.cpp中定義:
namespace eosio { namespace chain {
digest_type block_header::digest()const
{
return digest_type::hash(*this);// hash算法爲sha256,而後使用fc::raw::pack打包獲取結果
}
uint32_t block_header::num_from_id(const block_id_type& id)
{
return fc::endian_reverse_u32(id._hash[0]);// 其實是對區塊id併入區塊號的算法的逆向工程,得到區塊號。
}
// id的類型爲block_id_type
block_id_type block_header::id()const
{
// id不包括簽名區塊頭屬性,尤爲是生產者簽名除外。
block_id_type result = digest();//digest_type::hash(*this),this是id()的調用者。
result._hash[0] &= 0xffffffff00000000;//對結果進行位操做,併入一個十六進制頭。
result._hash[0] += fc::endian_reverse_u32(block_num()); // 經過上一個區塊id找到其區塊號而後自增得到當前區塊號。併入id數據成爲其一部分。
return result;
}
} }
複製代碼
get_block拼接好id、block_num、ref_block_prefix最後三個字段之後,返回的數據結構以下圖所示:
注意與上面的get_block的實現區分,get_block_header_state是經過fetch_block_state_by_number
或fetch_block_state_by_id
函數獲取到的是狀態庫中的區塊對象,也就是說是可逆區塊數據,而不是get_block經過fetch_block_by_number
或fetch_block_by_id
函數獲取到的不可逆區塊。 get_block_header_state獲取到可逆區塊之後,經過如下代碼獲得其區塊頭數據並返回。
fc::variant vo;
fc::to_variant( static_cast<const block_header_state&>(*b), vo );// block_state_ptr b;
return vo;
複製代碼
這個功能的實現函數代碼較長,但作的工做實際上並不複雜,能夠採用從返回的account數據結構來逆向分析該功能的實現方法:
struct get_account_results {
name account_name; // 帳戶名,入參的值。
uint32_t head_block_num = 0; // 頭塊號,controller的狀態主庫db獲取
fc::time_point head_block_time; // 頭塊時間,controller的狀態主庫db獲取
bool privileged = false; // 是否超級帳戶,默認false。controller的狀態主庫db獲取帳戶的屬性之一。
fc::time_point last_code_update; // 最後的code修改時間,例如給帳戶set contract的時間。controller的狀態主庫db獲取帳戶的屬性之一。
fc::time_point created; // 帳戶建立時間。controller的狀態主庫db獲取帳戶的屬性之一。
optional<asset> core_liquid_balance; // 主幣的餘額,在accounts狀態表裏查到的
int64_t ram_quota = 0; // 內存限額(資源相關部分有詳細介紹),從controller的資源管理器獲取
int64_t net_weight = 0; // 網絡帶寬資源權重,從controller的資源管理器獲取
int64_t cpu_weight = 0; // cpu資源權重,從controller的資源管理器獲取
account_resource_limit net_limit; // 網絡帶寬資源,包括已使用、剩餘可用、總量。從controller的資源管理器獲取
account_resource_limit cpu_limit; // cpu帶寬資源,包括已使用、剩餘可用、總量。從controller的資源管理器獲取
int64_t ram_usage = 0; // 內存已使用量。從controller的資源管理器獲取
vector<permission> permissions; // 帳戶的權限內容(帳戶多簽名部分有詳細介紹),在狀態主庫的表裏查到的。
fc::variant total_resources; // 總資源量,包括網絡、cpu、內存資源總量。在userres狀態表裏查到的
fc::variant self_delegated_bandwidth; // 自我抵押帶寬。在delband狀態表裏查到的
fc::variant refund_request; // 退款請求。在refunds狀態表裏查到的
fc::variant voter_info; // 投票相關。在voters狀態表裏查到的
};
複製代碼
注意該接口修改了源碼,不支持返回wast數據了。所以在請求該接口的時候,要使用的參數以下:
{
"account_name": "eosio.token",
"code_as_wasm": true
}
複製代碼
返回的數據將包括
read_only::get_code_results read_only::get_code( const get_code_params& params )const {
get_code_results result;
result.account_name = params.account_name;
const auto& d = db.db();
const auto& accnt = d.get<account_object,by_name>( params.account_name );// 從controller狀態庫中獲取帳戶信息
// 當前默認不支持返回wast數據
EOS_ASSERT( params.code_as_wasm, unsupported_feature, "Returning WAST from get_code is no longer supported" );
if( accnt.code.size() ) {
if (params.code_as_wasm) {
// 完整的原始帳戶的code的數據
result.wasm = string(accnt.code.begin(), accnt.code.end());
}
// 得到code的哈希值:將帳戶信息下的code的data和size值作sha256哈希獲得的值
result.code_hash = fc::sha256::hash( accnt.code.data(), accnt.code.size() );
}
// 獲取帳戶的abi數據:經過abi_serializer將帳戶的abi數據解析出來。
abi_def abi;
if( abi_serializer::to_abi(accnt.abi, abi) ) {
result.abi = std::move(abi);
}
return result;
}
複製代碼
實現方法參照get_code,返回數據只包括code的hash值。
###7. 得到帳戶的abi數據 get_abi
實現方法參照get_code,返回數據只包括帳戶的abi數據。
read_only::get_raw_code_and_abi_results read_only::get_raw_code_and_abi( const get_raw_code_and_abi_params& params)const {
get_raw_code_and_abi_results result;
result.account_name = params.account_name;
const auto& d = db.db();
const auto& accnt = d.get<account_object,by_name>(params.account_name);
result.wasm = blob{{accnt.code.begin(), accnt.code.end()}}; // 原始wasm值,完整取出便可。
result.abi = blob{{accnt.abi.begin(), accnt.abi.end()}}; // 原始abi值,完整取出便可。
return result;
}
複製代碼
實現方法參照get_raw_code_and_abi,返回數據只包括帳戶的原始abi數據。
首先查看該接口的傳入參數的數據結構:
struct get_table_rows_params {
bool json = false; // 是不是json的格式
name code; // 傳入code值,即擁有該table的帳戶名
string scope; // 傳入scope值,即查詢條件
name table; // 傳入table的名字
string table_key; // table主鍵
string lower_bound; // 設置檢索數據的下限,默認是first
string upper_bound; // 設置檢索數據的上限,默認是last
uint32_t limit = 10; // 數據結果的最大條目限制
string key_type; // 經過指定鍵的數據類型,定位查詢依賴的鍵
string index_position; // 經過傳入鍵的位置,,定位查詢依賴的鍵。1 - 主鍵(first), 2 - 二級索引 (multi_index定義), 3 - 三級索引,等等
string encode_type{"dec"}; //加密類型,有十進制仍是十六進制,默認是十進制dec。
};
複製代碼
除了code、scope、table之外都是可選的參數,這三個參數是定位檢索數據的關鍵,因此不可省略。下面來看該接口的返回值類型:
struct get_table_rows_result {
vector<fc::variant> rows; // 數據集合。一行是一條,不管是十六進制加密串仍是解析成json對象,都表明一行。
bool more = false; // 若是最後顯示的元素(受制於limit)並非數據庫中最後一個,則該字段會置爲true
};
複製代碼
進入接口實現的函數體,內容較多。首先經過傳入參數對象中的index_position字段來肯定查詢依賴的鍵,這是經過函數get_table_index_name完成的工做,同時會修改primary原對象的值,返回table名字的同時告知是不是主鍵(table的鍵的名字是與table名字相關的)。 接着,若是是主鍵:
// 對比入參對象的table名字是否與經過index反查的table名字保持一致。
EOS_ASSERT( p.table == table_with_index, chain::contract_table_query_exception, "Invalid table name ${t}", ( "t", p.table ));
auto table_type = get_table_type( abi, p.table );// 得到table類型
if( table_type == KEYi64 || p.key_type == "i64" || p.key_type == "name" ) {//支持這三種table類型
return get_table_rows_ex<key_value_index>(p,abi);// 具體檢索table的函數。
}
// 若是是已支持的三種table類型以外的,則會報錯。
EOS_ASSERT( false, chain::contract_table_query_exception, "Invalid table type ${type}", ("type",table_type)("abi",abi));
複製代碼
具體檢索table的函數get_table_rows_ex,這是很是重要的一個函數,須要源碼分析:
/**
* 檢索table的核心函數
* @tparam IndexType 模板類,支持不一樣的索引類型
* @param p get_table_rows接口入參對象
* @param abi 經過controller查詢入參code對應的程序abi
* @return 查詢結果
*/
template <typename IndexType>
read_only::get_table_rows_result get_table_rows_ex( const read_only::get_table_rows_params& p, const abi_def& abi )const {
read_only::get_table_rows_result result; // 首先定義結果容器
const auto& d = db.db(); // 狀態主庫對象
uint64_t scope = convert_to_type<uint64_t>(p.scope, "scope"); // 得到查詢條件。
abi_serializer abis;
abis.set_abi(abi, abi_serializer_max_time);// 將abi_def類型的abi經過序列化轉到abi_serializer類型的對象abis。
// 查詢狀態庫表的標準範式,返回的是經過code、scope、table檢索到的結果集的數據迭代器,
const auto* t_id = d.find<chain::table_id_object, chain::by_code_scope_table>(boost::make_tuple(p.code, scope, p.table));
if (t_id != nullptr) { // 迭代器不爲空
const auto& idx = d.get_index<IndexType, chain::by_scope_primary>(); // 傳入查詢依賴的鍵,指定迭代器的索引。
decltype(t_id->id) next_tid(t_id->id._id + 1);
auto lower = idx.lower_bound(boost::make_tuple(t_id->id)); // 獲取結果集上限
auto upper = idx.lower_bound(boost::make_tuple(next_tid)); // 獲取結果集下限
if (p.lower_bound.size()) {// 若是入參對象設置告終果集下限
if (p.key_type == "name") { // 主鍵類型是帳戶名字,設置下限對象
name s(p.lower_bound);
lower = idx.lower_bound( boost::make_tuple( t_id->id, s.value ));
} else {// 主鍵類型是其餘類型,設置下限對象
auto lv = convert_to_type<typename IndexType::value_type::key_type>( p.lower_bound, "lower_bound" );
lower = idx.lower_bound( boost::make_tuple( t_id->id, lv ));
}
}
if (p.upper_bound.size()) {// 若是入參對象設置告終果集上限
if (p.key_type == "name") {// 主鍵類型是帳戶名字,設置上限對象
name s(p.upper_bound);
upper = idx.lower_bound( boost::make_tuple( t_id->id, s.value ));
} else {// 主鍵類型是其餘類型,設置上限對象
auto uv = convert_to_type<typename IndexType::value_type::key_type>( p.upper_bound, "upper_bound" );
upper = idx.lower_bound( boost::make_tuple( t_id->id, uv ));
}
}
// 迭代器啓動迭代,開始檢索
vector<char> data;
auto end = fc::time_point::now() + fc::microseconds(1000 * 10); /// 10ms 是最長時間
unsigned int count = 0;
auto itr = lower;
for (; itr != upper; ++itr) {
copy_inline_row(*itr, data); // 將迭代器當前指針指向的對象複製到data容器中去。
if (p.json) { // 處理data爲json格式,經過方法binary_to_variant,向result的結果集rows中插入解析後的明文json格式的data
result.rows.emplace_back( abis.binary_to_variant( abis.get_table_type(p.table), data, abi_serializer_max_time, shorten_abi_errors ) );
} else { // 未要求json格式,則直接返回data,data不是可讀的。
result.rows.emplace_back(fc::variant(data));
}
if (++count == p.limit || fc::time_point::now() > end) { // 兩個限制:一是結果集行數limit限制,二是執行時間是否超時
++itr;
break;
}
}
if (itr != upper) { // 若是實際返回的結果集並無徹底輸出全部符合要求的數據,則將more字段置爲true,提醒用戶還有符合要求的數據沒顯示。
result.more = true;
}
}
return result;
}
複製代碼
繼續回到get_table_rows接口函數體,若是不是主鍵,則須要按照鍵類型來區分處理,鍵類型包括i64, i128, i256, float64, float128, ripemd160, sha256。這裏與主鍵不一樣的是,檢索table的核心函數改成get_table_rows_by_seckey,該函數與主鍵處理函數大部分邏輯是一致的,只是特殊在鍵的處理上,因爲該函數是處理二級索引的,所以要先經過代碼const auto& secidx = d.get_index<IndexType, chain::by_secondary>();得到二級索引。而後對迭代器數據集進行處理,得到結果集的循環起止位,最後循環導出結果集便可。
此處的scope並非前面理解的查詢條件,而是字面意思,表示一個範圍,上面提到了,在表數據中,範圍就是上限和下限以及條目限制。所以不難猜出get_table_by_scope接口的入參對象結構:
struct get_table_by_scope_params {
name code; // 必須字段,傳入帳戶名
name table = 0; // 可選,做爲過濾器
string lower_bound; // 範圍下限,可選
string upper_bound; // 範圍上限,可選
uint32_t limit = 10; // 範圍數量,限制條目
};
複製代碼
那麼處理結果集就簡單了,實際上就是上面函數get_table_rows_ex的一部分,取出相關結果集返回便可。
接口入參結構:
struct get_currency_balance_params {
name code; // 帳戶名,token合約的owner,通常是eosio.token帳戶
name account; // 帳戶名,檢索條件,查詢該帳戶的餘額
optional<string> symbol; // 檢索條件,須要的token符號,例如SYS(主幣),EOS等。
};
複製代碼
函數的處理邏輯:
vector<asset> read_only::get_currency_balance( const read_only::get_currency_balance_params& p )const {
const abi_def abi = eosio::chain_apis::get_abi( db, p.code ); // get_abi與前面RPC接口實現函數爲同一個。先經過帳戶code獲取eosio.token合約對象abi數據。
auto table_type = get_table_type( abi, "accounts" ); // 在abi中找到accounts表,返回該表的索引類型。
vector<asset> results; // 結果容器
walk_key_value_table(p.code, p.account, N(accounts), [&](const key_value_object& obj){
// 表數據的value值超過了assert數據類型的大小,說明是無效數據。
EOS_ASSERT( obj.value.size() >= sizeof(asset), chain::asset_type_exception, "Invalid data on table");
asset cursor;
// obj.value.data()是原始數據。
fc::datastream<const char *> ds(obj.value.data(), obj.value.size());
fc::raw::unpack(ds, cursor); // 將datastream數據解包到cursor
EOS_ASSERT( cursor.get_symbol().valid(), chain::asset_type_exception, "Invalid asset");
if( !p.symbol || boost::iequals(cursor.symbol_name(), *p.symbol) ) { // 對比token符號,一致的添加至結果集。
results.emplace_back(cursor);
}
return !(p.symbol && boost::iequals(cursor.symbol_name(), *p.symbol));
});
return results;
}
複製代碼
get_table_type函數:
string get_table_type( const abi_def& abi, const name& table_name ) {
for( const auto& t : abi.tables ) { //遍歷abi下的全部table
if( t.name == table_name ){ // 找到符合條件的table
return t.index_type; // 返回該table的索引類型。
}
}
// 若是沒查到,報錯提示當前ABI中並未找到目標table。
EOS_ASSERT( false, chain::contract_table_query_exception, "Table ${table} is not specified in the ABI", ("table",table_name) );
}
複製代碼
傳入eosio.token合約owner帳戶以及token符號便可請求到該token的狀態信息。
fc::variant read_only::get_currency_stats( const read_only::get_currency_stats_params& p )const {
fc::mutable_variant_object results; // 結果容器
const abi_def abi = eosio::chain_apis::get_abi( db, p.code );
auto table_type = get_table_type( abi, "stat" ); // 在abi的表中找到stat表,返回其索引類型。
uint64_t scope = ( eosio::chain::string_to_symbol( 0, boost::algorithm::to_upper_copy(p.symbol).c_str() ) >> 8 );
walk_key_value_table(p.code, scope, N(stat), [&](const key_value_object& obj){
EOS_ASSERT( obj.value.size() >= sizeof(read_only::get_currency_stats_result), chain::asset_type_exception, "Invalid data on table");
fc::datastream<const char *> ds(obj.value.data(), obj.value.size());
read_only::get_currency_stats_result result; // 接口的返回對象
fc::raw::unpack(ds, result.supply);// 已發行量
fc::raw::unpack(ds, result.max_supply);// 最大發行量
fc::raw::unpack(ds, result.issuer); // 發行人
results[result.supply.symbol_name()] = result; // 數組下標爲token符號,內容是token信息。
return true;
});
return results;
}
複製代碼
###14. 獲取生產者信息 get_producers
入參的結構有是否以json格式輸出的布爾類型對象、數據集下限、數據集條目限制,三個都是可選參數。該接口得到的是當前鏈的生產者信息。該接口的返回值是一個顯示全部生產者信息的列表,以及生產者投票總權重信息,最後也有一個more字段用於說明是否有更多未展現的符合條件的數據。
生產者信息是在system合約的producers表中存儲。
複製代碼
具體接口的實現函數較長且與前面獲取其餘狀態庫表數據的邏輯類似,不在這裏重複分析源碼。源碼中複雜的部分在於對各類二級索引的處理。
無請求參數,返回參數的結構有三個字段:
請求參數的結構:
struct get_scheduled_transactions_params {
bool json = false;
string lower_bound; // 傳入時間戳或者交易id。
uint32_t limit = 50;
};
複製代碼
返回值結構:
struct get_scheduled_transactions_result {
fc::variants transactions; // 事務數組
string more;
};
複製代碼
transactions的一個元素的結構爲:
auto row = fc::mutable_variant_object()
("trx_id", itr->trx_id)
("sender", itr->sender)
("sender_id", itr->sender_id)
("payer", itr->payer)
("delay_until", itr->delay_until)
("expiration", itr->expiration)
("published", itr->published)
;
複製代碼
結果集會根據是否按照json格式輸出而作出相應處理,若是不是json格式,要進行事務打包packed,這個以前也分析過。本接口實現函數內容較多,鑑於接口自己使用並不頻繁,這裏不展開研究。
入參結構:
struct abi_json_to_bin_params {
name code; // 合約owner帳戶
name action; // action名字
fc::variant args; // action參數,json明文格式
};
複製代碼
返回值就是二進制串集合。實現函數:
read_only::abi_json_to_bin_result read_only::abi_json_to_bin( const read_only::abi_json_to_bin_params& params )const try {
abi_json_to_bin_result result;
const auto code_account = db.db().find<account_object,by_name>( params.code ); // 找到合約owner帳戶
EOS_ASSERT(code_account != nullptr, contract_query_exception, "Contract can't be found ${contract}", ("contract", params.code));
abi_def abi;
if( abi_serializer::to_abi(code_account->abi, abi) ) {// 反序列化解析abi
abi_serializer abis( abi, abi_serializer_max_time );
auto action_type = abis.get_action_type(params.action); // 得到action類型,在abi的action中尋找目標action
EOS_ASSERT(!action_type.empty(), action_validate_exception, "Unknown action ${action} in contract ${contract}", ("action", params.action)("contract", params.code));
try {
result.binargs = abis.variant_to_binary( action_type, params.args, abi_serializer_max_time, shorten_abi_errors ); //將入參args由json轉爲二進制
} EOS_RETHROW_EXCEPTIONS(chain::invalid_action_args_exception,
"'${args}' is invalid args for action '${action}' code '${code}'. expected '${proto}'",
("args", params.args)("action", params.action)("code", params.code)("proto", action_abi_to_variant(abi, action_type)))
} else {
EOS_ASSERT(false, abi_not_found_exception, "No ABI found for ${contract}", ("contract", params.code));
}
return result;
} FC_RETHROW_EXCEPTIONS( warn, "code: ${code}, action: ${action}, args: ${args}",
("code", params.code)( "action", params.action )( "args", params.args ))
複製代碼
實際上的轉換工做是由variant_to_binary函數執行的。
功能正好與上一個接口相反。入參結構中惟一不一樣的字段是json格式的args改成了二進制類型的binargs,實際上這個二進制是字符的集合vector。返回值是json格式。函數實現與上面相似,再也不展現。實際的轉換工做是由binary_to_variant函數執行的。總結這兩個接口實現函數能夠發現,binary對應的就是二進制數據格式,而variant變體對應的是json格式。
傳入使用密鑰的transaction(json格式),以及當前支持的密鑰集合。
read_only::get_required_keys_result read_only::get_required_keys( const get_required_keys_params& params )const {
transaction pretty_input;
auto resolver = make_resolver(this, abi_serializer_max_time);
try {
abi_serializer::from_variant(params.transaction, pretty_input, resolver, abi_serializer_max_time);//根據明文json事務,經過abi序列化器將數據輸出到pretty_input,轉爲transaction對象。
} EOS_RETHROW_EXCEPTIONS(chain::transaction_type_exception, "Invalid transaction")
// 經過認證管理器得到必須密鑰
auto required_keys_set = db.get_authorization_manager().get_required_keys( pretty_input, params.available_keys, fc::seconds( pretty_input.delay_sec ));
get_required_keys_result result;
result.required_keys = required_keys_set;
return result;
}
複製代碼
因此核心處理函數爲認證管理器authorization_manager的get_required_keys函數:
flat_set<public_key_type> authorization_manager::get_required_keys( const transaction& trx,
const flat_set<public_key_type>& candidate_keys,
fc::microseconds provided_delay
)const
{
auto checker = make_auth_checker( [&](const permission_level& p){ return get_permission(p).auth; },// 獲取權限內容
_control.get_global_properties().configuration.max_authority_depth, // 當前全局屬性的最大權限深度
candidate_keys,
{},
provided_delay,
_noop_checktime
); // 獲取認證檢查器
for (const auto& act : trx.actions ) { // 遍歷事務的action
for (const auto& declared_auth : act.authorization) {
EOS_ASSERT( checker.satisfied(declared_auth), unsatisfied_authorization,
"transaction declares authority '${auth}', but does not have signatures for it.",
("auth", declared_auth) );// 若是在密鑰集合中發現沒有能知足任意action須要的權限的,即報錯提醒。
}
}
return checker.used_keys();
}
複製代碼
入參對象會轉爲transaction結構,返回對象是transaction_id_type,過程就很簡單了,由於自己transaction_id_type就是transaction的成員,所以將入參轉型後直接返回對象的調用便可。
入參爲chain::signed_block類型:
struct signed_block : public signed_block_header {
using signed_block_header::signed_block_header; // 簽名區塊頭
signed_block() = default; // 默認構造器
signed_block( const signed_block_header& h ):signed_block_header(h){} // 構造器,傳入簽名區塊頭
vector<transaction_receipt> transactions; // 包含收到事務的集合
extensions_type block_extensions; // 區塊擴展
};
複製代碼
該接口的返回值push_block_results爲空,沒有返回值。接口的函數實現:
void read_write::push_block(const read_write::push_block_params& params, next_function<read_write::push_block_results> next) {
try {
app().get_method<incoming::methods::block_sync>()(std::make_shared<signed_block>(params));// 命名空間incoming::methods下的成員block_sync
next(read_write::push_block_results{});// 調用next寫入結果,實際上結果爲空。
} catch ( boost::interprocess::bad_alloc& ) {
chain_plugin::handle_db_exhaustion();
} CATCH_AND_CALL(next);
}
複製代碼
查看incoming::methods命名空間下的成員block_sync:
namespace incoming {
namespace methods {
// 推送block到一個獨立的provider
using block_sync = method_decl<chain_plugin_interface, void(const signed_block_ptr&), first_provider_policy>;
}
}
複製代碼
繼續看method_decl的定義:
/**
* @tparam Tag - API鑑定器,用於區分相同方法的不一樣簽名
* @tparam FunctionSig - 方法簽名
* @tparam DispatchPolicy - 分發策略,規定了provider是如何被訪問的
*/
template< typename Tag, typename FunctionSig, template <typename> class DispatchPolicy = first_success_policy>
struct method_decl {
using method_type = method<FunctionSig, DispatchPolicy<FunctionSig>>;
using tag_type = Tag;
};
複製代碼
method_decl中調用了method模板,該特性是由appbase/method提供,它是一個鬆散的連接應用程序層級的函數。調用者Caller能夠抓取一個方法而且調用它,而提供者Providers可以抓取一個方法而後註冊它。method模板消除了應用程序中不一樣插件之間的耦合度,能夠在不一樣插件之間完成鬆散地函數調用。
method模板的使用方式以下圖:
實體A註冊了一個函數到method裏,使用FunctionSig做爲key。實體B傳入FunctionSig在method中尋找method並調用。一樣的,實體C、實體D均可以來調用,實體A並不關心誰來調用,它不會與調用者發生強關係。 回到:push_block,這一行代碼:
app().get_method
複製代碼
block_sync就是key,經過該鍵可以找到對應的method: app().get_method<incoming::methods::block_sync>()
。獲取到method之後,能夠直接調用,傳入參數,經過make_shared將rpc參數轉成signed_block對象的(共享)指針: std::make_shared<signed_block>(params)
。下面去找到key爲block_sync的method的位置,查找其相關的register語句:
my->_incoming_block_sync_provider = app().get_method<incoming::methods::block_sync>().register_provider([this](const signed_block_ptr& block){
my->on_incoming_block(block);
});
複製代碼
在producer_plugin中找到了method的註冊位置,真實調用的函數爲生產插件中的on_incoming_block函數,參數在外部處理傳入符合signed_block指針類型。
on_incoming_block函數
下面來看on_incoming_block函數。首先打印日誌,提醒告知接收到區塊的區塊號。而後區塊時間與本地節點時間對時,超過本地7秒開外的就終止程序,日誌提示。接着,獲取節點當前鏈環境:
chain::controller& chain = app().get_plugin<chain_plugin>().chain();
複製代碼
接下來,判斷本地節點是否已包含該區塊,
signed_block_ptr controller::fetch_block_by_id( block_id_type id )const {// 傳入區塊id
auto state = my->fork_db.get_block(id);// 在本地fork_db庫中查找,是否以前已接收到分叉庫了。
if( state && state->block ) return state->block; // 若是找到了,則返回區塊。
auto bptr = fetch_block_by_number( block_header::num_from_id(id) ); //將id轉爲區塊號,嘗試以區塊號來查找。
if( bptr && bptr->id() == id ) return bptr; // 以區塊號來查找而且找到了,則直接返回區塊。
return signed_block_ptr(); // 返回爲空的signed_block對象。
}
複製代碼
若是判斷存在,則終止程序。不存在能夠繼續處理。處理接收新區塊時,仍舊要丟棄掉pending狀態的區塊。
pending狀態區塊的優先級有時候很低,前面講到在寫入快照時,此處又提到接收新區塊時,都要將pending區塊先丟棄再進行。
複製代碼
總結全部須要先丟棄pending區塊的操做還有:
接着設置異常回調,若是發生異常則執行回調函數,迴歸正常計劃的出塊循環節奏:
auto ensure = fc::make_scoped_exit([this](){
schedule_production_loop(); // 正常計劃的出塊循環節奏。
});
複製代碼
接下來,向鏈推送目標區塊chain.push_block(block);
。異常處理,相關標誌位處理,日誌輸出結果。繼續回到push_block函數。
push_block函數
首先要判斷是不是pending狀態,推送區塊前要保證沒有pending區塊。接着校驗是否爲空區塊,區塊狀態是否爲incomplete。經過校驗後,發射預認可區塊信號,攜帶區塊對象。
emit( self.pre_accepted_block, b ); // 預認可區塊信號
接着,若是節點未特殊配置強制檢查以及區塊狀態爲不可逆irreversible或者檢驗過validated,則將區塊構建爲可信block_state對象加入到fork_db。經歷一系列校驗,執行auto inserted = my->index.insert(n)添加區塊到分叉庫建立的多索引庫fork_multi_index_type中,返回狀態區塊對象。回到push_block,檢查區塊生產者是否在可信生產者列表中,若是在,則將可信的生產者執行輕量級校驗的標誌位置爲true。而後發射認可區塊頭信號,並攜帶區塊狀態數據。
emit( self.accepted_block_header, new_header_state ); // 認可區塊頭信號
接着判斷若是當前數據庫讀取模式不是IRREVERSIBLE不可逆,則須要調用maybe_switch_forks處理分叉合併的事務。最後判斷若是區塊狀態爲irreversible,則發出第三個信號,不可逆區塊信號,並攜帶區塊數據。
emit( self.irreversible_block, new_header_state ); // 不可逆區塊信號
push_block函數執行完畢,共發射了三個信號,對應的是前文提到的controller維護的信號,經過信號槽機制,找到connection,並執行對應函數操做便可,信號槽機制曾屢次分析闡述,此處不展開。 push_block接口是推送本地的區塊處理,並未涉及到區塊鏈網絡節點的廣播。
該接口的函數實現方式以及採用語法特性與push_block類似,本段不重複介紹。該接口的入參類型是一個變體對象variant_object,也就是說它沒有像其餘接口那樣特別聲明參數結構,而是在函數實現中,加入了對象的構造過程,參數對象最終經過abi_serializer::from_variant被構形成packed_transaction打包事務類型。返回值結構是有定義的:
struct push_transaction_results {
chain::transaction_id_type transaction_id; // 事務id
fc::variant processed; // 加工過的事務對象
};
複製代碼
回到函數體,一樣是基於method模板的功能,在producer_plugin中找到transaction_async註冊的函數,傳入了處理好的打包事務對象,是否存留標誌位,用來接收返回值的next函數。實際調用了producer_plugin_impl::on_incoming_transaction_async函數。
on_incoming_transaction_async函數
該函數內容較多。首先,仍及是獲取節點當前鏈環境:
chain::controller& chain = app().get_plugin<chain_plugin>().chain();
接着,判斷當前鏈如果不存在pending塊,則增長到pending塊。接着推送區塊是經過channel模板的機制,這是與method模板想相似的機制。首先來看函數中該機制首次出現的位置:
_transaction_ack_channel.publish(std::pair<fc::exception_ptr, packed_transaction_ptr>(response.get<fc::exception_ptr>(), trx)); // 傳入事務對象trx
_transaction_ack_channel是當前實例的成員,找到當前實例的構造函數,發現該成員的初始化信息:
_transaction_ack_channel(app().get_channel<compat::channels::transaction_ack>())
app().get_channel的結構與上面介紹method的機制很是類似,查看transaction_ack的聲明:
namespace compat {
namespace channels {
using transaction_ack = channel_decl<struct accepted_transaction_tag, std::pair<fc::exception_ptr, packed_transaction_ptr>>;
}
}
複製代碼
該聲明與上面method相關key的聲明在同一個文件中,說明設計者的思路是有將他們歸爲一類的:都屬於解耦調用的橋樑。接着查看channel_decl:
/**
* @tparam Tag - API鑑定器,用於區分相同數據類型
* @tparam Data - channel攜帶的數據類型
* @tparam DispatchPolicy - 當前channel的分發策略。默認是drop_exceptions
*/
template< typename Tag, typename Data, typename DispatchPolicy = drop_exceptions >
struct channel_decl {
using channel_type = channel<Data, DispatchPolicy>;
using tag_type = Tag;
};
複製代碼
與method_decl很是類似了。具體channel機制的分析以下圖所示。
能夠看得出,channel的訂閱與發佈的模式,對應的是method的註冊和調用,主要區別在於主體的角色轉換。
channel的訂閱是要依賴頻道自己的內容發佈的,也就是說頻道是要先存在的,主體A能夠來訂閱,主體C、主體D均可以來訂閱,而與做爲發佈方的主體B無關,主體B不用知道有誰訂閱了。而method的註冊和調用正好是相反的。實際上對於本文研究到的channel和method,主體A都是producer_plugin。本例中,一個區塊被廣播出來,須要全部的訂閱者來執行本地的區塊接收操做,所以須要採用channel機制。
複製代碼
下面搜索transaction_ack頻道的訂閱處:
my->incoming_transaction_ack_subscription = app().get_channel<channels::transaction_ack> ().subscribe(boost::bind(&net_plugin_impl::transaction_ack, my.get(), _1));
延伸到實際的調用函數net_plugin_impl::transaction_ack
void net_plugin_impl::transaction_ack(const std::pair<fc::exception_ptr, packed_transaction_ptr>& results) {
transaction_id_type id = results.second->id();
if (results.first) { // first位置是用來放異常信息的,若是first不爲空則說明有異常。
fc_ilog(logger,"signaled NACK, trx-id = ${id} : ${why}",("id", id)("why", results.first->to_detail_string()));
dispatcher->rejected_transaction(id);// 調用rejected_transaction,從received_transactions接收事務集合中將其刪除。
} else {
fc_ilog(logger,"signaled ACK, trx-id = ${id}",("id", id));
dispatcher->bcast_transaction(*results.second);
}
}
複製代碼
bcast_transaction函數
調用廣播事務函數dispatch_manager::bcast_transaction。
void dispatch_manager::bcast_transaction (const packed_transaction& trx) {
std::set<connection_ptr> skips; // 跳過的數據集合
transaction_id_type id = trx.id();
auto range = received_transactions.equal_range(id);
for (auto org = range.first; org != range.second; ++org) {
skips.insert(org->second); // 在接收事務集合中找到對應id的事務數據遍歷放於skips。
}
received_transactions.erase(range.first, range.second); // 從received_transactions接收事務集合中將其刪除。
for (auto ref = req_trx.begin(); ref != req_trx.end(); ++ref) {
if (*ref == id) { // 本地請求事務集合中,找到目標事務刪除
req_trx.erase(ref);
break;
}
}
if( my_impl->local_txns.get<by_id>().find( id ) != my_impl->local_txns.end( ) ) {
fc_dlog(logger, "found trxid in local_trxs" );
return;// 在本地事務集合中找到目標事務了,終止沒必要重複處理。
}
uint32_t packsiz = 0;
uint32_t bufsiz = 0;
time_point_sec trx_expiration = trx.expiration();
net_message msg(trx);
packsiz = fc::raw::pack_size(msg);
bufsiz = packsiz + sizeof(packsiz);
vector<char> buff(bufsiz);
fc::datastream<char*> ds( buff.data(), bufsiz);
ds.write( reinterpret_cast<char*>(&packsiz), sizeof(packsiz) );
fc::raw::pack( ds, msg );// trx轉爲net_message結構,打包經過數據流ds到緩存buff中。
node_transaction_state nts = {id,
trx_expiration,
trx,
std::move(buff),
0, 0, 0};
my_impl->local_txns.insert(std::move(nts)); // 插入到本地事務集,net_plugin自定義的多索引庫node_transaction_index中。
if( !large_msg_notify || bufsiz <= just_send_it_max) { // max-implicit-request參數決定just_send_it_max,最大請求數量
my_impl->send_all( trx, [id, &skips, trx_expiration](connection_ptr c) -> bool {
if( skips.find(c) != skips.end() || c->syncing ) {// skips中一旦有了當前鏈接,或者connection正在同步中,則退出。
return false;
}
const auto& bs = c->trx_state.find(id); // 鏈接中的事務狀態多索引庫中尋找目標事務,返回事務數據
bool unknown = bs == c->trx_state.end();
if( unknown) {// 沒找到則插入
c->trx_state.insert(transaction_state({id,true,true,0,trx_expiration,time_point() }));
fc_dlog(logger, "sending whole trx to ${n}", ("n",c->peer_name() ) );
} else { // 找到則更新過時時間、狀態庫數據
update_txn_expiry ute(trx_expiration);
c->trx_state.modify(bs, ute);
}
return unknown;
});
}else {// 超過最大請求數量之後,不處理trx,而是pending_notify
notice_message pending_notify;
pending_notify.known_trx.mode = normal;
pending_notify.known_trx.ids.push_back( id );
pending_notify.known_blocks.mode = none;
my_impl->send_all(pending_notify, [id, &skips, trx_expiration](connection_ptr c) -> bool {
if (skips.find(c) != skips.end() || c->syncing) {
return false;
}
const auto& bs = c->trx_state.find(id);
bool unknown = bs == c->trx_state.end();
if( unknown) {
fc_dlog(logger, "sending notice to ${n}", ("n",c->peer_name() ) );
c->trx_state.insert(transaction_state({id,false,true,0,trx_expiration,time_point() }));
} else {
update_txn_expiry ute(trx_expiration);
c->trx_state.modify(bs, ute);
}
return unknown;
});
}
}
複製代碼
這個接口是針對網絡狀況不理想,一次請求但願攜帶更多事務的場景而設計的,實現函數是調用一個遞歸函數push_recurse遍歷傳入的transactions數組,每一個transaction最終仍舊會經過以上push_transaction函數逐一處理。
目前事務數組最多支持1000筆,多了報錯。
複製代碼
chain_plugin是EOS的核心,承載了大部分鏈相關的功能。本文按照rpc訪問的脈絡分析,從chain_api_plugin的rpc接口列表展開介紹,延伸到chain_plugin的接口實現,深刻分析了全部的rpc接口的背後實現邏輯,其中涉及到了FC_REFLECT反射技術,經過method模板關聯到了producer_plugin,經過channel模板技術關聯到了net_plugin。chain_plugin是核心鏈處理插件,本文在該範疇下進行了詳盡地調研,加深了對於fork_db,多索引庫以及各類出現的數據結構的理解。
圓方圓學院聚集大批區塊鏈名師,打造精品的區塊鏈技術課程。 在各大平臺都長期有優質免費公開課,歡迎報名收看。
公開課地址:ke.qq.com/course/3451…