Ceph的Paxos源碼註釋 - Phase 2

歡迎關注存儲老少夥的博客。

上篇是Phase 1,即leader當選後確定PN的部分。這篇主要是Phase 2,即正常工作過程中的Propose、accept和commit過程。

Ceph的paxos實現,不算很精妙,近期修改也不大活躍。但是對於我們理解paxos協議仍然有幫助。

1. 幾個要點說明

1.1 Phase 2的交互過程

Leader Peons 說明
begin()=> Leader給quorum中各個成員發送提議,包含PN、version和內容
<= handle_begin() Peon處理提議,有可能會拒絕
handle_accept() 只有quorum中全部成員都同意,才算成功
commit_start() handle_accept在收到全部ack後調用,用事務寫commit記錄,並設置回調函數
commit_finish()=> 上一步的回調函數,在實際事務完成時執行
handle_commit() Peon根據leader的commit消息同步狀態

從begin()到commit_finish()稱爲一輪,即一次提議的完成過程。

1.2 串行化提議

Ceph的paxos實現,每次只允許一個提議在執行中,即上面提到
的一輪完成才能執行下一輪。在這個過程中,會有多次寫盤操作。
這個過程實際上比較慢。對於ceph自身來說,osd等的狀態變更
不頻繁,無需極高的paxos性能。但是如果用作分佈式數據
庫等系統的日誌,這種方式則有不足。

2. 代碼

2.1 Leader的提議

// Phase 2 entry point on the leader: during normal operation every proposal
// starts here.
//
// @param v  the encoded proposal payload; the paxos layer treats it as opaque
//           bytes and does not interpret its meaning.
//
// Persists the proposed value together with its pending version and proposal
// number, then either commits immediately (single-member quorum) or sends
// OP_BEGIN to every other quorum member and arms the accept timeout.
// leader
void Paxos::begin(bufferlist& v)
{
  assert(mon->is_leader());
  assert(is_updating() || is_updating_previous());
  // Per the original annotation, STATE_UPDATING_PREVIOUS corresponds to a
  // freshly elected leader that found an uncommitted value for the next
  // version (last_committed+1).

  // we must already have a majority for this to work.
  assert(mon->get_quorum().size() == 1 ||
         num_last > (unsigned)mon->monmap->size()/2);

  // and no value, yet.
  assert(new_value.length() == 0);

  // Reset the set of members that have accepted this round; the leader
  // counts itself as having accepted first.
  accepted.clear();
  accepted.insert(mon->rank);
  new_value = v;

  if (last_committed == 0) {
    // initial base case; set first_committed too.
    // Nothing has ever been committed, so the initialisation of
    // "first_committed" is folded into the proposed value itself.  This
    // needs its own transaction object: the one used to persist the
    // proposal is only created further down.
    MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
    t->put(get_name(), "first_committed", 1);
    decode_append_transaction(t, new_value);

    bufferlist tx_bl;
    t->encode(tx_bl);
    new_value = tx_bl;
  }

  // store the proposed value in the store. IF it is accepted, we will then
  // have to decode it into a transaction and apply it.
  MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);

  // The following key/values are written in a single transaction.  Only
  // last_committed+1 is ever proposed, so there is never more than one
  // pending version.
  t->put(get_name(), last_committed+1, new_value);
  // pending_v is the version written just above.
  t->put(get_name(), "pending_v", last_committed + 1);
  // The proposal number that goes with it; all of these are recorded as
  // pending.
  t->put(get_name(), "pending_pn", accepted_pn);

  logger->inc(l_paxos_begin);
  logger->inc(l_paxos_begin_keys, t->get_keys());
  logger->inc(l_paxos_begin_bytes, t->get_bytes());
  utime_t start = ceph_clock_now(NULL);

  // Persist the three key/values put above.
  get_store()->apply_transaction(t);

  utime_t end = ceph_clock_now(NULL);
  logger->tinc(l_paxos_begin_latency, end - start);

  assert(g_conf->paxos_kill_at != 3);

  // A quorum of one (single-monitor setup) needs no acceptors.
  if (mon->get_quorum().size() == 1) {
    // we're alone, take it easy
    commit_start();
    return;
  }

  // Send the proposal to every other quorum member.
  for (set<int>::const_iterator p = mon->get_quorum().begin();
       p != mon->get_quorum().end();
       ++p) {
    if (*p == mon->rank) continue;

    MMonPaxos *begin = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_BEGIN,
                                     ceph_clock_now(g_ceph_context));
    // The message carries the value, the leader's last_committed (sent
    // as-is) and the proposal number.
    begin->values[last_committed+1] = new_value;
    begin->last_committed = last_committed;
    begin->pn = accepted_pn;

    mon->messenger->send_message(begin, mon->monmap->get_inst(*p));
  }

  // set timeout event
  accept_timeout_event = new C_AcceptTimeout(this);
  mon->timer.add_event_after(g_conf->mon_accept_timeout, accept_timeout_event);
}

