Qemu打開rbd鏡像的源碼分析

Ceph 0.94.1數據結構

Qemu 2.4.0app

rbd.c異步

在qemu_rbd_open函數中,操做了一個重要的數據結構——結構體BDRVRBDState,這個結構體保存了很是重要的信息。函數

typedef struct BDRVRBDState {
    rados_t cluster;        //cluster的handle
    rados_ioctx_t io_ctx;    //cluster的IO上下文
    rbd_image_t image;    //rbd鏡像的結構體
    char name[RBD_MAX_IMAGE_NAME_SIZE];  //rbd鏡像的名字
    char *snap;    //rbd鏡像快照的名字
} BDRVRBDState;

對qemu_rbd_open函數的分析spa

static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
                         Error **errp)
{
    BDRVRBDState *s = bs->opaque;
    char pool[RBD_MAX_POOL_NAME_SIZE];
    char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
    char conf[RBD_MAX_CONF_SIZE];
    char clientname_buf[RBD_MAX_CONF_SIZE];
    char *clientname;
    QemuOpts *opts;
    Error *local_err = NULL;
    const char *filename;
    int r;
 /*這裏應該是參數相關*/
    opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
    qemu_opts_absorb_qdict(opts, options, &local_err);
    if (local_err) {
        error_propagate(errp, local_err);
        qemu_opts_del(opts);
        return -EINVAL;
    }
    filename = qemu_opt_get(opts, "filename");
	
 /*分析filename,而後分別賦值*/
    if (qemu_rbd_parsename(filename, pool, sizeof(pool),
                           snap_buf, sizeof(snap_buf),
                           s->name, sizeof(s->name),
                           conf, sizeof(conf), errp) < 0) {
        r = -EINVAL;
        goto failed_opts;
    }
 /*
 pool:pool的名字
 snap_buf:鏡像快照的名字
 s->name:rbd鏡像的名字
 conf:配置的內容
 */
	
    clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
 /*client_buf:client的名字*/
    r = rados_create(&s->cluster, clientname);	 //建立cluster handle
    if (r < 0) {
        error_setg(errp, "error initializing");
        goto failed_opts;
    }
	
 /*複製snap_buf中的內容到s->snap*/
    s->snap = NULL;
    if (snap_buf[0] != '\0') {
        s->snap = g_strdup(snap_buf);
    }
	
 /*設置配置文件,若是用戶沒有設置則採用默認設置*/
    if (strstr(conf, "conf=") == NULL) {
        /* try default location, but ignore failure */
        rados_conf_read_file(s->cluster, NULL);
    } else if (conf[0] != '\0') {
        r = qemu_rbd_set_conf(s->cluster, conf, true, errp);
        if (r < 0) {
            goto failed_shutdown;
        }
    }
 /*這裏沒有理解是什麼意思*/
    if (conf[0] != '\0') {
        r = qemu_rbd_set_conf(s->cluster, conf, false, errp);
        if (r < 0) {
            goto failed_shutdown;
        }
    }
    /*
     * Fallback to more conservative semantics if setting cache
     * options fails. Ignore errors from setting rbd_cache because the
     * only possible error is that the option does not exist, and
     * librbd defaults to no caching. If write through caching cannot
     * be set up, fall back to no caching.
     */
 /*設置cache的參數*/ 
    if (flags & BDRV_O_NOCACHE) {
        rados_conf_set(s->cluster, "rbd_cache", "false");
    } else {
        rados_conf_set(s->cluster, "rbd_cache", "true");
    }
    r = rados_connect(s->cluster);	 //鏈接cluster
    if (r < 0) {
        error_setg(errp, "error connecting");
        goto failed_shutdown;
    }
    r = rados_ioctx_create(s->cluster, pool, &s->io_ctx);	//建立IO上下文
    if (r < 0) {
        error_setg(errp, "error opening pool %s", pool);
        goto failed_shutdown;
    }
    r = rbd_open(s->io_ctx, s->name, &s->image, s->snap);	 //在取得了IO上下文的狀況下,打開rbd鏡像
    if (r < 0) {
        error_setg(errp, "error reading header from %s", s->name);
        goto failed_open;
    }
    bs->read_only = (s->snap != NULL);
    qemu_opts_del(opts);
    return 0;
failed_open:
    rados_ioctx_destroy(s->io_ctx);
failed_shutdown:
    rados_shutdown(s->cluster);
    g_free(s->snap);
failed_opts:
    qemu_opts_del(opts);
    return r;
}

可見,在完成qemu_rbd_open函數以後,關於rbd鏡像的信息被保存在了bs->opaque中,從而能夠被其它函數所利用。code

qemu_rbd_close函數很是簡單,在這裏就不貼出來了。接口

qemu對於rbd的讀寫操做貌似都是以異步的方式進行的。對讀、寫和flush函數的分析以下:文檔

/*異步讀函數*/
static BlockAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs,
                                      int64_t sector_num,
                                      QEMUIOVector *qiov,
                                      int nb_sectors,
                                      BlockCompletionFunc *cb,
                                      void *opaque)
{
    return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque,
                         RBD_AIO_READ);
}
/*異步寫函數*/
static BlockAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs,
                                       int64_t sector_num,
                                       QEMUIOVector *qiov,
                                       int nb_sectors,
                                       BlockCompletionFunc *cb,
                                       void *opaque)
{
    return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque,
                         RBD_AIO_WRITE);
}
/*flush操做函數*/
#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
static BlockAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs,
                                      BlockCompletionFunc *cb,
                                      void *opaque)
{
    return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH);
}
#else
static int qemu_rbd_co_flush(BlockDriverState *bs)
{
#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1)
    /* rbd_flush added in 0.1.1 */
    BDRVRBDState *s = bs->opaque;
    return rbd_flush(s->image);
#else
    return 0;
#endif
}
#endif
#ifdef LIBRBD_SUPPORTS_DISCARD
static BlockAIOCB* qemu_rbd_aio_discard(BlockDriverState *bs,
                                        int64_t sector_num,
                                        int nb_sectors,
                                        BlockCompletionFunc *cb,
                                        void *opaque)
{
    return rbd_start_aio(bs, sector_num, NULL, nb_sectors, cb, opaque,
                         RBD_AIO_DISCARD);
}
#endif

