Ceph的Paxos源碼註釋之 Election

Ceph的Paxos實現基本介紹,參見Github的這篇博客
本博客分爲幾個部分,先從簡單的開始。這是第一部分,Paxos 的Leader Election。

Leader Election基本設計

  • 按照rank表示優先級解決衝突問題,爲每個monitor預先分配了一個rank(rank數值越小,優先級越高)
  • 只會接受優先級(rank)比自己高、epoch比上次已接受的epoch大的選舉請求
  • 當選的leader,不一定有最新的數據。所以在phase 1中,會根據已經commit的數據,進行leader和peon之間的同步
  • 用奇數的epoch表示選舉狀態,偶數表示穩定狀態
  • 一旦選舉成功,會形成一個quorum,在該leader當選期間,所有提議,必須得到quorum中全部成員同意

Leader Election主要過程和函數

  • Elector::init()
    初始化處理,從kv讀取以前持久化的信息
  • Elector::shutdown()
    退出處理
  • Elector::start()
    推選自己(自薦)
  • Elector::defer()
    接受別人選舉,延遲推選自己
  • Elector::handle_propose()
    處理別人的自薦消息
  • Elector::handle_ack()
    處理別人的ack
  • Elector::victory()
    宣佈自己當選
  • Elector::handle_victory()
    處理別人當選消息
  • Elector::expire()
    選舉超時處理
  • Elector::reset_timer()
    設置選舉timer
  • Elector::cancel_timer()
    取消timer
  • Elector::dispatch()
    選舉消息的分發函數

代碼

初始化

/*
 * Load the election epoch that was last persisted for this monitor.
 *
 * Original author's note: the Ceph monitor uses leveldb as its durable
 * store, and mon->store wraps the leveldb operations. Many kinds of data
 * share that one store, so the key space is layered;
 * Monitor::MONITOR_NAME can be thought of as the first-level key.
 */
void Elector::init()
{
  // Election epochs grow over time and are written back on every change
  // (see bump_epoch); here the stored value is read from the kv store.
  epoch = mon->store->get(Monitor::MONITOR_NAME, "election_epoch");

  // A zero result is treated as first use: begin at epoch 1.
  if (epoch == 0) {
    epoch = 1;
  }
}

bump_epoch

/*
 * Advance this monitor's election epoch to e, persist the new value in the
 * kv store, and reset the per-round candidate state.
 */
void Elector::bump_epoch(epoch_t e)
{
  dout(10) << "bump_epoch " << epoch << " to " << e << dendl;
  // The epoch may stay put or grow, but must never move backwards.
  assert(epoch <= e);
  epoch = e;

  // Persist the new epoch through a single store transaction.
  MonitorDBStore::TransactionRef txn(new MonitorDBStore::Transaction);
  txn->put(Monitor::MONITOR_NAME, "election_epoch", epoch);
  mon->store->apply_transaction(txn);

  // Original author's note: join_election() moves the monitor into
  // STATE_ELECTING.
  mon->join_election();

  // Clear up some state. Original author's note: a peer had a larger epoch,
  // so stop electing ourselves; acked_me only has meaning while we are a
  // candidate, so it is emptied here together with classic_mons.
  electing_me = false;
  acked_me.clear();
  classic_mons.clear();
}

推選自己爲leader的函數(自薦)

//在啓動後或者leader超時等場合,會發起自薦
void Elector::start()//推選本身
{
  if (!participating) {
    return;
  }

  //清空,表示還沒人響應本身
  acked_me.clear();
  classic_mons.clear();
  //從store獲取持久化的epoch.
  init();

  if (epoch % 2 == 0) //epoch是偶數代表是穩定態
    bump_epoch(epoch+1);  //odd == election cycle 選舉都是用的奇數epoch
  start_stamp = ceph_clock_now(g_ceph_context);
  electing_me = true; //設置爲true,前面的bump_epoch可能設置爲false了
  //填寫map的key是本身的rank,代表本身先贊成了本身
  acked_me[mon->rank] = CEPH_FEATURES_ALL; 
  
  leader_acked = -1;//無效值,代表我沒有ack別人

  //給monmap中每一個成員發送消息。能夠認爲monmap成員是預先配置的,且配置了rank
  for (unsigned i=0; i<mon->monmap->size(); ++i) { 
    if ((int)i == mon->rank) continue;
    //消息中帶有本身的epoch
    Message *m = new MMonElection(MMonElection::OP_PROPOSE, epoch, mon->monmap);
    mon->messenger->send_message(m, mon->monmap->get_inst(i));
  }

  reset_timer();//設置選舉用的timer,參見expire()函數
}