2.2 Peon處理提議

// Phase 2, peon side: handle a proposal (OP_BEGIN) received from the leader.
// Persists the proposed value as pending and replies with OP_ACCEPT, or
// silently ignores the proposal when its proposal number is stale.
void Paxos::handle_begin(MonOpRequestRef op)
{
  op->mark_paxos_event("handle_begin");
  MMonPaxos *begin = static_cast<MMonPaxos*>(op->get_req());

  // Compare proposal numbers to decide whether to accept; this is the
  // standard paxos rule.
  if (begin->pn < accepted_pn) {
    /* Per the original annotation: someone may have started a new election
       and the new leader's collect phase raised the PN -- e.g. the previous
       leader had a network fault, recovered, and is still running on its
       stale state. */
    dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << dendl;
    op->mark_paxos_event("have higher pn, ignore");
    return;
  }
  // In effect this checks that the leader is still the same leader.
  assert(begin->pn == accepted_pn);
  // Both sides must agree on last_committed, and therefore on which version
  // comes next.  Per the original annotation this holds because the leader
  // has not changed and proposals are handled one at a time.
  assert(begin->last_committed == last_committed);
  
  assert(g_conf->paxos_kill_at != 4);

  logger->inc(l_paxos_begin);

  // set state.
  state = STATE_UPDATING; // the current state gates which operations are allowed
  lease_expire = utime_t();  // cancel lease

  // yes.
  // NOTE(review): presumably no uncommitted value can exist at this point
  // because recovery has already completed -- confirm.
  version_t v = last_committed+1;
  dout(10) << "accepting value for " << v << " pn " << accepted_pn << dendl;
  // store the accepted value onto our store. We will have to decode it and
  // apply its transaction once we receive permission to commit.
  MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
  // The following key/values are written in a single transaction.
  t->put(get_name(), v, begin->values[v]);
  // note which pn this pending value is for.
  t->put(get_name(), "pending_v", v); // applied below, but still only pending
  t->put(get_name(), "pending_pn", accepted_pn);
  // NOTE(review): the original annotation says no further "pending version"
  // bookkeeping is needed because last_committed is already persisted --
  // confirm.

  logger->inc(l_paxos_begin_bytes, t->get_bytes());
  utime_t start = ceph_clock_now(NULL);

  get_store()->apply_transaction(t);

  utime_t end = ceph_clock_now(NULL);
  logger->tinc(l_paxos_begin_latency, end - start);

  assert(g_conf->paxos_kill_at != 5);

  // reply
  MMonPaxos *accept = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_ACCEPT,
                    ceph_clock_now(g_ceph_context));
  accept->pn = accepted_pn;
  accept->last_committed = last_committed;
  begin->get_connection()->send_message(accept);
}

2.3 Leader收到ack後處理

