eos源碼剖析之controller

controller::block_status,區塊狀態枚舉類,包括:

irreversible = 0,該區塊已經被當前節點應用,而且被認爲是不可逆的。
validated = 1,這是由一個有效生產者簽名的完整區塊,而且以前已經被當前節點應用,所以該區塊已被驗證但未成爲不可逆。
complete = 2,這是一個由有效生產者簽名的完整區塊,可是尚未成爲不可逆,也沒有被當前節點應用。
incomplete = 3,這是一個未完成的區塊,未被生產者簽名也沒有被某個節點生產。
實際上塊的狀態就是,1未簽名未應用 2已簽名未應用 3 已簽名和應用,未變成不可逆 4 變成不可逆mongodb

controller的私有成員:

apply_context,應用上下文,處理節點應用區塊的上下文環境,其中包含了迭代器緩存iterator_cache<key_value_object>
transaction_context,事務上下文環境。包括controller,db的session,signed_transaction等。
mutable_db(),返回一個chainbase::database的引用
controller_impl結構體的實例的惟一指針my。controller核心功能都是經過impl實現調用的。從fork_db好比獲取區塊信息
數據庫

controller的信號

controller 包括瞭如下幾個信號:
signal<void(const signed_block_ptr&)> pre_accepted_block; // 預認可區塊(認可其餘節點廣播過來的區塊是正確的)
signal<void(const block_state_ptr&)> accepted_block_header; // 認可區塊頭(對區塊頭作過校驗)
signal<void(const block_state_ptr&)> accepted_block; // 認可區塊
signal<void(const block_state_ptr&)> irreversible_block; // 不可逆區塊
signal<void(const transaction_metadata_ptr&)> accepted_transaction; // 認可事務
signal<void(const transaction_trace_ptr&)> applied_transaction; // 應用事務(認可其餘節點數據要先校驗,經過之後能夠應用在本地節點)
signal<void(const header_confirmation&)> accepted_confirmation; // 認可確認
signal<void(const int&)> bad_alloc; // 內存分配錯誤信號
全部信號的發射時機都是在controller中。
分別介紹每一個信號發射和接收時機緩存

  1. pre_accepted_block
    發射時機:push_block函數,在producer_plugin中 on_incoming_block中會對已籤區塊校驗,包括不能有pending塊,不能push空塊,區塊狀態不能是incomplete。經過校驗後,調用push_block會發射該信號,攜帶該區塊。
    on_incoming_block函數
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    void producer_plugin::on_incoming_block(const signed_block_ptr& block) {
    //判斷是否在fork_db中
    auto existing = chain.fetch_block_by_id( id );
    if( existing ) { return; }
    // 開始建立一個blockstate
    auto bsf = chain.create_block_state_future( block );
    // 清除 pending狀態,將交易放到unappliedtrx map中
    chain.abort_block();
    // 當拋出異常時,重啓loop
    auto ensure = fc::make_scoped_exit([this](){
    schedule_production_loop();
    });
    // push the new block
    bool except = false;
    try {
    chain.push_block( bsf );
    } catch ( const guard_exception& e ) {
    chain_plug->handle_guard_exception(e);
    return;
    } catch( const fc::exception& e ) {
    elog((e.to_detail_string()));
    except = true;
    } catch ( boost::interprocess::bad_alloc& ) {
    chain_plugin::handle_db_exhaustion();
    return;
    }

on_incoming_block作了這樣幾個事,
1 判斷forkdb是否有該新來的塊,
2 其次根據該塊投遞到線程池生成blockstate,
3 清除以前的pending狀態,pending中的交易取出放到unapplied map中.(之後會經過schedule_production_loop調用producer_plugin::start_block處理unapplied map,內部調用controller::start_block從新組織pending)
4 爲防止producer異常退出,設置schedule_production_loop重啓。
5 調用controller的push_blocksession

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void controller::push_block( std::future<block_state_ptr>& block_state_future ) {
controller::block_status s = controller::block_status::complete;
EOS_ASSERT(!pending, block_validate_exception, "it is not valid to push a block when there is a pending block");

auto reset_prod_light_validation = fc::make_scoped_exit([old_value=trusted_producer_light_validation, this]() {
trusted_producer_light_validation = old_value;
});
try {
block_state_ptr new_header_state = block_state_future.get();
auto& b = new_header_state->block;
emit( self.pre_accepted_block, b );

fork_db.add( new_header_state, false );

if (conf.trusted_producers.count(b->producer)) {
trusted_producer_light_validation = true;
};
emit( self.accepted_block_header, new_header_state );

if ( read_mode != db_read_mode::IRREVERSIBLE ) {
maybe_switch_forks( s );
}

} FC_LOG_AND_RETHROW( )
}

 

在push_block中發送了pre_accepted_block信號
插件捕捉處理: chain_plugin鏈接該信號,在plugin_initialize中綁定了信號的處理
可是該channel沒有訂閱者。app

1
2
3
4
5
6
7
8
9
10
11
12
13
// relay signals to channels
my->pre_accepted_block_connection = my->chain->pre_accepted_block.connect([this](const signed_block_ptr& blk) {
auto itr = my->loaded_checkpoints.find( blk->block_num() );
if( itr != my->loaded_checkpoints.end() ) {
auto id = blk->id();
EOS_ASSERT( itr->second == id, checkpoint_exception,
"Checkpoint does not match for block number ${num}: expected: ${expected} actual: ${actual}",
("num", blk->block_num())("expected", itr->second)("actual", id)
);
}

my->pre_accepted_block_channel.publish(priority::medium, blk);
});

 

  1. accepted_block_header

發射時機1: commit_block函數,函數

1
2
3
4
5
6
7
8
9
void commit_block( bool add_to_fork_db ) {
if (add_to_fork_db) {
pending->_pending_block_state->validated = true;
auto new_bsp = fork_db.add(pending->_pending_block_state, true);
emit(self.accepted_block_header, pending->_pending_block_state);
head = fork_db.head();
EOS_ASSERT(new_bsp == head, fork_database_exception, "committed block did not become the new head in fork database");
}
}

 

add_to_fork_db爲true,將_pending_block_state->validated設置爲true,_pending_block_state放入fork_db,而後發送accepted_block_headeroop

發射時機2: push_block函數,獲取區塊的可信狀態,發射完pre_accepted_block之後,添加可信狀態至fork_db,而後發射accepted_block_header信號,攜帶fork_db添加成功後返回的狀態區塊。fetch

插件捕捉處理: chain_plugin鏈接該信號,ui

1
2
3
4
5
6
void chain_plugin::plugin_initialize(const variables_map& options) {
my->accepted_block_header_connection = my->chain->accepted_block_header.connect(
[this]( const block_state_ptr& blk ) {
my->accepted_block_header_channel.publish( priority::medium, blk );
} );
}

 

由信號槽轉播到channel,accepted_block_header_channel發佈該區塊,bnet_plugin訂閱該channelthis

1
2
3
4
5
6
void bnet_plugin::plugin_startup() {
my->_on_accepted_block_header_handle = app().get_channel<channels::accepted_block_header>()
.subscribe( [this]( block_state_ptr s ){
my->on_accepted_block_header(s);
});
}

 

綁定了回調函數,回調函數內部調用bnet_plugin_impl的on_accepted_block_header(s);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void on_accepted_block_header( const block_state_ptr& s ) {
...
const auto& id = s->id;
if( fc::time_point::now() - s->block->timestamp < fc::seconds(6) ) {
auto itr = _block_status.find( id );
//_remote_request_irreversible_only(對端請求可逆)且(以前狀態集合中沒有該塊或沒收到過該塊)
if( !_remote_request_irreversible_only && ( itr == _block_status.end() || !itr->received_from_peer ) ) {
//加入通知集合,通知對方防止對方重複發送區塊頭
_block_header_notices.insert( id );
}
//加入狀態集合
if( itr == _block_status.end() ) {
_block_status.insert( block_status(id, false, false) );
}
}
}

 

  1. accepted_block
    發射時機: commit_block函數,fork_db以及重播的處理結束後,發射認可區塊的信號,攜帶pending狀態區塊數據。
    插件捕捉處理
    1 net_plugin鏈接該信號,綁定處理函數,打印日誌的同時調用dispatch_manager::bcast_block,傳入區塊數據。send_all向全部鏈接發送廣播
    2 chain_plugin鏈接該信號,由信號槽轉播到channel,accepted_block_channel發佈該區塊
    1
    2
    3
    4
    5
    void chain_plugin::plugin_initialize(const variables_map& options) {
    my->accepted_block_connection = my->chain->accepted_block.connect( [this]( const block_state_ptr& blk ) {
    my->accepted_block_channel.publish( priority::high, blk );
    } );
    }

bnet_plugin訂閱該channel

1
2
3
4
5
6
void bnet_plugin::plugin_startup() {
my->_on_accepted_block_handle = app().get_channel<channels::accepted_block>()
.subscribe( [this]( block_state_ptr s ){
my->on_accepted_block(s);
});
}

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void on_accepted_block( const block_state_ptr& s ) {
verify_strand_in_this_thread(_strand, __func__, __LINE__);
const auto& id = s->id;

_local_head_block_id = id;
_local_head_block_num = block_header::num_from_id(id);

if( _local_head_block_num < _last_sent_block_num ) {
_last_sent_block_num = _local_lib;
_last_sent_block_id = _local_lib_id;
}

purge_transaction_cache();

for( const auto& receipt : s->block->transactions ) {
if( receipt.trx.which() == 1 ) {
const auto& pt = receipt.trx.get<packed_transaction>();
const auto& tid = pt.id();
auto itr = _transaction_status.find( tid );
if( itr != _transaction_status.end() )
_transaction_status.erase(itr);
}
}

maybe_send_next_message(); /// attempt to send if we are idle
}

on_accepted_block函數,刪除緩存中的全部事務,遍歷接收到的區塊的事務receipt,得到事務的打包對象,事務id,在多索引表_transaction_status中查找該id,若是找到了則刪除。接下來若是在空閒狀態下,嘗試發送下一條pingpong心跳鏈接信息。
3 mongo_db_plugin鏈接該信號,綁定其mongo_db_plugin_impl::accepted_block函數,傳入區塊內容。

1
2
3
my->accepted_block_connection.emplace( chain.accepted_block.connect( [&]( const chain::block_state_ptr& bs ) {
my->accepted_block( bs );
} ));

 

accepted_block_connection.emplace 插入信號鏈接器,當accepted_block從controller發出後,mongodb一樣捕獲到該信號,調用my->accepted_block( bs );

插件捕捉處理④: producer_plugin鏈接該信號,執行其on_block函數,傳入區塊數據。

1
my->_accepted_block_connection.emplace(chain.accepted_block.connect( [this]( const auto& bsp ){ my->on_block( bsp ); } ));

 

on_block該函數主要是
函數首先作了校驗,包括時間是否大於最後簽名區塊的時間以及大於當前時間,還有區塊號是否大於最後簽名區塊號。校驗經過之後,活躍生產者帳戶集合active_producers開闢新空間,插入計劃出塊生產者。接下來利用set_intersection取本地生產者與集合active_producers的交集(若是結果爲空,說明本地生產者沒有出塊權利不屬於活躍生產者的一份子)。
將結果存入一個迭代器,迭代執行內部函數,若是交集生產者不等於接收區塊的生產者,說明是校驗別人生產的區塊,若是是相等的沒必要作特殊處理。校驗別人生產的區塊,首先要在活躍生產者的key中找到匹配的key(本地生產者帳戶公鑰),不然說明該區塊不是合法生產者簽名拋棄不處理。接下來,獲取本地生產者私鑰,組裝生產確認數據字段,包括區塊id,區塊摘要,生產者,簽名。更新producer插件本地標誌位_last_signed_block_time和_last_signed_block_num。最後發射信號confirmed_block,攜帶以上組裝好的數據。但通過搜索,項目中目前沒有對該信號設置槽connection。在區塊建立以前要爲該區塊的生產者設置水印用來標示該區塊的生產者是誰。

  1. irreversible_block
    發射時機
    on_irreversible函數,更改區塊狀態爲irreversible的函數,操做成功最後發射該信號。
    插件捕捉處理
    1
    2
    3
    my->irreversible_block_connection = my->chain->irreversible_block.connect( [this]( const block_state_ptr& blk ) {
    my->irreversible_block_channel.publish( priority::low, blk );
    } );

chain_plugin鏈接該信號,由信號槽轉播到channel,irreversible_block_channel發佈該區塊。bnet_plugin訂閱該channel,

1
2
3
4
my->_on_irb_handle = app().get_channel<channels::irreversible_block>()
.subscribe( [this]( block_state_ptr s ){
my->on_irreversible_block(s);
});

 

依然線程池遍歷會話,執行on_new_lib函數,當本地庫領先時能夠清除歷史直到知足當前庫,或者直到最後一個被遠端節點所知道的區塊。最後若是空閒,嘗試發送下一條pingpong心跳鏈接信息。
3 mongo_db_plugin鏈接該信號,執行applied_irreversible_block函數,仍舊參照mongo配置項的值決定是否儲存區塊、狀態區塊以及事務數據,而後將區塊數據塞入隊列等待消費。
4 producer_plugin鏈接該信號,綁定執行函數on_irreversible_block,設置producer成員_irreversible_block_time的值爲區塊的時間。

  1. accepted_transaction

發射時機:
1 push_scheduled_transaction函數,推送計劃事務時,將事務體通過一系列轉型以及校驗,接着發射該信號,認可事務。
2 push_transaction函數,新事務到大狀態區塊,要通過身份認證以及決定是否如今執行仍是延期執行,最後要插入到pending區塊的receipt接收事務中去。當檢查事務未被認可時,發射一次該信號。最後所有函數處理完畢,再次發射該信號。
插件捕捉處理
1 chain_plugin鏈接該信號,由信號槽轉播到channel,accepted_transaction_channel發佈該事務。

1
2
3
4
my->accepted_transaction_connection = my->chain->accepted_transaction.connect(
[this]( const transaction_metadata_ptr& meta ) {
my->accepted_transaction_channel.publish( priority::low, meta );
} );

 

bnet_plugin訂閱該channel,線程池遍歷會話,執行函數on_accepted_transaction。

1
2
3
4
my->_on_appled_trx_handle = app().get_channel<channels::accepted_transaction>()
.subscribe( [this]( transaction_metadata_ptr t ){
my->on_accepted_transaction(t);
});

 

在多是多個的投機塊中一個事務被認可,當一個區塊包含該認可事務或者切換分叉時,該事務狀態變爲「receive now」,被添加至數據庫表中,做爲發送給其餘節點的證據。當該事務被髮送給其餘節點時,根據這個狀態能夠保證以後不會重複發送。每一次事務被「accepted」,都會延時5秒鐘。每次一個區塊被應用,全部超過5秒未被應用的但被認可的事務都將被清除。
2 mongo_db_plugin鏈接該信號,執行函數accepted_transaction,校驗加入隊列待消費。

  1. applied_transaction

發射時機
1 push_scheduled_transaction函數,事務過時時間小於pending區塊時間處理後發射該信號。反之大於等於處理後發射該信號。當事務的sender發送者不爲空且沒有主觀失敗的處理後發射該信號。基於生產和校驗的主觀修改,主觀處理後發射該信號,非主觀處理髮射該信號。
2 push_transaction函數,發射兩次該信號,邏輯較多,這段包括以上那個函數的可讀性不好,註釋幾乎沒有。
插件捕捉處理
1 net_plugin鏈接該信號,綁定函數applied_transaction,打印日誌。
2 chain_plugin鏈接該信號,由信號槽轉播到channel,原理基本同上,再也不重複。
3 mongo_db_plugin同上。

  1. accepted_confirmation

發射時機:
1 push_confirmation函數,推送確認信息,在此階段不容許有pending區塊存在,接着fork_db添加確認信息,發射該信號。
插件捕捉處理
2 net_plugin鏈接該信號,綁定函數accepted_confirmation,打印日誌。
插件捕捉處理
1: chain_plugin鏈接該信號,由信號槽轉播到channel,基本同上。

謝謝關注個人公衆號

 

相關文章
相關標籤/搜索