響應別人自薦

//defer就是暫時放棄推選本身
void Elector::defer(int who) 
{
  if (electing_me) {//放棄選本身
    acked_me.clear();
    classic_mons.clear();
    electing_me = false;
  }

  //代表我支持了"who"當leader. leader_acked不須要持久化,由於任何一個monitor在 
  //reboot後都會從新發起election。
  leader_acked = who; 
  ack_stamp = ceph_clock_now(g_ceph_context);
  //返回OP_ACK消息,即同意對方當leader
  MMonElection *m = new MMonElection(MMonElection::OP_ACK, epoch, mon->monmap);
  m->sharing_bl = mon->get_supported_commands_bl();
  mon->messenger->send_message(m, mon->monmap->get_inst(who));
  // set a timer  對方在必定時間內,應該宣佈本身當選纔對
  reset_timer(1.0);  // give the leader some extra time to declare victory
}

timer相關工具函數

設置timer的工具函數

/*
 * (Re)arm the election timer.
 *
 * plus: extra seconds added on top of g_conf->mon_lease.
 * NOTE(review): the C_ElectionExpire callback is not visible here;
 * presumably it ends up calling expire() - confirm.
 */
void Elector::reset_timer(double plus)
{
  // Drop any timer that is still pending before arming a new one.
  cancel_timer();

  const double delay = g_conf->mon_lease + plus;
  expire_event = new C_ElectionExpire(this);
  mon->timer.add_event_after(delay, expire_event);
}

取消timer的工具函數

/*
 * Cancel the pending election timer, if there is one, and forget it.
 */
void Elector::cancel_timer()
{
  // Nothing armed, nothing to cancel.
  if (!expire_event)
    return;

  mon->timer.cancel_event(expire_event);
  expire_event = 0;
}

超時處理

/*
 * Election timeout handling.
 *
 * A self-nomination succeeds here once strictly more than half of the monmap
 * has acked. Note the difference from handle_ack(), which only declares
 * victory early when every monmap member has acked.
 */
void Elector::expire()
{
  // i win: we are a candidate and hold a strict majority of acks.
  if (electing_me &&
      acked_me.size() > static_cast<unsigned>(mon->monmap->size() / 2)) {
    victory();
    return;
  }

  // Otherwise either whoever we deferred to didn't declare victory quickly
  // enough, or our own nomination did not reach a majority.
  if (mon->has_ever_joined) {
    // Original author's note: we have been part of a quorum before, so the
    // monmap contains us and a new election can be started directly.
    start();
  } else {
    // Never joined before: go through bootstrap instead.
    mon->bootstrap();
  }
}

成功當選leader處理

/*
 * Declare this monitor the election winner.
 *
 * Builds the quorum from everyone recorded in acked_me, moves the epoch from
 * the odd (electing) value to the next even (stable) value, sends OP_VICTORY
 * to every other quorum member, and finally hands the result to the monitor
 * via win_election().
 */