// Phase 2, leader side: handle an OP_ACCEPT acknowledgement from a peon.
// Records the sender in `accepted` and starts the commit once the accepted
// set equals the whole quorum.
// leader
void Paxos::handle_accept(MonOpRequestRef op)
{
  op->mark_paxos_event("handle_accept");
  MMonPaxos *accept = static_cast<MMonPaxos*>(op->get_req());
  dout(10) << "handle_accept " << *accept << dendl;
  int from = accept->get_source().num();

  // Any proposal-number mismatch is dropped (the test is !=, even though the
  // log message speaks of a "higher" pn); per the original annotation some
  // other monitor may have been elected leader in the meantime.
  if (accept->pn != accepted_pn) {
    // we accepted a higher pn, from some other leader 
    dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << dendl;
    op->mark_paxos_event("have higher pn, ignore");
    return;
  }
  // A reply from an older round (e.g. delayed by the network) is discarded.
  if (last_committed > 0 &&
      accept->last_committed < last_committed-1) {
    dout(10) << " this is from an old round, ignoring" << dendl;
    op->mark_paxos_event("old round, ignore");
    return;
  }
  assert(accept->last_committed == last_committed ||   // not committed
     accept->last_committed == last_committed-1);  // committed   
     // only a difference of one is allowed

  assert(is_updating() || is_updating_previous());
  assert(accepted.count(from) == 0); // the same peon must not acknowledge twice
  accepted.insert(from);
  dout(10) << " now " << accepted << " have accepted" << dendl;

  assert(g_conf->paxos_kill_at != 6);

  // only commit (and expose committed state) when we get *all* quorum
  // members to accept.  otherwise, they may still be sharing the now
  // stale state.
  // FIXME: we can improve this with an additional lease revocation message
  // that doesn't block for the persist.  
  if (accepted == mon->get_quorum()) {
    // Every quorum member has answered; if some never do, the accept
    // timeout path is taken instead.
    // yay, commit!
    op->mark_paxos_event("commit_start");
    commit_start(); // eventually leads to commit_finish(), which bumps last_committed
  }
}

// Called when the accept phase timed out.  Clears the stored timer event,
// checks that we are the leader in the middle of a round, and restarts the
// monitor via bootstrap.
// NOTE(review): presumably invoked by the C_AcceptTimeout event armed in
// begin() -- confirm.
void Paxos::accept_timeout()
{
  dout(1) << "accept timeout, calling fresh election" << dendl;
  accept_timeout_event = 0;
  assert(mon->is_leader());
  assert(is_updating() || is_updating_previous() || is_writing() ||
     is_writing_previous());
  logger->inc(l_paxos_accept_timeout);
  // Note: this bootstraps directly instead of retrying the proposal.  Per
  // the original annotation that re-checks the quorum and elects a leader
  // again, a peculiarity of Ceph's implementation.
  mon->bootstrap();
}

// Completion context queued by commit_start() together with the commit
// transaction; when finished it runs Paxos::commit_finish() under mon->lock.
struct C_Committed : public Context {
  Paxos *paxos;

  C_Committed(Paxos *owner)
    : paxos(owner)
  {}

  void finish(int result) {
    assert(result >= 0);
    // Scoped lock: per the original annotation it is taken in the
    // constructor and released in the destructor, so commit_finish() below
    // runs with mon->lock held.
    Mutex::Locker mon_guard(paxos->mon->lock);
    paxos->commit_finish();
  }
};

