Annotated Ceph Paxos Source Code - Phase 1

Welcome to follow the 存儲老少夥 blog.

The previous post covered leader election. This post covers Phase 1 (Prepare) of Ceph's Paxos protocol, whose goal is to reach agreement on the PN.

1. Key points

1.1 Epoch

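Each election produces a new leader and, with it, a new epoch; without an election the epoch does not change.
Every message a leader sends during its term carries this epoch.
If a network partition or a similar event triggers a new election, the epoch reveals that the leader has changed.
Note that, as the Paxos paper describes, Paxos still works correctly without a leader; it may just be less efficient.
Without a leader, no epoch is needed.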
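The epoch rule above amounts to one comparison on every incoming message. The C++ sketch below is illustrative only: MsgHeader, EpochCheck and check_epoch() are hypothetical names, and reporting a newer epoch as "need to catch up" is an assumption, not Ceph's exact behavior.

```cpp
#include <cassert>
#include <cstdint>

using epoch_t = uint32_t;

// Hypothetical, simplified message header: every message a leader sends
// carries the epoch of the election that made it leader.
struct MsgHeader {
  epoch_t epoch;
};

enum class EpochCheck { Accept, DropStale, NeedElectionCatchUp };

// Minimal epoch filter (assumption, not Ceph's code): a message from an older
// epoch comes from a deposed leader and is dropped; a message from a newer
// epoch means an election happened that this node has not seen yet.
EpochCheck check_epoch(epoch_t my_epoch, const MsgHeader& m) {
  if (m.epoch < my_epoch) return EpochCheck::DropStale;
  if (m.epoch > my_epoch) return EpochCheck::NeedElectionCatchUp;
  return EpochCheck::Accept;
}
```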

1.2 PN (Proposal Number)

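After a leader is elected, it first runs Phase 1 once to establish a PN. For as long as it remains leader,
all Phase 2 operations share that single PN, so a large number of Phase 1 rounds are skipped; this is how
(Multi-)Paxos reduces network overhead. As "Paxos Made Simple" puts it:
"A newly chosen leader executes phase 1 for infinitely many
instances of the consensus algorithm".
A PN is mandatory: with or without a leader, there must always be a PN.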
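Every PN must be unique across monitors and larger than any PN seen before, so a monitor needs a deterministic way to mint one. The sketch below assumes a scheme in which the last two decimal digits hold the monitor's rank: round up to the next multiple of 100, then add the rank. As far as I know Ceph's get_new_proposal_number() works along these lines, but treat the details as an assumption and check your Ceph version's source. PnGenerator is a hypothetical name.

```cpp
#include <cassert>
#include <cstdint>

using version_t = uint64_t;

// Hypothetical PN generator. Assumption: the low two decimal digits encode
// the monitor's rank, so at most 100 monitors are supported.
struct PnGenerator {
  version_t last_pn = 0;  // would be persisted as "last_pn" in the k/v store
  int rank = 0;

  // Return a PN strictly greater than `gt` and than any PN produced before.
  version_t get_new_proposal_number(version_t gt) {
    if (last_pn < gt) last_pn = gt;
    last_pn /= 100;  // drop the rank digits
    last_pn++;       // move to the next round
    last_pn *= 100;
    last_pn += static_cast<version_t>(rank);
    return last_pn;
  }
};
```

For example, a monitor with rank 2 starting from 0 mints 102, and after seeing a peer's PN of 305 it mints 402. Because the low digits encode the rank, two monitors can never produce the same PN.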

1.3 Version

It can be thought of as the Paxos instance ID, or as the log index in Raft.

1.4 Persistence

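Compared with Raft: although Ceph's replication can also be viewed as appending log entries one by one,
all information is written to a k/v store rather than to a log file. For example, the log entry with instance ID X
is stored in the k/v store under key X, with the entry's content as its value.
Every other value that needs to be persisted is written to the k/v store as well.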
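The "key = version, value = log entry" layout can be sketched with an ordinary ordered map. KvStore and its Transaction type are hypothetical stand-ins for a real k/v store (Ceph uses MonitorDBStore); values are std::string instead of bufferlist, and "atomic" here only means that all puts are applied in one call, with no crash safety.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>

using version_t = uint64_t;

// Hypothetical in-memory k/v store. Keys are "prefix/key"; log entries use the
// version number as the key, other state uses a fixed name such as
// "last_committed".
class KvStore {
 public:
  struct Transaction {
    std::vector<std::pair<std::string, std::string>> puts;  // full key -> value
    void put(const std::string& prefix, const std::string& key,
             const std::string& val) {
      puts.emplace_back(prefix + "/" + key, val);
    }
    void put(const std::string& prefix, version_t v, const std::string& val) {
      put(prefix, std::to_string(v), val);
    }
  };

  // Applies every queued put in one call (in-memory only, not crash-safe).
  void apply_transaction(const Transaction& t) {
    for (const auto& kv : t.puts) data_[kv.first] = kv.second;
  }

  std::optional<std::string> get(const std::string& prefix,
                                 const std::string& key) const {
    auto it = data_.find(prefix + "/" + key);
    if (it == data_.end()) return std::nullopt;
    return it->second;
  }
  std::optional<std::string> get(const std::string& prefix, version_t v) const {
    return get(prefix, std::to_string(v));
  }
  bool exists(const std::string& prefix, version_t v) const {
    return get(prefix, v).has_value();
  }

 private:
  std::map<std::string, std::string> data_;
};
```

Writing a log entry and the matching "last_committed" marker in the same Transaction is what lets the two never disagree after a write.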

1.5 Other data structures that need to be persisted

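Besides the log, every Paxos member maintains the following variables, all of which must be persisted.
Readers may want to compare them briefly with the Raft paper.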

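| Name | Meaning | Notes |
|------|---------|-------|
| last_pn | The PN generated the last time this node was elected leader | Used by get_new_proposal_number(); the next time this node is elected, it continues generating from this value |
| accepted_pn | The PN this node has accepted; possibly a PN proposed by another leader | A peon uses this value to reject smaller PNs |
| first_committed | The first committed version recorded on this node | Earlier versions (log entries) are no longer kept on this node |
| last_committed | The last committed version recorded on this node | Later versions are uncommitted; there may be one |
| uncommitted_v | The uncommitted version recorded on this node; if present, it can only equal last_committed+1 | Ceph allows only one uncommitted version |
| uncommitted_pn | The PN of the uncommitted version | Recorded in the same transaction as uncommitted_v and uncommitted_value |
| uncommitted_value | The content of the uncommitted version | See above |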

Note that the three values whose names start with "uncommitted" may not exist at all, for example after a clean shutdown in which everything was committed.
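The constraints in the table can be written down as an invariant check: a contiguous committed range, at most one uncommitted version located exactly at last_committed+1, and the three uncommitted values being all-or-nothing. This is a sketch under those assumptions; PaxosPersistedState and invariants_hold() are hypothetical and are not Ceph's is_consistent().

```cpp
#include <cassert>
#include <cstdint>
#include <string>

using version_t = uint64_t;

// Hypothetical mirror of the per-node persisted Paxos variables in the table.
struct PaxosPersistedState {
  version_t last_pn = 0;
  version_t accepted_pn = 0;
  version_t first_committed = 0;
  version_t last_committed = 0;
  version_t uncommitted_v = 0;    // 0 means "no uncommitted version"
  version_t uncommitted_pn = 0;
  std::string uncommitted_value;  // stands in for a bufferlist
};

// Assumed constraints, derived from the table above:
//  - committed versions form the range [first_committed, last_committed];
//  - at most one uncommitted version, and only at last_committed + 1;
//  - the uncommitted trio is either entirely absent or entirely present.
bool invariants_hold(const PaxosPersistedState& s) {
  bool empty_log = s.first_committed == 0 && s.last_committed == 0;
  if (!empty_log && s.first_committed > s.last_committed) return false;
  if (s.uncommitted_v == 0)
    return s.uncommitted_pn == 0 && s.uncommitted_value.empty();
  return s.uncommitted_v == s.last_committed + 1 && s.uncommitted_pn != 0 &&
         !s.uncommitted_value.empty();
}
```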

1.6 Overview of the Phase 1 exchange

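Phase 1 is the Prepare phase of the Paxos protocol and consists of three steps:

| Step | Leader | Peons | Notes |
|------|--------|-------|-------|
| 1 | collect() => | | The leader sends the PN, plus other accompanying information, to each peon in the quorum |
| 2 | | <= handle_collect() | Each peon accepts or rejects the PN, and may share committed data along the way |
| 3 | handle_last() | | Succeeds only if every peon in the quorum accepts the leader's PN |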
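The three steps can be simulated in memory to show the accept/reject logic. This sketch deliberately leaves out messaging, persistence, committed-data sharing and timeouts; Peon and run_phase1() are hypothetical names. A reply carrying a PN different from the leader's is treated as a rejection, mirroring how handle_last() compares last->pn with accepted_pn in the code below.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

using version_t = uint64_t;

// Hypothetical, heavily simplified peon.
struct Peon {
  version_t accepted_pn = 0;
  // Step 2: accept a strictly larger PN, then reply with whatever PN we hold
  // now (a reply larger than the leader's PN means "rejected").
  version_t handle_collect(version_t pn) {
    if (pn > accepted_pn) accepted_pn = pn;  // a real peon persists this first
    return accepted_pn;
  }
};

// Steps 1 and 3: returns true only if every peon replied with the leader's
// PN; otherwise *higher holds the largest PN seen.
bool run_phase1(version_t pn, std::vector<Peon>& quorum_peons, version_t* higher) {
  std::size_t num_last = 1;  // the leader counts itself
  *higher = pn;
  for (Peon& p : quorum_peons) {
    version_t reply = p.handle_collect(pn);
    if (reply == pn)
      ++num_last;
    else if (reply > *higher)
      *higher = reply;
  }
  return num_last == quorum_peons.size() + 1;
}
```

When run_phase1() returns false, a leader would retry with a PN above *higher, which corresponds to handle_last() calling collect(last->pn).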

2. Code

2.1 Initialization

void Paxos::init()
{ // The persisted variables are read from the k/v store at load time.
  // load paxos variables from stable storage
  // the PN we generated last time
  last_pn = get_store()->get(get_name(), "last_pn");
  // the PN we accepted last time
  accepted_pn = get_store()->get(get_name(), "accepted_pn");
  // the most recent committed version; effectively the Paxos instance ID
  last_committed = get_store()->get(get_name(), "last_committed");
  // the earliest committed version (log entry) still kept; older entries
  // may already have been trimmed
  first_committed = get_store()->get(get_name(), "first_committed");

  // Note: first_committed is not a cluster-wide value; each monitor keeps
  // its own, and the values can differ from monitor to monitor.
  assert(is_consistent());
}

2.2 collect(), initiated by the leader

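// PHASE 1: collect() and handle_collect() roughly correspond to Paxos Phase 1.
// The leader runs this right after being elected, to establish a new PN.
// A single collect round effectively completes Phase 1 for every proposal
// made during its term; the leader keeps using this PN throughout.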
void Paxos::collect(version_t oldpn)
{
 // we're recoverying, it seems!
  state = STATE_RECOVERING;
  assert(mon->is_leader());

  // reset the number of lasts received
  uncommitted_v = 0; // newly elected: reset
  uncommitted_pn = 0;
  uncommitted_value.clear();
  peer_first_committed.clear();
  peer_last_committed.clear();
  
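  // In Ceph's implementation only one proposal may be pending at a time
  // (Raft, by contrast, lets a leader keep many uncommitted entries in flight).
  // If a newly elected leader finds a pending proposal, its instance ID/version
  // can only be last_committed+1.
  if (get_store()->exists(get_name(), last_committed+1)) {
    /* pending_v, pending_pn and the value at last_committed+1 are written
       in a single transaction, so they are checked together. */
    version_t v = get_store()->get(get_name(), "pending_v");
    version_t pn = get_store()->get(get_name(), "pending_pn");
    if (v && pn && v == last_committed + 1) { // the normal case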
      uncommitted_pn = pn;
    } else {
      dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << accepted_pn  << " and crossing our fingers" << dendl;
      uncommitted_pn = accepted_pn;
    }
    uncommitted_v = last_committed+1;
    // load the value stored under the key uncommitted_v
    get_store()->get(get_name(), last_committed+1, uncommitted_value);
    // if uncommitted_v exists, uncommitted_value must exist as well
    assert(uncommitted_value.length()); 
    logger->inc(l_paxos_collect_uncommitted);
  }

  // generate a new, larger PN and accept it ourselves first
  accepted_pn = get_new_proposal_number(MAX(accepted_pn, oldpn));
  accepted_pn_from = last_committed;
  num_last = 1; // 1: we have already voted for ourselves

  // send to every member of the quorum
  for (set<int>::const_iterator p = mon->get_quorum().begin();
       p != mon->get_quorum().end();
       ++p) {
    // skip ourselves: we already voted and updated accepted_pn
    if (*p == mon->rank) continue;

    // Why the epoch: if a network partition lets someone else start an
    // election without this leader noticing, the receiver detects the
    // epoch mismatch.
    MMonPaxos *collect = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COLLECT,
                                       ceph_clock_now(g_ceph_context));

    collect->last_committed = last_committed; // compared against the peer's value
    collect->first_committed = first_committed;
    // the PN carried by this message is the one just generated
    collect->pn = accepted_pn;
    mon->messenger->send_message(collect, mon->monmap->get_inst(*p));
  }

  // arm the timeout handler
  collect_timeout_event = new C_CollectTimeout(this);
  mon->timer.add_event_after(g_conf->mon_accept_timeout, collect_timeout_event);
}
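The recovery check at the top of collect() can be isolated into a pure function. In this sketch LocalStore, Uncommitted and find_local_uncommitted() are hypothetical; a missing pending_v or pending_pn marker is represented by 0, matching the `v && pn` test in the code above.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <optional>
#include <string>

using version_t = uint64_t;

// Hypothetical view of the leader's local store: log entries by version plus
// the "pending_v" / "pending_pn" markers (0 when absent).
struct LocalStore {
  std::map<version_t, std::string> log;
  version_t pending_v = 0;
  version_t pending_pn = 0;
};

struct Uncommitted {
  version_t v;
  version_t pn;
  std::string value;
};

// The only possible pending proposal is at last_committed + 1. Prefer the
// recorded pending_pn; fall back to accepted_pn when the markers are missing
// or do not match.
std::optional<Uncommitted> find_local_uncommitted(const LocalStore& s,
                                                  version_t last_committed,
                                                  version_t accepted_pn) {
  auto it = s.log.find(last_committed + 1);
  if (it == s.log.end()) return std::nullopt;
  bool markers_ok =
      s.pending_v && s.pending_pn && s.pending_v == last_committed + 1;
  version_t pn = markers_ok ? s.pending_pn : accepted_pn;
  return Uncommitted{last_committed + 1, pn, it->second};
}
```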

2.3 The peon handles the collect request

// A peon corresponds roughly to a Raft follower.
void Paxos::handle_collect(MonOpRequestRef op)
{
  op->mark_paxos_event("handle_collect");

  MMonPaxos *collect = static_cast<MMonPaxos*>(op->get_req());

  assert(mon->is_peon()); // mon epoch filter should catch strays

  // we're recoverying, it seems!
  state = STATE_RECOVERING;

  // We are too far behind: the gap can no longer be filled from the log, so we must bootstrap.
  if (collect->first_committed > last_committed+1) {
    dout(5) << __func__
            << " leader's lowest version is too high for our last committed"
            << " (theirs: " << collect->first_committed
            << "; ours: " << last_committed << ") -- bootstrap!" << dendl;
    op->mark_paxos_event("need to bootstrap");
    mon->bootstrap();
    return;
  }

  // reply
  MMonPaxos *last = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LAST,
                  ceph_clock_now(g_ceph_context));
  // return both locally stored committed versions to the leader
  last->last_committed = last_committed;
  last->first_committed = first_committed;
  
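  version_t previous_pn = accepted_pn; // the accepted_pn recorded locally before this request

  // Standard Paxos PN comparison: if the received PN is greater than any PN
  // we have accepted before, accept it.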
  if (collect->pn > accepted_pn) {
    accepted_pn = collect->pn;
    accepted_pn_from = collect->pn_from;
    dout(10) << "accepting pn " << accepted_pn << " from " 
         << accepted_pn_from << dendl;
  
    MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
    // persist first, then reply
    t->put(get_name(), "accepted_pn", accepted_pn);
    dout(30) << __func__ << " transaction dump:\n";
    JSONFormatter f(true);
    t->dump(&f);
    f.flush(*_dout);
    *_dout << dendl;

    logger->inc(l_paxos_collect);
    logger->inc(l_paxos_collect_keys, t->get_keys());
    logger->inc(l_paxos_collect_bytes, t->get_bytes());
    utime_t start = ceph_clock_now(NULL);
    get_store()->apply_transaction(t);

    utime_t end = ceph_clock_now(NULL);
    logger->tinc(l_paxos_collect_latency, end - start);
  } else { // otherwise, do not accept
    // don't accept!
    dout(10) << "NOT accepting pn " << collect->pn << " from " << collect->pn_from
         << ", we already accepted " << accepted_pn
         << " from " << accepted_pn_from << dendl;
  }
  // If collect->pn (the PN sent by the leader) is smaller than ours, this reply is a rejection.
  last->pn = accepted_pn;
  last->pn_from = accepted_pn_from;

  // share whatever committed values we have
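  /* Committed data can always be trusted. If the leader's last_committed is
     smaller than ours, we share everything we know to be committed so that it
     can catch up. The receiver handles this in store_state(); afterwards its
     last_committed is updated too. */
  if (collect->last_committed < last_committed)
    share_state(last, collect->first_committed, collect->last_committed);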

  // do we have an accepted but uncommitted value?
  //  (it'll be at last_committed+1)
  bufferlist bl;
  if (collect->last_committed <= last_committed &&
      get_store()->exists(get_name(), last_committed+1)) {
    // As noted earlier, if version last_committed+1 exists it is a pending
    // proposal, and the leader must be told about it.
    get_store()->get(get_name(), last_committed+1, bl);
    assert(bl.length() > 0);
    dout(10) << " sharing our accepted but uncommitted value for " 
         << last_committed+1 << " (" << bl.length() << " bytes)" << dendl;
    last->values[last_committed+1] = bl;

    version_t v = get_store()->get(get_name(), "pending_v");
    version_t pn = get_store()->get(get_name(), "pending_pn");
    
    if (v && pn && v == last_committed + 1) {
      /* If pending_pn exists, the returned uncommitted_pn is pending_pn;
         otherwise previous_pn is used instead (below). */
      last->uncommitted_pn = pn; 
    } else {
      // previously we didn't record which pn a value was accepted
      // under!  use the pn value we just had...  :(
      dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << previous_pn
           << " and crossing our fingers" << dendl;
      last->uncommitted_pn = previous_pn;
    }

    logger->inc(l_paxos_collect_uncommitted);
  }

  // the reply may be a rejection if our PN is larger than the leader's
  collect->get_connection()->send_message(last);
}
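Stripped of I/O, handle_collect() makes three decisions: whether to bootstrap, whether to accept the PN, and whether to attach committed values. The sketch below captures only those decisions; CollectDecision and decide_collect() are hypothetical, and the persistence step that must precede the reply appears only as a comment.

```cpp
#include <cassert>
#include <cstdint>

using version_t = uint64_t;

// Hypothetical summary of the decisions handle_collect() makes.
struct CollectDecision {
  bool bootstrap = false;        // too far behind to catch up from the log
  bool accepted = false;         // we adopted the leader's PN
  bool share_committed = false;  // leader is behind us: attach committed values
  version_t reply_pn = 0;        // PN sent back; != leader's PN means "rejected"
};

CollectDecision decide_collect(version_t leader_pn, version_t leader_fc,
                               version_t leader_lc, version_t my_lc,
                               version_t& my_accepted_pn) {
  CollectDecision d;
  if (leader_fc > my_lc + 1) {  // gap cannot be filled from the leader's log
    d.bootstrap = true;
    return d;
  }
  if (leader_pn > my_accepted_pn) {  // standard Paxos promise
    my_accepted_pn = leader_pn;      // a real peon persists this before replying
    d.accepted = true;
  }
  d.reply_pn = my_accepted_pn;
  d.share_committed = leader_lc < my_lc;
  return d;
}
```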

2.4 Two functions for sharing committed data

/** The receiver handles this with store_state(). What gets shared is the value of every version between the two sides' last_committed.
 * @note This is Okay. We share our versions between peer_last_committed and
 *     our last_committed (inclusive), and add their bufferlists to the
 *     message. It will be the peer's job to apply them to its store, as
 *     these bufferlists will contain raw transactions.
 *     This function is called by both the Peon and the Leader. The Peon will
 *     share the state with the Leader during handle_collect(), sharing any
 *     values the leader may be missing (i.e., the leader's last_committed is
 *     lower than the peon's last_committed). The Leader will share the state
 *     with the Peon during handle_last(), if the peon's last_committed is
 *     lower than the leader's last_committed.
 */
void Paxos::share_state(MMonPaxos *m, version_t peer_first_committed,
            version_t peer_last_committed)
{
  assert(peer_last_committed < last_committed);

  dout(10) << "share_state peer has fc " << peer_first_committed 
       << " lc " << peer_last_committed << dendl;
  version_t v = peer_last_committed + 1;

  // include incrementals
  uint64_t bytes = 0;
  for ( ; v <= last_committed; v++) {
    /* Note: no message is sent here; the values between the two versions
       are packed into the message and sent along with the rest of it. */
    if (get_store()->exists(get_name(), v)) {
      get_store()->get(get_name(), v, m->values[v]);
      assert(m->values[v].length());
      dout(10) << " sharing " << v << " ("
           << m->values[v].length() << " bytes)" << dendl;
      bytes += m->values[v].length() + 16;  // paxos_ + 10 digits = 16
    }
  }
  logger->inc(l_paxos_share_state);
  logger->inc(l_paxos_share_state_keys, m->values.size());
  logger->inc(l_paxos_share_state_bytes, bytes);

  m->last_committed = last_committed;
}
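share_state() gathers the stored versions in the half-open range (peer_last_committed, last_committed]. With an ordered map, std::map::upper_bound yields that range directly. collect_shared_values() is a hypothetical helper; skipping versions that are absent from the map plays the role of the exists() check above.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

using version_t = uint64_t;

// Hypothetical stand-in for share_state(): copy every locally stored version
// in (peer_last_committed, my_last_committed] into the outgoing values map.
// Nothing is sent here; the caller sends the message later.
std::map<version_t, std::string> collect_shared_values(
    const std::map<version_t, std::string>& entries,
    version_t peer_last_committed, version_t my_last_committed) {
  assert(peer_last_committed < my_last_committed);
  std::map<version_t, std::string> values;
  // upper_bound on both ends gives exactly (peer_lc, my_lc].
  auto first = entries.upper_bound(peer_last_committed);
  auto last = entries.upper_bound(my_last_committed);
  for (auto it = first; it != last; ++it) values.insert(*it);
  return values;
}
```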

/**
 * Store on disk a state that was shared with us
 *
 * Basically, we received a set of version. Or just one. It doesn't matter.
 * What matters is that we have to stash it in the store. So, we will simply
 * write every single bufferlist into their own versions on our side (i.e.,
 * onto paxos-related keys), and then we will decode those same bufferlists
 * we just wrote and apply the transactions they hold. We will also update
 * our first and last committed values to point to the new values, if need
 * be. All all this is done tightly wrapped in a transaction to ensure we
 * enjoy the atomicity guarantees given by our awesome k/v store.
 */
bool Paxos::store_state(MMonPaxos *m)
{
  MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
  map<version_t,bufferlist>::iterator start = m->values.begin();
  bool changed = false;

  // build map of values to store
  // we want to write the range [last_committed, m->last_committed] only. 
  // The values start beyond our last_committed+1, so we cannot catch up from them.
  if (start != m->values.end() &&
      start->first > last_committed + 1) {
    // ignore everything if values start in the future.
    dout(10) << "store_state ignoring all values, they start at " << start->first
         << " > last_committed+1" << dendl;
    start = m->values.end();
  }

  // push forward the start position on the message's values iterator, up until
  // we run out of positions or we find a position matching 'last_committed'.
  while (start != m->values.end() && start->first <= last_committed) {
    // advance past everything up to our last_committed
    ++start;
  }

  // make sure we get the right interval of values to apply by pushing forward
  // the 'end' iterator until it matches the message's 'last_committed'.
  map<version_t,bufferlist>::iterator end = start;
  while (end != m->values.end() && end->first <= m->last_committed) {
    last_committed = end->first; // update in memory first
    ++end;
  }

  if (start == end) {
    dout(10) << "store_state nothing to commit" << dendl;
  } else {
    dout(10) << "store_state [" << start->first << ".." 
         << last_committed << "]" << dendl;
    // write all changes, including last_committed and each version, in one transaction
    t->put(get_name(), "last_committed", last_committed);

    // we should apply the state here -- decode every single bufferlist in the
    // map and append the transactions to 't'.
    map<version_t,bufferlist>::iterator it;
    for (it = start; it != end; ++it) {
      // write the bufferlist as the version's value
      // queue the version and its value into t first
      t->put(get_name(), it->first, it->second);
      // decode the bufferlist and append it to the transaction we will shortly
      // apply.
      decode_append_transaction(t, it->second);
    }

    // discard obsolete uncommitted value?
    if (uncommitted_v && uncommitted_v <= last_committed) {
      dout(10) << " forgetting obsolete uncommitted value " << uncommitted_v
           << " pn " << uncommitted_pn << dendl;
      uncommitted_v = 0;
      uncommitted_pn = 0;
      uncommitted_value.clear();
    }
  }
  if (!t->empty()) { // a non-empty t means there is something to write

    logger->inc(l_paxos_store_state);
    logger->inc(l_paxos_store_state_bytes, t->get_bytes());
    logger->inc(l_paxos_store_state_keys, t->get_keys());
    utime_t start = ceph_clock_now(NULL);

    /* Commit the transaction: last_committed plus the new versions and their
       values. This call actually waits for the transaction to complete. */
    get_store()->apply_transaction(t); 

    utime_t end = ceph_clock_now(NULL);
    logger->tinc(l_paxos_store_state_latency, end - start);

    // first_committed may have changed if the log was trimmed during the transaction, so re-read it
    first_committed = get_store()->get(get_name(), "first_committed");

    _sanity_check_store();
    changed = true; // something was modified
  }

  remove_legacy_versions(); // erase versions older than first_committed

  return changed;
}
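The iterator handling in store_state() is the subtle part, so here it is on its own. apply_shared_values() is a hypothetical helper that returns the slice that would be written and advances last_committed; in the real function that slice is written in a single transaction together with the decoded payloads.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

using version_t = uint64_t;

// Hypothetical reduction of store_state()'s window logic: ignore everything
// if the values start beyond last_committed + 1, skip versions we already
// have, and stop at the sender's last_committed.
std::map<version_t, std::string> apply_shared_values(
    const std::map<version_t, std::string>& values,
    version_t msg_last_committed, version_t& last_committed) {
  auto start = values.begin();
  if (start != values.end() && start->first > last_committed + 1)
    start = values.end();  // a gap: these values cannot be applied
  while (start != values.end() && start->first <= last_committed) ++start;
  auto end = start;
  while (end != values.end() && end->first <= msg_last_committed) {
    last_committed = end->first;  // advance in memory, as the original does
    ++end;
  }
  return std::map<version_t, std::string>(start, end);
}
```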

2.5 The leader handles the peons' replies

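/* How the leader handles the peons' replies.
   Ceph's election uses each monitor's preassigned rank as its priority, so
   the elected leader does not necessarily hold the newest data. During
   collect, the leader therefore has to bring its own data up to date; all
   such updates are based on data that has already been committed. */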
void Paxos::handle_last(MonOpRequestRef op)
{
  op->mark_paxos_event("handle_last");
  MMonPaxos *last = static_cast<MMonPaxos*>(op->get_req());
  bool need_refresh = false;
  
  // from is the peer's rank
  int from = last->get_source().num();

  dout(10) << "handle_last " << *last << dendl;

  if (!mon->is_leader()) {
    dout(10) << "not leader, dropping" << dendl;
    return;
  }

  // note peer's first_ and last_committed, in case we learn a new
  // commit and need to push it to them.
  
  // record this reply's values in the maps
  peer_first_committed[from] = last->first_committed;
  peer_last_committed[from] = last->last_committed;
  
  // We are so far behind the peer that it no longer keeps the raw transactions
  // for the versions we lack; the only option is to bootstrap and do a full sync.
  if (last->first_committed > last_committed + 1) {
    dout(5) << __func__
            << " mon." << from
        << " lowest version is too high for our last committed"
            << " (theirs: " << last->first_committed
            << "; ours: " << last_committed << ") -- bootstrap!" << dendl;
    op->mark_paxos_event("need to bootstrap");
    mon->bootstrap();
    return;
  }

  assert(g_conf->paxos_kill_at != 1);
  
  /* Counterpart of share_state() inside handle_collect(): the peon may have
     shared some newer committed data with us (the leader's state was older). */
  need_refresh = store_state(last);

  assert(g_conf->paxos_kill_at != 2);
  
  // store_state() may change the leader's last_committed and first_committed,
  // so we may now find that some peon needs updating as well.
  for (map<int,version_t>::iterator p = peer_last_committed.begin();
       p != peer_last_committed.end();
       ++p) {
    if (p->second + 1 < first_committed && first_committed > 1) {
      // the peon is too far behind to be synced incrementally
      dout(5) << __func__
          << " peon " << p->first
          << " last_committed (" << p->second
          << ") is too low for our first_committed (" << first_committed
          << ") -- bootstrap!" << dendl;
      op->mark_paxos_event("need to bootstrap");
      mon->bootstrap();
      return;
    }
    
    // the peon is behind us but still within syncable range
    if (p->second < last_committed) {
      // share committed values
      dout(10) << " sending commit to mon." << p->first << dendl;
      MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT,
                                        ceph_clock_now(g_ceph_context));
      // build a commit message that shares committed data with the peon
      share_state(commit, peer_first_committed[p->first], p->second);
      mon->messenger->send_message(commit, mon->monmap->get_inst(p->first));
    }
  }

  // The peon has accepted a PN larger than ours; per Paxos, pick a higher PN and retry.
  if (last->pn > accepted_pn) {
    // no, try again.
    dout(10) << " they had a higher pn than us, picking a new one." << dendl; 

    // cancel timeout event
    mon->timer.cancel_event(collect_timeout_event);
    collect_timeout_event = 0;
    // Note: this restarts collect with a new PN; it does not start a new election.
    collect(last->pn);
  } else if (last->pn == accepted_pn) { // the peon accepted our PN
    // yes, they accepted our pn.  great.
    num_last++;

    // did this person send back an accepted but uncommitted value?
    if (last->uncommitted_pn) {
      // Everything up to last_committed was certainly agreed by a quorum
      // earlier; an uncommitted value is assumed NOT to have reached quorum
      // agreement and must be dealt with here. A value is adopted only if its
      // uncommitted_pn is at least the largest seen so far, following the
      // Paxos rule that the value with the highest PN wins.
      if (last->uncommitted_pn >= uncommitted_pn &&
          last->last_committed >= last_committed &&
          last->last_committed + 1 >= uncommitted_v) {
        // This comparison is needed because the leader may receive
        // uncommitted values from several peons; it keeps the largest.
        uncommitted_v = last->last_committed+1;
        // uncommitted_v only ever moves upward
        uncommitted_pn = last->uncommitted_pn;
        uncommitted_value = last->values[uncommitted_v];
        dout(10) << "we learned an uncommitted value for " << uncommitted_v
                 << " pn " << uncommitted_pn
                 << " " << uncommitted_value.length() << " bytes"
                 << dendl;
      } else {
        dout(10) << "ignoring uncommitted value for " << (last->last_committed+1)
                 << " pn " << last->uncommitted_pn
                 << " " << last->values[last->last_committed+1].length() << " bytes"
                 << dendl;
      }
    }

    // is that everyone?
    if (num_last == mon->get_quorum().size()) {
      // every member of the quorum must have replied
      // cancel timeout event
      mon->timer.cancel_event(collect_timeout_event);
      collect_timeout_event = 0;
      peer_first_committed.clear();
      peer_last_committed.clear();

      // almost...

      // did we learn an old value?
      if (uncommitted_v == last_committed+1 &&  // may only be one version ahead
          uncommitted_value.length()) {         // set in collect() or learned above
        dout(10) << "that's everyone.  begin on old learned value" << dendl;
        // The election is over, but a value was left uncommitted from before
        // it. That value may not have reached a majority, so it must go
        // through the accept phase again.
        state = STATE_UPDATING_PREVIOUS;

        // Because the value may only have reached a minority, it cannot be
        // committed directly. Instead, the value with the highest original
        // PN is run through Phase 2 again under the new PN.
        begin(uncommitted_value);
      } else {
        // Note: this branch actually carries some uncertainty when minority
        // members crash and restart.
        // active!
        dout(10) << "that's everyone.  active!" << dendl;
        extend_lease();

        need_refresh = false;
        if (do_refresh()) {
          finish_round();
        }
      }
    }
  } else {
    // no, this is an old message, discard
    dout(10) << "old pn, ignoring" << dendl;
  }

  if (need_refresh)
    (void)do_refresh();
}
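The uncommitted-value rule in handle_last() can be checked in isolation. In this sketch LastReply, LeaderView, learn_uncommitted() and must_repropose() are hypothetical; the conditions mirror the ones in the code above, while bootstrap checks, peon catch-up and leases are omitted.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

using version_t = uint64_t;

// Hypothetical per-reply data relevant to uncommitted values.
struct LastReply {
  version_t last_committed;
  version_t uncommitted_pn;  // 0 if the peon reported no uncommitted value
  std::string uncommitted_value;
};

// Hypothetical slice of the leader's state.
struct LeaderView {
  version_t last_committed;
  version_t uncommitted_v = 0;
  version_t uncommitted_pn = 0;
  std::string uncommitted_value;
};

// Adopt a peon's uncommitted value only if its PN is at least the best seen
// so far and it does not sit below what the leader already knows.
void learn_uncommitted(LeaderView& l, const LastReply& r) {
  if (!r.uncommitted_pn) return;
  if (r.uncommitted_pn >= l.uncommitted_pn &&
      r.last_committed >= l.last_committed &&
      r.last_committed + 1 >= l.uncommitted_v) {
    l.uncommitted_v = r.last_committed + 1;
    l.uncommitted_pn = r.uncommitted_pn;
    l.uncommitted_value = r.uncommitted_value;
  }
}

// Once everyone has replied: re-propose the old value if it is exactly the
// next version, otherwise the leader can go active.
bool must_repropose(const LeaderView& l) {
  return l.uncommitted_v == l.last_committed + 1 && !l.uncommitted_value.empty();
}
```

When all replies report the same last_committed, the order in which they arrive does not change the outcome: a value with a lower PN never replaces one with a higher PN.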

2.6 Timeout handler

/* Timeout handling for collect: call bootstrap() directly, which resyncs
   monitor state and elects a leader again. */
void Paxos::collect_timeout()
{
  dout(1) << "collect timeout, calling fresh election" << dendl;
  collect_timeout_event = 0;
  logger->inc(l_paxos_collect_timeout);
  assert(mon->is_leader());
  mon->bootstrap();
}
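collect_timeout_event follows a one-shot timer pattern: collect() arms it with mon_accept_timeout, handle_last() cancels it once every reply is in (or before restarting collect), and if it fires the leader calls bootstrap(). The sketch below models that pattern with an explicitly passed clock value so it can be tested deterministically; OneShotTimeout is a hypothetical class, not the monitor's actual timer.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <optional>
#include <utility>

// Hypothetical one-shot timeout in the spirit of collect_timeout_event.
class OneShotTimeout {
 public:
  using Clock = std::chrono::steady_clock;

  // Arm (or re-arm) the timer to fire `after` from `now`.
  void arm(Clock::time_point now, Clock::duration after,
           std::function<void()> cb) {
    deadline_ = now + after;
    cb_ = std::move(cb);
  }

  // Disarm without firing, like cancel_event() in handle_last().
  void cancel() {
    deadline_.reset();
    cb_ = nullptr;
  }

  // Driven by the owner's event loop; fires the callback at most once.
  void tick(Clock::time_point now) {
    if (deadline_ && now >= *deadline_) {
      std::function<void()> cb = std::move(cb_);
      cancel();  // disarm before running, so the callback may re-arm safely
      if (cb) cb();
    }
  }

  bool armed() const { return deadline_.has_value(); }

 private:
  std::optional<Clock::time_point> deadline_;
  std::function<void()> cb_;
};
```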