All of lore.kernel.org
 help / color / mirror / Atom feed
* [PATCH] ceph/rbd block driver for qemu-kvm (v3)
@ 2010-05-31 19:31 ` Christian Brunner
  0 siblings, 0 replies; 19+ messages in thread
From: Christian Brunner @ 2010-05-31 19:31 UTC (permalink / raw)
  To: Kevin Wolf; +Cc: kvm, qemu-devel, ceph-devel

Hi Kevin,

here is an updated patch for the ceph/rbd driver. I hope that everything 
is fine now.

Regards,
Christian


This is a block driver for the distributed file system Ceph
(http://ceph.newdream.net/). This driver uses librados (which
is part of the Ceph server) for direct access to the Ceph object
store and is running entirely in userspace. Therefore it is
called "rbd" - rados block device.

To compile the driver a recent version of ceph (unstable/testing git
head or 0.20.3 once it is released) is needed.

Additional information is available on the Ceph-Wiki:

http://ceph.newdream.net/wiki/Kvm-rbd

The patch is based on git://repo.or.cz/qemu/kevin.git block


Signed-off-by: Christian Brunner <chb@muc.de>
---
 Makefile.objs     |    1 +
 block/rbd.c       |  600 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 block/rbd_types.h |   64 ++++++
 configure         |   31 +++
 4 files changed, 696 insertions(+), 0 deletions(-)
 create mode 100644 block/rbd.c
 create mode 100644 block/rbd_types.h

diff --git a/Makefile.objs b/Makefile.objs
index 1a942e5..08dc11f 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -18,6 +18,7 @@ block-nested-y += parallels.o nbd.o blkdebug.o
 block-nested-$(CONFIG_WIN32) += raw-win32.o
 block-nested-$(CONFIG_POSIX) += raw-posix.o
 block-nested-$(CONFIG_CURL) += curl.o
+block-nested-$(CONFIG_RBD) += rbd.o
 
 block-obj-y +=  $(addprefix block/, $(block-nested-y))
 
diff --git a/block/rbd.c b/block/rbd.c
new file mode 100644
index 0000000..4a60dda
--- /dev/null
+++ b/block/rbd.c
@@ -0,0 +1,600 @@
+/*
+ * QEMU Block driver for RADOS (Ceph)
+ *
+ * Copyright (C) 2010 Christian Brunner <chb@muc.de>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ *
+ */
+
+#include "qemu-common.h"
+#include "qemu-error.h"
+#include <sys/types.h>
+#include <stdbool.h>
+
+#include <qemu-common.h>
+
+#include "rbd_types.h"
+#include "module.h"
+#include "block_int.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <rados/librados.h>
+
+#include <signal.h>
+
+/*
+ * When specifying the image filename use:
+ *
+ * rbd:poolname/devicename
+ *
+ * poolname must be the name of an existing rados pool
+ *
+ * devicename is the basename for all objects used to
+ * emulate the raw device.
+ *
+ * Metadata information (image size, ...) is stored in an
+ * object with the name "devicename.rbd".
+ *
+ * The raw device is split into 4MB sized objects by default.
+ * The sequencenumber is encoded in a 12 byte long hex-string,
+ * and is attached to the devicename, separated by a dot.
+ * e.g. "devicename.1234567890ab"
+ *
+ */
+
+#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
+
+typedef struct RBDAIOCB {
+    BlockDriverAIOCB common;
+    QEMUBH *bh;
+    int ret;
+    QEMUIOVector *qiov;
+    char *bounce;
+    int write;
+    int64_t sector_num;
+    int aiocnt;
+    int error;
+} RBDAIOCB;
+
+typedef struct RADOSCB {
+    int rcbid;
+    RBDAIOCB *acb;
+    int done;
+    int64_t segsize;
+    char *buf;
+} RADOSCB;
+
+typedef struct BDRVRBDState {
+    rados_pool_t pool;
+    char name[RBD_MAX_OBJ_NAME_SIZE];
+    uint64_t size;
+    uint64_t objsize;
+} BDRVRBDState;
+
+typedef struct rbd_obj_header_ondisk RbdHeader1;
+
+static int rbd_parsename(const char *filename, char *pool, char *name)
+{
+    const char *rbdname;
+    char *p;
+    int l;
+
+    if (!strstart(filename, "rbd:", &rbdname)) {
+        return -EINVAL;
+    }
+
+    pstrcpy(pool, 2 * RBD_MAX_SEG_NAME_SIZE, rbdname);
+    p = strchr(pool, '/');
+    if (p == NULL) {
+        return -EINVAL;
+    }
+
+    *p = '\0';
+
+    l = strlen(pool);
+    if(l >= RBD_MAX_SEG_NAME_SIZE) {
+        error_report("pool name to long");
+        return -EINVAL;
+    } else if (l <= 0) {
+        error_report("pool name to short");
+        return -EINVAL;
+    }
+
+    l = strlen(++p);
+    if (l >= RBD_MAX_OBJ_NAME_SIZE) {
+        error_report("object name to long");
+        return -EINVAL;
+    } else if (l <= 0) {
+        error_report("object name to short");
+        return -EINVAL;
+    }
+
+    strcpy(name, p);
+
+    return l;
+}
+
+static int create_tmap_op(uint8_t op, const char *name, char **tmap_desc)
+{
+    uint32_t len = strlen(name);
+    /* total_len = encoding op + name + empty buffer */
+    uint32_t total_len = 1 + (sizeof(uint32_t) + len) + sizeof(uint32_t);
+    char *desc = NULL;
+
+    qemu_malloc(total_len);
+
+    *tmap_desc = desc;
+
+    *desc = op;
+    desc++;
+    memcpy(desc, &len, sizeof(len));
+    desc += sizeof(len);
+    memcpy(desc, name, len);
+    desc += len;
+    len = 0;
+    memcpy(desc, &len, sizeof(len));
+    desc += sizeof(len);
+
+    return desc - *tmap_desc;
+}
+
+static void free_tmap_op(char *tmap_desc)
+{
+    qemu_free(tmap_desc);
+}
+
+static int rbd_register_image(rados_pool_t pool, const char *name)
+{
+    char *tmap_desc;
+    const char *dir = RBD_DIRECTORY;
+    int ret;
+
+    ret = create_tmap_op(CEPH_OSD_TMAP_SET, name, &tmap_desc);
+    if (ret < 0) {
+        return ret;
+    }
+
+    ret = rados_tmap_update(pool, dir, tmap_desc, ret);
+    free_tmap_op(tmap_desc);
+
+    return ret;
+}
+
+static int rbd_create(const char *filename, QEMUOptionParameter *options)
+{
+    int64_t bytes = 0;
+    int64_t objsize;
+    uint64_t size;
+    time_t mtime;
+    uint8_t obj_order = RBD_DEFAULT_OBJ_ORDER;
+    char pool[RBD_MAX_SEG_NAME_SIZE];
+    char n[RBD_MAX_SEG_NAME_SIZE];
+    char name[RBD_MAX_SEG_NAME_SIZE];
+    RbdHeader1 header;
+    rados_pool_t p;
+    int ret;
+
+    if (rbd_parsename(filename, pool, name) < 0) {
+        return -EINVAL;
+    }
+
+    snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s%s", name, RBD_SUFFIX);
+
+    /* Read out options */
+    while (options && options->name) {
+        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
+            bytes = options->value.n;
+        } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) {
+            if (options->value.n) {
+                objsize = options->value.n;
+                if ((objsize - 1) & objsize) {    /* not a power of 2? */
+                    error_report("obj size needs to be power of 2");
+                    return -EINVAL;
+                }
+                if (objsize < 4096) {
+                    error_report("obj size too small");
+                    return -EINVAL;
+                }
+
+                for (obj_order = 0; obj_order < 64; obj_order++) {
+                    if (objsize == 1) {
+                        break;
+                    }
+                    objsize >>= 1;
+                }
+            }
+        }
+        options++;
+    }
+
+    memset(&header, 0, sizeof(header));
+    pstrcpy(header.text, sizeof(header.text), RBD_HEADER_TEXT);
+    pstrcpy(header.signature, sizeof(header.signature), RBD_HEADER_SIGNATURE);
+    pstrcpy(header.version, sizeof(header.version), RBD_HEADER_VERSION);
+    header.image_size = bytes;
+    cpu_to_le64s((uint64_t *) & header.image_size);
+    header.options.order = obj_order;
+    header.options.crypt_type = RBD_CRYPT_NONE;
+    header.options.comp_type = RBD_COMP_NONE;
+    header.snap_seq = 0;
+    header.snap_count = 0;
+    cpu_to_le32s(&header.snap_count);
+
+    if (rados_initialize(0, NULL) < 0) {
+        error_report("error initializing");
+        return -EIO;
+    }
+
+    if (rados_open_pool(pool, &p)) {
+        error_report("error opening pool %s", pool);
+        rados_deinitialize();
+        return -EIO;
+    }
+
+    /* check for existing rbd header file */
+    ret = rados_stat(p, n, &size, &mtime);
+    if (ret == 0) {
+        ret=-EEXIST;
+        goto done;
+    }
+
+    /* create header file */
+    ret = rados_write(p, n, 0, (const char *)&header, sizeof(header));
+    if (ret < 0) {
+        goto done;
+    }
+
+    ret = rbd_register_image(p, name);
+done:
+    rados_close_pool(p);
+    rados_deinitialize();
+
+    return ret;
+}
+
+static int rbd_open(BlockDriverState *bs, const char *filename, int flags)
+{
+    BDRVRBDState *s = bs->opaque;
+    char pool[RBD_MAX_SEG_NAME_SIZE];
+    char n[RBD_MAX_SEG_NAME_SIZE];
+    char hbuf[4096];
+    int r;
+
+    if (rbd_parsename(filename, pool, s->name) < 0) {
+        return -EINVAL;
+    }
+    snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s%s", s->name, RBD_SUFFIX);
+
+    if ((r = rados_initialize(0, NULL)) < 0) {
+        error_report("error initializing");
+        return r;
+    }
+
+    if ((r = rados_open_pool(pool, &s->pool))) {
+        error_report("error opening pool %s", pool);
+        rados_deinitialize();
+        return r;
+    }
+
+    if ((r = rados_read(s->pool, n, 0, hbuf, 4096)) < 0) {
+        error_report("error reading header from %s", s->name);
+        goto failed;
+    }
+
+    if (strncmp(hbuf + 64, RBD_HEADER_SIGNATURE, 4)) {
+        error_report("Invalid header signature %s", hbuf + 64);
+        r = -EMEDIUMTYPE;
+        goto failed;
+    }
+
+    if (strncmp(hbuf + 68, RBD_HEADER_VERSION, 8)) {
+        error_report("Unknown image version %s", hbuf + 68);
+        r = -EMEDIUMTYPE;
+        goto failed;
+    }
+
+    RbdHeader1 *header;
+
+    header = (RbdHeader1 *) hbuf;
+    le64_to_cpus((uint64_t *) & header->image_size);
+    s->size = header->image_size;
+    s->objsize = 1 << header->options.order;
+
+    return 0;
+
+failed:
+    rados_close_pool(s->pool);
+    rados_deinitialize();
+    return r;
+}
+
+static void rbd_close(BlockDriverState *bs)
+{
+    BDRVRBDState *s = bs->opaque;
+
+    rados_close_pool(s->pool);
+    rados_deinitialize();
+}
+
+static int rbd_rw(BlockDriverState *bs, int64_t sector_num,
+                  uint8_t *buf, int nb_sectors, int write)
+{
+    BDRVRBDState *s = bs->opaque;
+    char n[RBD_MAX_SEG_NAME_SIZE];
+
+    int64_t segnr, segoffs, segsize, r;
+    int64_t off, size;
+
+    off = sector_num * BDRV_SECTOR_SIZE;
+    size = nb_sectors * BDRV_SECTOR_SIZE;
+    segnr = off / s->objsize;
+    segoffs = off % s->objsize;
+    segsize = s->objsize - segoffs;
+
+    while (size > 0) {
+        if (size < segsize) {
+            segsize = size;
+        }
+
+        snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s.%012" PRIx64, s->name, segnr);
+
+        if (write) {
+            if ((r = rados_write(s->pool, n, segoffs, (const char *)buf,
+                segsize)) < 0) {
+                return r;
+            }
+        } else {
+            r = rados_read(s->pool, n, segoffs, (char *)buf, segsize);
+            if (r == -ENOENT) {
+                memset(buf, 0, segsize);
+            } else if (r < 0) {
+                return r;
+            } else if (r < segsize) {
+                memset(buf + r, 0, segsize - r);
+            }
+        }
+
+        buf += segsize;
+        size -= segsize;
+        segoffs = 0;
+        segsize = s->objsize;
+        segnr++;
+    }
+
+    return 0;
+}
+
+static int rbd_read(BlockDriverState *bs, int64_t sector_num,
+                    uint8_t *buf, int nb_sectors)
+{
+    return rbd_rw(bs, sector_num, buf, nb_sectors, 0);
+}
+
+static int rbd_write(BlockDriverState *bs, int64_t sector_num,
+                     const uint8_t *buf, int nb_sectors)
+{
+    return rbd_rw(bs, sector_num, (uint8_t *) buf, nb_sectors, 1);
+}
+
+static void rbd_aio_cancel(BlockDriverAIOCB *blockacb)
+{
+    RBDAIOCB *acb = (RBDAIOCB *) blockacb;
+    qemu_bh_delete(acb->bh);
+    acb->bh = NULL;
+    qemu_aio_release(acb);
+}
+
+static AIOPool rbd_aio_pool = {
+    .aiocb_size = sizeof(RBDAIOCB),
+    .cancel = rbd_aio_cancel,
+};
+
+/* This is the callback function for rados_aio_read and _write */
+static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
+{
+    RBDAIOCB *acb = rcb->acb;
+    int64_t r;
+    int i;
+
+    acb->aiocnt--;
+    r = rados_aio_get_return_value(c);
+    rados_aio_release(c);
+    if (acb->write) {
+        if (r < 0) {
+            acb->ret = r;
+            acb->error = 1;
+        } else if (!acb->error) {
+            acb->ret += rcb->segsize;
+        }
+    } else {
+        if (r == -ENOENT) {
+            memset(rcb->buf, 0, rcb->segsize);
+            if (!acb->error) {
+                acb->ret += rcb->segsize;
+            }
+        } else if (r < 0) {
+            acb->ret = r;
+            acb->error = 1;
+        } else if (r < rcb->segsize) {
+            memset(rcb->buf + r, 0, rcb->segsize - r);
+            if (!acb->error) {
+                acb->ret += rcb->segsize;
+            }
+        } else if (!acb->error) {
+            acb->ret += r;
+        }
+    }
+    qemu_free(rcb);
+    i = 0;
+    if (!acb->aiocnt && acb->bh) {
+        qemu_bh_schedule(acb->bh);
+    }
+}
+
+/* Callback when all queued rados_aio requests are complete */
+static void rbd_aio_bh_cb(void *opaque)
+{
+    RBDAIOCB *acb = opaque;
+
+    if (!acb->write) {
+        qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
+    }
+    qemu_vfree(acb->bounce);
+    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
+    qemu_bh_delete(acb->bh);
+    acb->bh = NULL;
+    qemu_aio_release(acb);
+}
+
+static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
+                                           int64_t sector_num,
+                                           QEMUIOVector *qiov,
+                                           int nb_sectors,
+                                           BlockDriverCompletionFunc *cb,
+                                           void *opaque, int write)
+{
+    RBDAIOCB *acb;
+    RADOSCB *rcb;
+    rados_completion_t c;
+    char n[RBD_MAX_SEG_NAME_SIZE];
+    int64_t segnr, segoffs, segsize, last_segnr;
+    int64_t off, size;
+    char *buf;
+
+    BDRVRBDState *s = bs->opaque;
+
+    acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque);
+    acb->write = write;
+    acb->qiov = qiov;
+    acb->bounce = qemu_blockalign(bs, qiov->size);
+    acb->aiocnt = 0;
+    acb->ret = 0;
+    acb->error = 0;
+
+    if (!acb->bh) {
+        acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
+    }
+
+    if (write) {
+        qemu_iovec_to_buffer(acb->qiov, acb->bounce);
+    }
+
+    buf = acb->bounce;
+
+    off = sector_num * BDRV_SECTOR_SIZE;
+    size = nb_sectors * BDRV_SECTOR_SIZE;
+    segnr = off / s->objsize;
+    segoffs = off % s->objsize;
+    segsize = s->objsize - segoffs;
+
+    last_segnr = ((off + size - 1) / s->objsize);
+    acb->aiocnt = (last_segnr - segnr) + 1;
+
+    while (size > 0) {
+        if (size < segsize) {
+            segsize = size;
+        }
+
+        snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s.%012llx", s->name,
+                 (long long unsigned int)segnr);
+
+        rcb = qemu_malloc(sizeof(RADOSCB));
+        rcb->done = 0;
+        rcb->acb = acb;
+        rcb->segsize = segsize;
+        rcb->buf = buf;
+
+        if (write) {
+            rados_aio_create_completion(rcb, NULL,
+                                        (rados_callback_t) rbd_finish_aiocb,
+                                        &c);
+            rados_aio_write(s->pool, n, segoffs, buf, segsize, c);
+        } else {
+            rados_aio_create_completion(rcb,
+                                        (rados_callback_t) rbd_finish_aiocb,
+                                        NULL, &c);
+            rados_aio_read(s->pool, n, segoffs, buf, segsize, c);
+        }
+
+        buf += segsize;
+        size -= segsize;
+        segoffs = 0;
+        segsize = s->objsize;
+        segnr++;
+    }
+
+    return &acb->common;
+}
+
+static BlockDriverAIOCB *rbd_aio_readv(BlockDriverState * bs,
+                                       int64_t sector_num, QEMUIOVector * qiov,
+                                       int nb_sectors,
+                                       BlockDriverCompletionFunc * cb,
+                                       void *opaque)
+{
+    return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
+}
+
+static BlockDriverAIOCB *rbd_aio_writev(BlockDriverState * bs,
+                                        int64_t sector_num, QEMUIOVector * qiov,
+                                        int nb_sectors,
+                                        BlockDriverCompletionFunc * cb,
+                                        void *opaque)
+{
+    return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
+}
+
+static int rbd_getinfo(BlockDriverState * bs, BlockDriverInfo * bdi)
+{
+    BDRVRBDState *s = bs->opaque;
+    bdi->cluster_size = s->objsize;
+    return 0;
+}
+
+static int64_t rbd_getlength(BlockDriverState * bs)
+{
+    BDRVRBDState *s = bs->opaque;
+
+    return s->size;
+}
+
+static QEMUOptionParameter rbd_create_options[] = {
+    {
+     .name = BLOCK_OPT_SIZE,
+     .type = OPT_SIZE,
+     .help = "Virtual disk size"
+    },
+    {
+     .name = BLOCK_OPT_CLUSTER_SIZE,
+     .type = OPT_SIZE,
+     .help = "RBD object size"
+    },
+    {NULL}
+};
+
+static BlockDriver bdrv_rbd = {
+    .format_name        = "rbd",
+    .instance_size      = sizeof(BDRVRBDState),
+    .bdrv_file_open     = rbd_open,
+    .bdrv_read          = rbd_read,
+    .bdrv_write         = rbd_write,
+    .bdrv_close         = rbd_close,
+    .bdrv_create        = rbd_create,
+    .bdrv_get_info      = rbd_getinfo,
+    .create_options     = rbd_create_options,
+    .bdrv_getlength     = rbd_getlength,
+    .protocol_name      = "rbd",
+
+    .bdrv_aio_readv     = rbd_aio_readv,
+    .bdrv_aio_writev    = rbd_aio_writev,
+};
+
+static void bdrv_rbd_init(void)
+{
+    bdrv_register(&bdrv_rbd);
+}
+
+block_init(bdrv_rbd_init);
diff --git a/block/rbd_types.h b/block/rbd_types.h
new file mode 100644
index 0000000..91ac4f9
--- /dev/null
+++ b/block/rbd_types.h
@@ -0,0 +1,64 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2010 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef QEMU_BLOCK_RBD_TYPES_H
+#define QEMU_BLOCK_RBD_TYPES_H
+
+
+/*
+ * rbd image 'foo' consists of objects
+ *   foo.rbd      - image metadata
+ *   foo.00000000
+ *   foo.00000001
+ *   ...          - data
+ */
+
+#define RBD_SUFFIX              ".rbd"
+#define RBD_DIRECTORY           "rbd_directory"
+
+#define RBD_DEFAULT_OBJ_ORDER   22   /* 4MB */
+
+#define RBD_MAX_OBJ_NAME_SIZE   96
+#define RBD_MAX_SEG_NAME_SIZE   128
+
+#define RBD_COMP_NONE           0
+#define RBD_CRYPT_NONE          0
+
+#define RBD_HEADER_TEXT         "<<< Rados Block Device Image >>>\n"
+#define RBD_HEADER_SIGNATURE    "RBD"
+#define RBD_HEADER_VERSION      "001.004"
+
+struct rbd_obj_snap_ondisk {
+    uint64_t id;
+    uint64_t image_size;
+} __attribute__((packed));
+
+struct rbd_obj_header_ondisk {
+    char text[64];
+    char signature[4];
+    char version[8];
+    struct {
+        uint8_t order;
+        uint8_t crypt_type;
+        uint8_t comp_type;
+        uint8_t unused;
+    } __attribute__((packed)) options;
+    uint64_t image_size;
+    uint64_t snap_seq;
+    uint32_t snap_count;
+    uint32_t reserved;
+    uint64_t snap_names_len;
+    struct rbd_obj_snap_ondisk snaps[0];
+} __attribute__((packed));
+
+
+#endif
diff --git a/configure b/configure
index 3cd2c5f..3f5c8ce 100755
--- a/configure
+++ b/configure
@@ -299,6 +299,7 @@ pkgversion=""
 check_utests="no"
 user_pie="no"
 zero_malloc=""
+rbd=""
 
 # OS specific
 if check_define __linux__ ; then
@@ -660,6 +661,10 @@ for opt do
   ;;
   --enable-vhost-net) vhost_net="yes"
   ;;
+  --disable-rbd) rbd="no"
+  ;;
+  --enable-rbd) rbd="yes"
+  ;;
   *) echo "ERROR: unknown option $opt"; show_help="yes"
   ;;
   esac
@@ -826,6 +831,7 @@ echo "  --enable-docs            enable documentation build"
 echo "  --disable-docs           disable documentation build"
 echo "  --disable-vhost-net      disable vhost-net acceleration support"
 echo "  --enable-vhost-net       enable vhost-net acceleration support"
+echo "  --enable-rbd		 enable building the rados block device (rbd)"
 echo ""
 echo "NOTE: The object files are built at the place where configure is launched"
 exit 1
@@ -1579,6 +1585,27 @@ if test "$mingw32" != yes -a "$pthread" = no; then
 fi
 
 ##########################################
+# rbd probe
+if test "$rbd" != "no" ; then
+  cat > $TMPC <<EOF
+#include <stdio.h>
+#include <rados/librados.h>
+int main(void) { rados_initialize(0, NULL); return 0; }
+EOF
+  rbd_libs="-lrados -lcrypto"
+  if compile_prog "" "$rbd_libs" ; then
+    rbd=yes
+    libs_tools="$rbd_libs $libs_tools"
+    libs_softmmu="$rbd_libs $libs_softmmu"
+  else
+    if test "$rbd" = "yes" ; then
+      feature_not_found "rados block device"
+    fi
+    rbd=no
+  fi
+fi
+
+##########################################
 # linux-aio probe
 
 if test "$linux_aio" != "no" ; then
@@ -2041,6 +2068,7 @@ echo "preadv support    $preadv"
 echo "fdatasync         $fdatasync"
 echo "uuid support      $uuid"
 echo "vhost-net support $vhost_net"
+echo "rbd support       $rbd"
 
 if test $sdl_too_old = "yes"; then
 echo "-> Your SDL version is too old - please upgrade to have SDL support"
@@ -2270,6 +2298,9 @@ echo "CONFIG_UNAME_RELEASE=\"$uname_release\"" >> $config_host_mak
 if test "$zero_malloc" = "yes" ; then
   echo "CONFIG_ZERO_MALLOC=y" >> $config_host_mak
 fi
+if test "$rbd" = "yes" ; then
+  echo "CONFIG_RBD=y" >> $config_host_mak
+fi
 
 # USB host support
 case "$usb" in
-- 
1.7.0.4


^ permalink raw reply related	[flat|nested] 19+ messages in thread

* [Qemu-devel] [PATCH] ceph/rbd block driver for qemu-kvm (v3)
@ 2010-05-31 19:31 ` Christian Brunner
  0 siblings, 0 replies; 19+ messages in thread
From: Christian Brunner @ 2010-05-31 19:31 UTC (permalink / raw)
  To: Kevin Wolf; +Cc: ceph-devel, qemu-devel, kvm

Hi Kevin,

here is an updated patch for the ceph/rbd driver. I hope that everything 
is fine now.

Regards,
Christian


This is a block driver for the distributed file system Ceph
(http://ceph.newdream.net/). This driver uses librados (which
is part of the Ceph server) for direct access to the Ceph object
store and is running entirely in userspace. Therefore it is
called "rbd" - rados block device.

To compile the driver a recent version of ceph (unstable/testing git
head or 0.20.3 once it is released) is needed.

Additional information is available on the Ceph-Wiki:

http://ceph.newdream.net/wiki/Kvm-rbd

The patch is based on git://repo.or.cz/qemu/kevin.git block


Signed-off-by: Christian Brunner <chb@muc.de>
---
 Makefile.objs     |    1 +
 block/rbd.c       |  600 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 block/rbd_types.h |   64 ++++++
 configure         |   31 +++
 4 files changed, 696 insertions(+), 0 deletions(-)
 create mode 100644 block/rbd.c
 create mode 100644 block/rbd_types.h

diff --git a/Makefile.objs b/Makefile.objs
index 1a942e5..08dc11f 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -18,6 +18,7 @@ block-nested-y += parallels.o nbd.o blkdebug.o
 block-nested-$(CONFIG_WIN32) += raw-win32.o
 block-nested-$(CONFIG_POSIX) += raw-posix.o
 block-nested-$(CONFIG_CURL) += curl.o
+block-nested-$(CONFIG_RBD) += rbd.o
 
 block-obj-y +=  $(addprefix block/, $(block-nested-y))
 