// Starts committing the value proposed in this round: queues a transaction
// that bumps "last_committed" and applies the decoded value, registers
// C_Committed as its completion callback, and moves to a WRITING state.
// Per the original annotation only the leader calls this, and therefore
// commit_finish() is leader-only as well.
void Paxos::commit_start()
{
  dout(10) << __func__ << " " << (last_committed+1) << dendl;

  assert(g_conf->paxos_kill_at != 7);

  MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);

  // commit locally
  t->put(get_name(), "last_committed", last_committed + 1);
  // Note that this change is queued below (queue_transaction), not applied
  // synchronously.

  // decode the value and apply its transaction to the store.
  // this value can now be read from last_committed.
  decode_append_transaction(t, new_value);

  dout(30) << __func__ << " transaction dump:\n";
  JSONFormatter f(true);
  t->dump(&f);
  f.flush(*_dout);
  *_dout << dendl;

  logger->inc(l_paxos_commit);
  logger->inc(l_paxos_commit_keys, t->get_keys());
  logger->inc(l_paxos_commit_bytes, t->get_bytes());
  commit_start_stamp = ceph_clock_now(NULL);
// NOTE(review): C_Committed::finish() acquires mon->lock.  Presumably the
// current context holds that lock and releases it before the callback runs;
// where that release happens is not visible here -- confirm.
  get_store()->queue_transaction(t, new C_Committed(this));
  // C_Committed is the completion hook for this transaction.
  // NOTE(review): per the original annotation (citing MonitorDBStore.h), the
  // callback is invoked after the transaction has been applied -- confirm.
  if (is_updating_previous())
    state = STATE_WRITING_PREVIOUS;
  else if (is_updating())
    state = STATE_WRITING; // once either WRITING state is set, the asynchronous commit
    // callback is what moves the state on; this context returns shortly and
    // does not change it again.  See Paxos::commit_finish().
  else
    assert(0);

  if (mon->get_quorum().size() > 1) {
    // cancel timeout event
    mon->timer.cancel_event(accept_timeout_event);
    accept_timeout_event = 0;
  }
}
// Leader-side completion of a commit (run from C_Committed::finish()):
// advances last_committed, sends OP_COMMIT to the other quorum members,
// refreshes the upper-layer services and finishes the round.
void Paxos::commit_finish()
{
  dout(20) << __func__ << " " << (last_committed+1) << dendl;
  utime_t end = ceph_clock_now(NULL);
  logger->tinc(l_paxos_commit_latency, end - commit_start_stamp);

  assert(g_conf->paxos_kill_at != 8);

  // cancel lease - it was for the old value.
  //  (this would only happen if message layer lost the 'begin', but
  //   leader still got a majority and committed with out us.)
  lease_expire = utime_t();  // cancel lease

  // The in-memory last_committed is only advanced here, after the commit
  // transaction has completed.
  last_committed++;
  last_commit_time = ceph_clock_now(NULL);

  // refresh first_committed; this txn may have trimmed.
  // (The trim path itself was not examined in the original annotation.)
  first_committed = get_store()->get(get_name(), "first_committed");

  _sanity_check_store();

  // tell everyone
  for (set<int>::const_iterator p = mon->get_quorum().begin();
       p != mon->get_quorum().end();
       ++p) {
    if (*p == mon->rank) continue;

    dout(10) << " sending commit to mon." << *p << dendl;
    MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT,
                      ceph_clock_now(g_ceph_context));
    // The leader notifies each peon once its own commit is complete.
    commit->values[last_committed] = new_value;
    commit->pn = accepted_pn;
    commit->last_committed = last_committed;

    mon->messenger->send_message(commit, mon->monmap->get_inst(*p));
    // The local last_committed has already been updated at this point; this
    // tells the others.
  }

  assert(g_conf->paxos_kill_at != 9);

  // get ready for a new round.
  new_value.clear();

  remove_legacy_versions();

  // WRITING -> REFRESH
  // among other things, this lets do_refresh() -> mon->bootstrap() know
  // it doesn't need to flush the store queue
  assert(is_writing() || is_writing_previous());
  state = STATE_REFRESH;
  // do_refresh() lets the service layer reload its state.  It returns false
  // when it triggered a bootstrap, in which case the rest of the round is
  // skipped.
  if (do_refresh()) {
    // NOTE(review): presumably runs the completion callbacks registered for
    // this proposal -- see commit_proposal().
    commit_proposal();
    if (mon->get_quorum().size() > 1) { // a lone monitor needs no lease
      extend_lease();
    }
    // Wake up whoever was waiting for this commit.
    finish_contexts(g_ceph_context, waiting_for_commit); 
    assert(g_conf->paxos_kill_at != 10);

    // Change state and end the round so that a pending proposal can go
    // ahead.  The original annotation notes that finishing each synchronous
    // write before starting the next proposal is comparatively slow.
    finish_round();
  }
}

