From: Yehuda Sadeh Weinraub <yehuda@hq.newdream.net>
To: Stefan Hajnoczi <stefanha@gmail.com>
Cc: Anthony Liguori <anthony@codemonkey.ws>,
	Kevin Wolf <kwolf@redhat.com>,
	kvm@vger.kernel.org, qemu-devel@nongnu.org,
	Sage Weil <sage@newdream.net>,
	ceph-devel@vger.kernel.org, Christian Brunner <chb@muc.de>
Subject: Re: [Qemu-devel] [PATCH -v5] ceph/rbd block driver for qemu-kvm
Date: Sat, 9 Oct 2010 08:53:31 -0700	[thread overview]
Message-ID: <AANLkTikU=X9yKCWX1k6gFUZz9CH2FFh8tUmWfrqmiFRz@mail.gmail.com> (raw)
In-Reply-To: <AANLkTim35HsnQ=VJ7BAiqQtt6sZiNr6azC967ywocR7w@mail.gmail.com>

On Sat, Oct 9, 2010 at 2:21 AM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
> On Fri, Oct 8, 2010 at 8:00 PM, Yehuda Sadeh <yehuda@hq.newdream.net> wrote:
> No flush operation is supported.  Can the guest be sure written data
> is on stable storage when it receives completion?
>
That's part of the consistency guarantees that rados provides.

>> +/*
>> + * This aio completion is being called from rbd_aio_event_reader() and
>> + * runs in qemu context. It schedules a bh, but just in case the aio
>> + * was not cancelled before.
>
> Cancellation looks unsafe to me because acb is freed for cancel but
> then accessed here!  Also see my comment on aio_cancel() below.

Right. Added ->cancelled field instead of releasing the acb.

>
>> +/*
>> + * Cancel aio. Since we don't reference acb in a non qemu threads,
>> + * it is safe to access it here.
>> + */
>> +static void rbd_aio_cancel(BlockDriverAIOCB *blockacb)
>> +{
>> +    RBDAIOCB *acb = (RBDAIOCB *) blockacb;
>> +    qemu_bh_delete(acb->bh);
>> +    acb->bh = NULL;
>> +    qemu_aio_release(acb);
>
> Any pending librados completions are still running here and will then
> cause acb to be accessed after they complete.  If there is no safe way
> to cancel then wait for the request to complete.

Yeah, we'll keep the acb alive now and just set ->cancelled. When the
last rados callback arrives for this acb we'll clean it up.
>
>> +}
>> +
>> +static AIOPool rbd_aio_pool = {
>> +    .aiocb_size = sizeof(RBDAIOCB),
>> +    .cancel = rbd_aio_cancel,
>> +};
>> +
>> +/*
>> + * This is the callback function for rados_aio_read and _write
>> + *
>> + * Note: this function is being called from a non qemu thread so
>> + * we need to be careful about what we do here. Generally we only
>> + * write to the block notification pipe, and do the rest of the
>> + * io completion handling from rbd_aio_event_reader() which
>> + * runs in a qemu context.
>
> Do librados threads have all signals blocked?  QEMU uses signals so it
> is important that this signal not get sent to a librados thread and
> discarded.  I have seen this issue in the past when using threaded
> libraries in QEMU.

We're no longer blocking threads; we've hit that issue before too.

>
>> + */
>> +static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
>> +{
>> +    rcb->ret = rados_aio_get_return_value(c);
>> +    rados_aio_release(c);
>> +    if (write(rcb->s->fds[RBD_FD_WRITE], (void *)&rcb, sizeof(&rcb)) < 0) {
>
> You are writing RADOSCB* so sizeof(rcb) should be used.
Oops.

>
>> +        error_report("failed writing to acb->s->fds\n");
>> +        qemu_free(rcb);
>> +    }
>> +}
>> +
>> +/* Callback when all queued rados_aio requests are complete */
>> +
>> +static void rbd_aio_bh_cb(void *opaque)
>> +{
>> +    RBDAIOCB *acb = opaque;
>> +
>> +    if (!acb->write) {
>> +        qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
>> +    }
>> +    qemu_vfree(acb->bounce);
>> +    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
>> +    qemu_bh_delete(acb->bh);
>> +    acb->bh = NULL;
>> +
>> +    qemu_aio_release(acb);
>> +}
>> +
>> +static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
>> +                                           int64_t sector_num,
>> +                                           QEMUIOVector *qiov,
>> +                                           int nb_sectors,
>> +                                           BlockDriverCompletionFunc *cb,
>> +                                           void *opaque, int write)
>> +{
>> +    RBDAIOCB *acb;
>> +    RADOSCB *rcb;
>> +    rados_completion_t c;
>> +    char n[RBD_MAX_SEG_NAME_SIZE];
>> +    int64_t segnr, segoffs, segsize, last_segnr;
>> +    int64_t off, size;
>> +    char *buf;
>> +
>> +    BDRVRBDState *s = bs->opaque;
>> +
>> +    acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque);
>> +    acb->write = write;
>> +    acb->qiov = qiov;
>> +    acb->bounce = qemu_blockalign(bs, qiov->size);
>> +    acb->aiocnt = 0;
>> +    acb->ret = 0;
>> +    acb->error = 0;
>> +    acb->s = s;
>> +
>> +    if (!acb->bh) {
>> +        acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
>> +    }
>
> When do you expect acb->bh to be non-NULL?

