Ceph的Paxos實現基本介紹,參見Github的這篇博客
本博客分爲幾個部分,先從簡單的開始。這是第一部分:Paxos 的 Leader Election。
當選的leader,不一定有最新的數據。因此在phase 1中,會根據已經commit的數據,進行leader和peon之間的同步。
所有提議,必須quorum中全部成員贊成。
Elector::defer()
接受別人的選舉,延遲推選自己
Elector::handle_propose()
處理別人的自薦消息
/*Ceph的monitor使用leveldb做爲持久化存儲,下面的mon->store 就是leveldb操做的封裝,因爲不少數據共用同一leveldb,因此對 key的空間作了分級,Monitor::MONITOR_NAME能夠認爲是第一級key*/ void Elector::init() { //選舉的epoch,遞增分配。每次修改都作了持久化,這是從kv db讀取。 epoch = mon->store->get(Monitor::MONITOR_NAME, "election_epoch"); if (!epoch)//首次使用 epoch = 1; }
/*
 * Advance our election epoch to `e` and persist it to the key/value store.
 *
 * `e` must not be smaller than the current epoch (asserted). As a side effect
 * the monitor joins the election and any self-nomination state is dropped.
 */
void Elector::bump_epoch(epoch_t e)
{
  dout(10) << "bump_epoch " << epoch << " to " << e << dendl;
  assert(epoch <= e);
  epoch = e;

  // Persist the new epoch with a single store transaction.
  MonitorDBStore::TransactionRef txn(new MonitorDBStore::Transaction);
  txn->put(Monitor::MONITOR_NAME, "election_epoch", epoch);
  mon->store->apply_transaction(txn);

  // Per the original note, join_election() puts the monitor into the
  // STATE_ELECTING state.
  mon->join_election();

  // Clear up some state: we stop nominating ourselves (original note: someone
  // else's epoch is larger than ours), and the ack bookkeeping, which is only
  // meaningful while nominating ourselves, is emptied.
  electing_me = false;
  acked_me.clear();
  classic_mons.clear();
}
//在啓動後或者leader超時等場合,會發起自薦 void Elector::start()//推選本身 { if (!participating) { return; } //清空,表示還沒人響應本身 acked_me.clear(); classic_mons.clear(); //從store獲取持久化的epoch. init(); if (epoch % 2 == 0) //epoch是偶數代表是穩定態 bump_epoch(epoch+1); //odd == election cycle 選舉都是用的奇數epoch start_stamp = ceph_clock_now(g_ceph_context); electing_me = true; //設置爲true,前面的bump_epoch可能設置爲false了 //填寫map的key是本身的rank,代表本身先贊成了本身 acked_me[mon->rank] = CEPH_FEATURES_ALL; leader_acked = -1;//無效值,代表我沒有ack別人 //給monmap中每一個成員發送消息。能夠認爲monmap成員是預先配置的,且配置了rank for (unsigned i=0; i<mon->monmap->size(); ++i) { if ((int)i == mon->rank) continue; //消息中帶有本身的epoch Message *m = new MMonElection(MMonElection::OP_PROPOSE, epoch, mon->monmap); mon->messenger->send_message(m, mon->monmap->get_inst(i)); } reset_timer();//設置選舉用的timer,參見expire()函數 }
//defer就是暫時放棄推選本身 void Elector::defer(int who) { if (electing_me) {//放棄選本身 acked_me.clear(); classic_mons.clear(); electing_me = false; } //代表我支持了"who"當leader. leader_acked不須要持久化,由於任何一個monitor在 //reboot後都會從新發起election。 leader_acked = who; ack_stamp = ceph_clock_now(g_ceph_context); //返回OP_ACK消息,即同意對方當leader MMonElection *m = new MMonElection(MMonElection::OP_ACK, epoch, mon->monmap); m->sharing_bl = mon->get_supported_commands_bl(); mon->messenger->send_message(m, mon->monmap->get_inst(who)); // set a timer 對方在必定時間內,應該宣佈本身當選纔對 reset_timer(1.0); // give the leader some extra time to declare victory }
設置timer的工具函數
void Elector::reset_timer(double plus) { // set the timer cancel_timer(); expire_event = new C_ElectionExpire(this); mon->timer.add_event_after(g_conf->mon_lease + plus, expire_event); }
取消timer的工具函數
/*
 * Cancel the pending election timer, if one is armed, and forget it.
 */
void Elector::cancel_timer()
{
  if (!expire_event) {
    return;  // nothing armed
  }

  mon->timer.cancel_event(expire_event);
  expire_event = NULL;
}
超時處理
void Elector::expire() { // 若是是自薦,只要超過半數贊成,就認爲成功 if (electing_me && acked_me.size() > (unsigned)(mon->monmap->size() / 2)) { //注意,expire判斷的是 > monmap->size()/2,而handle_ack裏面是等待所有ack。 // i win victory(); } else {//沒有推選本身 // whoever i deferred to didn't declare victory quickly enough. if (mon->has_ever_joined) start();//以前我加入過quorum,直接從新發動選舉。由於monmap中會包含我。 else mon->bootstrap();//不然,走bootstrap } }
void Elector::victory() { leader_acked = -1; electing_me = false; uint64_t features = CEPH_FEATURES_ALL; set<int> quorum; for (map<int, uint64_t>::iterator p = acked_me.begin(); p != acked_me.end(); ++p) {//若是是從expire()調用的victory(),則不是monmap的記錄的全部node, //可是確定是超過半數。 quorum.insert(p->first);//ack過個人,所有進入quorum。 features &= p->second; } // decide what command set we're supporting bool use_classic_commands = !classic_mons.empty(); // keep a copy to share with the monitor; we clear classic_mons in bump_epoch set<int> copy_classic_mons = classic_mons; cancel_timer(); assert(epoch % 2 == 1); // 選舉期間用的奇數epoch bump_epoch(epoch+1); // 選舉完成,epoch變成偶數 // decide my supported commands for peons to advertise const bufferlist *cmds_bl = NULL; const MonCommand *cmds; int cmdsize; if (use_classic_commands) { mon->get_classic_monitor_commands(&cmds, &cmdsize); cmds_bl = &mon->get_classic_commands_bl(); } else { mon->get_locally_supported_monitor_commands(&cmds, &cmdsize); cmds_bl = &mon->get_supported_commands_bl(); } //通知你們本身當選 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p) { if (*p == mon->rank) continue; MMonElection *m = new MMonElection(MMonElection::OP_VICTORY, epoch, mon->monmap); m->quorum = quorum; m->quorum_features = features; m->sharing_bl = *cmds_bl; mon->messenger->send_message(m, mon->monmap->get_inst(*p)); } //調用monitor的函數,它會發起paxos的propose mon->win_election(epoch, quorum, features, cmds, cmdsize, ©_classic_mons); }
根據狀況決定是否支持對方,或者決定該推薦自己
/*
 * Handle another monitor's self-nomination (OP_PROPOSE): decide whether to
 * support the proposer or to nominate ourselves instead.
 *
 * A numerically lower rank wins over a higher one.
 */
void Elector::handle_propose(MonOpRequestRef op)
{
  MMonElection *msg = static_cast<MMonElection*>(op->get_req());
  dout(5) << "handle_propose from " << msg->get_source() << dendl;
  int peer = msg->get_source().num();  // rank of the proposer

  assert(msg->epoch % 2 == 1);  // election

  // The proposer's features must cover everything we require.
  uint64_t required_features = mon->get_required_features();
  if ((required_features ^ msg->get_connection()->get_features()) &
      required_features) {
    nak_old_peer(op);
    return;
  }

  if (msg->epoch > epoch) {
    // The proposer is ahead of us: adopt its epoch (bump_epoch() also drops
    // our own nomination).
    bump_epoch(msg->epoch);
  } else if (msg->epoch < epoch) {
    // got an "old" propose. Original note: the sender may have only just
    // joined and was not in the quorum before, hence its small epoch.
    const bool stable_and_outsider =
      epoch % 2 == 0 &&                 // in a non-election cycle
      mon->quorum.count(peer) == 0;     // from someone outside the quorum
    if (!stable_and_outsider) {
      // Treat it as a stale message.
      dout(5) << " ignoring old propose" << dendl;
      return;
    }
    // a mon just started up, call a new election so they can rejoin!
    // (Original note: its epoch is too old for it to be elected itself.)
    mon->start_election();
  }

  if (mon->rank < peer) {
    // i would win over them.
    if (leader_acked >= 0) {
      // we already acked someone, and they still win, of course; otherwise
      // we could not have acked them.
      assert(leader_acked < peer);
    } else if (!electing_me) {
      // wait, i should win! Nobody acked yet and a lower-priority monitor
      // is nominating itself, so nominate ourselves.
      mon->start_election();
    }
    return;
  }

  // they would win over me
  if (leader_acked < 0 ||        // haven't acked anyone yet, or
      leader_acked >= peer) {    // they beat (or are) whoever we did ack
    defer(peer);                 // sends OP_ACK in support of the proposer
  } else {
    // ignore them! We stick with our earlier choice.
    dout(5) << "no, we already acked " << leader_acked << dendl;
  }
}
//收到別人響應後的處理 void Elector::handle_ack(MonOpRequestRef op) { op->mark_event("elector:handle_ack"); MMonElection *m = static_cast<MMonElection*>(op->get_req()); dout(5) << "handle_ack from " << m->get_source() << dendl; int from = m->get_source().num(); assert(m->epoch % 2 == 1); // election狀態,必須是奇數 //下面dout解釋了出現這個現象的緣由,即我在本身重啓,out了。 if (m->epoch > epoch) { dout(5) << "woah, that's a newer epoch, i must have rebooted. bumping and re-starting!" << dendl; bump_epoch(m->epoch);//必須用新的epoch才能引發有效選舉,不然被忽略了 start(); return; } assert(m->epoch == epoch); uint64_t required_features = mon->get_required_features(); if ((required_features ^ m->get_connection()->get_features()) & required_features) { dout(5) << " ignoring ack from mon" << from << " without required features" << dendl; return; } //若是正在推選本身 if (electing_me) { // thanks //acked_me是個map acked_me[from] = m->get_connection()->get_features(); if (!m->sharing_bl.length()) classic_mons.insert(from); dout(5) << " so far i have " << acked_me << dendl; //全部人都同意我 if (acked_me.size() == mon->monmap->size()) { // if yes, shortcut to election finish victory(); } } else {//之前我曾經推選過本身,可是如今我已經投別人了 // ignore, i'm deferring already. assert(leader_acked >= 0); } }
//收到別人宣告選舉勝利的消息後的處理 void Elector::handle_victory(MonOpRequestRef op) { op->mark_event("elector:handle_victory"); MMonElection *m = static_cast<MMonElection*>(op->get_req()); int from = m->get_source().num(); assert(from < mon->rank); assert(m->epoch % 2 == 0); leader_acked = -1; //以前我必定選舉了它,因此epoch必須match,不然有問題。 // i should have seen this election if i'm getting the victory. if (m->epoch != epoch + 1) { //在victory()中,已經加1,因此是偶數,且比peon看到的大1 dout(5) << "woah, that's a funny epoch, i must have rebooted. bumping and re-starting!" << dendl; bump_epoch(m->epoch); start(); return; } bump_epoch(m->epoch);//我也變成偶數epoch // they win mon->lose_election(epoch, m->quorum, from, m->quorum_features); // cancel my timer cancel_timer();//選舉timer沒用了 // stash leader's commands if (m->sharing_bl.length()) { MonCommand *new_cmds; int cmdsize; bufferlist::iterator bi = m->sharing_bl.begin(); MonCommand::decode_array(&new_cmds, &cmdsize, bi); mon->set_leader_supported_commands(new_cmds, cmdsize); } else { // they are a legacy monitor; use known legacy command set const MonCommand *new_cmds; int cmdsize; mon->get_classic_monitor_commands(&new_cmds, &cmdsize); mon->set_leader_supported_commands(new_cmds, cmdsize); } }
消息的dispatch,裏面有些關於monmap的處理。
/*
 * Dispatch an incoming election message to the matching handler.
 *
 * Original notes on the monmap: it is the set of all monitors currently
 * configured for the cluster. It is synchronised between monitors during
 * bootstrap (not discussed here). Of the monitors in the monmap, only those
 * that take part in the election vote enter the quorum.
 *
 * Before dispatching, messages are dropped when we are not participating,
 * when the sender's rank, fsid or address does not match our monmap, and when
 * the sender carries a newer monmap (which is adopted, followed by a
 * bootstrap).
 */
void Elector::dispatch(MonOpRequestRef op)
{
  op->mark_event("elector:dispatch");
  assert(op->is_type_election());

  switch (op->get_req()->get_type()) {

  case MSG_MON_ELECTION:  // the elector only receives election messages
    {
      if (!participating) {
        return;
      }
      if (op->get_req()->get_source().num() >= mon->monmap->size()) {
	dout(5) << " ignoring bogus election message with bad mon rank "
		<< op->get_req()->get_source() << dendl;
	return;
      }

      MMonElection *em = static_cast<MMonElection*>(op->get_req());

      // assume an old message encoding would have matched
      if (em->fsid != mon->monmap->fsid) {
	dout(0) << " ignoring election msg fsid "
		<< em->fsid << " != " << mon->monmap->fsid << dendl;
	return;
      }

      // The election works off the monmap (original note: everyone has
      // already synchronised it during bootstrap), so unknown senders are
      // discarded.
      if (!mon->monmap->contains(em->get_source_addr())) {
	dout(1) << "discarding election message: " << em->get_source_addr()
		<< " not in my monmap " << *mon->monmap << dendl;
	return;
      }

      // Decode the sender's monmap into an automatic object so that it is
      // released on every exit path, including an exception from decode().
      MonMap peermap;
      peermap.decode(em->monmap_bl);

      // Both sides should be looking at the same monitor configuration, so
      // compare the monmap epochs.
      if (peermap.epoch > mon->monmap->epoch) {
	dout(0) << em->get_source_inst() << " has newer monmap epoch " << peermap.epoch
		<< " > my epoch " << mon->monmap->epoch
		<< ", taking it"
		<< dendl;
	mon->monmap->decode(em->monmap_bl);

	// Adopt the sender's monmap and write it to the store; in effect we
	// trust the peer's map.
	MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
	t->put("monmap", mon->monmap->epoch, em->monmap_bl);
	t->put("monmap", "last_committed", mon->monmap->epoch);
	mon->store->apply_transaction(t);
	//mon->monmon()->paxos->stash_latest(mon->monmap->epoch, em->monmap_bl);

	cancel_timer();
	// Bootstrap again (original note: a new election follows bootstrap).
	mon->bootstrap();
	return;
      }
      if (peermap.epoch < mon->monmap->epoch) {
	// Only logged here. NOTE(review): the original author states that the
	// peer is then synchronised from our map; nothing in this function
	// does that.
	dout(0) << em->get_source_inst() << " has older monmap epoch " << peermap.epoch
		<< " < my epoch " << mon->monmap->epoch
		<< dendl;
      }

      // Proposals are dispatched before the epoch filter below because
      // handle_propose() has its own handling for older epochs.
      if (em->op == MMonElection::OP_PROPOSE) {
	handle_propose(op);
	return;
      }

      // Every other message type from an older election epoch is dropped.
      if (em->epoch < epoch) {
	dout(5) << "old epoch, dropping" << dendl;
	break;
      }

      switch (em->op) {
      case MMonElection::OP_ACK:
	handle_ack(op);
	return;
      case MMonElection::OP_VICTORY:
	handle_victory(op);
	return;
      case MMonElection::OP_NAK:
	handle_nak(op);
	return;
      default:
	assert(0);
      }
    }
    break;

  default:
    assert(0);
  }
}
//admin command處理,觸發選舉 void Elector::start_participating() { if (!participating) { participating = true; call_election(); } }