2.4 Peon處理commit消息

// Peon side: the leader reports which values have been committed (OP_COMMIT).
// Stores that state, lets the services refresh, and completes the contexts
// waiting for a commit unless the refresh triggered a bootstrap.
void Paxos::handle_commit(MonOpRequestRef op)
{
  op->mark_paxos_event("handle_commit");
  MMonPaxos *msg = static_cast<MMonPaxos*>(op->get_req());
  dout(10) << "handle_commit on " << msg->last_committed << dendl;

  logger->inc(l_paxos_commit);

  // Only a peon is expected to receive a commit.
  if (!mon->is_peon()) {
    dout(10) << "not a peon, dropping" << dendl;
    assert(0);
    return;
  }

  op->mark_paxos_event("store_state");
  store_state(msg);

  // do_refresh() returns false when it had to bootstrap; in that case the
  // waiters are left alone.
  if (!do_refresh())
    return;

  finish_contexts(g_ceph_context, waiting_for_commit);
}

2.5 讓上層服務刷新狀態的工具函數

/*一個paxos過程結束後,須要讓上層的各個service(monitor)刷新狀態。
由於paxos這層自己不知道語義,只是肯定執行順序而已。一個paxos決議可能
包含了幾個上層service的內容。
*/
bool Paxos::do_refresh()
{
  bool need_bootstrap = false;

  utime_t start = ceph_clock_now(NULL);

  // make sure we have the latest state loaded up
  mon->refresh_from_paxos(&need_bootstrap);

  utime_t end = ceph_clock_now(NULL);
  logger->inc(l_paxos_refresh);
  logger->tinc(l_paxos_refresh_latency, end - start);

  if (need_bootstrap) {//須要bootstrap才返回false,正常都是成功
    dout(10) << " doing requested bootstrap" << dendl;
    mon->bootstrap();
    return false;
  }

  return true;
}

//喚醒上層等待當前提議完成的上下文
void Paxos::commit_proposal()
{
  dout(10) << __func__ << dendl;
  assert(mon->is_leader());
  assert(is_refresh());

  list<Context*> ls;
  ls.swap(committing_finishers);
  //從pending_finishers ==swap==>  committing_finishers  ==swap==>  ls
  finish_contexts(g_ceph_context, ls); 
  //作callback,paxosservice調用 queue_pending_finisher()註冊的鉤子
}

2.6 完成本輪,開始下一輪

// The previous proposal round is complete: go active, wake every class of
// waiter, trim if needed, and start the next proposal if one is pending.
void Paxos::finish_round()
{
  dout(10) << __func__ << dendl;
  assert(mon->is_leader());

  // ok, now go active!
  // Proposing is only allowed in the active state (see propose_pending()).
  state = STATE_ACTIVE;

  // Each log line names the waiter list that is completed right after it.
  dout(20) << __func__ << " waiting_for_active" << dendl;
  finish_contexts(g_ceph_context, waiting_for_active);
  dout(20) << __func__ << " waiting_for_readable" << dendl;
  finish_contexts(g_ceph_context, waiting_for_readable);
  dout(20) << __func__ << " waiting_for_writeable" << dendl;
  finish_contexts(g_ceph_context, waiting_for_writeable);

  dout(10) << __func__ << " done w/ waiters, state " << state << dendl;

  if (should_trim()) {
    trim();
  }

  // The state is re-checked because it is logged as possibly different after
  // the waiters ran; only propose when still active.
  if (is_active() && pending_proposal) {
    propose_pending();
  }
}

