Ceph 0.94.1
Qemu 2.4.0
rbd.c
The qemu_rbd_open function works with an important data structure, struct BDRVRBDState, which holds the state the driver needs for every later operation.
typedef struct BDRVRBDState {
    rados_t cluster;                      /* handle to the RADOS cluster */
    rados_ioctx_t io_ctx;                 /* I/O context within the cluster */
    rbd_image_t image;                    /* handle to the opened rbd image */
    char name[RBD_MAX_IMAGE_NAME_SIZE];   /* name of the rbd image */
    char *snap;                           /* name of the rbd snapshot, if any */
} BDRVRBDState;
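QEMU allocates bs->opaque for each driver according to the instance_size it registers, which is how qemu_rbd_open gets a zeroed BDRVRBDState to fill in. The following is an abridged sketch of how block/rbd.c wires this struct and the callbacks discussed below into a BlockDriver; the exact field list in 2.4.0 may differ slightly.

static BlockDriver bdrv_rbd = {
    .format_name         = "rbd",
    .instance_size       = sizeof(BDRVRBDState),   /* size of bs->opaque */
    .bdrv_needs_filename = true,
    .bdrv_file_open      = qemu_rbd_open,
    .bdrv_close          = qemu_rbd_close,
    .bdrv_aio_readv      = qemu_rbd_aio_readv,
    .bdrv_aio_writev     = qemu_rbd_aio_writev,
#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
    .bdrv_aio_flush      = qemu_rbd_aio_flush,
#else
    .bdrv_co_flush_to_disk = qemu_rbd_co_flush,
#endif
};

static void bdrv_rbd_init(void)
{
    bdrv_register(&bdrv_rbd);
}
block_init(bdrv_rbd_init);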
An annotated walk through qemu_rbd_open:
static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
                         Error **errp)
{
    BDRVRBDState *s = bs->opaque;
    char pool[RBD_MAX_POOL_NAME_SIZE];
    char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
    char conf[RBD_MAX_CONF_SIZE];
    char clientname_buf[RBD_MAX_CONF_SIZE];
    char *clientname;
    QemuOpts *opts;
    Error *local_err = NULL;
    const char *filename;
    int r;

    /* absorb the runtime options passed in from the block layer */
    opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
    qemu_opts_absorb_qdict(opts, options, &local_err);
    if (local_err) {
        error_propagate(errp, local_err);
        qemu_opts_del(opts);
        return -EINVAL;
    }

    filename = qemu_opt_get(opts, "filename");

    /*
     * Parse the filename and fill in the individual fields. The filename
     * has the form rbd:pool/image[@snapshot][:option=value...].
     */
    if (qemu_rbd_parsename(filename, pool, sizeof(pool),
                           snap_buf, sizeof(snap_buf),
                           s->name, sizeof(s->name),
                           conf, sizeof(conf), errp) < 0) {
        r = -EINVAL;
        goto failed_opts;
    }
    /*
     * pool:     name of the pool
     * snap_buf: name of the image snapshot
     * s->name:  name of the rbd image
     * conf:     remaining configuration options
     */

    clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
    /* clientname_buf: name of the client */

    r = rados_create(&s->cluster, clientname);   /* create the cluster handle */
    if (r < 0) {
        error_setg(errp, "error initializing");
        goto failed_opts;
    }

    /* copy snap_buf into s->snap */
    s->snap = NULL;
    if (snap_buf[0] != '\0') {
        s->snap = g_strdup(snap_buf);
    }

    /* apply the configuration file; if none was given, try the default one */
    if (strstr(conf, "conf=") == NULL) {
        /* try default location, but ignore failure */
        rados_conf_read_file(s->cluster, NULL);
    } else if (conf[0] != '\0') {
        r = qemu_rbd_set_conf(s->cluster, conf, true, errp);
        if (r < 0) {
            goto failed_shutdown;
        }
    }

    /*
     * Second pass: apply the remaining key=value options from the filename
     * (the conf= entry itself was handled above).
     */
    if (conf[0] != '\0') {
        r = qemu_rbd_set_conf(s->cluster, conf, false, errp);
        if (r < 0) {
            goto failed_shutdown;
        }
    }

    /*
     * Fallback to more conservative semantics if setting cache
     * options fails. Ignore errors from setting rbd_cache because the
     * only possible error is that the option does not exist, and
     * librbd defaults to no caching. If write through caching cannot
     * be set up, fall back to no caching.
     */
    /* set the cache option */
    if (flags & BDRV_O_NOCACHE) {
        rados_conf_set(s->cluster, "rbd_cache", "false");
    } else {
        rados_conf_set(s->cluster, "rbd_cache", "true");
    }

    r = rados_connect(s->cluster);   /* connect to the cluster */
    if (r < 0) {
        error_setg(errp, "error connecting");
        goto failed_shutdown;
    }

    r = rados_ioctx_create(s->cluster, pool, &s->io_ctx);   /* create the I/O context */
    if (r < 0) {
        error_setg(errp, "error opening pool %s", pool);
        goto failed_shutdown;
    }

    /* with the I/O context in hand, open the rbd image */
    r = rbd_open(s->io_ctx, s->name, &s->image, s->snap);
    if (r < 0) {
        error_setg(errp, "error reading header from %s", s->name);
        goto failed_open;
    }

    bs->read_only = (s->snap != NULL);

    qemu_opts_del(opts);
    return 0;

failed_open:
    rados_ioctx_destroy(s->io_ctx);
failed_shutdown:
    rados_shutdown(s->cluster);
    g_free(s->snap);
failed_opts:
    qemu_opts_del(opts);
    return r;
}
So once qemu_rbd_open returns, all the information about the rbd image lives in bs->opaque, where the other driver callbacks can reach it.
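For example, the qemu_rbd_getlength callback (shown here slightly simplified) pulls the image handle back out of bs->opaque to ask librbd for the image size:

static int64_t qemu_rbd_getlength(BlockDriverState *bs)
{
    BDRVRBDState *s = bs->opaque;   /* state stored by qemu_rbd_open */
    rbd_image_info_t info;
    int r;

    r = rbd_stat(s->image, &info, sizeof(info));   /* query the image header */
    if (r < 0) {
        return r;
    }

    return info.size;   /* image size in bytes */
}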
qemu_rbd_close is trivial, so it is not reproduced here.
QEMU's reads and writes to rbd all appear to be issued asynchronously. The read, write and flush functions are analysed below:
/* asynchronous read */
static BlockAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs,
                                      int64_t sector_num,
                                      QEMUIOVector *qiov, int nb_sectors,
                                      BlockCompletionFunc *cb,
                                      void *opaque)
{
    return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque,
                         RBD_AIO_READ);
}

/* asynchronous write */
static BlockAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs,
                                       int64_t sector_num,
                                       QEMUIOVector *qiov, int nb_sectors,
                                       BlockCompletionFunc *cb,
                                       void *opaque)
{
    return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque,
                         RBD_AIO_WRITE);
}

/* flush */
#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
static BlockAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs,
                                      BlockCompletionFunc *cb,
                                      void *opaque)
{
    return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH);
}
#else
static int qemu_rbd_co_flush(BlockDriverState *bs)
{
#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1)
    /* rbd_flush added in 0.1.1 */
    BDRVRBDState *s = bs->opaque;
    return rbd_flush(s->image);
#else
    return 0;
#endif
}
#endif

#ifdef LIBRBD_SUPPORTS_DISCARD
static BlockAIOCB *qemu_rbd_aio_discard(BlockDriverState *bs,
                                        int64_t sector_num,
                                        int nb_sectors,
                                        BlockCompletionFunc *cb,
                                        void *opaque)
{
    return rbd_start_aio(bs, sector_num, NULL, nb_sectors, cb, opaque,
                         RBD_AIO_DISCARD);
}
#endif
So reads, writes and flushes are all funnelled through rbd_start_aio (flush only when librbd supports asynchronous flush). What does rbd_start_aio look like?
/* start an asynchronous I/O operation */
/* sector_num, qiov, nb_sectors and cb all come from the QEMU block layer request */
static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
                                 int64_t sector_num,
                                 QEMUIOVector *qiov,
                                 int nb_sectors,
                                 BlockCompletionFunc *cb,
                                 void *opaque,
                                 RBDAIOCmd cmd)
{
    RBDAIOCB *acb;
    RADOSCB *rcb = NULL;
    rbd_completion_t c;
    int64_t off, size;
    char *buf;
    int r;

    BDRVRBDState *s = bs->opaque;

    acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
    acb->cmd = cmd;
    acb->qiov = qiov;
    if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
        acb->bounce = NULL;
    } else {
        acb->bounce = qemu_try_blockalign(bs, qiov->size);
        if (acb->bounce == NULL) {
            goto failed;
        }
    }
    acb->ret = 0;
    acb->error = 0;
    acb->s = s;
    acb->bh = NULL;

    if (cmd == RBD_AIO_WRITE) {
        qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
    }

    buf = acb->bounce;

    off = sector_num * BDRV_SECTOR_SIZE;
    size = nb_sectors * BDRV_SECTOR_SIZE;

    rcb = g_new(RADOSCB, 1);
    rcb->acb = acb;
    rcb->buf = buf;
    rcb->s = acb->s;
    rcb->size = size;
    r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
    if (r < 0) {
        goto failed;
    }
    /*
     * acb and rcb describe this I/O request: the byte offset, the size,
     * the bounce buffer and the completion callback. The code above
     * filled them in.
     */

    /* dispatch to librbd according to the command type */
    switch (cmd) {
    case RBD_AIO_WRITE:
        r = rbd_aio_write(s->image, off, size, buf, c);
        break;
    case RBD_AIO_READ:
        r = rbd_aio_read(s->image, off, size, buf, c);
        break;
    case RBD_AIO_DISCARD:
        r = rbd_aio_discard_wrapper(s->image, off, size, c);
        break;
    case RBD_AIO_FLUSH:
        r = rbd_aio_flush_wrapper(s->image, c);
        break;
    default:
        r = -EINVAL;
    }

    if (r < 0) {
        goto failed_completion;
    }

    return &acb->common;

failed_completion:
    rbd_aio_release(c);
failed:
    g_free(rcb);
    qemu_vfree(acb->bounce);
    qemu_aio_unref(acb);
    return NULL;
}
From this walk through rbd.c we can see that, read or write, everything ends up as an operation on the image handle.
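The same completion-based pattern can be used directly against librbd, outside QEMU. Below is a minimal sketch, assuming image is an rbd_image_t that has already been opened with rbd_open; it issues one asynchronous write and waits for it.

/* build with: gcc aio_write.c -lrbd -lrados */
#include <rbd/librbd.h>
#include <stdio.h>
#include <string.h>

/* invoked by librbd when the write completes */
static void write_done(rbd_completion_t c, void *arg)
{
    printf("aio write finished: %zd\n", rbd_aio_get_return_value(c));
}

int aio_write_example(rbd_image_t image)
{
    char buf[4096];
    rbd_completion_t c;
    int r;

    memset(buf, 0xab, sizeof(buf));

    r = rbd_aio_create_completion(NULL, write_done, &c);
    if (r < 0) {
        return r;
    }

    /* write 4 KiB at offset 0; returns immediately, completion fires later */
    r = rbd_aio_write(image, 0, sizeof(buf), buf, c);
    if (r < 0) {
        rbd_aio_release(c);
        return r;
    }

    rbd_aio_wait_for_complete(c);        /* block until the callback has run */
    r = rbd_aio_get_return_value(c);
    rbd_aio_release(c);
    return r;
}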
Next, let's look at how rbd_open uses the I/O context and the snapshot name to open an rbd image.
The Ceph documentation does not explain how to use the C/C++ version of librbd, which is a pity; the only option is to work it out by reading the QEMU and Ceph sources side by side.
Ceph source version: 0.94.1
The C interface of librbd is declared in src/include/rbd/librbd.h.
The declaration of rbd_open is:
CEPH_RBD_API int rbd_open(rados_ioctx_t io, const char *name, rbd_image_t *image, const char *snap_name);
and the declaration of rbd_close:
CEPH_RBD_API int rbd_close(rbd_image_t image);
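Since the documentation does not show it, here is a minimal end-to-end sketch of using this C API the same way qemu_rbd_open does. The client id "admin", the pool name "rbd", the image name "test-image" and the conf path are placeholders chosen for illustration.

/* build with: gcc open_close.c -lrbd -lrados */
#include <rados/librados.h>
#include <rbd/librbd.h>
#include <stdio.h>

int main(void)
{
    rados_t cluster;
    rados_ioctx_t io_ctx;
    rbd_image_t image;
    int r;

    /* same sequence qemu_rbd_open follows: create handle, read conf, connect */
    r = rados_create(&cluster, "admin");               /* client.admin, an assumption */
    if (r < 0) return 1;
    rados_conf_read_file(cluster, "/etc/ceph/ceph.conf");
    r = rados_connect(cluster);
    if (r < 0) { rados_shutdown(cluster); return 1; }

    r = rados_ioctx_create(cluster, "rbd", &io_ctx);   /* pool "rbd", an assumption */
    if (r < 0) { rados_shutdown(cluster); return 1; }

    /* a NULL snapshot name opens the image head, read/write */
    r = rbd_open(io_ctx, "test-image", &image, NULL);
    if (r < 0) {
        fprintf(stderr, "rbd_open failed: %d\n", r);
    } else {
        rbd_close(image);
    }

    rados_ioctx_destroy(io_ctx);
    rados_shutdown(cluster);
    return r < 0 ? 1 : 0;
}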
Our goal is to let one image be connected to two pools, an SSD pool and an HDD pool, while changing as little of the source as possible. My current idea is:
keep a notion of the current default pool that the image is mapped to; at run time the pool behind the image may change, so the code that reads and writes the image never has to be rewritten (see the sketch below).
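Read literally at the librados/librbd boundary, that idea could look something like the hypothetical wrapper below. None of these names exist in Ceph; it is only a sketch of holding two I/O contexts and switching the active one without touching the read/write paths.

/* Hypothetical sketch only; not part of librbd. */
typedef struct dual_pool_image {
    rados_ioctx_t ssd_ctx;    /* I/O context for the SSD pool */
    rados_ioctx_t hdd_ctx;    /* I/O context for the HDD pool */
    rados_ioctx_t *active;    /* the "current default pool" */
    rbd_image_t image;        /* image opened through *active */
} dual_pool_image;

/* switch the default pool; callers keep using dpi->image unchanged */
static void dual_pool_switch(dual_pool_image *dpi, int use_ssd)
{
    dpi->active = use_ssd ? &dpi->ssd_ctx : &dpi->hdd_ctx;
    /* a real implementation would also have to migrate or re-open the
     * image on the new pool, which is exactly the part that needs
     * changes inside librbd */
}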
The functions declared in librbd.h are defined in src/librbd/librbd.cc:
extern "C" int rbd_open(rados_ioctx_t p, const char *name, rbd_image_t *image, const char *snap_name) { librados::IoCtx io_ctx; librados::IoCtx::from_rados_ioctx_t(p, io_ctx); librbd::ImageCtx *ictx = new librbd::ImageCtx(name, "", snap_name, io_ctx, false); //建立鏡像上下文 tracepoint(librbd, open_image_enter, ictx, ictx->name.c_str(), ictx->id.c_str(), ictx->snap_name.c_str(), ictx->read_only); //建立回溯點? int r = librbd::open_image(ictx); //打開rbd鏡像 if (r >= 0) *image = (rbd_image_t)ictx; 將鏡像上下文賦給image,提供給函數的調用者使用 tracepoint(librbd, open_image_exit, r); return r; } extern "C" int rbd_close(rbd_image_t image) { librbd::ImageCtx *ctx = (librbd::ImageCtx *)image; tracepoint(librbd, close_image_enter, ctx, ctx->name.c_str(), ctx->id.c_str()); librbd::close_image(ctx); tracepoint(librbd, close_image_exit); return 0; }