diff --git a/block/rbd.c b/block/rbd.c
new file mode 100644
index 0000000..4a60dda
--- /dev/null
+++ b/block/rbd.c
@@ -0,0 +1,600 @@
+/*
+ * QEMU Block driver for RADOS (Ceph)
+ *
+ * Copyright (C) 2010 Christian Brunner <chb@muc.de>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ *
+ */
+
+#include "qemu-common.h"
+#include "qemu-error.h"
+#include <sys/types.h>
+#include <stdbool.h>
+
+#include <qemu-common.h>
+
+#include "rbd_types.h"
+#include "module.h"
+#include "block_int.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <rados/librados.h>
+
+#include <signal.h>
+
+/*
+ * When specifying the image filename use:
+ *
+ * rbd:poolname/devicename
+ *
+ * poolname must be the name of an existing rados pool
+ *
+ * devicename is the basename for all objects used to
+ * emulate the raw device.
+ *
+ * Metadata information (image size, ...) is stored in an
+ * object with the name "devicename.rbd".
+ *
+ * The raw device is split into 4MB sized objects by default.
+ * The sequencenumber is encoded in a 12 byte long hex-string,
+ * and is attached to the devicename, separated by a dot.
+ * e.g. "devicename.1234567890ab"
+ *
+ */
+
+#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
+
+typedef struct RBDAIOCB {
+    BlockDriverAIOCB common;
+    QEMUBH *bh;
+    int ret;
+    QEMUIOVector *qiov;
+    char *bounce;
+    int write;
+    int64_t sector_num;
+    int aiocnt;
+    int error;
+} RBDAIOCB;
+
+typedef struct RADOSCB {
+    int rcbid;
+    RBDAIOCB *acb;
+    int done;
+    int64_t segsize;
+    char *buf;
+} RADOSCB;
+
+typedef struct BDRVRBDState {
+    rados_pool_t pool;
+    char name[RBD_MAX_OBJ_NAME_SIZE];
+    uint64_t size;
+    uint64_t objsize;
+} BDRVRBDState;
+
+typedef struct rbd_obj_header_ondisk RbdHeader1;
+
+static int rbd_parsename(const char *filename, char *pool, char *name)
+{
+    const char *rbdname;
+    char *p;
+    int l;
+
+    if (!strstart(filename, "rbd:", &rbdname)) {
+        return -EINVAL;
+    }
+
+    pstrcpy(pool, 2 * RBD_MAX_SEG_NAME_SIZE, rbdname);
+    p = strchr(pool, '/');
+    if (p == NULL) {
+        return -EINVAL;
+    }
+
+    *p = '\0';
+
+    l = strlen(pool);
+    if(l >= RBD_MAX_SEG_NAME_SIZE) {
+        error_report("pool name to long");
+        return -EINVAL;
+    } else if (l <= 0) {
+        error_report("pool name to short");
+        return -EINVAL;
+    }
+
+    l = strlen(++p);
+    if (l >= RBD_MAX_OBJ_NAME_SIZE) {
+        error_report("object name to long");
+        return -EINVAL;
+    } else if (l <= 0) {
+        error_report("object name to short");
+        return -EINVAL;
+    }
+
+    strcpy(name, p);
+
+    return l;
+}
+
+static int create_tmap_op(uint8_t op, const char *name, char **tmap_desc)
+{
+    uint32_t len = strlen(name);
+    /* total_len = encoding op + name + empty buffer */
+    uint32_t total_len = 1 + (sizeof(uint32_t) + len) + sizeof(uint32_t);
+    char *desc = NULL;
+
+    qemu_malloc(total_len);
+
+    *tmap_desc = desc;
+
+    *desc = op;
+    desc++;
+    memcpy(desc, &len, sizeof(len));
+    desc += sizeof(len);
+    memcpy(desc, name, len);
+    desc += len;
+    len = 0;
+    memcpy(desc, &len, sizeof(len));
+    desc += sizeof(len);
+
+    return desc - *tmap_desc;
+}
+
+static void free_tmap_op(char *tmap_desc)
+{
+    qemu_free(tmap_desc);
+}
+
+static int rbd_register_image(rados_pool_t pool, const char *name)
+{
+    char *tmap_desc;
+    const char *dir = RBD_DIRECTORY;
+    int ret;
+
+    ret = create_tmap_op(CEPH_OSD_TMAP_SET, name, &tmap_desc);
+    if (ret < 0) {
+        return ret;
+    }
+
+    ret = rados_tmap_update(pool, dir, tmap_desc, ret);
+    free_tmap_op(tmap_desc);
+
+    return ret;
+}
+
+static int rbd_create(const char *filename, QEMUOptionParameter *options)
+{
+    int64_t bytes = 0;
+    int64_t objsize;
+    uint64_t size;
+    time_t mtime;
+    uint8_t obj_order = RBD_DEFAULT_OBJ_ORDER;
+    char pool[RBD_MAX_SEG_NAME_SIZE];
+    char n[RBD_MAX_SEG_NAME_SIZE];
+    char name[RBD_MAX_SEG_NAME_SIZE];
+    RbdHeader1 header;
+    rados_pool_t p;
+    int ret;
+
+    if (rbd_parsename(filename, pool, name) < 0) {
+        return -EINVAL;
+    }
+
+    snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s%s", name, RBD_SUFFIX);
+
+    /* Read out options */
+    while (options && options->name) {
+        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
+            bytes = options->value.n;
+        } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) {
+            if (options->value.n) {
+                objsize = options->value.n;
+                if ((objsize - 1) & objsize) {    /* not a power of 2? */
+                    error_report("obj size needs to be power of 2");
+                    return -EINVAL;
+                }
+                if (objsize < 4096) {
+                    error_report("obj size too small");
+                    return -EINVAL;
+                }
+
+                for (obj_order = 0; obj_order < 64; obj_order++) {
+                    if (objsize == 1) {
+                        break;
+                    }
+                    objsize >>= 1;
+                }
+            }
+        }
+        options++;
+    }
+
+    memset(&header, 0, sizeof(header));
+    pstrcpy(header.text, sizeof(header.text), RBD_HEADER_TEXT);
+    pstrcpy(header.signature, sizeof(header.signature), RBD_HEADER_SIGNATURE);
+    pstrcpy(header.version, sizeof(header.version), RBD_HEADER_VERSION);
+    header.image_size = bytes;
+    cpu_to_le64s((uint64_t *) & header.image_size);
+    header.options.order = obj_order;
+    header.options.crypt_type = RBD_CRYPT_NONE;
+    header.options.comp_type = RBD_COMP_NONE;
+    header.snap_seq = 0;
+    header.snap_count = 0;
+    cpu_to_le32s(&header.snap_count);
+
+    if (rados_initialize(0, NULL) < 0) {
+        error_report("error initializing");
+        return -EIO;
+    }
+
+    if (rados_open_pool(pool, &p)) {
+        error_report("error opening pool %s", pool);
+        rados_deinitialize();
+        return -EIO;
+    }
+
+    /* check for existing rbd header file */
+    ret = rados_stat(p, n, &size, &mtime);
+    if (ret == 0) {
+        ret=-EEXIST;
+        goto done;
+    }
+
+    /* create header file */
+    ret = rados_write(p, n, 0, (const char *)&header, sizeof(header));
+    if (ret < 0) {
+        goto done;
+    }
+
+    ret = rbd_register_image(p, name);
+done:
+    rados_close_pool(p);
+    rados_deinitialize();
+
+    return ret;
+}
+
+static int rbd_open(BlockDriverState *bs, const char *filename, int flags)
+{
+    BDRVRBDState *s = bs->opaque;
+    char pool[RBD_MAX_SEG_NAME_SIZE];
+    char n[RBD_MAX_SEG_NAME_SIZE];
+    char hbuf[4096];
+    int r;
+
+    if (rbd_parsename(filename, pool, s->name) < 0) {
+        return -EINVAL;
+    }
+    snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s%s", s->name, RBD_SUFFIX);
+
+    if ((r = rados_initialize(0, NULL)) < 0) {
+        error_report("error initializing");
+        return r;
+    }
+
+    if ((r = rados_open_pool(pool, &s->pool))) {
+        error_report("error opening pool %s", pool);
+        rados_deinitialize();
+        return r;
+    }
+
+    if ((r = rados_read(s->pool, n, 0, hbuf, 4096)) < 0) {
+        error_report("error reading header from %s", s->name);
+        goto failed;
+    }
+
+    if (strncmp(hbuf + 64, RBD_HEADER_SIGNATURE, 4)) {
+        error_report("Invalid header signature %s", hbuf + 64);
+        r = -EMEDIUMTYPE;
+        goto failed;
+    }
+
+    if (strncmp(hbuf + 68, RBD_HEADER_VERSION, 8)) {
+        error_report("Unknown image version %s", hbuf + 68);
+        r = -EMEDIUMTYPE;
+        goto failed;
+    }
+
+    RbdHeader1 *header;
+
+    header = (RbdHeader1 *) hbuf;
+    le64_to_cpus((uint64_t *) & header->image_size);
+    s->size = header->image_size;
+    s->objsize = 1 << header->options.order;
+
+    return 0;
+
+failed:
+    rados_close_pool(s->pool);
+    rados_deinitialize();
+    return r;
+}
+
+static void rbd_close(BlockDriverState *bs)
+{
+    BDRVRBDState *s = bs->opaque;
+
+    rados_close_pool(s->pool);
+    rados_deinitialize();
+}
+
+static int rbd_rw(BlockDriverState *bs, int64_t sector_num,
+                  uint8_t *buf, int nb_sectors, int write)
+{
+    BDRVRBDState *s = bs->opaque;
+    char n[RBD_MAX_SEG_NAME_SIZE];
+
+    int64_t segnr, segoffs, segsize, r;
+    int64_t off, size;
+
+    off = sector_num * BDRV_SECTOR_SIZE;
+    size = nb_sectors * BDRV_SECTOR_SIZE;
+    segnr = off / s->objsize;
+    segoffs = off % s->objsize;
+    segsize = s->objsize - segoffs;
+
+    while (size > 0) {
+        if (size < segsize) {
+            segsize = size;
+        }
+
+        snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s.%012" PRIx64, s->name, segnr);
+
+        if (write) {
+            if ((r = rados_write(s->pool, n, segoffs, (const char *)buf,
+                segsize)) < 0) {
+                return r;
+            }
+        } else {
+            r = rados_read(s->pool, n, segoffs, (char *)buf, segsize);
+            if (r == -ENOENT) {
+                memset(buf, 0, segsize);
+            } else if (r < 0) {
+                return r;
+            } else if (r < segsize) {
+                memset(buf + r, 0, segsize - r);
+            }
+        }
+
+        buf += segsize;
+        size -= segsize;
+        segoffs = 0;
+        segsize = s->objsize;
+        segnr++;
+    }
+
+    return 0;
+}
+
+static int rbd_read(BlockDriverState *bs, int64_t sector_num,
+                    uint8_t *buf, int nb_sectors)
+{
+    return rbd_rw(bs, sector_num, buf, nb_sectors, 0);
+}
+
+static int rbd_write(BlockDriverState *bs, int64_t sector_num,
+                     const uint8_t *buf, int nb_sectors)
+{
+    return rbd_rw(bs, sector_num, (uint8_t *) buf, nb_sectors, 1);
+}
+
+static void rbd_aio_cancel(BlockDriverAIOCB *blockacb)
+{
+    RBDAIOCB *acb = (RBDAIOCB *) blockacb;
+    qemu_bh_delete(acb->bh);
+    acb->bh = NULL;
+    qemu_aio_release(acb);
+}
+
+static AIOPool rbd_aio_pool = {
+    .aiocb_size = sizeof(RBDAIOCB),
+    .cancel = rbd_aio_cancel,
+};
+
+/* This is the callback function for rados_aio_read and _write */
+static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
+{
+    RBDAIOCB *acb = rcb->acb;
+    int64_t r;
+    int i;
+
+    acb->aiocnt--;
+    r = rados_aio_get_return_value(c);
+    rados_aio_release(c);
+    if (acb->write) {
+        if (r < 0) {
+            acb->ret = r;
+            acb->error = 1;
+        } else if (!acb->error) {
+            acb->ret += rcb->segsize;
+        }
+    } else {
+        if (r == -ENOENT) {
+            memset(rcb->buf, 0, rcb->segsize);
+            if (!acb->error) {
+                acb->ret += rcb->segsize;
+            }
+        } else if (r < 0) {
+            acb->ret = r;
+            acb->error = 1;
+        } else if (r < rcb->segsize) {
+            memset(rcb->buf + r, 0, rcb->segsize - r);
+            if (!acb->error) {
+                acb->ret += rcb->segsize;
+            }
+        } else if (!acb->error) {
+            acb->ret += r;
+        }
+    }
+    qemu_free(rcb);
+    i = 0;
+    if (!acb->aiocnt && acb->bh) {
+        qemu_bh_schedule(acb->bh);
+    }
+}
+
+/* Callback when all queued rados_aio requests are complete */
+static void rbd_aio_bh_cb(void *opaque)
+{
+    RBDAIOCB *acb = opaque;
+
+    if (!acb->write) {
+        qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
+    }
+    qemu_vfree(acb->bounce);
+    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
+    qemu_bh_delete(acb->bh);
+    acb->bh = NULL;
+    qemu_aio_release(acb);
+}
+
+static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
+                                           int64_t sector_num,
+                                           QEMUIOVector *qiov,
+                                           int nb_sectors,
+                                           BlockDriverCompletionFunc *cb,
+                                           void *opaque, int write)
+{
+    RBDAIOCB *acb;
+    RADOSCB *rcb;
+    rados_completion_t c;
+    char n[RBD_MAX_SEG_NAME_SIZE];
+    int64_t segnr, segoffs, segsize, last_segnr;
+    int64_t off, size;
+    char *buf;
+
+    BDRVRBDState *s = bs->opaque;
+
+    acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque);
+    acb->write = write;
+    acb->qiov = qiov;
+    acb->bounce = qemu_blockalign(bs, qiov->size);
+    acb->aiocnt = 0;
+    acb->ret = 0;
+    acb->error = 0;
+
+    if (!acb->bh) {
+        acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
+    }
+
+    if (write) {
+        qemu_iovec_to_buffer(acb->qiov, acb->bounce);
+    }
+
+    buf = acb->bounce;
+
+    off = sector_num * BDRV_SECTOR_SIZE;
+    size = nb_sectors * BDRV_SECTOR_SIZE;
+    segnr = off / s->objsize;
+    segoffs = off % s->objsize;
+    segsize = s->objsize - segoffs;
+
+    last_segnr = ((off + size - 1) / s->objsize);
+    acb->aiocnt = (last_segnr - segnr) + 1;
+
+    while (size > 0) {
+        if (size < segsize) {
+            segsize = size;
+        }
+
+        snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s.%012llx", s->name,
+                 (long long unsigned int)segnr);
+
+        rcb = qemu_malloc(sizeof(RADOSCB));
+        rcb->done = 0;
+        rcb->acb = acb;
+        rcb->segsize = segsize;
+        rcb->buf = buf;
+
+        if (write) {
+            rados_aio_create_completion(rcb, NULL,
+                                        (rados_callback_t) rbd_finish_aiocb,
+                                        &c);
+            rados_aio_write(s->pool, n, segoffs, buf, segsize, c);
+        } else {
+            rados_aio_create_completion(rcb,
+                                        (rados_callback_t) rbd_finish_aiocb,
+                                        NULL, &c);
+            rados_aio_read(s->pool, n, segoffs, buf, segsize, c);
+        }
+
+        buf += segsize;
+        size -= segsize;
+        segoffs = 0;
+        segsize = s->objsize;
+        segnr++;
+    }
+
+    return &acb->common;
+}
+
+static BlockDriverAIOCB *rbd_aio_readv(BlockDriverState * bs,
+                                       int64_t sector_num, QEMUIOVector * qiov,
+                                       int nb_sectors,
+                                       BlockDriverCompletionFunc * cb,
+                                       void *opaque)
+{
+    return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
+}
+
+static BlockDriverAIOCB *rbd_aio_writev(BlockDriverState * bs,
+                                        int64_t sector_num, QEMUIOVector * qiov,
+                                        int nb_sectors,
+                                        BlockDriverCompletionFunc * cb,
+                                        void *opaque)
+{
+    return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
+}
+
+static int rbd_getinfo(BlockDriverState * bs, BlockDriverInfo * bdi)
+{
+    BDRVRBDState *s = bs->opaque;
+    bdi->cluster_size = s->objsize;
+    return 0;
+}
+
+static int64_t rbd_getlength(BlockDriverState * bs)
+{
+    BDRVRBDState *s = bs->opaque;
+
+    return s->size;
+}
+
+static QEMUOptionParameter rbd_create_options[] = {
+    {
+     .name = BLOCK_OPT_SIZE,
+     .type = OPT_SIZE,
+     .help = "Virtual disk size"
+    },
+    {
+     .name = BLOCK_OPT_CLUSTER_SIZE,
+     .type = OPT_SIZE,
+     .help = "RBD object size"
+    },
+    {NULL}
+};
+
+static BlockDriver bdrv_rbd = {
+    .format_name        = "rbd",
+    .instance_size      = sizeof(BDRVRBDState),
+    .bdrv_file_open     = rbd_open,
+    .bdrv_read          = rbd_read,
+    .bdrv_write         = rbd_write,
+    .bdrv_close         = rbd_close,
+    .bdrv_create        = rbd_create,
+    .bdrv_get_info      = rbd_getinfo,
+    .create_options     = rbd_create_options,
+    .bdrv_getlength     = rbd_getlength,
+    .protocol_name      = "rbd",
+
+    .bdrv_aio_readv     = rbd_aio_readv,
+    .bdrv_aio_writev    = rbd_aio_writev,
+};
+
+static void bdrv_rbd_init(void)
+{
+    bdrv_register(&bdrv_rbd);
+}
+
+block_init(bdrv_rbd_init);
diff --git a/block/rbd_types.h b/block/rbd_types.h
new file mode 100644
index 0000000..91ac4f9
--- /dev/null
+++ b/block/rbd_types.h
@@ -0,0 +1,64 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2010 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef QEMU_BLOCK_RBD_TYPES_H
+#define QEMU_BLOCK_RBD_TYPES_H
+
+
+/*
+ * rbd image 'foo' consists of objects
+ *   foo.rbd      - image metadata
+ *   foo.00000000
+ *   foo.00000001
+ *   ...          - data
+ */
+
+#define RBD_SUFFIX              ".rbd"
+#define RBD_DIRECTORY           "rbd_directory"
+
+#define RBD_DEFAULT_OBJ_ORDER   22   /* 4MB */
+
+#define RBD_MAX_OBJ_NAME_SIZE   96
+#define RBD_MAX_SEG_NAME_SIZE   128
+
+#define RBD_COMP_NONE           0
+#define RBD_CRYPT_NONE          0
+
+#define RBD_HEADER_TEXT         "<<< Rados Block Device Image >>>\n"
+#define RBD_HEADER_SIGNATURE    "RBD"
+#define RBD_HEADER_VERSION      "001.004"
+
+struct rbd_obj_snap_ondisk {
+    uint64_t id;
+    uint64_t image_size;
+} __attribute__((packed));
+
+struct rbd_obj_header_ondisk {
+    char text[64];
+    char signature[4];
+    char version[8];
+    struct {
+        uint8_t order;
+        uint8_t crypt_type;
+        uint8_t comp_type;
+        uint8_t unused;
+    } __attribute__((packed)) options;
+    uint64_t image_size;
+    uint64_t snap_seq;
+    uint32_t snap_count;
+    uint32_t reserved;
+    uint64_t snap_names_len;
+    struct rbd_obj_snap_ondisk snaps[0];
+} __attribute__((packed));
+
+
+#endif
diff --git a/configure b/configure
index 3cd2c5f..3f5c8ce 100755
--- a/configure
+++ b/configure
@@ -299,6 +299,7 @@ pkgversion=""
 check_utests="no"
 user_pie="no"
 zero_malloc=""
+rbd=""
 
 # OS specific
 if check_define __linux__ ; then
@@ -660,6 +661,10 @@ for opt do
   ;;
   --enable-vhost-net) vhost_net="yes"
   ;;
+  --disable-rbd) rbd="no"
+  ;;
+  --enable-rbd) rbd="yes"
+  ;;
   *) echo "ERROR: unknown option $opt"; show_help="yes"
   ;;
   esac
@@ -826,6 +831,7 @@ echo "  --enable-docs            enable documentation build"
 echo "  --disable-docs           disable documentation build"
 echo "  --disable-vhost-net      disable vhost-net acceleration support"
 echo "  --enable-vhost-net       enable vhost-net acceleration support"
+echo "  --enable-rbd		 enable building the rados block device (rbd)"
 echo ""
 echo "NOTE: The object files are built at the place where configure is launched"
 exit 1
@@ -1579,6 +1585,27 @@ if test "$mingw32" != yes -a "$pthread" = no; then
 fi
 
 ##########################################
+# rbd probe
+if test "$rbd" != "no" ; then
+  cat > $TMPC <<EOF
+#include <stdio.h>
+#include <rados/librados.h>
+int main(void) { rados_initialize(0, NULL); return 0; }
+EOF
+  rbd_libs="-lrados -lcrypto"
+  if compile_prog "" "$rbd_libs" ; then
+    rbd=yes
+    libs_tools="$rbd_libs $libs_tools"
+    libs_softmmu="$rbd_libs $libs_softmmu"
+  else
+    if test "$rbd" = "yes" ; then
+      feature_not_found "rados block device"
+    fi
+    rbd=no
+  fi
+fi
+
+##########################################
 # linux-aio probe
 
 if test "$linux_aio" != "no" ; then
@@ -2041,6 +2068,7 @@ echo "preadv support    $preadv"
 echo "fdatasync         $fdatasync"
 echo "uuid support      $uuid"
 echo "vhost-net support $vhost_net"
+echo "rbd support       $rbd"
 
 if test $sdl_too_old = "yes"; then
 echo "-> Your SDL version is too old - please upgrade to have SDL support"
@@ -2270,6 +2298,9 @@ echo "CONFIG_UNAME_RELEASE=\"$uname_release\"" >> $config_host_mak
 if test "$zero_malloc" = "yes" ; then
   echo "CONFIG_ZERO_MALLOC=y" >> $config_host_mak
 fi
+if test "$rbd" = "yes" ; then
+  echo "CONFIG_RBD=y" >> $config_host_mak
+fi
 
 # USB host support
 case "$usb" in
-- 
1.7.0.4

^ permalink raw reply related	[flat|nested] 19+ messages in thread

* Re: [Qemu-devel] [PATCH] ceph/rbd block driver for qemu-kvm (v3)
  2010-05-31 19:31 ` [Qemu-devel] " Christian Brunner
  (?)
@ 2010-06-01  8:43 ` Kevin Wolf
  2010-06-02  7:42   ` Christian Brunner
  -1 siblings, 1 reply; 19+ messages in thread
From: Kevin Wolf @ 2010-06-01  8:43 UTC (permalink / raw)
  To: Christian Brunner; +Cc: ceph-devel, qemu-devel, kvm

Hi Christian,

Am 31.05.2010 21:31, schrieb Christian Brunner:
> Hi Kevin,
> 
> here is an updated patch for the ceph/rbd driver. I hope that everything 
> is fine now.

I'll try to get to give it a final review later this week. In the
meantime, I would be happy to see another review by someone else.

Do you have some specific tests for the driver or should we extend
qemu-iotests to work with protocols and use only that?

Kevin

^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [Qemu-devel] [PATCH] ceph/rbd block driver for qemu-kvm (v3)
  2010-06-01  8:43 ` Kevin Wolf
@ 2010-06-02  7:42   ` Christian Brunner
  0 siblings, 0 replies; 19+ messages in thread
From: Christian Brunner @ 2010-06-02  7:42 UTC (permalink / raw)
  To: Kevin Wolf; +Cc: ceph-devel, qemu-devel, kvm

Hi Kevin,

2010/6/1 Kevin Wolf <kwolf@redhat.com>:
> Do you have some specific tests for the driver or should we extend
> qemu-iotests to work with protocols and use only that?

Right now I don't have any specific tests, but I'll take a look at
qemu-iotests soon.

Christian

^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [PATCH] ceph/rbd block driver for qemu-kvm (v3)
  2010-05-31 19:31 ` [Qemu-devel] " Christian Brunner
