Ceph OSD-side callback handling flow

This article walks through the callback code for on_applied, on_commit, on_applied_sync, on_all_commit and on_all_applied in the data IO path. It is kept fairly brief, but all the key points are covered. FileStore is used as the example:

Detailed OSD-side flow analysis: https://blog.51cto.com/wendashuai/2497104

Primary side:

PrimaryLogPG::execute_ctx() ---> issue_repop(repop, ctx) ---> pgbackend->submit_transaction() ---> issue_op(); parent->queue_transactions()

1. PrimaryLogPG::issue_repop()

void PrimaryLogPG::issue_repop(RepGather *repop, OpContext *ctx)
{
  Context *onapplied_sync = new C_OSD_OndiskWriteUnlock();
  Context *on_all_applied = new C_OSD_RepopApplied(this, repop);
  Context *on_all_commit = new C_OSD_RepopCommit(this, repop);

  pgbackend->submit_transaction(//-> ReplicatedBackend::submit_transaction(...,on_local_applied_sync,on_all_acked,on_all_commit,...)
    soid,
    ctx->delta_stats,
    ctx->at_version,
    std::move(ctx->op_t),
    pg_trim_to,
    min_last_complete_ondisk,
    ctx->log,
    ctx->updated_hset_history,
    onapplied_sync,
    on_all_applied,
    on_all_commit,
    repop->rep_tid,
    ctx->reqid,
    ctx->op);
}
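For reference, the two "all" contexts created above call back into the PG once every shard has responded. A simplified sketch of the luminous-era classes in PrimaryLogPG.cc (member types abbreviated; the real classes hold reference-counted pointers):

class C_OSD_RepopApplied : public Context {
  PrimaryLogPGRef pg;
  boost::intrusive_ptr<PrimaryLogPG::RepGather> repop;
public:
  C_OSD_RepopApplied(PrimaryLogPG *pg, PrimaryLogPG::RepGather *repop)
    : pg(pg), repop(repop) {}
  void finish(int) override {
    pg->repop_all_applied(repop.get());    // every shard has applied
  }
};

class C_OSD_RepopCommit : public Context {
  PrimaryLogPGRef pg;
  boost::intrusive_ptr<PrimaryLogPG::RepGather> repop;
public:
  C_OSD_RepopCommit(PrimaryLogPG *pg, PrimaryLogPG::RepGather *repop)
    : pg(pg), repop(repop) {}
  void finish(int) override {
    pg->repop_all_committed(repop.get());  // every shard has committed its journal write
  }
};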

2. ReplicatedBackend::submit_transaction()

void ReplicatedBackend::submit_transaction(
  const hobject_t &soid,
  const object_stat_sum_t &delta_stats,
  const eversion_t &at_version,
  PGTransactionUPtr &&_t,
  const eversion_t &trim_to,
  const eversion_t &roll_forward_to,
  const vector<pg_log_entry_t> &_log_entries,
  boost::optional<pg_hit_set_history_t> &hset_history,
  Context *on_local_applied_sync,
  Context *on_all_acked,
  Context *on_all_commit,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  OpRequestRef orig_op)
{

  InProgressOp &op = in_progress_ops.insert(
    make_pair(
      tid,
      InProgressOp(
        tid, on_all_commit, on_all_acked,
        orig_op, at_version)
      )
    ).first->second;

  op.waiting_for_applied.insert(
    parent->get_actingbackfill_shards().begin(),
    parent->get_actingbackfill_shards().end());
  op.waiting_for_commit.insert(
    parent->get_actingbackfill_shards().begin(),
    parent->get_actingbackfill_shards().end());
//issue_op packs the op's information into a message and sends it to the replica OSDs; this step only builds and sends the message, so it is not covered further here
  issue_op(
    soid,
    at_version,
    tid,
    reqid,
    trim_to,
    at_version,
    added.size() ? *(added.begin()) : hobject_t(),
    removed.size() ? *(removed.begin()) : hobject_t(),
    log_entries,
    hset_history,
    &op,
    op_t);

  op_t.register_on_applied_sync(on_local_applied_sync);  // ---> on_applied_sync
  op_t.register_on_applied(                               // ---> on_applied
    parent->bless_context(
      new C_OSD_OnOpApplied(this, &op)));
  op_t.register_on_commit(                                // ---> on_commit
    parent->bless_context(
      new C_OSD_OnOpCommit(this, &op)));
  parent->queue_transactions(tls, op.op);//int FileStore::queue_transactions()
}
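The InProgressOp inserted above is what tracks which shards still owe an applied/commit response. Its shape is roughly the following (a simplified sketch based on the luminous-era ReplicatedBackend.h; fields abbreviated):

struct InProgressOp {
  ceph_tid_t tid;
  set<pg_shard_t> waiting_for_commit;   // shards that have not committed yet
  set<pg_shard_t> waiting_for_applied;  // shards that have not applied yet
  Context *on_commit;                   // the on_all_commit context from issue_repop
  Context *on_applied;                  // the on_all_applied (on_all_acked) context
  OpRequestRef op;
  eversion_t v;
  InProgressOp(ceph_tid_t tid, Context *on_commit, Context *on_applied,
               OpRequestRef op, eversion_t v)
    : tid(tid), on_commit(on_commit), on_applied(on_applied),
      op(op), v(v) {}
  bool done() const {
    return waiting_for_commit.empty() && waiting_for_applied.empty();
  }
};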

3. ReplicatedBackend::issue_op()

void ReplicatedBackend::issue_op(
  const hobject_t &soid,
  const eversion_t &at_version,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  eversion_t pg_trim_to,
  eversion_t pg_roll_forward_to,
  hobject_t new_temp_oid,
  hobject_t discard_temp_oid,
  const vector<pg_log_entry_t> &log_entries,
  boost::optional<pg_hit_set_history_t> &hset_hist,
  InProgressOp *op,
  ObjectStore::Transaction &op_t)
{

  get_parent()->send_message_osd_cluster(peer.osd, wr, get_osdmap()->get_epoch());
}
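The elided body of issue_op loops over the acting/backfill shards, skips the primary itself, builds one replication message per peer and sends it. A rough sketch of the luminous-era flow (generate_subop() and its exact parameter list are as I recall them and may differ between versions):

  for (set<pg_shard_t>::const_iterator i =
         parent->get_actingbackfill_shards().begin();
       i != parent->get_actingbackfill_shards().end();
       ++i) {
    if (*i == parent->whoami_shard())
      continue;                            // the primary writes locally, no message to itself
    pg_shard_t peer = *i;
    const pg_info_t &pinfo = parent->get_shard_info().find(peer)->second;
    // generate_subop() packs the log entries, the transaction and version info into the message
    Message *wr = generate_subop(
      soid, at_version, tid, reqid, pg_trim_to, pg_roll_forward_to,
      new_temp_oid, discard_temp_oid, log_entries, hset_hist, op_t, peer, pinfo);
    get_parent()->send_message_osd_cluster(peer.osd, wr, get_osdmap()->get_epoch());
  }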

4. Replica side:

ReplicatedBackend::handle_message() ---> sub_op_modify(op) ---> queue_transactions()
// sub_op_modify: called when a PG replica receives MSG_OSD_REPOP; it performs the local write of the object data
void ReplicatedBackend::sub_op_modify(OpRequestRef op)
{

  rm->opt.register_on_commit(
    parent->bless_context(
      new C_OSD_RepModifyCommit(this, rm)));
  rm->localt.register_on_applied(
    parent->bless_context(
      new C_OSD_RepModifyApply(this, rm)));  
  parent->queue_transactions(tls, op);// ->int FileStore::queue_transactions()
}
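rm here is a RepModifyRef, the replica-side bookkeeping for one sub-op. Roughly (a simplified sketch based on the luminous-era ReplicatedBackend.h; fields abbreviated):

struct RepModify {
  OpRequestRef op;                       // the MSG_OSD_REPOP request from the primary
  bool applied, committed;               // local progress of this sub-op
  int ackerosd;                          // the OSD (the primary) to reply to
  eversion_t last_complete;
  epoch_t epoch_started;
  ObjectStore::Transaction opt, localt;  // the replicated transaction and the local-only one
  RepModify() : applied(false), committed(false), ackerosd(-1), epoch_started(0) {}
};
typedef ceph::shared_ptr<RepModify> RepModifyRef;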

5. Primary-side callbacks:

class C_OSD_OnOpCommit : public Context {
  ReplicatedBackend *pg;
  ReplicatedBackend::InProgressOp *op;
public:
  C_OSD_OnOpCommit(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op) 
    : pg(pg), op(op) {}
  void finish(int) override {
    pg->op_commit(op);
  }
};

class C_OSD_OnOpApplied : public Context {
  ReplicatedBackend *pg;
  ReplicatedBackend::InProgressOp *op;
public:
  C_OSD_OnOpApplied(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op) 
    : pg(pg), op(op) {}
  void finish(int) override {
    pg->op_applied(op);
  }
};
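pg->op_commit(op) and pg->op_applied(op) record that the local (primary) shard is done and fire the on_all_* context once the corresponding waiting set is empty. A simplified luminous-era sketch (logging and asserts elided):

void ReplicatedBackend::op_applied(InProgressOp *op)
{
  // the primary itself has applied; remove it from the waiting set
  op->waiting_for_applied.erase(get_parent()->whoami_shard());
  if (op->waiting_for_applied.empty()) {
    op->on_applied->complete(0);   // -> C_OSD_RepopApplied -> repop_all_applied()
    op->on_applied = 0;
  }
  if (op->done())
    in_progress_ops.erase(op->tid);
}

void ReplicatedBackend::op_commit(InProgressOp *op)
{
  // the primary itself has committed its journal write
  op->waiting_for_commit.erase(get_parent()->whoami_shard());
  if (op->waiting_for_commit.empty()) {
    op->on_commit->complete(0);    // -> C_OSD_RepopCommit -> repop_all_committed()
    op->on_commit = 0;
  }
  if (op->done())
    in_progress_ops.erase(op->tid);
}

The replicas' replies drain the same waiting sets when they are handled on the primary, so whichever response arrives last is what actually triggers the on_all_* context.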

6. Replica-side callbacks:

struct ReplicatedBackend::C_OSD_RepModifyApply : public Context {
  ReplicatedBackend *pg;
  RepModifyRef rm;
  C_OSD_RepModifyApply(ReplicatedBackend *pg, RepModifyRef r)
    : pg(pg), rm(r) {}
  void finish(int r) override {
    pg->repop_applied(rm);
  }
};

struct ReplicatedBackend::C_OSD_RepModifyCommit : public Context {
  ReplicatedBackend *pg;
  RepModifyRef rm;
  C_OSD_RepModifyCommit(ReplicatedBackend *pg, RepModifyRef r)
    : pg(pg), rm(r) {}
  void finish(int r) override {
    pg->repop_commit(rm);
  }
};
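pg->repop_commit(rm) / pg->repop_applied(rm) are what send the replica's progress back to the primary (the send_message_osd_cluster mentioned in the summary below). A simplified luminous-era sketch of the commit path; the applied path is analogous but without CEPH_OSD_FLAG_ONDISK, and the exact MOSDRepOpReply constructor arguments may differ between versions:

void ReplicatedBackend::repop_commit(RepModifyRef rm)
{
  rm->committed = true;
  const MOSDRepOp *m = static_cast<const MOSDRepOp*>(rm->op->get_req());
  // tell the primary (ackerosd) that this replica's journal write is on disk
  MOSDRepOpReply *reply = new MOSDRepOpReply(
    m, get_parent()->whoami_shard(),
    0, get_osdmap()->get_epoch(), m->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
  reply->set_last_complete_ondisk(rm->last_complete);
  reply->set_priority(CEPH_MSG_PRIO_HIGH);   // commit acks are high priority
  get_parent()->send_message_osd_cluster(
    rm->ackerosd, reply, get_osdmap()->get_epoch());
}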

7. FileStore side

int FileStore::queue_transactions(Sequencer *posr, vector<Transaction>& tls,
          TrackedOpRef osd_op,
          ThreadPool::TPHandle *handle)
{
  Context *onreadable;
  Context *ondisk;
  Context *onreadable_sync;
  ObjectStore::Transaction::collect_contexts(
    tls, &onreadable, &ondisk, &onreadable_sync);
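  // ... journal submission and op queueing follow; see section 8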
}

static void collect_contexts(
    vector<Transaction>& t,
    Context **out_on_applied,
    Context **out_on_commit,
    Context **out_on_applied_sync) 
{
  list<Context *> on_applied, on_commit, on_applied_sync;
  for (vector<Transaction>::iterator i = t.begin();i != t.end();++i) 
  {
    on_applied.splice(on_applied.end(), (*i).on_applied);
    on_commit.splice(on_commit.end(), (*i).on_commit);
    on_applied_sync.splice(on_applied_sync.end(), (*i).on_applied_sync);
  }
  *out_on_applied = C_Contexts::list_to_context(on_applied);
  *out_on_commit = C_Contexts::list_to_context(on_commit);
  *out_on_applied_sync = C_Contexts::list_to_context(on_applied_sync);
}
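C_Contexts::list_to_context just collapses the gathered list into a single Context (or none). A simplified sketch of the helper in include/Context.h (the CephContext argument passed to C_Contexts is not important here):

static Context *list_to_context(list<Context *> &cs) {
  if (cs.empty())
    return 0;                  // nothing was registered on these transactions
  if (cs.size() == 1) {
    Context *c = cs.front();   // a single context can be handed back directly
    cs.clear();
    return c;
  }
  C_Contexts *c = new C_Contexts(0);
  c->take(cs);                 // wrap the whole list; completing c completes them all
  return c;
}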

8. Relationship between the journal-write and data-write callbacks

_op_journal_transactions(tbl, orig_len, o->op, new C_JournaledAhead(this, osr, o, ondisk), osd_op);  ---> onjournal ---> oncommit
After the journal write completes, the ondisk callback fires on the ondisk_finishers Finisher threads ---> Finisher::finisher_thread_entry() ---> complete() ---> finish()
  if (ondisk) {
    dout(10) << " queueing ondisk " << ondisk << dendl;
    ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(ondisk);
  }

After the data write completes, the onreadable callback fires on the apply_finishers Finisher threads ---> Finisher::finisher_thread_entry() ---> complete() ---> finish()
  if (o->onreadable) { // once the filestore write is done, the data becomes readable (it may still only be in the page cache at this point)
    apply_finishers[osr->id % m_apply_finisher_num]->queue(o->onreadable);
  }

onreadable      ---> out_on_applied      ---> on_applied
ondisk          ---> out_on_commit       ---> on_commit
onreadable_sync ---> out_on_applied_sync ---> on_applied_sync
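This mapping exists because the register_on_applied/commit/applied_sync calls in step 2 simply append the given Context to per-transaction lists, which collect_contexts later splices together. A simplified sketch of the ObjectStore::Transaction helpers:

// inside ObjectStore::Transaction
list<Context *> on_applied;
list<Context *> on_commit;
list<Context *> on_applied_sync;

void register_on_applied(Context *c) {
  if (!c) return;
  on_applied.push_back(c);        // collected into out_on_applied -> onreadable
}
void register_on_commit(Context *c) {
  if (!c) return;
  on_commit.push_back(c);         // collected into out_on_commit -> ondisk
}
void register_on_applied_sync(Context *c) {
  if (!c) return;
  on_applied_sync.push_back(c);   // collected into out_on_applied_sync -> onreadable_sync
}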

Summary:

The primary finishes its journal write:  C_OSD_OnOpCommit    pg->op_commit(op)    ondisk       ondisk_finishers    (further writes can proceed at this point?)
The primary finishes its data write:     C_OSD_OnOpApplied   pg->op_applied(op)   onreadable   apply_finishers     (the written data is readable at this point?)

A replica finishes its journal write:    C_OSD_RepModifyCommit   pg->repop_commit(rm)    reply sent to the primary via send_message_osd_cluster
A replica finishes its data write:       C_OSD_RepModifyApply    pg->repop_applied(rm)   reply sent to the primary via send_message_osd_cluster

All shards finish the journal write:  all_committed   on_all_commit    C_OSD_RepopCommit    repop_all_committed   waiting_for_ondisk   // called when all commit
All shards finish the data write:     all_applied     on_all_applied   C_OSD_RepopApplied   repop_all_applied     waiting_for_ack      // called when all acked

Context *on_all_commit = new C_OSD_RepopCommit(this, repop);//on_all_commit
Context *on_all_applied = new C_OSD_RepopApplied(this, repop);//on_all_acked
Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(xxx); //on_local_applied_sync  the local sync has completed

Once every shard has finished the journal write, the data is on the journal disk; the waiting_for_ondisk list is processed and the op is in the ondisk state. If the environment crashes at this point, the data can be recovered by replaying the journal.
Once every shard has finished the data write, i.e. it has been written into the filestore layer, the apply has succeeded; the waiting_for_ack list is processed and an ack is sent to the client to report write completion. At this point the data is readable (onreadable).
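For completeness, the two "all" callbacks on the primary look roughly like this (a simplified luminous-era sketch; eval_repop() is the function that actually walks waiting_for_ondisk / waiting_for_ack and replies to the clients):

void PrimaryLogPG::repop_all_committed(RepGather *repop)
{
  repop->all_committed = true;   // every shard's journal write is durable
  if (!repop->rep_aborted) {
    if (repop->v != eversion_t())
      last_update_ondisk = repop->v;
    eval_repop(repop);           // answers clients waiting in waiting_for_ondisk
  }
}

void PrimaryLogPG::repop_all_applied(RepGather *repop)
{
  repop->all_applied = true;     // every shard's data write has been applied
  if (!repop->rep_aborted)
    eval_repop(repop);           // answers clients waiting in waiting_for_ack
}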