Welcome to the 存儲老少夥 blog.
The previous post covered leader election. This post covers Phase 1 (Prepare) of Ceph's Paxos protocol, whose purpose is to reach agreement on the PN (proposal number).
Every election produces a new leader and a new epoch; without an election the epoch never changes. While a leader is in office, every message it sends carries that epoch. If a network partition (or a similar event) triggers a new election, the epoch is what reveals that the leader has changed. Note that, as the Paxos papers describe, the protocol can run correctly without a leader at all; it is just potentially less efficient. Without a leader there is also no need for an epoch.
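To make the idea concrete, here is a minimal, hypothetical sketch of such an epoch guard. The type and names (`PaxosMsg`, `local_epoch`) are made up for illustration; this is not Ceph's actual dispatch code, which filters stale messages at the monitor level.

```cpp
// Hypothetical sketch: an epoch carried on every paxos message exposes a
// stale leader. PaxosMsg and local_epoch are illustrative, not Ceph types.
#include <cstdint>
#include <iostream>

struct PaxosMsg {
  uint64_t epoch;   // epoch of the election that made the sender leader
  // ... opcode, pn, payload ...
};

bool accept_message(const PaxosMsg& m, uint64_t local_epoch) {
  if (m.epoch < local_epoch) {
    // The sender was elected in an older epoch; a newer election has
    // happened (e.g. after a partition), so that leader is stale.
    std::cout << "drop message from stale epoch " << m.epoch << "\n";
    return false;
  }
  return true;
}
```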
After being elected, the leader first runs one Phase 1 round to settle on a PN. For the rest of its term, every Phase 2 operation shares that single PN, which skips a large number of Phase 1 rounds; this is how Paxos keeps its network overhead down. As "Paxos Made Simple" puts it: "A newly chosen leader executes phase 1 for infinitely many instances of the consensus algorithm."
A PN is always required: with or without a leader, there must be a PN. A version, on the other hand, can be thought of as a Paxos instance ID, or as Raft's log index.
Compared with Raft: Ceph's replication can also be viewed as appending one log entry after another, but everything is written into the k/v store rather than into a log file. For example, the log entry whose instance ID is X is stored under key X, with the log contents as the value. All other values that need to be persisted are written into the same k/v store.
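A minimal sketch of that layout, with `std::map` standing in for Ceph's MonitorDBStore and a made-up instance ID and payload: the per-instance values and the bookkeeping counters all live in the same k/v namespace.

```cpp
// Sketch only: the "log" and the persisted counters share one k/v store.
// std::map stands in for MonitorDBStore; the key 42 and payload are made up.
#include <cstdint>
#include <map>
#include <string>
#include <vector>

using version_t = uint64_t;

int main() {
  std::map<std::string, std::vector<uint8_t>> kv;   // value keyed by instance ID
  std::map<std::string, version_t> kv_meta;         // persisted counters

  version_t v = 42;                                  // made-up instance ID
  std::vector<uint8_t> log_entry = {0xde, 0xad, 0xbe, 0xef};

  kv[std::to_string(v)] = log_entry;                 // key = X, value = log contents
  kv_meta["last_committed"] = v;                     // bookkeeping in the same store
  kv_meta["first_committed"] = 1;
  return 0;
}
```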
Besides the log, every Paxos member maintains the following persisted variables. It is worth comparing them briefly with the Raft paper.
Name | Meaning | Notes |
---|---|---|
last_pn | The PN generated the last time this node was elected leader | Used by get_new_proposal_number(); after the next election, numbering continues from it |
accepted_pn | The PN this node has accepted, possibly proposed by another leader | A peon uses this value to reject smaller PNs |
first_committed | The earliest committed version this node still holds | Earlier versions (log entries) are no longer kept on this node |
last_committed | The latest committed version recorded on this node | Anything after it is uncommitted; at most one such version may exist |
uncommitted_v | The uncommitted version on this node, if any; it can only be last_committed+1 | Ceph allows at most one uncommitted version |
uncommitted_pn | The PN associated with the uncommitted version | Recorded in the same transaction as uncommitted_v and uncommitted_value |
uncommitted_value | The contents of the uncommitted version | See above |
Note that the three values prefixed with "uncommitted" may not exist at all, for example after a clean shutdown where everything has been committed.
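For readability, here is the same per-member state condensed into a single struct. This is only a sketch: the field names come from the table above, but in Ceph these values are stored as individual keys in the monitor's k/v store rather than as one C++ struct, and bufferlist is replaced by a plain byte vector.

```cpp
// Sketch: the persisted per-member Paxos state from the table above,
// gathered into one struct. In Ceph these live as individual k/v keys.
#include <cstdint>
#include <vector>

using version_t = uint64_t;

struct PaxosPersistentState {
  version_t last_pn = 0;          // PN generated last time we were leader
  version_t accepted_pn = 0;      // highest PN we have accepted so far
  version_t first_committed = 0;  // oldest version still kept locally
  version_t last_committed = 0;   // newest committed version
  // At most one uncommitted proposal; absent after a clean shutdown.
  version_t uncommitted_v = 0;    // if present, always last_committed + 1
  version_t uncommitted_pn = 0;   // PN under which that value was accepted
  std::vector<uint8_t> uncommitted_value;  // stands in for Ceph's bufferlist
};
```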
Phase 1 is the Prepare phase of the Paxos protocol. It consists of three steps, shown in the table below:
Step | Leader | Peons | Notes |
---|---|---|---|
1 | collect() => | | The leader sends the PN (plus some accompanying state) to each peon in the quorum |
2 | | <= handle_collect() | The peon accepts or rejects the PN, and may share already-committed data along the way |
3 | handle_last() | | Phase 1 succeeds only if every peon in the quorum accepts the leader's PN |
```cpp
void Paxos::init()
{
  // The persisted variables are read straight from the k/v store at load time.
  // load paxos variables from stable storage
  // the PN generated the last time we were leader
  last_pn = get_store()->get(get_name(), "last_pn");
  // the PN we last accepted
  accepted_pn = get_store()->get(get_name(), "accepted_pn");
  // the most recent committed version -- effectively the paxos instance ID
  last_committed = get_store()->get(get_name(), "last_committed");
  // the earliest committed version (log entry) still kept here;
  // anything older may already have been truncated.
  // Note this is paxos's own first_committed, not a monitor-wide value:
  // each monitor may hold a different first_committed.
  first_committed = get_store()->get(get_name(), "first_committed");

  assert(is_consistent());
}
```
```cpp
// PHASE 1
// collect() and handle_collect() roughly correspond to Paxos phase 1.
// This runs on the leader right after it wins the election, to settle a new PN.
// One collect() round effectively completes phase 1 for every proposal of this
// term: the same PN is reused for as long as this leader stays in office.
void Paxos::collect(version_t oldpn)
{
  // we're recoverying, it seems!
  state = STATE_RECOVERING;
  assert(mon->is_leader());

  // reset the number of lasts received
  uncommitted_v = 0;          // freshly elected, start from a clean slate
  uncommitted_pn = 0;
  uncommitted_value.clear();
  peer_first_committed.clear();
  peer_last_committed.clear();

  // Ceph allows only one proposal to be pending at a time (as in Raft).
  // If the new leader finds a pending proposal, its instance ID / version
  // can only be last_committed+1.
  if (get_store()->exists(get_name(), last_committed+1)) {
    // pending_v, pending_pn and version last_committed+1 are written in one
    // transaction, so they are checked together.
    version_t v = get_store()->get(get_name(), "pending_v");
    version_t pn = get_store()->get(get_name(), "pending_pn");
    if (v && pn && v == last_committed + 1) {   // the normal branch
      uncommitted_pn = pn;
    } else {
      dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << accepted_pn
	       << " and crossing our fingers" << dendl;
      uncommitted_pn = accepted_pn;
    }
    uncommitted_v = last_committed+1;

    // fetch the value stored under key uncommitted_v
    get_store()->get(get_name(), last_committed+1, uncommitted_value);
    // if uncommitted_v exists, uncommitted_value must exist too
    assert(uncommitted_value.length());

    logger->inc(l_paxos_collect_uncommitted);
  }

  // generate a new, larger PN, and accept it ourselves first
  accepted_pn = get_new_proposal_number(MAX(accepted_pn, oldpn));
  accepted_pn_from = last_committed;
  num_last = 1;   // 1: our own vote is already counted

  // send to every member of the quorum
  for (set<int>::const_iterator p = mon->get_quorum().begin();
       p != mon->get_quorum().end();
       ++p) {
    // skip ourselves: we already voted and updated accepted_pn
    if (*p == mon->rank) continue;

    // Why the epoch matters: if a partition caused a new election that this
    // leader does not know about, the receiver will notice the stale epoch.
    MMonPaxos *collect = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COLLECT,
				       ceph_clock_now(g_ceph_context));
    collect->last_committed = last_committed;   // for the peer to compare against
    collect->first_committed = first_committed;
    collect->pn = accepted_pn;                  // the freshly generated PN rides on this message
    mon->messenger->send_message(collect, mon->monmap->get_inst(*p));
  }

  // set up the timeout handler
  collect_timeout_event = new C_CollectTimeout(this);
  mon->timer.add_event_after(g_conf->mon_accept_timeout, collect_timeout_event);
}
```
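collect() obtains its PN from get_new_proposal_number(). The scheme, roughly, is to round last_pn up to the next multiple of 100 and add the monitor's rank, which keeps PNs monotonically increasing and unique across monitors. The sketch below reproduces only that arithmetic; the real function also persists last_pn to the k/v store, and details may differ between Ceph releases.

```cpp
// Sketch of the PN scheme behind get_new_proposal_number(): round last_pn up
// to the next multiple of 100 and add the monitor's rank. Simplified; the
// real function also writes last_pn back to the k/v store.
#include <algorithm>
#include <cassert>
#include <cstdint>

using version_t = uint64_t;

version_t new_proposal_number_sketch(version_t& last_pn, version_t gt, int rank)
{
  last_pn = std::max(last_pn, gt);          // never go below what we've seen
  last_pn /= 100;
  last_pn++;
  last_pn *= 100;                           // next multiple of 100...
  last_pn += static_cast<version_t>(rank);  // ...made unique by our rank
  return last_pn;
}

int main() {
  version_t last_pn = 0;
  assert(new_proposal_number_sketch(last_pn, 0, 1) == 101);    // rank 1
  assert(new_proposal_number_sketch(last_pn, 350, 1) == 401);  // jump past a peer's 350
  return 0;
}
```

With this scheme, monitors of ranks 0, 1, 2 draw their PNs from disjoint residue classes (100, 200, ... for rank 0; 101, 201, ... for rank 1), so two monitors can never generate the same PN.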
```cpp
// The peon side, roughly Raft's follower.
void Paxos::handle_collect(MonOpRequestRef op)
{
  op->mark_paxos_event("handle_collect");
  MMonPaxos *collect = static_cast<MMonPaxos*>(op->get_req());

  assert(mon->is_peon()); // mon epoch filter should catch strays

  // we're recoverying, it seems!
  state = STATE_RECOVERING;

  // We are so far behind that the gap can no longer be filled from the log;
  // the only option is to bootstrap (a full resync).
  if (collect->first_committed > last_committed+1) {
    dout(5) << __func__
            << " leader's lowest version is too high for our last committed"
            << " (theirs: " << collect->first_committed
            << "; ours: " << last_committed << ") -- bootstrap!" << dendl;
    op->mark_paxos_event("need to bootstrap");
    mon->bootstrap();
    return;
  }

  // reply
  MMonPaxos *last = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LAST,
				  ceph_clock_now(g_ceph_context));
  // return the two locally stored committed markers to the leader
  last->last_committed = last_committed;
  last->first_committed = first_committed;

  version_t previous_pn = accepted_pn;  // the locally recorded accepted_pn before this message

  // The standard Paxos PN comparison: accept iff the incoming PN is greater
  // than any PN we accepted before.
  if (collect->pn > accepted_pn) {
    accepted_pn = collect->pn;
    accepted_pn_from = collect->pn_from;
    dout(10) << "accepting pn " << accepted_pn << " from "
	     << accepted_pn_from << dendl;

    MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
    // must be persisted before we reply
    t->put(get_name(), "accepted_pn", accepted_pn);

    dout(30) << __func__ << " transaction dump:\n";
    JSONFormatter f(true);
    t->dump(&f);
    f.flush(*_dout);
    *_dout << dendl;

    logger->inc(l_paxos_collect);
    logger->inc(l_paxos_collect_keys, t->get_keys());
    logger->inc(l_paxos_collect_bytes, t->get_bytes());
    utime_t start = ceph_clock_now(NULL);

    get_store()->apply_transaction(t);

    utime_t end = ceph_clock_now(NULL);
    logger->tinc(l_paxos_collect_latency, end - start);
  } else {
    // otherwise, don't accept!
    dout(10) << "NOT accepting pn " << collect->pn << " from " << collect->pn_from
	     << ", we already accepted " << accepted_pn
	     << " from " << accepted_pn_from << dendl;
  }
  // If collect->pn (the PN the leader sent) is smaller than our PN,
  // this reply is effectively a rejection.
  last->pn = accepted_pn;
  last->pn_from = accepted_pn_from;

  // share whatever committed values we have
  // Committed data can always be trusted: if the peer's last_committed is
  // lower than ours, share everything we know to be committed. The peer
  // handles this in store_state(), after which its last_committed is
  // updated as well.
  if (collect->last_committed < last_committed)
    share_state(last, collect->first_committed, collect->last_committed);

  // do we have an accepted but uncommitted value?
  //  (it'll be at last_committed+1)
  bufferlist bl;
  if (collect->last_committed <= last_committed &&
      get_store()->exists(get_name(), last_committed+1)) {
    // As noted earlier: if version last_committed+1 exists, it is a pending
    // (uncommitted) proposal and the leader has to be told about it.
    get_store()->get(get_name(), last_committed+1, bl);
    assert(bl.length() > 0);
    dout(10) << " sharing our accepted but uncommitted value for "
	     << last_committed+1 << " (" << bl.length() << " bytes)" << dendl;
    last->values[last_committed+1] = bl;

    version_t v = get_store()->get(get_name(), "pending_v");
    version_t pn = get_store()->get(get_name(), "pending_pn");
    if (v && pn && v == last_committed + 1) {
      // if pending_pn was recorded, return it as uncommitted_pn;
      // otherwise fall back to previous_pn below
      last->uncommitted_pn = pn;
    } else {
      // previously we didn't record which pn a value was accepted
      // under!  use the pn value we just had...  :(
      dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn "
	       << previous_pn << " and crossing our fingers" << dendl;
      last->uncommitted_pn = previous_pn;
    }

    logger->inc(l_paxos_collect_uncommitted);
  }

  // send the reply; it may be a rejection if our PN is larger than the leader's
  collect->get_connection()->send_message(last);
}
```
```cpp
/** The receiving side's handler is store_state(). What gets shared are the
 * values of every version between the two parties' last_committed marks.
 *
 * @note This is Okay. We share our versions between peer_last_committed and
 *	 our last_committed (inclusive), and add their bufferlists to the
 *	 message. It will be the peer's job to apply them to its store, as
 *	 these bufferlists will contain raw transactions.
 *	 This function is called by both the Peon and the Leader. The Peon will
 *	 share the state with the Leader during handle_collect(), sharing any
 *	 values the leader may be missing (i.e., the leader's last_committed is
 *	 lower than the peon's last_committed). The Leader will share the state
 *	 with the Peon during handle_last(), if the peon's last_committed is
 *	 lower than the leader's last_committed.
 */
void Paxos::share_state(MMonPaxos *m, version_t peer_first_committed,
			version_t peer_last_committed)
{
  assert(peer_last_committed < last_committed);

  dout(10) << "share_state peer has fc " << peer_first_committed
	   << " lc " << peer_last_committed << dendl;
  version_t v = peer_last_committed + 1;

  // include incrementals
  uint64_t bytes = 0;
  for ( ; v <= last_committed; v++) {
    // No message is sent here; the values between the two versions are simply
    // packed into the message and travel along with its other contents.
    if (get_store()->exists(get_name(), v)) {
      get_store()->get(get_name(), v, m->values[v]);
      assert(m->values[v].length());
      dout(10) << " sharing " << v << " ("
	       << m->values[v].length() << " bytes)" << dendl;
      bytes += m->values[v].length() + 16;  // paxos_ + 10 digits = 16
    }
  }
  logger->inc(l_paxos_share_state);
  logger->inc(l_paxos_share_state_keys, m->values.size());
  logger->inc(l_paxos_share_state_bytes, bytes);

  m->last_committed = last_committed;
}

/**
 * Store on disk a state that was shared with us
 *
 * Basically, we received a set of versions. Or just one. It doesn't matter.
 * What matters is that we have to stash it in the store. So, we will simply
 * write every single bufferlist into their own versions on our side (i.e.,
 * onto paxos-related keys), and then we will decode those same bufferlists
 * we just wrote and apply the transactions they hold. We will also update
 * our first and last committed values to point to the new values, if need
 * be. All of this is done tightly wrapped in a transaction to ensure we
 * enjoy the atomicity guarantees given by our awesome k/v store.
 */
bool Paxos::store_state(MMonPaxos *m)
{
  MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
  map<version_t,bufferlist>::iterator start = m->values.begin();
  bool changed = false;

  // build map of values to store
  // we want to write the range [last_committed, m->last_committed] only.
  // If the peer is too far ahead of us, we cannot catch up from these values.
  if (start != m->values.end() &&
      start->first > last_committed + 1) {
    // ignore everything if values start in the future.
    dout(10) << "store_state ignoring all values, they start at " << start->first
	     << " > last_committed+1" << dendl;
    start = m->values.end();
  }

  // push forward the start position on the message's values iterator, up until
  // we run out of positions or we find a position matching 'last_committed'.
  while (start != m->values.end() && start->first <= last_committed) {
    // advance to our own last_committed
    ++start;
  }

  // make sure we get the right interval of values to apply by pushing forward
  // the 'end' iterator until it matches the message's 'last_committed'.
  map<version_t,bufferlist>::iterator end = start;
  while (end != m->values.end() && end->first <= m->last_committed) {
    last_committed = end->first;   // updated in memory first
    ++end;
  }

  if (start == end) {
    dout(10) << "store_state nothing to commit" << dendl;
  } else {
    dout(10) << "store_state [" << start->first << ".."
	     << last_committed << "]" << dendl;

    // One transaction records every change: last_committed plus each version.
    t->put(get_name(), "last_committed", last_committed);

    // we should apply the state here -- decode every single bufferlist in the
    // map and append the transactions to 't'.
    map<version_t,bufferlist>::iterator it;
    for (it = start; it != end; ++it) {
      // write the bufferlist as the version's value:
      // push the version and its value into t first
      t->put(get_name(), it->first, it->second);
      // decode the bufferlist and append it to the transaction we will shortly
      // apply.
      decode_append_transaction(t, it->second);
    }

    // discard obsolete uncommitted value?
    if (uncommitted_v && uncommitted_v <= last_committed) {
      dout(10) << " forgetting obsolete uncommitted value " << uncommitted_v
	       << " pn " << uncommitted_pn << dendl;
      uncommitted_v = 0;
      uncommitted_pn = 0;
      uncommitted_value.clear();
    }
  }

  if (!t->empty()) {
    // a non-empty transaction means there is something to write
    logger->inc(l_paxos_store_state);
    logger->inc(l_paxos_store_state_bytes, t->get_bytes());
    logger->inc(l_paxos_store_state_keys, t->get_keys());
    utime_t start = ceph_clock_now(NULL);

    // Commit the transaction: last_committed plus the versions and values.
    // This call actually waits for the transaction to complete.
    get_store()->apply_transaction(t);

    utime_t end = ceph_clock_now(NULL);
    logger->tinc(l_paxos_store_state_latency, end - start);

    // first_committed may have changed while the transaction ran
    // (the log got trimmed), so refresh it
    first_committed = get_store()->get(get_name(), "first_committed");
    _sanity_check_store();
    changed = true;   // something was modified
  }

  remove_legacy_versions();   // erase anything older than first_committed

  return changed;
}
```
```cpp
// What the leader does with each reply.
// In Ceph's election the preset rank acts as the priority, so the elected
// leader does not necessarily hold the newest data. During the collect phase
// the leader therefore also updates its own data, always from data that is
// already committed.
void Paxos::handle_last(MonOpRequestRef op)
{
  op->mark_paxos_event("handle_last");
  MMonPaxos *last = static_cast<MMonPaxos*>(op->get_req());
  bool need_refresh = false;
  int from = last->get_source().num();   // 'from' is the peer's rank

  dout(10) << "handle_last " << *last << dendl;

  if (!mon->is_leader()) {
    dout(10) << "not leader, dropping" << dendl;
    return;
  }

  // note peer's first_ and last_committed, in case we learn a new
  // commit and need to push it to them.
  // record this reply in the maps
  peer_first_committed[from] = last->first_committed;
  peer_last_committed[from] = last->last_committed;

  // Compared with this peer we are so far behind that it no longer keeps the
  // raw transactions for the versions we are missing; the only way out is a
  // full bootstrap.
  if (last->first_committed > last_committed + 1) {
    dout(5) << __func__
            << " mon." << from
	    << " lowest version is too high for our last committed"
            << " (theirs: " << last->first_committed
            << "; ours: " << last_committed << ") -- bootstrap!" << dendl;
    op->mark_paxos_event("need to bootstrap");
    mon->bootstrap();
    return;
  }

  assert(g_conf->paxos_kill_at != 1);

  // This mirrors share_state() inside handle_collect(): the peer may have
  // shared some newer committed data with us (the leader's state was stale).
  need_refresh = store_state(last);

  assert(g_conf->paxos_kill_at != 2);

  // store_state() can change the leader's last_committed and first_committed,
  // after which some peon may in turn need to be brought up to date.
  for (map<int,version_t>::iterator p = peer_last_committed.begin();
       p != peer_last_committed.end();
       ++p) {
    if (p->second + 1 < first_committed && first_committed > 1) {
      // the peer's state is too old to be synced from our log
      dout(5) << __func__
	      << " peon " << p->first
	      << " last_committed (" << p->second
	      << ") is too low for our first_committed (" << first_committed
	      << ") -- bootstrap!" << dendl;
      op->mark_paxos_event("need to bootstrap");
      mon->bootstrap();
      return;
    }
    if (p->second < last_committed) {
      // the peer is behind us but still within sync range:
      // share committed values
      dout(10) << " sending commit to mon." << p->first << dendl;
      MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT,
					ceph_clock_now(g_ceph_context));
      // build a commit message that shares the committed data with the peon
      share_state(commit, peer_first_committed[p->first], p->second);
      mon->messenger->send_message(commit, mon->monmap->get_inst(p->first));
    }
  }

  // The peon has accepted a PN larger than the one the leader generated; per
  // the Paxos protocol, raise the PN and retry!
  if (last->pn > accepted_pn) {
    // no, try again.
    dout(10) << " they had a higher pn than us, picking a new one." << dendl;

    // cancel timeout event
    mon->timer.cancel_event(collect_timeout_event);
    collect_timeout_event = 0;

    // Note: collect again with a new PN -- this is not a new election.
    collect(last->pn);
  } else if (last->pn == accepted_pn) {
    // yes, they accepted our pn.  great.
    num_last++;

    // did this person send back an accepted but uncommitted value?
    if (last->uncommitted_pn) {
      // Anything up to last_committed is known to have reached quorum
      // agreement earlier; an uncommitted value is assumed not to have, so it
      // needs handling. Only a larger uncommitted_pn is taken, in keeping
      // with Paxos's rule of accepting only higher PNs.
      if (last->uncommitted_pn >= uncommitted_pn &&
	  last->last_committed >= last_committed &&
	  last->last_committed + 1 >= uncommitted_v) {
	// The leader may receive uncommitted values from several peons and
	// keeps the largest one.
	uncommitted_v = last->last_committed+1;   // uncommitted_v only ever moves forward
	uncommitted_pn = last->uncommitted_pn;
	uncommitted_value = last->values[uncommitted_v];
	dout(10) << "we learned an uncommitted value for " << uncommitted_v
		 << " pn " << uncommitted_pn
		 << " " << uncommitted_value.length() << " bytes"
		 << dendl;
      } else {
	dout(10) << "ignoring uncommitted value for " << (last->last_committed+1)
		 << " pn " << last->uncommitted_pn
		 << " " << last->values[last->last_committed+1].length() << " bytes"
		 << dendl;
      }
    }

    // is that everyone?
    if (num_last == mon->get_quorum().size()) {
      // every member of the quorum must respond
      // cancel timeout event
      mon->timer.cancel_event(collect_timeout_event);
      collect_timeout_event = 0;
      peer_first_committed.clear();
      peer_last_committed.clear();

      // almost...

      // did we learn an old value?
      if (uncommitted_v == last_committed+1 &&   // only a gap of 1 is allowed
	  uncommitted_value.length()) {          // carried in the message; assigned just above
	dout(10) << "that's everyone. begin on old learned value" << dendl;
	// The collect round is over, but a value was left uncommitted before
	// it. That value may not have reached a majority, so it cannot be
	// committed directly: take the value with the highest old PN and
	// re-run phase 2 under the new PN.
	state = STATE_UPDATING_PREVIOUS;
	begin(uncommitted_value);
      } else {
	// This branch actually leaves some uncertainty when a minority
	// crashes and restarts.
	// active!
	dout(10) << "that's everyone. active!" << dendl;
	extend_lease();

	need_refresh = false;
	if (do_refresh()) {
	  finish_round();
	}
      }
    }
  } else {
    // no, this is an old message, discard
    dout(10) << "old pn, ignoring" << dendl;
  }

  if (need_refresh)
    (void)do_refresh();
}
```
```cpp
// Timeout handler for collect(): go straight to bootstrap, which resyncs
// monitor state and triggers a fresh leader election.
void Paxos::collect_timeout()
{
  dout(1) << "collect timeout, calling fresh election" << dendl;
  collect_timeout_event = 0;
  logger->inc(l_paxos_collect_timeout);
  assert(mon->is_leader());
  mon->bootstrap();
}
```