@ 2010-06-11 19:51   ` Simone Gotti
  -1 siblings, 0 replies; 19+ messages in thread
From: Simone Gotti @ 2010-06-11 19:51 UTC (permalink / raw)
  To: Christian Brunner; +Cc: Kevin Wolf, kvm, qemu-devel, ceph-devel

Hi Christian,

thanks for you patch. I tried it a little and it worked quite well but
during some live migration tests I noticed a problem.


The problem is related to live migration with high I/O using the AIO
calls (I triggered it with a simple "dd").

If you launch a live migration and the guest is stopped and started on
the new qemu process while some AIO was in flight the guest on the new
qemu will wait undefinitely for data this will never come. With ata
emulation an ata reset is sent after some seconds but with virtio this
won't happen.

I'm not a qemu expert but from what I understand qemu in
savevm.c:do_savevm calls qemu_aio_flush to wait that all the asyncronous
aio returned (the callback si called). But the rbd block driver doesn't
use the qemu aio model but the rados one so that function will never
know of the rados aio.

So a solution will be to glue the block driver with the qemu aio model.
I tried to do this to test if this will work in the attached patch. I
only tested with one rbd block device but the live migration tests
worked (in the patch I removed all the debug prints I adedd to see if
all AIO requets really returned.

This is an RFC just to know what you think about this possible solution.
As qemu's aio model is event based and it needs a file descriptor for
event communication i used eventfd to do this.
Let me know if you need a detailed description of the patch!


I've also got a question: as librados is multithreaded the callbacks are
called in another thread. Is there the need to protect some critical
sections with a lock (for example in rbd_aio_rw_vector and in
rbd_finish_aiocb)?


Thanks!

Bye!


From: Simone Gotti <simone.gotti@gmail.com>
Date: Fri, 11 Jun 2010 21:19:39 +0200
Subject: [PATCH] block/rbd: Added glue to qemu aio model to fix live
migration with outstanding aio

Signed-off-by: Simone Gotti <simone.gotti@gmail.com>


---
 block/rbd.c |   63
+++++++++++++++++++++++++++++++++++++++++++++++++++++-----
 1 files changed, 57 insertions(+), 6 deletions(-)

diff --git a/block/rbd.c b/block/rbd.c
index 4d22069..83b7898 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -25,6 +25,8 @@
 
 #include <signal.h>
 
+#include <sys/eventfd.h>
+
 /*
  * When specifying the image filename use:
  *
@@ -47,6 +49,15 @@
 
 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
 
+typedef struct BDRVRBDState {
+    int efd;
+    rados_pool_t pool;
+    char name[RBD_MAX_OBJ_NAME_SIZE];
+    uint64_t size;
+    uint64_t objsize;
+    int qemu_aio_count;
+} BDRVRBDState;
+
 typedef struct RBDAIOCB {
     BlockDriverAIOCB common;
     QEMUBH *bh;
@@ -57,6 +68,7 @@ typedef struct RBDAIOCB {
     int64_t sector_num;
     int aiocnt;
     int error;
+    BDRVRBDState *s;
 } RBDAIOCB;
 
 typedef struct RADOSCB {
@@ -67,12 +79,6 @@ typedef struct RADOSCB {
     char *buf;
 } RADOSCB;
 
-typedef struct BDRVRBDState {
-    rados_pool_t pool;
-    char name[RBD_MAX_OBJ_NAME_SIZE];
-    uint64_t size;
-    uint64_t objsize;
-} BDRVRBDState;
 
 typedef struct rbd_obj_header_ondisk RbdHeader1;
 
@@ -255,6 +261,31 @@ done:
     return ret;
 }
 
+static void rbd_aio_completion_cb(void *opaque)
+{
+    BDRVRBDState *s = opaque;
+
+    uint64_t val;
+    ssize_t ret;
+
+    do {
+        if ((ret = read(s->efd, &val, sizeof(val))) > 0) {
+            s->qemu_aio_count -= val;
+       }
+    } while (ret == -1 && errno == EINTR);
+
+    return;
+}
+
+static int rbd_aio_flush_cb(void *opaque)
+{
+    BDRVRBDState *s = opaque;
+
+    return (s->qemu_aio_count > 0) ? 1 : 0;
+}
+
+
+
 static int rbd_open(BlockDriverState *bs, const char *filename, int flags)
 {
     BDRVRBDState *s = bs->opaque;
@@ -303,6 +334,15 @@ static int rbd_open(BlockDriverState *bs, const
char *filename, int flags)
     s->size = header->image_size;
     s->objsize = 1 << header->options.order;
 
+    s->efd = eventfd(0, 0);
+    if (s->efd == -1) {
+        error_report("error opening eventfd");
+        goto failed;
+    }
+    fcntl(s->efd, F_SETFL, O_NONBLOCK);
+    qemu_aio_set_fd_handler(s->efd, rbd_aio_completion_cb, NULL,
+        rbd_aio_flush_cb, NULL, s);
+
     return 0;
 
 failed:
@@ -393,6 +433,7 @@ static AIOPool rbd_aio_pool = {
 };
 
 /* This is the callback function for rados_aio_read and _write */
+
 static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
 {
     RBDAIOCB *acb = rcb->acb;
@@ -427,6 +468,8 @@ static void rbd_finish_aiocb(rados_completion_t c,
RADOSCB *rcb)
             acb->ret += r;
         }
     }
+    uint64_t buf = 1;
+    write(acb->s->efd, &buf, sizeof(buf));
     qemu_free(rcb);
     i = 0;
     if (!acb->aiocnt && acb->bh) {
@@ -435,6 +478,7 @@ static void rbd_finish_aiocb(rados_completion_t c,
RADOSCB *rcb)
 }
 
 /* Callback when all queued rados_aio requests are complete */
+
 static void rbd_aio_bh_cb(void *opaque)
 {
     RBDAIOCB *acb = opaque;
@@ -446,6 +490,10 @@ static void rbd_aio_bh_cb(void *opaque)
     acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
     qemu_bh_delete(acb->bh);
     acb->bh = NULL;
+
+    uint64_t buf = 1;
+    write(acb->s->efd, &buf, sizeof(buf));
+
     qemu_aio_release(acb);
 }
 
@@ -473,6 +521,7 @@ static BlockDriverAIOCB
*rbd_aio_rw_vector(BlockDriverState *bs,
     acb->aiocnt = 0;
     acb->ret = 0;
     acb->error = 0;
+    acb->s = s;
 
     if (!acb->bh) {
         acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
@@ -493,6 +542,8 @@ static BlockDriverAIOCB
*rbd_aio_rw_vector(BlockDriverState *bs,
     last_segnr = ((off + size - 1) / s->objsize);
     acb->aiocnt = (last_segnr - segnr) + 1;
 
+    s->qemu_aio_count+=acb->aiocnt + 1; /* All the RADOSCB and the
related RBDAIOCB */
+
     while (size > 0) {
         if (size < segsize) {
             segsize = size;
-- 
1.7.0.1




 

On 05/31/2010 09:31 PM, Christian Brunner wrote:
> Hi Kevin,
>
> here is an updated patch for the ceph/rbd driver. I hope that everything 
> is fine now.
>
> Regards,
> Christian
>
>
> This is a block driver for the distributed file system Ceph
> (http://ceph.newdream.net/). This driver uses librados (which
> is part of the Ceph server) for direct access to the Ceph object
> store and is running entirely in userspace. Therefore it is
> called "rbd" - rados block device.
>
> To compile the driver a recent version of ceph (unstable/testing git
> head or 0.20.3 once it is released) is needed.
>
> Additional information is available on the Ceph-Wiki:
>
> http://ceph.newdream.net/wiki/Kvm-rbd
>
> The patch is based on git://repo.or.cz/qemu/kevin.git block
>
>
> Signed-off-by: Christian Brunner <chb@muc.de>
> ---
>  Makefile.objs     |    1 +
>  block/rbd.c       |  600 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>  block/rbd_types.h |   64 ++++++
>  configure         |   31 +++
>  4 files changed, 696 insertions(+), 0 deletions(-)
>  create mode 100644 block/rbd.c
>  create mode 100644 block/rbd_types.h
>
> diff --git a/Makefile.objs b/Makefile.objs
> index 1a942e5..08dc11f 100644
> --- a/Makefile.objs
> +++ b/Makefile.objs
> @@ -18,6 +18,7 @@ block-nested-y += parallels.o nbd.o blkdebug.o
>  block-nested-$(CONFIG_WIN32) += raw-win32.o
>  block-nested-$(CONFIG_POSIX) += raw-posix.o
>  block-nested-$(CONFIG_CURL) += curl.o
> +block-nested-$(CONFIG_RBD) += rbd.o
>  
>  block-obj-y +=  $(addprefix block/, $(block-nested-y))
>  
> diff --git a/block/rbd.c b/block/rbd.c
> new file mode 100644
> index 0000000..4a60dda
> --- /dev/null
> +++ b/block/rbd.c
> @@ -0,0 +1,600 @@
> +/*
> + * QEMU Block driver for RADOS (Ceph)
> + *
> + * Copyright (C) 2010 Christian Brunner <chb@muc.de>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2.  See
> + * the COPYING file in the top-level directory.
> + *
> + */
> +
> +#include "qemu-common.h"
> +#include "qemu-error.h"
> +#include <sys/types.h>
> +#include <stdbool.h>
> +
> +#include <qemu-common.h>
> +
> +#include "rbd_types.h"
> +#include "module.h"
> +#include "block_int.h"
> +
> +#include <stdio.h>
> +#include <stdlib.h>
> +#include <rados/librados.h>
> +
> +#include <signal.h>
> +
> +/*
> + * When specifying the image filename use:
> + *
> + * rbd:poolname/devicename
> + *
> + * poolname must be the name of an existing rados pool
> + *
> + * devicename is the basename for all objects used to
> + * emulate the raw device.
> + *
> + * Metadata information (image size, ...) is stored in an
> + * object with the name "devicename.rbd".
> + *
> + * The raw device is split into 4MB sized objects by default.
> + * The sequencenumber is encoded in a 12 byte long hex-string,
> + * and is attached to the devicename, separated by a dot.
> + * e.g. "devicename.1234567890ab"
> + *
> + */
> +
> +#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
> +
> +typedef struct RBDAIOCB {
> +    BlockDriverAIOCB common;
> +    QEMUBH *bh;
> +    int ret;
> +    QEMUIOVector *qiov;
> +    char *bounce;
> +    int write;
> +    int64_t sector_num;
> +    int aiocnt;
> +    int error;
> +} RBDAIOCB;
> +
> +typedef struct RADOSCB {
> +    int rcbid;
> +    RBDAIOCB *acb;
> +    int done;
> +    int64_t segsize;
> +    char *buf;
> +} RADOSCB;
> +
> +typedef struct BDRVRBDState {
> +    rados_pool_t pool;
> +    char name[RBD_MAX_OBJ_NAME_SIZE];
> +    uint64_t size;
> +    uint64_t objsize;
> +} BDRVRBDState;
> +
> +typedef struct rbd_obj_header_ondisk RbdHeader1;
> +
> +static int rbd_parsename(const char *filename, char *pool, char *name)
> +{
> +    const char *rbdname;
> +    char *p;
> +    int l;
> +
> +    if (!strstart(filename, "rbd:", &rbdname)) {
> +        return -EINVAL;
> +    }
> +
> +    pstrcpy(pool, 2 * RBD_MAX_SEG_NAME_SIZE, rbdname);
> +    p = strchr(pool, '/');
> +    if (p == NULL) {
> +        return -EINVAL;
> +    }
> +
> +    *p = '\0';
> +
> +    l = strlen(pool);
> +    if(l >= RBD_MAX_SEG_NAME_SIZE) {
> +        error_report("pool name to long");
> +        return -EINVAL;
> +    } else if (l <= 0) {
> +        error_report("pool name to short");
> +        return -EINVAL;
> +    }
> +
> +    l = strlen(++p);
> +    if (l >= RBD_MAX_OBJ_NAME_SIZE) {
> +        error_report("object name to long");
> +        return -EINVAL;
> +    } else if (l <= 0) {
> +        error_report("object name to short");
> +        return -EINVAL;
> +    }
> +
> +    strcpy(name, p);
> +
> +    return l;
> +}
> +
> +static int create_tmap_op(uint8_t op, const char *name, char **tmap_desc)
> +{
> +    uint32_t len = strlen(name);
> +    /* total_len = encoding op + name + empty buffer */
> +    uint32_t total_len = 1 + (sizeof(uint32_t) + len) + sizeof(uint32_t);
> +    char *desc = NULL;
> +
> +    qemu_malloc(total_len);
> +
> +    *tmap_desc = desc;
> +
> +    *desc = op;
> +    desc++;
> +    memcpy(desc, &len, sizeof(len));
> +    desc += sizeof(len);
> +    memcpy(desc, name, len);
> +    desc += len;
> +    len = 0;
> +    memcpy(desc, &len, sizeof(len));
> +    desc += sizeof(len);
> +
> +    return desc - *tmap_desc;
> +}
> +
> +static void free_tmap_op(char *tmap_desc)
> +{
> +    qemu_free(tmap_desc);
> +}
> +
> +static int rbd_register_image(rados_pool_t pool, const char *name)
> +{
> +    char *tmap_desc;
> +    const char *dir = RBD_DIRECTORY;
> +    int ret;
> +
> +    ret = create_tmap_op(CEPH_OSD_TMAP_SET, name, &tmap_desc);
> +    if (ret < 0) {
> +        return ret;
> +    }
> +
> +    ret = rados_tmap_update(pool, dir, tmap_desc, ret);
> +    free_tmap_op(tmap_desc);
> +
> +    return ret;
> +}
> +
> +static int rbd_create(const char *filename, QEMUOptionParameter *options)
> +{
> +    int64_t bytes = 0;
> +    int64_t objsize;
> +    uint64_t size;
> +    time_t mtime;
> +    uint8_t obj_order = RBD_DEFAULT_OBJ_ORDER;
> +    char pool[RBD_MAX_SEG_NAME_SIZE];
> +    char n[RBD_MAX_SEG_NAME_SIZE];
> +    char name[RBD_MAX_SEG_NAME_SIZE];
> +    RbdHeader1 header;
> +    rados_pool_t p;
> +    int ret;
> +
> +    if (rbd_parsename(filename, pool, name) < 0) {
> +        return -EINVAL;
> +    }
> +
> +    snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s%s", name, RBD_SUFFIX);
> +
> +    /* Read out options */
> +    while (options && options->name) {
> +        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
> +            bytes = options->value.n;
> +        } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) {
> +            if (options->value.n) {
> +                objsize = options->value.n;
> +                if ((objsize - 1) & objsize) {    /* not a power of 2? */
> +                    error_report("obj size needs to be power of 2");
> +                    return -EINVAL;
> +                }
> +                if (objsize < 4096) {
> +                    error_report("obj size too small");
> +                    return -EINVAL;
> +                }
> +
> +                for (obj_order = 0; obj_order < 64; obj_order++) {
> +                    if (objsize == 1) {
> +                        break;
> +                    }
> +                    objsize >>= 1;
> +                }
> +            }
> +        }
> +        options++;
> +    }
> +
> +    memset(&header, 0, sizeof(header));
> +    pstrcpy(header.text, sizeof(header.text), RBD_HEADER_TEXT);
> +    pstrcpy(header.signature, sizeof(header.signature), RBD_HEADER_SIGNATURE);
> +    pstrcpy(header.version, sizeof(header.version), RBD_HEADER_VERSION);
> +    header.image_size = bytes;
> +    cpu_to_le64s((uint64_t *) & header.image_size);
> +    header.options.order = obj_order;
> +    header.options.crypt_type = RBD_CRYPT_NONE;
> +    header.options.comp_type = RBD_COMP_NONE;
> +    header.snap_seq = 0;
> +    header.snap_count = 0;
> +    cpu_to_le32s(&header.snap_count);
> +
> +    if (rados_initialize(0, NULL) < 0) {
> +        error_report("error initializing");
> +        return -EIO;
> +    }
> +
> +    if (rados_open_pool(pool, &p)) {
> +        error_report("error opening pool %s", pool);
> +        rados_deinitialize();
> +        return -EIO;
> +    }
> +
> +    /* check for existing rbd header file */
> +    ret = rados_stat(p, n, &size, &mtime);
> +    if (ret == 0) {
> +        ret=-EEXIST;
> +        goto done;
> +    }
> +
> +    /* create header file */
> +    ret = rados_write(p, n, 0, (const char *)&header, sizeof(header));
> +    if (ret < 0) {
> +        goto done;
> +    }
> +
> +    ret = rbd_register_image(p, name);
> +done:
> +    rados_close_pool(p);
> +    rados_deinitialize();
> +
> +    return ret;
> +}
> +
> +static int rbd_open(BlockDriverState *bs, const char *filename, int flags)
> +{
> +    BDRVRBDState *s = bs->opaque;
> +    char pool[RBD_MAX_SEG_NAME_SIZE];
> +    char n[RBD_MAX_SEG_NAME_SIZE];
> +    char hbuf[4096];
> +    int r;
> +
> +    if (rbd_parsename(filename, pool, s->name) < 0) {
> +        return -EINVAL;
> +    }
> +    snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s%s", s->name, RBD_SUFFIX);
> +
> +    if ((r = rados_initialize(0, NULL)) < 0) {
> +        error_report("error initializing");
> +        return r;
> +    }
> +
> +    if ((r = rados_open_pool(pool, &s->pool))) {
> +        error_report("error opening pool %s", pool);
> +        rados_deinitialize();
> +        return r;
> +    }
> +
> +    if ((r = rados_read(s->pool, n, 0, hbuf, 4096)) < 0) {
> +        error_report("error reading header from %s", s->name);
> +        goto failed;
> +    }
> +
> +    if (strncmp(hbuf + 64, RBD_HEADER_SIGNATURE, 4)) {
> +        error_report("Invalid header signature %s", hbuf + 64);
> +        r = -EMEDIUMTYPE;
> +        goto failed;
> +    }
> +
> +    if (strncmp(hbuf + 68, RBD_HEADER_VERSION, 8)) {
> +        error_report("Unknown image version %s", hbuf + 68);
> +        r = -EMEDIUMTYPE;
> +        goto failed;
> +    }
> +
> +    RbdHeader1 *header;
> +
> +    header = (RbdHeader1 *) hbuf;
> +    le64_to_cpus((uint64_t *) & header->image_size);
> +    s->size = header->image_size;
> +    s->objsize = 1 << header->options.order;
> +
> +    return 0;
> +
> +failed:
> +    rados_close_pool(s->pool);
> +    rados_deinitialize();
> +    return r;
> +}
> +
> +static void rbd_close(BlockDriverState *bs)
> +{
> +    BDRVRBDState *s = bs->opaque;
> +
> +    rados_close_pool(s->pool);
> +    rados_deinitialize();
> +}
> +
> +static int rbd_rw(BlockDriverState *bs, int64_t sector_num,
> +                  uint8_t *buf, int nb_sectors, int write)
> +{
> +    BDRVRBDState *s = bs->opaque;
> +    char n[RBD_MAX_SEG_NAME_SIZE];
> +
> +    int64_t segnr, segoffs, segsize, r;
> +    int64_t off, size;
> +
> +    off = sector_num * BDRV_SECTOR_SIZE;
> +    size = nb_sectors * BDRV_SECTOR_SIZE;
> +    segnr = off / s->objsize;
> +    segoffs = off % s->objsize;
> +    segsize = s->objsize - segoffs;
> +
> +    while (size > 0) {
> +        if (size < segsize) {
> +            segsize = size;
> +        }
> +
> +        snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s.%012" PRIx64, s->name, segnr);
> +
> +        if (write) {
> +            if ((r = rados_write(s->pool, n, segoffs, (const char *)buf,
> +                segsize)) < 0) {
> +                return r;
> +            }
> +        } else {
> +            r = rados_read(s->pool, n, segoffs, (char *)buf, segsize);
> +            if (r == -ENOENT) {
> +                memset(buf, 0, segsize);
> +            } else if (r < 0) {
> +                return r;
> +            } else if (r < segsize) {
> +                memset(buf + r, 0, segsize - r);
> +            }
> +        }
> +
> +        buf += segsize;
> +        size -= segsize;
> +        segoffs = 0;
> +        segsize = s->objsize;
> +        segnr++;
> +    }
> +
> +    return 0;
> +}
> +
> +static int rbd_read(BlockDriverState *bs, int64_t sector_num,
> +                    uint8_t *buf, int nb_sectors)
> +{
> +    return rbd_rw(bs, sector_num, buf, nb_sectors, 0);
> +}
> +
> +static int rbd_write(BlockDriverState *bs, int64_t sector_num,
> +                     const uint8_t *buf, int nb_sectors)
> +{
> +    return rbd_rw(bs, sector_num, (uint8_t *) buf, nb_sectors, 1);
> +}
> +
> +static void rbd_aio_cancel(BlockDriverAIOCB *blockacb)
> +{
> +    RBDAIOCB *acb = (RBDAIOCB *) blockacb;
> +    qemu_bh_delete(acb->bh);
> +    acb->bh = NULL;
> +    qemu_aio_release(acb);
> +}
> +
> +static AIOPool rbd_aio_pool = {
> +    .aiocb_size = sizeof(RBDAIOCB),
> +    .cancel = rbd_aio_cancel,
> +};
> +
> +/* This is the callback function for rados_aio_read and _write */
> +static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
> +{
> +    RBDAIOCB *acb = rcb->acb;
> +    int64_t r;
> +    int i;
> +
> +    acb->aiocnt--;
> +    r = rados_aio_get_return_value(c);
> +    rados_aio_release(c);
> +    if (acb->write) {
> +        if (r < 0) {
> +            acb->ret = r;
> +            acb->error = 1;
> +        } else if (!acb->error) {
> +            acb->ret += rcb->segsize;
> +        }
> +    } else {
> +        if (r == -ENOENT) {
> +            memset(rcb->buf, 0, rcb->segsize);
> +            if (!acb->error) {
> +                acb->ret += rcb->segsize;
> +            }
> +        } else if (r < 0) {
> +            acb->ret = r;
> +            acb->error = 1;
> +        } else if (r < rcb->segsize) {
> +            memset(rcb->buf + r, 0, rcb->segsize - r);
> +            if (!acb->error) {
> +                acb->ret += rcb->segsize;
> +            }
> +        } else if (!acb->error) {
> +            acb->ret += r;
> +        }
> +    }
> +    qemu_free(rcb);
> +    i = 0;
> +    if (!acb->aiocnt && acb->bh) {
> +        qemu_bh_schedule(acb->bh);
> +    }
> +}
> +
> +/* Callback when all queued rados_aio requests are complete */
> +static void rbd_aio_bh_cb(void *opaque)
> +{
> +    RBDAIOCB *acb = opaque;
> +
> +    if (!acb->write) {
> +        qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
> +    }
> +    qemu_vfree(acb->bounce);
> +    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
> +    qemu_bh_delete(acb->bh);
> +    acb->bh = NULL;
> +    qemu_aio_release(acb);
> +}
> +
> +static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
> +                                           int64_t sector_num,
> +                                           QEMUIOVector *qiov,
> +                                           int nb_sectors,
> +                                           BlockDriverCompletionFunc *cb,
> +                                           void *opaque, int write)
> +{
> +    RBDAIOCB *acb;
> +    RADOSCB *rcb;
> +    rados_completion_t c;
> +    char n[RBD_MAX_SEG_NAME_SIZE];
> +    int64_t segnr, segoffs, segsize, last_segnr;
> +    int64_t off, size;
> +    char *buf;
> +
> +    BDRVRBDState *s = bs->opaque;
> +
> +    acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque);
> +    acb->write = write;
> +    acb->qiov = qiov;
> +    acb->bounce = qemu_blockalign(bs, qiov->size);
> +    acb->aiocnt = 0;
> +    acb->ret = 0;
> +    acb->error = 0;
> +
> +    if (!acb->bh) {
> +        acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
> +    }
> +
> +    if (write) {
> +        qemu_iovec_to_buffer(acb->qiov, acb->bounce);
> +    }
> +
> +    buf = acb->bounce;
> +
> +    off = sector_num * BDRV_SECTOR_SIZE;
> +    size = nb_sectors * BDRV_SECTOR_SIZE;
> +    segnr = off / s->objsize;
> +    segoffs = off % s->objsize;
> +    segsize = s->objsize - segoffs;
> +
> +    last_segnr = ((off + size - 1) / s->objsize);
> +    acb->aiocnt = (last_segnr - segnr) + 1;
> +
> +    while (size > 0) {
> +        if (size < segsize) {
> +            segsize = size;
> +        }
> +
> +        snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s.%012llx", s->name,
> +                 (long long unsigned int)segnr);
> +
> +        rcb = qemu_malloc(sizeof(RADOSCB));
> +        rcb->done = 0;
> +        rcb->acb = acb;
> +        rcb->segsize = segsize;
> +        rcb->buf = buf;
> +
> +        if (write) {
> +            rados_aio_create_completion(rcb, NULL,
> +                                        (rados_callback_t) rbd_finish_aiocb,
> +                                        &c);
> +            rados_aio_write(s->pool, n, segoffs, buf, segsize, c);
> +        } else {
> +            rados_aio_create_completion(rcb,
> +                                        (rados_callback_t) rbd_finish_aiocb,
> +                                        NULL, &c);
> +            rados_aio_read(s->pool, n, segoffs, buf, segsize, c);
> +        }
> +
> +        buf += segsize;
> +        size -= segsize;
> +        segoffs = 0;
> +        segsize = s->objsize;
> +        segnr++;
> +    }
> +
> +    return &acb->common;
> +}
> +
> +static BlockDriverAIOCB *rbd_aio_readv(BlockDriverState * bs,
> +                                       int64_t sector_num, QEMUIOVector * qiov,
> +                                       int nb_sectors,
> +                                       BlockDriverCompletionFunc * cb,
> +                                       void *opaque)
> +{
> +    return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
> +}
> +
> +static BlockDriverAIOCB *rbd_aio_writev(BlockDriverState * bs,
> +                                        int64_t sector_num, QEMUIOVector * qiov,
> +                                        int nb_sectors,
> +                                        BlockDriverCompletionFunc * cb,
> +                                        void *opaque)
> +{
> +    return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
> +}
> +
> +static int rbd_getinfo(BlockDriverState * bs, BlockDriverInfo * bdi)
> +{
> +    BDRVRBDState *s = bs->opaque;
> +    bdi->cluster_size = s->objsize;
> +    return 0;
> +}
> +
> +static int64_t rbd_getlength(BlockDriverState * bs)
> +{
> +    BDRVRBDState *s = bs->opaque;
> +
> +    return s->size;
> +}
> +
> +static QEMUOptionParameter rbd_create_options[] = {
> +    {
> +     .name = BLOCK_OPT_SIZE,
> +     .type = OPT_SIZE,
> +     .help = "Virtual disk size"
> +    },
> +    {
> +     .name = BLOCK_OPT_CLUSTER_SIZE,
> +     .type = OPT_SIZE,
> +     .help = "RBD object size"
> +    },
> +    {NULL}
> +};
> +
> +static BlockDriver bdrv_rbd = {
> +    .format_name        = "rbd",
> +    .instance_size      = sizeof(BDRVRBDState),
> +    .bdrv_file_open     = rbd_open,
> +    .bdrv_read          = rbd_read,
> +    .bdrv_write         = rbd_write,
> +    .bdrv_close         = rbd_close,
> +    .bdrv_create        = rbd_create,
> +    .bdrv_get_info      = rbd_getinfo,
> +    .create_options     = rbd_create_options,
> +    .bdrv_getlength     = rbd_getlength,
> +    .protocol_name      = "rbd",
> +
> +    .bdrv_aio_readv     = rbd_aio_readv,
> +    .bdrv_aio_writev    = rbd_aio_writev,
> +};
> +
> +static void bdrv_rbd_init(void)
> +{
> +    bdrv_register(&bdrv_rbd);
> +}
> +
> +block_init(bdrv_rbd_init);
> diff --git a/block/rbd_types.h b/block/rbd_types.h
> new file mode 100644
> index 0000000..91ac4f9
> --- /dev/null
> +++ b/block/rbd_types.h
> @@ -0,0 +1,64 @@
> +/*
> + * Ceph - scalable distributed file system
> + *
> + * Copyright (C) 2004-2010 Sage Weil <sage@newdream.net>
> + *
> + * This is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License version 2.1, as published by the Free Software
> + * Foundation.  See file COPYING.
> + *
> + */
> +
> +#ifndef QEMU_BLOCK_RBD_TYPES_H
> +#define QEMU_BLOCK_RBD_TYPES_H
> +
> +
> +/*
> + * rbd image 'foo' consists of objects
> + *   foo.rbd      - image metadata
> + *   foo.00000000
> + *   foo.00000001
> + *   ...          - data
> + */
> +
> +#define RBD_SUFFIX              ".rbd"
> +#define RBD_DIRECTORY           "rbd_directory"
> +
> +#define RBD_DEFAULT_OBJ_ORDER   22   /* 4MB */
> +
> +#define RBD_MAX_OBJ_NAME_SIZE   96
> +#define RBD_MAX_SEG_NAME_SIZE   128
> +
> +#define RBD_COMP_NONE           0
> +#define RBD_CRYPT_NONE          0
> +
> +#define RBD_HEADER_TEXT         "<<< Rados Block Device Image >>>\n"
> +#define RBD_HEADER_SIGNATURE    "RBD"
> +#define RBD_HEADER_VERSION      "001.004"
> +
> +struct rbd_obj_snap_ondisk {
> +    uint64_t id;
> +    uint64_t image_size;
> +} __attribute__((packed));
> +
> +struct rbd_obj_header_ondisk {
> +    char text[64];
> +    char signature[4];
> +    char version[8];
> +    struct {
> +        uint8_t order;
> +        uint8_t crypt_type;
> +        uint8_t comp_type;
> +        uint8_t unused;
> +    } __attribute__((packed)) options;
> +    uint64_t image_size;
> +    uint64_t snap_seq;
> +    uint32_t snap_count;
> +    uint32_t reserved;
> +    uint64_t snap_names_len;
> +    struct rbd_obj_snap_ondisk snaps[0];
> +} __attribute__((packed));
> +
> +
> +#endif
> diff --git a/configure b/configure
> index 3cd2c5f..3f5c8ce 100755
> --- a/configure
> +++ b/configure
> @@ -299,6 +299,7 @@ pkgversion=""
>  check_utests="no"
>  user_pie="no"
>  zero_malloc=""
> +rbd=""
>  
>  # OS specific
>  if check_define __linux__ ; then
> @@ -660,6 +661,10 @@ for opt do
>    ;;
>    --enable-vhost-net) vhost_net="yes"
>    ;;
> +  --disable-rbd) rbd="no"
> +  ;;
> +  --enable-rbd) rbd="yes"
> +  ;;
>    *) echo "ERROR: unknown option $opt"; show_help="yes"
>    ;;
>    esac
> @@ -826,6 +831,7 @@ echo "  --enable-docs            enable documentation build"
>  echo "  --disable-docs           disable documentation build"
>  echo "  --disable-vhost-net      disable vhost-net acceleration support"
>  echo "  --enable-vhost-net       enable vhost-net acceleration support"
> +echo "  --enable-rbd		 enable building the rados block device (rbd)"
>  echo ""
>  echo "NOTE: The object files are built at the place where configure is launched"
>  exit 1
> @@ -1579,6 +1585,27 @@ if test "$mingw32" != yes -a "$pthread" = no; then
>  fi
>  
>  ##########################################
> +# rbd probe
> +if test "$rbd" != "no" ; then
> +  cat > $TMPC <<EOF
> +#include <stdio.h>
> +#include <rados/librados.h>
> +int main(void) { rados_initialize(0, NULL); return 0; }
> +EOF
> +  rbd_libs="-lrados -lcrypto"
> +  if compile_prog "" "$rbd_libs" ; then
> +    rbd=yes
> +    libs_tools="$rbd_libs $libs_tools"
> +    libs_softmmu="$rbd_libs $libs_softmmu"
> +  else
> +    if test "$rbd" = "yes" ; then
> +      feature_not_found "rados block device"
> +    fi
> +    rbd=no
> +  fi
> +fi
> +
> +##########################################
>  # linux-aio probe
>  
>  if test "$linux_aio" != "no" ; then
> @@ -2041,6 +2068,7 @@ echo "preadv support    $preadv"
>  echo "fdatasync         $fdatasync"
>  echo "uuid support      $uuid"
>  echo "vhost-net support $vhost_net"
> +echo "rbd support       $rbd"
>  
>  if test $sdl_too_old = "yes"; then
>  echo "-> Your SDL version is too old - please upgrade to have SDL support"
> @@ -2270,6 +2298,9 @@ echo "CONFIG_UNAME_RELEASE=\"$uname_release\"" >> $config_host_mak
>  if test "$zero_malloc" = "yes" ; then
>    echo "CONFIG_ZERO_MALLOC=y" >> $config_host_mak
>  fi
> +if test "$rbd" = "yes" ; then
> +  echo "CONFIG_RBD=y" >> $config_host_mak
> +fi
>  
>  # USB host support
>  case "$usb" in
>   


^ permalink raw reply related	[flat|nested] 19+ messages in thread

* [Qemu-devel] Re: [PATCH] ceph/rbd block driver for qemu-kvm (v3)
@ 2010-06-11 19:51   ` Simone Gotti
  0 siblings, 0 replies; 19+ messages in thread