void Elector::victory()
{
  leader_acked = -1;
  electing_me = false;

  // features ends up as the bitwise AND of the feature bits of every acker.
  uint64_t features = CEPH_FEATURES_ALL;
  set<int> quorum;
  // When reached via expire(), acked_me may not cover every node recorded in
  // the monmap, but it does hold more than half of them.
  for (map<int, uint64_t>::iterator p = acked_me.begin(); p != acked_me.end();
       ++p) {
    // Everyone who acked me joins the quorum.
    quorum.insert(p->first);
    features &= p->second;
  }

  // decide what command set we're supporting
  bool use_classic_commands = !classic_mons.empty();
  // keep a copy to share with the monitor; we clear classic_mons in bump_epoch
  set<int> copy_classic_mons = classic_mons;

  cancel_timer();

  // Elections run on an odd epoch ...
  assert(epoch % 2 == 1);
  // ... and finishing the election moves the epoch to an even value.
  bump_epoch(epoch+1);

  // decide my supported commands for peons to advertise
  const bufferlist *cmds_bl = NULL;
  const MonCommand *cmds;
  int cmdsize;
  if (use_classic_commands) {
    mon->get_classic_monitor_commands(&cmds, &cmdsize);
    cmds_bl = &mon->get_classic_commands_bl();
  } else {
    mon->get_locally_supported_monitor_commands(&cmds, &cmdsize);
    cmds_bl = &mon->get_supported_commands_bl();
  }

  // Tell every other quorum member that we won.
  for (set<int>::iterator p = quorum.begin();
       p != quorum.end();
       ++p) {
    if (*p == mon->rank) continue;
    MMonElection *m = new MMonElection(MMonElection::OP_VICTORY, epoch, mon->monmap);
    m->quorum = quorum;
    m->quorum_features = features;
    m->sharing_bl = *cmds_bl;
    mon->messenger->send_message(m, mon->monmap->get_inst(*p));
  }

  // Hand the result to the monitor. Original author's note: this call is
  // what starts the paxos propose.
  mon->win_election(epoch, quorum, features, cmds, cmdsize, &copy_classic_mons);
}

處理別人的自薦消息

根據情況決定是否支持對方,或者決定推選自己

/*
 * Handle another monitor's self-nomination (OP_PROPOSE).
 *
 * Depending on features, epochs and ranks this either NAKs the peer, ignores
 * the message, starts an election of our own, or defers to the sender.
 * A numerically smaller rank wins over a larger one.
 */
void Elector::handle_propose(MonOpRequestRef op)
{
  // Record the event on the op, as the other election handlers do.
  op->mark_event("elector:handle_propose");
  MMonElection *m = static_cast<MMonElection*>(op->get_req());
  dout(5) << "handle_propose from " << m->get_source() << dendl;
  int from = m->get_source().num();  // the sender's rank

  assert(m->epoch % 2 == 1); // election
  uint64_t required_features = mon->get_required_features();

  // Non-zero when the peer's connection lacks at least one required feature
  // bit: the peer's features must cover required_features.
  if ((required_features ^ m->get_connection()->get_features()) &
      required_features) {
    nak_old_peer(op);
    return;
  } else if (m->epoch > epoch) {
    // The peer is ahead of us: adopt its epoch, which also drops our own
    // candidacy (see bump_epoch).
    bump_epoch(m->epoch);
  } else if (m->epoch < epoch) {
    // got an "old" propose.
    // Original author's note: the sender may have only just joined and was
    // not in the quorum before, which is why its epoch is small.
    if (epoch % 2 == 0 &&                  // in a non-election cycle
        mon->quorum.count(from) == 0) {    // from someone outside the quorum
      // a mon just started up, call a new election so they can rejoin!
      // Original author's note: we start the election ourselves because the
      // sender's epoch is too old for it to win.
      mon->start_election();
    } else {
      // Treat it as a stale message.
      dout(5) << " ignoring old propose" << dendl;
      return;
    }
  }

  if (mon->rank < from) {
    // i would win over them.
    if (leader_acked >= 0) {        // we already acked someone
      // and they still win, of course - otherwise we could not have acked them
      assert(leader_acked < from);
    } else {
      // wait, i should win! Nobody acked yet and a lower-priority monitor is
      // nominating itself, so nominate ourselves unless already doing so.
      if (!electing_me) {
        mon->start_election();
      }
    }
  } else {
    // they would win over me.
    // Support the sender if we have not supported anyone yet, or the one we
    // supported has lower priority than the sender, or it is the sender.
    if (leader_acked < 0 ||      // haven't acked anyone yet, or
        leader_acked > from ||   // they would win over who you did ack, or
        leader_acked == from) {  // this is the guy we're already deferring to
      defer(from);  // sends OP_ACK to the sender
    } else {
      // ignore them! We stick with the candidate we already acked.
      dout(5) << "no, we already acked " << leader_acked << dendl;
    }
  }
}