2.7 其餘

/*
 * return a globally unique, monotonically increasing proposal number
 *
 * The result is greater than both the stored last_pn and `gt`, and it is
 * persisted under the "last_pn" key before being returned.
 */
version_t Paxos::get_new_proposal_number(version_t gt)
{
  // Never fall behind a proposal number seen elsewhere.
  if (gt > last_pn)
    last_pn = gt;

  // update. make it unique among all monitors.
  // The low two decimal digits hold this monitor's rank, so a monitor with
  // rank 5 only ever produces 105, 205, 305, ... (n*100 + 5).  Because `gt`
  // may have come from another monitor and carry that monitor's rank, the
  // old low digits are dropped before stepping to the next hundred rather
  // than simply adding 100.  Example: last_pn 306 with rank 5 becomes 405.
  last_pn = (last_pn / 100 + 1) * 100 + (version_t)mon->rank;

  // write: persist the new number in the key/value store
  MonitorDBStore::TransactionRef txn(new MonitorDBStore::Transaction);
  txn->put(get_name(), "last_pn", last_pn);

  dout(30) << __func__ << " transaction dump:\n";
  JSONFormatter f(true);
  txn->dump(&f);
  f.flush(*_dout);
  *_dout << dendl;

  logger->inc(l_paxos_new_pn);

  utime_t began = ceph_clock_now(NULL);
  get_store()->apply_transaction(txn);
  utime_t finished = ceph_clock_now(NULL);

  logger->tinc(l_paxos_new_pn_latency, finished - began);

  dout(10) << "get_new_proposal_number = " << last_pn << dendl;
  return last_pn;
}


// Cancels every outstanding paxos timer event (collect, accept, lease renew,
// lease ack and lease timeout).  Each event that is set is cancelled on
// mon->timer and its member pointer reset to 0, so calling this when nothing
// is armed does nothing.
void Paxos::cancel_events()
{
  if (collect_timeout_event) {
    mon->timer.cancel_event(collect_timeout_event);
    collect_timeout_event = 0;
  }
  if (accept_timeout_event) {
    mon->timer.cancel_event(accept_timeout_event);
    accept_timeout_event = 0;
  }
  if (lease_renew_event) {
    mon->timer.cancel_event(lease_renew_event);
    lease_renew_event = 0;
  }
  if (lease_ack_timeout_event) {
    mon->timer.cancel_event(lease_ack_timeout_event);
    lease_ack_timeout_event = 0;
  }  
  if (lease_timeout_event) {
    mon->timer.cancel_event(lease_timeout_event);
    lease_timeout_event = 0;
  }
}

void Paxos::shutdown()
{
  dout(10) << __func__ << " cancel all contexts" << dendl;

  // discard pending transaction
  pending_proposal.reset();

  finish_contexts(g_ceph_context, waiting_for_writeable, -ECANCELED);
  finish_contexts(g_ceph_context, waiting_for_commit, -ECANCELED);
  finish_contexts(g_ceph_context, waiting_for_readable, -ECANCELED);
  finish_contexts(g_ceph_context, waiting_for_active, -ECANCELED);
  finish_contexts(g_ceph_context, pending_finishers, -ECANCELED);
  finish_contexts(g_ceph_context, committing_finishers, -ECANCELED);
  if (logger)
    g_ceph_context->get_perfcounters_collection()->remove(logger);
  delete logger;
}