From: Simone Gotti @ 2010-06-11 19:51 UTC (permalink / raw)
  To: Christian Brunner; +Cc: Kevin Wolf, ceph-devel, qemu-devel, kvm

Hi Christian,

thanks for you patch. I tried it a little and it worked quite well but
during some live migration tests I noticed a problem.


The problem is related to live migration with high I/O using the AIO
calls (I triggered it with a simple "dd").

If you launch a live migration and the guest is stopped and started on
the new qemu process while some AIO was in flight the guest on the new
qemu will wait undefinitely for data this will never come. With ata
emulation an ata reset is sent after some seconds but with virtio this
won't happen.

I'm not a qemu expert but from what I understand qemu in
savevm.c:do_savevm calls qemu_aio_flush to wait that all the asyncronous
aio returned (the callback si called). But the rbd block driver doesn't
use the qemu aio model but the rados one so that function will never
know of the rados aio.

So a solution will be to glue the block driver with the qemu aio model.
I tried to do this to test if this will work in the attached patch. I
only tested with one rbd block device but the live migration tests
worked (in the patch I removed all the debug prints I adedd to see if
all AIO requets really returned.

This is an RFC just to know what you think about this possible solution.
As qemu's aio model is event based and it needs a file descriptor for
event communication i used eventfd to do this.
Let me know if you need a detailed description of the patch!


I've also got a question: as librados is multithreaded the callbacks are
called in another thread. Is there the need to protect some critical
sections with a lock (for example in rbd_aio_rw_vector and in
rbd_finish_aiocb)?


Thanks!

Bye!


From: Simone Gotti <simone.gotti@gmail.com>
Date: Fri, 11 Jun 2010 21:19:39 +0200
Subject: [PATCH] block/rbd: Added glue to qemu aio model to fix live
migration with outstanding aio

Signed-off-by: Simone Gotti <simone.gotti@gmail.com>


---
 block/rbd.c |   63
+++++++++++++++++++++++++++++++++++++++++++++++++++++-----
 1 files changed, 57 insertions(+), 6 deletions(-)

diff --git a/block/rbd.c b/block/rbd.c
index 4d22069..83b7898 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -25,6 +25,8 @@
 
 #include <signal.h>
 
+#include <sys/eventfd.h>
+
 /*
  * When specifying the image filename use:
  *
@@ -47,6 +49,15 @@
 
 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
 
+typedef struct BDRVRBDState {
+    int efd;
+    rados_pool_t pool;
+    char name[RBD_MAX_OBJ_NAME_SIZE];
+    uint64_t size;
+    uint64_t objsize;
+    int qemu_aio_count;
+} BDRVRBDState;
+
 typedef struct RBDAIOCB {
     BlockDriverAIOCB common;
     QEMUBH *bh;
@@ -57,6 +68,7 @@ typedef struct RBDAIOCB {
     int64_t sector_num;
     int aiocnt;
     int error;
+    BDRVRBDState *s;
 } RBDAIOCB;
 
 typedef struct RADOSCB {
@@ -67,12 +79,6 @@ typedef struct RADOSCB {
     char *buf;
 } RADOSCB;
 
-typedef struct BDRVRBDState {
-    rados_pool_t pool;
-    char name[RBD_MAX_OBJ_NAME_SIZE];
-    uint64_t size;
-    uint64_t objsize;
-} BDRVRBDState;
 
 typedef struct rbd_obj_header_ondisk RbdHeader1;
 
@@ -255,6 +261,31 @@ done:
     return ret;
 }
 
+static void rbd_aio_completion_cb(void *opaque)
+{
+    BDRVRBDState *s = opaque;
+
+    uint64_t val;
+    ssize_t ret;
+
+    do {
+        if ((ret = read(s->efd, &val, sizeof(val))) > 0) {
+            s->qemu_aio_count -= val;
+       }
+    } while (ret == -1 && errno == EINTR);
+
+    return;
+}
+
+static int rbd_aio_flush_cb(void *opaque)
+{
+    BDRVRBDState *s = opaque;
+
+    return (s->qemu_aio_count > 0) ? 1 : 0;
+}
+
+
+
 static int rbd_open(BlockDriverState *bs, const char *filename, int flags)
 {
     BDRVRBDState *s = bs->opaque;
@@ -303,6 +334,15 @@ static int rbd_open(BlockDriverState *bs, const
char *filename, int flags)
     s->size = header->image_size;
     s->objsize = 1 << header->options.order;
 
+    s->efd = eventfd(0, 0);
+    if (s->efd == -1) {
+        error_report("error opening eventfd");
+        goto failed;
+    }
+    fcntl(s->efd, F_SETFL, O_NONBLOCK);
+    qemu_aio_set_fd_handler(s->efd, rbd_aio_completion_cb, NULL,
+        rbd_aio_flush_cb, NULL, s);
+
     return 0;
 
 failed:
@@ -393,6 +433,7 @@ static AIOPool rbd_aio_pool = {
 };
 
 /* This is the callback function for rados_aio_read and _write */
+
 static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
 {
     RBDAIOCB *acb = rcb->acb;
@@ -427,6 +468,8 @@ static void rbd_finish_aiocb(rados_completion_t c,
RADOSCB *rcb)
             acb->ret += r;
         }
     }
+    uint64_t buf = 1;
+    write(acb->s->efd, &buf, sizeof(buf));
     qemu_free(rcb);
     i = 0;
     if (!acb->aiocnt && acb->bh) {
@@ -435,6 +478,7 @@ static void rbd_finish_aiocb(rados_completion_t c,
RADOSCB *rcb)
 }
 
 /* Callback when all queued rados_aio requests are complete */
+
 static void rbd_aio_bh_cb(void *opaque)
 {
     RBDAIOCB *acb = opaque;
@@ -446,6 +490,10 @@ static void rbd_aio_bh_cb(void *opaque)
     acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
     qemu_bh_delete(acb->bh);
     acb->bh = NULL;
+
+    uint64_t buf = 1;
+    write(acb->s->efd, &buf, sizeof(buf));
+
     qemu_aio_release(acb);
 }
 
@@ -473,6 +521,7 @@ static BlockDriverAIOCB
*rbd_aio_rw_vector(BlockDriverState *bs,
     acb->aiocnt = 0;
     acb->ret = 0;
     acb->error = 0;
+    acb->s = s;
 
     if (!acb->bh) {
         acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
@@ -493,6 +542,8 @@ static BlockDriverAIOCB
*rbd_aio_rw_vector(BlockDriverState *bs,
     last_segnr = ((off + size - 1) / s->objsize);
     acb->aiocnt = (last_segnr - segnr) + 1;
 
+    s->qemu_aio_count+=acb->aiocnt + 1; /* All the RADOSCB and the
related RBDAIOCB */
+
     while (size > 0) {
         if (size < segsize) {
             segsize = size;
-- 
1.7.0.1




 

On 05/31/2010 09:31 PM, Christian Brunner wrote:
> Hi Kevin,
>
> here is an updated patch for the ceph/rbd driver. I hope that everything 
> is fine now.
>
> Regards,
> Christian
>
>
> This is a block driver for the distributed file system Ceph
> (http://ceph.newdream.net/). This driver uses librados (which
> is part of the Ceph server) for direct access to the Ceph object
> store and is running entirely in userspace. Therefore it is
> called "rbd" - rados block device.
>
> To compile the driver a recent version of ceph (unstable/testing git
> head or 0.20.3 once it is released) is needed.
>
> Additional information is available on the Ceph-Wiki:
>
> http://ceph.newdream.net/wiki/Kvm-rbd
>
> The patch is based on git://repo.or.cz/qemu/kevin.git block
>
>
> Signed-off-by: Christian Brunner <chb@muc.de>
> ---
>  Makefile.objs     |    1 +
>  block/rbd.c       |  600 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>  block/rbd_types.h |   64 ++++++
>  configure         |   31 +++
>  4 files changed, 696 insertions(+), 0 deletions(-)
>  create mode 100644 block/rbd.c
>  create mode 100644 block/rbd_types.h
>
> diff --git a/Makefile.objs b/Makefile.objs
> index 1a942e5..08dc11f 100644
> --- a/Makefile.objs
> +++ b/Makefile.objs
> @@ -18,6 +18,7 @@ block-nested-y += parallels.o nbd.o blkdebug.o
>  block-nested-$(CONFIG_WIN32) += raw-win32.o
>  block-nested-$(CONFIG_POSIX) += raw-posix.o
>  block-nested-$(CONFIG_CURL) += curl.o
> +block-nested-$(CONFIG_RBD) += rbd.o
>  
>  block-obj-y +=  $(addprefix block/, $(block-nested-y))
>  
> diff --git a/block/rbd.c b/block/rbd.c
> new file mode 100644
> index 0000000..4a60dda
> --- /dev/null
> +++ b/block/rbd.c
> @@ -0,0 +1,600 @@
> +/*
> + * QEMU Block driver for RADOS (Ceph)
> + *
> + * Copyright (C) 2010 Christian Brunner <chb@muc.de>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2.  See
> + * the COPYING file in the top-level directory.
> + *
> + */
> +
> +#include "qemu-common.h"
> +#include "qemu-error.h"
> +#include <sys/types.h>
> +#include <stdbool.h>
> +
> +#include <qemu-common.h>
> +
> +#include "rbd_types.h"
> +#include "module.h"
> +#include "block_int.h"
> +
> +#include <stdio.h>
> +#include <stdlib.h>
> +#include <rados/librados.h>
> +
> +#include <signal.h>
> +
> +/*
> + * When specifying the image filename use:
> + *
> + * rbd:poolname/devicename
> + *
> + * poolname must be the name of an existing rados pool
> + *
> + * devicename is the basename for all objects used to
> + * emulate the raw device.
> + *
> + * Metadata information (image size, ...) is stored in an
> + * object with the name "devicename.rbd".
> + *
> + * The raw device is split into 4MB sized objects by default.
> + * The sequencenumber is encoded in a 12 byte long hex-string,
> + * and is attached to the devicename, separated by a dot.
> + * e.g. "devicename.1234567890ab"
> + *
> + */
> +
> +#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
> +
> +typedef struct RBDAIOCB {
> +    BlockDriverAIOCB common;
> +    QEMUBH *bh;
> +    int ret;
> +    QEMUIOVector *qiov;
> +    char *bounce;
> +    int write;
> +    int64_t sector_num;
> +    int aiocnt;
> +    int error;
> +} RBDAIOCB;
> +
> +typedef struct RADOSCB {
> +    int rcbid;
> +    RBDAIOCB *acb;
> +    int done;
> +    int64_t segsize;
> +    char *buf;
> +} RADOSCB;
> +
> +typedef struct BDRVRBDState {
> +    rados_pool_t pool;
> +    char name[RBD_MAX_OBJ_NAME_SIZE];
> +    uint64_t size;
> +    uint64_t objsize;
> +} BDRVRBDState;
> +
> +typedef struct rbd_obj_header_ondisk RbdHeader1;
> +
> +static int rbd_parsename(const char *filename, char *pool, char *name)
> +{
> +    const char *rbdname;
> +    char *p;
> +    int l;
> +
> +    if (!strstart(filename, "rbd:", &rbdname)) {
> +        return -EINVAL;
> +    }
> +
> +    pstrcpy(pool, 2 * RBD_MAX_SEG_NAME_SIZE, rbdname);
> +    p = strchr(pool, '/');
> +    if (p == NULL) {
> +        return -EINVAL;
> +    }
> +
> +    *p = '\0';
> +
> +    l = strlen(pool);
> +    if(l >= RBD_MAX_SEG_NAME_SIZE) {
> +        error_report("pool name to long");
> +        return -EINVAL;
> +    } else if (l <= 0) {
> +        error_report("pool name to short");
> +        return -EINVAL;
> +    }
> +
> +    l = strlen(++p);
> +    if (l >= RBD_MAX_OBJ_NAME_SIZE) {
> +        error_report("object name to long");
> +        return -EINVAL;
> +    } else if (l <= 0) {
> +        error_report("object name to short");
> +        return -EINVAL;
> +    }
> +
> +    strcpy(name, p);
> +
> +    return l;
> +}
> +
> +static int create_tmap_op(uint8_t op, const char *name, char **tmap_desc)
> +{
> +    uint32_t len = strlen(name);
> +    /* total_len = encoding op + name + empty buffer */
> +    uint32_t total_len = 1 + (sizeof(uint32_t) + len) + sizeof(uint32_t);
> +    char *desc = NULL;
> +
> +    qemu_malloc(total_len);
> +
> +    *tmap_desc = desc;
> +
> +    *desc = op;
> +    desc++;
> +    memcpy(desc, &len, sizeof(len));
> +    desc += sizeof(len);
> +    memcpy(desc, name, len);
> +    desc += len;
> +    len = 0;
> +    memcpy(desc, &len, sizeof(len));
> +    desc += sizeof(len);
> +
> +    return desc - *tmap_desc;
> +}
> +
> +static void free_tmap_op(char *tmap_desc)
> +{
> +    qemu_free(tmap_desc);
> +}
> +
> +static int rbd_register_image(rados_pool_t pool, const char *name)
> +{
> +    char *tmap_desc;
> +    const char *dir = RBD_DIRECTORY;
> +    int ret;
> +
> +    ret = create_tmap_op(CEPH_OSD_TMAP_SET, name, &tmap_desc);
> +    if (ret < 0) {
> +        return ret;
> +    }
> +
> +    ret = rados_tmap_update(pool, dir, tmap_desc, ret);
> +    free_tmap_op(tmap_desc);
> +
> +    return ret;
> +}
> +
> +static int rbd_create(const char *filename, QEMUOptionParameter *options)
> +{
> +    int64_t bytes = 0;
> +    int64_t objsize;
> +    uint64_t size;
> +    time_t mtime;
> +    uint8_t obj_order = RBD_DEFAULT_OBJ_ORDER;
> +    char pool[RBD_MAX_SEG_NAME_SIZE];
> +    char n[RBD_MAX_SEG_NAME_SIZE];
> +    char name[RBD_MAX_SEG_NAME_SIZE];
> +    RbdHeader1 header;
> +    rados_pool_t p;
> +    int ret;
> +
> +    if (rbd_parsename(filename, pool, name) < 0) {
> +        return -EINVAL;
> +    }
> +
> +    snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s%s", name, RBD_SUFFIX);
> +
> +    /* Read out options */
> +    while (options && options->name) {
> +        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
> +            bytes = options->value.n;
> +        } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) {
> +            if (options->value.n) {
> +                objsize = options->value.n;
> +                if ((objsize - 1) & objsize) {    /* not a power of 2? */
> +                    error_report("obj size needs to be power of 2");
> +                    return -EINVAL;
> +                }
> +                if (objsize < 4096) {
> +                    error_report("obj size too small");
> +                    return -EINVAL;
> +                }
> +
> +                for (obj_order = 0; obj_order < 64; obj_order++) {
> +                    if (objsize == 1) {
> +                        break;
> +                    }
> +                    objsize >>= 1;
> +                }
> +            }
> +        }
> +        options++;
> +    }
> +
> +    memset(&header, 0, sizeof(header));
> +    pstrcpy(header.text, sizeof(header.text), RBD_HEADER_TEXT);
> +    pstrcpy(header.signature, sizeof(header.signature), RBD_HEADER_SIGNATURE);
> +    pstrcpy(header.version, sizeof(header.version), RBD_HEADER_VERSION);
> +    header.image_size = bytes;
> +    cpu_to_le64s((uint64_t *) & header.image_size);
> +    header.options.order = obj_order;
> +    header.options.crypt_type = RBD_CRYPT_NONE;
> +    header.options.comp_type = RBD_COMP_NONE;
> +    header.snap_seq = 0;
> +    header.snap_count = 0;
> +    cpu_to_le32s(&header.snap_count);
> +
> +    if (rados_initialize(0, NULL) < 0) {
> +        error_report("error initializing");
> +        return -EIO;
> +    }
> +
> +    if (rados_open_pool(pool, &p)) {
> +        error_report("error opening pool %s", pool);
> +        rados_deinitialize();
> +        return -EIO;
> +    }
> +
> +    /* check for existing rbd header file */
> +    ret = rados_stat(p, n, &size, &mtime);
> +    if (ret == 0) {
> +        ret=-EEXIST;
> +        goto done;
> +    }
> +
> +    /* create header file */
> +    ret = rados_write(p, n, 0, (const char *)&header, sizeof(header));
> +    if (ret < 0) {
> +        goto done;
> +    }
> +
> +    ret = rbd_register_image(p, name);
> +done:
> +    rados_close_pool(p);
> +    rados_deinitialize();
> +
> +    return ret;
> +}
> +
> +static int rbd_open(BlockDriverState *bs, const char *filename, int flags)
> +{
> +    BDRVRBDState *s = bs->opaque;
> +    char pool[RBD_MAX_SEG_NAME_SIZE];
> +    char n[RBD_MAX_SEG_NAME_SIZE];
> +    char hbuf[4096];
> +    int r;
> +
> +    if (rbd_parsename(filename, pool, s->name) < 0) {
> +        return -EINVAL;
> +    }
> +    snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s%s", s->name, RBD_SUFFIX);
> +
> +    if ((r = rados_initialize(0, NULL)) < 0) {
> +        error_report("error initializing");
> +        return r;
> +    }
> +
> +    if ((r = rados_open_pool(pool, &s->pool))) {
> +        error_report("error opening pool %s", pool);
> +        rados_deinitialize();
> +        return r;
> +    }
> +
> +    if ((r = rados_read(s->pool, n, 0, hbuf, 4096)) < 0) {
> +        error_report("error reading header from %s", s->name);
> +        goto failed;
> +    }
> +
> +    if (strncmp(hbuf + 64, RBD_HEADER_SIGNATURE, 4)) {
> +        error_report("Invalid header signature %s", hbuf + 64);
> +        r = -EMEDIUMTYPE;
> +        goto failed;
> +    }
> +
> +    if (strncmp(hbuf + 68, RBD_HEADER_VERSION, 8)) {
> +        error_report("Unknown image version %s", hbuf + 68);
> +        r = -EMEDIUMTYPE;
> +        goto failed;
> +    }
> +
> +    RbdHeader1 *header;
> +
> +    header = (RbdHeader1 *) hbuf;
> +    le64_to_cpus((uint64_t *) & header->image_size);
> +    s->size = header->image_size;
> +    s->objsize = 1 << header->options.order;
> +
> +    return 0;
> +
> +failed:
> +    rados_close_pool(s->pool);
> +    rados_deinitialize();
> +    return r;
> +}
> +
> +static void rbd_close(BlockDriverState *bs)
> +{
> +    BDRVRBDState *s = bs->opaque;
> +
> +    rados_close_pool(s->pool);
> +    rados_deinitialize();
> +}
> +
> +static int rbd_rw(BlockDriverState *bs, int64_t sector_num,
> +                  uint8_t *buf, int nb_sectors, int write)
> +{
> +    BDRVRBDState *s = bs->opaque;
> +    char n[RBD_MAX_SEG_NAME_SIZE];
> +
> +    int64_t segnr, segoffs, segsize, r;
> +    int64_t off, size;
> +
> +    off = sector_num * BDRV_SECTOR_SIZE;
> +    size = nb_sectors * BDRV_SECTOR_SIZE;
> +    segnr = off / s->objsize;
> +    segoffs = off % s->objsize;
> +    segsize = s->objsize - segoffs;
> +
> +    while (size > 0) {
> +        if (size < segsize) {
> +            segsize = size;
> +        }
> +
> +        snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s.%012" PRIx64, s->name, segnr);
> +
> +        if (write) {
> +            if ((r = rados_write(s->pool, n, segoffs, (const char *)buf,
> +                segsize)) < 0) {
> +                return r;
> +            }
> +        } else {
> +            r = rados_read(s->pool, n, segoffs, (char *)buf, segsize);
> +            if (r == -ENOENT) {
> +                memset(buf, 0, segsize);
> +            } else if (r < 0) {
> +                return r;
> +            } else if (r < segsize) {
> +                memset(buf + r, 0, segsize - r);
> +            }
> +        }
> +
> +        buf += segsize;
> +        size -= segsize;
> +        segoffs = 0;
> +        segsize = s->objsize;
> +        segnr++;
> +    }
> +
> +    return 0;
> +}
> +
> +static int rbd_read(BlockDriverState *bs, int64_t sector_num,
> +                    uint8_t *buf, int nb_sectors)
> +{
> +    return rbd_rw(bs, sector_num, buf, nb_sectors, 0);
> +}
> +
> +static int rbd_write(BlockDriverState *bs, int64_t sector_num,
> +                     const uint8_t *buf, int nb_sectors)
> +{
> +    return rbd_rw(bs, sector_num, (uint8_t *) buf, nb_sectors, 1);
> +}
> +
> +static void rbd_aio_cancel(BlockDriverAIOCB *blockacb)
> +{
> +    RBDAIOCB *acb = (RBDAIOCB *) blockacb;
> +    qemu_bh_delete(acb->bh);
> +    acb->bh = NULL;
> +    qemu_aio_release(acb);
> +}
> +
> +static AIOPool rbd_aio_pool = {
> +    .aiocb_size = sizeof(RBDAIOCB),
> +    .cancel = rbd_aio_cancel,
> +};
> +
> +/* This is the callback function for rados_aio_read and _write */
> +static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
> +{
> +    RBDAIOCB *acb = rcb->acb;
> +    int64_t r;
> +    int i;
> +
> +    acb->aiocnt--;
> +    r = rados_aio_get_return_value(c);
> +    rados_aio_release(c);
> +    if (acb->write) {
> +        if (r < 0) {
> +            acb->ret = r;
> +            acb->error = 1;
> +        } else if (!acb->error) {
> +            acb->ret += rcb->segsize;
> +        }
> +    } else {
> +        if (r == -ENOENT) {
> +            memset(rcb->buf, 0, rcb->segsize);
> +            if (!acb->error) {
> +                acb->ret += rcb->segsize;
> +            }
> +        } else if (r < 0) {
> +            acb->ret = r;
> +            acb->error = 1;
> +        } else if (r < rcb->segsize) {
> +            memset(rcb->buf + r, 0, rcb->segsize - r);
> +            if (!acb->error) {
> +                acb->ret += rcb->segsize;
> +            }
> +        } else if (!acb->error) {
> +            acb->ret += r;
> +        }
> +    }
> +    qemu_free(rcb);
> +    i = 0;
> +    if (!acb->aiocnt && acb->bh) {
> +        qemu_bh_schedule(acb->bh);
> +    }
> +}
> +
> +/* Callback when all queued rados_aio requests are complete */
> +static void rbd_aio_bh_cb(void *opaque)
> +{
> +    RBDAIOCB *acb = opaque;
> +
> +    if (!acb->write) {
> +        qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
> +    }
> +    qemu_vfree(acb->bounce);
> +    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
> +    qemu_bh_delete(acb->bh);
> +    acb->bh = NULL;
> +    qemu_aio_release(acb);
> +}
> +
> +static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
> +                                           int64_t sector_num,
> +                                           QEMUIOVector *qiov,
> +                                           int nb_sectors,
> +                                           BlockDriverCompletionFunc *cb,
> +                                           void *opaque, int write)
> +{
> +    RBDAIOCB *acb;
> +    RADOSCB *rcb;
> +    rados_completion_t c;
> +    char n[RBD_MAX_SEG_NAME_SIZE];
> +    int64_t segnr, segoffs, segsize, last_segnr;
> +    int64_t off, size;
> +    char *buf;
> +
> +    BDRVRBDState *s = bs->opaque;
> +
> +    acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque);
> +    acb->write = write;
> +    acb->qiov = qiov;
> +    acb->bounce = qemu_blockalign(bs, qiov->size);
> +    acb->aiocnt = 0;
> +    acb->ret = 0;
> +    acb->error = 0;
> +
> +    if (!acb->bh) {
> +        acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
> +    }
> +
> +    if (write) {
> +        qemu_iovec_to_buffer(acb->qiov, acb->bounce);
> +    }
> +
> +    buf = acb->bounce;
> +
> +    off = sector_num * BDRV_SECTOR_SIZE;
> +    size = nb_sectors * BDRV_SECTOR_SIZE;
> +    segnr = off / s->objsize;
> +    segoffs = off % s->objsize;
> +    segsize = s->objsize - segoffs;
> +
> +    last_segnr = ((off + size - 1) / s->objsize);
> +    acb->aiocnt = (last_segnr - segnr) + 1;
> +
> +    while (size > 0) {
> +        if (size < segsize) {
> +            segsize = size;
> +        }
> +
> +        snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s.%012llx", s->name,
> +                 (long long unsigned int)segnr);
> +
> +        rcb = qemu_malloc(sizeof(RADOSCB));
> +        rcb->done = 0;
> +        rcb->acb = acb;
> +        rcb->segsize = segsize;
> +        rcb->buf = buf;
> +
> +        if (write) {
> +            rados_aio_create_completion(rcb, NULL,
> +                                        (rados_callback_t) rbd_finish_aiocb,
> +                                        &c);
> +            rados_aio_write(s->pool, n, segoffs, buf, segsize, c);
> +        } else {
> +            rados_aio_create_completion(rcb,
> +                                        (rados_callback_t) rbd_finish_aiocb,
> +                                        NULL, &c);
> +            rados_aio_read(s->pool, n, segoffs, buf, segsize, c);
> +        }
> +
> +        buf += segsize;
> +        size -= segsize;
> +        segoffs = 0;
> +        segsize = s->objsize;
> +        segnr++;
> +    }
> +
> +    return &acb->common;
> +}
> +
> +static BlockDriverAIOCB *rbd_aio_readv(BlockDriverState * bs,
> +                                       int64_t sector_num, QEMUIOVector * qiov,
> +                                       int nb_sectors,
> +                                       BlockDriverCompletionFunc * cb,
> +                                       void *opaque)
> +{
> +    return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
> +}
> +
> +static BlockDriverAIOCB *rbd_aio_writev(BlockDriverState * bs,
> +                                        int64_t sector_num, QEMUIOVector * qiov,
> +                                        int nb_sectors,
> +                                        BlockDriverCompletionFunc * cb,
> +                                        void *opaque)
> +{
> +    return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
> +}
> +
> +static int rbd_getinfo(BlockDriverState * bs, BlockDriverInfo * bdi)
> +{
> +    BDRVRBDState *s = bs->opaque;
> +    bdi->cluster_size = s->objsize;
> +    return 0;
> +}
> +
> +static int64_t rbd_getlength(BlockDriverState * bs)
> +{
> +    BDRVRBDState *s = bs->opaque;
> +
> +    return s->size;
> +}
> +
> +static QEMUOptionParameter rbd_create_options[] = {
> +    {
> +     .name = BLOCK_OPT_SIZE,
> +     .type = OPT_SIZE,
> +     .help = "Virtual disk size"
> +    },
> +    {
> +     .name = BLOCK_OPT_CLUSTER_SIZE,
> +     .type = OPT_SIZE,
> +     .help = "RBD object size"
> +    },
> +    {NULL}
> +};
> +
> +static BlockDriver bdrv_rbd = {
> +    .format_name        = "rbd",
> +    .instance_size      = sizeof(BDRVRBDState),
> +    .bdrv_file_open     = rbd_open,
> +    .bdrv_read          = rbd_read,
> +    .bdrv_write         = rbd_write,
> +    .bdrv_close         = rbd_close,
> +    .bdrv_create        = rbd_create,
> +    .bdrv_get_info      = rbd_getinfo,
> +    .create_options     = rbd_create_options,
> +    .bdrv_getlength     = rbd_getlength,
> +    .protocol_name      = "rbd",
> +
> +    .bdrv_aio_readv     = rbd_aio_readv,
> +    .bdrv_aio_writev    = rbd_aio_writev,
> +};
> +
> +static void bdrv_rbd_init(void)
> +{
> +    bdrv_register(&bdrv_rbd);
> +}
> +
> +block_init(bdrv_rbd_init);
> diff --git a/block/rbd_types.h b/block/rbd_types.h
> new file mode 100644
> index 0000000..91ac4f9
> --- /dev/null
> +++ b/block/rbd_types.h
> @@ -0,0 +1,64 @@
> +/*
> + * Ceph - scalable distributed file system
> + *
> + * Copyright (C) 2004-2010 Sage Weil <sage@newdream.net>
> + *
> + * This is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License version 2.1, as published by the Free Software
> + * Foundation.  See file COPYING.
> + *
> + */
> +
> +#ifndef QEMU_BLOCK_RBD_TYPES_H
> +#define QEMU_BLOCK_RBD_TYPES_H
> +
> +
> +/*
> + * rbd image 'foo' consists of objects
> + *   foo.rbd      - image metadata
> + *   foo.00000000
> + *   foo.00000001
> + *   ...          - data
> + */
> +
> +#define RBD_SUFFIX              ".rbd"
> +#define RBD_DIRECTORY           "rbd_directory"
> +
> +#define RBD_DEFAULT_OBJ_ORDER   22   /* 4MB */
> +
> +#define RBD_MAX_OBJ_NAME_SIZE   96
> +#define RBD_MAX_SEG_NAME_SIZE   128
> +
> +#define RBD_COMP_NONE           0
> +#define RBD_CRYPT_NONE          0
> +
> +#define RBD_HEADER_TEXT         "<<< Rados Block Device Image >>>\n"
> +#define RBD_HEADER_SIGNATURE    "RBD"
> +#define RBD_HEADER_VERSION      "001.004"
> +
> +struct rbd_obj_snap_ondisk {
> +    uint64_t id;
> +    uint64_t image_size;
> +} __attribute__((packed));
> +
> +struct rbd_obj_header_ondisk {
> +    char text[64];
> +    char signature[4];
> +    char version[8];
> +    struct {
> +        uint8_t order;
> +        uint8_t crypt_type;
> +        uint8_t comp_type;
> +        uint8_t unused;
> +    } __attribute__((packed)) options;
> +    uint64_t image_size;
> +    uint64_t snap_seq;
> +    uint32_t snap_count;
> +    uint32_t reserved;
> +    uint64_t snap_names_len;
> +    struct rbd_obj_snap_ondisk snaps[0];
> +} __attribute__((packed));
> +
> +
> +#endif
> diff --git a/configure b/configure
> index 3cd2c5f..3f5c8ce 100755
> --- a/configure
> +++ b/configure
> @@ -299,6 +299,7 @@ pkgversion=""
>  check_utests="no"
>  user_pie="no"
>  zero_malloc=""
> +rbd=""
>  
>  # OS specific
>  if check_define __linux__ ; then
> @@ -660,6 +661,10 @@ for opt do
>    ;;
>    --enable-vhost-net) vhost_net="yes"
>    ;;
> +  --disable-rbd) rbd="no"
> +  ;;
> +  --enable-rbd) rbd="yes"
> +  ;;
>    *) echo "ERROR: unknown option $opt"; show_help="yes"
>    ;;
>    esac
> @@ -826,6 +831,7 @@ echo "  --enable-docs            enable documentation build"
>  echo "  --disable-docs           disable documentation build"
>  echo "  --disable-vhost-net      disable vhost-net acceleration support"
>  echo "  --enable-vhost-net       enable vhost-net acceleration support"
> +echo "  --enable-rbd		 enable building the rados block device (rbd)"
>  echo ""
>  echo "NOTE: The object files are built at the place where configure is launched"
>  exit 1
> @@ -1579,6 +1585,27 @@ if test "$mingw32" != yes -a "$pthread" = no; then
>  fi
>  
>  ##########################################
> +# rbd probe
> +if test "$rbd" != "no" ; then
> +  cat > $TMPC <<EOF
> +#include <stdio.h>
> +#include <rados/librados.h>
> +int main(void) { rados_initialize(0, NULL); return 0; }
> +EOF
> +  rbd_libs="-lrados -lcrypto"
> +  if compile_prog "" "$rbd_libs" ; then
> +    rbd=yes
> +    libs_tools="$rbd_libs $libs_tools"
> +    libs_softmmu="$rbd_libs $libs_softmmu"
> +  else
> +    if test "$rbd" = "yes" ; then
> +      feature_not_found "rados block device"
> +    fi
> +    rbd=no
> +  fi
> +fi
> +
> +##########################################
>  # linux-aio probe
>  
>  if test "$linux_aio" != "no" ; then
> @@ -2041,6 +2068,7 @@ echo "preadv support    $preadv"
>  echo "fdatasync         $fdatasync"
>  echo "uuid support      $uuid"
>  echo "vhost-net support $vhost_net"
> +echo "rbd support       $rbd"
>  
>  if test $sdl_too_old = "yes"; then
>  echo "-> Your SDL version is too old - please upgrade to have SDL support"
> @@ -2270,6 +2298,9 @@ echo "CONFIG_UNAME_RELEASE=\"$uname_release\"" >> $config_host_mak
>  if test "$zero_malloc" = "yes" ; then
>    echo "CONFIG_ZERO_MALLOC=y" >> $config_host_mak
>  fi
> +if test "$rbd" = "yes" ; then
> +  echo "CONFIG_RBD=y" >> $config_host_mak
> +fi
>  
>  # USB host support
>  case "$usb" in
>   

