歡迎關注存儲老少夥的博客。
上篇是Phase 1,即leader當選後確定PN的部分。這篇主要是Phase 2,即正常工作過程中的propose、accept和commit過程。
Ceph的paxos實現,不算很精妙,近期修改也不大活躍。但是對於我們理解paxos協議仍然有幫助。
| Leader | Peons | 說明 |
|---|---|---|
| begin() => | | Leader給quorum中各個成員發送提議,包含PN、version和內容 |
| | <= handle_begin() | Peon處理提議,有可能會拒絕 |
| handle_accept() | | 只有quorum中全部成員都同意,才算成功 |
| commit_start() | | handle_accept在收到全部ack後調用,用事務寫commit記錄,並設置回調函數 |
| commit_finish() => | | 上一步的回調函數,在實際事務完成時執行 |
| | handle_commit() | Peon根據leader的commit消息同步狀態 |
從begin()到commit_finish()稱爲一輪,即一次提議的完成過程。
Ceph的paxos實現,每次只允許一個提議在執行中,即上面提到的一輪完成才能執行下一輪。在這個過程中,會有多次寫盤操作。這個過程實際上比較慢。對於ceph自身來說,osd等的狀態變更不頻繁,無需極高的paxos性能。但是如果用於分佈式數據庫等系統的日誌,這種方式則有不足。
//這是Phase 2的開始,leader正常工做時,全部提議都是從這個函數開始 //bufferlist是內容的打包,paxos層不須要知道具體語義 //leader void Paxos::begin(bufferlist& v) { assert(mon->is_leader()); assert(is_updating() || is_updating_previous()); //STATE_UPDATING_PREVIOUS對應剛當選後,發現有uncommited value,而且是 //下一個版本(last_committed+1) // we must already have a majority for this to work. assert(mon->get_quorum().size() == 1 || num_last > (unsigned)mon->monmap->size()/2); // and no value, yet. assert(new_value.length() == 0); //清空"已accept的成員"列表 accepted.clear(); accepted.insert(mon->rank);//表示 本身先accept new_value = v; if (last_committed == 0) { //paxos從未進行過提交。將"first_committed"的初始化,也打包到一個事務中 // initial base case; set first_committed too t->put(get_name(), "first_committed", 1); decode_append_transaction(t, new_value); bufferlist tx_bl; t->encode(tx_bl); new_value = tx_bl; } // store the proposed value in the store. IF it is accepted, we will then // have to decode it into a transaction and apply it. MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); //下這幾個k/v,在同一個事務中寫下去。 //實際上,咱們只會建議last_committed+1,所以不會有多個pending的版本。 t->put(get_name(), last_committed+1, new_value); //pending_v就是上面的version t->put(get_name(), "pending_v", last_committed + 1); //配套的PN,這些都記爲pending. 
t->put(get_name(), "pending_pn", accepted_pn); logger->inc(l_paxos_begin); logger->inc(l_paxos_begin_keys, t->get_keys()); logger->inc(l_paxos_begin_bytes, t->get_bytes()); utime_t start = ceph_clock_now(NULL); //保存前面put的三個 k/v get_store()->apply_transaction(t); utime_t end = ceph_clock_now(NULL); logger->tinc(l_paxos_begin_latency, end - start); assert(g_conf->paxos_kill_at != 3); //quorum size是1,這是all in one 配置纔有的場景 if (mon->get_quorum().size() == 1) { // we're alone, take it easy commit_start(); return; } //給quorum中各個成員發提議 for (set<int>::const_iterator p = mon->get_quorum().begin(); p != mon->get_quorum().end(); ++p) { if (*p == mon->rank) continue; MMonPaxos *begin = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_BEGIN, ceph_clock_now(g_ceph_context)); //消息包含下面三個值 begin->values[last_committed+1] = new_value; begin->last_committed = last_committed;//這個直接發送過去了。 begin->pn = accepted_pn; mon->messenger->send_message(begin, mon->monmap->get_inst(*p)); } // set timeout event accept_timeout_event = new C_AcceptTimeout(this); mon->timer.add_event_after(g_conf->mon_accept_timeout, accept_timeout_event); }
//Phase 2, Peon收到提議後的處理 void Paxos::handle_begin(MonOpRequestRef op) { op->mark_paxos_event("handle_begin"); MMonPaxos *begin = static_cast<MMonPaxos*>(op->get_req()); //比較PN,肯定是否應該accept。這個是標準paxos協議作法 if (begin->pn < accepted_pn) { /*可能已經有人發起了新的選舉,新的leader作了collect修改了PN。 好比以前的leader網絡故障,而後恢復了,它仍然按照舊狀態在運行*/ dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << dendl; op->mark_paxos_event("have higher pn, ignore"); return; } //實際上保證leader仍是那個leader,即朝代沒改。 assert(begin->pn == accepted_pn); assert(begin->last_committed == last_committed);//由於leader沒有變化, //請求一個一個處理,因此老是維持一致的last_committed,雙方知道下一個版本是啥 assert(g_conf->paxos_kill_at != 4); logger->inc(l_paxos_begin); // set state. state = STATE_UPDATING; //不一樣狀態下能作不一樣的事情。 lease_expire = utime_t(); // cancel lease // yes. version_t v = last_committed+1; //設置version,這時不會有uncommitted吧? 應該是同步完成了。 dout(10) << "accepting value for " << v << " pn " << accepted_pn << dendl; // store the accepted value onto our store. We will have to decode it and // apply its transaction once we receive permission to commit. MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); //如下幾個k/v,在一個事務中寫入 t->put(get_name(), v, begin->values[v]); // note which pn this pending value is for. t->put(get_name(), "pending_v", v);//下面會apply,可是這些都是pending的 t->put(get_name(), "pending_pn", accepted_pn);//不用寫pending_version, //由於有last_committed已經持久化了,這個含義明確。 logger->inc(l_paxos_begin_bytes, t->get_bytes()); utime_t start = ceph_clock_now(NULL); get_store()->apply_transaction(t); utime_t end = ceph_clock_now(NULL); logger->tinc(l_paxos_begin_latency, end - start); assert(g_conf->paxos_kill_at != 5); // reply MMonPaxos *accept = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_ACCEPT, ceph_clock_now(g_ceph_context)); accept->pn = accepted_pn; accept->last_committed = last_committed; begin->get_connection()->send_message(accept); }
//Phase 2, Leader 收到Peon的ack後的處理 // leader void Paxos::handle_accept(MonOpRequestRef op) { op->mark_paxos_event("handle_accept"); MMonPaxos *accept = static_cast<MMonPaxos*>(op->get_req()); dout(10) << "handle_accept " << *accept << dendl; int from = accept->get_source().num(); if (accept->pn != accepted_pn) {//更高的PN,可能別人已經當選leader // we accepted a higher pn, from some other leader dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << dendl; op->mark_paxos_event("have higher pn, ignore"); return; } if (last_committed > 0 && accept->last_committed < last_committed-1) {//舊的響應,好比網絡延遲太大引發。拋棄。 dout(10) << " this is from an old round, ignoring" << dendl; op->mark_paxos_event("old round, ignore"); return; } assert(accept->last_committed == last_committed || // not committed accept->last_committed == last_committed-1); // committed // 只容許差1 assert(is_updating() || is_updating_previous()); assert(accepted.count(from) == 0); //確認同一個peon不重複發送 accepted.insert(from); dout(10) << " now " << accepted << " have accepted" << dendl; assert(g_conf->paxos_kill_at != 6); // only commit (and expose committed state) when we get *all* quorum // members to accept. otherwise, they may still be sharing the now // stale state. // FIXME: we can improve this with an additional lease revocation message // that doesn't block for the persist. if (accepted == mon->get_quorum()) { //全部的屬於quorum的都響應才行,不然會走到timeout分支 // yay, commit! 
op->mark_paxos_event("commit_start"); commit_start(); //這個函數,最終引發調用commit_finish,修改last_committed。 } } void Paxos::accept_timeout() { dout(1) << "accept timeout, calling fresh election" << dendl; accept_timeout_event = 0; assert(mon->is_leader()); assert(is_updating() || is_updating_previous() || is_writing() || is_writing_previous()); logger->inc(l_paxos_accept_timeout); mon->bootstrap();//注意,是直接自舉,即觸發從新覈對qurom,並選leader。Ceph實現的特殊之處。 } struct C_Committed : public Context { Paxos *paxos; C_Committed(Paxos *p) : paxos(p) {} void finish(int r) { assert(r >= 0); //這個類,在構造函數中得到鎖,析構函數中釋放鎖 Mutex::Locker l(paxos->mon->lock); //下面這一段執行,是受mon->lock保護的 paxos->commit_finish(); } }; //commit_start只有leader會調用,因此commit_finish 也就只有leader用了 void Paxos::commit_start() { dout(10) << __func__ << " " << (last_committed+1) << dendl; assert(g_conf->paxos_kill_at != 7); MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); // commit locally t->put(get_name(), "last_committed", last_committed + 1); //這個修改,下面是queue_trans,不是apply // decode the value and apply its transaction to the store. // this value can now be read from last_committed. decode_append_transaction(t, new_value); dout(30) << __func__ << " transaction dump:\n"; JSONFormatter f(true); t->dump(&f); f.flush(*_dout); *_dout << dendl; logger->inc(l_paxos_commit); logger->inc(l_paxos_commit_keys, t->get_keys()); logger->inc(l_paxos_commit_bytes, t->get_bytes()); commit_start_stamp = ceph_clock_now(NULL); //C_Committed在finish執行時,會獲取鎖(在finish函數),可是當前上下文應該持有了鎖, //何時放鎖的? 不然C_Committed:finish() 無法拿到鎖。 get_store()->queue_transaction(t, new C_Committed(this)); // 鉤子函數,txn結束時callback, C_Committed的finish? 
trans //在MonitorDBStore.h的finish()中有註釋,其實是在作了txn apply後,調用callback if (is_updating_previous()) state = STATE_WRITING_PREVIOUS; else if (is_updating()) state = STATE_WRITING;//設置這兩個狀態後,依賴於異步的commit完成回調,去作清除, //本context實際上很快結束,不會修改此狀態。參見paxos::commit_finish() else assert(0); if (mon->get_quorum().size() > 1) { // cancel timeout event mon->timer.cancel_event(accept_timeout_event); accept_timeout_event = 0; } } //Leader的函數 void Paxos::commit_finish() { dout(20) << __func__ << " " << (last_committed+1) << dendl; utime_t end = ceph_clock_now(NULL); logger->tinc(l_paxos_commit_latency, end - commit_start_stamp); assert(g_conf->paxos_kill_at != 8); // cancel lease - it was for the old value. // (this would only happen if message layer lost the 'begin', but // leader still got a majority and committed with out us.) lease_expire = utime_t(); // cancel lease //這裏才修改last_committed last_committed++;//這裏才修改last_committed last_commit_time = ceph_clock_now(NULL); // refresh first_committed; this txn may have trimmed. //說了可能trim log。流程還麼仔細看 first_committed = get_store()->get(get_name(), "first_committed"); _sanity_check_store(); // tell everyone for (set<int>::const_iterator p = mon->get_quorum().begin(); p != mon->get_quorum().end(); ++p) { if (*p == mon->rank) continue; dout(10) << " sending commit to mon." << *p << dendl; MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT, ceph_clock_now(g_ceph_context)); //leader 在commit完成後,通知peon commit->values[last_committed] = new_value; commit->pn = accepted_pn; commit->last_committed = last_committed; mon->messenger->send_message(commit, mon->monmap->get_inst(*p)); //本地的last_committed等信息已經修改了,通知其餘的 } assert(g_conf->paxos_kill_at != 9); // get ready for a new round. 
new_value.clear(); remove_legacy_versions(); // WRITING -> REFRESH // among other things, this lets do_refresh() -> mon->bootstrap() know // it doesn't need to flush the store queue assert(is_writing() || is_writing_previous()); state = STATE_REFRESH; //do_refresh(),是讓服務層刷新知道該propose了。 if (do_refresh()) {//do_refresh的註釋中(.h),在它返回false時,abort,應該是異常狀況吧。 commit_proposal();//這個應該是調用上次註冊的complete函數 if (mon->get_quorum().size() > 1) {//若是隻有我本身,單個monitor,那就無需lease了。 extend_lease(); } //喚醒等待者 finish_contexts(g_ceph_context, waiting_for_commit); assert(g_conf->paxos_kill_at != 10); finish_round();//修改狀態,本輪結束。讓pending的proposal能夠繼續執行。 //修改了last_committed和status。可是這個同步寫完再作下一個的方式,比較慢。 } }
//Peon的函數,leader通知哪些已經commit,這些是能夠信任的 void Paxos::handle_commit(MonOpRequestRef op) { op->mark_paxos_event("handle_commit"); MMonPaxos *commit = static_cast<MMonPaxos*>(op->get_req()); dout(10) << "handle_commit on " << commit->last_committed << dendl; logger->inc(l_paxos_commit); if (!mon->is_peon()) { dout(10) << "not a peon, dropping" << dendl; assert(0); return; } op->mark_paxos_event("store_state"); store_state(commit); if (do_refresh()) {//讓service層刷新狀態 finish_contexts(g_ceph_context, waiting_for_commit);//Peon端,沒有等待被propose的。 } }
/*一個paxos過程結束後,須要讓上層的各個service(monitor)刷新狀態。 由於paxos這層自己不知道語義,只是肯定執行順序而已。一個paxos決議可能 包含了幾個上層service的內容。 */ bool Paxos::do_refresh() { bool need_bootstrap = false; utime_t start = ceph_clock_now(NULL); // make sure we have the latest state loaded up mon->refresh_from_paxos(&need_bootstrap); utime_t end = ceph_clock_now(NULL); logger->inc(l_paxos_refresh); logger->tinc(l_paxos_refresh_latency, end - start); if (need_bootstrap) {//須要bootstrap才返回false,正常都是成功 dout(10) << " doing requested bootstrap" << dendl; mon->bootstrap(); return false; } return true; } //喚醒上層等待當前提議完成的上下文 void Paxos::commit_proposal() { dout(10) << __func__ << dendl; assert(mon->is_leader()); assert(is_refresh()); list<Context*> ls; ls.swap(committing_finishers); //從pending_finishers ==swap==> committing_finishers ==swap==> ls finish_contexts(g_ceph_context, ls); //作callback,paxosservice調用 queue_pending_finisher()註冊的鉤子 }
//已經完成上一輪提議過程,能夠開始下一個 void Paxos::finish_round() { dout(10) << __func__ << dendl; assert(mon->is_leader()); // ok, now go active! state = STATE_ACTIVE;//不是active是不會去propose的。 dout(20) << __func__ << " waiting_for_acting" << dendl; finish_contexts(g_ceph_context, waiting_for_active); dout(20) << __func__ << " waiting_for_readable" << dendl; finish_contexts(g_ceph_context, waiting_for_readable); dout(20) << __func__ << " waiting_for_writeable" << dendl; finish_contexts(g_ceph_context, waiting_for_writeable); dout(10) << __func__ << " done w/ waiters, state " << state << dendl; if (should_trim()) { trim(); } if (is_active() && pending_proposal) { propose_pending(); } }
/* * return a globally unique, monotonically increasing proposal number */ version_t Paxos::get_new_proposal_number(version_t gt) { if (last_pn < gt) last_pn = gt; //每一個monitor有本身的rank,把rank做爲本身產生的PN的低位數,則各自不一樣。 //好比,rank=5的,產生的rank只多是105, 205, 305等,即n*100 +5 // update. make it unique among all monitors. last_pn /= 100; //因爲gt多是別人發過來的,是不一樣的rank,若是直接 //把last_pn +=100,last_pn所帶的rank就是別人的,不是本身應該產生的合法pn。 last_pn++; last_pn *= 100; last_pn += (version_t)mon->rank;//若是以前last_pn = 306,而個人rank是 5, //則獲得了新的last_pn是405 // write MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); t->put(get_name(), "last_pn", last_pn);//持久化到kv dout(30) << __func__ << " transaction dump:\n"; JSONFormatter f(true); t->dump(&f); f.flush(*_dout); *_dout << dendl; logger->inc(l_paxos_new_pn); utime_t start = ceph_clock_now(NULL); get_store()->apply_transaction(t); utime_t end = ceph_clock_now(NULL); logger->tinc(l_paxos_new_pn_latency, end - start); dout(10) << "get_new_proposal_number = " << last_pn << dendl; return last_pn; } void Paxos::cancel_events() { if (collect_timeout_event) { mon->timer.cancel_event(collect_timeout_event); collect_timeout_event = 0; } if (accept_timeout_event) { mon->timer.cancel_event(accept_timeout_event); accept_timeout_event = 0; } if (lease_renew_event) { mon->timer.cancel_event(lease_renew_event); lease_renew_event = 0; } if (lease_ack_timeout_event) { mon->timer.cancel_event(lease_ack_timeout_event); lease_ack_timeout_event = 0; } if (lease_timeout_event) { mon->timer.cancel_event(lease_timeout_event); lease_timeout_event = 0; } } void Paxos::shutdown() { dout(10) << __func__ << " cancel all contexts" << dendl; // discard pending transaction pending_proposal.reset(); finish_contexts(g_ceph_context, waiting_for_writeable, -ECANCELED); finish_contexts(g_ceph_context, waiting_for_commit, -ECANCELED); finish_contexts(g_ceph_context, waiting_for_readable, -ECANCELED); finish_contexts(g_ceph_context, waiting_for_active, -ECANCELED); 
finish_contexts(g_ceph_context, pending_finishers, -ECANCELED); finish_contexts(g_ceph_context, committing_finishers, -ECANCELED); if (logger) g_ceph_context->get_perfcounters_collection()->remove(logger); delete logger; } void Paxos::leader_init() { cancel_events(); new_value.clear(); // discard pending transaction pending_proposal.reset();//當選leader以前的都廢棄掉 finish_contexts(g_ceph_context, pending_finishers, -EAGAIN); finish_contexts(g_ceph_context, committing_finishers, -EAGAIN); logger->inc(l_paxos_start_leader); if (mon->get_quorum().size() == 1) { state = STATE_ACTIVE; return; } state = STATE_RECOVERING; lease_expire = utime_t(); dout(10) << "leader_init -- starting paxos recovery" << dendl; collect(0); } void Paxos::peon_init() { cancel_events(); new_value.clear(); state = STATE_RECOVERING; lease_expire = utime_t(); dout(10) << "peon_init -- i am a peon" << dendl; // start a timer, in case the leader never manages to issue a lease reset_lease_timeout(); // discard pending transaction pending_proposal.reset(); // no chance to write now! 
finish_contexts(g_ceph_context, waiting_for_writeable, -EAGAIN); finish_contexts(g_ceph_context, waiting_for_commit, -EAGAIN); finish_contexts(g_ceph_context, pending_finishers, -EAGAIN); finish_contexts(g_ceph_context, committing_finishers, -EAGAIN); logger->inc(l_paxos_start_peon); } void Paxos::restart() { dout(10) << "restart -- canceling timeouts" << dendl; cancel_events(); new_value.clear(); if (is_writing() || is_writing_previous()) { dout(10) << __func__ << " flushing" << dendl; mon->lock.Unlock(); mon->store->flush(); mon->lock.Lock(); dout(10) << __func__ << " flushed" << dendl; } state = STATE_RECOVERING; // discard pending transaction pending_proposal.reset(); finish_contexts(g_ceph_context, committing_finishers, -EAGAIN); finish_contexts(g_ceph_context, pending_finishers, -EAGAIN); finish_contexts(g_ceph_context, waiting_for_commit, -EAGAIN); finish_contexts(g_ceph_context, waiting_for_active, -EAGAIN); logger->inc(l_paxos_restart); }
/**
 * Route an incoming paxos message to its handler.
 *
 * Messages are dropped while this monitor is neither leader nor peon.
 * A peon must only be receiving from the current leader (asserted).
 * Any message type other than MSG_MON_PAXOS, or an unknown paxos op,
 * trips an assert.
 *
 * @param op request wrapping the incoming message
 */
void Paxos::dispatch(MonOpRequestRef op)
{
  assert(op->is_type_paxos());
  op->mark_paxos_event("dispatch");
  PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());

  // election in progress?
  const bool in_quorum = mon->is_leader() || mon->is_peon();
  if (!in_quorum) {
    dout(5) << "election in progress, dropping " << *m << dendl;
    return;
  }

  // check sanity: a peon accepts messages from the leader only
  assert(mon->is_leader() ||
         (mon->is_peon() && m->get_source().num() == mon->get_leader()));

  if (m->get_type() != MSG_MON_PAXOS) {
    assert(0);
    return;
  }

  MMonPaxos *pm = reinterpret_cast<MMonPaxos*>(m);

  // NOTE: these ops are defined in messages/MMonPaxos.h
  switch (pm->op) {
    // learner
  case MMonPaxos::OP_COLLECT:
    handle_collect(op);
    break;
  case MMonPaxos::OP_LAST:
    handle_last(op);
    break;
  case MMonPaxos::OP_BEGIN:
    handle_begin(op);
    break;
  case MMonPaxos::OP_ACCEPT:
    handle_accept(op);
    break;
  case MMonPaxos::OP_COMMIT:
    handle_commit(op);
    break;
  case MMonPaxos::OP_LEASE:
    handle_lease(op);
    break;
  case MMonPaxos::OP_LEASE_ACK:
    handle_lease_ack(op);
    break;
  default:
    assert(0);
  }
}
// -- WRITE -- bool Paxos::is_writeable() { return mon->is_leader() && is_active() && is_lease_valid(); } void Paxos::propose_pending() { assert(is_active()); assert(pending_proposal); cancel_events(); bufferlist bl; pending_proposal->encode(bl); dout(10) << __func__ << " " << (last_committed + 1) << " " << bl.length() << " bytes" << dendl; dout(30) << __func__ << " transaction dump:\n"; JSONFormatter f(true); pending_proposal->dump(&f); f.flush(*_dout); *_dout << dendl; pending_proposal.reset();//讓pending_proposal再也不ref裏面的Transaction類型對象, //參見http://en.cppreference.com/w/cpp/memory/shared_ptr/reset committing_finishers.swap(pending_finishers); //list::swap()的含義: Exchanges the contents of two lists, // http://www.cplusplus.com/reference/list/list/swap-free/ state = STATE_UPDATING; begin(bl); } void Paxos::queue_pending_finisher(Context *onfinished) { dout(5) << __func__ << " " << onfinished << dendl; assert(onfinished); pending_finishers.push_back(onfinished); } //注意,上層經過這個函數獲取txn,而後再往裏面添加內容。 //按照這麼理解,一次propose,可能有多個操做被打包 MonitorDBStore::TransactionRef Paxos::get_pending_transaction() //pending transaction,不是proposal { assert(mon->is_leader()); if (!pending_proposal) { pending_proposal.reset(new MonitorDBStore::Transaction); assert(pending_finishers.empty()); } return pending_proposal; } bool Paxos::trigger_propose() { if (is_active()) { dout(10) << __func__ << " active, proposing now" << dendl; propose_pending(); return true; } else { dout(10) << __func__ << " not active, will propose later" << dendl; return false; } } bool Paxos::is_consistent() { return (first_committed <= last_committed); }