收到 ack以後的處理

/*
 * Handle an OP_ACK, i.e. another monitor's answer to our proposal.
 *
 * Records the supporter while we are a candidate and declares victory
 * immediately once every monmap member has acked.
 */
void Elector::handle_ack(MonOpRequestRef op)
{
  op->mark_event("elector:handle_ack");
  MMonElection *m = static_cast<MMonElection*>(op->get_req());
  dout(5) << "handle_ack from " << m->get_source() << dendl;
  int from = m->get_source().num();

  assert(m->epoch % 2 == 1); // election: the epoch must be odd

  // The log line below gives the reason for this case: we restarted and
  // fell behind.
  if (m->epoch > epoch) {
    // The message is written as adjacent literals; a string literal may not
    // contain a raw line break.
    dout(5) << "woah, that's a newer epoch, i must have rebooted."
               "  bumping and re-starting!" << dendl;
    // Original author's note: the new epoch is needed to trigger a valid
    // election; with the old one our proposals would be ignored.
    bump_epoch(m->epoch);
    start();
    return;
  }
  assert(m->epoch == epoch);

  // Non-zero when the peer lacks at least one required feature bit.
  uint64_t required_features = mon->get_required_features();
  if ((required_features ^ m->get_connection()->get_features()) &
      required_features) {
    dout(5) << " ignoring ack from mon" << from
            << " without required features" << dendl;
    return;
  }

  if (electing_me) {
    // thanks - we are a candidate, so record the supporter. acked_me maps
    // the supporter's rank to its connection features.
    acked_me[from] = m->get_connection()->get_features();
    // A supporter that shared no command list is tracked in classic_mons.
    if (!m->sharing_bl.length())
      classic_mons.insert(from);
    dout(5) << " so far i have " << acked_me << dendl;

    // Everyone in the monmap agreed:
    if (acked_me.size() == mon->monmap->size()) {
      // if yes, shortcut to election finish
      victory();
    }
  } else {
    // ignore, i'm deferring already: we nominated ourselves earlier but have
    // since voted for someone else.
    assert(leader_acked >= 0);
  }
}

在別人當選後的處理

/*
 * Handle an OP_VICTORY, i.e. another monitor announcing that it won.
 *
 * Adopts the winner's (even) epoch, tells the monitor it lost, stops the
 * election timer and stores the leader's supported command set.
 */
void Elector::handle_victory(MonOpRequestRef op)
{
  op->mark_event("elector:handle_victory");
  MMonElection *m = static_cast<MMonElection*>(op->get_req());
  int from = m->get_source().num();

  // The winner must outrank us (smaller rank) and must already be on the
  // even, post-election epoch.
  assert(from < mon->rank);
  assert(m->epoch % 2 == 0);

  leader_acked = -1;

  // i should have seen this election if i'm getting the victory.
  // We must have voted for the winner, so the epochs have to line up:
  // victory() already added one on the winner's side, so its epoch is even
  // and exactly one greater than the odd epoch this peon saw.
  if (m->epoch != epoch + 1) {
    // The message is written as adjacent literals; a string literal may not
    // contain a raw line break.
    dout(5) << "woah, that's a funny epoch, i must have rebooted."
               "  bumping and re-starting!" << dendl;
    bump_epoch(m->epoch);
    start();
    return;
  }

  // Move to the even epoch as well.
  bump_epoch(m->epoch);

  // they win
  mon->lose_election(epoch, m->quorum, from, m->quorum_features);

  // cancel my timer: the election timer is of no use any more
  cancel_timer();

  // stash leader's commands
  if (m->sharing_bl.length()) {
    MonCommand *new_cmds;
    int cmdsize;
    bufferlist::iterator bi = m->sharing_bl.begin();
    MonCommand::decode_array(&new_cmds, &cmdsize, bi);
    mon->set_leader_supported_commands(new_cmds, cmdsize);
  } else { // they are a legacy monitor; use known legacy command set
    const MonCommand *new_cmds;
    int cmdsize;
    mon->get_classic_monitor_commands(&new_cmds, &cmdsize);
    mon->set_leader_supported_commands(new_cmds, cmdsize);
  }
}