^ permalink raw reply related	[flat|nested] 19+ messages in thread

* Re: [PATCH] ceph/rbd block driver for qemu-kvm (v3)
  2010-06-11 19:51   ` [Qemu-devel] " Simone Gotti
@ 2010-06-17 19:05     ` Christian Brunner
  -1 siblings, 0 replies; 19+ messages in thread
From: Christian Brunner @ 2010-06-17 19:05 UTC (permalink / raw)
  To: Simone Gotti; +Cc: Kevin Wolf, kvm, qemu-devel, ceph-devel

Hi Simone,

sorry for the late reply. I've been on vacation for a week.

Thanks for sending the patch. At first sight your patch looks good.
I'll do some testing by the weekend.

Kevin also sent me a note about the missing aio support, but I didn't
have the time to implement it yet. Now it seems, that I don't have to
do it, since you where quicker... :)

Regarding locking: There were some problems with the thread handling,
when I started writing the driver. But Yehuda removed the use of
SIGUSERx and Sage modified librados, so that the Ceph Thread class is
masking signals on any new thread it creates. (see
http://ceph.newdream.net/git/?p=ceph.git;a=commit;h=cf4414684dd2ca5f2a565449be4686849695f62f
and http://ceph.newdream.net/git/?p=ceph.git;a=commit;h=e4e775b60f117ba2d07da9e0e438714b409447b6).
I think that this is also sufficient for the aio callbacks.

Regards

Christian

2010/6/11 Simone Gotti <simone.gotti@gmail.com>:
> Hi Christian,
>
> thanks for you patch. I tried it a little and it worked quite well but
> during some live migration tests I noticed a problem.
>
>
> The problem is related to live migration with high I/O using the AIO
> calls (I triggered it with a simple "dd").
>
> If you launch a live migration and the guest is stopped and started on
> the new qemu process while some AIO was in flight the guest on the new
> qemu will wait undefinitely for data this will never come. With ata
> emulation an ata reset is sent after some seconds but with virtio this
> won't happen.
>
> I'm not a qemu expert but from what I understand qemu in
> savevm.c:do_savevm calls qemu_aio_flush to wait that all the asyncronous
> aio returned (the callback si called). But the rbd block driver doesn't
> use the qemu aio model but the rados one so that function will never
> know of the rados aio.
>
> So a solution will be to glue the block driver with the qemu aio model.
> I tried to do this to test if this will work in the attached patch. I
> only tested with one rbd block device but the live migration tests
> worked (in the patch I removed all the debug prints I adedd to see if
> all AIO requets really returned.
>
> This is an RFC just to know what you think about this possible solution.
> As qemu's aio model is event based and it needs a file descriptor for
> event communication i used eventfd to do this.
> Let me know if you need a detailed description of the patch!
>
>
> I've also got a question: as librados is multithreaded the callbacks are
> called in another thread. Is there the need to protect some critical
> sections with a lock (for example in rbd_aio_rw_vector and in
> rbd_finish_aiocb)?
>
>
> Thanks!
>
> Bye!
>
>
> From: Simone Gotti <simone.gotti@gmail.com>
> Date: Fri, 11 Jun 2010 21:19:39 +0200
> Subject: [PATCH] block/rbd: Added glue to qemu aio model to fix live
> migration with outstanding aio
>
> Signed-off-by: Simone Gotti <simone.gotti@gmail.com>
>
>
> ---
>  block/rbd.c |   63
> +++++++++++++++++++++++++++++++++++++++++++++++++++++-----
>  1 files changed, 57 insertions(+), 6 deletions(-)
>
> diff --git a/block/rbd.c b/block/rbd.c
> index 4d22069..83b7898 100644
> --- a/block/rbd.c
> +++ b/block/rbd.c
> @@ -25,6 +25,8 @@
>
>  #include <signal.h>
>
> +#include <sys/eventfd.h>
> +
>  /*
>  * When specifying the image filename use:
>  *
> @@ -47,6 +49,15 @@
>
>  #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
>
> +typedef struct BDRVRBDState {
> +    int efd;
> +    rados_pool_t pool;
> +    char name[RBD_MAX_OBJ_NAME_SIZE];
> +    uint64_t size;
> +    uint64_t objsize;
> +    int qemu_aio_count;
> +} BDRVRBDState;
> +
>  typedef struct RBDAIOCB {
>     BlockDriverAIOCB common;
>     QEMUBH *bh;
> @@ -57,6 +68,7 @@ typedef struct RBDAIOCB {
>     int64_t sector_num;
>     int aiocnt;
>     int error;
> +    BDRVRBDState *s;
>  } RBDAIOCB;
>
>  typedef struct RADOSCB {
> @@ -67,12 +79,6 @@ typedef struct RADOSCB {
>     char *buf;
>  } RADOSCB;
>
> -typedef struct BDRVRBDState {
> -    rados_pool_t pool;
> -    char name[RBD_MAX_OBJ_NAME_SIZE];
> -    uint64_t size;
> -    uint64_t objsize;
> -} BDRVRBDState;
>
>  typedef struct rbd_obj_header_ondisk RbdHeader1;
>
> @@ -255,6 +261,31 @@ done:
>     return ret;
>  }
>
> +static void rbd_aio_completion_cb(void *opaque)
> +{
> +    BDRVRBDState *s = opaque;
> +
> +    uint64_t val;
> +    ssize_t ret;
> +
> +    do {
> +        if ((ret = read(s->efd, &val, sizeof(val))) > 0) {
> +            s->qemu_aio_count -= val;
> +       }
> +    } while (ret == -1 && errno == EINTR);
> +
> +    return;
> +}
> +
> +static int rbd_aio_flush_cb(void *opaque)
> +{
> +    BDRVRBDState *s = opaque;
> +
> +    return (s->qemu_aio_count > 0) ? 1 : 0;
> +}
> +
> +
> +
>  static int rbd_open(BlockDriverState *bs, const char *filename, int flags)
>  {
>     BDRVRBDState *s = bs->opaque;
> @@ -303,6 +334,15 @@ static int rbd_open(BlockDriverState *bs, const
> char *filename, int flags)
>     s->size = header->image_size;
>     s->objsize = 1 << header->options.order;
>
> +    s->efd = eventfd(0, 0);
> +    if (s->efd == -1) {
> +        error_report("error opening eventfd");
> +        goto failed;
> +    }
> +    fcntl(s->efd, F_SETFL, O_NONBLOCK);
> +    qemu_aio_set_fd_handler(s->efd, rbd_aio_completion_cb, NULL,
> +        rbd_aio_flush_cb, NULL, s);
> +
>     return 0;
>
>  failed:
> @@ -393,6 +433,7 @@ static AIOPool rbd_aio_pool = {
>  };
>
>  /* This is the callback function for rados_aio_read and _write */
> +
>  static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
>  {
>     RBDAIOCB *acb = rcb->acb;
> @@ -427,6 +468,8 @@ static void rbd_finish_aiocb(rados_completion_t c,
> RADOSCB *rcb)
>             acb->ret += r;
>         }
>     }
> +    uint64_t buf = 1;
> +    write(acb->s->efd, &buf, sizeof(buf));
>     qemu_free(rcb);
>     i = 0;
>     if (!acb->aiocnt && acb->bh) {
> @@ -435,6 +478,7 @@ static void rbd_finish_aiocb(rados_completion_t c,
> RADOSCB *rcb)
>  }
>
>  /* Callback when all queued rados_aio requests are complete */
> +
>  static void rbd_aio_bh_cb(void *opaque)
>  {
>     RBDAIOCB *acb = opaque;
> @@ -446,6 +490,10 @@ static void rbd_aio_bh_cb(void *opaque)
>     acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
>     qemu_bh_delete(acb->bh);
>     acb->bh = NULL;
> +
> +    uint64_t buf = 1;
> +    write(acb->s->efd, &buf, sizeof(buf));
> +
>     qemu_aio_release(acb);
>  }
>
> @@ -473,6 +521,7 @@ static BlockDriverAIOCB
> *rbd_aio_rw_vector(BlockDriverState *bs,
>     acb->aiocnt = 0;
>     acb->ret = 0;
>     acb->error = 0;
> +    acb->s = s;
>
>     if (!acb->bh) {
>         acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
> @@ -493,6 +542,8 @@ static BlockDriverAIOCB
> *rbd_aio_rw_vector(BlockDriverState *bs,
>     last_segnr = ((off + size - 1) / s->objsize);
>     acb->aiocnt = (last_segnr - segnr) + 1;
>
> +    s->qemu_aio_count+=acb->aiocnt + 1; /* All the RADOSCB and the
> related RBDAIOCB */
> +
>     while (size > 0) {
>         if (size < segsize) {
>             segsize = size;
> --
> 1.7.0.1
>
>
>
>
>
>
> On 05/31/2010 09:31 PM, Christian Brunner wrote:
>> Hi Kevin,
>>
>> here is an updated patch for the ceph/rbd driver. I hope that everything
>> is fine now.
>>
>> Regards,
>> Christian
>>
>>
>> This is a block driver for the distributed file system Ceph
>> (http://ceph.newdream.net/). This driver uses librados (which
>> is part of the Ceph server) for direct access to the Ceph object
>> store and is running entirely in userspace. Therefore it is
>> called "rbd" - rados block device.
>>
>> To compile the driver a recent version of ceph (unstable/testing git
>> head or 0.20.3 once it is released) is needed.
>>
>> Additional information is available on the Ceph-Wiki:
>>
>> http://ceph.newdream.net/wiki/Kvm-rbd
>>
>> The patch is based on git://repo.or.cz/qemu/kevin.git block
>>
>>
>> Signed-off-by: Christian Brunner <chb@muc.de>
>> ---
>>  Makefile.objs     |    1 +
>>  block/rbd.c       |  600 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>>  block/rbd_types.h |   64 ++++++
>>  configure         |   31 +++
>>  4 files changed, 696 insertions(+), 0 deletions(-)
>>  create mode 100644 block/rbd.c
>>  create mode 100644 block/rbd_types.h
>>
>> diff --git a/Makefile.objs b/Makefile.objs
>> index 1a942e5..08dc11f 100644
>> --- a/Makefile.objs
>> +++ b/Makefile.objs
>> @@ -18,6 +18,7 @@ block-nested-y += parallels.o nbd.o blkdebug.o
>>  block-nested-$(CONFIG_WIN32) += raw-win32.o
>>  block-nested-$(CONFIG_POSIX) += raw-posix.o
>>  block-nested-$(CONFIG_CURL) += curl.o
>> +block-nested-$(CONFIG_RBD) += rbd.o
>>
>>  block-obj-y +=  $(addprefix block/, $(block-nested-y))
>>
>> diff --git a/block/rbd.c b/block/rbd.c
>> new file mode 100644
>> index 0000000..4a60dda
>> --- /dev/null
>> +++ b/block/rbd.c
>> @@ -0,0 +1,600 @@
>> +/*
>> + * QEMU Block driver for RADOS (Ceph)
>> + *
>> + * Copyright (C) 2010 Christian Brunner <chb@muc.de>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2.  See
>> + * the COPYING file in the top-level directory.
>> + *
>> + */
>> +
>> +#include "qemu-common.h"
>> +#include "qemu-error.h"
>> +#include <sys/types.h>
>> +#include <stdbool.h>
>> +
>> +#include <qemu-common.h>
>> +
>> +#include "rbd_types.h"
>> +#include "module.h"
>> +#include "block_int.h"
>> +
>> +#include <stdio.h>
>> +#include <stdlib.h>
>> +#include <rados/librados.h>
>> +
>> +#include <signal.h>
>> +
>> +/*
>> + * When specifying the image filename use:
>> + *
>> + * rbd:poolname/devicename
>> + *
>> + * poolname must be the name of an existing rados pool
>> + *
>> + * devicename is the basename for all objects used to
>> + * emulate the raw device.
>> + *
>> + * Metadata information (image size, ...) is stored in an
>> + * object with the name "devicename.rbd".
>> + *
>> + * The raw device is split into 4MB sized objects by default.
>> + * The sequencenumber is encoded in a 12 byte long hex-string,
>> + * and is attached to the devicename, separated by a dot.
>> + * e.g. "devicename.1234567890ab"
>> + *
>> + */
>> +
>> +#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
>> +
>> +typedef struct RBDAIOCB {
>> +    BlockDriverAIOCB common;
>> +    QEMUBH *bh;
>> +    int ret;
>> +    QEMUIOVector *qiov;
>> +    char *bounce;
>> +    int write;
>> +    int64_t sector_num;
>> +    int aiocnt;
>> +    int error;
>> +} RBDAIOCB;
>> +
>> +typedef struct RADOSCB {
>> +    int rcbid;
>> +    RBDAIOCB *acb;
>> +    int done;
>> +    int64_t segsize;
>> +    char *buf;
>> +} RADOSCB;
>> +
>> +typedef struct BDRVRBDState {
>> +    rados_pool_t pool;
>> +    char name[RBD_MAX_OBJ_NAME_SIZE];
>> +    uint64_t size;
>> +    uint64_t objsize;
>> +} BDRVRBDState;
>> +
>> +typedef struct rbd_obj_header_ondisk RbdHeader1;
>> +
>> +static int rbd_parsename(const char *filename, char *pool, char *name)
>> +{
>> +    const char *rbdname;
>> +    char *p;
>> +    int l;
>> +
>> +    if (!strstart(filename, "rbd:", &rbdname)) {
>> +        return -EINVAL;
>> +    }
>> +
>> +    pstrcpy(pool, 2 * RBD_MAX_SEG_NAME_SIZE, rbdname);
>> +    p = strchr(pool, '/');
>> +    if (p == NULL) {
>> +        return -EINVAL;
>> +    }
>> +
>> +    *p = '\0';
>> +
>> +    l = strlen(pool);
>> +    if(l >= RBD_MAX_SEG_NAME_SIZE) {
>> +        error_report("pool name to long");
>> +        return -EINVAL;
>> +    } else if (l <= 0) {
>> +        error_report("pool name to short");
>> +        return -EINVAL;
>> +    }
>> +
>> +    l = strlen(++p);
>> +    if (l >= RBD_MAX_OBJ_NAME_SIZE) {
>> +        error_report("object name to long");
>> +        return -EINVAL;
>> +    } else if (l <= 0) {
>> +        error_report("object name to short");
>> +        return -EINVAL;
>> +    }
>> +
>> +    strcpy(name, p);
>> +
>> +    return l;
>> +}
>> +
>> +static int create_tmap_op(uint8_t op, const char *name, char **tmap_desc)
>> +{
>> +    uint32_t len = strlen(name);
>> +    /* total_len = encoding op + name + empty buffer */
>> +    uint32_t total_len = 1 + (sizeof(uint32_t) + len) + sizeof(uint32_t);
>> +    char *desc = NULL;
>> +
>> +    qemu_malloc(total_len);
>> +
>> +    *tmap_desc = desc;
>> +
>> +    *desc = op;
>> +    desc++;
>> +    memcpy(desc, &len, sizeof(len));
>> +    desc += sizeof(len);
>> +    memcpy(desc, name, len);
>> +    desc += len;
>> +    len = 0;
>> +    memcpy(desc, &len, sizeof(len));
>> +    desc += sizeof(len);
>> +
>> +    return desc - *tmap_desc;
>> +}
>> +
>> +static void free_tmap_op(char *tmap_desc)
>> +{
>> +    qemu_free(tmap_desc);
>> +}
>> +
>> +static int rbd_register_image(rados_pool_t pool, const char *name)
>> +{
>> +    char *tmap_desc;
>> +    const char *dir = RBD_DIRECTORY;
>> +    int ret;
>> +
>> +    ret = create_tmap_op(CEPH_OSD_TMAP_SET, name, &tmap_desc);
>> +    if (ret < 0) {
>> +        return ret;
>> +    }
>> +
>> +    ret = rados_tmap_update(pool, dir, tmap_desc, ret);
>> +    free_tmap_op(tmap_desc);
>> +
>> +    return ret;
>> +}
>> +
>> +static int rbd_create(const char *filename, QEMUOptionParameter *options)
>> +{
>> +    int64_t bytes = 0;
>> +    int64_t objsize;
>> +    uint64_t size;
>> +    time_t mtime;
>> +    uint8_t obj_order = RBD_DEFAULT_OBJ_ORDER;
>> +    char pool[RBD_MAX_SEG_NAME_SIZE];
>> +    char n[RBD_MAX_SEG_NAME_SIZE];
>> +    char name[RBD_MAX_SEG_NAME_SIZE];
>> +    RbdHeader1 header;
>> +    rados_pool_t p;
>> +    int ret;
>> +
>> +    if (rbd_parsename(filename, pool, name) < 0) {
>> +        return -EINVAL;
>> +    }
>> +
>> +    snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s%s", name, RBD_SUFFIX);
>> +
>> +    /* Read out options */
>> +    while (options && options->name) {
>> +        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
>> +            bytes = options->value.n;
>> +        } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) {
>> +            if (options->value.n) {
>> +                objsize = options->value.n;
>> +                if ((objsize - 1) & objsize) {    /* not a power of 2? */
>> +                    error_report("obj size needs to be power of 2");
>> +                    return -EINVAL;
>> +                }
>> +                if (objsize < 4096) {
>> +                    error_report("obj size too small");
>> +                    return -EINVAL;
>> +                }
>> +
>> +                for (obj_order = 0; obj_order < 64; obj_order++) {
>> +                    if (objsize == 1) {
>> +                        break;
>> +                    }
>> +                    objsize >>= 1;
>> +                }
>> +            }
>> +        }
>> +        options++;
>> +    }
>> +
>> +    memset(&header, 0, sizeof(header));
>> +    pstrcpy(header.text, sizeof(header.text), RBD_HEADER_TEXT);
>> +    pstrcpy(header.signature, sizeof(header.signature), RBD_HEADER_SIGNATURE);
>> +    pstrcpy(header.version, sizeof(header.version), RBD_HEADER_VERSION);
>> +    header.image_size = bytes;
>> +    cpu_to_le64s((uint64_t *) & header.image_size);
>> +    header.options.order = obj_order;
>> +    header.options.crypt_type = RBD_CRYPT_NONE;
>> +    header.options.comp_type = RBD_COMP_NONE;
>> +    header.snap_seq = 0;
>> +    header.snap_count = 0;
>> +    cpu_to_le32s(&header.snap_count);
>> +
>> +    if (rados_initialize(0, NULL) < 0) {
>> +        error_report("error initializing");
>> +        return -EIO;
>> +    }
>> +
>> +    if (rados_open_pool(pool, &p)) {
>> +        error_report("error opening pool %s", pool);
>> +        rados_deinitialize();
>> +        return -EIO;
>> +    }
>> +
>> +    /* check for existing rbd header file */
>> +    ret = rados_stat(p, n, &size, &mtime);
>> +    if (ret == 0) {
>> +        ret=-EEXIST;
>> +        goto done;
>> +    }
>> +
>> +    /* create header file */
>> +    ret = rados_write(p, n, 0, (const char *)&header, sizeof(header));
>> +    if (ret < 0) {
>> +        goto done;
>> +    }
>> +
>> +    ret = rbd_register_image(p, name);
>> +done:
>> +    rados_close_pool(p);
>> +    rados_deinitialize();
>> +
>> +    return ret;
>> +}
>> +
>> +static int rbd_open(BlockDriverState *bs, const char *filename, int flags)
>> +{
>> +    BDRVRBDState *s = bs->opaque;
>> +    char pool[RBD_MAX_SEG_NAME_SIZE];
>> +    char n[RBD_MAX_SEG_NAME_SIZE];
>> +    char hbuf[4096];
>> +    int r;
>> +
>> +    if (rbd_parsename(filename, pool, s->name) < 0) {
>> +        return -EINVAL;
>> +    }
>> +    snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s%s", s->name, RBD_SUFFIX);
>> +
>> +    if ((r = rados_initialize(0, NULL)) < 0) {
>> +        error_report("error initializing");
>> +        return r;
>> +    }
>> +
>> +    if ((r = rados_open_pool(pool, &s->pool))) {
>> +        error_report("error opening pool %s", pool);
>> +        rados_deinitialize();
>> +        return r;
>> +    }
>> +
>> +    if ((r = rados_read(s->pool, n, 0, hbuf, 4096)) < 0) {
>> +        error_report("error reading header from %s", s->name);
>> +        goto failed;
>> +    }
>> +
>> +    if (strncmp(hbuf + 64, RBD_HEADER_SIGNATURE, 4)) {
>> +        error_report("Invalid header signature %s", hbuf + 64);
>> +        r = -EMEDIUMTYPE;
>> +        goto failed;
>> +    }
>> +
>> +    if (strncmp(hbuf + 68, RBD_HEADER_VERSION, 8)) {
>> +        error_report("Unknown image version %s", hbuf + 68);
>> +        r = -EMEDIUMTYPE;
>> +        goto failed;
>> +    }
>> +
>> +    RbdHeader1 *header;
>> +
>> +    header = (RbdHeader1 *) hbuf;
>> +    le64_to_cpus((uint64_t *) & header->image_size);
>> +    s->size = header->image_size;
>> +    s->objsize = 1 << header->options.order;
>> +
>> +    return 0;
>> +
>> +failed:
>> +    rados_close_pool(s->pool);
>> +    rados_deinitialize();
>> +    return r;
>> +}
>> +
>> +static void rbd_close(BlockDriverState *bs)
>> +{
>> +    BDRVRBDState *s = bs->opaque;
>> +
>> +    rados_close_pool(s->pool);
>> +    rados_deinitialize();
>> +}
>> +
>> +static int rbd_rw(BlockDriverState *bs, int64_t sector_num,
>> +                  uint8_t *buf, int nb_sectors, int write)
>> +{
>> +    BDRVRBDState *s = bs->opaque;
>> +    char n[RBD_MAX_SEG_NAME_SIZE];
>> +
>> +    int64_t segnr, segoffs, segsize, r;
>> +    int64_t off, size;
>> +
>> +    off = sector_num * BDRV_SECTOR_SIZE;
>> +    size = nb_sectors * BDRV_SECTOR_SIZE;
>> +    segnr = off / s->objsize;
>> +    segoffs = off % s->objsize;
>> +    segsize = s->objsize - segoffs;
>> +
>> +    while (size > 0) {
>> +        if (size < segsize) {
>> +            segsize = size;
>> +        }
>> +
>> +        snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s.%012" PRIx64, s->name, segnr);
>> +
>> +        if (write) {
>> +            if ((r = rados_write(s->pool, n, segoffs, (const char *)buf,
>> +                segsize)) < 0) {
>> +                return r;
>> +            }
>> +        } else {
>> +            r = rados_read(s->pool, n, segoffs, (char *)buf, segsize);
>> +            if (r == -ENOENT) {
>> +                memset(buf, 0, segsize);
>> +            } else if (r < 0) {
>> +                return r;
>> +            } else if (r < segsize) {
>> +                memset(buf + r, 0, segsize - r);
>> +            }
>> +        }
>> +
>> +        buf += segsize;
>> +        size -= segsize;
>> +        segoffs = 0;
>> +        segsize = s->objsize;
>> +        segnr++;
>> +    }
>> +
>> +    return 0;
>> +}
>> +
>> +static int rbd_read(BlockDriverState *bs, int64_t sector_num,
>> +                    uint8_t *buf, int nb_sectors)
>> +{
>> +    return rbd_rw(bs, sector_num, buf, nb_sectors, 0);
>> +}
>> +
>> +static int rbd_write(BlockDriverState *bs, int64_t sector_num,
>> +                     const uint8_t *buf, int nb_sectors)
>> +{
>> +    return rbd_rw(bs, sector_num, (uint8_t *) buf, nb_sectors, 1);
>> +}
>> +
>> +static void rbd_aio_cancel(BlockDriverAIOCB *blockacb)
>> +{
>> +    RBDAIOCB *acb = (RBDAIOCB *) blockacb;
>> +    qemu_bh_delete(acb->bh);
>> +    acb->bh = NULL;
>> +    qemu_aio_release(acb);
>> +}
>> +
>> +static AIOPool rbd_aio_pool = {
>> +    .aiocb_size = sizeof(RBDAIOCB),
>> +    .cancel = rbd_aio_cancel,
>> +};
>> +
>> +/* This is the callback function for rados_aio_read and _write */
>> +static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
>> +{
>> +    RBDAIOCB *acb = rcb->acb;
>> +    int64_t r;
>> +    int i;
>> +
>> +    acb->aiocnt--;
>> +    r = rados_aio_get_return_value(c);
>> +    rados_aio_release(c);
>> +    if (acb->write) {
>> +        if (r < 0) {
>> +            acb->ret = r;
>> +            acb->error = 1;
>> +        } else if (!acb->error) {
>> +            acb->ret += rcb->segsize;
>> +        }
>> +    } else {
>> +        if (r == -ENOENT) {
>> +            memset(rcb->buf, 0, rcb->segsize);
>> +            if (!acb->error) {
>> +                acb->ret += rcb->segsize;
>> +            }
>> +        } else if (r < 0) {
>> +            acb->ret = r;
>> +            acb->error = 1;
>> +        } else if (r < rcb->segsize) {
>> +            memset(rcb->buf + r, 0, rcb->segsize - r);
>> +            if (!acb->error) {
>> +                acb->ret += rcb->segsize;
>> +            }
>> +        } else if (!acb->error) {
>> +            acb->ret += r;
>> +        }
>> +    }
>> +    qemu_free(rcb);
>> +    i = 0;
>> +    if (!acb->aiocnt && acb->bh) {
>> +        qemu_bh_schedule(acb->bh);
>> +    }
>> +}
>> +
>> +/* Callback when all queued rados_aio requests are complete */
>> +static void rbd_aio_bh_cb(void *opaque)
>> +{
>> +    RBDAIOCB *acb = opaque;
>> +
>> +    if (!acb->write) {
>> +        qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
>> +    }
>> +    qemu_vfree(acb->bounce);
>> +    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
>> +    qemu_bh_delete(acb->bh);
>> +    acb->bh = NULL;
>> +    qemu_aio_release(acb);
>> +}
>> +
>> +static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
>> +                                           int64_t sector_num,
>> +                                           QEMUIOVector *qiov,
>> +                                           int nb_sectors,
>> +                                           BlockDriverCompletionFunc *cb,
>> +                                           void *opaque, int write)
>> +{
>> +    RBDAIOCB *acb;
>> +    RADOSCB *rcb;
>> +    rados_completion_t c;
>> +    char n[RBD_MAX_SEG_NAME_SIZE];
>> +    int64_t segnr, segoffs, segsize, last_segnr;
>> +    int64_t off, size;
>> +    char *buf;
>> +
>> +    BDRVRBDState *s = bs->opaque;
>> +
>> +    acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque);
>> +    acb->write = write;
>> +    acb->qiov = qiov;
>> +    acb->bounce = qemu_blockalign(bs, qiov->size);
>> +    acb->aiocnt = 0;
>> +    acb->ret = 0;
>> +    acb->error = 0;
>> +
>> +    if (!acb->bh) {
>> +        acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
>> +    }
>> +
>> +    if (write) {
>> +        qemu_iovec_to_buffer(acb->qiov, acb->bounce);
>> +    }
>> +
>> +    buf = acb->bounce;
>> +
>> +    off = sector_num * BDRV_SECTOR_SIZE;
>> +    size = nb_sectors * BDRV_SECTOR_SIZE;
>> +    segnr = off / s->objsize;
>> +    segoffs = off % s->objsize;
>> +    segsize = s->objsize - segoffs;
>> +
>> +    last_segnr = ((off + size - 1) / s->objsize);
>> +    acb->aiocnt = (last_segnr - segnr) + 1;
>> +
>> +    while (size > 0) {
>> +        if (size < segsize) {
>> +            segsize = size;
>> +        }
>> +
>> +        snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s.%012llx", s->name,
>> +                 (long long unsigned int)segnr);
>> +
>> +        rcb = qemu_malloc(sizeof(RADOSCB));
>> +        rcb->done = 0;
>> +        rcb->acb = acb;
>> +        rcb->segsize = segsize;
>> +        rcb->buf = buf;
>> +
>> +        if (write) {
>> +            rados_aio_create_completion(rcb, NULL,
>> +                                        (rados_callback_t) rbd_finish_aiocb,
>> +                                        &c);
>> +            rados_aio_write(s->pool, n, segoffs, buf, segsize, c);
>> +        } else {
>> +            rados_aio_create_completion(rcb,
>> +                                        (rados_callback_t) rbd_finish_aiocb,
>> +                                        NULL, &c);
>> +            rados_aio_read(s->pool, n, segoffs, buf, segsize, c);
>> +        }
>> +
>> +        buf += segsize;
>> +        size -= segsize;
>> +        segoffs = 0;
>> +        segsize = s->objsize;
>> +        segnr++;
>> +    }
>> +
>> +    return &acb->common;
>> +}
>> +
>> +static BlockDriverAIOCB *rbd_aio_readv(BlockDriverState * bs,
>> +                                       int64_t sector_num, QEMUIOVector * qiov,
>> +                                       int nb_sectors,
>> +                                       BlockDriverCompletionFunc * cb,
>> +                                       void *opaque)
>> +{
>> +    return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
>> +}
>> +
>> +static BlockDriverAIOCB *rbd_aio_writev(BlockDriverState * bs,
>> +                                        int64_t sector_num, QEMUIOVector * qiov,
>> +                                        int nb_sectors,
>> +                                        BlockDriverCompletionFunc * cb,
>> +                                        void *opaque)
>> +{
>> +    return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
>> +}
>> +
>> +static int rbd_getinfo(BlockDriverState * bs, BlockDriverInfo * bdi)
>> +{
>> +    BDRVRBDState *s = bs->opaque;
>> +    bdi->cluster_size = s->objsize;
>> +    return 0;
>> +}
>> +
>> +static int64_t rbd_getlength(BlockDriverState * bs)
>> +{
>> +    BDRVRBDState *s = bs->opaque;
>> +
>> +    return s->size;
>> +}
>> +
>> +static QEMUOptionParameter rbd_create_options[] = {
>> +    {
>> +     .name = BLOCK_OPT_SIZE,
>> +     .type = OPT_SIZE,
>> +     .help = "Virtual disk size"
>> +    },
>> +    {
>> +     .name = BLOCK_OPT_CLUSTER_SIZE,
>> +     .type = OPT_SIZE,
>> +     .help = "RBD object size"
>> +    },
>> +    {NULL}
>> +};
>> +
>> +static BlockDriver bdrv_rbd = {
>> +    .format_name        = "rbd",
>> +    .instance_size      = sizeof(BDRVRBDState),
>> +    .bdrv_file_open     = rbd_open,
>> +    .bdrv_read          = rbd_read,
>> +    .bdrv_write         = rbd_write,
>> +    .bdrv_close         = rbd_close,
>> +    .bdrv_create        = rbd_create,
>> +    .bdrv_get_info      = rbd_getinfo,
>> +    .create_options     = rbd_create_options,
>> +    .bdrv_getlength     = rbd_getlength,
>> +    .protocol_name      = "rbd",
>> +
>> +    .bdrv_aio_readv     = rbd_aio_readv,
>> +    .bdrv_aio_writev    = rbd_aio_writev,
>> +};
>> +
>> +static void bdrv_rbd_init(void)
>> +{
>> +    bdrv_register(&bdrv_rbd);
>> +}
>> +
>> +block_init(bdrv_rbd_init);
>> diff --git a/block/rbd_types.h b/block/rbd_types.h
>> new file mode 100644
>> index 0000000..91ac4f9
>> --- /dev/null
>> +++ b/block/rbd_types.h
>> @@ -0,0 +1,64 @@
>> +/*
>> + * Ceph - scalable distributed file system
>> + *
>> + * Copyright (C) 2004-2010 Sage Weil <sage@newdream.net>
>> + *
>> + * This is free software; you can redistribute it and/or
>> + * modify it under the terms of the GNU Lesser General Public
>> + * License version 2.1, as published by the Free Software
>> + * Foundation.  See file COPYING.
>> + *
>> + */
>> +
>> +#ifndef QEMU_BLOCK_RBD_TYPES_H
>> +#define QEMU_BLOCK_RBD_TYPES_H
>> +
>> +
>> +/*
>> + * rbd image 'foo' consists of objects
>> + *   foo.rbd      - image metadata
>> + *   foo.00000000
>> + *   foo.00000001
>> + *   ...          - data
>> + */
>> +
>> +#define RBD_SUFFIX              ".rbd"
>> +#define RBD_DIRECTORY           "rbd_directory"
>> +
>> +#define RBD_DEFAULT_OBJ_ORDER   22   /* 4MB */
>> +
>> +#define RBD_MAX_OBJ_NAME_SIZE   96
>> +#define RBD_MAX_SEG_NAME_SIZE   128
>> +
>> +#define RBD_COMP_NONE           0
>> +#define RBD_CRYPT_NONE          0
>> +
>> +#define RBD_HEADER_TEXT         "<<< Rados Block Device Image >>>\n"
>> +#define RBD_HEADER_SIGNATURE    "RBD"
>> +#define RBD_HEADER_VERSION      "001.004"
>> +
>> +struct rbd_obj_snap_ondisk {
>> +    uint64_t id;
>> +    uint64_t image_size;
>> +} __attribute__((packed));
>> +
>> +struct rbd_obj_header_ondisk {
>> +    char text[64];
>> +    char signature[4];
>> +    char version[8];
>> +    struct {
>> +        uint8_t order;
>> +        uint8_t crypt_type;
>> +        uint8_t comp_type;
>> +        uint8_t unused;
>> +    } __attribute__((packed)) options;
>> +    uint64_t image_size;
>> +    uint64_t snap_seq;
>> +    uint32_t snap_count;
>> +    uint32_t reserved;
>> +    uint64_t snap_names_len;
>> +    struct rbd_obj_snap_ondisk snaps[0];
>> +} __attribute__((packed));
>> +
>> +
>> +#endif
>> diff --git a/configure b/configure
>> index 3cd2c5f..3f5c8ce 100755
>> --- a/configure
>> +++ b/configure
>> @@ -299,6 +299,7 @@ pkgversion=""
>>  check_utests="no"
>>  user_pie="no"
>>  zero_malloc=""
>> +rbd=""
>>
>>  # OS specific
>>  if check_define __linux__ ; then
>> @@ -660,6 +661,10 @@ for opt do
>>    ;;
>>    --enable-vhost-net) vhost_net="yes"
>>    ;;
>> +  --disable-rbd) rbd="no"
>> +  ;;
>> +  --enable-rbd) rbd="yes"
>> +  ;;
>>    *) echo "ERROR: unknown option $opt"; show_help="yes"
>>    ;;
>>    esac
>> @@ -826,6 +831,7 @@ echo "  --enable-docs            enable documentation build"
>>  echo "  --disable-docs           disable documentation build"
>>  echo "  --disable-vhost-net      disable vhost-net acceleration support"
>>  echo "  --enable-vhost-net       enable vhost-net acceleration support"
>> +echo "  --enable-rbd          enable building the rados block device (rbd)"
>>  echo ""
>>  echo "NOTE: The object files are built at the place where configure is launched"
>>  exit 1
>> @@ -1579,6 +1585,27 @@ if test "$mingw32" != yes -a "$pthread" = no; then
>>  fi
>>
>>  ##########################################
>> +# rbd probe
>> +if test "$rbd" != "no" ; then
>> +  cat > $TMPC <<EOF
>> +#include <stdio.h>
>> +#include <rados/librados.h>
>> +int main(void) { rados_initialize(0, NULL); return 0; }
>> +EOF
>> +  rbd_libs="-lrados -lcrypto"
>> +  if compile_prog "" "$rbd_libs" ; then
>> +    rbd=yes
>> +    libs_tools="$rbd_libs $libs_tools"
>> +    libs_softmmu="$rbd_libs $libs_softmmu"
>> +  else
>> +    if test "$rbd" = "yes" ; then
>> +      feature_not_found "rados block device"
>> +    fi
>> +    rbd=no
>> +  fi
>> +fi
>> +
>> +##########################################
>>  # linux-aio probe
>>
>>  if test "$linux_aio" != "no" ; then
>> @@ -2041,6 +2068,7 @@ echo "preadv support    $preadv"
>>  echo "fdatasync         $fdatasync"
>>  echo "uuid support      $uuid"
>>  echo "vhost-net support $vhost_net"
>> +echo "rbd support       $rbd"
>>
>>  if test $sdl_too_old = "yes"; then
>>  echo "-> Your SDL version is too old - please upgrade to have SDL support"
>> @@ -2270,6 +2298,9 @@ echo "CONFIG_UNAME_RELEASE=\"$uname_release\"" >> $config_host_mak
>>  if test "$zero_malloc" = "yes" ; then
>>    echo "CONFIG_ZERO_MALLOC=y" >> $config_host_mak
>>  fi
>> +if test "$rbd" = "yes" ; then
>> +  echo "CONFIG_RBD=y" >> $config_host_mak
>> +fi
>>
>>  # USB host support
>>  case "$usb" in
>>
>
> --
> To unsubscribe from this list: send the line "unsubscribe kvm" in
> the body of a message to majordomo@vger.kernel.org
> More majordomo info at  http://vger.kernel.org/majordomo-info.html
>
--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html