可見,讀寫和flush都是經過rbd_start_aio這個函數來完成。固然,這是在librbd支持異步flush的狀況下。那麼rbd_start_aio又是怎樣的呢?get

/*開始異步IO操做的函數*/
/*在參數列表當中,sector_num, qiov, nb_sectors, cb應該都是與qemu磁盤操做有關的參數*/
static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
                                 int64_t sector_num,
                                 QEMUIOVector *qiov,
                                 int nb_sectors,
                                 BlockCompletionFunc *cb,
                                 void *opaque,
                                 RBDAIOCmd cmd)
{
    RBDAIOCB *acb;
    RADOSCB *rcb = NULL;
    rbd_completion_t c;
    int64_t off, size;
    char *buf;
    int r;
    BDRVRBDState *s = bs->opaque;
    acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
    acb->cmd = cmd;
    acb->qiov = qiov;
    if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
        acb->bounce = NULL;
    } else {
        acb->bounce = qemu_try_blockalign(bs, qiov->size);
        if (acb->bounce == NULL) {
            goto failed;
        }
    }
    acb->ret = 0;
    acb->error = 0;
    acb->s = s;
    acb->bh = NULL;
    if (cmd == RBD_AIO_WRITE) {
        qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
    }
    buf = acb->bounce;
    off = sector_num * BDRV_SECTOR_SIZE;
    size = nb_sectors * BDRV_SECTOR_SIZE;
    rcb = g_new(RADOSCB, 1);
    rcb->acb = acb;
    rcb->buf = buf;
    rcb->s = acb->s;
    rcb->size = size;
    r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
    if (r < 0) {
        goto failed;
    }
 /*acb和rcb應該都是與磁盤IO相關的數據結構,描述了磁盤IO的信息,好比寫入位置的扇區位置,寫入大小,寫入信息等等。上面的代碼初始化了磁盤IO的信息*/
	
 /*根據command的類型,來對磁盤鏡像進行操做*/
    switch (cmd) {
    case RBD_AIO_WRITE:
        r = rbd_aio_write(s->image, off, size, buf, c);
        break;
    case RBD_AIO_READ:
        r = rbd_aio_read(s->image, off, size, buf, c);
        break;
    case RBD_AIO_DISCARD:
        r = rbd_aio_discard_wrapper(s->image, off, size, c);
        break;
    case RBD_AIO_FLUSH:
        r = rbd_aio_flush_wrapper(s->image, c);
        break;
    default:
        r = -EINVAL;
    }
    if (r < 0) {
        goto failed_completion;
    }
    return &acb->common;
failed_completion:
    rbd_aio_release(c);
failed:
    g_free(rcb);
    qemu_vfree(acb->bounce);
    qemu_aio_unref(acb);
    return NULL;
}

在對rbd.c的分析中,咱們能夠看到不管是讀仍是寫,最後都是對image的操做。cmd

那麼,咱們來rbd_open這個函數是如何利用IO上下文和snap來打開一個rbd鏡像的。

在Ceph的文檔裏並無介紹如何使用C/C++版的librbd,差評啊。只能本身對照qemu和Ceph的源碼進行分析了。

Ceph的源碼版本:0.94.1

在src/include/rbd/librbd.h中聲明瞭librbd的C語言接口

其中rbd_open的定義以下:

CEPH_RBD_API int rbd_open(rados_ioctx_t io, const char *name, rbd_image_t *image, const char *snap_name);

rbd_close的定義以下:

CEPH_RBD_API int rbd_close(rbd_image_t image);

咱們的目標是讓一個image鏈接到兩個pool,一個ssd pool和一個hdd pool。在更改源碼的過程中,咱們儘可能少的改動源碼。當前個人想法是:

能夠設置一個當前的默認pool,將image映射到該pool上,而後在運行的過程中,image對應的pool可能改變,這樣的話就沒必要改寫那些讀寫image的代碼。

librbd.h中對應的函數聲明在src/librbd/librbd.cc中定義

extern "C" int rbd_open(rados_ioctx_t p, const char *name, rbd_image_t *image,
 const char *snap_name) 
{
  librados::IoCtx io_ctx;
  librados::IoCtx::from_rados_ioctx_t(p, io_ctx);
  librbd::ImageCtx *ictx = new librbd::ImageCtx(name, "", snap_name, io_ctx,
 false);     //建立鏡像上下文
  tracepoint(librbd, open_image_enter, ictx, ictx->name.c_str(), ictx->id.c_str(), ictx->snap_name.c_str(), ictx->read_only);    //建立回溯點?
  int r = librbd::open_image(ictx);
    //打開rbd鏡像 
  if (r >= 0)
    *image = (rbd_image_t)ictx;
    將鏡像上下文賦給image,提供給函數的調用者使用
  tracepoint(librbd, open_image_exit, r);
  return r;
}
extern "C" int rbd_close(rbd_image_t image)
{
  librbd::ImageCtx *ctx = (librbd::ImageCtx *)image;
  tracepoint(librbd, close_image_enter, ctx, ctx->name.c_str(), ctx->id.c_str());
  librbd::close_image(ctx);
  tracepoint(librbd, close_image_exit);
  return 0;
}
相關文章
相關標籤/搜索