// Prepares paxos after this monitor became leader: cancels timers, discards
// anything left over from before, fails outstanding finishers with -EAGAIN,
// then either goes straight to active (quorum of one) or starts recovery
// with collect(0).
void Paxos::leader_init()
{
  cancel_events();
  new_value.clear();

  // discard pending transaction: whatever was queued before winning the
  // election is dropped
  pending_proposal.reset();

  finish_contexts(g_ceph_context, pending_finishers, -EAGAIN);
  finish_contexts(g_ceph_context, committing_finishers, -EAGAIN);

  logger->inc(l_paxos_start_leader);

  if (mon->get_quorum().size() == 1) {
    // Alone in the quorum: no recovery phase.
    state = STATE_ACTIVE;
  } else {
    state = STATE_RECOVERING;
    lease_expire = utime_t();
    dout(10) << "leader_init -- starting paxos recovery" << dendl;
    collect(0);
  }
}

// Prepares paxos after this monitor became a peon: cancels timers, enters
// the recovering state with no lease, arms the lease timeout, drops the
// pending proposal and fails write-related waiters and finishers with
// -EAGAIN.
void Paxos::peon_init()
{
  cancel_events();
  new_value.clear();

  state = STATE_RECOVERING;
  lease_expire = utime_t();
  dout(10) << "peon_init -- i am a peon" << dendl;

  // start a timer, in case the leader never manages to issue a lease
  reset_lease_timeout();

  // discard pending transaction
  pending_proposal.reset();

  // no chance to write now!
  finish_contexts(g_ceph_context, waiting_for_writeable, -EAGAIN);
  finish_contexts(g_ceph_context, waiting_for_commit, -EAGAIN);
  finish_contexts(g_ceph_context, pending_finishers, -EAGAIN);
  finish_contexts(g_ceph_context, committing_finishers, -EAGAIN);

  logger->inc(l_paxos_start_peon);
}

// Resets paxos to the recovering state: cancels timers, flushes the store if
// a commit write is in flight, drops the pending proposal and fails the
// finishers and the commit/active waiters with -EAGAIN.
void Paxos::restart()
{
  dout(10) << "restart -- canceling timeouts" << dendl;
  cancel_events();
  new_value.clear();

  if (is_writing() || is_writing_previous()) {
    dout(10) << __func__ << " flushing" << dendl;
    // mon->lock is dropped around the flush and re-taken afterwards.
    // NOTE(review): presumably so that queued commit callbacks, which take
    // mon->lock in C_Committed::finish(), can run during the flush -- confirm.
    mon->lock.Unlock();
    mon->store->flush();
    mon->lock.Lock();
    dout(10) << __func__ << " flushed" << dendl;
  }
  state = STATE_RECOVERING;

  // discard pending transaction
  pending_proposal.reset();

  finish_contexts(g_ceph_context, committing_finishers, -EAGAIN);
  finish_contexts(g_ceph_context, pending_finishers, -EAGAIN);
  finish_contexts(g_ceph_context, waiting_for_commit, -EAGAIN);
  finish_contexts(g_ceph_context, waiting_for_active, -EAGAIN);

  logger->inc(l_paxos_restart);
}
// Routes an incoming paxos message to the handler for its operation.
// Messages are dropped while an election is in progress (this monitor is
// neither leader nor peon).  An unknown message type or paxos op trips an
// assertion.
void Paxos::dispatch(MonOpRequestRef op)
{
  assert(op->is_type_paxos());
  op->mark_paxos_event("dispatch");
  PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
  // election in progress?
  if (!mon->is_leader() && !mon->is_peon()) {
    dout(5) << "election in progress, dropping " << *m << dendl;
    return;
  }

  // check sanity: a peon only takes paxos messages from the leader, never
  // from another peon
  assert(mon->is_leader() ||
         (mon->is_peon() && m->get_source().num() == mon->get_leader()));

  switch (m->get_type()) {

  case MSG_MON_PAXOS:
    {
      // Obtain the MMonPaxos view from the request itself with static_cast,
      // exactly as every handle_*() function does, instead of
      // reinterpret_cast-ing the PaxosServiceMessage pointer.
      MMonPaxos *pm = static_cast<MMonPaxos*>(op->get_req());

      // NOTE: these ops are defined in messages/MMonPaxos.h
      switch (pm->op) {
        // learner
      case MMonPaxos::OP_COLLECT:
        handle_collect(op);
        break;
      case MMonPaxos::OP_LAST:
        handle_last(op);
        break;
      case MMonPaxos::OP_BEGIN:
        handle_begin(op);
        break;
      case MMonPaxos::OP_ACCEPT:
        handle_accept(op);
        break;
      case MMonPaxos::OP_COMMIT:
        handle_commit(op);
        break;
      case MMonPaxos::OP_LEASE:
        handle_lease(op);
        break;
      case MMonPaxos::OP_LEASE_ACK:
        handle_lease_ack(op);
        break;
      default:
        assert(0);
      }
    }
    break;

  default:
    assert(0);
  }
}
// -- WRITE --