^ permalink raw reply	[flat|nested] 19+ messages in thread

* [Qemu-devel] Re: [PATCH] ceph/rbd block driver for qemu-kvm (v3)
@ 2010-06-17 19:05     ` Christian Brunner
  0 siblings, 0 replies; 19+ messages in thread
From: Christian Brunner @ 2010-06-17 19:05 UTC (permalink / raw)
  To: Simone Gotti; +Cc: Kevin Wolf, ceph-devel, qemu-devel, kvm

Hi Simone,

sorry for the late reply. I've been on vacation for a week.

Thanks for sending the patch. At first sight your patch looks good.
I'll do some testing by the weekend.

Kevin also sent me a note about the missing aio support, but I didn't
have the time to implement it yet. Now it seems, that I don't have to
do it, since you where quicker... :)

Regarding locking: There were some problems with the thread handling,
when I started writing the driver. But Yehuda removed the use of
SIGUSERx and Sage modified librados, so that the Ceph Thread class is
masking signals on any new thread it creates. (see
http://ceph.newdream.net/git/?p=ceph.git;a=commit;h=cf4414684dd2ca5f2a565449be4686849695f62f
and http://ceph.newdream.net/git/?p=ceph.git;a=commit;h=e4e775b60f117ba2d07da9e0e438714b409447b6).
I think that this is also sufficient for the aio callbacks.

Regards

Christian

2010/6/11 Simone Gotti <simone.gotti@gmail.com>:
> Hi Christian,
>
> thanks for you patch. I tried it a little and it worked quite well but
> during some live migration tests I noticed a problem.
>
>
> The problem is related to live migration with high I/O using the AIO
> calls (I triggered it with a simple "dd").
>
> If you launch a live migration and the guest is stopped and started on
> the new qemu process while some AIO was in flight the guest on the new
> qemu will wait undefinitely for data this will never come. With ata
> emulation an ata reset is sent after some seconds but with virtio this
> won't happen.
>
> I'm not a qemu expert but from what I understand qemu in
> savevm.c:do_savevm calls qemu_aio_flush to wait that all the asyncronous
> aio returned (the callback si called). But the rbd block driver doesn't
> use the qemu aio model but the rados one so that function will never
> know of the rados aio.
>
> So a solution will be to glue the block driver with the qemu aio model.
> I tried to do this to test if this will work in the attached patch. I
> only tested with one rbd block device but the live migration tests
> worked (in the patch I removed all the debug prints I adedd to see if
> all AIO requets really returned.
>
> This is an RFC just to know what you think about this possible solution.
> As qemu's aio model is event based and it needs a file descriptor for
> event communication i used eventfd to do this.
> Let me know if you need a detailed description of the patch!
>
>
> I've also got a question: as librados is multithreaded the callbacks are
> called in another thread. Is there the need to protect some critical
> sections with a lock (for example in rbd_aio_rw_vector and in
> rbd_finish_aiocb)?
>
>
> Thanks!
>
> Bye!
>
>
> From: Simone Gotti <simone.gotti@gmail.com>
> Date: Fri, 11 Jun 2010 21:19:39 +0200
> Subject: [PATCH] block/rbd: Added glue to qemu aio model to fix live
> migration with outstanding aio
>
> Signed-off-by: Simone Gotti <simone.gotti@gmail.com>
>
>
> ---
>  block/rbd.c |   63
> +++++++++++++++++++++++++++++++++++++++++++++++++++++-----
>  1 files changed, 57 insertions(+), 6 deletions(-)
>
> diff --git a/block/rbd.c b/block/rbd.c
> index 4d22069..83b7898 100644
> --- a/block/rbd.c
> +++ b/block/rbd.c
> @@ -25,6 +25,8 @@
>
>  #include <signal.h>
>
> +#include <sys/eventfd.h>
> +
>  /*
>  * When specifying the image filename use:
>  *
> @@ -47,6 +49,15 @@
>
>  #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
>
> +typedef struct BDRVRBDState {
> +    int efd;
> +    rados_pool_t pool;
> +    char name[RBD_MAX_OBJ_NAME_SIZE];
> +    uint64_t size;
> +    uint64_t objsize;
> +    int qemu_aio_count;
> +} BDRVRBDState;
> +
>  typedef struct RBDAIOCB {
>     BlockDriverAIOCB common;
>     QEMUBH *bh;
> @@ -57,6 +68,7 @@ typedef struct RBDAIOCB {
>     int64_t sector_num;
>     int aiocnt;
>     int error;
> +    BDRVRBDState *s;
>  } RBDAIOCB;
>
>  typedef struct RADOSCB {
> @@ -67,12 +79,6 @@ typedef struct RADOSCB {
>     char *buf;
>  } RADOSCB;
>
> -typedef struct BDRVRBDState {
> -    rados_pool_t pool;
> -    char name[RBD_MAX_OBJ_NAME_SIZE];
> -    uint64_t size;
> -    uint64_t objsize;
> -} BDRVRBDState;
>
>  typedef struct rbd_obj_header_ondisk RbdHeader1;
>
> @@ -255,6 +261,31 @@ done:
>     return ret;
>  }
>
> +static void rbd_aio_completion_cb(void *opaque)
> +{
> +    BDRVRBDState *s = opaque;
> +
> +    uint64_t val;
> +    ssize_t ret;
> +
> +    do {
> +        if ((ret = read(s->efd, &val, sizeof(val))) > 0) {
> +            s->qemu_aio_count -= val;
> +       }
> +    } while (ret == -1 && errno == EINTR);
> +
> +    return;
> +}
> +
> +static int rbd_aio_flush_cb(void *opaque)
> +{
> +    BDRVRBDState *s = opaque;
> +
> +    return (s->qemu_aio_count > 0) ? 1 : 0;
> +}
> +
> +
> +
>  static int rbd_open(BlockDriverState *bs, const char *filename, int flags)
>  {
>     BDRVRBDState *s = bs->opaque;
> @@ -303,6 +334,15 @@ static int rbd_open(BlockDriverState *bs, const
> char *filename, int flags)
>     s->size = header->image_size;
>     s->objsize = 1 << header->options.order;
>
> +    s->efd = eventfd(0, 0);
> +    if (s->efd == -1) {
> +        error_report("error opening eventfd");
> +        goto failed;
> +    }
> +    fcntl(s->efd, F_SETFL, O_NONBLOCK);
> +    qemu_aio_set_fd_handler(s->efd, rbd_aio_completion_cb, NULL,
> +        rbd_aio_flush_cb, NULL, s);
> +
>     return 0;
>
>  failed:
> @@ -393,6 +433,7 @@ static AIOPool rbd_aio_pool = {
>  };
>
>  /* This is the callback function for rados_aio_read and _write */
> +
>  static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
>  {
>     RBDAIOCB *acb = rcb->acb;
> @@ -427,6 +468,8 @@ static void rbd_finish_aiocb(rados_completion_t c,
> RADOSCB *rcb)
>             acb->ret += r;
>         }
>     }
> +    uint64_t buf = 1;
> +    write(acb->s->efd, &buf, sizeof(buf));
>     qemu_free(rcb);
>     i = 0;
>     if (!acb->aiocnt && acb->bh) {
> @@ -435,6 +478,7 @@ static void rbd_finish_aiocb(rados_completion_t c,
> RADOSCB *rcb)
>  }
>
>  /* Callback when all queued rados_aio requests are complete */
> +
>  static void rbd_aio_bh_cb(void *opaque)
>  {
>     RBDAIOCB *acb = opaque;
> @@ -446,6 +490,10 @@ static void rbd_aio_bh_cb(void *opaque)
>     acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
>     qemu_bh_delete(acb->bh);
>     acb->bh = NULL;
> +
> +    uint64_t buf = 1;
> +    write(acb->s->efd, &buf, sizeof(buf));
> +
>     qemu_aio_release(acb);
>  }
>
> @@ -473,6 +521,7 @@ static BlockDriverAIOCB
> *rbd_aio_rw_vector(BlockDriverState *bs,
>     acb->aiocnt = 0;
>     acb->ret = 0;
>     acb->error = 0;
> +    acb->s = s;
>
>     if (!acb->bh) {
>         acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
> @@ -493,6 +542,8 @@ static BlockDriverAIOCB
> *rbd_aio_rw_vector(BlockDriverState *bs,
>     last_segnr = ((off + size - 1) / s->objsize);
>     acb->aiocnt = (last_segnr - segnr) + 1;
>
> +    s->qemu_aio_count+=acb->aiocnt + 1; /* All the RADOSCB and the
> related RBDAIOCB */
> +
>     while (size > 0) {
>         if (size < segsize) {
>             segsize = size;
> --
> 1.7.0.1
>
>
>
>
>
>
> On 05/31/2010 09:31 PM, Christian Brunner wrote:
>> Hi Kevin,
>>
>> here is an updated patch for the ceph/rbd driver. I hope that everything
>> is fine now.
>>
>> Regards,
>> Christian
>>
>>
>> This is a block driver for the distributed file system Ceph
>> (http://ceph.newdream.net/). This driver uses librados (which
>> is part of the Ceph server) for direct access to the Ceph object
>> store and is running entirely in userspace. Therefore it is
>> called "rbd" - rados block device.
>>
>> To compile the driver a recent version of ceph (unstable/testing git
>> head or 0.20.3 once it is released) is needed.
>>
>> Additional information is available on the Ceph-Wiki:
>>
>> http://ceph.newdream.net/wiki/Kvm-rbd
>>
>> The patch is based on git://repo.or.cz/qemu/kevin.git block
>>
>>
>> Signed-off-by: Christian Brunner <chb@muc.de>
>> ---
>>  Makefile.objs     |    1 +
>>  block/rbd.c       |  600 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>>  block/rbd_types.h |   64 ++++++
>>  configure         |   31 +++
>>  4 files changed, 696 insertions(+), 0 deletions(-)
>>  create mode 100644 block/rbd.c
>>  create mode 100644 block/rbd_types.h
>>
>> diff --git a/Makefile.objs b/Makefile.objs
>> index 1a942e5..08dc11f 100644
>> --- a/Makefile.objs
>> +++ b/Makefile.objs
>> @@ -18,6 +18,7 @@ block-nested-y += parallels.o nbd.o blkdebug.o
>>  block-nested-$(CONFIG_WIN32) += raw-win32.o
>>  block-nested-$(CONFIG_POSIX) += raw-posix.o
>>  block-nested-$(CONFIG_CURL) += curl.o
>> +block-nested-$(CONFIG_RBD) += rbd.o
>>
>>  block-obj-y +=  $(addprefix block/, $(block-nested-y))
>>
>> diff --git a/block/rbd.c b/block/rbd.c
>> new file mode 100644
>> index 0000000..4a60dda
>> --- /dev/null
>> +++ b/block/rbd.c
>> @@ -0,0 +1,600 @@
>> +/*
>> + * QEMU Block driver for RADOS (Ceph)
>> + *
>> + * Copyright (C) 2010 Christian Brunner <chb@muc.de>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2.  See
>> + * the COPYING file in the top-level directory.
>> + *
>> + */
>> +
>> +#include "qemu-common.h"
>> +#include "qemu-error.h"
>> +#include <sys/types.h>
>> +#include <stdbool.h>
>> +
>> +#include <qemu-common.h>
>> +
>> +#include "rbd_types.h"
>> +#include "module.h"
>> +#include "block_int.h"
>> +
>> +#include <stdio.h>
>> +#include <stdlib.h>
>> +#include <rados/librados.h>
>> +
>> +#include <signal.h>
>> +
>> +/*
>> + * When specifying the image filename use:
>> + *
>> + * rbd:poolname/devicename
>> + *
>> + * poolname must be the name of an existing rados pool
>> + *
>> + * devicename is the basename for all objects used to
>> + * emulate the raw device.
>> + *
>> + * Metadata information (image size, ...) is stored in an
>> + * object with the name "devicename.rbd".
>> + *
>> + * The raw device is split into 4MB sized objects by default.
>> + * The sequencenumber is encoded in a 12 byte long hex-string,
>> + * and is attached to the devicename, separated by a dot.
>> + * e.g. "devicename.1234567890ab"
>> + *
>> + */
>> +
>> +#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
>> +
>> +typedef struct RBDAIOCB {
>> +    BlockDriverAIOCB common;
>> +    QEMUBH *bh;
>> +    int ret;
>> +    QEMUIOVector *qiov;
>> +    char *bounce;
>> +    int write;
>> +    int64_t sector_num;
>> +    int aiocnt;
>> +    int error;
>> +} RBDAIOCB;
>> +
>> +typedef struct RADOSCB {
>> +    int rcbid;
>> +    RBDAIOCB *acb;
>> +    int done;
>> +    int64_t segsize;
>> +    char *buf;
>> +} RADOSCB;
>> +
>> +typedef struct BDRVRBDState {
>> +    rados_pool_t pool;
>> +    char name[RBD_MAX_OBJ_NAME_SIZE];
>> +    uint64_t size;
>> +    uint64_t objsize;
>> +} BDRVRBDState;
>> +
>> +typedef struct rbd_obj_header_ondisk RbdHeader1;
>> +
>> +static int rbd_parsename(const char *filename, char *pool, char *name)
>> +{
>> +    const char *rbdname;
>> +    char *p;
>> +    int l;
>> +
>> +    if (!strstart(filename, "rbd:", &rbdname)) {
>> +        return -EINVAL;
>> +    }
>> +
>> +    pstrcpy(pool, 2 * RBD_MAX_SEG_NAME_SIZE, rbdname);
>> +    p = strchr(pool, '/');
>> +    if (p == NULL) {
>> +        return -EINVAL;
>> +    }
>> +
>> +    *p = '\0';
>> +
>> +    l = strlen(pool);
>> +    if(l >= RBD_MAX_SEG_NAME_SIZE) {
>> +        error_report("pool name to long");
>> +        return -EINVAL;
>> +    } else if (l <= 0) {
>> +        error_report("pool name to short");
>> +        return -EINVAL;
>> +    }
>> +
>> +    l = strlen(++p);
>> +    if (l >= RBD_MAX_OBJ_NAME_SIZE) {
>> +        error_report("object name to long");
>> +        return -EINVAL;
>> +    } else if (l <= 0) {
>> +        error_report("object name to short");
>> +        return -EINVAL;
>> +    }
>> +
>> +    strcpy(name, p);
>> +
>> +    return l;
>> +}
>> +
>> +static int create_tmap_op(uint8_t op, const char *name, char **tmap_desc)
>> +{
>> +    uint32_t len = strlen(name);
>> +    /* total_len = encoding op + name + empty buffer */
>> +    uint32_t total_len = 1 + (sizeof(uint32_t) + len) + sizeof(uint32_t);
>> +    char *desc = NULL;
>> +
>> +    qemu_malloc(total_len);
>> +
>> +    *tmap_desc = desc;
>> +
>> +    *desc = op;
>> +    desc++;
>> +    memcpy(desc, &len, sizeof(len));
>> +    desc += sizeof(len);
>> +    memcpy(desc, name, len);
>> +    desc += len;
>> +    len = 0;
>> +    memcpy(desc, &len, sizeof(len));
>> +    desc += sizeof(len);
>> +
>> +    return desc - *tmap_desc;
>> +}
>> +
>> +static void free_tmap_op(char *tmap_desc)
>> +{
>> +    qemu_free(tmap_desc);
>> +}
>> +
>> +static int rbd_register_image(rados_pool_t pool, const char *name)
>> +{
>> +    char *tmap_desc;
>> +    const char *dir = RBD_DIRECTORY;
>> +    int ret;
>> +
>> +    ret = create_tmap_op(CEPH_OSD_TMAP_SET, name, &tmap_desc);
>> +    if (ret < 0) {
>> +        return ret;
>> +    }
>> +
>> +    ret = rados_tmap_update(pool, dir, tmap_desc, ret);
>> +    free_tmap_op(tmap_desc);
>> +
>> +    return ret;
>> +}
>> +
>> +static int rbd_create(const char *filename, QEMUOptionParameter *options)
>> +{
>> +    int64_t bytes = 0;
>> +    int64_t objsize;
>> +    uint64_t size;
>> +    time_t mtime;
>> +    uint8_t obj_order = RBD_DEFAULT_OBJ_ORDER;
>> +    char pool[RBD_MAX_SEG_NAME_SIZE];
>> +    char n[RBD_MAX_SEG_NAME_SIZE];
>> +    char name[RBD_MAX_SEG_NAME_SIZE];
>> +    RbdHeader1 header;
>> +    rados_pool_t p;
>> +    int ret;
>> +
>> +    if (rbd_parsename(filename, pool, name) < 0) {
>> +        return -EINVAL;
>> +    }
>> +
>> +    snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s%s", name, RBD_SUFFIX);
>> +
>> +    /* Read out options */
>> +    while (options && options->name) {
>> +        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
>> +            bytes = options->value.n;
>> +        } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) {
>> +            if (options->value.n) {
>> +                objsize = options->value.n;
>> +                if ((objsize - 1) & objsize) {    /* not a power of 2? */
>> +                    error_report("obj size needs to be power of 2");
>> +                    return -EINVAL;
>> +                }
>> +                if (objsize < 4096) {
>> +                    error_report("obj size too small");
>> +                    return -EINVAL;
>> +                }
>> +
>> +                for (obj_order = 0; obj_order < 64; obj_order++) {
>> +                    if (objsize == 1) {
>> +                        break;
>> +                    }
>> +                    objsize >>= 1;
>> +                }
>> +            }
>> +        }
>> +        options++;
>> +    }
>> +
>> +    memset(&header, 0, sizeof(header));
>> +    pstrcpy(header.text, sizeof(header.text), RBD_HEADER_TEXT);
>> +    pstrcpy(header.signature, sizeof(header.signature), RBD_HEADER_SIGNATURE);
>> +    pstrcpy(header.version, sizeof(header.version), RBD_HEADER_VERSION);
>> +    header.image_size = bytes;
>> +    cpu_to_le64s((uint64_t *) & header.image_size);
>> +    header.options.order = obj_order;
>> +    header.options.crypt_type = RBD_CRYPT_NONE;
>> +    header.options.comp_type = RBD_COMP_NONE;
>> +    header.snap_seq = 0;
>> +    header.snap_count = 0;
>> +    cpu_to_le32s(&header.snap_count);
>> +
>> +    if (rados_initialize(0, NULL) < 0) {
>> +        error_report("error initializing");
>> +        return -EIO;
>> +    }
>> +
>> +    if (rados_open_pool(pool, &p)) {
>> +        error_report("error opening pool %s", pool);
>> +        rados_deinitialize();
>> +        return -EIO;
>> +    }
>> +
>> +    /* check for existing rbd header file */
>> +    ret = rados_stat(p, n, &size, &mtime);
>> +    if (ret == 0) {
>> +        ret=-EEXIST;
>> +        goto done;
>> +    }
>> +
>> +    /* create header file */
>> +    ret = rados_write(p, n, 0, (const char *)&header, sizeof(header));
>> +    if (ret < 0) {
>> +        goto done;
>> +    }
>> +
>> +    ret = rbd_register_image(p, name);
>> +done:
>> +    rados_close_pool(p);
>> +    rados_deinitialize();
>> +
>> +    return ret;
>> +}
>> +
>> +static int rbd_open(BlockDriverState *bs, const char *filename, int flags)
>> +{
>> +    BDRVRBDState *s = bs->opaque;
>> +    char pool[RBD_MAX_SEG_NAME_SIZE];
>> +    char n[RBD_MAX_SEG_NAME_SIZE];
>> +    char hbuf[4096];
>> +    int r;
>> +
>> +    if (rbd_parsename(filename, pool, s->name) < 0) {
>> +        return -EINVAL;
>> +    }
>> +    snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s%s", s->name, RBD_SUFFIX);
>> +
>> +    if ((r = rados_initialize(0, NULL)) < 0) {
>> +        error_report("error initializing");
>> +        return r;
>> +    }
>> +
>> +    if ((r = rados_open_pool(pool, &s->pool))) {
>> +        error_report("error opening pool %s", pool);
>> +        rados_deinitialize();
>> +        return r;
>> +    }
>> +
>> +    if ((r = rados_read(s->pool, n, 0, hbuf, 4096)) < 0) {
>> +        error_report("error reading header from %s", s->name);
>> +        goto failed;
>> +    }
>> +
>> +    if (strncmp(hbuf + 64, RBD_HEADER_SIGNATURE, 4)) {
>> +        error_report("Invalid header signature %s", hbuf + 64);
>> +        r = -EMEDIUMTYPE;
>> +        goto failed;
>> +    }
>> +
>> +    if (strncmp(hbuf + 68, RBD_HEADER_VERSION, 8)) {
>> +        error_report("Unknown image version %s", hbuf + 68);
>> +        r = -EMEDIUMTYPE;
>> +        goto failed;
>> +    }
>> +
>> +    RbdHeader1 *header;
>> +
>> +    header = (RbdHeader1 *) hbuf;
>> +    le64_to_cpus((uint64_t *) & header->image_size);
>> +    s->size = header->image_size;
>> +    s->objsize = 1 << header->options.order;
>> +
>> +    return 0;
>> +
>> +failed:
>> +    rados_close_pool(s->pool);
>> +    rados_deinitialize();
>> +    return r;
>> +}
>> +
>> +static void rbd_close(BlockDriverState *bs)
>> +{
>> +    BDRVRBDState *s = bs->opaque;
>> +
>> +    rados_close_pool(s->pool);
>> +    rados_deinitialize();
>> +}
>> +
>> +static int rbd_rw(BlockDriverState *bs, int64_t sector_num,
>> +                  uint8_t *buf, int nb_sectors, int write)
>> +{
>> +    BDRVRBDState *s = bs->opaque;
>> +    char n[RBD_MAX_SEG_NAME_SIZE];
>> +
>> +    int64_t segnr, segoffs, segsize, r;
>> +    int64_t off, size;
>> +
>> +    off = sector_num * BDRV_SECTOR_SIZE;
>> +    size = nb_sectors * BDRV_SECTOR_SIZE;
>> +    segnr = off / s->objsize;
>> +    segoffs = off % s->objsize;
>> +    segsize = s->objsize - segoffs;
>> +
>> +    while (size > 0) {
>> +        if (size < segsize) {
>> +            segsize = size;
>> +        }
>> +
>> +        snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s.%012" PRIx64, s->name, segnr);
>> +
>> +        if (write) {
>> +            if ((r = rados_write(s->pool, n, segoffs, (const char *)buf,
>> +                segsize)) < 0) {
>> +                return r;
>> +            }
>> +        } else {
>> +            r = rados_read(s->pool, n, segoffs, (char *)buf, segsize);
>> +            if (r == -ENOENT) {
>> +                memset(buf, 0, segsize);
>> +            } else if (r < 0) {
>> +                return r;
>> +            } else if (r < segsize) {
>> +                memset(buf + r, 0, segsize - r);
>> +            }
>> +        }
>> +
>> +        buf += segsize;
>> +        size -= segsize;
>> +        segoffs = 0;
>> +        segsize = s->objsize;
>> +        segnr++;
>> +    }
>> +
>> +    return 0;
>> +}
>> +
>> +static int rbd_read(BlockDriverState *bs, int64_t sector_num,
>> +                    uint8_t *buf, int nb_sectors)
>> +{
>> +    return rbd_rw(bs, sector_num, buf, nb_sectors, 0);
>> +}
>> +
>> +static int rbd_write(BlockDriverState *bs, int64_t sector_num,
>> +                     const uint8_t *buf, int nb_sectors)
>> +{
>> +    return rbd_rw(bs, sector_num, (uint8_t *) buf, nb_sectors, 1);
>> +}
>> +
>> +static void rbd_aio_cancel(BlockDriverAIOCB *blockacb)
>> +{
>> +    RBDAIOCB *acb = (RBDAIOCB *) blockacb;
>> +    qemu_bh_delete(acb->bh);
>> +    acb->bh = NULL;
>> +    qemu_aio_release(acb);
>> +}
>> +
>> +static AIOPool rbd_aio_pool = {
>> +    .aiocb_size = sizeof(RBDAIOCB),
>> +    .cancel = rbd_aio_cancel,
>> +};
>> +
>> +/* This is the callback function for rados_aio_read and _write */
>> +static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
>> +{
>> +    RBDAIOCB *acb = rcb->acb;
>> +    int64_t r;
>> +    int i;
>> +
>> +    acb->aiocnt--;
>> +    r = rados_aio_get_return_value(c);
>> +    rados_aio_release(c);
>> +    if (acb->write) {
>> +        if (r < 0) {
>> +            acb->ret = r;
>> +            acb->error = 1;
>> +        } else if (!acb->error) {
>> +            acb->ret += rcb->segsize;
>> +        }
>> +    } else {
>> +        if (r == -ENOENT) {
>> +            memset(rcb->buf, 0, rcb->segsize);
>> +            if (!acb->error) {
>> +                acb->ret += rcb->segsize;
>> +            }
>> +        } else if (r < 0) {
>> +            acb->ret = r;
>> +            acb->error = 1;
>> +        } else if (r < rcb->segsize) {
>> +            memset(rcb->buf + r, 0, rcb->segsize - r);
>> +            if (!acb->error) {
>> +                acb->ret += rcb->segsize;
>> +            }
>> +        } else if (!acb->error) {
>> +            acb->ret += r;
>> +        }
>> +    }
>> +    qemu_free(rcb);
>> +    i = 0;
>> +    if (!acb->aiocnt && acb->bh) {
>> +        qemu_bh_schedule(acb->bh);
>> +    }
>> +}
>> +
>> +/* Callback when all queued rados_aio requests are complete */
>> +static void rbd_aio_bh_cb(void *opaque)
>> +{
>> +    RBDAIOCB *acb = opaque;
>> +
>> +    if (!acb->write) {
>> +        qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
>> +    }
>> +    qemu_vfree(acb->bounce);
>> +    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
>> +    qemu_bh_delete(acb->bh);
>> +    acb->bh = NULL;
>> +    qemu_aio_release(acb);
>> +}
>> +
>> +static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
>> +                                           int64_t sector_num,
>> +                                           QEMUIOVector *qiov,
>> +                                           int nb_sectors,
>> +                                           BlockDriverCompletionFunc *cb,
>> +                                           void *opaque, int write)
>> +{
>> +    RBDAIOCB *acb;
>> +    RADOSCB *rcb;
>> +    rados_completion_t c;
>> +    char n[RBD_MAX_SEG_NAME_SIZE];
>> +    int64_t segnr, segoffs, segsize, last_segnr;
>> +    int64_t off, size;
>> +    char *buf;
>> +
>> +    BDRVRBDState *s = bs->opaque;
>> +
>> +    acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque);
>> +    acb->write = write;
>> +    acb->qiov = qiov;
>> +    acb->bounce = qemu_blockalign(bs, qiov->size);
>> +    acb->aiocnt = 0;
>> +    acb->ret = 0;
>> +    acb->error = 0;
>> +
>> +    if (!acb->bh) {
>> +        acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
>> +    }
>> +
>> +    if (write) {
>> +        qemu_iovec_to_buffer(acb->qiov, acb->bounce);
>> +    }
>> +
>> +    buf = acb->bounce;
>> +
>> +    off = sector_num * BDRV_SECTOR_SIZE;
>> +    size = nb_sectors * BDRV_SECTOR_SIZE;
>> +    segnr = off / s->objsize;
>> +    segoffs = off % s->objsize;
>> +    segsize = s->objsize - segoffs;
>> +
>> +    last_segnr = ((off + size - 1) / s->objsize);
>> +    acb->aiocnt = (last_segnr - segnr) + 1;
>> +
>> +    while (size > 0) {
>> +        if (size < segsize) {
>> +            segsize = size;
>> +        }
>> +
>> +        snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s.%012llx", s->name,
>> +                 (long long unsigned int)segnr);
>> +
>> +        rcb = qemu_malloc(sizeof(RADOSCB));
>> +        rcb->done = 0;
>> +        rcb->acb = acb;
>> +        rcb->segsize = segsize;
>> +        rcb->buf = buf;
>> +
>> +        if (write) {
>> +            rados_aio_create_completion(rcb, NULL,
>> +                                        (rados_callback_t) rbd_finish_aiocb,
>> +                                        &c);
>> +            rados_aio_write(s->pool, n, segoffs, buf, segsize, c);
>> +        } else {
>> +            rados_aio_create_completion(rcb,
>> +                                        (rados_callback_t) rbd_finish_aiocb,
>> +                                        NULL, &c);
>> +            rados_aio_read(s->pool, n, segoffs, buf, segsize, c);
>> +        }
>> +
>> +        buf += segsize;
>> +        size -= segsize;
>> +        segoffs = 0;
>> +        segsize = s->objsize;
>> +        segnr++;
>> +    }
>> +
>> +    return &acb->common;
>> +}
>> +
>> +static BlockDriverAIOCB *rbd_aio_readv(BlockDriverState * bs,
>> +                                       int64_t sector_num, QEMUIOVector * qiov,
>> +                                       int nb_sectors,
>> +                                       BlockDriverCompletionFunc * cb,
>> +                                       void *opaque)
>> +{
>> +    return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
>> +}
>> +
>> +static BlockDriverAIOCB *rbd_aio_writev(BlockDriverState * bs,
>> +                                        int64_t sector_num, QEMUIOVector * qiov,
>> +                                        int nb_sectors,
>> +                                        BlockDriverCompletionFunc * cb,
>> +                                        void *opaque)
>> +{
>> +    return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
>> +}
>> +
>> +static int rbd_getinfo(BlockDriverState * bs, BlockDriverInfo * bdi)
>> +{
>> +    BDRVRBDState *s = bs->opaque;
>> +    bdi->cluster_size = s->objsize;
>> +    return 0;
>> +}
>> +
>> +static int64_t rbd_getlength(BlockDriverState * bs)
>> +{
>> +    BDRVRBDState *s = bs->opaque;
>> +
>> +    return s->size;
>> +}
>> +
>> +static QEMUOptionParameter rbd_create_options[] = {
>> +    {
>> +     .name = BLOCK_OPT_SIZE,
>> +     .type = OPT_SIZE,
>> +     .help = "Virtual disk size"
>> +    },
>> +    {
>> +     .name = BLOCK_OPT_CLUSTER_SIZE,
>> +     .type = OPT_SIZE,
>> +     .help = "RBD object size"
>> +    },
>> +    {NULL}
>> +};
>> +
>> +static BlockDriver bdrv_rbd = {
>> +    .format_name        = "rbd",
>> +    .instance_size      = sizeof(BDRVRBDState),
>> +    .bdrv_file_open     = rbd_open,
>> +    .bdrv_read          = rbd_read,
>> +    .bdrv_write         = rbd_write,
>> +    .bdrv_close         = rbd_close,
>> +    .bdrv_create        = rbd_create,
>> +    .bdrv_get_info      = rbd_getinfo,
>> +    .create_options     = rbd_create_options,
>> +    .bdrv_getlength     = rbd_getlength,
>> +    .protocol_name      = "rbd",
>> +
>> +    .bdrv_aio_readv     = rbd_aio_readv,
>> +    .bdrv_aio_writev    = rbd_aio_writev,
>> +};
>> +
>> +static void bdrv_rbd_init(void)
>> +{
>> +    bdrv_register(&bdrv_rbd);
>> +}
>> +
>> +block_init(bdrv_rbd_init);
>> diff --git a/block/rbd_types.h b/block/rbd_types.h
>> new file mode 100644
>> index 0000000..91ac4f9
>> --- /dev/null
>> +++ b/block/rbd_types.h
>> @@ -0,0 +1,64 @@
>> +/*
>> + * Ceph - scalable distributed file system
>> + *
>> + * Copyright (C) 2004-2010 Sage Weil <sage@newdream.net>
>> + *
>> + * This is free software; you can redistribute it and/or
>> + * modify it under the terms of the GNU Lesser General Public
>> + * License version 2.1, as published by the Free Software
>> + * Foundation.  See file COPYING.
>> + *
>> + */
>> +
>> +#ifndef QEMU_BLOCK_RBD_TYPES_H
>> +#define QEMU_BLOCK_RBD_TYPES_H
>> +
>> +
>> +/*
>> + * rbd image 'foo' consists of objects
>> + *   foo.rbd      - image metadata
>> + *   foo.00000000
>> + *   foo.00000001
>> + *   ...          - data
>> + */
>> +
>> +#define RBD_SUFFIX              ".rbd"
>> +#define RBD_DIRECTORY           "rbd_directory"
>> +
>> +#define RBD_DEFAULT_OBJ_ORDER   22   /* 4MB */
>> +
>> +#define RBD_MAX_OBJ_NAME_SIZE   96
>> +#define RBD_MAX_SEG_NAME_SIZE   128
>> +
>> +#define RBD_COMP_NONE           0
>> +#define RBD_CRYPT_NONE          0
>> +
>> +#define RBD_HEADER_TEXT         "<<< Rados Block Device Image >>>\n"
>> +#define RBD_HEADER_SIGNATURE    "RBD"
>> +#define RBD_HEADER_VERSION      "001.004"
>> +
>> +struct rbd_obj_snap_ondisk {
>> +    uint64_t id;
>> +    uint64_t image_size;
>> +} __attribute__((packed));
>> +
>> +struct rbd_obj_header_ondisk {
>> +    char text[64];
>> +    char signature[4];
>> +    char version[8];
>> +    struct {
>> +        uint8_t order;
>> +        uint8_t crypt_type;
>> +        uint8_t comp_type;
>> +        uint8_t unused;
>> +    } __attribute__((packed)) options;
>> +    uint64_t image_size;
>> +    uint64_t snap_seq;
>> +    uint32_t snap_count;
>> +    uint32_t reserved;
>> +    uint64_t snap_names_len;
>> +    struct rbd_obj_snap_ondisk snaps[0];
>> +} __attribute__((packed));
>> +
>> +
>> +#endif
>> diff --git a/configure b/configure
>> index 3cd2c5f..3f5c8ce 100755
>> --- a/configure
>> +++ b/configure
>> @@ -299,6 +299,7 @@ pkgversion=""
>>  check_utests="no"
>>  user_pie="no"
>>  zero_malloc=""
>> +rbd=""
>>
>>  # OS specific
>>  if check_define __linux__ ; then
>> @@ -660,6 +661,10 @@ for opt do
>>    ;;
>>    --enable-vhost-net) vhost_net="yes"
>>    ;;
>> +  --disable-rbd) rbd="no"
>> +  ;;
>> +  --enable-rbd) rbd="yes"
>> +  ;;
>>    *) echo "ERROR: unknown option $opt"; show_help="yes"
>>    ;;
>>    esac
>> @@ -826,6 +831,7 @@ echo "  --enable-docs            enable documentation build"
>>  echo "  --disable-docs           disable documentation build"
>>  echo "  --disable-vhost-net      disable vhost-net acceleration support"
>>  echo "  --enable-vhost-net       enable vhost-net acceleration support"
>> +echo "  --enable-rbd          enable building the rados block device (rbd)"
>>  echo ""
>>  echo "NOTE: The object files are built at the place where configure is launched"
>>  exit 1
>> @@ -1579,6 +1585,27 @@ if test "$mingw32" != yes -a "$pthread" = no; then
>>  fi
>>
>>  ##########################################
>> +# rbd probe
>> +if test "$rbd" != "no" ; then
>> +  cat > $TMPC <<EOF
>> +#include <stdio.h>
>> +#include <rados/librados.h>
>> +int main(void) { rados_initialize(0, NULL); return 0; }
>> +EOF
>> +  rbd_libs="-lrados -lcrypto"
>> +  if compile_prog "" "$rbd_libs" ; then
>> +    rbd=yes
>> +    libs_tools="$rbd_libs $libs_tools"
>> +    libs_softmmu="$rbd_libs $libs_softmmu"
>> +  else
>> +    if test "$rbd" = "yes" ; then
>> +      feature_not_found "rados block device"
>> +    fi
>> +    rbd=no
>> +  fi
>> +fi
>> +
>> +##########################################
>>  # linux-aio probe
>>
>>  if test "$linux_aio" != "no" ; then
>> @@ -2041,6 +2068,7 @@ echo "preadv support    $preadv"
>>  echo "fdatasync         $fdatasync"
>>  echo "uuid support      $uuid"
>>  echo "vhost-net support $vhost_net"
>> +echo "rbd support       $rbd"
>>
>>  if test $sdl_too_old = "yes"; then
>>  echo "-> Your SDL version is too old - please upgrade to have SDL support"
>> @@ -2270,6 +2298,9 @@ echo "CONFIG_UNAME_RELEASE=\"$uname_release\"" >> $config_host_mak
>>  if test "$zero_malloc" = "yes" ; then
>>    echo "CONFIG_ZERO_MALLOC=y" >> $config_host_mak
>>  fi
>> +if test "$rbd" = "yes" ; then
>> +  echo "CONFIG_RBD=y" >> $config_host_mak
>> +fi
>>
>>  # USB host support
>>  case "$usb" in
>>
>
> --
> To unsubscribe from this list: send the line "unsubscribe kvm" in
> the body of a message to majordomo@vger.kernel.org
> More majordomo info at  http://vger.kernel.org/majordomo-info.html
>

^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [Qemu-devel] Re: [PATCH] ceph/rbd block driver for qemu-kvm (v3)
  2010-06-17 19:05     ` [Qemu-devel] " Christian Brunner