I guess this part was copied from somewhere else. We don't expect it
to be non-NULL since we always clean up the bh, so the check has been
removed.

>
>> +
>> +    if (write) {
>> +        qemu_iovec_to_buffer(acb->qiov, acb->bounce);
>> +    }
>> +
>> +    buf = acb->bounce;
>> +
>> +    off = sector_num * BDRV_SECTOR_SIZE;
>> +    size = nb_sectors * BDRV_SECTOR_SIZE;
>> +    segnr = off / s->objsize;
>> +    segoffs = off % s->objsize;
>> +    segsize = s->objsize - segoffs;
>> +
>> +    last_segnr = ((off + size - 1) / s->objsize);
>> +    acb->aiocnt = (last_segnr - segnr) + 1;
>> +
>> +    s->qemu_aio_count += acb->aiocnt; /* All the RADOSCB */
>> +
>> +    if (write && s->read_only) {
>> +        acb->ret = -EROFS;
>> +        return NULL;
>> +    }
>
> block.c:bdrv_aio_writev() will reject writes to read-only block
> devices.  This check can be eliminated and it also prevents leaking
> acb here.

Removed.

Thanks,
Yehuda

diff --git a/block/rbd.c b/block/rbd.c
index a51fc36..575e481 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -51,6 +51,7 @@ typedef struct RBDAIOCB {
     int aiocnt;
     int error;
     struct BDRVRBDState *s;
+    int cancelled;
 } RBDAIOCB;

 typedef struct RADOSCB {
@@ -325,8 +326,18 @@ static void rbd_complete_aio(RADOSCB *rcb)
     int64_t r;
     int i;

-    r = rcb->ret;
     acb->aiocnt--;
+
+    if (acb->cancelled) {
+        if (!acb->aiocnt) {
+            qemu_vfree(acb->bounce);
+            qemu_aio_release(acb);
+        }
+        goto done;
+    }
+
+    r = rcb->ret;
+
     if (acb->write) {
         if (r < 0) {
             acb->ret = r;
@@ -356,6 +367,7 @@ static void rbd_complete_aio(RADOSCB *rcb)
     if (!acb->aiocnt && acb->bh) {
         qemu_bh_schedule(acb->bh);
     }
+done:
     qemu_free(rcb);
     i = 0;
 }
@@ -585,7 +597,7 @@ static void rbd_aio_cancel(BlockDriverAIOCB *blockacb)
     RBDAIOCB *acb = (RBDAIOCB *) blockacb;
     qemu_bh_delete(acb->bh);
     acb->bh = NULL;
-    qemu_aio_release(acb);

+    acb->cancelled = 1;
 }

 static AIOPool rbd_aio_pool = {
@@ -606,7 +618,7 @@ static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
 {
     rcb->ret = rados_aio_get_return_value(c);
     rados_aio_release(c);
-    if (write(rcb->s->fds[RBD_FD_WRITE], (void *)&rcb, sizeof(&rcb)) < 0) {
+    if (write(rcb->s->fds[RBD_FD_WRITE], (void *)&rcb, sizeof(rcb)) < 0) {
         error_report("failed writing to acb->s->fds\n");
         qemu_free(rcb);
     }
@@ -654,10 +666,8 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
     acb->ret = 0;
     acb->error = 0;
     acb->s = s;
-
-    if (!acb->bh) {
-        acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
-    }
+    acb->cancelled = 0;
+    acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);

     if (write) {
         qemu_iovec_to_buffer(acb->qiov, acb->bounce);
@@ -676,11 +686,6 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,

     s->qemu_aio_count += acb->aiocnt; /* All the RADOSCB */

-    if (write && s->read_only) {
-        acb->ret = -EROFS;
-        return NULL;
-    }
-
     while (size > 0) {
         if (size < segsize) {
             segsize = size;