// Writing is possible only on the leader, while paxos is active, and while
// the lease is valid.  The three checks run in that order and stop at the
// first one that fails.
bool Paxos::is_writeable()
{
  if (!mon->is_leader())
    return false;
  if (!is_active())
    return false;
  return is_lease_valid();
}

// Turns the accumulated pending transaction into a proposal: encodes it,
// releases pending_proposal, moves the pending finishers over to
// committing_finishers, and starts Phase 2 via begin().
// Requires the active state and a non-empty pending_proposal.
void Paxos::propose_pending()
{
  assert(is_active());
  assert(pending_proposal);

  cancel_events();

  bufferlist bl;
  pending_proposal->encode(bl);

  dout(10) << __func__ << " " << (last_committed + 1)
       << " " << bl.length() << " bytes" << dendl;
  dout(30) << __func__ << " transaction dump:\n";
  JSONFormatter f(true);
  pending_proposal->dump(&f);
  f.flush(*_dout);
  *_dout << dendl;

  // Drop this object's reference to the Transaction; its encoded form now
  // lives in `bl`.
  // NOTE(review): the original annotation treats TransactionRef as a
  // shared_ptr (it cites shared_ptr::reset) -- confirm.
  pending_proposal.reset();

  // list::swap exchanges the contents of the two lists, so the finishers
  // queued for this proposal are now in committing_finishers.
  committing_finishers.swap(pending_finishers); 
  state = STATE_UPDATING;
  begin(bl);
}

// Registers a completion context for the proposal currently being built; it
// is appended to pending_finishers.  `onfinished` must not be null.
void Paxos::queue_pending_finisher(Context *onfinished)
{
  dout(5) << __func__ << " " << onfinished << dendl;

  assert(onfinished != 0);
  pending_finishers.push_back(onfinished);
}
// Returns the pending transaction (a transaction, not yet a proposal),
// creating it on first use.  Upper layers fetch it through this function and
// then add their own content, so a single proposal may bundle several
// operations.  Leader only.
MonitorDBStore::TransactionRef Paxos::get_pending_transaction()
{
  assert(mon->is_leader());

  if (pending_proposal)
    return pending_proposal;

  // First request since the last proposal: start a fresh transaction.  No
  // finisher may have been queued before the transaction exists.
  pending_proposal.reset(new MonitorDBStore::Transaction);
  assert(pending_finishers.empty());
  return pending_proposal;
}

// Proposes the pending transaction right away when paxos is active.
// Returns true if a proposal was started, false if it has to wait.
bool Paxos::trigger_propose()
{
  if (!is_active()) {
    dout(10) << __func__ << " not active, will propose later" << dendl;
    return false;
  }

  dout(10) << __func__ << " active, proposing now" << dendl;
  propose_pending();
  return true;
}

// The committed range is consistent when its lower bound does not exceed its
// upper bound.
bool Paxos::is_consistent()
{
  return last_committed >= first_committed;
}
相關文章
相關標籤/搜索