@ 2010-06-18 10:09       ` Kevin Wolf
  -1 siblings, 0 replies; 19+ messages in thread
From: Kevin Wolf @ 2010-06-18 10:09 UTC (permalink / raw)
  To: Christian Brunner; +Cc: Simone Gotti, ceph-devel, qemu-devel, kvm

Am 17.06.2010 21:05, schrieb Christian Brunner:
> Hi Simone,
> 
> sorry for the late reply. I've been on vacation for a week.
> 
> Thanks for sending the patch. At first sight your patch looks good.
> I'll do some testing by the weekend.
> 
> Kevin also sent me a note about the missing aio support, but I didn't
> have the time to implement it yet. Now it seems, that I don't have to
> do it, since you where quicker... :)

Are you going to send a final version which includes Simone's patch or
should I apply them as two patches and just accept that rbd is broken
after the first one? Or were there any other problems that need to be
solved first?

Kevin

^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [Qemu-devel] Re: [PATCH] ceph/rbd block driver for qemu-kvm (v3)
@ 2010-06-18 10:09       ` Kevin Wolf
  0 siblings, 0 replies; 19+ messages in thread
From: Kevin Wolf @ 2010-06-18 10:09 UTC (permalink / raw)
  To: Christian Brunner; +Cc: ceph-devel, Simone Gotti, qemu-devel, kvm

