This article walks through the on_applied, on_commit, on_applied_sync, on_all_commit, and on_all_applied callbacks in the data IO path. The notes are brief, but all the key points are covered. FileStore is used as the example:
Detailed OSD-side flow analysis: https://blog.51cto.com/wendashuai/2497104
Primary side:
PrimaryLogPG::execute_ctx() ---> issue_repop(repop, ctx) ---> pgbackend->submit_transaction() ---> issue_op(); parent->queue_transactions()
1. PrimaryLogPG::issue_repop()
void PrimaryLogPG::issue_repop(RepGather *repop, OpContext *ctx)
{
  Context *onapplied_sync = new C_OSD_OndiskWriteUnlock();
  Context *on_all_applied = new C_OSD_RepopApplied(this, repop);
  Context *on_all_commit = new C_OSD_RepopCommit(this, repop);
  // -> ReplicatedBackend::submit_transaction(..., on_local_applied_sync, on_all_acked, on_all_commit, ...)
  pgbackend->submit_transaction(
    soid,
    ctx->delta_stats,
    ctx->at_version,
    std::move(ctx->op_t),
    pg_trim_to,
    min_last_complete_ondisk,
    ctx->log,
    ctx->updated_hset_history,
    onapplied_sync,
    on_all_applied,
    on_all_commit,
    repop->rep_tid,
    ctx->reqid,
    ctx->op);
}
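All three of these are heap-allocated Ceph Context objects. As a reminder of the pattern (a minimal sketch of the Context idiom, not the actual Ceph class, which also carries locking and tracing hooks): complete() runs finish() exactly once and then deletes the object, which is why every callback in this flow is created with new and handed off.

#include <iostream>

// Minimal sketch of the Context pattern used by these callbacks:
// complete() invokes finish() and then frees the object, so a Context
// is always allocated with new and fired exactly once.
struct Context {
  virtual ~Context() {}
  virtual void finish(int r) = 0;
  void complete(int r) {
    finish(r);
    delete this;
  }
};

struct PrintOnFinish : public Context {
  void finish(int r) override { std::cout << "finished, r=" << r << "\n"; }
};

int main() {
  Context *c = new PrintOnFinish;
  c->complete(0);  // runs finish(0), then deletes c
}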
2. ReplicatedBackend::submit_transaction()
void ReplicatedBackend::submit_transaction(
  const hobject_t &soid,
  const object_stat_sum_t &delta_stats,
  const eversion_t &at_version,
  PGTransactionUPtr &&_t,
  const eversion_t &trim_to,
  const eversion_t &roll_forward_to,
  const vector<pg_log_entry_t> &_log_entries,
  boost::optional<pg_hit_set_history_t> &hset_history,
  Context *on_local_applied_sync,
  Context *on_all_acked,
  Context *on_all_commit,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  OpRequestRef orig_op)
{
  InProgressOp &op = in_progress_ops.insert(
    make_pair(
      tid,
      InProgressOp(
        tid, on_all_commit, on_all_acked,
        orig_op, at_version)
      )
    ).first->second;
  op.waiting_for_applied.insert(
    parent->get_actingbackfill_shards().begin(),
    parent->get_actingbackfill_shards().end());
  op.waiting_for_commit.insert(
    parent->get_actingbackfill_shards().begin(),
    parent->get_actingbackfill_shards().end());
  // issue_op packages the op into a message and sends it to the replica OSDs.
  // It is pure message construction, so it is not covered further here.
  issue_op(
    soid,
    at_version,
    tid,
    reqid,
    trim_to,
    at_version,
    added.size() ? *(added.begin()) : hobject_t(),
    removed.size() ? *(removed.begin()) : hobject_t(),
    log_entries,
    hset_history,
    &op,
    op_t);
  op_t.register_on_applied_sync(on_local_applied_sync);  // ---> on_applied_sync
  op_t.register_on_applied(                              // ---> on_applied
    parent->bless_context(
      new C_OSD_OnOpApplied(this, &op)));
  op_t.register_on_commit(                               // ---> on_commit
    parent->bless_context(
      new C_OSD_OnOpCommit(this, &op)));
  parent->queue_transactions(tls, op.op);  // -> int FileStore::queue_transactions()
}
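Note that register_on_applied_sync/register_on_applied/register_on_commit do no work at registration time: they only append the Context to per-transaction lists, which FileStore later harvests in collect_contexts() (step 7). A minimal self-contained sketch of that idea:

#include <iostream>
#include <list>

struct Context {
  virtual ~Context() {}
  virtual void finish(int r) = 0;
  void complete(int r) { finish(r); delete this; }
};

// Sketch: registration only queues the callback; nothing runs until the
// store reaches the matching stage and drains the list.
struct Transaction {
  std::list<Context *> on_applied;       // data applied, readable
  std::list<Context *> on_commit;        // journal write durable
  std::list<Context *> on_applied_sync;  // run synchronously after apply

  void register_on_applied(Context *c)      { if (c) on_applied.push_back(c); }
  void register_on_commit(Context *c)       { if (c) on_commit.push_back(c); }
  void register_on_applied_sync(Context *c) { if (c) on_applied_sync.push_back(c); }
};

struct Say : Context {
  const char *msg;
  explicit Say(const char *m) : msg(m) {}
  void finish(int) override { std::cout << msg << "\n"; }
};

int main() {
  Transaction t;
  t.register_on_commit(new Say("on_commit fired"));
  // ... later, when the journal write completes, the store drains the list:
  for (Context *c : t.on_commit) c->complete(0);
  t.on_commit.clear();
}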
3. ReplicatedBackend::issue_op()
void ReplicatedBackend::issue_op(
  const hobject_t &soid,
  const eversion_t &at_version,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  eversion_t pg_trim_to,
  eversion_t pg_roll_forward_to,
  hobject_t new_temp_oid,
  hobject_t discard_temp_oid,
  const vector<pg_log_entry_t> &log_entries,
  boost::optional<pg_hit_set_history_t> &hset_hist,
  InProgressOp *op,
  ObjectStore::Transaction &op_t)
{
  // ... (message construction elided)
  get_parent()->send_message_osd_cluster(peer.osd, wr, get_osdmap()->get_epoch());
}
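issue_op() itself is just fan-out: one message per acting/backfill shard other than the primary, which applies the transaction locally. A hedged sketch of that shape; Message and the helper functions below are illustrative stand-ins, not the real MOSDRepOp/send_message_osd_cluster signatures:

#include <iostream>
#include <vector>

// Hypothetical stand-ins: the real code builds an MOSDRepOp per shard and
// sends it with send_message_osd_cluster(); names here are illustrative.
struct Message { int target_osd; };

static void send_to_osd(const Message &m) {
  std::cout << "send repop to osd." << m.target_osd << "\n";
}

// Sketch of the fan-out: one message per acting/backfill shard,
// skipping the primary itself.
static void issue_op_sketch(const std::vector<int> &shards, int primary_osd) {
  for (int osd : shards) {
    if (osd == primary_osd)
      continue;  // the primary writes locally, no message needed
    send_to_osd(Message{osd});
  }
}

int main() {
  issue_op_sketch({0, 1, 2}, /*primary_osd=*/0);
}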
4. Replica side:
ReplicatedBackend::handle_message() ---> sub_op_modify(op) ---> queue_transactions()
// sub_op_modify: called when a PG replica receives MSG_OSD_REPOP;
// it performs the local write of the object data.
void ReplicatedBackend::sub_op_modify(OpRequestRef op)
{
  rm->opt.register_on_commit(
    parent->bless_context(
      new C_OSD_RepModifyCommit(this, rm)));
  rm->localt.register_on_applied(
    parent->bless_context(
      new C_OSD_RepModifyApply(this, rm)));
  parent->queue_transactions(tls, op);  // -> int FileStore::queue_transactions()
}
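On the replica these contexts do not complete any client request; as the summary in step 8 shows, both repop_commit() and repop_applied() end in send_message_osd_cluster(), i.e. each stage is reported back to the primary. A hedged sketch of that shape, with reply_to_primary() standing in for the real MOSDRepOpReply construction:

#include <iostream>
#include <string>

// Hypothetical stand-in for building an MOSDRepOpReply and sending it via
// send_message_osd_cluster(); the name and signature are illustrative only.
static void reply_to_primary(const std::string &event) {
  std::cout << "replica -> primary: " << event << "\n";
}

// Sketch: each replica-side callback just reports its stage to the primary.
struct RepModify { /* per-op state (tid, epoch, ...) elided */ };

static void repop_commit(RepModify *) { reply_to_primary("commit (journal durable)"); }
static void repop_applied(RepModify *) { reply_to_primary("applied (data written)"); }

int main() {
  RepModify rm;
  repop_commit(&rm);   // fired by the registered on_commit context
  repop_applied(&rm);  // fired by the registered on_applied context
}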
5. Primary-side callbacks

class C_OSD_OnOpCommit : public Context {
  ReplicatedBackend *pg;
  ReplicatedBackend::InProgressOp *op;
public:
  C_OSD_OnOpCommit(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op)
    : pg(pg), op(op) {}
  void finish(int) override {
    pg->op_commit(op);
  }
};

class C_OSD_OnOpApplied : public Context {
  ReplicatedBackend *pg;
  ReplicatedBackend::InProgressOp *op;
public:
  C_OSD_OnOpApplied(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op)
    : pg(pg), op(op) {}
  void finish(int) override {
    pg->op_applied(op);
  }
};
6. Replica-side callbacks

struct ReplicatedBackend::C_OSD_RepModifyApply : public Context {
  ReplicatedBackend *pg;
  RepModifyRef rm;
  C_OSD_RepModifyApply(ReplicatedBackend *pg, RepModifyRef r)
    : pg(pg), rm(r) {}
  void finish(int r) override {
    pg->repop_applied(rm);
  }
};

struct ReplicatedBackend::C_OSD_RepModifyCommit : public Context {
  ReplicatedBackend *pg;
  RepModifyRef rm;
  C_OSD_RepModifyCommit(ReplicatedBackend *pg, RepModifyRef r)
    : pg(pg), rm(r) {}
  void finish(int r) override {
    pg->repop_commit(rm);
  }
};
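How the per-shard callbacks above turn into on_all_commit/on_all_applied: submit_transaction() (step 2) seeded waiting_for_commit and waiting_for_applied with every acting/backfill shard, and each local callback or replica reply removes one shard; when a set drains, the corresponding all-done Context fires. A minimal sketch of the commit half of that bookkeeping (shards simplified to ints; the real InProgressOp uses pg_shard_t):

#include <iostream>
#include <set>

struct Context {
  virtual ~Context() {}
  virtual void finish(int r) = 0;
  void complete(int r) { finish(r); delete this; }
};

struct Announce : Context {
  const char *msg;
  explicit Announce(const char *m) : msg(m) {}
  void finish(int) override { std::cout << msg << "\n"; }
};

// Sketch of InProgressOp's commit bookkeeping: one entry per shard,
// erased as each commit arrives; on_commit fires when the set drains.
struct InProgressOp {
  std::set<int> waiting_for_commit;
  Context *on_commit = nullptr;  // the on_all_commit from issue_repop()

  void commit_from(int shard) {
    waiting_for_commit.erase(shard);
    if (waiting_for_commit.empty() && on_commit) {
      on_commit->complete(0);
      on_commit = nullptr;
    }
  }
};

int main() {
  InProgressOp op;
  op.waiting_for_commit = {0, 1, 2};  // primary + two replicas
  op.on_commit = new Announce("all shards committed -> on_all_commit");
  op.commit_from(1);  // replica reply
  op.commit_from(0);  // local journal commit
  op.commit_from(2);  // last reply fires the callback
}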
7. FileStore side
int FileStore::queue_transactions(Sequencer *posr, vector<Transaction> &tls,
                                  TrackedOpRef osd_op, ThreadPool::TPHandle *handle)
{
  Context *onreadable;
  Context *ondisk;
  Context *onreadable_sync;
  ObjectStore::Transaction::collect_contexts(
    tls, &onreadable, &ondisk, &onreadable_sync);
}

static void collect_contexts(
  vector<Transaction> &t,
  Context **out_on_applied,
  Context **out_on_commit,
  Context **out_on_applied_sync)
{
  list<Context *> on_applied, on_commit, on_applied_sync;
  for (vector<Transaction>::iterator i = t.begin(); i != t.end(); ++i) {
    on_applied.splice(on_applied.end(), (*i).on_applied);
    on_commit.splice(on_commit.end(), (*i).on_commit);
    on_applied_sync.splice(on_applied_sync.end(), (*i).on_applied_sync);
  }
  *out_on_applied = C_Contexts::list_to_context(on_applied);
  *out_on_commit = C_Contexts::list_to_context(on_commit);
  *out_on_applied_sync = C_Contexts::list_to_context(on_applied_sync);
}
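collect_contexts() splices the per-transaction lists together and folds each list into a single Context via C_Contexts::list_to_context(), so the journal and apply paths each carry exactly one pointer (ondisk, onreadable, onreadable_sync). A minimal sketch of that fold:

#include <iostream>
#include <list>
#include <utility>

struct Context {
  virtual ~Context() {}
  virtual void finish(int r) = 0;
  void complete(int r) { finish(r); delete this; }
};

// Sketch of C_Contexts: a Context that completes a whole list of Contexts.
struct C_Contexts : Context {
  std::list<Context *> contexts;
  explicit C_Contexts(std::list<Context *> &&cs) : contexts(std::move(cs)) {}
  void finish(int r) override {
    for (Context *c : contexts)
      c->complete(r);
  }
  // Fold a list into one Context; nullptr if there is nothing to run.
  static Context *list_to_context(std::list<Context *> &cs) {
    if (cs.empty())
      return nullptr;
    if (cs.size() == 1) {
      Context *c = cs.front();
      cs.clear();
      return c;
    }
    return new C_Contexts(std::move(cs));
  }
};

struct Say : Context {
  const char *msg;
  explicit Say(const char *m) : msg(m) {}
  void finish(int) override { std::cout << msg << "\n"; }
};

int main() {
  std::list<Context *> cbs;
  cbs.push_back(new Say("cb 1"));
  cbs.push_back(new Say("cb 2"));
  Context *ondisk = C_Contexts::list_to_context(cbs);  // one pointer for the list
  ondisk->complete(0);
}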
8. How the journal-write and data-write callbacks relate
_op_journal_transactions(tbl, orig_len, o->op,
                         new C_JournaledAhead(this, osr, o, ondisk), osd_op);
// -> onjournal -> oncommit

After the journal write completes, the ondisk callback is queued on the ondisk_finishers Finisher threads:
Finisher::finisher_thread_entry() ---> complete() ---> finish()

if (ondisk) {
  dout(10) << " queueing ondisk " << ondisk << dendl;
  ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(ondisk);
}

After the data write completes, the onreadable callback is queued on the apply_finishers Finisher threads:
Finisher::finisher_thread_entry() ---> complete() ---> finish()

if (o->onreadable) {
  // Once the filestore write is done the data becomes readable
  // (at this point it may only have reached the page cache).
  apply_finishers[osr->id % m_apply_finisher_num]->queue(o->onreadable);
}

How the FileStore names map back to the registered callbacks:
onreadable      ---> out_on_applied      ---> on_applied
ondisk          ---> out_on_commit       ---> on_commit
onreadable_sync ---> out_on_applied_sync ---> on_applied_sync
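Both hand-offs above only queue the Context; a dedicated Finisher thread dequeues it and calls complete(), so the callback never runs on the journal/apply path itself. A minimal producer/consumer sketch of that idea (the real Finisher lives in common/Finisher and its loop is finisher_thread_entry()):

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

struct Context {
  virtual ~Context() {}
  virtual void finish(int r) = 0;
  void complete(int r) { finish(r); delete this; }
};

struct Say : Context {
  const char *msg;
  explicit Say(const char *m) : msg(m) {}
  void finish(int) override { std::cout << msg << "\n"; }
};

// Sketch of a Finisher: completions are queued from the IO path and a
// dedicated thread drains the queue, calling complete() on each Context.
class Finisher {
  std::mutex m;
  std::condition_variable cv;
  std::queue<Context *> q;
  bool stopping = false;
  std::thread t;

  void entry() {  // cf. Finisher::finisher_thread_entry()
    std::unique_lock<std::mutex> l(m);
    for (;;) {
      cv.wait(l, [&] { return stopping || !q.empty(); });
      if (q.empty()) return;  // stopping and drained
      Context *c = q.front();
      q.pop();
      l.unlock();
      c->complete(0);  // finish() + delete, outside the lock
      l.lock();
    }
  }

public:
  Finisher() : t([this] { entry(); }) {}
  void queue(Context *c) {
    std::lock_guard<std::mutex> l(m);
    q.push(c);
    cv.notify_one();
  }
  ~Finisher() {
    { std::lock_guard<std::mutex> l(m); stopping = true; }
    cv.notify_one();
    t.join();
  }
};

int main() {
  Finisher ondisk_finisher;
  ondisk_finisher.queue(new Say("ondisk callback ran on finisher thread"));
}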
Primary finishes its journal write: C_OSD_OnOpCommit -> pg->op_commit(op); this is the ondisk callback, run on ondisk_finishers. (Can further writes proceed at this point?)
Primary finishes its data write: C_OSD_OnOpApplied -> pg->op_applied(op); this is the onreadable callback, run on apply_finishers. (Is the written data readable at this point?)
Replica finishes its journal write: C_OSD_RepModifyCommit -> pg->repop_commit(rm) -> send_message_osd_cluster() to the primary.
Replica finishes its data write: C_OSD_RepModifyApply -> pg->repop_applied(rm) -> send_message_osd_cluster() to the primary.
All shards have committed the journal write: all_committed -> on_all_commit -> C_OSD_RepopCommit -> repop_all_committed; processes waiting_for_ondisk. // called when all commit
All shards have applied the data write: all_applied -> on_all_applied -> C_OSD_RepopApplied -> repop_all_applied; processes waiting_for_ack. // called when all acked

Context *on_all_commit = new C_OSD_RepopCommit(this, repop);    // on_all_commit
Context *on_all_applied = new C_OSD_RepopApplied(this, repop);  // on_all_acked
Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(xxx);     // on_local_applied_sync, local sync done

Once every shard has finished the journal write, the data sits on the journal disk and the waiting_for_ondisk list is processed (ondisk state); if the machine crashes at this point, the write can be recovered by journal replay.
Once every shard has finished the data write, i.e. it has reached the filestore layer, the apply has succeeded: the waiting_for_ack list is processed and an ack is sent to the client to report write completion, and the data is now in the readable (onreadable) state.