消息分發函數

消息的dispatch,裏面有些關於monmap的處理。

/*
 * Entry point for election messages: validates the sender, reconciles
 * monmap epochs, then routes the message to the matching handler.
 *
 * Original author's notes: the monmap is the set of all monitors currently
 * configured for the cluster. It is synchronised between monitors during
 * bootstrap (not covered here). Of the monitors in the monmap, only those
 * that take part in the election vote end up in the quorum.
 */
void Elector::dispatch(MonOpRequestRef op)
{
  op->mark_event("elector:dispatch");
  assert(op->is_type_election());

  switch (op->get_req()->get_type()) {

  case MSG_MON_ELECTION:  // the elector only accepts election messages
    {
      if (!participating) {
        return;
      }
      if (op->get_req()->get_source().num() >= mon->monmap->size()) {
        dout(5) << " ignoring bogus election message with bad mon rank "
                << op->get_req()->get_source() << dendl;
        return;
      }

      MMonElection *em = static_cast<MMonElection*>(op->get_req());

      // assume an old message encoding would have matched
      if (em->fsid != mon->monmap->fsid) {
        dout(0) << " ignoring election msg fsid "
                << em->fsid << " != " << mon->monmap->fsid << dendl;
        return;
      }

      // The election works off the monmap, so the sender has to be in ours.
      // Original author's note: monmaps were already synchronised during
      // bootstrap.
      if (!mon->monmap->contains(em->get_source_addr())) {
        dout(1) << "discarding election message: " << em->get_source_addr()
                << " not in my monmap " << *mon->monmap << dendl;
        return;
      }

      // Decode the sender's monmap into a local object so that it is
      // released on every exit path without manual delete calls.
      MonMap peermap;
      peermap.decode(em->monmap_bl);

      // Compare monmap epochs: both sides should be looking at the same
      // monitor configuration.
      if (peermap.epoch > mon->monmap->epoch) {
        dout(0) << em->get_source_inst() << " has newer monmap epoch " << peermap.epoch
                << " > my epoch " << mon->monmap->epoch
                << ", taking it"
                << dendl;
        // Adopt the sender's monmap and write it to the store; in effect
        // the peer's map is trusted.
        mon->monmap->decode(em->monmap_bl);
        MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
        t->put("monmap", mon->monmap->epoch, em->monmap_bl);
        t->put("monmap", "last_committed", mon->monmap->epoch);
        mon->store->apply_transaction(t);
        //mon->monmon()->paxos->stash_latest(mon->monmap->epoch, em->monmap_bl);
        cancel_timer();
        // Bootstrap again. Original author's note: a new election follows
        // the bootstrap.
        mon->bootstrap();
        return;
      }
      if (peermap.epoch < mon->monmap->epoch) {
        // Only logged here. Original author's note: in this case our map is
        // what the peer will be synchronised from.
        dout(0) << em->get_source_inst() << " has older monmap epoch " << peermap.epoch
                << " < my epoch " << mon->monmap->epoch
                << dendl;
      }

      // Proposals are routed before the epoch check below because
      // handle_propose() deals with old epochs itself (it may call a new
      // election for a monitor that is rejoining).
      switch (em->op) {
      case MMonElection::OP_PROPOSE:
        handle_propose(op);
        return;
      }

      // Every other message type is simply dropped when it is stale.
      if (em->epoch < epoch) {
        dout(5) << "old epoch, dropping" << dendl;
        break;
      }

      switch (em->op) {
      case MMonElection::OP_ACK:
        handle_ack(op);
        return;
      case MMonElection::OP_VICTORY:
        handle_victory(op);
        return;
      case MMonElection::OP_NAK:
        handle_nak(op);
        return;
      default:
        assert(0);
      }
    }
    break;

  default:
    assert(0);
  }
}
/*
 * Turn election participation on and call an election, unless this monitor
 * is already participating.
 *
 * Original author's note: this is reached while handling an admin command.
 */
void Elector::start_participating()
{
  // Already participating: nothing to do.
  if (participating)
    return;

  participating = true;
  call_election();
}
相關文章
相關標籤/搜索