Am 17.06.2010 21:05, schrieb Christian Brunner:
> Hi Simone,
> 
> sorry for the late reply. I've been on vacation for a week.
> 
> Thanks for sending the patch. At first sight your patch looks good.
> I'll do some testing by the weekend.
> 
> Kevin also sent me a note about the missing aio support, but I didn't
> have the time to implement it yet. Now it seems, that I don't have to
> do it, since you where quicker... :)

Are you going to send a final version which includes Simone's patch or
should I apply them as two patches and just accept that rbd is broken
after the first one? Or were there any other problems that need to be
solved first?

Kevin

^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [Qemu-devel] Re: [PATCH] ceph/rbd block driver for qemu-kvm (v3)
  2010-06-18 10:09       ` Kevin Wolf
@ 2010-06-19 15:48         ` Christian Brunner
  -1 siblings, 0 replies; 19+ messages in thread
From: Christian Brunner @ 2010-06-19 15:48 UTC (permalink / raw)
  To: Kevin Wolf; +Cc: Simone Gotti, ceph-devel, qemu-devel, kvm

>
> Are you going to send a final version which includes Simone's patch or
> should I apply them as two patches and just accept that rbd is broken
> after the first one? Or were there any other problems that need to be
> solved first?

I'll send a final version, when I've tested everything.

There is another problem with very large i/o requests. I suspect that
this can be triggered only
with qemu-io and not in kvm, but I'll try to get a proper solution it anyway.

Christian

^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [Qemu-devel] Re: [PATCH] ceph/rbd block driver for qemu-kvm (v3)
@ 2010-06-19 15:48         ` Christian Brunner
  0 siblings, 0 replies; 19+ messages in thread
From: Christian Brunner @ 2010-06-19 15:48 UTC (permalink / raw)
  To: Kevin Wolf; +Cc: ceph-devel, Simone Gotti, qemu-devel, kvm

>
> Are you going to send a final version which includes Simone's patch or
> should I apply them as two patches and just accept that rbd is broken
> after the first one? Or were there any other problems that need to be
> solved first?

I'll send a final version, when I've tested everything.

There is another problem with very large i/o requests. I suspect that
this can be triggered only
with qemu-io and not in kvm, but I'll try to get a proper solution it anyway.

Christian

^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [Qemu-devel] Re: [PATCH] ceph/rbd block driver for qemu-kvm (v3)
  2010-06-19 15:48         ` Christian Brunner
@ 2010-07-13 18:27           ` Yehuda Sadeh Weinraub
  -1 siblings, 0 replies; 19+ messages in thread
From: Yehuda Sadeh Weinraub @ 2010-07-13 18:27 UTC (permalink / raw)
  To: Christian Brunner; +Cc: Kevin Wolf, Simone Gotti, ceph-devel, qemu-devel, kvm

On Sat, Jun 19, 2010 at 8:48 AM, Christian Brunner <chb@muc.de> wrote:
>>
>> Are you going to send a final version which includes Simone's patch or
>> should I apply them as two patches and just accept that rbd is broken
>> after the first one? Or were there any other problems that need to be
>> solved first?
>
> I'll send a final version, when I've tested everything.
>
> There is another problem with very large i/o requests. I suspect that
> this can be triggered only
> with qemu-io and not in kvm, but I'll try to get a proper solution it anyway.
>

Have you made any progress with this issue? Just note that there were
a few changes we introduced recently (a format change that allows
renaming of rbd images, and some snapshots support), so everything
will needed to be reposted once we figure out the aio issue.

Thanks,
Yehuda

^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [Qemu-devel] Re: [PATCH] ceph/rbd block driver for qemu-kvm (v3)
@ 2010-07-13 18:27           ` Yehuda Sadeh Weinraub
  0 siblings, 0 replies; 19+ messages in thread
From: Yehuda Sadeh Weinraub @ 2010-07-13 18:27 UTC (permalink / raw)
  To: Christian Brunner; +Cc: Kevin Wolf, ceph-devel, Simone Gotti, qemu-devel, kvm

On Sat, Jun 19, 2010 at 8:48 AM, Christian Brunner <chb@muc.de> wrote:
>>
>> Are you going to send a final version which includes Simone's patch or
>> should I apply them as two patches and just accept that rbd is broken
>> after the first one? Or were there any other problems that need to be
>> solved first?
>
> I'll send a final version, when I've tested everything.
>
> There is another problem with very large i/o requests. I suspect that
> this can be triggered only
> with qemu-io and not in kvm, but I'll try to get a proper solution it anyway.
>

Have you made any progress with this issue? Just note that there were
a few changes we introduced recently (a format change that allows
renaming of rbd images, and some snapshots support), so everything
will needed to be reposted once we figure out the aio issue.

Thanks,
Yehuda

^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [Qemu-devel] Re: [PATCH] ceph/rbd block driver for qemu-kvm (v3)
  2010-07-13 18:27           ` Yehuda Sadeh Weinraub
  (?)
@ 2010-07-13 19:23             ` Christian Brunner
  -1 siblings, 0 replies; 19+ messages in thread
From: Christian Brunner @ 2010-07-13 19:23 UTC (permalink / raw)
  To: Yehuda Sadeh Weinraub
  Cc: Kevin Wolf, Simone Gotti, ceph-devel, qemu-devel, kvm

[-- Attachment #1: Type: text/plain, Size: 1018 bytes --]

On Tue, Jul 13, 2010 at 11:27:03AM -0700, Yehuda Sadeh Weinraub wrote:
> >
> > There is another problem with very large i/o requests. I suspect that
> > this can be triggered only
> > with qemu-io and not in kvm, but I'll try to get a proper solution it anyway.
> >
> 
> Have you made any progress with this issue? Just note that there were
> a few changes we introduced recently (a format change that allows
> renaming of rbd images, and some snapshots support), so everything
> will needed to be reposted once we figure out the aio issue.

Attached is a patch where I'm trying to solve the issue
with pthreads locking. It works well with qemu-io, but I'm
not sure if there are interferences with other threads in
qemu/kvm (I didn't have time to test this, yet).

Another thing I'm not sure about is the fact, that these
large I/O requests only happen with qemu-io. I've never seen
this happen inside a virtual machine. So do we really have
to fix this, as it is only a warning message (laggy).

Regards,

Christian


[-- Attachment #2: 0027-add-queueing-delay-based-on-queuesize.patch --]
[-- Type: text/plain, Size: 3023 bytes --]

From fcef3d897e0357b252a189ed59e43bfd5c24d229 Mon Sep 17 00:00:00 2001
From: Christian Brunner <chb@muc.de>
Date: Tue, 22 Jun 2010 21:51:09 +0200
Subject: [PATCH 27/27] add queueing delay based on queuesize

---
 block/rbd.c |   31 ++++++++++++++++++++++++++++++-
 1 files changed, 30 insertions(+), 1 deletions(-)

diff --git a/block/rbd.c b/block/rbd.c
index 10daf20..c6693d7 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -24,7 +24,7 @@
 #include <rados/librados.h>
 
 #include <signal.h>
-
+#include <pthread.h>
 
 int eventfd(unsigned int initval, int flags);
 
@@ -50,6 +50,7 @@ int eventfd(unsigned int initval, int flags);
  */
 
 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
+#define MAX_QUEUE_SIZE 33554432 // 32MB
 
 typedef struct RBDAIOCB {
     BlockDriverAIOCB common;
@@ -79,6 +80,9 @@ typedef struct BDRVRBDState {
     uint64_t size;
     uint64_t objsize;
     int qemu_aio_count;
+    uint64_t queuesize;
+    pthread_mutex_t *queue_mutex;
+    pthread_cond_t *queue_threshold;
 } BDRVRBDState;
 
 typedef struct rbd_obj_header_ondisk RbdHeader1;
@@ -334,6 +338,12 @@ static int rbd_open(BlockDriverState *bs, const char *filename, int flags)
     le64_to_cpus((uint64_t *) & header->image_size);
     s->size = header->image_size;
     s->objsize = 1 << header->options.order;
+    s->queuesize = 0;
+
+    s->queue_mutex = qemu_malloc(sizeof(pthread_mutex_t));
+    pthread_mutex_init(s->queue_mutex, NULL);
+    s->queue_threshold = qemu_malloc(sizeof(pthread_cond_t));
+    pthread_cond_init (s->queue_threshold, NULL);
 
     s->efd = eventfd(0, 0);
     if (s->efd < 0) {
@@ -356,6 +366,11 @@ static void rbd_close(BlockDriverState *bs)
 {
     BDRVRBDState *s = bs->opaque;
 
+    pthread_cond_destroy(s->queue_threshold);
+    qemu_free(s->queue_threshold);
+    pthread_mutex_destroy(s->queue_mutex);
+    qemu_free(s->queue_mutex);
+
     rados_close_pool(s->pool);
     rados_deinitialize();
 }
@@ -443,6 +458,12 @@ static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
     int i;
 
     acb->aiocnt--;
+    acb->s->queuesize -= rcb->segsize;
+    if (acb->s->queuesize+rcb->segsize > MAX_QUEUE_SIZE && acb->s->queuesize <= MAX_QUEUE_SIZE) {
+        pthread_mutex_lock(acb->s->queue_mutex);
+        pthread_cond_signal(acb->s->queue_threshold);
+        pthread_mutex_unlock(acb->s->queue_mutex);
+    }
     r = rados_aio_get_return_value(c);
     rados_aio_release(c);
     if (acb->write) {
@@ -560,6 +581,14 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
         rcb->segsize = segsize;
         rcb->buf = buf;
 
+        while  (s->queuesize > MAX_QUEUE_SIZE) {
+            pthread_mutex_lock(s->queue_mutex);
+            pthread_cond_wait(s->queue_threshold, s->queue_mutex);
+            pthread_mutex_unlock(s->queue_mutex);
+        }
+
+        s->queuesize += segsize;
+
         if (write) {
             rados_aio_create_completion(rcb, NULL,
                                         (rados_callback_t) rbd_finish_aiocb,
-- 
1.7.0.4


^ permalink raw reply related	[flat|nested] 19+ messages in thread

* Re: [Qemu-devel] Re: [PATCH] ceph/rbd block driver for qemu-kvm (v3)
@ 2010-07-13 19:23             ` Christian Brunner
  0 siblings, 0 replies; 19+ messages in thread
From: Christian Brunner @ 2010-07-13 19:23 UTC (permalink / raw)
  To: Yehuda Sadeh Weinraub
  Cc: Kevin Wolf, Simone Gotti, ceph-devel, qemu-devel, kvm

[-- Attachment #1: Type: text/plain, Size: 1018 bytes --]

On Tue, Jul 13, 2010 at 11:27:03AM -0700, Yehuda Sadeh Weinraub wrote:
> >
> > There is another problem with very large i/o requests. I suspect that
> > this can be triggered only
> > with qemu-io and not in kvm, but I'll try to get a proper solution it anyway.
> >
> 
> Have you made any progress with this issue? Just note that there were
> a few changes we introduced recently (a format change that allows
> renaming of rbd images, and some snapshots support), so everything
> will needed to be reposted once we figure out the aio issue.

Attached is a patch where I'm trying to solve the issue
with pthreads locking. It works well with qemu-io, but I'm
not sure if there are interferences with other threads in
qemu/kvm (I didn't have time to test this, yet).

Another thing I'm not sure about is the fact, that these
large I/O requests only happen with qemu-io. I've never seen
this happen inside a virtual machine. So do we really have
to fix this, as it is only a warning message (laggy).

Regards,

Christian


[-- Attachment #2: 0027-add-queueing-delay-based-on-queuesize.patch --]
[-- Type: text/plain, Size: 3024 bytes --]

>From fcef3d897e0357b252a189ed59e43bfd5c24d229 Mon Sep 17 00:00:00 2001
From: Christian Brunner <chb@muc.de>
Date: Tue, 22 Jun 2010 21:51:09 +0200
Subject: [PATCH 27/27] add queueing delay based on queuesize

---
 block/rbd.c |   31 ++++++++++++++++++++++++++++++-
 1 files changed, 30 insertions(+), 1 deletions(-)

diff --git a/block/rbd.c b/block/rbd.c
index 10daf20..c6693d7 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -24,7 +24,7 @@
 #include <rados/librados.h>
 
 #include <signal.h>
-
+#include <pthread.h>
 
 int eventfd(unsigned int initval, int flags);
 
@@ -50,6 +50,7 @@ int eventfd(unsigned int initval, int flags);
  */
 
 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
+#define MAX_QUEUE_SIZE 33554432 // 32MB
 
 typedef struct RBDAIOCB {
     BlockDriverAIOCB common;
@@ -79,6 +80,9 @@ typedef struct BDRVRBDState {
     uint64_t size;
     uint64_t objsize;
     int qemu_aio_count;
+    uint64_t queuesize;
+    pthread_mutex_t *queue_mutex;
+    pthread_cond_t *queue_threshold;
 } BDRVRBDState;
 
 typedef struct rbd_obj_header_ondisk RbdHeader1;
@@ -334,6 +338,12 @@ static int rbd_open(BlockDriverState *bs, const char *filename, int flags)
     le64_to_cpus((uint64_t *) & header->image_size);
     s->size = header->image_size;
     s->objsize = 1 << header->options.order;
+    s->queuesize = 0;
+
+    s->queue_mutex = qemu_malloc(sizeof(pthread_mutex_t));
+    pthread_mutex_init(s->queue_mutex, NULL);
+    s->queue_threshold = qemu_malloc(sizeof(pthread_cond_t));
+    pthread_cond_init (s->queue_threshold, NULL);
 
     s->efd = eventfd(0, 0);
     if (s->efd < 0) {
@@ -356,6 +366,11 @@ static void rbd_close(BlockDriverState *bs)
 {
     BDRVRBDState *s = bs->opaque;
 
+    pthread_cond_destroy(s->queue_threshold);
+    qemu_free(s->queue_threshold);
+    pthread_mutex_destroy(s->queue_mutex);
+    qemu_free(s->queue_mutex);
+
     rados_close_pool(s->pool);
     rados_deinitialize();
 }
@@ -443,6 +458,12 @@ static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
     int i;
 
     acb->aiocnt--;
+    acb->s->queuesize -= rcb->segsize;
+    if (acb->s->queuesize+rcb->segsize > MAX_QUEUE_SIZE && acb->s->queuesize <= MAX_QUEUE_SIZE) {
+        pthread_mutex_lock(acb->s->queue_mutex);
+        pthread_cond_signal(acb->s->queue_threshold);
+        pthread_mutex_unlock(acb->s->queue_mutex);
+    }
     r = rados_aio_get_return_value(c);
     rados_aio_release(c);
     if (acb->write) {
@@ -560,6 +581,14 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
         rcb->segsize = segsize;
         rcb->buf = buf;
 
+        while  (s->queuesize > MAX_QUEUE_SIZE) {
+            pthread_mutex_lock(s->queue_mutex);
+            pthread_cond_wait(s->queue_threshold, s->queue_mutex);
+            pthread_mutex_unlock(s->queue_mutex);
+        }
+
+        s->queuesize += segsize;
+
         if (write) {
             rados_aio_create_completion(rcb, NULL,
                                         (rados_callback_t) rbd_finish_aiocb,
-- 
1.7.0.4


^ permalink raw reply related	[flat|nested] 19+ messages in thread

* Re: [Qemu-devel] Re: [PATCH] ceph/rbd block driver for qemu-kvm (v3)
@ 2010-07-13 19:23             ` Christian Brunner
  0 siblings, 0 replies; 19+ messages in thread
From: Christian Brunner @ 2010-07-13 19:23 UTC (permalink / raw)
  To: Yehuda Sadeh Weinraub
  Cc: Kevin Wolf, ceph-devel, Simone Gotti, qemu-devel, kvm

[-- Attachment #1: Type: text/plain, Size: 1018 bytes --]

On Tue, Jul 13, 2010 at 11:27:03AM -0700, Yehuda Sadeh Weinraub wrote:
> >
> > There is another problem with very large i/o requests. I suspect that
> > this can be triggered only
> > with qemu-io and not in kvm, but I'll try to get a proper solution it anyway.
> >
> 
> Have you made any progress with this issue? Just note that there were
> a few changes we introduced recently (a format change that allows
> renaming of rbd images, and some snapshots support), so everything
> will needed to be reposted once we figure out the aio issue.

Attached is a patch where I'm trying to solve the issue
with pthreads locking. It works well with qemu-io, but I'm
not sure if there are interferences with other threads in
qemu/kvm (I didn't have time to test this, yet).

Another thing I'm not sure about is the fact, that these
large I/O requests only happen with qemu-io. I've never seen
this happen inside a virtual machine. So do we really have
to fix this, as it is only a warning message (laggy).

Regards,

Christian


[-- Attachment #2: 0027-add-queueing-delay-based-on-queuesize.patch --]
[-- Type: text/plain, Size: 3024 bytes --]

>From fcef3d897e0357b252a189ed59e43bfd5c24d229 Mon Sep 17 00:00:00 2001
From: Christian Brunner <chb@muc.de>
Date: Tue, 22 Jun 2010 21:51:09 +0200
Subject: [PATCH 27/27] add queueing delay based on queuesize

---
 block/rbd.c |   31 ++++++++++++++++++++++++++++++-
 1 files changed, 30 insertions(+), 1 deletions(-)

diff --git a/block/rbd.c b/block/rbd.c
index 10daf20..c6693d7 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -24,7 +24,7 @@
 #include <rados/librados.h>
 
 #include <signal.h>
-
+#include <pthread.h>
 
 int eventfd(unsigned int initval, int flags);
 
@@ -50,6 +50,7 @@ int eventfd(unsigned int initval, int flags);
  */
 
 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
+#define MAX_QUEUE_SIZE 33554432 // 32MB
 
 typedef struct RBDAIOCB {
     BlockDriverAIOCB common;
@@ -79,6 +80,9 @@ typedef struct BDRVRBDState {
     uint64_t size;
     uint64_t objsize;
     int qemu_aio_count;
+    uint64_t queuesize;
+    pthread_mutex_t *queue_mutex;
+    pthread_cond_t *queue_threshold;
 } BDRVRBDState;
 
 typedef struct rbd_obj_header_ondisk RbdHeader1;
@@ -334,6 +338,12 @@ static int rbd_open(BlockDriverState *bs, const char *filename, int flags)
     le64_to_cpus((uint64_t *) & header->image_size);
     s->size = header->image_size;
     s->objsize = 1 << header->options.order;
+    s->queuesize = 0;
+
+    s->queue_mutex = qemu_malloc(sizeof(pthread_mutex_t));
+    pthread_mutex_init(s->queue_mutex, NULL);
+    s->queue_threshold = qemu_malloc(sizeof(pthread_cond_t));
+    pthread_cond_init (s->queue_threshold, NULL);
 
     s->efd = eventfd(0, 0);
     if (s->efd < 0) {
@@ -356,6 +366,11 @@ static void rbd_close(BlockDriverState *bs)
 {
     BDRVRBDState *s = bs->opaque;
 
+    pthread_cond_destroy(s->queue_threshold);
+    qemu_free(s->queue_threshold);
+    pthread_mutex_destroy(s->queue_mutex);
+    qemu_free(s->queue_mutex);
+
     rados_close_pool(s->pool);
     rados_deinitialize();
 }
@@ -443,6 +458,12 @@ static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
     int i;
 
     acb->aiocnt--;
+    acb->s->queuesize -= rcb->segsize;
+    if (acb->s->queuesize+rcb->segsize > MAX_QUEUE_SIZE && acb->s->queuesize <= MAX_QUEUE_SIZE) {
+        pthread_mutex_lock(acb->s->queue_mutex);
+        pthread_cond_signal(acb->s->queue_threshold);
+        pthread_mutex_unlock(acb->s->queue_mutex);
+    }
     r = rados_aio_get_return_value(c);
     rados_aio_release(c);
     if (acb->write) {
@@ -560,6 +581,14 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
         rcb->segsize = segsize;
         rcb->buf = buf;
 
+        while  (s->queuesize > MAX_QUEUE_SIZE) {
+            pthread_mutex_lock(s->queue_mutex);
+            pthread_cond_wait(s->queue_threshold, s->queue_mutex);
+            pthread_mutex_unlock(s->queue_mutex);
+        }
+
+        s->queuesize += segsize;
+
         if (write) {
             rados_aio_create_completion(rcb, NULL,
                                         (rados_callback_t) rbd_finish_aiocb,
-- 
1.7.0.4


^ permalink raw reply related	[flat|nested] 19+ messages in thread

* Re: [Qemu-devel] Re: [PATCH] ceph/rbd block driver for qemu-kvm (v3)
  2010-07-13 19:23             ` Christian Brunner
@ 2010-07-13 19:41               ` Yehuda Sadeh Weinraub
  -1 siblings, 0 replies; 19+ messages in thread
From: Yehuda Sadeh Weinraub @ 2010-07-13 19:41 UTC (permalink / raw)
  To: Christian Brunner; +Cc: Kevin Wolf, Simone Gotti, ceph-devel, qemu-devel, kvm

On Tue, Jul 13, 2010 at 12:23 PM, Christian Brunner <chb@muc.de> wrote:
> On Tue, Jul 13, 2010 at 11:27:03AM -0700, Yehuda Sadeh Weinraub wrote:
>> >
>> > There is another problem with very large i/o requests. I suspect that
>> > this can be triggered only
>> > with qemu-io and not in kvm, but I'll try to get a proper solution it anyway.
>> >
>>
>> Have you made any progress with this issue? Just note that there were
>> a few changes we introduced recently (a format change that allows
>> renaming of rbd images, and some snapshots support), so everything
>> will needed to be reposted once we figure out the aio issue.
>
> Attached is a patch where I'm trying to solve the issue
> with pthreads locking. It works well with qemu-io, but I'm
> not sure if there are interferences with other threads in
> qemu/kvm (I didn't have time to test this, yet).
>
> Another thing I'm not sure about is the fact, that these
> large I/O requests only happen with qemu-io. I've never seen
> this happen inside a virtual machine. So do we really have
> to fix this, as it is only a warning message (laggy).
>

We can have it configurable, and by default not use it. We don't need
to feed the osds with more data that they can digest anyway, since
that will only increase our memory usage -- whether it's just a
warning or a real error. So a bounded approach that doesn't hurt
performance makes sense.
I'll merge this one into our tree so that it could get some broader
testing, however, I think the qemu code requires using the qemu_cond
wrappers instead of directly using the pthread_cond_*().

Thanks,
Yehuda

^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [Qemu-devel] Re: [PATCH] ceph/rbd block driver for qemu-kvm (v3)
@ 2010-07-13 19:41               ` Yehuda Sadeh Weinraub
  0 siblings, 0 replies; 19+ messages in thread
From: Yehuda Sadeh Weinraub @ 2010-07-13 19:41 UTC (permalink / raw)
  To: Christian Brunner; +Cc: Kevin Wolf, ceph-devel, Simone Gotti, qemu-devel, kvm

On Tue, Jul 13, 2010 at 12:23 PM, Christian Brunner <chb@muc.de> wrote:
> On Tue, Jul 13, 2010 at 11:27:03AM -0700, Yehuda Sadeh Weinraub wrote:
>> >
>> > There is another problem with very large i/o requests. I suspect that
>> > this can be triggered only
>> > with qemu-io and not in kvm, but I'll try to get a proper solution it anyway.
>> >
>>
>> Have you made any progress with this issue? Just note that there were
>> a few changes we introduced recently (a format change that allows
>> renaming of rbd images, and some snapshots support), so everything
>> will needed to be reposted once we figure out the aio issue.
>
> Attached is a patch where I'm trying to solve the issue
> with pthreads locking. It works well with qemu-io, but I'm
> not sure if there are interferences with other threads in
> qemu/kvm (I didn't have time to test this, yet).
>
> Another thing I'm not sure about is the fact, that these
> large I/O requests only happen with qemu-io. I've never seen
> this happen inside a virtual machine. So do we really have
> to fix this, as it is only a warning message (laggy).
>

We can have it configurable, and by default not use it. We don't need
to feed the osds with more data that they can digest anyway, since
that will only increase our memory usage -- whether it's just a
warning or a real error. So a bounded approach that doesn't hurt
performance makes sense.
I'll merge this one into our tree so that it could get some broader
testing, however, I think the qemu code requires using the qemu_cond
wrappers instead of directly using the pthread_cond_*().

Thanks,
Yehuda

^ permalink raw reply	[flat|nested] 19+ messages in thread

end of thread, other threads:[~2010-07-13 19:41 UTC | newest]

Thread overview: 19+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2010-05-31 19:31 [PATCH] ceph/rbd block driver for qemu-kvm (v3) Christian Brunner
2010-05-31 19:31 ` [Qemu-devel] " Christian Brunner
2010-06-01  8:43 ` Kevin Wolf
2010-06-02  7:42   ` Christian Brunner
2010-06-11 19:51 ` Simone Gotti
2010-06-11 19:51   ` [Qemu-devel] " Simone Gotti
2010-06-17 19:05   ` Christian Brunner
2010-06-17 19:05     ` [Qemu-devel] " Christian Brunner
2010-06-18 10:09     ` Kevin Wolf
2010-06-18 10:09       ` Kevin Wolf
2010-06-19 15:48       ` Christian Brunner
2010-06-19 15:48         ` Christian Brunner
2010-07-13 18:27         ` Yehuda Sadeh Weinraub
2010-07-13 18:27           ` Yehuda Sadeh Weinraub
2010-07-13 19:23           ` Christian Brunner
2010-07-13 19:23             ` Christian Brunner
2010-07-13 19:23             ` Christian Brunner
2010-07-13 19:41             ` Yehuda Sadeh Weinraub
2010-07-13 19:41               ` Yehuda Sadeh Weinraub

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.